# Incremental

## New Data Upsert

This mode performs incremental loading by processing only new or updated records, selected via an update key. It requires both a primary key and an update key.

```yaml
source: postgres
target: snowflake

defaults:
  mode: incremental
  primary_key: id  
  update_key: updated_at
  object: new_schema.{stream_schema}_{stream_table}

streams:
  public.orders:
    # Will only load records where updated_at is greater than the max value in target
  
  public.customers:
    primary_key: [customer_id]  # Override default primary key
    update_key: last_modified  # Override default update key
```

## Full Data Upsert

This mode performs incremental loading by processing the full source dataset and upserting records based on the primary key. No update key is required.

```yaml
source: postgres 
target: snowflake

defaults:
  mode: incremental
  primary_key: id
  object: new_schema.{stream_schema}_{stream_table}

streams:
  public.products:
    # Will load all records and upsert based on id
  
  public.categories:
    primary_key: [category_id, region]  # Composite primary key
```

## Append Only

This mode performs incremental loading by only appending new records based on an update key, without updating existing records. No primary key is required.

```yaml
source: postgres
target: snowflake

defaults:
  mode: incremental
  update_key: created_at
  object: new_schema.{stream_schema}_{stream_table}

streams:
  public.events:
    # Will only append records where created_at is greater than the max value in target
  
  public.logs:
    update_key: timestamp  # Override default update key
```

## Custom SQL

This mode allows using custom SQL queries with incremental loading via special variables that Sling replaces at runtime: `{incremental_where_cond}` expands to a filter on the update key (or `1=1` on the first run, when no incremental state exists), and `{incremental_value}` expands to the last captured value of the update key (or `null` on the first run). See [here](https://docs.slingdata.io/concepts/replication/modes#incremental-or-backfill-mode-with-custom-sql) for more details.

```yaml
source: postgres
target: snowflake

defaults:
  mode: incremental
  primary_key: id
  update_key: modified_at
  object: new_schema.{stream_schema}_{stream_table}

streams:
  public.orders:
    sql: |
      select *, coalesce(created_at, updated_at) as modified_at
      from public.orders 
      where {incremental_where_cond}
      order by modified_at asc
  
  public.customers:
    sql: |
      -- keep only the latest row per customer before loading
      with ranked_customers as (
        select *,
          row_number() over (partition by customer_id order by modified_at desc) as rn
        from public.customers
        where modified_at > coalesce({incremental_value}, '2001-01-01')
      )
      select * from ranked_customers where rn = 1
```

## Incremental Chunking

In incremental mode, chunking helps process large datasets by breaking them into smaller batches based on the update key. This is useful for memory management, progress tracking, and reducing source database load. See the [chunking documentation](https://docs.slingdata.io/examples/database-to-database/chunking) for details.

```yaml
source: postgres
target: snowflake

defaults:
  mode: incremental
  primary_key: id
  update_key: updated_at
  object: new_schema.{stream_schema}_{stream_table}

streams:
  public.orders:
    source_options:
      chunk_size: 1m  # Process 1 month at a time
  
  public.events:
    source_options:
      chunk_size: 7d  # Process 1 week at a time
  
  public.logs:
    source_options:
      chunk_size: 1d  # Process 1 day at a time

env:
  SLING_THREADS: 4  # Enable parallel processing
```
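The durations above apply to date or timestamp update keys. For streams whose update key is numeric (such as an auto-increment id), the linked chunking page also covers numeric chunking; a sketch, assuming `chunk_size` accepts a plain integer range for numeric keys (verify on that page):

```yaml
streams:
  public.transactions:
    primary_key: id
    update_key: id
    source_options:
      chunk_size: 100000  # assumed: process batches of 100k ids for a numeric update key
```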

## Using `SLING_STATE`

If we wish to store the incremental state externally (instead of relying on the max value of the target table), we can use the state feature. Provide an environment variable called `SLING_STATE`, pointing to a location where Sling will store the incremental value for each stream. See [Global Variables](https://docs.slingdata.io/sling-cli/variables#global-environment-variables) for more details.

Here is an example where Sling stores the incremental values under the `my/state` path of the `AWS_S3` connection:

```yaml
source: postgres
target: snowflake

defaults:
  object: new_schema.{stream_schema}_{stream_table}

streams:
  public.*:
    mode: incremental
    primary_key: id
    update_key: update_dt
  
  public.accounts:
    mode: full-refresh

env:
  SLING_STATE: AWS_S3/my/state
```

## Delete Missing Records

When loading data incrementally, you may want to handle records that exist in the target but are missing from the source. Sling supports both hard deletes (physically removing records) and soft deletes (marking with timestamps).

See the [Capture Deletes](https://docs.slingdata.io/examples/database-to-database/capture_deletes) page for detailed examples and configuration options.
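The Capture Deletes page holds the authoritative configuration; as a rough sketch, a soft-delete setup might look like the following (the `delete_missing` option name and its placement under `source_options` are assumptions here — confirm both on that page):

```yaml
source: postgres
target: snowflake

defaults:
  mode: incremental
  primary_key: id
  update_key: updated_at
  object: new_schema.{stream_schema}_{stream_table}

streams:
  public.orders:
    source_options:
      delete_missing: soft  # assumed option: flag missing records instead of removing them ("hard" would physically delete)
```

Soft deletes preserve history in the target (typically via a deleted-timestamp column), while hard deletes keep the target row count in sync with the source.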


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.slingdata.io/examples/database-to-database/incremental.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, when you need clarification or additional context, or when you want to retrieve related documentation sections.
