Incremental

Examples of using Sling to incrementally load data from one database to another.

New Data Upsert

This mode performs incremental loading by processing only new or updated records, identified by an update key. It requires both a primary key and an update key.

source: postgres
target: snowflake

defaults:
  mode: incremental
  primary_key: id  
  update_key: updated_at
  object: new_schema.{stream_schema}_{stream_table}

streams:
  public.orders:
    # Will only load records where updated_at is greater than the max value in target
  
  public.customers:
    primary_key: [customer_id]  # Override default primary key
    update_key: last_modified  # Override default update key

Full Data Upsert

This mode performs incremental loading by processing the full source dataset and upserting records based on the primary key. No update key is required.

source: postgres 
target: snowflake

defaults:
  mode: incremental
  primary_key: id
  object: new_schema.{stream_schema}_{stream_table}

streams:
  public.products:
    # Will load all records and upsert based on id
  
  public.categories:
    primary_key: [category_id, region]  # Composite primary key

Append Only

This mode performs incremental loading by only appending new records based on an update key, without updating existing records. No primary key is required.

source: postgres
target: snowflake

defaults:
  mode: incremental
  update_key: created_at
  object: new_schema.{stream_schema}_{stream_table}

streams:
  public.events:
    # Will only append records where created_at is greater than max value in target
  
  public.logs:
    update_key: timestamp  # Override default update key
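
Since the behavior follows from which keys a stream defines, the three variants above can in principle live in one replication file by setting the keys per stream rather than in defaults. A minimal sketch, assuming stream-level keys behave the same way as the per-stream overrides shown earlier; the stream names are reused from the examples above:

```yaml
source: postgres
target: snowflake

defaults:
  mode: incremental
  object: new_schema.{stream_schema}_{stream_table}

streams:
  public.orders:
    # primary key + update key: new data upsert
    primary_key: id
    update_key: updated_at

  public.products:
    # primary key only: full data upsert
    primary_key: id

  public.events:
    # update key only: append only
    update_key: created_at
```

Keeping the keys out of defaults avoids a stream silently inheriting a key that would change its loading behavior.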

Custom SQL

This mode allows using custom SQL queries with incremental loading through special variables ({incremental_where_cond} and {incremental_value} in the example below) that Sling replaces at runtime. See here for more details.

source: postgres
target: snowflake

defaults:
  mode: incremental
  primary_key: id
  update_key: modified_at
  object: new_schema.{stream_schema}_{stream_table}

streams:
  public.orders:
    sql: |
      select * from public.orders 
      where {incremental_where_cond}
      order by modified_at asc
  
  public.customers:
    sql: |
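      with ranked_customers as (
        select
          *,
          -- rank rows per id so that rn = 1 is the most recent version
          row_number() over (partition by id order by modified_at desc) as rn
        from public.customers
        where modified_at > coalesce({incremental_value}, '2001-01-01')
      )
      select * from ranked_customers where rn = 1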
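
To make the placeholders more concrete, the sketch below repeats the first stream and annotates it with what the rendered query might look like. The rendered forms in the comments, including the sample timestamp, are assumptions for illustration and not output captured from Sling. The coalesce in the second stream above suggests that {incremental_value} can be null when the target holds no data yet.

```yaml
streams:
  public.orders:
    sql: |
      select * from public.orders
      where {incremental_where_cond}
      order by modified_at asc

# Assumed rendering once the target already holds data (illustrative value):
#   select * from public.orders
#   where modified_at > '2024-01-01 00:00:00'
#   order by modified_at asc
#
# Assumed rendering on the first run, when no incremental value exists yet:
#   the condition is expected to match all rows (for example, 1=1)
```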

Using SLING_STATE

If we wish to store the incremental state externally (instead of relying on the max value in the target table), we can use the state feature. Set the environment variable SLING_STATE to a location where Sling will store the respective incremental values. See Global Variables for more details.

In the following example, Sling stores the incremental values under the my/state path in the AWS_S3 connection:

source: postgres
target: snowflake

defaults:
  object: new_schema.{stream_schema}_{stream_table}

streams:
  public.*:
    mode: incremental
    primary_key: id
    update_key: update_dt
  
  public.accounts:
    mode: full-refresh

env:
  SLING_STATE: AWS_S3/my/state

Delete Missing Records (Soft / Hard)

When loading data incrementally, you may want to handle records that exist in the target but are missing from the source. The delete_missing option supports two modes:

  • hard: Physically deletes records from the target table that no longer exist in the source

  • soft: Marks records as deleted in the target table by setting a deletion timestamp

Be careful about enabling this feature on very large tables: on each run, the primary key column(s) are fully selected from the source stream to determine which records no longer exist.

Hard Delete Example

source: MY_POSTGRES
target: MY_SNOWFLAKE

defaults:
  mode: incremental
  update_key: updated_at
  primary_key: id  # primary key is required for delete_missing

streams:
  finance.accounts:
    object: finance.accounts_target
    target_options:
      delete_missing: hard  # will remove records that don't exist in source

Soft Delete Example

source: MY_POSTGRES 
target: MY_SNOWFLAKE

defaults:
  mode: incremental
  update_key: updated_at
  primary_key: id  # primary key is required for delete_missing
  target_options:
    delete_missing: soft  # will mark records as deleted with timestamp

streams:
  finance.accounts:
    object: finance.accounts_target

When using soft delete mode, Sling will add a _sling_deleted_at timestamp column to track when records were marked as deleted.
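
The two examples above set delete_missing at different levels (per stream and in defaults). A sketch combining them, assuming stream-level target_options take precedence over defaults in the same way as other stream-level keys; finance.transactions is a hypothetical stream added for illustration:

```yaml
source: MY_POSTGRES
target: MY_SNOWFLAKE

defaults:
  mode: incremental
  update_key: updated_at
  primary_key: id  # primary key is required for delete_missing
  target_options:
    delete_missing: soft  # default: mark missing records with a timestamp

streams:
  finance.accounts:
    object: finance.accounts_target  # inherits delete_missing: soft

  finance.transactions:  # hypothetical stream
    object: finance.transactions_target
    target_options:
      delete_missing: hard  # override: physically remove missing records
```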

Important Notes:

  • The delete_missing option requires that you specify a primary_key to uniquely identify records

  • For incremental loads, an update_key is also required to determine which records to process

  • The comparison is done using a temporary table to efficiently identify missing records
