Transformations

Using Sling transformations

Can Sling do the T part of ELT?

Not in the traditional sense. However, Sling can perform the transformations described below.

Here is a list of built-in transforms that Sling can perform:

  • decode_latin1: Decodes Latin-1 (ISO-8859-1) encoded text to UTF-8

  • decode_latin5: Decodes Latin-5 (ISO-8859-5) encoded text to UTF-8

  • decode_latin9: Decodes Latin-9 (ISO-8859-15) encoded text to UTF-8

  • decode_utf8: Decodes UTF-8 encoded text

  • decode_utf8_bom: Decodes UTF-8 with BOM encoded text

  • decode_utf16: Decodes UTF-16 encoded text to UTF-8

  • decode_windows1250: Decodes Windows-1250 encoded text to UTF-8

  • decode_windows1252: Decodes Windows-1252 encoded text to UTF-8

  • duckdb_list_to_text: Adds a space suffix to DuckDB lists to prevent JSON parsing errors

  • encode_latin1: Encodes text to Latin-1 (ISO-8859-1)

  • encode_latin5: Encodes text to Latin-5 (ISO-8859-5)

  • encode_latin9: Encodes text to Latin-9 (ISO-8859-15)

  • encode_utf8: Encodes text to UTF-8

  • encode_utf8_bom: Encodes text to UTF-8 with BOM

  • encode_utf16: Encodes text to UTF-16

  • encode_windows1250: Encodes text to Windows-1250

  • encode_windows1252: Encodes text to Windows-1252

  • hash_md5: Generates MD5 hash of the input

  • hash_sha256: Generates SHA-256 hash of the input

  • hash_sha512: Generates SHA-512 hash of the input

  • parse_bit: Parses binary data as bits

  • parse_fix: Parses FIX (Financial Information eXchange) protocol messages into JSON format

  • parse_uuid: Parses 16-byte UUID into string format

  • parse_ms_uuid: Parses 16-byte Microsoft UUID into string format

  • replace_0x00: Replaces null characters (0x00) with an empty string

  • replace_accents: Replaces accented characters with their non-accented equivalents

  • replace_non_printable: Replaces or removes non-printable characters

  • trim_space: Removes leading and trailing whitespace

  • set_timezone (v1.2.16): Sets the timezone for datetime values. Accepts an IANA timezone string (e.g. "America/New_York")

Examples using the CLI

# Transform at stream level (applied to all columns)
sling run \
  --src-conn POSTGRES \
  --src-stream my_schema.my_table \
  --tgt-conn SNOWFLAKE \
  --tgt-object new_schema.{stream_schema}_{stream_table} \
  --transforms '[ decode_utf16, replace_accents ]' # will decode first, then replace accents

# Transform at column level
sling run \
  --src-conn POSTGRES \
  --src-stream my_schema.my_table \
  --transforms '{ col1: [ hash_md5 ], col5: [ trim_space, parse_uuid ] }' \
  --stdout --limit 10

# Transform at stream level (with "*") and column level
sling run \
  --src-conn POSTGRES \
  --src-stream my_schema.my_table \
  --stdout --limit 10 \
  --transforms '{ "*": [ decode_utf16 ], col_sensitive: [ hash_sha512 ] }'

Examples using a replication.yaml

source: MY_SOURCE_FILE
target: MY_TARGET

default:
  # Transform at stream level (applied to all columns)
  transforms: [ decode_utf16, replace_accents ]

streams:
  path/to/file1.csv:
  path/to/file2.csv:
    # Transform at column level
    transforms: 
      "*": [ decode_utf16, replace_accents ] # all columns
      col_sensitive: [ hash_sha512 ] # only this column

Here is an example using the set_timezone transform:

source: MY_SOURCE_DB
target: MY_TARGET

default:
  transforms: [ set_timezone('America/New_York') ]

streams:
  my_schema.my_table:

Other Transformations

Custom SQL

Custom SQL can be used as the source stream, by passing a SELECT query as the source.stream input. Here is an example:

sling run \
  --src-conn STARROCKS \
  --src-stream "SELECT columnB, columnA FROM tbl WHERE columnB > 6000" \
  --tgt-conn MYSQL \
  --tgt-object mysql.tbl \
  --mode full-refresh

The same using a replication.yaml:

source: STARROCKS
target: MYSQL

streams:
  my_table:
    sql: "SELECT columnB, columnA FROM tbl WHERE columnB > 6000"
    object: mysql.tbl
    mode: full-refresh

Column Type Casting

Column type coercion, using the columns key (see here). This is especially useful for file sources. Here is an example casting column types:

sling run \
  --src-conn GCS \
  --src-stream path/to/file3.csv \
  --tgt-conn CLICKHOUSE \
  --tgt-object default.gcs_{stream_file_name} \
  --columns '{ col1: string, col2: "decimal(20,10)", col3: json }'

The same using a replication.yaml:

source: GCS
target: CLICKHOUSE

streams:
  path/to/file3.csv:
    object: default.gcs_{stream_file_name}
    mode: full-refresh
    columns:
      col1: string
      col2: decimal(20,10)
      col3: json

JSON Flattening

JSON flattening, using the flatten key in source.options (see here). Example:

sling run \
  --src-conn AWS_S3 \
  --src-stream s3://path/to/file.json \
  --tgt-object file://./target/models.csv \
  --src-options '{flatten: true}'

The same using a replication.yaml:

source: AWS_S3
target: FILE

streams:
  s3://path/to/file.json:
    object: file://./target/models.csv
    source_options:
      flatten: true
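
To see the effect quickly, you can flatten a small local file and print the result. Below is a minimal sketch; nested.json and its contents are hypothetical, and the exact naming of the flattened columns may vary by version:

# create a small nested JSON file (hypothetical data)
$ echo '{"id": 1, "user": {"name": "Ana", "country": "BR"}}' > nested.json

# with flatten enabled, the nested user fields are loaded
# as separate columns instead of a single JSON value
$ sling run \
  --src-stream file://./nested.json \
  --src-options '{flatten: true}' \
  --stdout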

JMESPath Transforms

JMESPath transformations for JSON source files, using the jmespath key in source.options. Here is an example extracting the models information from the dbt manifest file with the jmespath + flatten options:

sling run \
  --src-stream file://./target/manifest.json \
  --tgt-object file://./target/models.csv \
  --src-options '{
      jmespath: "nodes.*.{resource_type: resource_type, database: database, schema: schema, name: name, relation_name: relation_name, original_file_path: original_file_path, materialized: config.materialized }",
      flatten: true
    }'

The same using a replication.yaml:

source: LOCAL
target: LOCAL

streams:
  file://./target/manifest.json:
    object: file://./target/models.csv
    source_options:
      jmespath: "nodes.*.{resource_type: resource_type, database: database, schema: schema, name: name, relation_name: relation_name, original_file_path: original_file_path, materialized: config.materialized }"
      flatten: true

Schema Evolution

When using Sling to extract/load data in an incremental manner, it will attempt to match whatever columns are present in both the source stream and the target table. If an extra column is present in the source stream, it will be added to the target table. If no columns from the source stream match at all, the run will error. At least the primary_key or update_key must be present in the target table.

See below for a simple example, mimicking the addition and removal of columns.

# Initial data

$ echo 'a,b,c
1,2,3
4,5,6' > test1.csv

$ sling run \
  --src-stream file://./test1.csv \
  --tgt-conn postgres \
  --tgt-object public.test1

<...log output omitted>

$ sling run \
  --src-conn postgres \
  --src-stream public.test1 \
  --stdout

a,b,c,_sling_loaded_at
1,2,3,1707869559
4,5,6,1707869559

# test2.csv is missing column b

$ echo 'a,c
7,8' > test2.csv

$ sling run \
  --src-stream file://./test2.csv \
  --tgt-conn postgres \
  --tgt-object public.test1 \
  --mode incremental \
  --primary-key a

<...log output omitted>

$ sling run \
  --src-conn postgres \
  --src-stream public.test1 \
  --stdout

a,b,c,_sling_loaded_at
1,2,3,1707869559
4,5,6,1707869559
7,,8,1707869689

# test3.csv is missing columns b and c, and has an extra column d

$ echo 'a,d
9,10' > test3.csv

$ sling run \
  --src-stream file://./test3.csv \
  --tgt-conn postgres \
  --tgt-object public.test1 \
  --mode incremental \
  --primary-key a

<...log output omitted>

$ sling run \
  --src-conn postgres \
  --src-stream public.test1 \
  --stdout

a,b,c,_sling_loaded_at,d
1,2,3,1707869559,
4,5,6,1707869559,
7,,8,1707869689,
9,,,1707870320,10

We can see that Sling handled the changes properly, in a non-destructive manner. If the source stream were a database table, the same rules would apply whenever a column disappears or appears.
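
For instance, a database-to-database incremental replication where these schema evolution rules apply could be sketched as below; the connection names, table names and the id primary key are hypothetical:

source: MY_SOURCE_DB
target: MY_TARGET_DB

streams:
  my_schema.my_table:
    object: new_schema.my_table
    mode: incremental
    primary_key: [id]
    # columns added to or dropped from my_schema.my_table between runs
    # would be handled as shown above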

Suggestions?

There are currently no plans to support other custom transformations in the traditional "T" sense. However, any suggestions are welcome.
