# Using inline SQL
# The query is wrapped in double quotes for the shell so that the SQL string
# literal can use single quotes ('active'). In standard SQL, double quotes
# denote an identifier (e.g. a column name), not a string value.
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream "select * from my_schema.my_table where status = 'active'" \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh

# Using SQL from a file
# The file:// prefix tells Sling to read the query text from the given path.
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream file:///path/to/query.sql \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh
Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
replication.yaml
# Connections to read from and write to.
source: MY_SOURCE_DB
target: MY_TARGET_DB

# Settings applied to every stream unless a stream overrides them.
defaults:
  mode: full-refresh

streams:
  # Inline query: the `sql` block is what gets executed against the source;
  # the result is written to `object` on the target.
  my_schema.my_table.1:
    sql: |
      select
        id,
        first_name,
        last_name,
        email,
        status
      from my_schema.my_table
      where status = 'active'
    object: target_schema.active_users

  # Query loaded from a file instead of being written inline.
  my_schema.my_table.2:
    sql: file:///path/to/query.sql
    object: target_schema.custom_table
Incremental with Custom SQL
Using CLI Flags
# Using inline SQL with incremental variables
# {incremental_where_cond} is replaced with the complete filter condition on
# the update key, so it goes directly after `where` (do not prefix it with
# `updated_at >`, which would produce an invalid expression).
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'select * from my_schema.my_table where {incremental_where_cond}' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode incremental \
  --primary-key 'id' \
  --update-key 'updated_at'

# Using SQL file with incremental loading
# The query file may use the same incremental placeholders as inline SQL.
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream file:///path/to/incremental_query.sql \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode incremental \
  --primary-key 'id' \
  --update-key 'updated_at'
Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
replication.yaml
# Connections to read from and write to.
source: MY_SOURCE_DB
target: MY_TARGET_DB

# Settings applied to every stream unless a stream overrides them.
defaults:
  mode: incremental
  primary_key: [id]
  update_key: updated_at

streams:
  # Join two source tables; {incremental_where_cond} supplies the whole
  # filter condition for the incremental load.
  my_schema.orders:
    sql: |
      select
        o.id,
        o.order_date,
        o.customer_id,
        o.status,
        o.updated_at,
        c.name as customer_name
      from my_schema.orders o
      join my_schema.customers c on o.customer_id = c.id
      where {incremental_where_cond}
      order by o.updated_at asc
    object: target_schema.enriched_orders

  # Keep only the most recently modified row per transaction_id.
  # {incremental_value} supplies just the value to compare against; the
  # coalesce() falls back to a fixed start date when that value is null.
  my_schema.transactions:
    sql: |
      with ranked_transactions as (
        select
          *,
          row_number() over (partition by transaction_id order by modified_at desc) as rn
        from my_schema.transactions
        where modified_at > coalesce({incremental_value}, '2001-01-01')
      )
      select * from ranked_transactions
      where rn = 1
    object: target_schema.latest_transactions
    update_key: modified_at # override default update_key

  # Query loaded from a file, with a composite primary key overriding the default.
  my_schema.daily_metrics:
    sql: file:///path/to/daily_metrics.sql
    object: target_schema.daily_metrics
    primary_key: [date, metric_id]
The examples above demonstrate:
- Using both inline SQL and SQL files
- Joining multiple tables in custom SQL
- Using incremental variables (`{incremental_value}` and `{incremental_where_cond}`). See the Sling documentation on replication variables for details.