Incremental
Examples of incrementally loading data from APIs to databases using sync state
Incremental loading from APIs allows you to fetch only new or updated data since the last run, reducing API calls and improving performance. Sling uses the sync feature to persist state between runs.
Learn more: Incremental Sync · State Variables
How API Incremental Sync Works
API incremental sync uses the sync key to persist values between runs:
1. Define which state variables to persist using sync: [variable_name]
2. On first run, use a default value (e.g., 30 days ago)
3. Track the maximum timestamp/ID in each response using processors
4. On subsequent runs, use the persisted value from sync.variable_name
The state is stored in the target database by default, or in a location specified by SLING_STATE.
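Condensed, those pieces fit together like the sketch below (endpoint name, URL, and field names are placeholders, not a real API; the sections that follow show complete specs):

endpoints:
  my_endpoint:
    # 1. Declare which state variables to persist between runs
    sync: [last_updated_at]
    state:
      # 2. Fall back to a default (e.g., 30 days ago) on the first run
      updated_at_min: >
        {coalesce(
          sync.last_updated_at,
          date_format(date_add(now(), -30, "day"), "%Y-%m-%dT%H:%M:%S%z")
        )}
    request:
      url: "https://api.example.com/items"
      parameters:
        updated_at_min: "{state.updated_at_min}"
    response:
      records:
        jmespath: "items[]"
        primary_key: ["id"]
      processors:
        # 3. Track the highest updated_at seen across responses
        - expression: "record.updated_at"
          output: "state.last_updated_at"
          aggregation: "maximum"
    # 4. On the next run, sync.last_updated_at holds the persisted value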
Timestamp-Based Incremental Sync
This is the most common pattern: fetching records updated since the last run.
Spec File (orders_api.yaml)
name: "Orders API"
defaults:
state:
base_url: https://api.shop.com/v1
request:
headers:
Authorization: "Bearer {secrets.api_key}"
endpoints:
orders:
description: "Get orders updated since last sync"
# Persist last_updated_at for next run
sync: [last_updated_at]
state:
# Use last sync time if available, otherwise default to 30 days ago
updated_at_min: >
{coalesce(
sync.last_updated_at,
date_format(date_add(now(), -30, "day"), "%Y-%m-%dT%H:%M:%S%z")
)}
limit: 100
offset: 0
request:
url: "{state.base_url}/orders"
parameters:
# Filter API to only return records updated after this timestamp
updated_at_min: "{state.updated_at_min}"
limit: "{state.limit}"
offset: "{state.offset}"
pagination:
next_state:
offset: "{state.offset + state.limit}"
stop_condition: length(response.records) < state.limit
response:
records:
jmespath: "orders[]"
primary_key: ["order_id"]
processors:
# Track the maximum updated_at timestamp across all records
- expression: "record.updated_at"
output: "state.last_updated_at"
aggregation: "maximum" # Save highest timestamp for next sync
overrides:
mode: incremental # Upsert based on primary_key
Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_API
target: MY_TARGET_DB
streams:
orders:
object: public.orders
env:
SLING_STATE: postgres/sling_state.my_api
On first run, this will fetch orders from the last 30 days. On subsequent runs, it will only fetch orders updated since the last run.
Using Python
from sling import Replication, ReplicationStream
replication = Replication(
source='MY_API',
target='MY_TARGET_DB',
streams={
'orders': ReplicationStream(
object='public.orders'
)
},
env={'SLING_STATE': 'postgres/sling_state.my_api'}
)
# First run: fetches last 30 days
replication.run()
# Subsequent runs: fetches only new/updated orders
replication.run()
ID-Based Incremental Sync
For APIs with monotonically increasing IDs, you can sync based on the highest ID.
Spec File (events_api.yaml)
name: "Events API"
defaults:
state:
base_url: https://api.events.com/v2
request:
headers:
X-API-Key: "{secrets.api_key}"
endpoints:
events:
description: "Get events with ID greater than last sync"
# Persist last event ID
sync: [last_event_id]
state:
# Start from last ID, or 0 if first run
min_id: "{coalesce(sync.last_event_id, 0)}"
limit: 1000
request:
url: "{state.base_url}/events"
parameters:
# Only fetch events with ID greater than last sync
id_gt: "{state.min_id}"
limit: "{state.limit}"
sort: "id asc" # Ensure ascending order
pagination:
next_state:
# Use last record's ID for next page
min_id: "{response.records[-1].id}"
stop_condition: length(response.records) < state.limit
response:
records:
jmespath: "events[]"
primary_key: ["id"]
processors:
# Track the highest event ID
- expression: "record.id"
output: "state.last_event_id"
aggregation: "maximum"
overrides:
mode: incremental
Using Replication
source: MY_API
target: MY_TARGET_DB
streams:
events:
object: analytics.events
env:
SLING_STATE: postgres/sling_state.my_api
Date-Based Incremental with Iteration
For APIs that require a date parameter, use iteration with sync state.
Spec File (analytics_api.yaml)
name: "Analytics API"
defaults:
state:
base_url: https://analytics.example.com/api
request:
headers:
Authorization: "Bearer {secrets.access_token}"
endpoints:
daily_stats:
description: "Get daily statistics since last sync"
# Persist last processed date
sync: [last_date]
iterate:
# Generate dates from last sync to yesterday
over: >
range(
coalesce(sync.last_date, date_format(date_add(now(), -7, "day"), "%Y-%m-%d")),
date_format(date_add(now(), -1, "day"), "%Y-%m-%d"),
"1d"
)
into: "state.current_date"
concurrency: 5
state:
date: '{date_format(state.current_date, "%Y-%m-%d")}'
request:
url: "{state.base_url}/stats/daily/{state.date}"
response:
records:
jmespath: "data[]"
primary_key: ["metric_id", "date"]
processors:
# Track the latest date processed
- expression: "state.date"
output: "state.last_date"
aggregation: "maximum"
overrides:
mode: incremental
Using Replication
source: MY_API
target: MY_TARGET_DB
streams:
daily_stats:
object: analytics.daily_statistics
env:
SLING_STATE: postgres/sling_state.my_api
Multiple Sync Variables
You can persist multiple values for complex sync scenarios.
Spec File (multi_sync_api.yaml)
name: "Multi-Sync API"
defaults:
state:
base_url: https://api.example.com
request:
headers:
Authorization: "Bearer {secrets.api_key}"
endpoints:
products:
description: "Track both timestamp and version for sync"
# Persist multiple sync variables
sync: [last_updated_at, last_version]
state:
updated_at_min: >
{coalesce(sync.last_updated_at, date_format(date_add(now(), -30, "day"), "%Y-%m-%d"))}
version_min: "{coalesce(sync.last_version, 0)}"
# Initialize pagination state used in parameters and pagination below
limit: 100
offset: 0
request:
url: "{state.base_url}/products"
parameters:
updated_since: "{state.updated_at_min}"
version_gt: "{state.version_min}"
limit: "{state.limit}"
offset: "{state.offset}"
pagination:
next_state:
offset: "{state.offset + 100}"
stop_condition: length(response.records) < 100
response:
records:
jmespath: "products[]"
primary_key: ["product_id"]
processors:
# Track both timestamp and version
- expression: "record.updated_at"
output: "state.last_updated_at"
aggregation: "maximum"
- expression: "record.version"
output: "state.last_version"
aggregation: "maximum"
overrides:
mode: incremental
Using SLING_STATE
By default, sync state is stored in the target database. You can store it externally using SLING_STATE.
Using Replication with External State
source: MY_API
target: MY_TARGET_DB
streams:
orders:
object: public.orders
events:
object: analytics.events
products:
object: public.products
env:
# Store state in S3 instead of target database
SLING_STATE: AWS_S3/sling/state
This creates state files at s3://your-bucket/sling/state/ for each stream.
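If you prefer the Python API shown earlier, the same external-state setup looks like this (a minimal sketch reusing the connections from the YAML example above):

from sling import Replication, ReplicationStream

# State is stored under the AWS_S3 connection instead of the target database
replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'orders': ReplicationStream(object='public.orders'),
        'events': ReplicationStream(object='analytics.events'),
        'products': ReplicationStream(object='public.products'),
    },
    env={'SLING_STATE': 'AWS_S3/sling/state'},
)

replication.run()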
Incremental with Full Refresh Fallback
You can mix incremental and full-refresh streams based on your needs.
source: MY_API
target: MY_TARGET_DB
streams:
# Incremental sync for large, frequently updated table
orders:
object: public.orders
# Uses incremental mode from spec overrides
# Full refresh for small, infrequently updated table
product_categories:
object: public.categories
mode: full-refresh # Override spec's incremental mode
env:
SLING_STATE: postgres/sling_state.my_api
Best Practices
1. Always Provide Sensible Defaults
Use coalesce() to handle the first run gracefully:
state:
updated_at_min: >
{coalesce(
sync.last_updated_at,
date_format(date_add(now(), -30, "day"), "%Y-%m-%dT%H:%M:%S%z")
)}
2. Use Appropriate Aggregation
Match the aggregation to your sync variable type:
processors:
# For timestamps and IDs
- expression: "record.updated_at"
output: "state.last_updated_at"
aggregation: "maximum"
# For first/last values
- expression: "record.cursor"
output: "state.next_cursor"
aggregation: "last"3. Include Primary Keys
Always specify primary keys for proper upserting:
response:
records:
primary_key: ["id"] # Or composite: ["user_id", "event_id"]4. Handle Edge Cases
Account for APIs that might return stale data:
state:
# Add a small lookback window to catch late-arriving updates
updated_at_min: >
{coalesce(
date_format(date_add(date_parse(sync.last_updated_at), -1, "hour"), "%Y-%m-%dT%H:%M:%S%z"),
date_format(date_add(now(), -30, "day"), "%Y-%m-%dT%H:%M:%S%z")
)}
5. Use Incremental Mode Override
Set the mode in the spec to ensure consistent behavior:
overrides:
mode: incremental
Combining Incremental with Backfill
You can use both incremental sync and backfill capabilities together.
First: Backfill Historical Data
source: MY_API
target: MY_TARGET_DB
streams:
orders:
object: public.orders
source_options:
# Backfill 2023 data
range: '2023-01-01,2023-12-31'
env:
SLING_STATE: postgres/sling_state.my_api
Then: Switch to Incremental
source: MY_API
target: MY_TARGET_DB
streams:
orders:
object: public.orders
# No range specified - uses sync state for incremental loading
env:
SLING_STATE: postgres/sling_state.my_api
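The same two-phase flow can be scripted with the Python API. This is a minimal sketch; it assumes ReplicationStream accepts a source_options mapping equivalent to the YAML source_options above:

from sling import Replication, ReplicationStream

# Phase 1: backfill 2023 data with an explicit range
backfill = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'orders': ReplicationStream(
            object='public.orders',
            source_options={'range': '2023-01-01,2023-12-31'},  # assumed Python equivalent of the YAML option
        )
    },
    env={'SLING_STATE': 'postgres/sling_state.my_api'},
)
backfill.run()

# Phase 2: omit the range so later runs rely on the persisted sync state
incremental = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={'orders': ReplicationStream(object='public.orders')},
    env={'SLING_STATE': 'postgres/sling_state.my_api'},
)
incremental.run()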