Configuration

Configuration Concepts for your Sling runs

Configuration Inputs

Sling CLI offers two ways to provide your configuration:

  • CLI Flags: Quick ad-hoc runs from your terminal shell or script.

    sling run --src-stream file://my_file.csv --tgt-conn postgres --tgt-object my_schema.my_table --mode full-refresh

  • Replication: Multiple streams in a YAML or JSON file. Best way to scale Sling.

    sling run -r /path/to/replication.yaml
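For instance, a minimal replication file could look like the sketch below (the connection names my_postgres and my_snowflake and the stream tables are placeholders):

```yaml
# replication.yaml — minimal sketch (connection and table names are placeholders)
source: my_postgres      # source connection name
target: my_snowflake     # target connection name

defaults:
  mode: full-refresh
  object: target_schema.{stream_table}   # {stream_table} is a runtime variable

streams:
  public.orders:
  public.customers:
```

Each key under streams is a source table (or file/query); settings under defaults apply to every stream unless overridden.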

Modes

Here are the various loading modes available. All modes first load into a new temporary table before the final load.


full-refresh

This is the default mode. The target table will be dropped and recreated with the source data.

incremental

The source data will be merged or appended into the target table. If the table does not exist, it will be created. See below for more details.

truncate

Similar to full-refresh, except that the target table is truncated instead of dropped. This keeps any special DDL / GRANT applied.

snapshot

Appends the full dataset with an added timestamp column. If the target table exists, Sling will insert into / append data with a _sling_loaded_at column. If it does not, the table will be created.

backfill

Similar to incremental, but takes a range input to backfill a specific update_key range, such as dates or numbers.
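As a sketch, a backfill run over a date range could be configured like this (table and column names are placeholders; the range is passed via source_options, described under Options below):

```yaml
streams:
  my_schema.my_table:
    mode: backfill
    primary_key: [id]
    update_key: created_at
    source_options:
      range: '2021-01-01,2021-02-01'   # start,end of the update_key range
```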

Incremental Mode Strategies

Load Strategy                    | Primary Key | Update Key | Stream Strategy
New Data Upsert (update/insert)  | yes         | yes        | Only new records after max(update_key)
Full Data Upsert (update/insert) | yes         | no         | Full data
Append Only (insert, no update)  | no          | yes        | Only new records after max(update_key)
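To illustrate, the three strategies above map to stream configurations roughly like this (table and column names are placeholders):

```yaml
streams:
  # New Data Upsert: primary_key + update_key
  my_schema.orders:
    mode: incremental
    primary_key: [order_id]
    update_key: updated_at

  # Full Data Upsert: primary_key only (reads the full source data)
  my_schema.customers:
    mode: incremental
    primary_key: [customer_id]

  # Append Only: update_key only (inserts new records, no updates)
  my_schema.events:
    mode: incremental
    update_key: event_time
```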

Incremental or Backfill Mode With Custom SQL

When using incremental or backfill mode with a custom SQL stream and a provided update_key, you must include the placeholder {incremental_where_cond} or {incremental_value} in the SQL text. This allows Sling to inject the watermark condition needed to fetch only newer records.

For example, suppose the custom SQL stream is select * from my_schema.my_table where {incremental_where_cond}. On the first run, when the target table does not exist, {incremental_where_cond} renders as 1=1, so the query becomes select * from my_schema.my_table where 1=1. On subsequent runs, Sling pulls the max value of the update_key column (say 2001-01-01 01:01:01) and renders {incremental_where_cond} as my_update_key > '2001-01-01 01:01:01', producing select * from my_schema.my_table where my_update_key > '2001-01-01 01:01:01'.

Similarly, suppose the custom SQL stream is select * from my_schema.my_table where my_int_key > coalesce({incremental_value}, 0) (for a timestamp column, use something like coalesce({incremental_value}, '2001-01-01')). On the first run, when the target table does not exist, {incremental_value} renders as null, so the query becomes select * from my_schema.my_table where my_int_key > coalesce(null, 0). On subsequent runs, Sling pulls the max value of the update_key column (say 99) and injects it: select * from my_schema.my_table where my_int_key > coalesce(99, 0).
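Putting it together, a custom SQL stream in a replication file might look like this sketch (assuming a stream-level sql key; names are placeholders):

```yaml
streams:
  orders_incremental:
    mode: incremental
    object: target_schema.orders
    primary_key: [order_id]
    update_key: updated_at
    sql: |
      select *
      from my_schema.my_table
      where {incremental_where_cond}  -- rendered as 1=1 on first run
```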

Options

Source

Here is the Go struct for source options:

type SourceOptions struct {
	Columns        map[string]string   `json:"columns" yaml:"columns"`
	Compression    *iop.CompressorType `json:"compression" yaml:"compression"`
	DatetimeFormat string              `json:"datetime_format" yaml:"datetime_format"`
	Delimiter      string              `json:"delimiter" yaml:"delimiter"`
	EmptyAsNull    *bool               `json:"empty_as_null" yaml:"empty_as_null"`
	Flatten        *bool               `json:"flatten" yaml:"flatten"`
	Format         *filesys.FileType   `json:"format" yaml:"format"`
	Header         *bool               `json:"header" yaml:"header"`
	JmesPath       *string             `json:"jmespath" yaml:"jmespath"`
	Limit          *int                `json:"limit" yaml:"limit"`
	MaxDecimals    *int                `json:"max_decimals" yaml:"max_decimals"`
	NullIf         *string             `json:"null_if" yaml:"null_if"`
	Sheet          *string             `json:"sheet" yaml:"sheet"`
	Range          *string             `json:"range" yaml:"range"`
	SkipBlankLines *bool               `json:"skip_blank_lines" yaml:"skip_blank_lines"`
	Transforms     []string            `json:"transforms" yaml:"transforms"`
	TrimSpace      *bool               `json:"trim_space" yaml:"trim_space"`
}
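For example, a CSV file stream could set several of these options at once (the file path, column names and types below are placeholders):

```yaml
streams:
  'file:///path/to/data.csv':
    object: my_schema.my_table
    source_options:
      format: csv
      delimiter: ','
      header: true
      empty_as_null: true
      columns:
        id: bigint
        amount: decimal(38, 9)
        created_at: datetime
      transforms: [trim_space, replace_non_printable]
```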

columns

When the source column type cannot be inferred (e.g. the source is a file), you can use this object/map to specify the type that a column should be cast as. It is not necessary to include all columns; Sling will attempt to auto-detect all unspecified column types. The map key is the column name, and the value is the type (string, json, integer, bigint, decimal, datetime or bool). decimal(<precision>, <scale>) is also accepted for precise decimals. To cast all columns to one type, use "*" as the key.

compression

(Only for file source)

The type of compression to use when reading files. Valid inputs are none, auto, gzip, zstd and snappy. Default is auto.

datetime_format

The ISO 8601 date format to use when reading date values. Default is auto.

delimiter

(Only for file source)

The delimiter to use when parsing tabular files. Default is auto.

escape

(Only for file source - since v1.2.4)

The escape character to use when parsing tabular files. Default is "

empty_as_null

Whether empty fields should be treated as NULL. Default is true.

flatten

(Only for file source)

Whether to flatten a semi-structured file source format (JSON, XML).

format

(Only for file source)

The format of the file(s). Options are: csv, parquet, xlsx, avro, json, jsonlines, sas7bdat and xml.

header

(Only for file source) Whether to consider the first line as header. Default is true.

jmespath

(Only for file source)

Specify a JMESPath expression to use to filter / extract nested JSON data. See https://jmespath.org/ for more

limit

The maximum number of rows to pull from the source

null_if

The case-sensitive value that should be treated as NULL when encountered. Default is NULL.

sheet

(Only for Excel source files) The name of the sheet to use as a data source, for example Sheet1. Default is the first sheet. You can also specify the range (Sheet2!B:H, Sheet3!B1:H70).

range

The range to use for backfill mode, separated by a single comma. Example: 2021-01-01,2021-02-01 or 1,10000

skip_blank_lines

Whether blank lines should be skipped when encountered. Default is false.

transforms

An object/map or array/list of built-in transforms to apply to records. When an array is provided, the transforms apply to all columns. Available transforms include replace_accents, parse_uuid, trim_space, decode_latin1 and replace_non_printable. See here for the full list.

Array example: [ decode_latin1, replace_non_printable] Object example: { col1: [ replace_accents ], col2: [ parse_uuid ]}

trim_space

Whether white spaces at beginning or end of parsed values should be removed. Default is false.

Target

Here is the Go struct for target options:

type TargetOptions struct {
	AddNewColumns    *bool               `json:"add_new_columns" yaml:"add_new_columns"`
	AdjustColumnType *bool               `json:"adjust_column_type" yaml:"adjust_column_type"`
	ColumnCasing     *ColumnCasing       `json:"column_casing" yaml:"column_casing"`
	Compression      *iop.CompressorType `json:"compression" yaml:"compression"`
	DatetimeFormat   string              `json:"datetime_format" yaml:"datetime_format"`
	Delimiter        string              `json:"delimiter" yaml:"delimiter"`
	FileMaxBytes     int64               `json:"file_max_bytes" yaml:"file_max_bytes"`
	FileMaxRows      int64               `json:"file_max_rows" yaml:"file_max_rows"`
	Format           filesys.FileType    `json:"format" yaml:"format"`
	Header           *bool               `json:"header" yaml:"header"`
	MaxDecimals      *int                `json:"max_decimals" yaml:"max_decimals"`
	PostSQL          string              `json:"post_sql" yaml:"post_sql"`
	PreSQL           string              `json:"pre_sql" yaml:"pre_sql"`
	TableDDL         string              `json:"table_ddl" yaml:"table_ddl"`
	TableKeys        map[string][]string `json:"table_keys" yaml:"table_keys"`
	TableTmp         string              `json:"table_tmp" yaml:"table_tmp"`
	UseBulk          *bool               `json:"use_bulk" yaml:"use_bulk"`
}
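For example, a database target could combine several of these options (the SQL statements below are placeholders):

```yaml
streams:
  my_schema.my_table:
    object: target_schema.my_table
    target_options:
      add_new_columns: true
      column_casing: snake
      pre_sql: |
        delete from target_schema.my_table where batch_date = current_date
      post_sql: |
        grant select on target_schema.my_table to reporting_role
```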

add_new_columns

(Only for database target)

Whether to add new columns from stream not found in target table (when mode is not full-refresh). Default is true.

adjust_column_type

(Only for database target)

Whether to adjust the column type when needed. Default is false. [BETA]

column_casing

Whether to convert the column name casing. This facilitates querying tables in the target database without using quotes. Accepts source (keep original casing), target (convert casing according to the target database) or snake (convert to snake_case according to the target database). Default is source.

compression

(Only for file target)

The type of compression to use when writing files. Valid inputs are none, auto, gzip, zstd and snappy. Default is auto.

datetime_format

(Only for file target)

The ISO 8601 date format to use when writing date values. Default is auto.

delimiter

(Only for file target)

The delimiter to use when writing tabular files. Default is ,.

file_max_bytes

(Only for file target)

The maximum number of bytes to write to a file. 0 means no limit. When a value greater than 0 is specified, the output location will be a folder containing multiple part files. Default is 16000000. Does not work with the parquet file format (use file_max_rows instead).

file_max_rows

(Only for file target)

The maximum number of rows (usually lines) to write to a file. 0 means no limit. When a value greater than 0 is specified, the output location will be a folder containing multiple part files. Default is 500000.

format

(Only for file target)

The format of the file(s). Options are: csv, parquet, xlsx, json and jsonlines.

header

(Only for file target)

Whether to write the first line as header. Default is true.

post_sql

(Only for database target)

The SQL query to run after loading.

pre_sql

(Only for database target)

The SQL query to run before loading.

table_ddl

(Only for database target)

The table DDL to use when writing to a database. Default is auto-generated by Sling. Accepts the {col_types} runtime variable. Example: create table {object_name} ({col_types}) engine=MergeTree

table_keys

(Only for database target)

The table keys to define when creating a table, such as partition, cluster, sort, etc. Each entry defines an array of column names or expressions. See Examples here.
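As a sketch, table_keys could be defined like this (which key types are supported depends on the target database; the column names are placeholders):

```yaml
target_options:
  table_keys:
    partition: [created_date]
    cluster: [account_id]
    sort: [updated_at]
```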

table_tmp

(Only for database target)

The temporary table name that should be used when loading into a database. Default is auto-generated by Sling.

use_bulk

(Only for database target)

Whether to use external bulk loading tools, if installed and available in the PATH environment variable. Sling uses bcp for SQL Server, sqlldr for Oracle and mysql for MySQL. If false, traditional batch INSERT loading is used. Default is true.

Third Party Tools / Drivers

Sling can take advantage of several free third-party tools to speed up insert speeds, depending on the kind of database being interacted with. These need to be installed separately and available in the PATH environment variable. If you use our Docker image, these tools are already available (including the Oracle client). The following tools are supported:
