# Incremental

Incremental loading from APIs allows you to fetch only new or updated data since the last run, reducing API calls and improving performance. Sling uses the `sync` feature to persist state between runs.

Learn more: [Incremental Sync](/concepts/api-specs/advanced.md#incremental-synchronization) " [State Variables](/concepts/api-specs/structure.md#state-variables)

## How API Incremental Sync Works

API incremental sync uses the `sync` key to persist values between runs:

1. Define which state variables to persist using `sync: [variable_name]`
2. On first run, use a default value (e.g., 30 days ago)
3. Track the maximum timestamp/ID in each response using processors
4. On subsequent runs, use the persisted value from `sync.variable_name`

The state is stored in the target database by default, or in a location specified by `SLING_STATE`.

## Timestamp-Based Incremental Sync

This is the most common pattern - fetching records updated since the last run.

**Spec File** (`orders_api.yaml`)

{% code title="orders\_api.yaml" overflow="wrap" %}

```yaml
name: "Orders API"

defaults:
  state:
    base_url: https://api.shop.com/v1
  request:
    headers:
      Authorization: "Bearer {secrets.api_key}"

endpoints:
  orders:
    description: "Get orders updated since last sync"

    # Persist last_updated_at for next run
    sync: [last_updated_at]

    state:
      # Use last sync time if available, otherwise default to 30 days ago
      updated_at_min: >
        {coalesce(
          sync.last_updated_at,
          date_format(date_add(now(), -30, "day"), "%Y-%m-%dT%H:%M:%S%z")
        )}
      limit: 100
      offset: 0

    request:
      url: "{state.base_url}/orders"
      parameters:
        # Filter API to only return records updated after this timestamp
        updated_at_min: "{state.updated_at_min}"
        limit: "{state.limit}"
        offset: "{state.offset}"

    pagination:
      next_state:
        offset: "{state.offset + state.limit}"
      stop_condition: length(response.records) < state.limit

    response:
      records:
        jmespath: "orders[]"
        primary_key: ["order_id"]

      processors:
        # Track the maximum updated_at timestamp across all records
        - expression: "record.updated_at"
          output: "state.last_updated_at"
          aggregation: "maximum"  # Save highest timestamp for next sync

    overrides:
      mode: incremental  # Upsert based on primary_key
```

{% endcode %}

***

**Using Replication**

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

streams:
  orders:
    object: public.orders

env:
  SLING_STATE: postgres/sling_state.my_api
```

{% endcode %}

On first run, this will fetch orders from the last 30 days. On subsequent runs, it will only fetch orders updated since the last run.

***

**Using Python**

{% code title="api\_to\_database.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'orders': ReplicationStream(
            object='public.orders'
        )
    },
    env={'SLING_STATE': 'postgres/sling_state.my_api'}
)

# First run: fetches last 30 days
replication.run()

# Subsequent runs: fetches only new/updated orders
replication.run()
```

{% endcode %}

## ID-Based Incremental Sync

For APIs with monotonically increasing IDs, you can sync based on the highest ID.

**Spec File** (`events_api.yaml`)

{% code title="events\_api.yaml" overflow="wrap" %}

```yaml
name: "Events API"

defaults:
  state:
    base_url: https://api.events.com/v2
  request:
    headers:
      X-API-Key: "{secrets.api_key}"

endpoints:
  events:
    description: "Get events with ID greater than last sync"

    # Persist last event ID
    sync: [last_event_id]

    state:
      # Start from last ID, or 0 if first run
      min_id: "{coalesce(sync.last_event_id, 0)}"
      limit: 1000

    request:
      url: "{state.base_url}/events"
      parameters:
        # Only fetch events with ID greater than last sync
        id_gt: "{state.min_id}"
        limit: "{state.limit}"
        sort: "id asc"  # Ensure ascending order

    pagination:
      next_state:
        # Use last record's ID for next page
        min_id: "{response.records[-1].id}"
      stop_condition: length(response.records) < state.limit

    response:
      records:
        jmespath: "events[]"
        primary_key: ["id"]

      processors:
        # Track the highest event ID
        - expression: "record.id"
          output: "state.last_event_id"
          aggregation: "maximum"

    overrides:
      mode: incremental
```

{% endcode %}

***

**Using Replication**

{% code title="replication.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

streams:
  events:
    object: analytics.events

env:
  SLING_STATE: postgres/sling_state.my_api
```

{% endcode %}

## Date-Based Incremental with Iteration

For APIs that require a date parameter, use iteration with sync state.

**Spec File** (`analytics_api.yaml`)

{% code title="analytics\_api.yaml" overflow="wrap" %}

```yaml
name: "Analytics API"

defaults:
  state:
    base_url: https://analytics.example.com/api
  request:
    headers:
      Authorization: "Bearer {secrets.access_token}"

endpoints:
  daily_stats:
    description: "Get daily statistics since last sync"

    # Persist last processed date
    sync: [last_date]

    iterate:
      # Generate dates from last sync to yesterday
      over: >
        range(
          coalesce(sync.last_date, date_format(date_add(now(), -7, "day"), "%Y-%m-%d")),
          date_format(date_add(now(), -1, "day"), "%Y-%m-%d"),
          "1d"
        )
      into: "state.current_date"
      concurrency: 5

    state:
      date: '{date_format(state.current_date, "%Y-%m-%d")}'

    request:
      url: "{state.base_url}/stats/daily/{state.date}"

    response:
      records:
        jmespath: "data[]"
        primary_key: ["metric_id", "date"]

      processors:
        # Track the latest date processed
        - expression: "state.date"
          output: "state.last_date"
          aggregation: "maximum"

    overrides:
      mode: incremental
```

{% endcode %}

***

**Using Replication**

{% code title="replication.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

streams:
  daily_stats:
    object: analytics.daily_statistics

env:
  SLING_STATE: postgres/sling_state.my_api
```

{% endcode %}

## Multiple Sync Variables

You can persist multiple values for complex sync scenarios.

**Spec File** (`multi_sync_api.yaml`)

{% code title="multi\_sync\_api.yaml" overflow="wrap" %}

```yaml
name: "Multi-Sync API"

defaults:
  state:
    base_url: https://api.example.com
  request:
    headers:
      Authorization: "Bearer {secrets.api_key}"

endpoints:
  products:
    description: "Track both timestamp and version for sync"

    # Persist multiple sync variables
    sync: [last_updated_at, last_version]

    state:
      updated_at_min: >
        {coalesce(sync.last_updated_at, date_format(date_add(now(), -30, "day"), "%Y-%m-%d"))}
      version_min: "{coalesce(sync.last_version, 0)}"

    request:
      url: "{state.base_url}/products"
      parameters:
        updated_since: "{state.updated_at_min}"
        version_gt: "{state.version_min}"

    pagination:
      next_state:
        offset: "{state.offset + 100}"
      stop_condition: length(response.records) < 100

    response:
      records:
        jmespath: "products[]"
        primary_key: ["product_id"]

      processors:
        # Track both timestamp and version
        - expression: "record.updated_at"
          output: "state.last_updated_at"
          aggregation: "maximum"

        - expression: "record.version"
          output: "state.last_version"
          aggregation: "maximum"

    overrides:
      mode: incremental
```

{% endcode %}

## Using SLING\_STATE

By default, sync state is stored in the target database. You can store it externally using `SLING_STATE`.

**Using Replication with External State**

{% code title="replication.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

streams:
  orders:
    object: public.orders

  events:
    object: analytics.events

  products:
    object: public.products

env:
  # Store state in S3 instead of target database
  SLING_STATE: AWS_S3/sling/state
```

{% endcode %}

This creates state files at `s3://your-bucket/sling/state/` for each stream.

## Incremental with Full Refresh Fallback

You can mix incremental and full-refresh streams based on your needs.

{% code title="replication.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

streams:
  # Incremental sync for large, frequently updated table
  orders:
    object: public.orders
    # Uses incremental mode from spec overrides

  # Full refresh for small, infrequently updated table
  product_categories:
    object: public.categories
    mode: full-refresh  # Override spec's incremental mode

env:
  SLING_STATE: postgres/sling_state.my_api
```

{% endcode %}

## Best Practices

### 1. Always Provide Sensible Defaults

Use `coalesce()` to handle the first run gracefully:

```yaml
state:
  updated_at_min: >
    {coalesce(
      sync.last_updated_at,
      date_format(date_add(now(), -30, "day"), "%Y-%m-%dT%H:%M:%S%z")
    )}
```

### 2. Use Appropriate Aggregation

Match the aggregation to your sync variable type:

```yaml
processors:
  # For timestamps and IDs
  - expression: "record.updated_at"
    output: "state.last_updated_at"
    aggregation: "maximum"

  # For first/last values
  - expression: "record.cursor"
    output: "state.next_cursor"
    aggregation: "last"
```

### 3. Include Primary Keys

Always specify primary keys for proper upserting:

```yaml
response:
  records:
    primary_key: ["id"]  # Or composite: ["user_id", "event_id"]
```

### 4. Handle Edge Cases

Account for APIs that might return stale data:

```yaml
state:
  # Add a small lookback window to catch late-arriving updates
  updated_at_min: >
    {coalesce(
      date_format(date_add(date_parse(sync.last_updated_at), -1, "hour"), "%Y-%m-%dT%H:%M:%S%z"),
      date_format(date_add(now(), -30, "day"), "%Y-%m-%dT%H:%M:%S%z")
    )}
```

### 5. Use Incremental Mode Override

Set the mode in the spec to ensure consistent behavior:

```yaml
overrides:
  mode: incremental
```

## Combining Incremental with Backfill

You can use both incremental sync and backfill capabilities together.

**First: Backfill Historical Data**

{% code title="backfill.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

streams:
  orders:
    object: public.orders
    source_options:
      # Backfill 2023 data
      range: '2023-01-01,2023-12-31'

env:
  SLING_STATE: postgres/sling_state.my_api
```

{% endcode %}

**Then: Switch to Incremental**

{% code title="incremental.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

streams:
  orders:
    object: public.orders
    # No range specified - uses sync state for incremental loading

env:
  SLING_STATE: postgres/sling_state.my_api
```

{% endcode %}


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.slingdata.io/examples/api-to-database/incremental.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
