# Database to Database

We first need to make sure our connections are available in our environment. See [Environment](https://github.com/slingdata-io/sling-docs/blob/master/environment.md) and [Database Connections](https://docs.slingdata.io/connections/database-connections) for more details.

{% tabs %}
{% tab title="Linux / Mac" %}

```bash
export MY_SOURCE_DB='...'
export MY_TARGET_DB='...'

$ sling conns list
+---------------+------------------+-----------------+
| CONN NAME     | CONN TYPE        | SOURCE          |
+---------------+------------------+-----------------+
| MY_SOURCE_DB  | DB - PostgreSQL  | env variable    |
| MY_TARGET_DB  | DB - Snowflake   | env variable    |
+---------------+------------------+-----------------+
```

{% endtab %}

{% tab title="Windows" %}

```powershell
# using windows Powershell
$env:MY_SOURCE_DB = '...'
$env:MY_TARGET_DB = '...'

$ sling conns list
+---------------+------------------+-----------------+
| CONN NAME     | CONN TYPE        | SOURCE          |
+---------------+------------------+-----------------+
| MY_SOURCE_DB  | DB - PostgreSQL  | env variable    |
| MY_TARGET_DB  | DB - Snowflake   | env variable    |
+---------------+------------------+-----------------+
```

{% endtab %}
{% endtabs %}

<details>

<summary>Database ⇨ Database (Full Refresh)</summary>

**Using** [**CLI Flags**](https://docs.slingdata.io/sling-cli/run#cli-flags-overview)

{% code title="sling.sh" overflow="wrap" %}

```bash
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh
```

{% endcode %}

***

**Using** [**Replication**](https://docs.slingdata.io/concepts/replication)

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" fullWidth="false" %}

```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  object: '{target_schema}.{stream_schema}_{stream_table}'
  mode: full-refresh

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern

  source_schema.another_table:

  # chunking into 6 equal sized streams
  source_schema.large_table:
    primary_key: id
    update_key: updated_at
    source_options:
      chunk_count: 6

  # all tables in schema, except "forbidden_table"
  my_schema.*:
  my_schema.forbidden_table:
    disabled: true

env:
  SLING_THREADS: 3 # run streams concurrently
```

{% endcode %}

***

**Using** [**Python**](https://docs.slingdata.io/examples/sling-python)

{% code title="database\_to\_database.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream, Mode

# Single stream example
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()

# Multiple streams example
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        object='{target_schema}.{stream_schema}_{stream_table}',
        mode=Mode.FULL_REFRESH
    ),
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table'  # override default object pattern
        ),
        'source_schema.another_table': {},
        # all tables in schema, except "forbidden_table"
        'my_schema.*': {},
        'my_schema.forbidden_table': ReplicationStream(
            disabled=True
        )
    },
    env={'SLING_THREADS': '3'}  # run streams concurrently
)

replication.run()
```

{% endcode %}

</details>

<details>

<summary>Database ⇨ Database (Custom SQL)</summary>

See also [Custom SQL Examples](https://docs.slingdata.io/examples/database-to-database/custom-sql).

**Using** [**CLI Flags**](https://docs.slingdata.io/sling-cli/run#cli-flags-overview)

{% code overflow="wrap" %}

```bash
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'select * from my_schema.my_table where col1 is not null' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh

# we can also read from a SQL file (/path/to/query.sql)
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream file:///path/to/query.sql \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh
```

{% endcode %}

***

**Using** [**Replication**](https://docs.slingdata.io/concepts/replication)

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" fullWidth="false" %}

```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  source_schema.source_table.1:
    sql: |
      select *
      from my_schema.my_table
      where col1 is not null
    object: target_schema.target_table

  source_schema.source_table.2:
    sql: file:///path/to/query.sql
    object: target_schema.target_table
```

{% endcode %}

***

**Using** [**Python**](https://docs.slingdata.io/examples/sling-python)

{% code title="replication.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream, Mode

# Single stream example with inline SQL
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table.1': ReplicationStream(
            sql="""
                select *
                from my_schema.my_table
                where col1 is not null
            """,
            object='target_schema.target_table',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()

# Multiple streams example with SQL file reference
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        mode=Mode.FULL_REFRESH
    ),
    streams={
        'source_schema.source_table.1': ReplicationStream(
            sql="""
                select *
                from my_schema.my_table
                where col1 is not null
            """,
            object='target_schema.target_table'
        ),
        'source_schema.source_table.2': ReplicationStream(
            sql='file:///path/to/query.sql',
            object='target_schema.target_table'
        )
    }
)

replication.run()
```

{% endcode %}

</details>

<details>

<summary>Database ⇨ Database (Incremental / Backfill)</summary>

See also [Incremental](https://docs.slingdata.io/examples/database-to-database/incremental) and [Backfill](https://docs.slingdata.io/examples/database-to-database/backfill) examples.

**Using** [**CLI Flags**](https://docs.slingdata.io/sling-cli/run#cli-flags-overview)

{% code overflow="wrap" %}

```bash
# limit to 10M records at a time. Will be sorted by update_key ASC.
# just loop command until all data is transferred / caught up.
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --primary-key 'id' \
  --update-key 'last_modified_dt' \
  --mode incremental \
  --limit 1000000 -d

# Backfill specific date range
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --primary-key 'id' \
  --update-key 'last_modified_dt' \
  --mode backfill \
  --range '2021-01-01,2021-02-01'
```

{% endcode %}

***

**Using** [**Replication**](https://docs.slingdata.io/concepts/replication)

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" fullWidth="false" %}

```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: incremental
  object: '{target_schema}.{stream_schema}_{stream_table}'
  primary_key: [id]
  update_key: last_modified_dt
  source_options:
    limit: 10000000 # limit to 10M records at a time

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern
    update_key: updated_at # override default update key

  # backfill
  source_schema.backfill_table:
    mode: backfill
    object: target_schema.backfill_table
    primary_key: [some_id]
    update_key: updated_at # override default update key
    source_options:
      range: 2021-01-01,2021-02-01 # specific date range
      chunk_size: 7d               # 7-day stream chunking/splitting

  source_schema.another_table:
    target_options:
      delete_missing: soft  # track deletes from source table

  # use delete_insert strategy for MySQL (no native MERGE)
  source_schema.mysql_table:
    object: target_schema.mysql_table
    target_options:
      merge_strategy: delete_insert

  # chunking with count-based approach
  source_schema.large_incremental_table:
    object: target_schema.large_incremental_table
    primary_key: [id]
    update_key: updated_at
    source_options:
      chunk_count: 8  # split into 8 equal chunks for parallel processing

  # chunking with time-range approach
  source_schema.events_table:
    object: target_schema.events_table
    primary_key: [event_id]
    update_key: event_timestamp
    source_options:
      chunk_size: 1d  # process 1 day at a time

env:
  SLING_THREADS: 4 # run streams concurrently
```

{% endcode %}

***

**Using** [**Python**](https://docs.slingdata.io/examples/sling-python)

{% code title="replication.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream, SourceOptions, TargetOptions, Mode, MergeStrategy

# Incremental load with limit
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        mode=Mode.INCREMENTAL,
        object='{target_schema}.{stream_schema}_{stream_table}',
        primary_key=['id'],
        update_key='last_modified_dt',
        source_options=SourceOptions(
            limit=10000000  # limit to 10M records at a time
        )
    ),
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',  # override default object pattern
            update_key='updated_at'  # override default update key
        ),
        # backfill example
        'source_schema.backfill_table': ReplicationStream(
            mode=Mode.BACKFILL,
            object='target_schema.backfill_table',
            primary_key=['some_id'],
            update_key='updated_at',
            source_options=SourceOptions(
                range='2021-01-01,2021-02-01',  # specific date range
                chunk_size='7d'  # 7-day stream chunking/splitting
            )
        ),
        'source_schema.another_table': ReplicationStream(
            target_options=TargetOptions(
                delete_missing='soft'  # track deletes from source table
            )
        ),
        # use delete_insert strategy for MySQL (no native MERGE)
        'source_schema.mysql_table': ReplicationStream(
            object='target_schema.mysql_table',
            target_options=TargetOptions(
                merge_strategy=MergeStrategy.DELETE_INSERT
            )
        )
    }
)

replication.run()
```

{% endcode %}

</details>

<details>

<summary>Database ⇨ Database (Incremental - New Data Upsert)</summary>

See also [Incremental Examples](https://docs.slingdata.io/examples/database-to-database/incremental).

**Using** [**CLI Flags**](https://docs.slingdata.io/sling-cli/run#cli-flags-overview)

{% code overflow="wrap" %}

```bash
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode incremental \
  --primary-key 'id' \
  --update-key 'last_modified_dt'
```

{% endcode %}

***

**Using** [**Replication**](https://docs.slingdata.io/concepts/replication)

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" fullWidth="false" %}

```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: incremental
  object: '{target_schema}.{stream_schema}_{stream_table}'
  primary_key: [id]
  update_key: last_modified_dt

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern
    update_key: updated_at # override default update key

  source_schema.another_table:
  
  source_schema.some_table:
    target_options:
      delete_missing: soft  # track deletes from source table

  # all tables in schema, except "forbidden_table"
  my_schema.*:
  my_schema.forbidden_table:
    disabled: true
env:
  SLING_THREADS: 3 # run streams concurrently
```

{% endcode %}

***

**Using** [**Python**](https://docs.slingdata.io/examples/sling-python)

{% code title="replication.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream, SourceOptions, TargetOptions, Mode

# Single stream example
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode=Mode.INCREMENTAL,
            primary_key=['id'],
            update_key='last_modified_dt'
        )
    }
)

replication.run()

# Multiple streams example with defaults
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        mode=Mode.INCREMENTAL,
        object='{target_schema}.{stream_schema}_{stream_table}',
        primary_key=['id'],
        update_key='last_modified_dt'
    ),
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',  # override default object pattern
            update_key='updated_at'  # override default update key
        ),
        'source_schema.another_table': {},
        'source_schema.some_table': ReplicationStream(
            target_options=TargetOptions(
                delete_missing='soft'  # track deletes from source table
            )
        ),
        # all tables in schema, except "forbidden_table"
        'my_schema.*': {},
        'my_schema.forbidden_table': ReplicationStream(
            disabled=True
        )
    },
    env={'SLING_THREADS': '3'}  # run streams concurrently
)

replication.run()
```

{% endcode %}

</details>

<details>

<summary>Database ⇨ Database (Incremental - Full Data Upsert)</summary>

See also [Incremental Examples](https://docs.slingdata.io/examples/database-to-database/incremental).

**Using** [**CLI Flags**](https://docs.slingdata.io/sling-cli/run#cli-flags-overview)

{% code overflow="wrap" %}

```bash
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode incremental \
  --primary-key 'id'
```

{% endcode %}

***

**Using** [**Replication**](https://docs.slingdata.io/concepts/replication)

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" fullWidth="false" %}

```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  object: '{target_schema}.{stream_schema}_{stream_table}'
  mode: incremental
  primary_key: [id]

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern
    primary_key: [ col1, col2 ] # override default primary_key

  source_schema.another_table:
  
  source_schema.some_table:
    target_options:
      delete_missing: soft  # track deletes from source table

  # append-only audit table (insert strategy - never update existing records)
  source_schema.audit_log:
    object: target_schema.audit_log
    target_options:
      merge_strategy: insert

  # all tables in schema, except "forbidden_table"
  my_schema.*:
  my_schema.forbidden_table:
    disabled: true

env:
  SLING_THREADS: 3 # run streams concurrently
```

{% endcode %}

***

**Using** [**Python**](https://docs.slingdata.io/examples/sling-python)

{% code title="replication.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream, SourceOptions, TargetOptions, Mode, MergeStrategy

# Single stream example (full data upsert - no update_key)
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode=Mode.INCREMENTAL,
            primary_key=['id']
        )
    }
)

replication.run()

# Multiple streams example with defaults
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        object='{target_schema}.{stream_schema}_{stream_table}',
        mode=Mode.INCREMENTAL,
        primary_key=['id']
    ),
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',  # override default object pattern
            primary_key=['col1', 'col2']  # override default primary_key
        ),
        'source_schema.another_table': {},
        'source_schema.some_table': ReplicationStream(
            target_options=TargetOptions(
                delete_missing='soft'  # track deletes from source table
            )
        ),
        # append-only audit table (insert strategy - never update existing records)
        'source_schema.audit_log': ReplicationStream(
            object='target_schema.audit_log',
            target_options=TargetOptions(
                merge_strategy=MergeStrategy.INSERT
            )
        ),
        # all tables in schema, except "forbidden_table"
        'my_schema.*': {},
        'my_schema.forbidden_table': ReplicationStream(
            disabled=True
        )
    },
    env={'SLING_THREADS': '3'}  # run streams concurrently
)

