Links
Comment on page

Run Replications (Many Tasks)

Replications are the best way to provide configuration files for many streams.
Replications are the best way to provide configuration files for many streams.
See here for source / target options details.
Example configuration (located at /tmp/pg-to-snowflake.yaml):
# We first need to make sure our connections are available in our environment
# See https://docs.slingdata.io/sling-cli/environment for help
source: MY_POSTGRES
target: MY_SNOWFLAKE
# default config options which apply to all streams
defaults:
mode: full-refresh # valid choices: incremental, truncate, full-refresh, snapshot
# specify pattern to use for object naming in target connection, see below for options
object: '{target_schema}.{stream_schema}_{stream_table}'
# source_options: # optional for more advanced options for source connection
# target_options: # optional for more advanced options for target connection
streams:
finance.accounts:
finance.users:
disabled: true
finance.departments:
object: '{target_schema}.finance_departments_old' # overwrite default object
source_options:
empty_as_null: false
finance."Transactions":
mode: incremental # overwrite default mode
primary_key: id
update_key: last_updated_at
finance.all_users.custom:
sql: |
select col1, col2
from finance."all_Users"
object: finance.all_users # need to add 'object' key for custom SQL
env:
SLING_LOADED_AT_COLUMN: true # adds the _sling_loaded_at timestamp column
SLING_STREAM_URL_COLUMN: true # if source is file, adds a _sling_stream_url column with file path / url
Example to process all tables in a schema finance:
source: MY_POSTGRES
target: MY_SNOWFLAKE
# default config options which apply to all streams
defaults:
mode: full-refresh
object: '{target_schema}.{stream_schema}_{stream_table}'
streams:
finance.users:
disabled: true
finance.*:
source_options:
empty_as_null: false
Example to read from files:
source: MY_FILE_CONN
target: MY_SNOWFLAKE
# default config options which apply to all streams
defaults:
mode: full-refresh
object: '{target_schema}.{stream_schema}_{stream_table}'
streams:
# cloud
s3://path/to/file.csv:
s3://path/to/folder:
gs://path/to/file.csv:
gs://path/to/folder:
sftp://host.ip/path/to/file.csv:
sftp://host.ip/path/to/folder:
# local
file://path/to/file.csv:
file://path/to/folder:
env:
SAMPLE_SIZE: 2000 # increase the sample size to infer types. Default is 900.
Running the replication with Sling CLI:
$ sling run -r /tmp/pg-to-snowflake.yaml
11:04AM INF Sling Replication [3 streams] | MY_POSTGRES -> MY_SNOWFLAKE
11:04AM INF [1 / 3] running stream `finance.accounts`
11:04AM INF connecting to source database (postgres)
11:04AM INF connecting to target database (snowflake)
11:04AM INF reading from source database
11:04AM INF writing to target database [mode: full-refresh]
11:04AM INF streaming data
11:04AM INF dropped table public.finance_accounts
11:04AM INF created table public.finance_accounts
11:04AM INF inserted 16900 rows in 30 sec
11:04AM INF execution succeeded
11:05AM INF [2 / 3] running stream `finance.departments`
11:05AM INF connecting to source database (postgres)
11:05AM INF connecting to target database (snowflake)
11:05AM INF reading from source database
11:05AM INF writing to target database [mode: full-refresh]
11:05AM INF streaming data
11:05AM INF dropped table public.finance_departments_old
11:05AM INF created table public.finance_departments_old
11:05AM INF inserted 18 rows in 1 sec
11:05AM INF execution succeeded
11:05AM INF [3 / 3] running stream `finance."Transactions"`
11:05AM INF connecting to source database (postgres)
11:05AM INF connecting to target database (snowflake)
11:05AM INF getting checkpoint value
11:05AM INF reading from source database
11:05AM INF writing to target database [mode: incremental]
11:05AM INF streaming data
11:08AM INF inserted 1128000 rows in 99 sec
11:08AM INF execution succeeded

Object Pattern Options

A powerful logic which allows dynamic naming of the target object. The used parts in the object name will be replaced at runtime with the corresponding values. So you could name your target object {target_schema}.{stream_schema}_{stream_table}, and at runtime it will be formatted correctly as depicted below.
  • run_timestamp: The run timestamp of the task (2006_01_02_150405)
  • source_account: the name of the account of the source connection (when source conn is AZURE)
  • source_bucket: the name of the bucket of the source connection (when source conn is GCS or S3)
  • source_container: the name of the container of the source connection (when source conn is AZURE)
  • source_name: the name of the source connection
  • stream_file_folder: the file parent folder name of the stream (when source is a file system)
  • stream_file_name: the file name of the stream (when source is a file system)
  • stream_file_path: the file path of the stream (when source is a file system)
  • stream_name: the name of the stream
  • stream_schema: the schema name of the source stream (when source conn is a database)
  • stream_table: the table name of the source stream (when source conn is a database)
  • target_account: the name of the account of the target connection (when target conn is AZURE)
  • target_bucket: the name of the bucket of the target connection (when target conn is GCS or S3)
  • target_container: the name of the container of the target connection (when target conn is AZURE)
  • target_name: the name of the target connection
  • target_schema: the target conn default schema specified in credentials

Timestamp Patterns

  • YYYY: The 4 digit year of the run timestamp of the task
  • YY: The 2 digit year of the run timestamp of the task
  • MMM: The abbreviation of the month of the run timestamp of the task
  • MM: The 2 digit month of the run timestamp of the task
  • DD: The 2 digit day of the run timestamp of the task
  • HH: The 2 digit 24-hour of the run timestamp of the task
  • hh: The 2 digit 12-hour of the run timestamp of the task
  • mm: The 2 digit minute of the run timestamp of the task
  • ss: The 2 digit second of the run timestamp of the task
  • ISO8601: The ISO-8601 format of the run timestamp of the task (2006-01-02T15:04:05Z)
Last modified 11d ago