Queues
Queues enable multi-step data extraction workflows where one endpoint collects data (such as records or identifiers) that subsequent endpoints consume. This is essential for APIs that require separate calls to fetch related data.
Queue Architecture
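At a high level, one endpoint produces items into a named queue and another endpoint consumes them, issuing one request per item. The following is a minimal sketch of that flow; the endpoint names, URLs, and queue name are illustrative:

queues:
  - product_ids
endpoints:
  # Producer: push each extracted ID onto the queue
  list_products:
    request:
      url: "products"
    response:
      records:
        jmespath: "data[]"
        primary_key: ["id"]
      processors:
        - expression: "record.id"
          output: "queue.product_ids"
  # Consumer: one request per queued ID
  product_details:
    iterate:
      over: "queue.product_ids"
      into: "state.product_id"
    request:
      url: "products/{state.product_id}"
    response:
      records:
        jmespath: "[*]"
        primary_key: ["id"]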
Queue Declaration and Usage
1. Queue Declaration
Queues must be declared at the top level of your API spec:
# Declare queues for passing data between endpoints
queues:
- customer_ids
- invoice_ids
  - order_items
2. Sending Data to Queues
Use processors to send data from one endpoint to a queue:
endpoints:
list_customers:
description: "Get list of customers"
request:
url: "customers"
response:
records:
jmespath: "data[]"
primary_key: ["id"]
processors:
# Send customer IDs to queue for detailed processing
- expression: "record.id"
output: "queue.customer_ids"3. Consuming Data from Queues
Use the iterate section to process each item from a queue:
endpoints:
customer_details:
description: "Get detailed customer information"
# Process each customer ID from the queue
iterate:
over: "queue.customer_ids"
into: "state.current_customer_id"
request:
url: "customers/{state.current_customer_id}"
response:
records:
jmespath: "[*]" # Single customer object
primary_key: ["id"]
processors:
# Add the customer ID to each record for reference
- expression: "state.current_customer_id"
output: "record.customer_id"Queue Functions
Queues can be used with built-in functions for advanced processing:
Chunking Queue Data
Process queue items in batches for more efficient API calls:
endpoints:
batch_customer_details:
description: "Process customers in batches"
iterate:
# Process 50 customer IDs at a time
over: "chunk(queue.customer_ids, 50)"
into: "state.customer_batch" # as an array
concurrency: 3 # Process 3 batches concurrently
request:
url: "customers/batch"
method: "POST"
payload:
        ids: '{join(state.customer_batch, ",")}'
response:
records:
jmespath: "customers[]"
primary_key: ["id"]Real-World Example: Stripe API
This example from the Stripe API demonstrates a complete queue-based workflow:
queues:
- customer_ids
- invoice_ids
endpoints:
# Step 1: Collect customer IDs
customer:
description: "Retrieve list of customers"
request:
url: "customers"
pagination:
next_state:
starting_after: '{response.records[-1].id}'
stop_condition: "response.json.has_more == false"
response:
records:
jmespath: "data[]"
primary_key: ["id"]
processors:
# Send each customer ID to the queue
- expression: "record.id"
output: "queue.customer_ids"
# Step 2: Get customer balance transactions
customer_balance_transaction:
description: "Retrieve customer balance transactions"
iterate:
over: "queue.customer_ids"
into: "state.customer_id"
request:
url: "customers/{state.customer_id}/balance_transactions"
response:
records:
jmespath: "data[]"
primary_key: ["id"]
processors:
# Add customer_id to each transaction record
- expression: "state.customer_id"
output: "record.customer_id"
# Step 3: Collect invoice IDs while getting invoice data
invoice:
description: "Retrieve invoices and queue IDs for line items"
request:
url: "invoices"
pagination:
next_state:
starting_after: '{response.records[-1].id}'
stop_condition: 'jmespath(response.json, "has_more") == false'
response:
records:
jmespath: "data[]"
primary_key: ["id"]
processors:
# Queue invoice IDs for line item extraction
- expression: "record.id"
output: "queue.invoice_ids"
# Step 4: Get line items for each invoice
invoice_line_item:
description: "Retrieve invoice line items"
iterate:
over: "queue.invoice_ids"
into: "state.invoice_id"
request:
url: "invoices/{state.invoice_id}/lines"
response:
records:
jmespath: "data[]"
primary_key: ["id"]
processors:
# Link line items back to their invoice
- expression: "state.invoice_id"
output: "record.invoice_id"Queue Properties and Behavior
Queue Characteristics
Temporary Storage: queues are backed by temporary files that are automatically cleaned up after the run.
FIFO Order: items are processed in first-in, first-out order; IDs are consumed in the order they were added.
JSON Encoding: all data is JSON-encoded for safe storage, handling strings, numbers, objects, and arrays.
Single Run Scope: queues exist only within a single Sling execution and cannot persist between separate runs.
Queue Lifecycle
Direct Queue-to-Records Pattern
You can pipe queue data directly to records without making HTTP requests using the special syntax iterate.into: "response.records".
Basic Syntax
endpoints:
deduplicate_customers:
description: "Remove duplicate customer IDs"
iterate:
over: "queue.raw_customer_ids"
into: "response.records" # Special: Direct to records, no HTTP call
# No request block needed!
response:
records:
primary_key: ["customer_id"] # Deduplicate
processors:
- expression: "record"
output: "queue.clean_customer_ids"When to Use
Deduplicate queue items before further processing
Enrich queue data with state variables (see the sketch after this list)
Transform queue structure
Export queue contents as a separate dataset
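For instance, enrichment and transformation might look like the following sketch. The run_id state variable and queue names are illustrative assumptions; scalar queue items are accessed via record.value, as described under Data Type Handling below:

queues:
  - raw_customer_ids
  - tagged_customer_ids
endpoints:
  tag_customers:
    description: "Enrich and reshape queued IDs without an HTTP call"
    iterate:
      over: "queue.raw_customer_ids"
      into: "response.records" # Direct to records, no request block
    response:
      records:
        primary_key: ["customer_id"]
      processors:
        # Transform: give the wrapped scalar a meaningful field name
        - expression: "record.value"
          output: "record.customer_id"
        # Enrich: attach a state variable to each queued item
        - expression: "state.run_id"
          output: "record.run_id"
        # Re-queue the enriched records for downstream endpoints
        - expression: "record"
          output: "queue.tagged_customer_ids"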
Example: Deduplication Workflow
queues:
- raw_customer_ids
- clean_customer_ids
endpoints:
# Step 1: Collect IDs (may have duplicates)
list_orders:
request:
url: "{state.base_url}/orders"
response:
processors:
- expression: "record.customer_id"
output: "queue.raw_customer_ids"
# Step 2: Deduplicate using direct queue-to-records
deduplicate_customers:
iterate:
over: "queue.raw_customer_ids"
into: "response.records"
response:
records:
primary_key: ["customer_id"]
processors:
- expression: "record"
output: "queue.clean_customer_ids"
# Step 3: Use clean IDs
customer_details:
iterate:
over: "queue.clean_customer_ids"
into: "state.customer_id"
request:
url: "{state.base_url}/customers/{state.customer_id}"Data Type Handling
Scalar values (strings/numbers) are wrapped: "user123" → {"value": "user123"}
processors:
- expression: "record.value" # Access via .value
output: "record.user_id"Object values are used as-is: {"id": 1, "name": "Alice"} → same structure
processors:
- expression: "record.id" # Access fields directly
output: "record.user_id"Limitations
No HTTP response data (response.status, response.headers unavailable)
Cannot use pagination or response rules
Don't define a request block (it will be ignored)
💡 Tip: This pattern is much faster than unnecessary HTTP requests for queue transformation steps.
Advanced Queue Patterns
Pattern 1: Multi-Level Hierarchies
Process nested data structures with multiple queue levels:
queues:
- account_ids
- customer_ids
- subscription_ids
endpoints:
accounts:
response:
# Level 1: Get accounts
processors:
- expression: "record.id"
output: "queue.account_ids"
customers:
# Level 2: Get customers for each account
iterate:
over: "queue.account_ids"
into: "state.account_id"
response:
processors:
- expression: "record.id"
output: "queue.customer_ids"
subscriptions:
# Level 3: Get subscriptions for each customer
iterate:
over: "queue.customer_ids"
into: "state.customer_id"
response:
processors:
- expression: "record.id"
output: "queue.subscription_ids"Pattern 2: Conditional Queue Population
Only queue certain items based on conditions:
processors:
# Only queue active customers
- expression: >
if(record.status == "active", record.id, null)
output: "queue.active_customer_ids"
# Queue high-value customers for special processing
- expression: >
if(record.total_spent > 10000, record.id, null)
output: "queue.vip_customer_ids"Pattern 3: Queue Transformation
Transform data before queuing:
processors:
# Create composite keys for the queue
- expression: >
{
"customer_id": record.id,
"type": record.customer_type,
"priority": if(record.is_vip, "high", "normal")
}
output: "queue.enriched_customers"Queue Best Practices
Performance Optimization
Batch large queues and process the batches in parallel to reduce the number of API calls:
iterate:
over: "chunk(queue.large_dataset, 100)" # Process in batches
into: "state.batch"
  concurrency: 5 # Parallel processing
Error Handling with Queues
Response rules let a queue-driven endpoint skip or retry individual items instead of failing the whole run:
rules:
# Continue processing other queue items even if one fails
- action: "continue"
condition: "response.status == 404"
message: "Item not found, skipping"
# Retry transient errors
- action: "retry"
condition: "response.status >= 500"
    max_attempts: 3
💡 Tip: Use descriptive queue names that clearly indicate their purpose (e.g., customer_ids, pending_order_ids, failed_payment_ids).
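For context, here is a sketch of where such rules might sit inside a queue-consuming endpoint. The endpoint name, URL, and the placement of rules under response are illustrative assumptions based on the snippets above:

endpoints:
  customer_details:
    iterate:
      over: "queue.customer_ids"
      into: "state.customer_id"
    request:
      url: "customers/{state.customer_id}"
    response:
      records:
        jmespath: "[*]"
        primary_key: ["id"]
      rules:
        # Skip missing customers instead of aborting the run
        - action: "continue"
          condition: "response.status == 404"
          message: "Customer not found, skipping"
        # Retry transient server errors before giving up
        - action: "retry"
          condition: "response.status >= 500"
          max_attempts: 3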
⚠️ Warning: Queues consume disk space proportional to the number of items. For very large datasets (millions of items), monitor available disk space.
📝 Note: Queue items are automatically JSON-encoded, so complex objects, arrays, and special characters are handled safely.