Incremental
Examples of using Sling to incrementally load data from databases to databases
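Each YAML block below is a Sling replication configuration. Assuming it is saved to a file (replication.yaml is a placeholder name here), it can be run with the Sling CLI:

sling run -r replication.yaml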
New Data Upsert
This mode performs incremental loading by only processing new/updated records based on an update key. It requires both a primary key and an update key.
source: postgres
target: snowflake

defaults:
  mode: incremental
  primary_key: id
  update_key: updated_at
  object: new_schema.{stream_schema}_{stream_table}

streams:
  public.orders:
    # Will only load records where updated_at is greater than the max value in target

  public.customers:
    primary_key: [customer_id] # Override default primary key
    update_key: last_modified  # Override default update key

Full Data Upsert
This mode performs incremental loading by processing the full source dataset and upserting records based on the primary key. No update key is required.
source: postgres
target: snowflake

defaults:
  mode: incremental
  primary_key: id
  object: new_schema.{stream_schema}_{stream_table}

streams:
  public.products:
    # Will load all records and upsert based on id

  public.categories:
    primary_key: [category_id, region] # Composite primary key

Append Only
This mode performs incremental loading by only appending new records based on an update key, without updating existing records. No primary key is required.
source: postgres
target: snowflake

defaults:
  mode: incremental
  update_key: created_at
  object: new_schema.{stream_schema}_{stream_table}

streams:
  public.events:
    # Will only append records where created_at is greater than the max value in target

  public.logs:
    update_key: timestamp # Override default update key

Custom SQL
This mode allows using custom SQL queries with incremental loading via special variables that Sling replaces at runtime. See here for more details.
source: postgres
target: snowflake

defaults:
  mode: incremental
  primary_key: id
  update_key: modified_at
  object: new_schema.{stream_schema}_{stream_table}

streams:
  public.orders:
    sql: |
      select * from public.orders
      where {incremental_where_cond}
      order by modified_at asc

  public.customers:
    sql: |
      with ranked_customers as (
        select *,
          -- rank rows so only the latest record per id is kept (rn = 1)
          row_number() over (partition by id order by modified_at desc) as rn
        from public.customers
        where modified_at > coalesce({incremental_value}, '2001-01-01')
      )
      select * from ranked_customers where rn = 1

Incremental Chunking
In incremental mode, chunking helps process large datasets by breaking them into smaller batches based on the update key. This is useful for memory management, progress tracking, and reducing source database load. See the chunking documentation for details.
source: postgres
target: snowflake

defaults:
  mode: incremental
  primary_key: id
  update_key: updated_at
  object: new_schema.{stream_schema}_{stream_table}

streams:
  public.orders:
    source_options:
      chunk_size: 1m # Process 1 month at a time

  public.events:
    source_options:
      chunk_size: 7d # Process 1 week at a time

  public.logs:
    source_options:
      chunk_size: 1d # Process 1 day at a time

env:
  SLING_THREADS: 4 # Enable parallel processing

Using SLING_STATE
To store the incremental state externally (instead of relying on the max value in the target table), we can use the state feature. Provide an environment variable called SLING_STATE, pointing to the location where Sling will store the respective incremental values. See Global Variables for more details.
Here is an example where Sling stores the incremental values under the my/state path of the AWS_S3 connection:
source: postgres
target: snowflake

defaults:
  object: new_schema.{stream_schema}_{stream_table}

streams:
  public.*:
    mode: incremental
    primary_key: id
    update_key: update_dt

  public.accounts:
    mode: full-refresh

env:
  SLING_STATE: AWS_S3/my/state

Delete Missing Records (Soft / Hard)
When loading data incrementally, you may want to handle records that exist in the target but are missing from the source. The delete_missing option supports two modes:
hard: Physically deletes records from the target table that no longer exist in the source
soft: Marks records as deleted in the target table by setting a deletion timestamp
Be careful about enabling this feature on massive tables: the primary key column(s) are fully selected from the source stream on each run in order to determine which records no longer exist.
Hard Delete Example
source: MY_POSTGRES
target: MY_SNOWFLAKE

defaults:
  mode: incremental
  update_key: updated_at
  primary_key: id # primary key is required for delete_missing

streams:
  finance.accounts:
    object: finance.accounts_target
    target_options:
      delete_missing: hard # will remove records that don't exist in source

Soft Delete Example
source: MY_POSTGRES
target: MY_SNOWFLAKE

defaults:
  mode: incremental
  update_key: updated_at
  primary_key: id # primary key is required for delete_missing
  target_options:
    delete_missing: soft # will mark records as deleted with timestamp

streams:
  finance.accounts:
    object: finance.accounts_target

When using soft delete mode, Sling will add a _sling_deleted_at timestamp column to track when records were marked as deleted.
Important Notes:
- The delete_missing option requires that you specify a primary_key to uniquely identify records
- For incremental loads, an update_key is also required to determine which records to process
- The comparison is done using a temporary table to efficiently identify missing records
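Since soft delete only flags rows rather than removing them, downstream queries usually need to filter them out. A minimal sketch, using the finance.accounts_target table from the soft delete example above:

select *
from finance.accounts_target
where _sling_deleted_at is null -- exclude records Sling marked as deleted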