# Queues

Queues enable multi-step data extraction workflows in which one endpoint collects data (such as records or identifiers) that subsequent endpoints consume. This is essential for APIs that require separate calls to fetch related data.

## Queue Architecture

{% @mermaid/diagram content="graph TD
A[Endpoint 1: List Items] --> B[Extract IDs]
B --> C[Queue: item_ids]
C --> D[Endpoint 2: Item Details]
D --> E[Iterate over Queue]
E --> F[Make Detail Requests]
style A fill:#ff8c42,stroke:#ffffff,stroke-width:2px,color:#ffffff
style B fill:#7cb342,stroke:#ffffff,stroke-width:2px,color:#ffffff
style C fill:#5c6bc0,stroke:#ffffff,stroke-width:2px,color:#ffffff
style D fill:#ff8c42,stroke:#ffffff,stroke-width:2px,color:#ffffff
style E fill:#ffd54f,stroke:#ffffff,stroke-width:2px,color:#000000
style F fill:#ab47bc,stroke:#ffffff,stroke-width:2px,color:#ffffff" %}

## Queue Declaration and Usage

### 1. Queue Declaration

Queues must be declared at the top level of your API spec:

```yaml
# Declare queues for passing data between endpoints
queues:
  - customer_ids
  - invoice_ids
  - order_items
```

### 2. Sending Data to Queues

Use processors to send data from one endpoint to a queue:

```yaml
endpoints:
  list_customers:
    description: "Get list of customers"
    
    request:
      url: "customers"
    
    response:
      records:
        jmespath: "data[]"
        primary_key: ["id"]
      
      processors:
        # Send customer IDs to queue for detailed processing
        - expression: "record.id"
          output: "queue.customer_ids"
```

### 3. Consuming Data from Queues

Use the `iterate` section to process each item from a queue:

```yaml
endpoints:
  customer_details:
    description: "Get detailed customer information"
    
    # Process each customer ID from the queue
    iterate:
      over: "queue.customer_ids"
      into: "state.current_customer_id"
    
    request:
      url: "customers/{state.current_customer_id}"
    
    response:
      records:
        jmespath: "[*]"  # Single customer object
        primary_key: ["id"]
      
      processors:
        # Add the customer ID to each record for reference
        - expression: "state.current_customer_id"
          output: "record.customer_id"
```

## Queue Functions

Queues can be used with built-in functions for advanced processing:

### Chunking Queue Data

Process queue items in batches for more efficient API calls:

```yaml
endpoints:
  batch_customer_details:
    description: "Process customers in batches"
    
    iterate:
      # Process 50 customer IDs at a time
      over: "chunk(queue.customer_ids, 50)"
      into: "state.customer_batch"          # as an array
      concurrency: 3  # Process 3 batches concurrently
    
    request:
      url: "customers/batch"
      method: "POST"
      payload:
        ids: '{join(state.customer_batch, ",")}'
    
    response:
      records:
        jmespath: "customers[]"
        primary_key: ["id"]
```

## Real-World Example: Stripe API

This example from the Stripe API demonstrates a complete queue-based workflow:

```yaml
queues:
  - customer_ids
  - invoice_ids

endpoints:
  # Step 1: Collect customer IDs
  customer:
    description: "Retrieve list of customers"
    
    request:
      url: "customers"
    
    pagination:
      next_state:
        starting_after: '{response.records[-1].id}'
      stop_condition: "response.json.has_more == false"
    
    response:
      records:
        jmespath: "data[]"
        primary_key: ["id"]
      
      processors:
        # Send each customer ID to the queue
        - expression: "record.id"
          output: "queue.customer_ids"

  # Step 2: Get customer balance transactions
  customer_balance_transaction:
    description: "Retrieve customer balance transactions"
    
    iterate:
      over: "queue.customer_ids"
      into: "state.customer_id"
    
    request:
      url: "customers/{state.customer_id}/balance_transactions"
    
    response:
      records:
        jmespath: "data[]"
        primary_key: ["id"]
      
      processors:
        # Add customer_id to each transaction record
        - expression: "state.customer_id"
          output: "record.customer_id"

  # Step 3: Collect invoice IDs while getting invoice data
  invoice:
    description: "Retrieve invoices and queue IDs for line items"
    
    request:
      url: "invoices"
    
    pagination:
      next_state:
        starting_after: '{response.records[-1].id}'
      stop_condition: 'jmespath(response.json, "has_more") == false'
    
    response:
      records:
        jmespath: "data[]"
        primary_key: ["id"]
      
      processors:
        # Queue invoice IDs for line item extraction
        - expression: "record.id"
          output: "queue.invoice_ids"

  # Step 4: Get line items for each invoice
  invoice_line_item:
    description: "Retrieve invoice line items"
    
    iterate:
      over: "queue.invoice_ids"
      into: "state.invoice_id"
    
    request:
      url: "invoices/{state.invoice_id}/lines"
    
    response:
      records:
        jmespath: "data[]"
        primary_key: ["id"]
      
      processors:
        # Link line items back to their invoice
        - expression: "state.invoice_id"
          output: "record.invoice_id"
```

## Queue Properties and Behavior

### Queue Characteristics

| Property              | Description                                       | Example                                    |
| --------------------- | ------------------------------------------------- | ------------------------------------------ |
| **Temporary Storage** | Queues are backed by temporary files              | Automatically cleaned up after run         |
| **FIFO Order**        | Items are processed in first-in, first-out order  | IDs processed in the order they were added |
| **JSON Encoding**     | All data is JSON-encoded for safe storage         | Handles strings, numbers, objects, arrays  |
| **Single Run Scope**  | Queues exist only within a single Sling execution | Cannot persist between separate runs       |

### Queue Lifecycle

{% @mermaid/diagram content="sequenceDiagram
participant E1 as Producer Endpoint
participant Q as Queue (temp file)
participant E2 as Consumer Endpoint
Note over E1: Start processing records
E1->>Q: Append ID 1
E1->>Q: Append ID 2
E1->>Q: Append ID 3
Note over E1: Finish writing
Note over Q: Switch to read mode
E2->>Q: Read ID 1
E2->>Q: Read ID 2
E2->>Q: Read ID 3
Note over E2: Process each ID
Note over Q: Auto cleanup on completion" %}

## Direct Queue-to-Records Pattern

You can pipe queue data **directly to records** without making HTTP requests using the special syntax `iterate.into: "response.records"`.

### Basic Syntax

```yaml
endpoints:
  deduplicate_customers:
    description: "Remove duplicate customer IDs"

    iterate:
      over: "queue.raw_customer_ids"
      into: "response.records"  # Special: Direct to records, no HTTP call

    # No request block needed!

    response:
      records:
        # Scalar queue items arrive wrapped as {"value": ...}; deduplicate on that field
        primary_key: ["value"]

      processors:
        - expression: "record.value"  # Re-queue the unwrapped ID
          output: "queue.clean_customer_ids"
```

### When to Use

* **Deduplicate** queue items before further processing
* **Enrich** queue data with state variables
* **Transform** queue structure
* **Export** queue contents as a separate dataset
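
As an illustration of the enrich and export cases, the sketch below turns a queue of scalar IDs into its own dataset. It is a minimal example under assumptions, not a reference configuration: the endpoint name `queued_customer_ids` is hypothetical, and `state.region` stands in for any state variable already defined elsewhere in your spec.

```yaml
queues:
  - customer_ids

endpoints:
  # Hypothetical endpoint: exposes the queue contents as records
  queued_customer_ids:
    description: "Export queued customer IDs as a dataset"

    iterate:
      over: "queue.customer_ids"
      into: "response.records"  # Direct to records, no HTTP call

    response:
      processors:
        # Scalar queue items arrive wrapped as {"value": ...}
        - expression: "record.value"
          output: "record.customer_id"

        # Enrich each record with a state variable
        # (state.region is an assumption for this sketch)
        - expression: "state.region"
          output: "record.region"
```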

### Example: Deduplication Workflow

```yaml
queues:
  - raw_customer_ids
  - clean_customer_ids

endpoints:
  # Step 1: Collect IDs (may have duplicates)
  list_orders:
    request:
      url: "{state.base_url}/orders"
    response:
      processors:
        - expression: "record.customer_id"
          output: "queue.raw_customer_ids"

  # Step 2: Deduplicate using direct queue-to-records
  deduplicate_customers:
    iterate:
      over: "queue.raw_customer_ids"
      into: "response.records"
    response:
      records:
        # Scalar queue items arrive wrapped as {"value": ...}
        primary_key: ["value"]
      processors:
        - expression: "record.value"  # Re-queue the unwrapped ID
          output: "queue.clean_customer_ids"

  # Step 3: Use clean IDs
  customer_details:
    iterate:
      over: "queue.clean_customer_ids"
      into: "state.customer_id"
    request:
      url: "{state.base_url}/customers/{state.customer_id}"
```

### Data Type Handling

**Scalar values** (strings/numbers) are wrapped: `"user123"` → `{"value": "user123"}`

```yaml
processors:
  - expression: "record.value"  # Access via .value
    output: "record.user_id"
```

**Object values** are used as-is: `{"id": 1, "name": "Alice"}` → same structure

```yaml
processors:
  - expression: "record.id"  # Access fields directly
    output: "record.user_id"
```

### Limitations

* No HTTP response data (`response.status`, `response.headers` unavailable)
* Cannot use pagination or response rules
* Don't define a `request` block (it will be ignored)

> 💡 **Tip:** Because this pattern makes no HTTP requests, it is much faster than routing queue transformation steps through a placeholder request.

## Advanced Queue Patterns

### Pattern 1: Multi-Level Hierarchies

Process nested data structures with multiple queue levels:

```yaml
queues:
  - account_ids
  - customer_ids
  - subscription_ids

endpoints:
  accounts:
    response:
      # Level 1: Get accounts
      processors:
        - expression: "record.id"
          output: "queue.account_ids"

  customers:
    # Level 2: Get customers for each account
    iterate:
      over: "queue.account_ids"
      into: "state.account_id"
    response:
      processors:
        - expression: "record.id"
          output: "queue.customer_ids"

  subscriptions:
    # Level 3: Get subscriptions for each customer
    iterate:
      over: "queue.customer_ids"
      into: "state.customer_id"
    response:
      processors:
        - expression: "record.id"
          output: "queue.subscription_ids"
```

### Pattern 2: Conditional Queue Population

Only queue certain items based on conditions:

```yaml
processors:
  # Only queue active customers
  - expression: >
      if(record.status == "active", record.id, null)
    output: "queue.active_customer_ids"
  
  # Queue high-value customers for special processing
  - expression: >
      if(record.total_spent > 10000, record.id, null)
    output: "queue.vip_customer_ids"
```
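
The fragment above only shows the processors. A conditionally populated queue still needs a top-level declaration and a consumer, so a fuller sketch might look like the following. The `customers` list endpoint, the `active_customer_orders` endpoint name, and the `customers/{id}/orders` URL are all hypothetical and chosen only for illustration.

```yaml
queues:
  - active_customer_ids

endpoints:
  customers:
    request:
      url: "customers"  # hypothetical list endpoint
    response:
      records:
        jmespath: "data[]"
        primary_key: ["id"]
      processors:
        # Same conditional expression as in the fragment above
        - expression: >
            if(record.status == "active", record.id, null)
          output: "queue.active_customer_ids"

  # Hypothetical consumer: runs once per queued customer ID
  active_customer_orders:
    iterate:
      over: "queue.active_customer_ids"
      into: "state.customer_id"
    request:
      url: "customers/{state.customer_id}/orders"  # hypothetical URL
    response:
      records:
        jmespath: "data[]"
        primary_key: ["id"]
```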

### Pattern 3: Queue Transformation

Transform data before queuing:

```yaml
processors:
  # Create composite keys for the queue
  - expression: >
      {
        "customer_id": record.id,
        "type": record.customer_type,
        "priority": if(record.is_vip, "high", "normal")
      }
    output: "queue.enriched_customers"
```
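
When queue items are objects rather than scalars, the consumer receives the whole object in its `into` variable. The sketch below shows one way a consumer might read individual fields from it. It assumes that nested fields of a state variable can be referenced as `state.customer.customer_id`; the endpoint name and URL are hypothetical.

```yaml
queues:
  - enriched_customers

endpoints:
  # Hypothetical consumer of the object-valued queue
  enriched_customer_details:
    iterate:
      over: "queue.enriched_customers"
      into: "state.customer"  # each item is the full object built above

    request:
      # Assumption: nested fields of a state variable resolve in templates
      url: "customers/{state.customer.customer_id}"

    response:
      processors:
        # Carry the queued priority onto each output record
        - expression: "state.customer.priority"
          output: "record.priority"
```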

## Queue Best Practices

### Performance Optimization

```yaml
iterate:
  over: "chunk(queue.large_dataset, 100)"  # Process in batches
  into: "state.batch"
  concurrency: 5  # Parallel processing
```

### Error Handling with Queues

```yaml
rules:
  # Continue processing other queue items even if one fails
  - action: "continue"
    condition: "response.status == 404"
    message: "Item not found, skipping"
  
  # Retry transient errors
  - action: "retry"
    condition: "response.status >= 500"
    max_attempts: 3
```
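
For context, here is how those rules might sit inside a queue consumer. This is a sketch that assumes rules are nested under the endpoint's `response` block; the endpoint itself mirrors the `customer_details` example from earlier on this page.

```yaml
endpoints:
  customer_details:
    iterate:
      over: "queue.customer_ids"
      into: "state.customer_id"

    request:
      url: "customers/{state.customer_id}"

    response:
      # Assumption: rules are declared under `response`
      rules:
        # Skip IDs that no longer exist and move on to the next queue item
        - action: "continue"
          condition: "response.status == 404"
          message: "Customer not found, skipping"

        # Retry transient server errors
        - action: "retry"
          condition: "response.status >= 500"
          max_attempts: 3
```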

> 💡 **Tip:** Use descriptive queue names that clearly indicate their purpose (e.g., `customer_ids`, `pending_order_ids`, `failed_payment_ids`).

> ⚠️ **Warning:** Queues consume disk space proportional to the number of items. For very large datasets (millions of items), monitor available disk space.

> 📝 **Note:** Queue items are automatically JSON-encoded, so complex objects, arrays, and special characters are handled safely.