replication.run()
```

{% endcode %}

</details>

<details>

<summary>Database ⇨ Database (Incremental - Append Only)</summary>

See also [Incremental Examples](https://docs.slingdata.io/examples/database-to-database/incremental).

**Using** [**CLI Flags**](https://docs.slingdata.io/sling-cli/run#cli-flags-overview)

{% code overflow="wrap" %}

```bash
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode incremental \
  --update-key 'created_dt'
```

{% endcode %}

***

**Using** [**Replication**](https://docs.slingdata.io/concepts/replication)

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" fullWidth="false" %}

```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: incremental
  object: '{target_schema}.{stream_schema}_{stream_table}'
  update_key: created_dt

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern
    update_key: created_at  # override default update_key

  source_schema.another_table:

  # all tables in schema, except "forbidden_table"
  my_schema.*:
  my_schema.forbidden_table:
    disabled: true
env:
  SLING_THREADS: 3 # run streams concurrently
```

{% endcode %}

***

**Using** [**Python**](https://docs.slingdata.io/examples/sling-python)

{% code title="replication.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream, Mode

# Single stream example (append only)
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode=Mode.INCREMENTAL,
            update_key='created_dt'
        )
    }
)

replication.run()

# Multiple streams example with defaults
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        mode=Mode.INCREMENTAL,
        object='{target_schema}.{stream_schema}_{stream_table}',
        update_key='created_dt'
    ),
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',  # override default object pattern
            update_key='created_at'  # override default update_key
        ),
        'source_schema.another_table': {},
        # all tables in schema, except "forbidden_table"
        'my_schema.*': {},
        'my_schema.forbidden_table': ReplicationStream(
            disabled=True
        )
    },
    env={'SLING_THREADS': '3'}  # run streams concurrently
)

replication.run()
```

