Incremental
Examples of using Sling to load data from storage systems to databases
Using the File timestamp
Sling currently loads files into a database incrementally by using the file timestamp.
Here is an example replication, incrementally loading files from an S3 bucket into a Postgres database:
source: aws_s3
target: postgres
defaults:
  object: 'target_schema.{stream_file_folder}_{stream_file_name}'
  mode: incremental
  update_key: _sling_loaded_at # <-- uses the _sling_loaded_at column in the target table
  columns:
    '*': string # cast all columns as string
  source_options:
    format: csv
streams:
  # no need to specify scheme://bucket
  "my_csv_folder/*.csv": # individual streams for each file
  "my_csv_folder/": # single stream for whole folder
    object: 'target_schema.my_csv_data' # overwrite default object
  "my_csv_folder/prefix_*.csv":
    object: 'target_schema.my_csv_data' # overwrite default object
env:
  SLING_LOADED_AT_COLUMN: timestamp
Using SLING_STATE
We can also provide an environment variable called SLING_STATE, which is a location where Sling will store the respective incremental values. See Global Variables for more details. This allows you to modify the stored value directly if you want to change the incremental marker manually.
Let us assume we have Parquet files laid out as follows, with one folder per day:
|- dumps/orders/2024/09/21/*.parquet
|- dumps/orders/2024/09/22/*.parquet
|- dumps/orders/2024/09/23/*.parquet
...
We can specify the stream string as dumps/orders/{YYYY}/{MM}/{DD}/*.parquet
source: aws_s3
target: postgres
defaults:
  mode: incremental
streams:
  # no need to specify scheme://bucket
  "dumps/orders/{YYYY}/{MM}/{DD}/":
    id: orders # id to run only this stream
    object: 'target_schema.orders'
    source_options:
      format: parquet
env:
  # uses the `path/to/folder` in the same AWS_S3 connection
  SLING_STATE: AWS_S3/path/to/folder
Backfilling
The best practice for a first load is to backfill the date range of interest. Sling will ingest all data found according to the stream path provided. Upon completion, the incremental date value is set to the last date in the backfill range. It is therefore necessary to run backfills in ascending order (e.g. 2021-01-01,2021-12-31, then 2022-01-01,2022-12-31, and so on).
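Because each backfill moves the incremental date value forward, a multi-year backfill has to be issued oldest range first. The following is a minimal sketch of that ordering, not an official Sling script. The helper name yearly_ranges and the DRY_RUN switch are hypothetical, introduced only for illustration; the sling flags are copied from the command shown in this section.

```shell
#!/bin/sh
# Sketch only: one backfill command per calendar year, oldest range first.
# `set -e` stops the loop at the first failing command, so a later range
# is never run after an earlier one has failed.
set -eu

# Print "start,end" for each calendar year from $1 to $2, in ascending order.
# (yearly_ranges is a hypothetical helper, not part of Sling.)
yearly_ranges() {
  year=$1
  while [ "$year" -le "$2" ]; do
    printf '%s-01-01,%s-12-31\n' "$year" "$year"
    year=$((year + 1))
  done
}

# DRY_RUN (hypothetical switch) defaults to 1: commands are printed, not executed.
for range in $(yearly_ranges 2021 2022); do
  if [ "${DRY_RUN:-1}" = "1" ]; then
    echo "sling run -d -r replication.yaml --mode backfill --range $range --streams orders"
  else
    sling run -d -r replication.yaml --mode backfill --range "$range" --streams orders
  fi
done
```

Running the script as is only prints the two commands, which makes it easy to review the order before setting DRY_RUN=0 to execute them.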
# backfill from 2021-01-01 to 2022-01-01
# incremental state value will be set to 2022-01-01
sling run -d -r replication.yaml --mode backfill \
  --range 2021-01-01,2022-01-01 \
  --streams "orders" # only run "orders" stream
Incremental
Run the replication normally. Sling will process the next incremental value.
sling run -r replication.yaml -d