# Queues

Queues enable multi-step data extraction workflows in which one endpoint collects data (such as records or identifiers) that subsequent endpoints consume. This is essential for APIs that require separate calls to fetch related data.

## Queue Architecture

```mermaid
graph TD
    A[Endpoint 1: List Items] --> B[Extract IDs]
    B --> C[Queue: item_ids]
    C --> D[Endpoint 2: Item Details]
    D --> E[Iterate over Queue]
    E --> F[Make Detail Requests]

    style A fill:#ff8c42,stroke:#ffffff,stroke-width:2px,color:#ffffff
    style B fill:#7cb342,stroke:#ffffff,stroke-width:2px,color:#ffffff
    style C fill:#5c6bc0,stroke:#ffffff,stroke-width:2px,color:#ffffff
    style D fill:#ff8c42,stroke:#ffffff,stroke-width:2px,color:#ffffff
    style E fill:#ffd54f,stroke:#ffffff,stroke-width:2px,color:#000000
    style F fill:#ab47bc,stroke:#ffffff,stroke-width:2px,color:#ffffff
```

## Queue Usage

Queues are **auto-detected** from your endpoint definitions — Sling discovers them by scanning processor `output:` and `iterate.over:` expressions. You do not need to declare them.

> ⚠️ **Deprecated:** The top-level `queues:` list is no longer required and is ignored at runtime. Old specs that still include it continue to load, but Sling emits a one-time deprecation warning. Remove the field when you next touch the spec.

At load time, Sling enforces that every consumed queue has at least one producer, so typos surface immediately as a `queue(s) with no producer` error rather than at runtime.
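
As a sketch of what this check catches, consider a spec where the consumer misspells the queue name. The endpoint names and URLs are illustrative; only the `queue(s) with no producer` wording comes from the description above, and the rest of the error text is not shown here because it is not documented on this page.

```yaml
endpoints:
  list_customers:
    request:
      url: "customers"
    response:
      records:
        jmespath: "data[]"
      processors:
        # Producer: writes to queue.customer_ids
        - expression: "record.id"
          output: "queue.customer_ids"

  customer_details:
    iterate:
      # Typo: "customer_id" (singular) has no producer anywhere in the spec,
      # so loading should fail with a `queue(s) with no producer` error
      over: "queue.customer_id"
      into: "state.current_customer_id"
    request:
      url: "customers/{state.current_customer_id}"
```

Renaming the consumed queue to `queue.customer_ids` makes producer and consumer match again.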

### 1. Sending Data to Queues

Use processors to send data from one endpoint to a queue:

```yaml
endpoints:
  list_customers:
    description: "Get list of customers"
    
    request:
      url: "customers"
    
    response:
      records:
        jmespath: "data[]"
        primary_key: ["id"]
      
      processors:
        # Send customer IDs to queue for detailed processing
        - expression: "record.id"
          output: "queue.customer_ids"
```

### 2. Consuming Data from Queues

Use the `iterate` section to process each item from a queue:

```yaml
endpoints:
  customer_details:
    description: "Get detailed customer information"
    
    # Process each customer ID from the queue
    iterate:
      over: "queue.customer_ids"
      into: "state.current_customer_id"
    
    request:
      url: "customers/{state.current_customer_id}"
    
    response:
      records:
        jmespath: "@"  # Single customer object
        primary_key: ["id"]
      
      processors:
        # Add the customer ID to each record for reference
        - expression: "state.current_customer_id"
          output: "record.customer_id"
```

## Queue-Only Endpoints

Some producer endpoints exist only to fan IDs into a queue for a downstream consumer — they don't produce records anyone wants to read. Mark them with `queue_only: true` and Sling will:

* run the endpoint and drain its records into the queue(s) it populates via processors,
* skip emitting a record stream (no target write, no row count),
* hide the endpoint from `*` wildcard discovery and from `sling conns discover`,
* still schedule the endpoint to run before any consumer that iterates over its queue (dependency order is auto-detected).

The endpoint is still selectable by explicit name when you need to debug it.

### Example: Search → Detail

```yaml
endpoints:
  # Producer: queue_only — no record output, just populates queue.imdb_ids
  search:
    queue_only: true
    request:
      url: "{state.base_url}/"
      method: GET
      parameters:
        apikey: "{env.OMDB_API_KEY}"
        s: "batman"
        type: "movie"
    response:
      records:
        jmespath: "Search"
      processors:
        - expression: "record.imdbID"
          output: "queue.imdb_ids"

  # Consumer: iterates the queue populated by `search`
  details:
    iterate:
      over: "queue.imdb_ids"
      into: "state.imdb_id"
      concurrency: 2
    request:
      url: "{state.base_url}/"
      method: GET
      parameters:
        apikey: "{env.OMDB_API_KEY}"
        i: "{state.imdb_id}"
    response:
      records:
        jmespath: "@"
        primary_key: ["imdbID"]
```

In a replication that targets `*`, Sling will run `search` first (filling `queue.imdb_ids`) and then `details` (writing one row per movie). `search` does not appear in `conns discover` output or in the wildcard endpoint list — list it explicitly by name to inspect it.
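
For context, a replication that selects every endpoint might look like the sketch below. The connection names `OMDB_API` and `MY_POSTGRES` are hypothetical, and the `defaults` shown are one plausible setup rather than something this page prescribes.

```yaml
source: OMDB_API        # hypothetical API connection that uses the spec above
target: MY_POSTGRES     # hypothetical target database connection

defaults:
  mode: full-refresh
  object: "public.{stream_name}"

streams:
  # Wildcard: `details` is written to the target. `search` still runs first
  # to fill queue.imdb_ids, but it emits no stream of its own.
  "*":
```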

> 💡 **Tip:** `queue_only` replaces the older pattern of writing throwaway records from a producer endpoint just to satisfy the "every endpoint must produce a stream" assumption. Combined with auto-detected queues, your producer/consumer wiring stays declared in exactly one place: the endpoints themselves.

## Queue Functions

Queues can be used with built-in functions for advanced processing:

### Chunking Queue Data

Process queue items in batches for more efficient API calls:

```yaml
endpoints:
  batch_customer_details:
    description: "Process customers in batches"
    
    iterate:
      # Process 50 customer IDs at a time
      over: "chunk(queue.customer_ids, 50)"
      into: "state.customer_batch"          # as an array
      concurrency: 3  # Process 3 batches concurrently
    
    request:
      url: "customers/batch"
      method: "POST"
      payload:
        ids: '{join(state.customer_batch, ",")}'
    
    response:
      records:
        jmespath: "customers[]"
        primary_key: ["id"]
```
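
To make the batching concrete, the following sketch shows how seven queued IDs would be grouped with a chunk size of 3. The IDs are made up, and it assumes the final batch simply holds the remainder, which this page does not state explicitly.

```yaml
# Illustrative data only, not part of a spec
queue.customer_ids: ["c1", "c2", "c3", "c4", "c5", "c6", "c7"]

# chunk(queue.customer_ids, 3) hands one array to each iteration:
iterations:
  - state.customer_batch: ["c1", "c2", "c3"]
  - state.customer_batch: ["c4", "c5", "c6"]
  - state.customer_batch: ["c7"]   # assumed: the remainder forms a smaller last batch
```

Under that assumption, the batch endpoint above would issue three requests instead of seven, with the first one sending `c1,c2,c3` as its `ids` value.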

## Real-World Example: Stripe API

This example from the Stripe API demonstrates a complete queue-based workflow:

```yaml
endpoints:
  # Step 1: Collect customer IDs
  customer:
    description: "Retrieve list of customers"
    
    request:
      url: "customers"
    
    pagination:
      next_state:
        starting_after: '{response.records[-1].id}'
      stop_condition: "response.json.has_more == false"
    
    response:
      records:
        jmespath: "data[]"
        primary_key: ["id"]
      
      processors:
        # Send each customer ID to the queue
        - expression: "record.id"
          output: "queue.customer_ids"

  # Step 2: Get customer balance transactions
  customer_balance_transaction:
    description: "Retrieve customer balance transactions"
    
    iterate:
      over: "queue.customer_ids"
      into: "state.customer_id"
    
    request:
      url: "customers/{state.customer_id}/balance_transactions"
    
    response:
      records:
        jmespath: "data[]"
        primary_key: ["id"]
      
      processors:
        # Add customer_id to each transaction record
        - expression: "state.customer_id"
          output: "record.customer_id"

  # Step 3: Collect invoice IDs while getting invoice data
  invoice:
    description: "Retrieve invoices and queue IDs for line items"
    
    request:
      url: "invoices"
    
    pagination:
      next_state:
        starting_after: '{response.records[-1].id}'
      stop_condition: 'jmespath(response.json, "has_more") == false'
    
    response:
      records:
        jmespath: "data[]"
        primary_key: ["id"]
      
      processors:
        # Queue invoice IDs for line item extraction
        - expression: "record.id"
          output: "queue.invoice_ids"

  # Step 4: Get line items for each invoice
  invoice_line_item:
    description: "Retrieve invoice line items"
    
    iterate:
      over: "queue.invoice_ids"
      into: "state.invoice_id"
    
    request:
      url: "invoices/{state.invoice_id}/lines"
    
    response:
      records:
        jmespath: "data[]"
        primary_key: ["id"]
      
      processors:
        # Link line items back to their invoice
        - expression: "state.invoice_id"
          output: "record.invoice_id"
```

## Queue Properties and Behavior

### Queue Characteristics

| Property              | Description                                       | Notes                                      |
| --------------------- | ------------------------------------------------- | ------------------------------------------ |
| **Temporary Storage** | Queues are backed by temporary files              | Automatically cleaned up after run         |
| **FIFO Order**        | Items are processed in first-in, first-out order  | IDs processed in the order they were added |
| **JSON Encoding**     | All data is JSON-encoded for safe storage         | Handles strings, numbers, objects, arrays  |
| **Single Run Scope**  | Queues exist only within a single Sling execution | Cannot persist between separate runs       |

### Queue Lifecycle

```mermaid
sequenceDiagram
    participant E1 as Producer Endpoint
    participant Q as Queue (temp file)
    participant E2 as Consumer Endpoint

    Note over E1: Start processing records
    E1->>Q: Append ID 1
    E1->>Q: Append ID 2
    E1->>Q: Append ID 3
    Note over E1: Finish writing

    Note over Q: Switch to read mode
    E2->>Q: Read ID 1
    E2->>Q: Read ID 2
    E2->>Q: Read ID 3
    Note over E2: Process each ID

    Note over Q: Auto cleanup on completion
```

## Direct Queue-to-Records Pattern

You can pipe queue data **directly to records** without making HTTP requests using the special syntax `iterate.into: "response.records"`.

### Basic Syntax

```yaml
endpoints:
  deduplicate_customers:
    description: "Remove duplicate customer IDs"

    iterate:
      over: "queue.raw_customer_ids"
      into: "response.records"  # Special: Direct to records, no HTTP call

    # No request block needed!

    response:
      records:
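        primary_key: ["value"]  # Deduplicate; scalar queue items arrive wrapped as {"value": ...}

      processors:
        - expression: "record.value"  # Re-queue the bare ID, not the wrapper object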
          output: "queue.clean_customer_ids"
```

### When to Use

* **Deduplicate** queue items before further processing
* **Enrich** queue data with state variables
* **Transform** queue structure
* **Export** queue contents as a separate dataset

### Example: Deduplication Workflow

```yaml
endpoints:
  # Step 1: Collect IDs (may have duplicates)
  list_orders:
    request:
      url: "{state.base_url}/orders"
    response:
      processors:
        - expression: "record.customer_id"
          output: "queue.raw_customer_ids"

  # Step 2: Deduplicate using direct queue-to-records
  deduplicate_customers:
    iterate:
      over: "queue.raw_customer_ids"
      into: "response.records"
    response:
      records:
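        primary_key: ["value"]  # Scalar queue items arrive wrapped as {"value": ...}
      processors:
        - expression: "record.value"  # Re-queue the bare ID, not the wrapper object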
          output: "queue.clean_customer_ids"

  # Step 3: Use clean IDs
  customer_details:
    iterate:
      over: "queue.clean_customer_ids"
      into: "state.customer_id"
    request:
      url: "{state.base_url}/customers/{state.customer_id}"
```

### Data Type Handling

**Scalar values** (strings/numbers) are wrapped: `"user123"` → `{"value": "user123"}`

```yaml
processors:
  - expression: "record.value"  # Access via .value
    output: "record.user_id"
```

**Object values** are used as-is: `{"id": 1, "name": "Alice"}` → same structure

```yaml
processors:
  - expression: "record.id"  # Access fields directly
    output: "record.user_id"
```

### Limitations

* No HTTP response data (`response.status`, `response.headers` unavailable)
* Cannot use pagination or response rules
* Don't define a `request` block (it will be ignored)

> 💡 **Tip:** Because this pattern makes no HTTP requests, it is much faster than routing a queue transformation step through a placeholder API call.

## Advanced Queue Patterns

### Pattern 1: Multi-Level Hierarchies

Process nested data structures with multiple queue levels:

```yaml
endpoints:
  accounts:
    response:
      # Level 1: Get accounts
      processors:
        - expression: "record.id"
          output: "queue.account_ids"

  customers:
    # Level 2: Get customers for each account
    iterate:
      over: "queue.account_ids"
      into: "state.account_id"
    response:
      processors:
        - expression: "record.id"
          output: "queue.customer_ids"

  subscriptions:
    # Level 3: Get subscriptions for each customer
    iterate:
      over: "queue.customer_ids"
      into: "state.customer_id"
    response:
      processors:
        - expression: "record.id"
          output: "queue.subscription_ids"
```

### Pattern 2: Conditional Queue Population

Only queue certain items based on conditions:

```yaml
processors:
  # Only queue active customers
  - expression: >
      if(record.status == "active", record.id, null)
    output: "queue.active_customer_ids"
  
  # Queue high-value customers for special processing
  - expression: >
      if(record.total_spent > 10000, record.id, null)
    output: "queue.vip_customer_ids"
```

### Pattern 3: Queue Transformation

Transform data before queuing:

```yaml
processors:
  # Create composite keys for the queue
  - expression: >
      {
        "customer_id": record.id,
        "type": record.customer_type,
        "priority": if(record.is_vip, "high", "normal")
      }
    output: "queue.enriched_customers"
```
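
One way to consume such object items is the direct queue-to-records pattern described earlier, since object values keep their structure. A minimal sketch, in which the endpoint name `enriched_customers_export` is hypothetical:

```yaml
endpoints:
  enriched_customers_export:
    description: "Write queued customer objects out as records"

    iterate:
      over: "queue.enriched_customers"
      into: "response.records"  # Objects are used as-is, no HTTP call

    response:
      records:
        # Fields of the queued object are available directly on the record
        primary_key: ["customer_id"]
```

Each record would then carry the `customer_id`, `type`, and `priority` fields built by the producer's expression.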

## Queue Best Practices

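### Performance Optimization

For large queues, combine `chunk` with `concurrency` so that items are fetched in parallel batches rather than one request at a time: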

```yaml
iterate:
  over: "chunk(queue.large_dataset, 100)"  # Process in batches
  into: "state.batch"
  concurrency: 5  # Parallel processing
```

### Error Handling with Queues

Use response rules so that a single failing item does not abort the rest of the iteration:

```yaml
rules:
  # Continue processing other queue items even if one fails
  - action: "continue"
    condition: "response.status == 404"
    message: "Item not found, skipping"
  
  # Retry transient errors
  - action: "retry"
    condition: "response.status >= 500"
    max_attempts: 3
```
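
To show where such rules would sit relative to a queue consumer, here is a minimal sketch. It assumes rules are nested under `response` (as the phrase "response rules" in the limitations above suggests). The endpoint reuses the `queue.customer_ids` example from earlier on this page.

```yaml
endpoints:
  customer_details:
    iterate:
      over: "queue.customer_ids"
      into: "state.customer_id"

    request:
      url: "customers/{state.customer_id}"

    response:
      records:
        jmespath: "@"
        primary_key: ["id"]

      # Assumed placement: rules nested under `response`
      rules:
        # A missing customer skips this queue item instead of failing the run
        - action: "continue"
          condition: "response.status == 404"
          message: "Item not found, skipping"
```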

> 💡 **Tip:** Use descriptive queue names that clearly indicate their purpose (e.g., `customer_ids`, `pending_order_ids`, `failed_payment_ids`).

> ⚠️ **Warning:** Queues consume disk space proportional to the number of items. For very large datasets (millions of items), monitor available disk space.

> 📝 **Note:** Queue items are automatically JSON-encoded, so complex objects, arrays, and special characters are handled safely.

