# Custom SQL

## Full Refresh with Custom SQL

**Using CLI Flags**

{% code overflow="wrap" %}

```bash
# Using inline SQL
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream "select * from my_schema.my_table where status = 'active'" \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh

# Using SQL from a file
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream file:///path/to/query.sql \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh
```

{% endcode %}
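The `file://` form above assumes the SQL file already exists. A minimal sketch of creating one is shown here; the temporary directory and the `query_file` variable are hypothetical names used only for illustration, and the query simply mirrors the inline example.

```bash
# Hypothetical location for illustration; the examples above use /path/to/query.sql
query_file="$(mktemp -d)/query.sql"

# Write the query to the file. The quoted heredoc delimiter ('SQL') keeps
# the shell from interpreting quotes or variables inside the SQL text.
cat > "$query_file" <<'SQL'
select id, first_name, last_name, email, status
from my_schema.my_table
where status = 'active'
SQL

# Print the value to pass to --src-stream (note the file:// prefix)
echo "file://$query_file"
```

Keeping the query in a file avoids shell-quoting issues with string literals and makes longer queries easier to review.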

**Using Replication**

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" fullWidth="false" %}

```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  my_schema.my_table.1:
    sql: |
      select 
        id,
        first_name,
        last_name,
        email,
        status
      from my_schema.my_table 
      where status = 'active'
    object: target_schema.active_users

  my_schema.my_table.2:
    sql: file:///path/to/query.sql
    object: target_schema.custom_table
```

{% endcode %}

## Incremental with Custom SQL

**Using CLI Flags**

{% code overflow="wrap" %}

```bash
# Using inline SQL with incremental variables
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'select * from my_schema.my_table where {incremental_where_cond}' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode incremental \
  --primary-key 'id' \
  --update-key 'updated_at'

# Using SQL file with incremental loading
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream file:///path/to/incremental_query.sql \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode incremental \
  --primary-key 'id' \
  --update-key 'updated_at'
```

{% endcode %}

**Using Replication**

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" fullWidth="false" %}

```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: incremental
  primary_key: [id]
  update_key: updated_at

streams:
  my_schema.orders:
    sql: |
      select 
        o.id,
        o.order_date,
        o.customer_id,
        o.status,
        o.updated_at,
        c.name as customer_name
      from my_schema.orders o
      join my_schema.customers c on o.customer_id = c.id
      where {incremental_where_cond}
      order by o.updated_at asc
    object: target_schema.enriched_orders

  my_schema.transactions:
    sql: |
      with ranked_transactions as (
        select 
          *,
          row_number() over (partition by transaction_id order by modified_at desc) as rn
        from my_schema.transactions
        where modified_at > coalesce({incremental_value}, '2001-01-01')
      )
      select * from ranked_transactions 
      where rn = 1
    object: target_schema.latest_transactions
    update_key: modified_at  # override default update_key

  my_schema.daily_metrics:
    sql: file:///path/to/daily_metrics.sql
    object: target_schema.daily_metrics
    primary_key: [date, metric_id]
```

{% endcode %}

The examples above demonstrate:

* Using both inline SQL and SQL files
* Joining multiple tables in custom SQL
* Using incremental variables (`{incremental_value}` and `{incremental_where_cond}`). See [incremental mode strategies](/concepts/replication/modes.md#incremental-mode-strategies) for details.
* Handling duplicates with window functions
* Overriding default primary keys and update keys
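To make the role of `{incremental_where_cond}` more concrete, the sketch below imitates the substitution with `sed`. It is only an illustration: Sling performs the rendering itself at run time, and the rendered values here are assumptions (`1=1` when no previous value exists, and a comparison against a made-up timestamp on later runs). The `render` helper is hypothetical and not part of Sling.

```bash
# SQL template containing the placeholder, as it might appear in a stream definition
template='select * from my_schema.my_table where {incremental_where_cond}'

# Hypothetical helper: substitute a condition for the placeholder
# $1 = SQL template, $2 = condition text
render() {
  printf '%s\n' "$1" | sed "s/{incremental_where_cond}/$2/"
}

# Assumed rendering on a first run (no previously loaded value)
first_run_sql="$(render "$template" "1=1")"

# Assumed rendering on a later run; the timestamp is a made-up example value
later_run_sql="$(render "$template" "updated_at > '2024-01-01 00:00:00'")"

echo "$first_run_sql"
echo "$later_run_sql"
```

Because the placeholder stands for a complete condition, it is used directly after `where` (or combined with other conditions using `and`) rather than on the right-hand side of a comparison.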

## Custom-SQL Chunking

To chunk a custom SQL query, include the `{incremental_where_cond}` variable in its `where` clause, wrapped in parentheses when combined with other conditions, as in the examples below. See the [chunking documentation](/examples/database-to-database/chunking.md#chunking-with-custom-sql) for details.

```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: incremental
  primary_key: [id]
  update_key: updated_at

streams:
  my_schema.large_orders:
    sql: |
      select 
        o.id,
        o.order_date,
        o.customer_id,
        o.status,
        o.updated_at,
        c.name as customer_name
      from my_schema.orders o
      join my_schema.customers c on o.customer_id = c.id
      where o.finalized and ({incremental_where_cond})
      order by o.updated_at asc
    object: target_schema.enriched_orders_chunked
    source_options:
      chunk_size: 1m  # Process in monthly chunks

  my_schema.historical_events:
    sql: |
      select *
      from my_schema.events
      where type = 'init' and ({incremental_where_cond})
    object: target_schema.events_chunked
    update_key: event_timestamp
    source_options:
      chunk_count: 4  # Split into 4 equal chunks

  my_schema.user_activities:
    sql: |
      select *
      from my_schema.user_activities
      where {incremental_where_cond}
    object: target_schema.activities_chunked
    update_key: activity_id
    source_options:
      chunk_size: 10000  # Process 10,000 records per chunk
      range: '1,100000'  # Optional range for numeric chunking

  public.complex_query:
    sql: |
      select 
        u.id,
        u.username,
        u.created_at,
        p.purchase_count
      from users u
      join (
        select user_id, count(*) as purchase_count
        from purchases
        group by user_id
      ) p on u.id = p.user_id
      where u.plan = 'Pro'
        and ( {incremental_where_cond} )
      order by u.created_at asc
    object: target_schema.user_purchase_summary
    update_key: created_at
    source_options:
      chunk_expr: mod(abs(hashtext(u.username)), {chunk_count})  # Process by modulo of hash
      chunk_count: 8  # Creates 8 chunks based on username hash

env:
  SLING_THREADS: 4  # Enable parallel processing of chunks
```
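As a rough illustration of the numeric chunking settings in the `my_schema.user_activities` stream above, the sketch below works out how a `range` of `1,100000` divides into chunks of 10,000. The boundary arithmetic and the printed predicates are assumptions for illustration only; the conditions Sling actually generates may differ.

```bash
# Values taken from the my_schema.user_activities stream above
range_start=1
range_end=100000
chunk_size=10000

chunk_count=0
lower=$range_start
while [ "$lower" -le "$range_end" ]; do
  # Upper bound of this chunk, capped at the end of the range
  upper=$((lower + chunk_size - 1))
  if [ "$upper" -gt "$range_end" ]; then
    upper=$range_end
  fi

  # Assumed predicate shape, shown only to visualize the chunk boundaries
  echo "activity_id >= $lower and activity_id <= $upper"

  lower=$((upper + 1))
  chunk_count=$((chunk_count + 1))
done

echo "chunks: $chunk_count"
```

Under these assumptions the range splits into 10 chunks, which `SLING_THREADS: 4` would allow to be processed up to four at a time.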


