Incremental

Examples of using Sling to incrementally load data from databases to files

To write to files incrementally, we need to provide an update_key, which is used as the partition key to split the files by time. The target object path must therefore include one or more of the partition runtime variables.

The runtime variables available for partitioning the file chunks are: part_year, part_month, part_year_month, part_day, part_week, part_hour and part_minute.
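As a rough illustration of how an update_key value maps onto partition variables, the Python sketch below derives a few of them from a timestamp and substitutes them into an object path template. The function names `partition_vars` and `render_object_path`, the zero-padded formatting, and the choice to model only part_year, part_month, part_day, part_hour and part_minute are assumptions made for this example. It is not Sling's internal code, and the exact rendered form of each variable may differ.

```python
from datetime import datetime


def partition_vars(ts: datetime) -> dict[str, str]:
    """Hypothetical: derive a subset of the part_* variables from an update_key value.

    Zero-padding (e.g. "01" for January) is an assumption made for this sketch.
    """
    return {
        "part_year": f"{ts.year:04d}",
        "part_month": f"{ts.month:02d}",
        "part_day": f"{ts.day:02d}",
        "part_hour": f"{ts.hour:02d}",
        "part_minute": f"{ts.minute:02d}",
    }


def render_object_path(template: str, stream_vars: dict[str, str], ts: datetime) -> str:
    """Fill stream-level and partition-level placeholders in an object path template.

    str.format ignores keys the template does not use, so unused part_* values are harmless.
    """
    return template.format(**stream_vars, **partition_vars(ts))


path = render_object_path(
    "tables/{stream_schema}/{stream_table}/{part_year}/{part_month}",
    {"stream_schema": "public", "stream_table": "transactions"},
    datetime(2024, 11, 4, 4, 5, 6),
)
print(path)  # tables/public/transactions/2024/11
```

Each row would land in the folder for its own update_key value, so rows from different months end up in different partition folders.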

Additionally, we need to set an environment variable called SLING_STATE, which points to the location where Sling stores the respective incremental values. See Global Variables for more details.
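To make the role of SLING_STATE more concrete, here is a minimal Python sketch of a state store that keeps one incremental value per stream in a JSON file under a local folder. The class name `LocalStateStore`, the file name `state.json`, the stream-name keys and the ISO-8601 string values are all hypothetical choices for this example. Sling's actual state layout is not described on this page, and the real SLING_STATE location can be a remote connection such as S3.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


class LocalStateStore:
    """Hypothetical stand-in for a SLING_STATE location: one JSON file, one value per stream."""

    def __init__(self, folder: str) -> None:
        self.file = Path(folder) / "state.json"

    def _load(self) -> dict:
        # No file yet means no stream has stored a value.
        if not self.file.exists():
            return {}
        return json.loads(self.file.read_text())

    def get(self, stream: str) -> Optional[str]:
        """Return the stored incremental value for a stream, or None on the first run."""
        return self._load().get(stream)

    def set(self, stream: str, value: str) -> None:
        """Persist the incremental value for one stream, keeping the other streams' values."""
        state = self._load()
        state[stream] = value
        self.file.parent.mkdir(parents=True, exist_ok=True)
        self.file.write_text(json.dumps(state, indent=2))


# Usage: simulate a first run (nothing stored) followed by saving values for two streams.
with tempfile.TemporaryDirectory() as tmp:
    store = LocalStateStore(tmp)
    first_run_value = store.get("public.orders")  # None: nothing stored yet
    store.set("public.orders", "2024-11-01T00:00:00")
    store.set("public.transactions", "2024-11-04T00:00:00")
    saved_orders = store.get("public.orders")
    saved_transactions = store.get("public.transactions")
```

Keying the state by stream lets each stream resume from its own value, which matters when several streams share one state location.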

Here is an example of incrementally writing to an S3 bucket.

```yaml
source: postgres
target: aws_s3

# applies to all streams
defaults:
  object: tables/{stream_schema}/{stream_table}/{part_year}/{part_month}
  mode: incremental   # mode applies to all streams
  target_options:     # target_options applies to all streams
    format: parquet

streams:
  # all tables in schema main
  main.*:
    primary_key: id
    update_key: created_dt
    target_options:     # overrides the default target_options (write as CSV)
      format: csv

  public.transactions:
    primary_key: tx_id
    update_key: created_dt

  public.orders:
    primary_key: order_id
    update_key: timestamp
    sql: |
      select *
      from public.orders
      where status not in ('voided')
        and {incremental_where_cond}

env:
  # uses the `path/to/folder` in the same AWS_S3 connection
  SLING_STATE: AWS_S3/path/to/folder
```
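The `{incremental_where_cond}` placeholder in the public.orders query is filled in at run time. One way to picture this is the Python sketch below. With no stored value, the condition matches every row. Once a value exists, it becomes an inclusive lower bound on the update_key. The `1=1` fallback, the `>=` comparison and the function name `incremental_where_cond` are assumptions for illustration, not the exact SQL Sling generates.

```python
from typing import Optional


def incremental_where_cond(update_key: str, state_value: Optional[str]) -> str:
    """Hypothetical: build the condition that replaces {incremental_where_cond}."""
    if state_value is None:
        # First run: no stored value, so select everything.
        return "1=1"
    # Later runs: start from the stored (truncated) value, inclusive.
    # Plain string formatting is used only for illustration; real code should bind parameters.
    return f"{update_key} >= '{state_value}'"


SQL_TEMPLATE = """select *
from public.orders
where status not in ('voided')
  and {incremental_where_cond}"""

first_run = SQL_TEMPLATE.format(
    incremental_where_cond=incremental_where_cond("timestamp", None)
)
next_run = SQL_TEMPLATE.format(
    incremental_where_cond=incremental_where_cond("timestamp", "2024-11-01 00:00:00")
)
print(next_run)
```

On the second call, the final line of the query becomes `and timestamp >= '2024-11-01 00:00:00'`.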

This writes the data from the tables public.transactions and public.orders (via custom SQL) into their respective paths (such as tables/public/transactions/2024/11/data_0.parquet and tables/public/orders/2024/11/data_0.parquet), partitioned at the year and month level:

  • tables/public/transactions/created_dt_year=2024/created_dt_month=01/data_0.parquet

  • tables/public/transactions/created_dt_year=2024/created_dt_month=02/data_0.parquet

  • ...

  • tables/public/orders/timestamp_year=2024/timestamp_month=01/data_0.parquet

  • tables/public/orders/timestamp_year=2024/timestamp_month=02/data_0.parquet

  • ...

The first time the replication runs, it selects all the data (unless limited) and writes the last incremental value to the SLING_STATE location. This value is truncated to the start of the lowest partition level. For example, if the lowest level is part_day, a value of 2024-11-04 04:05:06 is truncated to 2024-11-04. If the lowest level is part_month, it is truncated to 2024-11-01.
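The truncation rule above can be checked with a short Python sketch that resets every field below the chosen partition level to its minimum. The helper name `truncate_to_partition` and the `_RESET_FIELDS` table are hypothetical. part_week and part_year_month are deliberately not modeled, because their exact boundaries (for example, which day a week starts on) are not specified here.

```python
from datetime import datetime

# Hypothetical mapping: fields reset to their minimum value for each partition level.
_RESET_FIELDS = {
    "part_year": {"month": 1, "day": 1, "hour": 0, "minute": 0, "second": 0, "microsecond": 0},
    "part_month": {"day": 1, "hour": 0, "minute": 0, "second": 0, "microsecond": 0},
    "part_day": {"hour": 0, "minute": 0, "second": 0, "microsecond": 0},
    "part_hour": {"minute": 0, "second": 0, "microsecond": 0},
    "part_minute": {"second": 0, "microsecond": 0},
}


def truncate_to_partition(ts: datetime, lowest_level: str) -> datetime:
    """Truncate ts to the start of the partition at lowest_level (sketch only)."""
    fields = _RESET_FIELDS.get(lowest_level)
    if fields is None:
        raise ValueError(f"level not modeled in this sketch: {lowest_level}")
    return ts.replace(**fields)


ts = datetime(2024, 11, 4, 4, 5, 6)
print(truncate_to_partition(ts, "part_day"))    # 2024-11-04 00:00:00
print(truncate_to_partition(ts, "part_month"))  # 2024-11-01 00:00:00
```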

When the replication runs again, it reads this truncated value from SLING_STATE and uses it as the starting point. The complete partition is then selected again, so no data is missed.
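The benefit of storing a truncated value shows up on the second run. The Python sketch below assumes that a partition file is rebuilt from the rows selected for it. The stored value is the start of November 2024. Filtering with an inclusive lower bound therefore re-selects every November row, including the one already loaded by the first run. The row data and the comparison operators are hypothetical and were chosen only to illustrate the idea.

```python
from datetime import datetime

# Hypothetical rows in the source table: (order_id, timestamp)
rows = [
    (1, datetime(2024, 10, 30, 9, 0)),    # October partition, already complete
    (2, datetime(2024, 11, 4, 4, 5, 6)),  # loaded by the first run
    (3, datetime(2024, 11, 20, 12, 0)),   # arrived after the first run
]

# Value read back from the state location, already truncated to part_month.
state_value = datetime(2024, 11, 1)

# Second run: select everything from the start of the partition onward.
selected = [order_id for order_id, ts in rows if ts >= state_value]
print(selected)  # [2, 3]

# Contrast: resuming strictly after the raw, untruncated last value.
# A November file rebuilt from only these rows would lack order 2.
naive = [order_id for order_id, ts in rows if ts > datetime(2024, 11, 4, 4, 5, 6)]
print(naive)  # [3]
```

Re-reading from the partition boundary costs some duplicate reads. In exchange, every rewritten partition contains all of its rows.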
