Custom SQL

Sling allows you to use custom DuckDB SQL statements to read from files, giving you more control over the data ingestion process. This is particularly useful when you need to perform transformations or filtering during the read operation.

CLI Flags Examples

Full Refresh Mode

In the examples below, when we specify the source connection aws_s3, Sling will auto-inject the necessary secrets for proper authentication.

# Read CSV files with custom SQL
sling run \
  --src-conn aws_s3 \
  --src-stream "select * from read_csv('s3://my-bucket/data/*.csv') where amount > 1000" \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.large_transactions' \
  --mode full-refresh

# Read Parquet files with custom SQL and aggregation
sling run \
  --src-conn aws_s3 \
  --src-stream "select date_trunc('month', date) as month, sum(amount) as total 
                from read_parquet('s3://my-bucket/data/*.parquet')
                group by 1" \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.monthly_totals' \
  --mode full-refresh

Incremental Mode

# Incremental load using timestamp column
sling run \
  --src-conn aws_s3 \
  --src-stream "select * from read_csv('s3://my-bucket/data/*.csv')
                where {incremental_where_cond}" \
  --src-options '{"sql": true}' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.transactions' \
  --mode incremental \
  --primary-key id \
  --update-key created_at

Replication Configuration

You can also use DuckDB SQL in your replication configuration:

source: AWS_S3
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  # Using SQL to read and transform CSV data
  daily_sales_summary:
    object: analytics.daily_sales_summary
    sql: |
      select date,
             sum(case when type='sale' then amount else 0 end) as sales,
             sum(case when type='refund' then amount else 0 end) as refunds
      from read_csv('s3://my-bucket/transactions/*.csv')
      group by date

  # Incremental load with custom SQL
  events:
    object: analytics.events
    sql: |
      select * from read_parquet('s3://my-bucket/events/*.parquet')
      where event_timestamp > coalesce({incremental_value}, '2001-01-01')
    mode: incremental
    update_key: event_timestamp

  # Join multiple files
  enriched_transactions:
    object: analytics.enriched_transactions
    sql: |
      select t.*, c.category 
      from read_csv('s3://my-bucket/transactions.csv') t
      left join read_parquet('s3://my-bucket/categories.parquet') c
        on t.category_id = c.id

Features

  • SQL Functions: Access to DuckDB's rich SQL function library

  • File Format Support: Works with CSV, Parquet, JSON, and other formats supported by DuckDB

  • Aggregations: Perform aggregations and transformations during read

  • Joins: Join data from multiple files

  • Filtering: Apply filters to reduce data transfer

  • Type Casting: Use SQL CAST functions for type conversions (see the sketch after this list)
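
For instance, here is a minimal sketch that combines filtering and type casting during the read. The bucket, column names, and target object are hypothetical:

# Filter and cast columns during the read (bucket and columns are hypothetical)
sling run \
  --src-conn aws_s3 \
  --src-stream "select id,
                       cast(amount as decimal(18,2)) as amount,
                       cast(created_at as timestamp) as created_at
                from read_csv('s3://my-bucket/data/*.csv')
                where status = 'active'" \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.active_records' \
  --mode full-refresh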

Notes

  1. Sling will auto-download the DuckDB binary into the Sling home directory. You can specify the desired DuckDB version with the DUCKDB_VERSION environment variable (see the example after these notes).

  2. Use DuckDB's read_* functions (e.g. read_csv, read_parquet, read_json) to specify input files.

  3. For incremental loads, use placeholder variables such as {incremental_where_cond} and {incremental_value}, as shown in the Incremental Mode examples above.

  4. File paths support wildcards (*) for matching multiple files. See Reading Multiple Files.

  5. Cloud storage paths (s3://, gs://, etc.) are supported with proper credentials. Make sure to specify the respective source connection; Sling will auto-inject the needed secrets before running the query. If authentication is not working, reach out to us at support@slingdata.io, on Discord, or open a GitHub issue.
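
For example, a quick sketch of pinning the DuckDB version before a run; the version number shown is illustrative:

# Pin the DuckDB version that Sling downloads (version number is illustrative)
export DUCKDB_VERSION=1.1.3
sling run \
  --src-conn aws_s3 \
  --src-stream "select * from read_csv('s3://my-bucket/data/*.csv')" \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.data' \
  --mode full-refresh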
