Queues

Queues enable sophisticated multi-step data extraction workflows in which one endpoint collects data (such as records or identifiers) that subsequent endpoints then consume. This is essential for APIs that require separate calls to fetch related data.
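
For orientation, here is a minimal sketch of the complete pattern, combining the pieces detailed in the sections below. The order_ids queue, endpoint names, and URLs are illustrative only:

queues:
  - order_ids

endpoints:
  # Producer: list orders and queue their IDs
  list_orders:
    request:
      url: "orders"
    response:
      records:
        jmespath: "data[]"
        primary_key: ["id"]
      processors:
        - expression: "record.id"
          output: "queue.order_ids"

  # Consumer: fetch details for each queued ID
  order_details:
    iterate:
      over: "queue.order_ids"
      into: "state.order_id"
    request:
      url: "orders/{state.order_id}"
    response:
      records:
        jmespath: "[*]"  # Single order object
        primary_key: ["id"]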

Queue Architecture

Queue Declaration and Usage

1. Queue Declaration

Queues must be declared at the top level of your API spec:

# Declare queues for passing data between endpoints
queues:
  - customer_ids
  - invoice_ids
  - order_items

2. Sending Data to Queues

Use processors to send data from one endpoint to a queue:

endpoints:
  list_customers:
    description: "Get list of customers"
    
    request:
      url: "customers"
    
    response:
      records:
        jmespath: "data[]"
        primary_key: ["id"]
      
      processors:
        # Send customer IDs to queue for detailed processing
        - expression: "record.id"
          output: "queue.customer_ids"

3. Consuming Data from Queues

Use the iterate section to process each item from a queue:

endpoints:
  customer_details:
    description: "Get detailed customer information"
    
    # Process each customer ID from the queue
    iterate:
      over: "queue.customer_ids"
      into: "state.current_customer_id"
    
    request:
      url: "customers/{state.current_customer_id}"
    
    response:
      records:
        jmespath: "[*]"  # Single customer object
        primary_key: ["id"]
      
      processors:
        # Add the customer ID to each record for reference
        - expression: "state.current_customer_id"
          output: "record.customer_id"

Queue Functions

Queues can be used with built-in functions for advanced processing:

Chunking Queue Data

Process queue items in batches for more efficient API calls:

endpoints:
  batch_customer_details:
    description: "Process customers in batches"
    
    iterate:
      # Process 50 customer IDs at a time
      over: "chunk(queue.customer_ids, 50)"
      into: "state.customer_batch"          # as an array
      concurrency: 3  # Process 3 batches concurrently
    
    request:
      url: "customers/batch"
      method: "POST"
      payload:
        ids: '{join(state.customer_batch, ",")}'
    
    response:
      records:
        jmespath: "customers[]"
        primary_key: ["id"]

Real-World Example: Stripe API

This example from the Stripe API demonstrates a complete queue-based workflow:

queues:
  - customer_ids
  - invoice_ids

endpoints:
  # Step 1: Collect customer IDs
  customer:
    description: "Retrieve list of customers"
    
    request:
      url: "customers"
    
    pagination:
      next_state:
        starting_after: '{response.records[-1].id}'
      stop_condition: "response.json.has_more == false"
    
    response:
      records:
        jmespath: "data[]"
        primary_key: ["id"]
      
      processors:
        # Send each customer ID to the queue
        - expression: "record.id"
          output: "queue.customer_ids"

  # Step 2: Get customer balance transactions
  customer_balance_transaction:
    description: "Retrieve customer balance transactions"
    
    iterate:
      over: "queue.customer_ids"
      into: "state.customer_id"
    
    request:
      url: "customers/{state.customer_id}/balance_transactions"
    
    response:
      records:
        jmespath: "data[]"
        primary_key: ["id"]
      
      processors:
        # Add customer_id to each transaction record
        - expression: "state.customer_id"
          output: "record.customer_id"

  # Step 3: Collect invoice IDs while getting invoice data
  invoice:
    description: "Retrieve invoices and queue IDs for line items"
    
    request:
      url: "invoices"
    
    pagination:
      next_state:
        starting_after: '{response.records[-1].id}'
      stop_condition: 'jmespath(response.json, "has_more") == false'
    
    response:
      records:
        jmespath: "data[]"
        primary_key: ["id"]
      
      processors:
        # Queue invoice IDs for line item extraction
        - expression: "record.id"
          output: "queue.invoice_ids"

  # Step 4: Get line items for each invoice
  invoice_line_item:
    description: "Retrieve invoice line items"
    
    iterate:
      over: "queue.invoice_ids"
      into: "state.invoice_id"
    
    request:
      url: "invoices/{state.invoice_id}/lines"
    
    response:
      records:
        jmespath: "data[]"
        primary_key: ["id"]
      
      processors:
        # Link line items back to their invoice
        - expression: "state.invoice_id"
          output: "record.invoice_id"

Queue Properties and Behavior

Queue Characteristics

| Property | Description | Example |
| --- | --- | --- |
| Temporary Storage | Queues are backed by temporary files | Automatically cleaned up after the run |
| FIFO Order | Items are processed in first-in, first-out order | IDs are processed in the order they were added |
| JSON Encoding | All data is JSON-encoded for safe storage | Handles strings, numbers, objects, arrays |
| Single Run Scope | Queues exist only within a single Sling execution | Cannot persist between separate runs |

Queue Lifecycle

Within a single run, queues are populated by endpoint processors, consumed in FIFO order by iterating endpoints, and their backing temporary files are cleaned up automatically when the run completes.

Direct Queue-to-Records Pattern

You can pipe queue data directly into records, without making any HTTP requests, by setting iterate.into: "response.records".

Basic Syntax

endpoints:
  deduplicate_customers:
    description: "Remove duplicate customer IDs"

    iterate:
      over: "queue.raw_customer_ids"
      into: "response.records"  # Special: Direct to records, no HTTP call

    # No request block needed!

    response:
      records:
        primary_key: ["customer_id"]  # Deduplicate

      processors:
        - expression: "record"
          output: "queue.clean_customer_ids"

When to Use

  • Deduplicate queue items before further processing

  • Enrich queue data with state variables (see the sketch after the deduplication example below)

  • Transform queue structure

  • Export queue contents as a separate dataset

Example: Deduplication Workflow

queues:
  - raw_customer_ids
  - clean_customer_ids

endpoints:
  # Step 1: Collect IDs (may have duplicates)
  list_orders:
    request:
      url: "{state.base_url}/orders"
    response:
      processors:
        - expression: "record.customer_id"
          output: "queue.raw_customer_ids"

  # Step 2: Deduplicate using direct queue-to-records
  deduplicate_customers:
    iterate:
      over: "queue.raw_customer_ids"
      into: "response.records"
    response:
      records:
        primary_key: ["customer_id"]
      processors:
        - expression: "record"
          output: "queue.clean_customer_ids"

  # Step 3: Use clean IDs
  customer_details:
    iterate:
      over: "queue.clean_customer_ids"
      into: "state.customer_id"
    request:
      url: "{state.base_url}/customers/{state.customer_id}"

Data Type Handling

Scalar values (strings/numbers) are wrapped: "user123" → {"value": "user123"}

processors:
  - expression: "record.value"  # Access via .value
    output: "record.user_id"

Object values are used as-is: {"id": 1, "name": "Alice"} → same structure

processors:
  - expression: "record.id"  # Access fields directly
    output: "record.user_id"

Limitations

  • No HTTP response data (response.status, response.headers unavailable)

  • Cannot use pagination or response rules

  • Don't define a request block (it will be ignored)

💡 Tip: This pattern avoids unnecessary HTTP requests, making queue transformation steps much faster.

Advanced Queue Patterns

Pattern 1: Multi-Level Hierarchies

Process nested data structures with multiple queue levels:

queues:
  - account_ids
  - customer_ids
  - subscription_ids

endpoints:
  accounts:
    response:
      # Level 1: Get accounts
      processors:
        - expression: "record.id"
          output: "queue.account_ids"

  customers:
    # Level 2: Get customers for each account
    iterate:
      over: "queue.account_ids"
      into: "state.account_id"
    response:
      processors:
        - expression: "record.id"
          output: "queue.customer_ids"

  subscriptions:
    # Level 3: Get subscriptions for each customer
    iterate:
      over: "queue.customer_ids"
      into: "state.customer_id"
    response:
      processors:
        - expression: "record.id"
          output: "queue.subscription_ids"

Pattern 2: Conditional Queue Population

Only queue certain items based on conditions:

processors:
  # Only queue active customers
  - expression: >
      if(record.status == "active", record.id, null)
    output: "queue.active_customer_ids"
  
  # Queue high-value customers for special processing
  - expression: >
      if(record.total_spent > 10000, record.id, null)
    output: "queue.vip_customer_ids"

Pattern 3: Queue Transformation

Transform data before queuing:

processors:
  # Create composite keys for the queue
  - expression: >
      {
        "customer_id": record.id,
        "type": record.customer_type,
        "priority": if(record.is_vip, "high", "normal")
      }
    output: "queue.enriched_customers"

Queue Best Practices

Performance Optimization

iterate:
  over: "chunk(queue.large_dataset, 100)"  # Process in batches
  into: "state.batch"
  concurrency: 5  # Parallel processing

Error Handling with Queues

rules:
  # Continue processing other queue items even if one fails
  - action: "continue"
    condition: "response.status == 404"
    message: "Item not found, skipping"
  
  # Retry transient errors
  - action: "retry"
    condition: "response.status >= 500"
    max_attempts: 3

💡 Tip: Use descriptive queue names that clearly indicate their purpose (e.g., customer_ids, pending_order_ids, failed_payment_ids).

⚠️ Warning: Queues consume disk space proportional to the number of items. For very large datasets (millions of items), monitor available disk space.

📝 Note: Queue items are automatically JSON-encoded, so complex objects, arrays, and special characters are handled safely.
