Incremental
Examples of using Sling to incrementally load data from databases to files
To write to files incrementally, we need to provide an update_key, which is used as the partition key to split the files by time resolution. The target object path must therefore include partition runtime variables.
Here is the list of runtime variables for partitioning the file chunks: part_year, part_month, part_year_month, part_day, part_week, part_hour, part_minute.
Additionally, we need to provide an environment variable called SLING_STATE, which is the location where Sling stores the respective incremental values. See Global Variables for more details.
Here is an example of incrementally writing to an S3 bucket.
source: postgres
target: aws_s3

# applies to all streams
defaults:
  object: tables/{stream_schema}/{stream_table}/{part_year}/{part_month}
  mode: incremental # mode applies to all streams
  target_options: # target_options applies to all streams
    format: parquet

streams:
  # all tables in schema main
  main.*:
    primary_key: id
    update_key: created_dt
    target_options: # overwrites default target_options (write as csv)
      format: csv

  public.transactions:
    primary_key: tx_id
    update_key: created_dt

  public.orders:
    primary_key: order_id
    update_key: timestamp
    sql: |
      select *
      from public.orders
      where status not in ('voided')
      and {incremental_where_cond}

env:
  # uses the `path/to/folder` in the same AWS_S3 connection
  SLING_STATE: AWS_S3/path/to/folder

This will write data from tables public.transactions and public.orders (custom SQL) into the respective paths (such as tables/public/transactions/2024/11/data_0.parquet and tables/public/orders/2024/11/data_0.parquet) at the year and month level:
tables/public/transactions/created_dt_year=2024/created_dt_month=01/data_0.parquet
tables/public/transactions/created_dt_year=2024/created_dt_month=02/data_0.parquet
...
tables/public/orders/timestamp_year=2024/timestamp_month=01/data_0.parquet
tables/public/orders/timestamp_year=2024/timestamp_month=02/data_0.parquet
...
The first time the replication is run, it selects all the data (unless limited) and writes the last incremental value to the SLING_STATE location. This incremental value is truncated to the lowest partition level. For example, if the lowest level is part_day, a value of 2024-11-04 04:05:06 is truncated to 2024-11-04. If the lowest level is part_month, the truncated value is 2024-11-01.
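The truncation described above can be sketched in Python. This is only an illustration of the arithmetic, not Sling's implementation: the helper name truncate_to_partition and the lookup table are hypothetical, and only the levels whose truncation is unambiguous are covered (part_week and part_year_month are left out).

```python
from datetime import datetime

# Hypothetical lookup for illustration; this is not Sling's implementation.
# Maps a partition level to the datetime fields that are reset on truncation.
_RESET_FIELDS = {
    "part_year": dict(month=1, day=1, hour=0, minute=0, second=0, microsecond=0),
    "part_month": dict(day=1, hour=0, minute=0, second=0, microsecond=0),
    "part_day": dict(hour=0, minute=0, second=0, microsecond=0),
    "part_hour": dict(minute=0, second=0, microsecond=0),
    "part_minute": dict(second=0, microsecond=0),
}


def truncate_to_partition(value: datetime, level: str) -> datetime:
    """Return `value` truncated to the start of its `level` partition."""
    if level not in _RESET_FIELDS:
        raise ValueError(f"unsupported partition level: {level}")
    return value.replace(**_RESET_FIELDS[level])


last_value = datetime(2024, 11, 4, 4, 5, 6)
print(truncate_to_partition(last_value, "part_day"))    # 2024-11-04 00:00:00
print(truncate_to_partition(last_value, "part_month"))  # 2024-11-01 00:00:00
```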
When the replication is run again, it reads this truncated incremental value from SLING_STATE and uses it to obtain the complete partition, so that no data is missed.
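To see why starting from the truncated value covers the whole partition, here is a small Python sketch. The sample timestamps, the function name rows_to_reload, and the inclusive ">=" comparison are all assumptions made for illustration; the actual filter Sling generates may differ.

```python
from datetime import datetime


# Illustrative only: the inclusive filter is an assumption about the effect
# of the stored value, not the query Sling actually generates.
def rows_to_reload(timestamps, stored_value):
    """Select every row at or after the truncated incremental value."""
    return [ts for ts in timestamps if ts >= stored_value]


# 2024-11-04 04:05:06 truncated at the part_day level
stored_value = datetime(2024, 11, 4)

source_rows = [
    datetime(2024, 11, 3, 23, 0, 0),   # earlier partition, not selected
    datetime(2024, 11, 4, 4, 5, 6),    # last row seen by the first run
    datetime(2024, 11, 4, 18, 30, 0),  # arrived after the first run, same day
    datetime(2024, 11, 5, 9, 0, 0),    # arrived after the first run, next day
]

reloaded = rows_to_reload(source_rows, stored_value)
print(len(reloaded))  # 3
```

Under these assumptions, the second run picks up every row of the 2024-11-04 partition, including the one already written, so that partition can be rewritten in full alongside the newer one.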