Replication

Define multiple streams in a single YAML or JSON file. This is the best way to scale Sling.

Overview

Replications are the best way to use Sling in a reusable manner. The defaults key lets you define your inputs once and override any of them in a particular stream. Both YAML and JSON files are accepted. When you run a replication, Sling internally generates one task per stream and runs them in order.
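
To illustrate how defaults and per-stream overrides can combine, here is a minimal Go sketch. The StreamConfig type and the withDefaults function are hypothetical names made up for this example and model only three keys; this is not Sling's actual implementation.

```go
package main

import "fmt"

// StreamConfig is a hypothetical, trimmed-down view of a stream's settings.
type StreamConfig struct {
	Mode       string
	Object     string
	PrimaryKey []string
}

// withDefaults returns a copy of stream in which every unset field
// falls back to the corresponding value from defaults.
func withDefaults(stream, defaults StreamConfig) StreamConfig {
	out := stream
	if out.Mode == "" {
		out.Mode = defaults.Mode
	}
	if out.Object == "" {
		out.Object = defaults.Object
	}
	if len(out.PrimaryKey) == 0 {
		out.PrimaryKey = defaults.PrimaryKey
	}
	return out
}

func main() {
	defaults := StreamConfig{
		Mode:       "incremental",
		Object:     "{target_schema}.{stream_schema}_{stream_table}",
		PrimaryKey: []string{"id"},
	}
	// This stream only overrides the primary key.
	stream := StreamConfig{PrimaryKey: []string{"other_id"}}

	merged := withDefaults(stream, defaults)
	fmt.Println(merged.Mode, merged.Object, merged.PrimaryKey)
}
```

In this sketch the stream keeps its own primary key and inherits mode and object from the defaults.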

Here is a basic example, where all PostgreSQL tables in the schema my_schema will be loaded into Snowflake. Using the my_schema.* wildcard notation as a stream name is possible only in replications. Also notice how defaults.object uses runtime variables.

replication.yaml
source: MY_POSTGRES
target: MY_SNOWFLAKE

# default config options which apply to all streams
defaults:
  mode: full-refresh
  object: new_schema.{stream_schema}_{stream_table}

streams:
  my_schema.*:
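
The effect of the runtime variables in defaults.object can be pictured with a small Go sketch. The renderObject function and the table names are assumptions for illustration only; the sketch handles just {stream_schema} and {stream_table} and is not how Sling itself resolves variables.

```go
package main

import (
	"fmt"
	"strings"
)

// renderObject substitutes {stream_schema} and {stream_table} in an object
// template. Only these two placeholders are handled in this sketch.
func renderObject(template, schema, table string) string {
	r := strings.NewReplacer(
		"{stream_schema}", schema,
		"{stream_table}", table,
	)
	return r.Replace(template)
}

func main() {
	template := "new_schema.{stream_schema}_{stream_table}"
	// Hypothetical tables that the wildcard my_schema.* might match.
	for _, table := range []string{"accounts", "users"} {
		fmt.Println(renderObject(template, "my_schema", table))
	}
	// Prints:
	// new_schema.my_schema_accounts
	// new_schema.my_schema_users
}
```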

Another example:

replication.yaml
source: MY_MYSQL
target: MY_BIGQUERY

defaults:
  mode: incremental
  object: '{target_schema}.{stream_schema}_{stream_table}'
  primary_key: [id]
  
  source_options:
    empty_as_null: false
    
  target_options:
    column_casing: snake

streams:
  finance.accounts:
  finance.users:
    disabled: true
  
  finance.departments:
    object: '{target_schema}.finance_departments_old' # override default object
    source_options:
      empty_as_null: false

  finance."Transactions":
    mode: incremental # mode can be set per stream (same as the default here)
    primary_key: [other_id]
    update_key: last_updated_at
  
  finance.all_users.custom:
    sql: |
      select col1, col2
      from finance."all_Users"
    object: finance.all_users # need to add 'object' key for custom SQL

env:
  # adds the _sling_loaded_at timestamp column
  SLING_LOADED_AT_COLUMN: true 
  
  # if source is file, adds a _sling_stream_url column with file path / url
  SLING_STREAM_URL_COLUMN: true

We can use a replication config with: sling run -r /path/to/replication.yaml

Interface Specifications

Replication Config Key | Description
source | The source database connection (name, conn string or URL).
target | The target database connection (name, conn string or URL).
streams.<key> | The source table (schema.table), local / cloud file path. Use file:// for local paths. See here for details on object patterns.
streams.<key>.object or defaults.object | The target table (schema.table) or local / cloud file path. Use file:// for local paths.
streams.<key>.mode or defaults.mode | The target load mode to use: incremental, truncate, full-refresh, backfill or snapshot. Default is full-refresh.
streams.<key>.select or defaults.select | Select or exclude specific columns from the source stream. Use - prefix to exclude.
streams.<key>.single or defaults.single | When using a wildcard (*) in the stream name, consider as a single stream (don't expand into many streams).
streams.<key>.sql or defaults.sql | The custom SQL query to use. Accepts file://path/to.query.sql as well.
streams.<key>.primary_key or defaults.primary_key | The column(s) to use as primary key. If composite key, use array.
streams.<key>.update_key or defaults.update_key | The column to use as update key (for incremental mode).
streams.<key>.source_options or defaults.source_options | Options to further configure source. See here for details.
streams.<key>.target_options or defaults.target_options | Options to further configure target. See here for details.
env | Environment variables to use for replication. See here for details.

Here are the relevant Go structs for replications:

type ReplicationConfig struct {
    Source   string                              `json:"source" yaml:"source"`
    Target   string                              `json:"target" yaml:"target"`
    Defaults ReplicationStreamConfig             `json:"defaults" yaml:"defaults"`
    Streams  map[string]*ReplicationStreamConfig `json:"streams" yaml:"streams"`
    Env      map[string]any                      `json:"env" yaml:"env"`
}

type ReplicationStreamConfig struct {
    Mode          Mode           `json:"mode" yaml:"mode"`
    Object        string         `json:"object" yaml:"object"`
    Select        []string       `json:"select" yaml:"select"`
    PrimaryKey    []string       `json:"primary_key" yaml:"primary_key"`
    UpdateKey     string         `json:"update_key" yaml:"update_key"`
    Single        *bool          `json:"single,omitempty" yaml:"single,omitempty"`
    SQL           string         `json:"sql" yaml:"sql"`
    SourceOptions *SourceOptions `json:"source_options" yaml:"source_options"`
    TargetOptions *TargetOptions `json:"target_options" yaml:"target_options"`
    Disabled      bool           `json:"disabled" yaml:"disabled"`
}
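
Since JSON files are accepted as well, a replication document can be decoded into structs of this shape with Go's standard encoding/json package. The sketch below uses a trimmed copy of the structs (the options and env fields are omitted) and a hypothetical parse helper; it is meant to show the mapping between keys and fields, not how Sling loads its configuration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

type Mode string

// Trimmed copies of the structs above; options and env fields are omitted.
type ReplicationStreamConfig struct {
	Mode       Mode     `json:"mode"`
	Object     string   `json:"object"`
	PrimaryKey []string `json:"primary_key"`
	UpdateKey  string   `json:"update_key"`
	Disabled   bool     `json:"disabled"`
}

type ReplicationConfig struct {
	Source   string                              `json:"source"`
	Target   string                              `json:"target"`
	Defaults ReplicationStreamConfig             `json:"defaults"`
	Streams  map[string]*ReplicationStreamConfig `json:"streams"`
}

// doc is a JSON rendering of part of the second YAML example.
const doc = `{
  "source": "MY_MYSQL",
  "target": "MY_BIGQUERY",
  "defaults": {"mode": "incremental", "primary_key": ["id"]},
  "streams": {
    "finance.accounts": null,
    "finance.users": {"disabled": true}
  }
}`

// parse is a hypothetical helper that decodes a JSON replication document.
func parse(data []byte) (ReplicationConfig, error) {
	var cfg ReplicationConfig
	err := json.Unmarshal(data, &cfg)
	return cfg, err
}

func main() {
	cfg, err := parse([]byte(doc))
	if err != nil {
		panic(err)
	}
	// Go maps are unordered, so sort the names for stable output.
	names := make([]string, 0, len(cfg.Streams))
	for name := range cfg.Streams {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		s := cfg.Streams[name]
		switch {
		case s == nil:
			fmt.Println(name, "-> uses defaults only")
		case s.Disabled:
			fmt.Println(name, "-> disabled")
		default:
			fmt.Println(name, "-> has overrides")
		}
	}
}
```

A stream with an empty body decodes to a nil pointer, which is why Streams maps to *ReplicationStreamConfig rather than to a plain struct value.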
