Database to Database

Examples of using Sling to load data from one database to another

We first need to make sure our connections are available in our environment. See Environment and Database Connections for more details.

export MY_SOURCE_DB='...'
export MY_TARGET_DB='...'

$ sling conns list
+---------------+------------------+-----------------+
| CONN NAME     | CONN TYPE        | SOURCE          |
+---------------+------------------+-----------------+
| MY_SOURCE_DB  | DB - PostgreSQL  | env variable    |
| MY_TARGET_DB  | DB - Snowflake   | env variable    |
+---------------+------------------+-----------------+
Database ⇨ Database (Full Refresh)

Using CLI Flags

sling.sh
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  object: '{target_schema}.{stream_schema}_{stream_table}'
  mode: full-refresh

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern

  source_schema.another_table:

  # chunking into 6 equal-sized streams
  source_schema.large_table:
    primary_key: id
    update_key: updated_at
    source_options:
      chunk_count: 6

  # all tables in schema, except "forbidden_table"
  my_schema.*:
  my_schema.forbidden_table:
    disabled: true

env:
  SLING_THREADS: 3 # run streams concurrently

Using Python

database_to_database.py
from sling import Replication, ReplicationStream, Mode

# Single stream example
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()

# Multiple streams example
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        object='{target_schema}.{stream_schema}_{stream_table}',
        mode=Mode.FULL_REFRESH
    ),
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table'  # override default object pattern
        ),
        'source_schema.another_table': {},
        # all tables in schema, except "forbidden_table"
        'my_schema.*': {},
        'my_schema.forbidden_table': ReplicationStream(
            disabled=True
        )
    },
    env={'SLING_THREADS': '3'}  # run streams concurrently
)

replication.run()
Database ⇨ Database (Custom SQL)

See also Custom SQL Examples.

Using CLI Flags

$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'select * from my_schema.my_table where col1 is not null' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh

# we can also read from a SQL file (/path/to/query.sql)
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream file:///path/to/query.sql \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  source_schema.source_table.1:
    sql: |
      select *
      from my_schema.my_table
      where col1 is not null
    object: target_schema.target_table

  source_schema.source_table.2:
    sql: file:///path/to/query.sql
    object: target_schema.target_table

Using Python

replication.py
from sling import Replication, ReplicationStream, Mode

# Single stream example with inline SQL
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table.1': ReplicationStream(
            sql="""
                select *
                from my_schema.my_table
                where col1 is not null
            """,
            object='target_schema.target_table',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()

# Multiple streams example with SQL file reference
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        mode=Mode.FULL_REFRESH
    ),
    streams={
        'source_schema.source_table.1': ReplicationStream(
            sql="""
                select *
                from my_schema.my_table
                where col1 is not null
            """,
            object='target_schema.target_table'
        ),
        'source_schema.source_table.2': ReplicationStream(
            sql='file:///path/to/query.sql',
            object='target_schema.target_table'
        )
    }
)

replication.run()
Database ⇨ Database (Incremental / Backfill)

See also Incremental and Backfill examples.

Using CLI Flags

# limit to 10M records at a time, sorted by update_key ascending.
# Loop the command until all data is transferred / caught up.
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --primary-key 'id' \
  --update-key 'last_modified_dt' \
  --mode incremental \
  --limit 10000000 -d

# Backfill specific date range
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --primary-key 'id' \
  --update-key 'last_modified_dt' \
  --mode backfill \
  --range '2021-01-01,2021-02-01'

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: incremental
  object: '{target_schema}.{stream_schema}_{stream_table}'
  primary_key: [id]
  update_key: last_modified_dt
  source_options:
    limit: 10000000 # limit to 10M records at a time

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern
    update_key: updated_at # override default update key

  # backfill
  source_schema.backfill_table:
    mode: backfill
    object: target_schema.backfill_table
    primary_key: [some_id]
    update_key: updated_at # override default update key
    source_options:
      range: 2021-01-01,2021-02-01 # specific date range
      chunk_size: 7d               # 7-day stream chunking/splitting

  source_schema.another_table:
    target_options:
      delete_missing: soft  # track deletes from source table

  # chunking with count-based approach
  source_schema.large_incremental_table:
    object: target_schema.large_incremental_table
    primary_key: [id]
    update_key: updated_at
    source_options:
      chunk_count: 8  # split into 8 equal chunks for parallel processing

  # chunking with time-range approach
  source_schema.events_table:
    object: target_schema.events_table
    primary_key: [event_id]
    update_key: event_timestamp
    source_options:
      chunk_size: 1d  # process 1 day at a time

env:
  SLING_THREADS: 4 # run streams concurrently

Using Python

replication.py
from sling import Replication, ReplicationStream, SourceOptions, TargetOptions, Mode

# Incremental load with limit
replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    defaults=ReplicationStream(
        mode=Mode.INCREMENTAL,
        object='{target_schema}.{stream_schema}_{stream_table}',
        primary_key=['id'],
        update_key='last_modified_dt',
        source_options=SourceOptions(
            limit=10000000  # limit to 10M records at a time
        )
    ),
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',  # override default object pattern
            update_key='updated_at'  # override default update key
        ),
        # backfill example
        'source_schema.backfill_table': ReplicationStream(
            mode=Mode.BACKFILL,
            object='target_schema.backfill_table',
            primary_key=['some_id'],
            update_key='updated_at',
            source_options=SourceOptions(
                range='2021-01-01,2021-02-01',  # specific date range
                chunk_size='7d'  # 7-day stream chunking/splitting
            )
        ),
        'source_schema.another_table': ReplicationStream(
            target_options=TargetOptions(
                delete_missing='soft'  # track deletes from source table
            )
        )
    }
)

replication.run()
Database ⇨ Database (Incremental - New Data Upsert)

See also Incremental Examples.

Using CLI Flags
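A sketch following the flags used in the earlier examples: when both a primary key and an update key are provided, incremental mode upserts only new or modified records (those whose update key exceeds the target's current maximum). Table and column names are placeholders.

```shell
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --primary-key 'id' \
  --update-key 'last_modified_dt' \
  --mode incremental
```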


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml
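The same new-data upsert expressed as a replication config, a sketch following the patterns above:

replication.yaml
```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: incremental
  primary_key: [id]
  update_key: last_modified_dt

streams:
  source_schema.source_table:
    object: target_schema.target_table
```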


Using Python
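A minimal sketch with the Python package, mirroring the earlier examples:

```python
from sling import Replication, ReplicationStream, Mode

replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode=Mode.INCREMENTAL,
            primary_key=['id'],
            update_key='last_modified_dt'  # only newer records are upserted
        )
    }
)

replication.run()
```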

Database ⇨ Database (Incremental - Full Data Upsert)

See also Incremental Examples.

Using CLI Flags
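A sketch following the same pattern: with a primary key but no update key, incremental mode reads the entire source table and upserts every record on the primary key. This is useful when no reliable update column exists.

```shell
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --primary-key 'id' \
  --mode incremental
```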


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml
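The equivalent replication config (note the absence of an update key), a sketch following the patterns above:

replication.yaml
```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: incremental
  primary_key: [id]

streams:
  source_schema.source_table:
    object: target_schema.target_table
```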


Using Python
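A minimal Python sketch for a full-data upsert, mirroring the earlier examples:

```python
from sling import Replication, ReplicationStream, Mode

replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode=Mode.INCREMENTAL,
            primary_key=['id']  # no update_key: full read, upsert on id
        )
    }
)

replication.run()
```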

Database ⇨ Database (Incremental - Append Only)

See also Incremental Examples.

Using CLI Flags
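A sketch following the flags above: with an update key but no primary key, incremental mode simply appends records newer than the target's maximum update-key value.

```shell
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --update-key 'last_modified_dt' \
  --mode incremental
```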


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml
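The append-only variant as a replication config, a sketch following the patterns above:

replication.yaml
```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: incremental
  update_key: last_modified_dt # no primary_key: append-only

streams:
  source_schema.source_table:
    object: target_schema.target_table
```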


Using Python
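A minimal Python sketch for append-only loading, mirroring the earlier examples:

```python
from sling import Replication, ReplicationStream, Mode

replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode=Mode.INCREMENTAL,
            update_key='last_modified_dt'  # no primary_key: new rows appended
        )
    }
)

replication.run()
```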

Database ⇨ Database (Truncate)

Using CLI Flags
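A sketch following the flags above: truncate mode clears the target table before loading but, unlike full-refresh, keeps the table itself (and any grants or dependencies) in place.

```shell
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode truncate
```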


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml
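The truncate mode as a replication config, a sketch following the patterns above:

replication.yaml
```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: truncate
  object: '{target_schema}.{stream_schema}_{stream_table}'

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern
```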


Using Python
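A minimal Python sketch for truncate mode, mirroring the earlier examples:

```python
from sling import Replication, ReplicationStream, Mode

replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode=Mode.TRUNCATE  # truncate target, keep table definition
        )
    }
)

replication.run()
```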

Database ⇨ Database (Snapshot)

Using CLI Flags
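A sketch following the flags above: snapshot mode appends a complete copy of the source stream on every run, so the target accumulates timestamped point-in-time snapshots.

```shell
$ sling run --src-conn MY_SOURCE_DB \
  --src-stream 'source_schema.source_table' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode snapshot
```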


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml
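The snapshot mode as a replication config, a sketch following the patterns above:

replication.yaml
```yaml
source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: snapshot
  object: '{target_schema}.{stream_schema}_{stream_table}'

streams:
  source_schema.source_table:
    object: target_schema.target_table # override default object pattern
```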


Using Python
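A minimal Python sketch for snapshot mode, mirroring the earlier examples:

```python
from sling import Replication, ReplicationStream, Mode

replication = Replication(
    source='MY_SOURCE_DB',
    target='MY_TARGET_DB',
    streams={
        'source_schema.source_table': ReplicationStream(
            object='target_schema.target_table',
            mode=Mode.SNAPSHOT  # append a full copy on each run
        )
    }
)

replication.run()
```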
