Use Hooks Data
Examples of using data from hooks/steps to drive API iteration
Hooks can be used to query data and pass it to API endpoints for iteration. This pattern is powerful when you need to:
Fetch a list of IDs or parameters from a database
Use those values to make corresponding API calls
Coordinate data between multiple sources
Learn more: Query Hook | Store Hook | API Iteration
How Hook-Driven API Iteration Works
API endpoints can iterate over data provided via hooks using context.store:
Use a query hook with an into parameter to fetch records and store them in the store
Reference the stored records in your API spec using context.store.variable_name
The API endpoint iterates over each record, making one API call per record
Each record's fields are accessible as state.record_field
The data flow:
Database Query ➡️ Hook Store ➡️ context.store ➡️ API Iteration ➡️ Target Database
Query-Driven Ticker Data Collection
Fetch ticker symbols from a database and retrieve market data for each one from an API.
Spec File (polygon.yaml)
name: Polygon
description: Polygon.io provides real-time and historical market data
defaults:
state:
base_url: https://api.polygon.io
request:
headers:
Authorization: 'Bearer {require(secrets.api_key, "Polygon API key required")}'
rate: 10
concurrency: 3
endpoints:
# This endpoint iterates over records provided via context.store
options_daily_ticker_summary:
description: "Daily open/close summary for options tickers"
docs: https://polygon.io/docs/rest/options/aggregates/daily-ticker-summary
# Require the hook to provide ticker_date_records
iterate:
over: require(context.store.ticker_date_records, "Must provide ticker_date_records via hook")
into: "state.ticker_date_record"
concurrency: 10
state:
# Extract fields from each record
date: '{date_format(state.ticker_date_record.date, "%Y-%m-%d")}'
ticker: '{require(state.ticker_date_record.ticker)}'
request:
url: '{state.base_url}/v1/open-close/{state.ticker}/{state.date}'
response:
records:
jmespath: "@" # Single object response
primary_key: ["symbol", "from"]
overrides:
mode: incremental
Using Replication with Query Hook
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_POLYGON_API
target: MY_TARGET_DB
hooks:
start:
# Query database to get list of tickers and dates to fetch
- type: query
connection: MY_TARGET_DB
query: |
SELECT
ticker,
date
FROM public.active_tickers
WHERE date >= CURRENT_DATE - INTERVAL '7 days'
AND data_fetched = false
ORDER BY date DESC, ticker
LIMIT 100
# Store results in the store for API to consume
into: "ticker_date_records"
streams:
# The endpoint will iterate over each ticker/date combination
options_daily_ticker_summary:
object: market_data.options_daily
Each record from the query (ticker + date) triggers one API call. With LIMIT 100, the endpoint makes up to 100 API calls in this example.
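To make the mechanics concrete, here is a plain-Python sketch (not the Sling runtime) of how each queried record drives one API call. The helper names are hypothetical; the URL template and record fields mirror the Polygon example above.

```python
# Simulates: query hook -> store -> per-record API iteration.

def fake_query():
    # Stands in for the query hook's SELECT against public.active_tickers
    return [
        {"ticker": "AAPL", "date": "2024-01-02"},
        {"ticker": "MSFT", "date": "2024-01-02"},
    ]

def build_url(record, base_url="https://api.polygon.io"):
    # Mirrors: url: '{state.base_url}/v1/open-close/{state.ticker}/{state.date}'
    return f"{base_url}/v1/open-close/{record['ticker']}/{record['date']}"

store = {"ticker_date_records": fake_query()}  # the hook's `into:` target
urls = [build_url(r) for r in store["ticker_date_records"]]
print(urls[0])  # → https://api.polygon.io/v1/open-close/AAPL/2024-01-02
```

In the real spec, Sling performs this expansion itself via the iterate block, with concurrency applied across the generated requests.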
Using Python
from sling import Replication, ReplicationStream
from sling.hooks import HookQuery, HookMap
replication = Replication(
source='MY_POLYGON_API',
target='MY_TARGET_DB',
streams={
'options_daily_ticker_summary': ReplicationStream(
object='market_data.options_daily'
)
},
hooks=HookMap(
start=[
HookQuery(
connection='MY_TARGET_DB',
query='''
SELECT ticker, date
FROM public.active_tickers
WHERE date >= CURRENT_DATE - INTERVAL '7 days'
AND data_fetched = false
LIMIT 100
''',
into='ticker_date_records'
)
]
)
)
replication.run()
Date Range Generation with Store Hook
Use store hooks to build date ranges that drive API iteration.
Spec File (analytics_api.yaml)
name: "Analytics API"
defaults:
state:
base_url: https://api.analytics.com/v1
request:
headers:
Authorization: "Bearer {secrets.api_key}"
endpoints:
daily_metrics:
description: "Get daily metrics for dates provided via context.store"
iterate:
over: require(context.store.date_list, "Must provide date_list via hook")
into: "state.current_date"
concurrency: 5
state:
date: '{date_format(state.current_date, "%Y-%m-%d")}'
request:
url: "{state.base_url}/metrics/daily"
parameters:
date: "{state.date}"
response:
records:
jmespath: "data.metrics[]"
primary_key: ["metric_id", "date"]
overrides:
mode: incremental
Using Replication with Store Hook
source: MY_API
target: MY_TARGET_DB
hooks:
start:
# Generate list of dates to process
- type: store
key: date_list
value: >
{range(
date_format(date_add(now(), -7, "day"), "%Y-%m-%d"),
date_format(now(), "%Y-%m-%d"),
"1d"
)}
streams:
daily_metrics:
object: analytics.daily_metrics
The range() function generates an array of dates, stored in context.store.date_list, which the API endpoint iterates over.
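As a sketch of what the store hook's range() call produces, using only Python's standard library (assuming the end date is inclusive):

```python
# Build a list of daily date strings, mirroring range(start, end, "1d")
from datetime import date, timedelta

def date_range(start: date, end: date, step_days: int = 1):
    out, current = [], start
    while current <= end:
        out.append(current.strftime("%Y-%m-%d"))
        current += timedelta(days=step_days)
    return out

date_list = date_range(date(2024, 1, 1), date(2024, 1, 7))
print(date_list[0], date_list[-1])  # → 2024-01-01 2024-01-07
```

Each element of this list becomes one iteration of the daily_metrics endpoint, bound to state.current_date.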
Customer IDs with Record Enrichment
Query customer IDs and use them to fetch detailed customer data from an API.
Spec File (customers_api.yaml)
name: "Customers API"
defaults:
state:
base_url: https://api.customers.com/v2
request:
headers:
X-API-Key: "{secrets.api_key}"
endpoints:
customer_details:
description: "Get detailed customer information by ID"
iterate:
over: require(context.store.customer_records, "Must provide customer_records via hook")
into: "state.customer_record"
concurrency: 10
state:
customer_id: '{state.customer_record.customer_id}'
# Can also access other fields from the query
region: '{state.customer_record.region}'
request:
url: "{state.base_url}/customers/{state.customer_id}"
parameters:
include_details: "true"
region: "{state.region}"
response:
records:
jmespath: "@"
primary_key: ["customer_id"]
processors:
# Add the region from our query to the response
- expression: state.region
output: record.source_region
overrides:
mode: incremental
Using Replication
source: MY_API
target: MY_TARGET_DB
streams:
customer_details:
object: public.customer_details
hooks:
pre:
# Query customers that need enrichment
- type: query
connection: MY_TARGET_DB
query: |
SELECT
c.customer_id,
c.region,
c.last_updated
FROM public.customers c
LEFT JOIN public.customer_details cd
ON c.customer_id = cd.customer_id
WHERE cd.customer_id IS NULL
OR c.last_updated > cd.details_fetched_at
LIMIT 500
into: "customer_records"
post:
# Mark customers as enriched
- type: query
connection: MY_TARGET_DB
if: run.status == "success"
query: |
UPDATE public.customer_details
SET details_fetched_at = CURRENT_TIMESTAMP
WHERE customer_id IN (
SELECT DISTINCT customer_id
FROM public.customer_details
WHERE details_fetched_at > CURRENT_TIMESTAMP - INTERVAL '1 hour'
)
Processor Output to Hooks Integration
Use processor outputs (env.* and context.store.*) to pass aggregated data from API responses to hooks for validation, logging, or conditional logic.
Spec File (orders_api.yaml)
name: "Orders API"
defaults:
state:
base_url: https://api.orders.com/v1
request:
headers:
Authorization: "Bearer {secrets.api_key}"
endpoints:
orders:
description: "Fetch orders with metadata tracking"
request:
url: "{state.base_url}/orders"
parameters:
status: "completed"
limit: 100
response:
records:
jmespath: "data.orders[]"
primary_key: ["order_id"]
processors:
# Store the maximum timestamp in environment variable
- expression: "record.updated_at"
output: "env.MAX_ORDER_TIMESTAMP"
aggregation: "maximum"
# Store the first order ID in replication store
- expression: "record.order_id"
output: "context.store.first_order_id"
aggregation: "first"
# Store the last order ID in replication store
- expression: "record.order_id"
output: "context.store.last_order_id"
aggregation: "last"
# Track total order count
- expression: "1"
output: "context.store.order_count"
aggregation: "collect"
overrides:
mode: full-refresh
Using Replication with End Hooks
source: MY_ORDERS_API
target: MY_TARGET_DB
streams:
orders:
object: sales.orders
hooks:
start:
- type: log
message: "Starting order sync..."
end:
# Validate that we got data
- type: check
check: store.first_order_id != nil
on_failure: fail
message: "No orders were processed!"
# Log processing summary using values from processors
- type: log
message: |
Orders sync completed successfully!
========================================
First Order ID: {store.first_order_id}
Last Order ID: {store.last_order_id}
Total Orders: {len(store.order_count)}
Max Timestamp: {env.MAX_ORDER_TIMESTAMP}
# Update metadata table with sync information
- type: query
connection: MY_TARGET_DB
if: execution.status.error == 0
query: |
INSERT INTO sales.sync_metadata (
sync_date,
first_order_id,
last_order_id,
order_count,
max_timestamp
) VALUES (
CURRENT_TIMESTAMP,
'{store.first_order_id}',
'{store.last_order_id}',
{len(store.order_count)},
'{env.MAX_ORDER_TIMESTAMP}'
)
# Conditional validation based on order count
- type: check
check: len(store.order_count) >= 10
on_failure: log
message: "Warning: Fewer than 10 orders processed ({len(store.order_count)})"
Using Python
from sling import Replication, ReplicationStream
from sling.hooks import HookLog, HookCheck, HookQuery, HookMap
replication = Replication(
source='MY_ORDERS_API',
target='MY_TARGET_DB',
streams={
'orders': ReplicationStream(
object='sales.orders'
)
},
hooks=HookMap(
start=[
HookLog(message="Starting order sync...")
],
end=[
HookCheck(
check='store.first_order_id != nil',
on_failure='fail',
message='No orders were processed!'
),
HookLog(
message='''Orders sync completed successfully!
First Order ID: {store.first_order_id}
Last Order ID: {store.last_order_id}
Total Orders: {len(store.order_count)}
Max Timestamp: {env.MAX_ORDER_TIMESTAMP}'''
),
HookQuery(
connection='MY_TARGET_DB',
if_='execution.status.error == 0',
query='''INSERT INTO sales.sync_metadata
(sync_date, first_order_id, last_order_id, order_count, max_timestamp)
VALUES (CURRENT_TIMESTAMP, '{store.first_order_id}',
'{store.last_order_id}', {len(store.order_count)},
'{env.MAX_ORDER_TIMESTAMP}')'''
)
]
)
)
replication.run()
This pattern is useful for:
Tracking metadata about API responses (record counts, date ranges, etc.)
Validating data quality before committing to the target
Logging detailed sync information
Conditional hook execution based on aggregated values
Updating audit or metadata tables with sync statistics
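The four aggregation modes used in the orders example (maximum, first, last, collect) can be sketched in plain Python. This illustrates the semantics only, not the Sling implementation; the record values are made up.

```python
# One processor expression is evaluated per record; the aggregation mode
# decides which value(s) survive into env.* or context.store.*.
records = [
    {"order_id": "A1", "updated_at": "2024-01-01T10:00:00Z"},
    {"order_id": "B2", "updated_at": "2024-01-03T09:30:00Z"},
    {"order_id": "C3", "updated_at": "2024-01-02T12:00:00Z"},
]

env = {"MAX_ORDER_TIMESTAMP": max(r["updated_at"] for r in records)}  # maximum
store = {
    "first_order_id": records[0]["order_id"],   # first
    "last_order_id": records[-1]["order_id"],   # last
    "order_count": [1 for _ in records],        # collect -> list, so len() counts
}
print(len(store["order_count"]))  # → 3
```

This is why the end hooks use len(store.order_count): collect produces an array, so its length is the number of processed records.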
Setting Environment Variables for API Authentication
Use store hooks with the env.* prefix to set environment variables that are available during API spec rendering. This is particularly powerful when you need to:
Dynamically compute authentication parameters before API calls
Inject values into authentication blocks before spec compilation
Pass computed values to dynamic endpoints
Use values from database queries or previous API calls in authentication
The key advantage: environment variables set via env.* hooks are available before the API spec is compiled/rendered, making them usable in authentication blocks and dynamic_endpoints blocks.
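As a rough sketch of the kinds of values such hooks compute, here is the same arithmetic in plain Python, using the standard library in place of the spec functions base64_encode() and sha256(). The names and the password are hypothetical placeholders.

```python
# Compute auth-related values before the spec is rendered.
import base64
import hashlib
from datetime import datetime, timezone

env = {}
env["COMPUTED_USERNAME"] = "api_user_prod"
# Mirrors: {base64_encode("secure_password_123")}
env["COMPUTED_PASSWORD"] = base64.b64encode(b"secure_password_123").decode()
# Mirrors: {sha256(env.COMPUTED_USERNAME + "-" + now())} -- a rotating token
stamp = datetime.now(timezone.utc).isoformat()
token_input = f"{env['COMPUTED_USERNAME']}-{stamp}"
env["RESOURCE_TOKEN"] = hashlib.sha256(token_input.encode()).hexdigest()
print(env["COMPUTED_PASSWORD"])  # → c2VjdXJlX3Bhc3N3b3JkXzEyMw==
```

Because these values exist before spec compilation, the authentication block can reference {env.COMPUTED_USERNAME} and {env.COMPUTED_PASSWORD} directly.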
Spec File (api_with_dynamic_auth.yaml)
name: "API with Dynamic Auth"
description: "Demonstrates using environment variables set by hooks in authentication"
# Environment variables set by hooks are available here during rendering
authentication:
type: basic
username: "{env.COMPUTED_USERNAME}"
password: "{env.COMPUTED_PASSWORD}"
defaults:
state:
base_url: https://api.example.com/v1
request:
headers:
X-Environment: "{env.DEPLOYMENT_ENV}"
endpoints:
users:
description: "Fetch users with dynamically configured authentication"
request:
url: "{state.base_url}/users"
parameters:
# Can also use env vars in request parameters
api_version: "{env.API_VERSION}"
response:
records:
jmespath: "data.users[]"
primary_key: ["user_id"]
# Environment variables are also available in dynamic endpoints
dynamic_endpoints:
- iterate: '{split(env.RESOURCE_LIST, ",")}'
into: "state.resource_name"
endpoint:
name: "data_{state.resource_name}"
request:
url: "{state.base_url}/{state.resource_name}"
headers:
X-Resource-Token: "{env.RESOURCE_TOKEN}"
response:
records:
jmespath: "data[]"
Using Pipeline with Environment Variable Setup
Running with Sling: sling run -p /path/to/pipeline.yaml
steps:
# Step 1: Set environment variables before replication runs
- type: store
key: env.COMPUTED_USERNAME
value: "api_user_prod"
- type: store
key: env.COMPUTED_PASSWORD
value: >
{base64_encode("secure_password_123")}
- type: store
key: env.DEPLOYMENT_ENV
value: "production"
- type: store
key: env.API_VERSION
value: "v2"
# Step 2: Query database to get resource list
- type: query
connection: MY_CONFIG_DB
query: |
SELECT string_agg(resource_name, ',') as resources
FROM api_resources
WHERE enabled = true
into: resource_query
# Step 3: Set resource list/token as environment variable
- type: store
map:
env.RESOURCE_LIST: '{store.resource_query[0].resources}'
env.RESOURCE_TOKEN: '{sha256(env.COMPUTED_USERNAME + "-" + now())}'
# Step 4: Log the configuration
- type: log
message: |
API Configuration:
- Username: {env.COMPUTED_USERNAME}
- Environment: {env.DEPLOYMENT_ENV}
- API Version: {env.API_VERSION}
- Resources: {env.RESOURCE_LIST}
# Step 5: Run replication with dynamically configured API
- type: replication
path: /path/to/replication.yaml
Replication File
source: MY_API_WITH_DYNAMIC_AUTH
target: MY_TARGET_DB
streams:
users:
object: public.api_users
# Dynamic endpoints will be created based on RESOURCE_LIST
# e.g., data_customers, data_products, data_orders
Key Benefits
Dynamic Authentication: Compute credentials at runtime (e.g., from database queries, secrets managers, or transformations)
Pre-Compilation Configuration: Environment variables set via env.* hooks are available during API spec rendering, allowing use in:
Authentication blocks
Dynamic endpoint definitions
Default configurations
Separation of Concerns: Keep sensitive or dynamic values out of spec files
Database-Driven Configuration: Query databases to determine which endpoints to call or which credentials to use
Multi-Environment Support: Dynamically configure API behavior based on deployment environment without changing spec files
Common Use Cases:
Rotating API keys fetched from a secrets manager
Environment-specific endpoints (dev/staging/prod)
Database-driven resource lists for dynamic endpoints
Computed authentication tokens based on current state
Multi-tenant API configurations
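The dynamic_endpoints expansion shown earlier can be sketched in plain Python. This is an illustration of the iteration semantics, not the Sling compiler; the base URL is a placeholder.

```python
# Expand a comma-separated RESOURCE_LIST into one endpoint per resource,
# mirroring: iterate: '{split(env.RESOURCE_LIST, ",")}'
env = {"RESOURCE_LIST": "customers,products,orders"}

endpoints = [
    {
        "name": f"data_{resource}",                       # name: "data_{state.resource_name}"
        "url": f"https://api.example.com/v1/{resource}",  # url: "{state.base_url}/{state.resource_name}"
    }
    for resource in env["RESOURCE_LIST"].split(",")
]
print([e["name"] for e in endpoints])  # → ['data_customers', 'data_products', 'data_orders']
```

Because RESOURCE_LIST comes from a database query in the pipeline, enabling or disabling a row in api_resources changes which endpoints exist on the next run, with no spec change.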