Incremental

Examples of incrementally loading data from APIs to databases using sync state

Incremental loading from APIs allows you to fetch only new or updated data since the last run, reducing API calls and improving performance. Sling uses the sync feature to persist state between runs.

Learn more: Incremental Sync · State Variables

How API Incremental Sync Works

API incremental sync uses the sync key to persist values between runs:

  1. Define which state variables to persist using sync: [variable_name]

  2. On first run, use a default value (e.g., 30 days ago)

  3. Track the maximum timestamp/ID in each response using processors

  4. On subsequent runs, use the persisted value from sync.variable_name

The state is stored in the target database by default, or in a location specified by SLING_STATE.
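
The four steps above can be sketched in plain Python. This is only an illustration of the lifecycle, not how Sling implements it: the JSON state file, `run_sync`, and `fake_fetch` are hypothetical names invented for the example.

```python
import json
import tempfile
from pathlib import Path


def run_sync(state_file, fetch, default_since):
    """One incremental run: load state, fetch, track the maximum, persist."""
    # Later runs reuse the persisted value; the first run falls back to a default.
    saved = json.loads(state_file.read_text()) if state_file.exists() else {}
    since = saved.get("last_updated_at") or default_since

    records = fetch(since)

    # Track the maximum timestamp seen in the response.
    # ISO 8601 strings in the same timezone sort chronologically.
    last = max((r["updated_at"] for r in records), default=since)

    # Persist only the variable that was declared for syncing.
    state_file.write_text(json.dumps({"last_updated_at": last}))
    return since, records


# Hypothetical in-memory "API" used only for this illustration.
DATA = [
    {"id": 1, "updated_at": "2024-01-01T00:00:00Z"},
    {"id": 2, "updated_at": "2024-01-05T00:00:00Z"},
]


def fake_fetch(since):
    return [r for r in DATA if r["updated_at"] > since]


state_path = Path(tempfile.mkdtemp()) / "state.json"
first = run_sync(state_path, fake_fetch, "2023-12-01T00:00:00Z")
second = run_sync(state_path, fake_fetch, "2023-12-01T00:00:00Z")
```

Here the first call starts from the default and returns both records, while the second call starts from the persisted timestamp and returns nothing new.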

Timestamp-Based Incremental Sync

This is the most common pattern: fetch only the records updated since the last run.

Spec File (orders_api.yaml)

orders_api.yaml
name: "Orders API"

defaults:
  state:
    base_url: https://api.shop.com/v1
  request:
    headers:
      Authorization: "Bearer {secrets.api_key}"

endpoints:
  orders:
    description: "Get orders updated since last sync"

    # Persist last_updated_at for next run
    sync: [last_updated_at]

    state:
      # Use last sync time if available, otherwise default to 30 days ago
      updated_at_min: >
        {coalesce(
          sync.last_updated_at,
          date_format(date_add(now(), -30, "day"), "%Y-%m-%dT%H:%M:%S%z")
        )}
      limit: 100
      offset: 0

    request:
      url: "{state.base_url}/orders"
      parameters:
        # Filter API to only return records updated after this timestamp
        updated_at_min: "{state.updated_at_min}"
        limit: "{state.limit}"
        offset: "{state.offset}"

    pagination:
      next_state:
        offset: "{state.offset + state.limit}"
      stop_condition: length(response.records) < state.limit

    response:
      records:
        jmespath: "orders[]"
        primary_key: ["order_id"]

      processors:
        # Track the maximum updated_at timestamp across all records
        - expression: "record.updated_at"
          output: "state.last_updated_at"
          aggregation: "maximum"  # Save highest timestamp for next sync

    overrides:
      mode: incremental  # Upsert based on primary_key
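
The pagination rules in this spec (advance `offset` by `limit`, stop on a short page) amount to the loop below. It is a minimal sketch of the control flow under the assumption that the API returns at most `limit` records per request; `fetch_all` and `get_page` are hypothetical helpers, not part of Sling.

```python
def fetch_all(get_page, limit=100):
    """Offset pagination: advance by `limit` until a short page arrives."""
    offset, records = 0, []
    while True:
        page = get_page(offset=offset, limit=limit)
        records.extend(page)
        # Mirrors stop_condition: length(response.records) < state.limit
        if len(page) < limit:
            break
        # Mirrors next_state: offset = state.offset + state.limit
        offset += limit
    return records


# Hypothetical data source with 250 orders, for illustration only.
ORDERS = [{"order_id": i} for i in range(250)]
calls = []


def get_page(offset, limit):
    calls.append(offset)
    return ORDERS[offset:offset + limit]


all_orders = fetch_all(get_page)
```

In this sketch, a total that is an exact multiple of `limit` costs one extra request that returns an empty page before the loop stops.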

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_API
target: MY_TARGET_DB

streams:
  orders:
    object: public.orders

env:
  SLING_STATE: postgres/sling_state.my_api

On the first run, this fetches orders from the last 30 days. On subsequent runs, it fetches only orders updated since the previous run.


Using Python

api_to_database.py
from sling import Replication, ReplicationStream

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'orders': ReplicationStream(
            object='public.orders'
        )
    },
    env={'SLING_STATE': 'postgres/sling_state.my_api'}
)

# First run: fetches last 30 days
replication.run()

# Subsequent runs: fetches only new/updated orders
replication.run()

ID-Based Incremental Sync

For APIs with monotonically increasing IDs, you can sync based on the highest ID.

Spec File (events_api.yaml)

events_api.yaml
name: "Events API"

defaults:
  state:
    base_url: https://api.events.com/v2
  request:
    headers:
      X-API-Key: "{secrets.api_key}"

endpoints:
  events:
    description: "Get events with ID greater than last sync"

    # Persist last event ID
    sync: [last_event_id]

    state:
      # Start from last ID, or 0 if first run
      min_id: "{coalesce(sync.last_event_id, 0)}"
      limit: 1000

    request:
      url: "{state.base_url}/events"
      parameters:
        # Only fetch events with ID greater than last sync
        id_gt: "{state.min_id}"
        limit: "{state.limit}"
        sort: "id asc"  # Ensure ascending order

    pagination:
      next_state:
        # Use last record's ID for next page
        min_id: "{response.records[-1].id}"
      stop_condition: length(response.records) < state.limit

    response:
      records:
        jmespath: "events[]"
        primary_key: ["id"]

      processors:
        # Track the highest event ID
        - expression: "record.id"
          output: "state.last_event_id"
          aggregation: "maximum"

    overrides:
      mode: incremental
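
The ID-based spec combines keyset pagination with a persisted high-water mark. The following sketch shows that logic in isolation, assuming the API honors an ascending sort and an exclusive "greater than" filter; `fetch_events` and `get_page` are hypothetical names used only here.

```python
def fetch_events(get_page, min_id=0, limit=1000):
    """Keyset pagination: each request asks for IDs above the last one seen."""
    events = []
    while True:
        page = get_page(id_gt=min_id, limit=limit)
        events.extend(page)
        if len(page) < limit:
            break
        # Pages are sorted ascending, so the last record has the highest ID.
        min_id = page[-1]["id"]
    # Highest ID seen; falls back to the starting value when nothing is new.
    last_event_id = max((e["id"] for e in events), default=min_id)
    return events, last_event_id


# Hypothetical event store with IDs 1..2500, for illustration only.
EVENTS = [{"id": i} for i in range(1, 2501)]


def get_page(id_gt, limit):
    return [e for e in EVENTS if e["id"] > id_gt][:limit]


events, last_id = fetch_events(get_page)
more, still_last = fetch_events(get_page, min_id=last_id)
```

The second call stands in for a later run: it starts from the saved ID and finds nothing new, so the saved value is left unchanged.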

Using Replication

replication.yaml
source: MY_API
target: MY_TARGET_DB

streams:
  events:
    object: analytics.events

env:
  SLING_STATE: postgres/sling_state.my_api

Date-Based Incremental with Iteration

For APIs that take a single date per request, iterate over each date since the last sync and persist the latest date processed.

Spec File (analytics_api.yaml)

analytics_api.yaml
name: "Analytics API"

defaults:
  state:
    base_url: https://analytics.example.com/api
  request:
    headers:
      Authorization: "Bearer {secrets.access_token}"

endpoints:
  daily_stats:
    description: "Get daily statistics since last sync"

    # Persist last processed date
    sync: [last_date]

    iterate:
      # Generate dates from last sync to yesterday
      over: >
        range(
          coalesce(sync.last_date, date_format(date_add(now(), -7, "day"), "%Y-%m-%d")),
          date_format(date_add(now(), -1, "day"), "%Y-%m-%d"),
          "1d"
        )
      into: "state.current_date"
      concurrency: 5

    state:
      date: '{date_format(state.current_date, "%Y-%m-%d")}'

    request:
      url: "{state.base_url}/stats/daily/{state.date}"

    response:
      records:
        jmespath: "data[]"
        primary_key: ["metric_id", "date"]

      processors:
        # Track the latest date processed
        - expression: "state.date"
          output: "state.last_date"
          aggregation: "maximum"

    overrides:
      mode: incremental
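
To make the iteration window concrete, the sketch below generates the same list of dates in Python. It assumes the range is inclusive on both ends with a one-day step, which is a guess about `range(...)` rather than documented behavior; `dates_to_fetch` is a hypothetical helper.

```python
from datetime import date, timedelta


def dates_to_fetch(last_date, today, default_days_back=7):
    """Return ISO dates from the last synced date (or a default) up to yesterday."""
    start = last_date or today - timedelta(days=default_days_back)
    end = today - timedelta(days=1)
    days = (end - start).days
    # Inclusive on both ends; the list is empty when start is after end.
    return [(start + timedelta(days=i)).isoformat() for i in range(days + 1)]


today = date(2024, 3, 10)
first_run = dates_to_fetch(None, today)
later_run = dates_to_fetch(date(2024, 3, 8), today)
```

Because the window starts at the last synced date itself, that date is requested again on the next run; the composite primary key lets the incremental load overwrite those rows instead of duplicating them.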

Using Replication

replication.yaml
source: MY_API
target: MY_TARGET_DB

streams:
  daily_stats:
    object: analytics.daily_statistics

env:
  SLING_STATE: postgres/sling_state.my_api

Multiple Sync Variables

You can persist multiple values for complex sync scenarios.

Spec File (multi_sync_api.yaml)

multi_sync_api.yaml
name: "Multi-Sync API"

defaults:
  state:
    base_url: https://api.example.com
  request:
    headers:
      Authorization: "Bearer {secrets.api_key}"

endpoints:
  products:
    description: "Track both timestamp and version for sync"

    # Persist multiple sync variables
    sync: [last_updated_at, last_version]

    state:
      updated_at_min: >
        {coalesce(sync.last_updated_at, date_format(date_add(now(), -30, "day"), "%Y-%m-%d"))}
      version_min: "{coalesce(sync.last_version, 0)}"
      # Pagination state; offset must be initialized because next_state reads it
      limit: 100
      offset: 0

    request:
      url: "{state.base_url}/products"
      parameters:
        updated_since: "{state.updated_at_min}"
        version_gt: "{state.version_min}"
        limit: "{state.limit}"
        offset: "{state.offset}"

    pagination:
      next_state:
        offset: "{state.offset + state.limit}"
      stop_condition: length(response.records) < state.limit

    response:
      records:
        jmespath: "products[]"
        primary_key: ["product_id"]

      processors:
        # Track both timestamp and version
        - expression: "record.updated_at"
          output: "state.last_updated_at"
          aggregation: "maximum"

        - expression: "record.version"
          output: "state.last_version"
          aggregation: "maximum"

    overrides:
      mode: incremental
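
Each processor in this spec keeps its own running maximum. A rough Python equivalent, with `track_maxima` as a hypothetical helper and the field-to-variable mapping taken from the spec above, might look like this:

```python
def track_maxima(records, fields, previous=None):
    """Fold several record fields into running maxima, one per sync variable."""
    state = dict(previous or {})
    for record in records:
        for field, variable in fields.items():
            value = record.get(field)
            if value is None:
                continue  # Skip records that lack the field
            if state.get(variable) is None or value > state[variable]:
                state[variable] = value
    return state


products = [
    {"product_id": "a", "updated_at": "2024-02-01", "version": 3},
    {"product_id": "b", "updated_at": "2024-02-03", "version": 2},
]
state = track_maxima(
    products,
    {"updated_at": "last_updated_at", "version": "last_version"},
)
```

Note that the two maxima are independent: here the latest timestamp comes from product `b` while the highest version comes from product `a`.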

Using SLING_STATE

By default, sync state is stored in the target database. You can store it externally using SLING_STATE.

Using Replication with External State

replication.yaml
source: MY_API
target: MY_TARGET_DB

streams:
  orders:
    object: public.orders

  events:
    object: analytics.events

  products:
    object: public.products

env:
  # Store state in S3 instead of target database
  SLING_STATE: AWS_S3/sling/state

This creates state files at s3://your-bucket/sling/state/ for each stream.

Incremental with Full Refresh Fallback

You can mix incremental and full-refresh streams based on your needs.

replication.yaml
source: MY_API
target: MY_TARGET_DB

streams:
  # Incremental sync for large, frequently updated table
  orders:
    object: public.orders
    # Uses incremental mode from spec overrides

  # Full refresh for small, infrequently updated table
  product_categories:
    object: public.categories
    mode: full-refresh  # Override spec's incremental mode

env:
  SLING_STATE: postgres/sling_state.my_api

Best Practices

1. Always Provide Sensible Defaults

Use coalesce() to handle the first run gracefully:

state:
  updated_at_min: >
    {coalesce(
      sync.last_updated_at,
      date_format(date_add(now(), -30, "day"), "%Y-%m-%dT%H:%M:%S%z")
    )}
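
For readers more familiar with Python, the expression above behaves roughly like the sketch below. The `coalesce` and `default_since` functions are stand-ins written for this example, and treating only `None` as "missing" is an assumption about how the spec function handles empty values.

```python
from datetime import datetime, timedelta, timezone


def coalesce(*values):
    """Return the first argument that is not None."""
    return next((v for v in values if v is not None), None)


def default_since(now, days=30):
    """Timestamp `days` before `now`, formatted as %Y-%m-%dT%H:%M:%S%z."""
    return (now - timedelta(days=days)).strftime("%Y-%m-%dT%H:%M:%S%z")


now = datetime(2024, 3, 31, 12, 0, 0, tzinfo=timezone.utc)

# First run: nothing persisted yet, so the 30-day default is used.
first_run = coalesce(None, default_since(now))

# Later run: the persisted value wins over the default.
later_run = coalesce("2024-03-30T08:15:00+0000", default_since(now))
```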

2. Use Appropriate Aggregation

Match the aggregation to your sync variable type:

processors:
  # For timestamps and IDs
  - expression: "record.updated_at"
    output: "state.last_updated_at"
    aggregation: "maximum"

  # For first/last values
  - expression: "record.cursor"
    output: "state.next_cursor"
    aggregation: "last"
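
The difference between the two aggregations matters when records arrive out of order. The sketch below uses a hypothetical `aggregate` function to show how the same list of values can reduce to different results.

```python
def aggregate(values, how):
    """Reduce the per-record values of one processor to a single state value."""
    if how == "maximum":
        return max(values)
    if how == "last":
        return values[-1]
    raise ValueError(f"unsupported aggregation: {how}")


# Records can arrive out of order, so "maximum" and "last" may disagree.
updated_at = ["2024-01-03", "2024-01-09", "2024-01-05"]
cursors = ["c1", "c2", "c3"]

high_water_mark = aggregate(updated_at, "maximum")
next_cursor = aggregate(cursors, "last")
```

Using `last` on the timestamps here would save `2024-01-05` and cause the record from `2024-01-09` to be fetched again on the next run, which is why `maximum` suits timestamps and IDs.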

3. Include Primary Keys

Always specify primary keys for proper upserting:

response:
  records:
    primary_key: ["id"]  # Or composite: ["user_id", "event_id"]
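
Conceptually, an upsert keyed on the primary key works like a dictionary write: a matching key overwrites the row, a new key adds one. The sketch below models the target table as a dict; `upsert` is a hypothetical helper, not Sling's loader.

```python
def upsert(table, records, primary_key):
    """Insert new rows and overwrite existing ones that share a primary key."""
    for record in records:
        key = tuple(record[col] for col in primary_key)
        table[key] = record
    return table


pk = ["user_id", "event_id"]
table = {}

# First load: one row.
upsert(table, [{"user_id": 1, "event_id": 10, "status": "new"}], pk)

# Second load: the same row updated, plus one new row.
upsert(table, [
    {"user_id": 1, "event_id": 10, "status": "done"},
    {"user_id": 1, "event_id": 11, "status": "new"},
], pk)
```

Without a primary key there is nothing to match on, so a record that is fetched twice would end up stored twice.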

4. Handle Edge Cases

Account for APIs where updates can arrive late or out of order:

state:
  # Add a small lookback window to catch late-arriving updates
  updated_at_min: >
    {coalesce(
      date_format(date_add(date_parse(sync.last_updated_at), -1, "hour"), "%Y-%m-%dT%H:%M:%S%z"),
      date_format(date_add(now(), -30, "day"), "%Y-%m-%dT%H:%M:%S%z")
    )}
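
The lookback arithmetic can be checked in isolation with a few lines of Python. This sketch assumes the saved value uses the `%Y-%m-%dT%H:%M:%S%z` format shown above; `since_with_lookback` is a hypothetical helper.

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%dT%H:%M:%S%z"


def since_with_lookback(last_updated_at, fallback, hours=1):
    """Shift the saved timestamp back by `hours`; use `fallback` on first run."""
    if last_updated_at is None:
        return fallback
    shifted = datetime.strptime(last_updated_at, FMT) - timedelta(hours=hours)
    return shifted.strftime(FMT)


later_run = since_with_lookback("2024-03-30T08:15:00+0000", "2024-03-01T00:00:00+0000")
first_run = since_with_lookback(None, "2024-03-01T00:00:00+0000")
```

Records inside the overlapping hour are fetched again on every run, so this pattern relies on the primary key upsert to keep the target free of duplicates.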

5. Use Incremental Mode Override

Set the mode in the spec to ensure consistent behavior:

overrides:
  mode: incremental

Combining Incremental with Backfill

You can use both incremental sync and backfill capabilities together.

First: Backfill Historical Data

backfill.yaml
source: MY_API
target: MY_TARGET_DB

streams:
  orders:
    object: public.orders
    source_options:
      # Backfill 2023 data
      range: '2023-01-01,2023-12-31'

env:
  SLING_STATE: postgres/sling_state.my_api

Then: Switch to Incremental

incremental.yaml
source: MY_API
target: MY_TARGET_DB

streams:
  orders:
    object: public.orders
    # No range specified - uses sync state for incremental loading

env:
  SLING_STATE: postgres/sling_state.my_api
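
One way to think about the two configurations is as a choice of request window. The sketch below is purely conceptual and makes no claim about Sling's internals: `request_window` is a hypothetical function, and it assumes an explicit range takes precedence over the saved sync value.

```python
def request_window(range_option, sync_value, default_start):
    """Pick the (start, end) bounds for a run; end=None means open-ended."""
    if range_option:
        # Backfill: an explicit "start,end" range bounds the run on both sides.
        start, end = (part.strip() for part in range_option.split(","))
        return start, end
    # Incremental: start from the saved value, or the default on first run.
    start = sync_value if sync_value is not None else default_start
    return start, None


backfill = request_window("2023-01-01,2023-12-31", None, "2023-12-01")
incremental = request_window(None, "2024-01-05", "2023-12-01")
```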