{% endcode %}

</details>

<details>

<summary>Database ⇨ Database (Truncate)</summary>

**Using** [**CLI Flags**](https://docs.slingdata.io/sling-cli/run#cli-flags-overview)

{% code overflow="wrap" %}

```bash
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode truncate
```

{% endcode %}

***

**Using** [**Replication**](https://docs.slingdata.io/concepts/replication)

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" fullWidth="false" %}

```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: truncate
  object: '{target_schema}.{stream_schema}_{stream_table}'

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern

  source_schema.another_table:

  # all tables in schema, except "forbidden_table"
  my_schema.*:
  my_schema.forbidden_table:
    disabled: true
    
env:
  SLING_THREADS: 3 # run streams concurrently
```

{% endcode %}

***

**Using** [**Python**](https://docs.slingdata.io/examples/sling-python)

{% code title="replication.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream

# Single stream example
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode='truncate'
        )
    }
)

replication.run()

# Multiple streams example with defaults
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        mode=Mode.TRUNCATE,
        object='{target_schema}.{stream_schema}_{stream_table}'
    ),
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table'  # override default object pattern
        ),
        'source_schema.another_table': {},
        # all tables in schema, except "forbidden_table"
        'my_schema.*': {},
        'my_schema.forbidden_table': ReplicationStream(
            disabled=True
        )
    },
    env={'SLING_THREADS': '3'}  # run streams concurrently
)

