Custom SQL

Full Refresh with Custom SQL

Using CLI Flags

# Using inline SQL
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream "select * from my_schema.my_table where status = 'active'" \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh

# Using SQL from a file
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream file:///path/to/query.sql \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh
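
The file passed to --src-stream holds plain SQL. A minimal sketch of preparing such a file, assuming a temporary directory in place of /path/to and reusing the query from the inline example. The sling command is only printed here, so drop the echo to actually run it:

```shell
# Illustrative location: a temp directory stands in for /path/to
SQL_DIR="$(mktemp -d)"
SQL_FILE="$SQL_DIR/query.sql"

# Quoted heredoc delimiter keeps the SQL text literal
cat > "$SQL_FILE" <<'EOF'
select *
from my_schema.my_table
where status = 'active'
EOF

# An absolute path after file:// yields the file:///... form
SRC_STREAM="file://$SQL_FILE"

# Dry run: print the command instead of executing it
echo sling run --src-conn MY_SOURCE_DB \
  --src-stream "$SRC_STREAM" \
  --tgt-conn MY_TARGET_DB \
  --tgt-object target_schema.target_table \
  --mode full-refresh
```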

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  my_schema.my_table.1:
    sql: |
      select 
        id,
        first_name,
        last_name,
        email,
        status
      from my_schema.my_table 
      where status = 'active'
    object: target_schema.active_users

  my_schema.my_table.2:
    sql: file:///path/to/query.sql
    object: target_schema.custom_table

Incremental with Custom SQL

Using CLI Flags

# Using inline SQL with incremental variables
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'select * from my_schema.my_table where {incremental_where_cond}' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode incremental \
  --primary-key 'id' \
  --update-key 'updated_at'

# Using SQL file with incremental loading
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream file:///path/to/incremental_query.sql \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode incremental \
  --primary-key 'id' \
  --update-key 'updated_at'
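
The contents of incremental_query.sql are not shown above. One plausible version, assuming the same table as the inline example, keeps the placeholder in the file so that it can be substituted at run time (the temp path is illustrative):

```shell
# Illustrative location: a temp directory stands in for /path/to
SQL_FILE="$(mktemp -d)/incremental_query.sql"

# Quoted heredoc delimiter keeps the placeholder text untouched
cat > "$SQL_FILE" <<'EOF'
select *
from my_schema.my_table
where {incremental_where_cond}
EOF

# Show the file that --src-stream file://... would point to
cat "$SQL_FILE"
```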

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: incremental
  primary_key: [id]
  update_key: updated_at

streams:
  my_schema.orders:
    sql: |
      select 
        o.id,
        o.order_date,
        o.customer_id,
        o.status,
        o.updated_at,
        c.name as customer_name
      from my_schema.orders o
      join my_schema.customers c on o.customer_id = c.id
      where {incremental_where_cond}
      order by o.updated_at asc
    object: target_schema.enriched_orders

  my_schema.transactions:
    sql: |
      with ranked_transactions as (
        select 
          *,
          row_number() over (partition by transaction_id order by modified_at desc) as rn
        from my_schema.transactions
        where modified_at > coalesce({incremental_value}, '2001-01-01')
      )
      select * from ranked_transactions 
      where rn = 1
    object: target_schema.latest_transactions
    update_key: modified_at  # override default update_key

  my_schema.daily_metrics:
    sql: file:///path/to/daily_metrics.sql
    object: target_schema.daily_metrics
    primary_key: [date, metric_id]

The examples above demonstrate:

  • Using both inline SQL and SQL files

  • Joining multiple tables in custom SQL

  • Using incremental variables ({incremental_value} for the last loaded update-key value, {incremental_where_cond} for the complete filter condition)

  • Handling duplicates with window functions

  • Overriding default primary keys and update keys
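
To make {incremental_where_cond} concrete, the sketch below mimics the substitution with sed. This is not how Sling implements it. The rendered values (1=1 on a first run with no stored value, and a sample timestamp on later runs) are assumptions about the expansion, chosen only for illustration:

```shell
# Template as it might appear in a stream's sql
template="select * from my_schema.my_table where {incremental_where_cond}"

# Assumed first run: no stored value, so the condition matches every row
first_run=$(printf '%s\n' "$template" | sed "s/{incremental_where_cond}/1=1/")

# Assumed later run: filter on the update key using a sample last value
last_value="2024-01-01 00:00:00"
next_run=$(printf '%s\n' "$template" | sed "s/{incremental_where_cond}/updated_at > '$last_value'/")

echo "$first_run"
echo "$next_run"
```

Because the placeholder stands for a whole condition, it follows `where` directly rather than being compared against a column.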
