# Incremental

Incremental loading from APIs allows you to fetch only new or updated data since the last run, reducing API calls and improving performance. Sling uses the `sync` feature to persist state between runs.

Learn more: [Incremental Sync](https://docs.slingdata.io/concepts/api-specs/advanced#incremental-synchronization) • [State Variables](https://docs.slingdata.io/concepts/api-specs/structure#state-variables)

## How API Incremental Sync Works

API incremental sync uses the `sync` key to persist values between runs:

1. Define which state variables to persist using `sync: [variable_name]`
2. On first run, use a default value (e.g., 30 days ago)
3. Track the maximum timestamp/ID in each response using processors
4. On subsequent runs, use the persisted value from `sync.variable_name`

The state is stored in the target database by default, or in a location specified by `SLING_STATE`.
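
The four steps above can be modeled in plain Python. This is a conceptual sketch only, not Sling's implementation: `run_sync`, the `state` dictionary, `fetch_since`, and the sample orders are hypothetical stand-ins for the endpoint logic, the persisted sync state, and the API.

```python
from datetime import datetime, timedelta, timezone


def run_sync(state_store, fetch_since, now):
    """Model one sync run. All names here are hypothetical, not Sling APIs."""
    # Steps 1 and 4: read the persisted value, if any (like sync.last_updated_at).
    persisted = state_store.get("last_updated_at")
    # Step 2: fall back to a default on the first run (30 days ago).
    since = persisted if persisted is not None else now - timedelta(days=30)

    records = fetch_since(since)

    # Step 3: track the maximum timestamp seen (like aggregation: "maximum").
    if records:
        state_store["last_updated_at"] = max(r["updated_at"] for r in records)
    return records


# Hypothetical in-memory "API" holding three orders.
now = datetime(2024, 6, 30, tzinfo=timezone.utc)
ORDERS = [
    {"order_id": 1, "updated_at": datetime(2024, 5, 1, tzinfo=timezone.utc)},
    {"order_id": 2, "updated_at": datetime(2024, 6, 10, tzinfo=timezone.utc)},
    {"order_id": 3, "updated_at": datetime(2024, 6, 20, tzinfo=timezone.utc)},
]


def fetch_since(since):
    # Assumes the API filter is exclusive (strictly after `since`).
    return [o for o in ORDERS if o["updated_at"] > since]


state = {}
first = run_sync(state, fetch_since, now)   # first run: 30-day default window
second = run_sync(state, fetch_since, now)  # second run: nothing new to fetch
```

In this model the first run returns orders 2 and 3 (order 1 falls outside the 30-day window), and the second run returns nothing because the persisted timestamp already covers them.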

## Timestamp-Based Incremental Sync

This is the most common pattern: fetch only the records updated since the last run.

**Spec File** (`orders_api.yaml`)

{% code title="orders\_api.yaml" overflow="wrap" %}

```yaml
name: "Orders API"

defaults:
  state:
    base_url: https://api.shop.com/v1
  request:
    headers:
      Authorization: "Bearer {secrets.api_key}"

endpoints:
  orders:
    description: "Get orders updated since last sync"

    # Persist last_updated_at for next run
    sync: [last_updated_at]

    state:
      # Use last sync time if available, otherwise default to 30 days ago
      updated_at_min: >
        {coalesce(
          sync.last_updated_at,
          date_format(date_add(now(), -30, "day"), "%Y-%m-%dT%H:%M:%S%z")
        )}
      limit: 100
      offset: 0

    request:
      url: "{state.base_url}/orders"
      parameters:
        # Filter API to only return records updated after this timestamp
        updated_at_min: "{state.updated_at_min}"
        limit: "{state.limit}"
        offset: "{state.offset}"

    pagination:
      next_state:
        offset: "{state.offset + state.limit}"
      stop_condition: length(response.records) < state.limit

    response:
      records:
        jmespath: "orders[]"
        primary_key: ["order_id"]

      processors:
        # Track the maximum updated_at timestamp across all records
        - expression: "record.updated_at"
          output: "state.last_updated_at"
          aggregation: "maximum"  # Save highest timestamp for next sync

    overrides:
      mode: incremental  # Upsert based on primary_key
```

{% endcode %}
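
The `pagination` block in the spec above advances `offset` by `limit` and stops when a page comes back short. A minimal Python sketch of that loop follows, assuming the stop condition is checked after each response; `fetch_all`, `get_page`, and `DATA` are hypothetical names used only for illustration.

```python
def fetch_all(get_page, limit=100):
    """Offset pagination: advance by `limit` until a short page is returned."""
    offset, records = 0, []
    while True:
        page = get_page(limit=limit, offset=offset)
        records.extend(page)
        # Mirrors stop_condition: length(response.records) < state.limit
        if len(page) < limit:
            break
        # Mirrors next_state: offset = state.offset + state.limit
        offset += limit
    return records


# Hypothetical data source with 250 records.
DATA = list(range(250))
calls = []


def get_page(limit, offset):
    calls.append(offset)  # record which offsets were requested
    return DATA[offset:offset + limit]


result = fetch_all(get_page, limit=100)
```

With 250 records and a limit of 100, the loop requests offsets 0, 100, and 200, then stops because the third page holds only 50 records. When the total is an exact multiple of the limit, this pattern issues one extra request that returns an empty page.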

***

**Using Replication**

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

streams:
  orders:
    object: public.orders

env:
  SLING_STATE: postgres/sling_state.my_api
```

{% endcode %}

On first run, this will fetch orders from the last 30 days. On subsequent runs, it will only fetch orders updated since the last run.

***

**Using Python**

{% code title="api\_to\_database.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'orders': ReplicationStream(
            object='public.orders'
        )
    },
    env={'SLING_STATE': 'postgres/sling_state.my_api'}
)

# First run: fetches last 30 days
replication.run()

# Subsequent runs: fetches only new/updated orders
replication.run()
```

{% endcode %}

## ID-Based Incremental Sync

For APIs with monotonically increasing IDs, you can sync based on the highest ID.

**Spec File** (`events_api.yaml`)

{% code title="events\_api.yaml" overflow="wrap" %}

```yaml
name: "Events API"

defaults:
  state:
    base_url: https://api.events.com/v2
  request:
    headers:
      X-API-Key: "{secrets.api_key}"

endpoints:
  events:
    description: "Get events with ID greater than last sync"

    # Persist last event ID
    sync: [last_event_id]

    state:
      # Start from last ID, or 0 if first run
      min_id: "{coalesce(sync.last_event_id, 0)}"
      limit: 1000

    request:
      url: "{state.base_url}/events"
      parameters:
        # Only fetch events with ID greater than last sync
        id_gt: "{state.min_id}"
        limit: "{state.limit}"
        sort: "id asc"  # Ensure ascending order

    pagination:
      next_state:
        # Use last record's ID for next page
        min_id: "{response.records[-1].id}"
      stop_condition: length(response.records) < state.limit

    response:
      records:
        jmespath: "events[]"
        primary_key: ["id"]

      processors:
        # Track the highest event ID
        - expression: "record.id"
          output: "state.last_event_id"
          aggregation: "maximum"

    overrides:
      mode: incremental
```

{% endcode %}
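
The spec above pages by ID rather than by offset: each request asks for IDs greater than the last one seen. The following Python sketch models that loop under the assumption that the API returns events in ascending ID order; `fetch_events`, `get_page`, and `EVENTS` are hypothetical names, not part of Sling.

```python
def fetch_events(get_page, start_id=0, limit=1000):
    """Keyset pagination: request IDs greater than the last ID seen."""
    min_id, events = start_id, []
    while True:
        page = get_page(id_gt=min_id, limit=limit)
        events.extend(page)
        # Mirrors stop_condition: length(response.records) < state.limit
        if len(page) < limit:
            break
        # Mirrors next_state: min_id = response.records[-1].id
        min_id = page[-1]["id"]
    return events


# Hypothetical event store with IDs 1 through 7, already sorted ascending.
EVENTS = [{"id": i} for i in range(1, 8)]


def get_page(id_gt, limit):
    matching = [e for e in EVENTS if e["id"] > id_gt]
    return matching[:limit]


# Pretend the previous run persisted last_event_id = 2.
events = fetch_events(get_page, start_id=2, limit=2)
last_event_id = max(e["id"] for e in events)  # value to persist for the next run
```

Unlike offset pagination, this approach does not skip or repeat records when new events are inserted while the sync is running, as long as IDs only ever increase.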

***

**Using Replication**

{% code title="replication.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

streams:
  events:
    object: analytics.events

env:
  SLING_STATE: postgres/sling_state.my_api
```

{% endcode %}

## Date-Based Incremental with Iteration

For APIs that return one day of data per request, iterate over the dates since the last sync and persist the latest date processed.

**Spec File** (`analytics_api.yaml`)

{% code title="analytics\_api.yaml" overflow="wrap" %}

```yaml
name: "Analytics API"

defaults:
  state:
    base_url: https://analytics.example.com/api
  request:
    headers:
      Authorization: "Bearer {secrets.access_token}"

endpoints:
  daily_stats:
    description: "Get daily statistics since last sync"

    # Persist last processed date
    sync: [last_date]

    iterate:
      # Generate dates from last sync to yesterday
      over: >
        range(
          coalesce(sync.last_date, date_format(date_add(now(), -7, "day"), "%Y-%m-%d")),
          date_format(date_add(now(), -1, "day"), "%Y-%m-%d"),
          "1d"
        )
      into: "state.current_date"
      concurrency: 5

    state:
      date: '{date_format(state.current_date, "%Y-%m-%d")}'

    request:
      url: "{state.base_url}/stats/daily/{state.date}"

    response:
      records:
        jmespath: "data[]"
        primary_key: ["metric_id", "date"]

      processors:
        # Track the latest date processed
        - expression: "state.date"
          output: "state.last_date"
          aggregation: "maximum"

    overrides:
      mode: incremental
```

{% endcode %}
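
The `iterate.over` expression in the spec above produces one date per request, from the last synced date (or 7 days ago) through yesterday. The sketch below models that range in Python, assuming both endpoints are inclusive; `dates_to_fetch` is a hypothetical helper, not a Sling function.

```python
from datetime import date, timedelta


def dates_to_fetch(last_date, today, default_lookback_days=7):
    """List the dates to request, from the last synced date through yesterday."""
    # Mirrors coalesce(sync.last_date, <7 days ago>)
    if last_date is not None:
        start = last_date
    else:
        start = today - timedelta(days=default_lookback_days)
    end = today - timedelta(days=1)  # yesterday
    span = (end - start).days
    # An empty list results when start is after end.
    return [start + timedelta(days=i) for i in range(span + 1)]


today = date(2024, 6, 10)
first_run = dates_to_fetch(None, today)              # no persisted state yet
later_run = dates_to_fetch(date(2024, 6, 8), today)  # state holds 2024-06-08
```

Under the inclusive-start assumption, the last synced date is requested again on the next run. The composite primary key (`metric_id`, `date`) together with incremental mode means re-fetched rows are upserted rather than duplicated.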

***

**Using Replication**

{% code title="replication.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

streams:
  daily_stats:
    object: analytics.daily_statistics

env:
  SLING_STATE: postgres/sling_state.my_api
```

{% endcode %}

## Multiple Sync Variables

You can persist more than one value when a single cursor is not enough, for example both a timestamp and a version number.

**Spec File** (`multi_sync_api.yaml`)

{% code title="multi\_sync\_api.yaml" overflow="wrap" %}

```yaml
name: "Multi-Sync API"

defaults:
  state:
    base_url: https://api.example.com
  request:
    headers:
      Authorization: "Bearer {secrets.api_key}"

endpoints:
  products:
    description: "Track both timestamp and version for sync"

    # Persist multiple sync variables
    sync: [last_updated_at, last_version]

    state:
      updated_at_min: >
        {coalesce(sync.last_updated_at, date_format(date_add(now(), -30, "day"), "%Y-%m-%d"))}
      version_min: "{coalesce(sync.last_version, 0)}"
      limit: 100
      offset: 0  # Must be initialized, since pagination increments it

    request:
      url: "{state.base_url}/products"
      parameters:
        updated_since: "{state.updated_at_min}"
        version_gt: "{state.version_min}"
        limit: "{state.limit}"
        offset: "{state.offset}"

    pagination:
      next_state:
        offset: "{state.offset + state.limit}"
      stop_condition: length(response.records) < state.limit

    response:
      records:
        jmespath: "products[]"
        primary_key: ["product_id"]

      processors:
        # Track both timestamp and version
        - expression: "record.updated_at"
          output: "state.last_updated_at"
          aggregation: "maximum"

        - expression: "record.version"
          output: "state.last_version"
          aggregation: "maximum"

    overrides:
      mode: incremental
```

{% endcode %}

## Using SLING\_STATE

By default, sync state is stored in the target database. You can store it externally using `SLING_STATE`.

**Using Replication with External State**

{% code title="replication.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

streams:
  orders:
    object: public.orders

  events:
    object: analytics.events

  products:
    object: public.products

env:
  # Store state in S3 instead of target database
  SLING_STATE: AWS_S3/sling/state
```

{% endcode %}

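With this setting, Sling writes a state file for each stream under `s3://your-bucket/sling/state/`, where `your-bucket` stands for the bucket configured on the `AWS_S3` connection.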

## Incremental with Full Refresh Fallback

You can mix incremental and full-refresh streams based on your needs.

{% code title="replication.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

streams:
  # Incremental sync for large, frequently updated table
  orders:
    object: public.orders
    # Uses incremental mode from spec overrides

  # Full refresh for small, infrequently updated table
  product_categories:
    object: public.categories
    mode: full-refresh  # Override spec's incremental mode

env:
  SLING_STATE: postgres/sling_state.my_api
```

{% endcode %}

## Best Practices

### 1. Always Provide Sensible Defaults

Use `coalesce()` to handle the first run gracefully:

```yaml
state:
  updated_at_min: >
    {coalesce(
      sync.last_updated_at,
      date_format(date_add(now(), -30, "day"), "%Y-%m-%dT%H:%M:%S%z")
    )}
```

### 2. Use Appropriate Aggregation

Match the aggregation to your sync variable type:

```yaml
processors:
  # For timestamps and IDs
  - expression: "record.updated_at"
    output: "state.last_updated_at"
    aggregation: "maximum"

  # For cursors or tokens, where the most recent value should win
  - expression: "record.cursor"
    output: "state.next_cursor"
    aggregation: "last"
```
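
To illustrate the difference between the two aggregations, here is a small Python sketch. The `aggregate` helper and the sample records are hypothetical; the sketch only models the idea that `maximum` keeps the largest value seen while `last` keeps the final one.

```python
def aggregate(values, aggregation):
    """Reduce per-record values to one state value (hypothetical helper)."""
    values = [v for v in values if v is not None]
    if not values:
        return None
    if aggregation == "maximum":
        return max(values)
    if aggregation == "last":
        return values[-1]
    raise ValueError(f"unsupported aggregation: {aggregation}")


# Records are not sorted by updated_at, which is common in API responses.
records = [
    {"updated_at": "2024-06-03T10:00:00Z", "cursor": "c1"},
    {"updated_at": "2024-06-05T09:00:00Z", "cursor": "c2"},
    {"updated_at": "2024-06-04T12:00:00Z", "cursor": "c3"},
]

last_updated_at = aggregate((r["updated_at"] for r in records), "maximum")
next_cursor = aggregate((r["cursor"] for r in records), "last")
```

Here `maximum` picks the second record's timestamp even though it is not the final record, while `last` picks the third record's cursor. Comparing timestamps as strings works in this sketch only because they share one ISO 8601 format and time zone.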

### 3. Include Primary Keys

Always specify primary keys for proper upserting:

```yaml
response:
  records:
    primary_key: ["id"]  # Or composite: ["user_id", "event_id"]
```

### 4. Handle Edge Cases

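Account for APIs that surface updates late (for example, because of eventual consistency or clock skew) by re-reading a small window before the last synced timestamp. Records fetched twice are deduplicated by the upsert on the primary key: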

```yaml
state:
  # Add a small lookback window to catch late-arriving updates
  updated_at_min: >
    {coalesce(
      date_format(date_add(date_parse(sync.last_updated_at), -1, "hour"), "%Y-%m-%dT%H:%M:%S%z"),
      date_format(date_add(now(), -30, "day"), "%Y-%m-%dT%H:%M:%S%z")
    )}
```
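
The lookback logic can be expressed in Python as follows. This is a sketch of the intended behavior, not of how Sling evaluates the expression; `updated_at_min` and its parameters are hypothetical, and the explicit `None` check stands in for the first run, when no state has been persisted.

```python
from datetime import datetime, timedelta, timezone

FMT = "%Y-%m-%dT%H:%M:%S%z"  # same format string as in the spec


def updated_at_min(last_updated_at, now,
                   lookback=timedelta(hours=1),
                   default_window=timedelta(days=30)):
    """Return the lower bound for the next request as a formatted string."""
    if last_updated_at is None:
        # First run: no persisted state, so use the default window.
        return (now - default_window).strftime(FMT)
    # Later runs: step back by the lookback to catch late-arriving updates.
    parsed = datetime.strptime(last_updated_at, FMT)
    return (parsed - lookback).strftime(FMT)


now = datetime(2024, 6, 30, 12, 0, 0, tzinfo=timezone.utc)
first_run = updated_at_min(None, now)
later_run = updated_at_min("2024-06-29T08:30:00+0000", now)
```

A wider lookback catches more late updates at the cost of re-fetching more records on every run, so size it to the delay you actually observe from the API.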

### 5. Use Incremental Mode Override

Set the mode in the spec to ensure consistent behavior:

```yaml
overrides:
  mode: incremental
```

## Combining Incremental with Backfill

You can combine the two: backfill historical data once with an explicit range, then remove the range so that subsequent runs rely on the sync state.

**First: Backfill Historical Data**

{% code title="backfill.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

streams:
  orders:
    object: public.orders
    source_options:
      # Backfill 2023 data
      range: '2023-01-01,2023-12-31'

env:
  SLING_STATE: postgres/sling_state.my_api
```

{% endcode %}

**Then: Switch to Incremental**

{% code title="incremental.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

streams:
  orders:
    object: public.orders
    # No range specified - uses sync state for incremental loading

env:
  SLING_STATE: postgres/sling_state.my_api
```

{% endcode %}
