Custom SQL

Full Refresh with Custom SQL

Using CLI Flags

# Using inline SQL
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream "select * from my_schema.my_table where status = 'active'" \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh

# Using SQL from a file
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream file:///path/to/query.sql \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  my_schema.my_table.1:
    sql: |
      select 
        id,
        first_name,
        last_name,
        email,
        status
      from my_schema.my_table 
      where status = 'active'
    object: target_schema.active_users

  my_schema.my_table.2:
    sql: file:///path/to/query.sql
    object: target_schema.custom_table

Incremental with Custom SQL

Using CLI Flags

# Using inline SQL with incremental variables
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'select * from my_schema.my_table where {incremental_where_cond}' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode incremental \
  --primary-key 'id' \
  --update-key 'updated_at'

# Using SQL file with incremental loading
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream file:///path/to/incremental_query.sql \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode incremental \
  --primary-key 'id' \
  --update-key 'updated_at'

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: incremental
  primary_key: [id]
  update_key: updated_at

streams:
  my_schema.orders:
    sql: |
      select 
        o.id,
        o.order_date,
        o.customer_id,
        o.status,
        o.updated_at,
        c.name as customer_name
      from my_schema.orders o
      join my_schema.customers c on o.customer_id = c.id
      where {incremental_where_cond}
      order by o.updated_at asc
    object: target_schema.enriched_orders

  my_schema.transactions:
    sql: |
      with ranked_transactions as (
        select 
          *,
          row_number() over (partition by transaction_id order by modified_at desc) as rn
        from my_schema.transactions
        where modified_at > coalesce({incremental_value}, '2001-01-01')
      )
      select * from ranked_transactions 
      where rn = 1
    object: target_schema.latest_transactions
    update_key: modified_at  # override default update_key

  my_schema.daily_metrics:
    sql: file:///path/to/daily_metrics.sql
    object: target_schema.daily_metrics
    primary_key: [date, metric_id]

The examples above demonstrate:

  • Using both inline SQL and SQL files

  • Joining multiple tables in custom SQL

  • Using incremental variables ({incremental_value} and {incremental_where_cond}). See the incremental variables documentation for details; a sample query file using these variables is sketched after this list.

  • Handling duplicates with window functions

  • Overriding default primary keys and update keys
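
For reference, here is a hypothetical sketch of what a standalone query file (such as the incremental_query.sql referenced above) could contain. The file holds plain SQL, with the incremental placeholder left for Sling to fill in at run time; the column list below simply reuses the fields from the earlier examples.

incremental_query.sql
select
  id,
  first_name,
  last_name,
  email,
  status,
  updated_at
from my_schema.my_table
where status = 'active'
  and {incremental_where_cond}  -- Sling substitutes the incremental filter here
order by updated_at asc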

Chunking with Custom SQL

To combine custom SQL queries with chunking, include the {incremental_where_cond} variable in your query. See the chunking documentation for details. A rough illustration of how the placeholder expands for a single chunk follows the example below.

source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: incremental
  primary_key: [id]
  update_key: updated_at

streams:
  my_schema.large_orders:
    sql: |
      select 
        o.id,
        o.order_date,
        o.customer_id,
        o.status,
        o.updated_at,
        c.name as customer_name
      from my_schema.orders o
      join my_schema.customers c on o.customer_id = c.id
      where o.finalized and ({incremental_where_cond})
      order by o.updated_at asc
    object: target_schema.enriched_orders_chunked
    source_options:
      chunk_size: 1m  # Process in monthly chunks

  my_schema.historical_events:
    sql: |
      select *
      from my_schema.events
      where type = 'init' and ({incremental_where_cond})
    object: target_schema.events_chunked
    update_key: event_timestamp
    source_options:
      chunk_count: 4  # Split into 4 equal chunks

  my_schema.user_activities:
    sql: |
      select *
      from my_schema.user_activities
      where {incremental_where_cond}
    object: target_schema.activities_chunked
    update_key: activity_id
    source_options:
      chunk_size: 10000  # Process 10,000 records per chunk
      range: '1,100000'  # Optional range for numeric chunking

  public.complex_query:
    sql: |
      select 
        u.id,
        u.username,
        u.created_at,
        p.purchase_count
      from users u
      join (
        select user_id, count(*) as purchase_count
        from purchases
        group by user_id
      ) p on u.id = p.user_id
      where u.plan = 'Pro'
        and ( {incremental_where_cond} )
      order by u.created_at asc
    object: target_schema.user_purchase_summary
    update_key: created_at
    source_options:
      chunk_expr: mod(abs(hashtext(u.username)), {chunk_count})  # Process by modulo of hash
      chunk_count: 8  # Creates 8 chunks based on username hash

env:
  SLING_THREADS: 4  # Enable parallel processing of chunks
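
As a rough illustration only (the exact SQL that Sling generates may differ, and the date bounds below are made up), the {incremental_where_cond} placeholder in the my_schema.large_orders stream above could expand into a per-chunk range filter on the update key along these lines:

-- hypothetical expansion for one monthly chunk
where o.finalized and (updated_at >= '2024-01-01' and updated_at < '2024-02-01')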
