# Database to Database

We first need to make sure our connections are available in our environment. See [Environment](https://github.com/slingdata-io/sling-docs/blob/master/environment.md) and [Database Connections](/connections/database-connections.md) for more details.

{% tabs %}
{% tab title="Linux / Mac" %}

```bash
export MY_SOURCE_DB='...'
export MY_TARGET_DB='...'

$ sling conns list
+---------------+------------------+-----------------+
| CONN NAME     | CONN TYPE        | SOURCE          |
+---------------+------------------+-----------------+
| MY_SOURCE_DB  | DB - PostgreSQL  | env variable    |
| MY_TARGET_DB  | DB - Snowflake   | env variable    |
+---------------+------------------+-----------------+
```

{% endtab %}

{% tab title="Windows" %}

```powershell
# using Windows PowerShell
$env:MY_SOURCE_DB = '...'
$env:MY_TARGET_DB = '...'

$ sling conns list
+---------------+------------------+-----------------+
| CONN NAME     | CONN TYPE        | SOURCE          |
+---------------+------------------+-----------------+
| MY_SOURCE_DB  | DB - PostgreSQL  | env variable    |
| MY_TARGET_DB  | DB - Snowflake   | env variable    |
+---------------+------------------+-----------------+
```

{% endtab %}
{% endtabs %}
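
Before running the examples below from a script, it can help to confirm that both connection variables are actually set. The following is a minimal sketch using only the Python standard library. The helper name `missing_connections` is hypothetical and not part of Sling, and it only inspects environment variables, so connections defined by other means (see the Environment page linked above) will not be detected. `sling conns list` remains the authoritative check.

```python
import os


def missing_connections(names, environ=None):
    """Return the connection names that are unset or empty in the environment.

    `environ` defaults to os.environ; passing a dict makes the check testable.
    """
    env = os.environ if environ is None else environ
    return [name for name in names if not env.get(name)]


if __name__ == "__main__":
    # MY_SOURCE_DB / MY_TARGET_DB are the connection names used on this page.
    missing = missing_connections(["MY_SOURCE_DB", "MY_TARGET_DB"])
    if missing:
        print("Missing connection variables: " + ", ".join(missing))
    else:
        print("All connection variables are set.")
```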

<details>

<summary>Database ⇨ Database (Full Refresh)</summary>

**Using** [**CLI Flags**](/sling-cli/run.md#cli-flags-overview)

{% code title="sling.sh" overflow="wrap" %}

```bash
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh
```

{% endcode %}

***

**Using** [**Replication**](/concepts/replication.md)

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" fullWidth="false" %}

```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  object: '{target_schema}.{stream_schema}_{stream_table}'
  mode: full-refresh

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern

  source_schema.another_table:

  # chunking into 6 equal-sized streams
  source_schema.large_table:
    primary_key: id
    update_key: updated_at
    source_options:
      chunk_count: 6

  # all tables in schema, except "forbidden_table"
  my_schema.*:
  my_schema.forbidden_table:
    disabled: true

env:
  SLING_THREADS: 3 # run streams concurrently
```

{% endcode %}
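
To make the default `object` pattern in the replication above more concrete, here is a rough illustration of how the placeholders could expand for a given stream. This is only a sketch built on Python's `str.format`, not Sling's implementation. It assumes that `{target_schema}` resolves to a schema configured on the target connection; the value `analytics` and the helper `render_object` are made up for the example.

```python
def render_object(pattern, target_schema, stream_name):
    """Expand an object pattern for a 'schema.table' stream name (illustration only)."""
    stream_schema, stream_table = stream_name.split(".", 1)
    return pattern.format(
        target_schema=target_schema,
        stream_schema=stream_schema,
        stream_table=stream_table,
    )


pattern = "{target_schema}.{stream_schema}_{stream_table}"

# "analytics" is an assumed target schema, chosen only for this example.
print(render_object(pattern, "analytics", "source_schema.another_table"))
# prints: analytics.source_schema_another_table
```

Under these assumptions, a stream without its own `object` (such as `source_schema.another_table`) would land in a table named after both its source schema and its source table.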

***

**Using** [**Python**](/examples/sling-python.md)

{% code title="database\_to\_database.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream, Mode

# Single stream example
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()

# Multiple streams example
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        object='{target_schema}.{stream_schema}_{stream_table}',
        mode=Mode.FULL_REFRESH
    ),
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table'  # override default object pattern
        ),
        'source_schema.another_table': {},
        # all tables in schema, except "forbidden_table"
        'my_schema.*': {},
        'my_schema.forbidden_table': ReplicationStream(
            disabled=True
        )
    },
    env={'SLING_THREADS': '3'}  # run streams concurrently
)

replication.run()
```

{% endcode %}

</details>

<details>

<summary>Database ⇨ Database (Custom SQL)</summary>

See also [Custom SQL Examples](/examples/database-to-database/custom-sql.md).

**Using** [**CLI Flags**](/sling-cli/run.md#cli-flags-overview)

{% code overflow="wrap" %}

```bash
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'select * from my_schema.my_table where col1 is not null' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh

# we can also read from a SQL file (/path/to/query.sql)
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream file:///path/to/query.sql \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh
```

{% endcode %}

***

**Using** [**Replication**](/concepts/replication.md)

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" fullWidth="false" %}

```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  source_schema.source_table.1:
    sql: |
      select *
      from my_schema.my_table
      where col1 is not null
    object: target_schema.target_table

  source_schema.source_table.2:
    sql: file:///path/to/query.sql
    object: target_schema.target_table
```

{% endcode %}

***

**Using** [**Python**](/examples/sling-python.md)

{% code title="replication.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream, Mode

# Single stream example with inline SQL
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table.1': ReplicationStream(
            sql="""
                select *
                from my_schema.my_table
                where col1 is not null
            """,
            object='target_schema.target_table',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()

# Multiple streams example with SQL file reference
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        mode=Mode.FULL_REFRESH
    ),
    streams={
        'source_schema.source_table.1': ReplicationStream(
            sql="""
                select *
                from my_schema.my_table
                where col1 is not null
            """,
            object='target_schema.target_table'
        ),
        'source_schema.source_table.2': ReplicationStream(
            sql='file:///path/to/query.sql',
            object='target_schema.target_table'
        )
    }
)

replication.run()
```

{% endcode %}

</details>

<details>

<summary>Database ⇨ Database (Incremental / Backfill)</summary>

See also [Incremental](/examples/database-to-database/incremental.md) and [Backfill](/examples/database-to-database/backfill.md) examples.

**Using** [**CLI Flags**](/sling-cli/run.md#cli-flags-overview)

{% code overflow="wrap" %}

```bash
# limit each run to 10M records. Rows are read sorted by update_key ASC.
# re-run the command in a loop until all data is transferred / caught up.
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --primary-key 'id' \
  --update-key 'last_modified_dt' \
  --mode incremental \
  --limit 10000000 -d

# Backfill specific date range
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --primary-key 'id' \
  --update-key 'last_modified_dt' \
  --mode backfill \
  --range '2021-01-01,2021-02-01'
```

{% endcode %}

***

**Using** [**Replication**](/concepts/replication.md)

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" fullWidth="false" %}

```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: incremental
  object: '{target_schema}.{stream_schema}_{stream_table}'
  primary_key: [id]
  update_key: last_modified_dt
  source_options:
    limit: 10000000 # limit to 10M records at a time

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern
    update_key: updated_at # override default update key

  # backfill
  source_schema.backfill_table:
    mode: backfill
    object: target_schema.backfill_table
    primary_key: [some_id]
    update_key: updated_at # override default update key
    source_options:
      range: 2021-01-01,2021-02-01 # specific date range
      chunk_size: 7d               # 7-day stream chunking/splitting

  source_schema.another_table:
    target_options:
      delete_missing: soft  # track deletes from source table

  # use delete_insert strategy for MySQL (no native MERGE)
  source_schema.mysql_table:
    object: target_schema.mysql_table
    target_options:
      merge_strategy: delete_insert

  # chunking with count-based approach
  source_schema.large_incremental_table:
    object: target_schema.large_incremental_table
    primary_key: [id]
    update_key: updated_at
    source_options:
      chunk_count: 8  # split into 8 equal chunks for parallel processing

  # chunking with time-range approach
  source_schema.events_table:
    object: target_schema.events_table
    primary_key: [event_id]
    update_key: event_timestamp
    source_options:
      chunk_size: 1d  # process 1 day at a time

env:
  SLING_THREADS: 4 # run streams concurrently
```

{% endcode %}
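
The backfill stream above combines `range: 2021-01-01,2021-02-01` with `chunk_size: 7d`. As a rough mental model, the range can be thought of as being cut into consecutive 7-day windows that are processed one after another. The sketch below shows that idea in plain Python. The function `split_range` is hypothetical, and the exact boundary handling Sling applies (inclusive versus exclusive ends) is an assumption here, not documented behavior.

```python
from datetime import date, timedelta


def split_range(start, end, days):
    """Split the span from start to end into consecutive windows of at most `days` days."""
    if days <= 0:
        raise ValueError("days must be positive")
    step = timedelta(days=days)
    windows = []
    cursor = start
    while cursor < end:
        upper = min(cursor + step, end)  # the last window may be shorter
        windows.append((cursor, upper))
        cursor = upper
    return windows


# Same range and chunk size as the backfill stream in the YAML above.
for lower, upper in split_range(date(2021, 1, 1), date(2021, 2, 1), 7):
    print(lower.isoformat(), "->", upper.isoformat())
```

For this range the sketch yields five windows, the last one (2021-01-29 to 2021-02-01) being shorter than seven days.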

***

**Using** [**Python**](/examples/sling-python.md)

{% code title="replication.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream, SourceOptions, TargetOptions, Mode, MergeStrategy

# Incremental load with limit
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        mode=Mode.INCREMENTAL,
        object='{target_schema}.{stream_schema}_{stream_table}',
        primary_key=['id'],
        update_key='last_modified_dt',
        source_options=SourceOptions(
            limit=10000000  # limit to 10M records at a time
        )
    ),
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',  # override default object pattern
            update_key='updated_at'  # override default update key
        ),
        # backfill example
        'source_schema.backfill_table': ReplicationStream(
            mode=Mode.BACKFILL,
            object='target_schema.backfill_table',
            primary_key=['some_id'],
            update_key='updated_at',
            source_options=SourceOptions(
                range='2021-01-01,2021-02-01',  # specific date range
                chunk_size='7d'  # 7-day stream chunking/splitting
            )
        ),
        'source_schema.another_table': ReplicationStream(
            target_options=TargetOptions(
                delete_missing='soft'  # track deletes from source table
            )
        ),
        # use delete_insert strategy for MySQL (no native MERGE)
        'source_schema.mysql_table': ReplicationStream(
            object='target_schema.mysql_table',
            target_options=TargetOptions(
                merge_strategy=MergeStrategy.DELETE_INSERT
            )
        )
    }
)

replication.run()
```

{% endcode %}

</details>

<details>

<summary>Database ⇨ Database (Incremental - New Data Upsert)</summary>

See also [Incremental Examples](/examples/database-to-database/incremental.md).

**Using** [**CLI Flags**](/sling-cli/run.md#cli-flags-overview)

{% code overflow="wrap" %}

```bash
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode incremental \
  --primary-key 'id' \
  --update-key 'last_modified_dt'
```

{% endcode %}
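
Conceptually, combining a primary key with an update key means that only source rows newer than what the target already holds are read, and those rows are then merged by primary key. The following is a simplified in-memory sketch of that idea, not Sling's actual implementation. The function `incremental_upsert`, the sample rows, and the strict "greater than the target's maximum" comparison are all assumptions made for illustration.

```python
def incremental_upsert(target_rows, source_rows, primary_key, update_key):
    """Merge source rows newer than the target's high-water mark into the target rows."""
    # Highest update_key value already present in the target (None if empty).
    high_water = max((row[update_key] for row in target_rows), default=None)
    merged = {row[primary_key]: row for row in target_rows}
    for row in source_rows:
        if high_water is None or row[update_key] > high_water:
            merged[row[primary_key]] = row  # insert a new key or replace an existing one
    return sorted(merged.values(), key=lambda row: row[primary_key])


# ISO-formatted date strings compare correctly as plain strings.
target = [{"id": 1, "last_modified_dt": "2021-01-01", "value": "a"}]
source = [
    {"id": 1, "last_modified_dt": "2021-01-05", "value": "a2"},  # updated row
    {"id": 2, "last_modified_dt": "2021-01-03", "value": "b"},   # new row
    {"id": 3, "last_modified_dt": "2020-12-31", "value": "old"}, # older than target max
]

result = incremental_upsert(target, source, "id", "last_modified_dt")
print([row["id"] for row in result])
```

In this sketch the row with `id` 1 is replaced, `id` 2 is inserted, and `id` 3 is skipped because its update key is not newer than the target's current maximum.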

***

**Using** [**Replication**](/concepts/replication.md)

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" fullWidth="false" %}

```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: incremental
  object: '{target_schema}.{stream_schema}_{stream_table}'
  primary_key: [id]
  update_key: last_modified_dt

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern
    update_key: updated_at # override default update key

  source_schema.another_table:
  
  source_schema.some_table:
    target_options:
      delete_missing: soft  # track deletes from source table

  # all tables in schema, except "forbidden_table"
  my_schema.*:
  my_schema.forbidden_table:
    disabled: true
env:
  SLING_THREADS: 3 # run streams concurrently
```

{% endcode %}

***

**Using** [**Python**](/examples/sling-python.md)

{% code title="replication.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream, SourceOptions, TargetOptions, Mode

# Single stream example
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode=Mode.INCREMENTAL,
            primary_key=['id'],
            update_key='last_modified_dt'
        )
    }
)

replication.run()

# Multiple streams example with defaults
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        mode=Mode.INCREMENTAL,
        object='{target_schema}.{stream_schema}_{stream_table}',
        primary_key=['id'],
        update_key='last_modified_dt'
    ),
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',  # override default object pattern
            update_key='updated_at'  # override default update key
        ),
        'source_schema.another_table': {},
        'source_schema.some_table': ReplicationStream(
            target_options=TargetOptions(
                delete_missing='soft'  # track deletes from source table
            )
        ),
        # all tables in schema, except "forbidden_table"
        'my_schema.*': {},
        'my_schema.forbidden_table': ReplicationStream(
            disabled=True
        )
    },
    env={'SLING_THREADS': '3'}  # run streams concurrently
)

replication.run()
```

{% endcode %}

</details>

<details>

<summary>Database ⇨ Database (Incremental - Full Data Upsert)</summary>

See also [Incremental Examples](/examples/database-to-database/incremental.md).

**Using** [**CLI Flags**](/sling-cli/run.md#cli-flags-overview)

{% code overflow="wrap" %}

```bash
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode incremental \
  --primary-key 'id'
```

{% endcode %}

***

**Using** [**Replication**](/concepts/replication.md)

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" fullWidth="false" %}

```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  object: '{target_schema}.{stream_schema}_{stream_table}'
  mode: incremental
  primary_key: [id]

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern
    primary_key: [ col1, col2 ] # override default primary_key

  source_schema.another_table:
  
  source_schema.some_table:
    target_options:
      delete_missing: soft  # track deletes from source table

  # append-only audit table (insert strategy - never update existing records)
  source_schema.audit_log:
    object: target_schema.audit_log
    target_options:
      merge_strategy: insert

  # all tables in schema, except "forbidden_table"
  my_schema.*:
  my_schema.forbidden_table:
    disabled: true

env:
  SLING_THREADS: 3 # run streams concurrently
```

{% endcode %}

***

**Using** [**Python**](/examples/sling-python.md)

{% code title="replication.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream, SourceOptions, TargetOptions, Mode, MergeStrategy

# Single stream example (full data upsert - no update_key)
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode=Mode.INCREMENTAL,
            primary_key=['id']
        )
    }
)

replication.run()

# Multiple streams example with defaults
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        object='{target_schema}.{stream_schema}_{stream_table}',
        mode=Mode.INCREMENTAL,
        primary_key=['id']
    ),
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',  # override default object pattern
            primary_key=['col1', 'col2']  # override default primary_key
        ),
        'source_schema.another_table': {},
        'source_schema.some_table': ReplicationStream(
            target_options=TargetOptions(
                delete_missing='soft'  # track deletes from source table
            )
        ),
        # append-only audit table (insert strategy - never update existing records)
        'source_schema.audit_log': ReplicationStream(
            object='target_schema.audit_log',
            target_options=TargetOptions(
                merge_strategy=MergeStrategy.INSERT
            )
        ),
        # all tables in schema, except "forbidden_table"
        'my_schema.*': {},
        'my_schema.forbidden_table': ReplicationStream(
            disabled=True
        )
    },
    env={'SLING_THREADS': '3'}  # run streams concurrently
)

replication.run()
```

{% endcode %}

</details>

<details>

<summary>Database ⇨ Database (Incremental - Append Only)</summary>

See also [Incremental Examples](/examples/database-to-database/incremental.md).

**Using** [**CLI Flags**](/sling-cli/run.md#cli-flags-overview)

{% code overflow="wrap" %}

```bash
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode incremental \
  --update-key 'created_dt'
```

{% endcode %}

***

**Using** [**Replication**](/concepts/replication.md)

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" fullWidth="false" %}

```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: incremental
  object: '{target_schema}.{stream_schema}_{stream_table}'
  update_key: created_dt

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern
    update_key: created_at  # override default update_key

  source_schema.another_table:

  # all tables in schema, except "forbidden_table"
  my_schema.*:
  my_schema.forbidden_table:
    disabled: true
env:
  SLING_THREADS: 3 # run streams concurrently
```

{% endcode %}

***

**Using** [**Python**](/examples/sling-python.md)

{% code title="replication.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream, Mode

# Single stream example (append only)
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode=Mode.INCREMENTAL,
            update_key='created_dt'
        )
    }
)

replication.run()

# Multiple streams example with defaults
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        mode=Mode.INCREMENTAL,
        object='{target_schema}.{stream_schema}_{stream_table}',
        update_key='created_dt'
    ),
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',  # override default object pattern
            update_key='created_at'  # override default update_key
        ),
        'source_schema.another_table': {},
        # all tables in schema, except "forbidden_table"
        'my_schema.*': {},
        'my_schema.forbidden_table': ReplicationStream(
            disabled=True
        )
    },
    env={'SLING_THREADS': '3'}  # run streams concurrently
)

replication.run()
```

{% endcode %}

</details>

<details>

<summary>Database ⇨ Database (Truncate)</summary>

**Using** [**CLI Flags**](/sling-cli/run.md#cli-flags-overview)

{% code overflow="wrap" %}

```bash
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode truncate
```

{% endcode %}

***

**Using** [**Replication**](/concepts/replication.md)

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" fullWidth="false" %}

```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: truncate
  object: '{target_schema}.{stream_schema}_{stream_table}'

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern

  source_schema.another_table:

  # all tables in schema, except "forbidden_table"
  my_schema.*:
  my_schema.forbidden_table:
    disabled: true
    
env:
  SLING_THREADS: 3 # run streams concurrently
```

{% endcode %}

***

**Using** [**Python**](/examples/sling-python.md)

{% code title="replication.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream, Mode

# Single stream example
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode=Mode.TRUNCATE
        )
    }
)

replication.run()

# Multiple streams example with defaults
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        mode=Mode.TRUNCATE,
        object='{target_schema}.{stream_schema}_{stream_table}'
    ),
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table'  # override default object pattern
        ),
        'source_schema.another_table': {},
        # all tables in schema, except "forbidden_table"
        'my_schema.*': {},
        'my_schema.forbidden_table': ReplicationStream(
            disabled=True
        )
    },
    env={'SLING_THREADS': '3'}  # run streams concurrently
)

replication.run()
```

{% endcode %}

</details>

<details>

<summary>Database ⇨ Database (Snapshot)</summary>

**Using** [**CLI Flags**](/sling-cli/run.md#cli-flags-overview)

{% code overflow="wrap" %}

```bash
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode snapshot
```

{% endcode %}

***

**Using** [**Replication**](/concepts/replication.md)

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" fullWidth="false" %}

```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: snapshot
  object: '{target_schema}.{stream_schema}_{stream_table}'

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern

  source_schema.another_table:

  # all tables in schema, except "forbidden_table"
  my_schema.*:
  my_schema.forbidden_table:
    disabled: true
    
env:
  SLING_THREADS: 3 # run streams concurrently
```

{% endcode %}

***

**Using** [**Python**](/examples/sling-python.md)

{% code title="replication.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream, Mode

# Single stream example
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode=Mode.SNAPSHOT
        )
    }
)

replication.run()

# Multiple streams example with defaults
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        mode=Mode.SNAPSHOT,
        object='{target_schema}.{stream_schema}_{stream_table}'
    ),
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table'  # override default object pattern
        ),
        'source_schema.another_table': {},
        # all tables in schema, except "forbidden_table"
        'my_schema.*': {},
        'my_schema.forbidden_table': ReplicationStream(
            disabled=True
        )
    },
    env={'SLING_THREADS': '3'}  # run streams concurrently
)

replication.run()
```

{% endcode %}

</details>

