
Incremental

Examples of using Sling to incrementally load data from databases to files


Last updated 3 months ago

To write to files incrementally, we need to provide an update_key, which is used as the partition key to split the files by time resolution. The target object path must therefore include specific runtime variables.

Here is the list of runtime variables for partitioning the file chunks: part_year, part_month, part_year_month, part_day, part_week, part_hour, part_minute.

Additionally, we need to provide an environment variable called SLING_STATE, which is a location where Sling will store the respective incremental values. See Global Variables for more details.

Here is an example of incrementally writing to an S3 bucket.

source: postgres
target: aws_s3

# applies to all streams
defaults:
  object: tables/{stream_schema}/{stream_table}/{part_year}/{part_month}
  mode: incremental   # mode applies to all streams
  target_options:     # target_options applies to all streams
    format: parquet

streams:
  # all tables in schema main
  main.*:
    primary_key: id
    update_key: created_dt
    target_options:     # overwrites default target_options (write as csv)
      format: csv

  public.transactions:
    primary_key: tx_id
    update_key: created_dt

  public.orders:
    primary_key: order_id
    update_key: timestamp
    sql: |
      select *
      from public.orders
      where status not in ('voided')
        and {incremental_where_cond}

env:
  # uses the `path/to/folder` in the same AWS_S3 connection
  SLING_STATE: AWS_S3/path/to/folder

This will write data from the tables public.transactions and public.orders (custom SQL) into their respective paths, partitioned at the year and month level. Tables matched by main.* use the same path template but are written as CSV. For example:

  • tables/public/transactions/created_dt_year=2024/created_dt_month=01/data_0.parquet

  • tables/public/transactions/created_dt_year=2024/created_dt_month=02/data_0.parquet

  • ...

  • tables/public/orders/timestamp_year=2024/timestamp_month=01/data_0.parquet

  • tables/public/orders/timestamp_year=2024/timestamp_month=02/data_0.parquet

  • ...
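As a rough illustration of the layout listed above, the following Python sketch renders the year/month folder for a given update_key value. The helper name partition_path is hypothetical and is not part of Sling. It only mirrors the folder naming seen in the example paths and makes no claim about how Sling builds them internally.

```python
from datetime import datetime


def partition_path(schema: str, table: str, update_key: str, ts: datetime) -> str:
    """Render the year/month partition folder in the style of the listed paths.

    Hypothetical helper for illustration only; not a Sling API.
    """
    return (
        f"tables/{schema}/{table}/"
        f"{update_key}_year={ts:%Y}/{update_key}_month={ts:%m}"
    )


if __name__ == "__main__":
    # A row with created_dt in January 2024 lands in the January folder.
    print(partition_path("public", "transactions", "created_dt", datetime(2024, 1, 15)))
```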

The first time the replication is run, it will select all the data (unless limited) and write the last incremental value into the SLING_STATE location. This incremental value is truncated to the lowest partition level. For example, if the lowest level is part_day, a value of 2024-11-04 04:05:06 will be truncated to 2024-11-04. If the lowest level is part_month, the truncated value will be 2024-11-01.
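The truncation rule can be sketched in Python as follows. The function truncate_to_partition is a hypothetical name used for illustration and is not Sling's implementation. part_week is left out because the text does not say which weekday anchors a week, and treating part_year_month like part_month is an assumption.

```python
from datetime import datetime

# Fields reset for each partition level (everything finer than the level itself).
_RESET_FIELDS = {
    "part_year": dict(month=1, day=1, hour=0, minute=0, second=0, microsecond=0),
    "part_month": dict(day=1, hour=0, minute=0, second=0, microsecond=0),
    "part_year_month": dict(day=1, hour=0, minute=0, second=0, microsecond=0),  # assumed same as part_month
    "part_day": dict(hour=0, minute=0, second=0, microsecond=0),
    "part_hour": dict(minute=0, second=0, microsecond=0),
    "part_minute": dict(second=0, microsecond=0),
}


def truncate_to_partition(value: datetime, lowest_level: str) -> datetime:
    """Truncate an incremental value to the start of its lowest-level partition."""
    if lowest_level not in _RESET_FIELDS:
        raise ValueError(f"unsupported partition level: {lowest_level}")
    return value.replace(**_RESET_FIELDS[lowest_level])


if __name__ == "__main__":
    last_value = datetime(2024, 11, 4, 4, 5, 6)
    print(truncate_to_partition(last_value, "part_day"))    # 2024-11-04 00:00:00
    print(truncate_to_partition(last_value, "part_month"))  # 2024-11-01 00:00:00
```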

When the replication is run again, it will read this truncated incremental value from the SLING_STATE location and use it to obtain the complete partition, so that no data is missed.
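To see why re-reading from the truncated value avoids gaps, consider the small Python simulation below. It assumes the next run selects rows whose update_key is greater than or equal to the stored value. That comparison is an assumption made for this sketch, not a statement about the exact condition Sling generates, and select_for_next_run is a hypothetical name.

```python
from datetime import datetime


def select_for_next_run(timestamps, stored_value):
    """Return the rows a follow-up run would pick up (assumed >= comparison)."""
    return [ts for ts in timestamps if ts >= stored_value]


if __name__ == "__main__":
    rows = [
        datetime(2024, 10, 31, 23, 59),  # previous partition, already complete
        datetime(2024, 11, 1, 0, 30),    # current partition, written by the first run
        datetime(2024, 11, 4, 4, 5, 6),  # last value seen by the first run
        datetime(2024, 11, 5, 9, 0),     # arrived after the first run
    ]
    stored = datetime(2024, 11, 1)  # last value truncated to part_month

    # Every November row is selected again, so the 2024/11 partition file
    # can be rewritten in full, including the late-arriving row.
    for ts in select_for_next_run(rows, stored):
        print(ts)
```

Had the untruncated value 2024-11-04 04:05:06 been stored instead, a rewrite of the November partition would contain only the rows from that point on and would drop the earlier November rows.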