replication.run()
```

{% endcode %}

</details>

<details>

<summary>Database ⇨ Database (Snapshot)</summary>

**Using** [**CLI Flags**](https://docs.slingdata.io/sling-cli/run#cli-flags-overview)

{% code overflow="wrap" %}

```bash
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode snapshot
```

{% endcode %}

***

**Using** [**Replication**](https://docs.slingdata.io/concepts/replication)

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" fullWidth="false" %}

```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: snapshot
  object: '{target_schema}.{stream_schema}_{stream_table}'

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern

  source_schema.another_table:

  # all tables in schema, except "forbidden_table"
  my_schema.*:
  my_schema.forbidden_table:
    disabled: true
    
env:
  SLING_THREADS: 3 # run streams concurrently
```

{% endcode %}

***

**Using** [**Python**](https://docs.slingdata.io/examples/sling-python)

{% code title="replication.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream

# Single stream example
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode=Mode.SNAPSHOT
        )
    }
)

replication.run()

# Multiple streams example with defaults
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        mode=Mode.SNAPSHOT,
        object='{target_schema}.{stream_schema}_{stream_table}'
    ),
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table'  # override default object pattern
        ),
        'source_schema.another_table': {},
        # all tables in schema, except "forbidden_table"
        'my_schema.*': {},
        'my_schema.forbidden_table': ReplicationStream(
            disabled=True
        )
    },
    env={'SLING_THREADS': '3'}  # run streams concurrently
)

replication.run()
```

{% endcode %}

</details>
