Custom SQL
Full Refresh with Custom SQL
Using CLI Flags
# Using inline SQL
$ sling run --src-conn MY_SOURCE_DB \
--src-stream "select * from my_schema.my_table where status = 'active'" \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
# Using SQL from a file
$ sling run --src-conn MY_SOURCE_DB \
--src-stream file:///path/to/query.sql \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
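For reference, the query.sql file above would contain just the SQL statement to run against the source. A hypothetical example (table and column names are illustrative):

-- query.sql (hypothetical)
select id, first_name, last_name, email, status
from my_schema.my_table
where status = 'active'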
Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  my_schema.my_table.1:
    sql: |
      select
        id,
        first_name,
        last_name,
        email,
        status
      from my_schema.my_table
      where status = 'active'
    object: target_schema.active_users

  my_schema.my_table.2:
    sql: file:///path/to/query.sql
    object: target_schema.custom_table
Incremental with Custom SQL
Using CLI Flags
# Using inline SQL with incremental variables
$ sling run --src-conn MY_SOURCE_DB \
--src-stream 'select * from my_schema.my_table where {incremental_where_cond}' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode incremental \
--primary-key 'id' \
--update-key 'updated_at'
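# Note: {incremental_where_cond} typically expands to a complete condition
# (e.g. updated_at > '2024-01-01 00:00:00', or 1=1 on the first run),
# so it needs no comparison operator around it.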
# Using SQL file with incremental loading
$ sling run --src-conn MY_SOURCE_DB \
--src-stream file:///path/to/incremental_query.sql \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode incremental \
--primary-key 'id' \
--update-key 'updated_at'
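An incremental SQL file should embed the incremental variable so Sling can inject the state condition at runtime. A hypothetical incremental_query.sql might look like:

-- incremental_query.sql (hypothetical)
select id, status, updated_at
from my_schema.my_table
where {incremental_where_cond}
order by updated_at asc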
Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: incremental
  primary_key: [id]
  update_key: updated_at

streams:
  my_schema.orders:
    sql: |
      select
        o.id,
        o.order_date,
        o.customer_id,
        o.status,
        o.updated_at,
        c.name as customer_name
      from my_schema.orders o
      join my_schema.customers c on o.customer_id = c.id
      where {incremental_where_cond}
      order by o.updated_at asc
    object: target_schema.enriched_orders

  my_schema.transactions:
    sql: |
      with ranked_transactions as (
        select
          *,
          row_number() over (partition by transaction_id order by modified_at desc) as rn
        from my_schema.transactions
        where modified_at > coalesce({incremental_value}, '2001-01-01')
      )
      select * from ranked_transactions
      where rn = 1
    object: target_schema.latest_transactions
    update_key: modified_at # override default update_key

  my_schema.daily_metrics:
    sql: file:///path/to/daily_metrics.sql
    object: target_schema.daily_metrics
    primary_key: [date, metric_id]
The examples above demonstrate:
Using both inline SQL and SQL files
Joining multiple tables in custom SQL
Using incremental variables ({incremental_value} and {incremental_where_cond}); see the incremental variables documentation for details
Handling duplicates with window functions
Overriding default primary keys and update keys
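For completeness, the daily_metrics.sql file referenced in the last stream might look like the following (a hypothetical sketch; the source table and columns are illustrative). Note that it must expose the date, metric_id, and updated_at columns that the stream's primary_key and update_key settings rely on:

-- daily_metrics.sql (hypothetical)
select
  date,
  metric_id,
  value,
  updated_at
from my_schema.daily_metrics
where {incremental_where_cond}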
Custom-SQL Chunking
Combine custom SQL queries with chunking using the {incremental_where_cond}
variable in your SQL. See the chunking documentation for details.
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: incremental
  primary_key: [id]
  update_key: updated_at

streams:
  my_schema.large_orders:
    sql: |
      select
        o.id,
        o.order_date,
        o.customer_id,
        o.status,
        o.updated_at,
        c.name as customer_name
      from my_schema.orders o
      join my_schema.customers c on o.customer_id = c.id
      where o.finalized and ({incremental_where_cond})
      order by o.updated_at asc
    object: target_schema.enriched_orders_chunked
    source_options:
      chunk_size: 1m # Process in monthly chunks

  my_schema.historical_events:
    sql: |
      select *
      from my_schema.events
      where type = 'init' and ({incremental_where_cond})
    object: target_schema.events_chunked
    update_key: event_timestamp
    source_options:
      chunk_count: 4 # Split into 4 equal chunks

  my_schema.user_activities:
    sql: |
      select *
      from my_schema.user_activities
      where {incremental_where_cond}
    object: target_schema.activities_chunked
    update_key: activity_id
    source_options:
      chunk_size: 10000 # Process 10,000 records per chunk
      range: '1,100000' # Optional range for numeric chunking

  public.complex_query:
    sql: |
      select
        u.id,
        u.username,
        u.created_at,
        p.purchase_count
      from users u
      join (
        select user_id, count(*) as purchase_count
        from purchases
        group by user_id
      ) p on u.id = p.user_id
      where u.plan = 'Pro'
      and ({incremental_where_cond})
      order by u.created_at asc
    object: target_schema.user_purchase_summary
    update_key: created_at
    source_options:
      chunk_expr: mod(abs(hashtext(u.username)), {chunk_count}) # Process by modulo of hash
      chunk_count: 8 # Creates 8 chunks based on username hash

env:
  SLING_THREADS: 4 # Enable parallel processing of chunks
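With SLING_THREADS set, chunks load in parallel. To test a single chunked stream from this replication, you can select it by name with the --streams flag:

$ sling run -r /path/to/replication.yaml --streams my_schema.large_orders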