Incremental

Examples of using Sling to load data from storage systems to databases

Using the File timestamp

Currently, Sling loads files into a database incrementally by using the file timestamp.

Here is an example replication, incrementally loading files from an S3 bucket into a Postgres database:

source: aws_s3
target: postgres

defaults:
  object: 'target_schema.{stream_file_folder}_{stream_file_name}'
  mode: incremental
  update_key: _sling_loaded_at # <-- uses the _sling_loaded_at column in the target table
  columns:
    '*': string # cast all columns as string
  source_options:
    format: csv

streams:
  # no need to specify scheme://bucket
  "my_csv_folder/*.csv":                # individual streams for each file
  "my_csv_folder/":                     # single stream for whole folder
    object: 'target_schema.my_csv_data' # overwrite default object

  "my_csv_folder/prefix_*.csv":
    object: 'target_schema.my_csv_data' # overwrite default object

env:
  SLING_LOADED_AT_COLUMN: timestamp
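
As a conceptual illustration of timestamp-based incremental selection, the shell sketch below picks only the files modified after a previous load. It is not Sling's implementation: the temporary directory, the file names, and the `.last_load` marker file are all hypothetical and exist only to show the idea.

```shell
# Conceptual illustration only; not Sling's implementation.
workdir=$(mktemp -d)
touch -t 202409210000 "$workdir/old.csv"      # modified before the last load
touch -t 202409220000 "$workdir/.last_load"   # hypothetical marker: time of the last load
touch -t 202409230000 "$workdir/new.csv"      # modified after the last load

# Select only the CSV files whose modification time is newer than the marker.
new_files=$(find "$workdir" -name '*.csv' -newer "$workdir/.last_load")
echo "$new_files"
```

Only `new.csv` is selected, because `old.csv` was last modified before the marker.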

Using SLING_STATE

We can also provide an environment variable called SLING_STATE, which points to a location where Sling stores the incremental value for each stream. See Global Variables for more details. Because the state is kept at that location, you can edit the stored value directly if you want to change the incremental marker manually.

Let us assume we have Parquet files laid out in daily folders, as follows:

|- dumps/orders/2024/09/21/*.parquet
|- dumps/orders/2024/09/22/*.parquet
|- dumps/orders/2024/09/23/*.parquet
...

We can specify the stream string as dumps/orders/{YYYY}/{MM}/{DD}/*.parquet:

source: aws_s3
target: postgres

defaults:
  mode: incremental

streams:
  # no need to specify scheme://bucket
  "dumps/orders/{YYYY}/{MM}/{DD}/":
    id: orders   # id to run only this stream
    object: 'target_schema.orders'
    source_options:
      format: parquet

env:
  # uses the `path/to/folder` in the same AWS_S3 connection
  SLING_STATE: AWS_S3/path/to/folder
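
To make the date placeholders concrete, the following shell sketch shows how a pattern such as the one above resolves to a folder path for a given day. The `expand_pattern` helper is hypothetical and only illustrates the substitution; it does not reflect how Sling resolves the pattern internally.

```shell
# Hypothetical helper: substitute {YYYY}, {MM} and {DD} in a stream pattern.
# $1 = pattern, $2 = date formatted as YYYY-MM-DD
expand_pattern() {
  yyyy=$(printf '%s' "$2" | cut -d- -f1)
  mm=$(printf '%s' "$2" | cut -d- -f2)
  dd=$(printf '%s' "$2" | cut -d- -f3)
  printf '%s\n' "$1" | sed -e "s/{YYYY}/$yyyy/g" -e "s/{MM}/$mm/g" -e "s/{DD}/$dd/g"
}

expand_pattern 'dumps/orders/{YYYY}/{MM}/{DD}/' 2024-09-22
# prints: dumps/orders/2024/09/22/
```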

Backfilling

The best practice for the first load is to backfill the date range of interest. Sling will ingest all data found according to the stream path provided. Upon completion, the incremental date value will be set to the last date in the backfill range. It is therefore necessary to run backfills in ascending order (e.g. 2021-01-01,2021-12-31, then 2022-01-01,2022-12-31, and so on).

# backfill from 2021-01-01 to 2022-01-01
# incremental state value will be set to 2022-01-01
sling run -d -r replication.yaml --mode backfill \
  --range 2021-01-01,2022-01-01 \
  --streams "orders" # only run "orders" stream
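
When the period of interest spans several years, the ascending-order requirement can be met with a small loop. The sketch below is one possible approach, not an official recipe: the `yearly_ranges` helper and the `RUN` variable are hypothetical, and the loop reuses only the `sling run` flags shown above. By default it prints each command instead of executing it.

```shell
# Hypothetical helper: print one "start,end" range per year, oldest first.
# $1 = first year, $2 = last year
yearly_ranges() {
  year=$1
  while [ "$year" -le "$2" ]; do
    printf '%s-01-01,%s-12-31\n' "$year" "$year"
    year=$((year + 1))
  done
}

# Dry run by default: each command is printed rather than executed.
# Set RUN to an empty string to actually invoke sling.
RUN=${RUN-echo}

for range in $(yearly_ranges 2021 2022); do
  # Stop at the first failure so later ranges never run out of order.
  $RUN sling run -d -r replication.yaml --mode backfill \
    --range "$range" \
    --streams "orders" || break
done
```

Stopping at the first failed range keeps the incremental state from jumping ahead of data that was never loaded.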

Incremental

Run the replication normally. Sling will process the next incremental value.

sling run -r replication.yaml -d
