Transforms
Using Sling transforms
Sling provides powerful data transformation capabilities that allow you to clean, modify, and enhance your data during the replication process. Transform data inline without needing separate ETL tools or complex SQL queries.
Starting with v1.4.17, Sling introduced Functions and Staged Transforms - a major enhancement that provides access to 50+ built-in functions with flexible expression-based transformations. Before this version, only a limited set of legacy transforms were available.
Transform Input Structures
Sling supports three different input structures for transforms, each offering varying degrees of flexibility and simplicity:
Array of Strings - Simple Global Transforms
Apply transform functions in sequence to all column values. Works with functions that accept 1 parameter.
```yaml
# Apply to all columns globally
transforms: ["trim_space", "remove_diacritics"]
```

```bash
# CLI usage
sling run --transforms '["trim_space", "remove_diacritics"]' ...
```

Use case: When you need the same transformation applied to every column in your dataset.
Map/Object - Column-Specific Transforms
Apply transform functions in sequence to specific columns. Works with functions that accept 1 parameter.
```yaml
# Apply to specific columns
transforms:
  name: ["trim_space", "upper"]
  email: ["lower"]
  customer_id: ["hash"]
  target: ["bool_parse"]
  text: ["replace_non_printable"]
```

```bash
# CLI usage
sling run --transforms '{"name": ["trim_space", "upper"], "email": ["lower"]}' ...
```

Use case: When you need different transformations for different columns, or want to combine global and column-specific transforms, as in the sketch below.
Array of Objects - Staged Transforms
Multi-stage transformations with expressions and functions, evaluated in order. Each stage can modify existing columns or create new ones.
```yaml
transforms:
  # Stage 1: Clean text fields
  - text_field: "trim_space(value)"
    email: "lower(value)"
  # Stage 2: Create new columns using record references
  - full_name: 'record.first_name + " " + record.last_name'
    email_hash: 'hash(record.email, "md5")'
  # Stage 3: Conditional logic
  - category: 'record.amount >= 1000 ? "premium" : "standard"'
```

Use case: When you need complex, multi-step transformations with conditional logic, cross-column references, or computed fields.
Staged Transforms
Staged transforms provide the most powerful and flexible transformation capabilities in Sling. They allow you to:
Multi-stage processing: Apply transformations in sequential stages
Cross-column references: Use record.<column> to reference other column values
Conditional logic: Apply if/then/else logic based on multiple conditions
Create new columns: Generate computed columns based on existing data
Access to 50+ functions: Use string, numeric, date, and utility functions
Syntax
Staged transforms use an array of transformation stages. Each stage can modify existing columns or create new ones using expressions parsed by Goval, a powerful Go expression evaluator:
```yaml
transforms:
  # Stage 1: Clean text fields
  - text_field: "trim_space(value)"
    email: "lower(value)"
  # Stage 2: Create new columns using record references
  - full_name: 'record.first_name + " " + record.last_name'
    email_hash: 'hash(record.email, "md5")'
  # Stage 3: Conditional logic
  - category: 'record.amount >= 1000 ? "premium" : "standard"'
```

Key Features
1. Value Transformations
Use value to reference the current column's value:
```yaml
transforms:
  - name: 'upper(value)'       # Convert to uppercase
    age: 'cast(value, "int")'  # Convert to integer
  - "*": "trim_space(value)"   # Then, apply to all columns
```

2. Record References
Use record.<column> to reference other columns in the same row:
```yaml
transforms:
  - full_name: 'record.first_name + " " + record.last_name'
    total_amount: 'record.quantity * record.price'
    display_name: 'record.last_name + ", " + record.first_name'
```

3. Multi-stage Processing
Each stage can build upon the results of previous stages:
```yaml
transforms:
  # Stage 1: Clean the data
  - name: "trim_space(value)"
    email: "lower(value)"
  # Stage 2: Use cleaned data from stage 1
  - full_name: 'record.name + " (" + record.email + ")"'
  # Stage 3: Use results from previous stages
  - summary: 'record.full_name + " - processed"'
```

4. Conditional Transformations
Use ternary operators and conditional logic:
```yaml
transforms:
  - status: 'record.amount >= 1000 ? "premium" : (record.amount >= 500 ? "standard" : "basic")'
    discount: 'record.status == "premium" ? record.amount * 0.15 : record.amount * 0.05'
    message: 'record.age >= 65 ? "Senior discount available" : "Standard pricing"'
```

5. Available Functions
Access to 50+ built-in functions including:
String functions: upper(), lower(), trim_space(), replace(), substring()
Numeric functions: int_parse(), float_parse(), greatest(), least()
Date functions: now(), date_parse(), date_format(), date_add()
Utility functions: hash(), coalesce(), cast(), uuid()
Conditional functions: if(), equals(), is_null(), is_empty()
See the complete Functions Reference for all available functions.
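These functions can also be composed within a single expression. The following is a minimal sketch using only signatures that appear elsewhere on this page; the column names are illustrative:

```yaml
# Sketch: composing documented functions; column names are illustrative.
transforms:
  - name: 'upper(trim_space(value))'            # chain string functions
    amount: 'cast(coalesce(value, "0"), "int")' # default missing values, then cast
```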
6. Creating New Columns On-The-Fly
One of the most powerful features of staged transforms is the ability to create new columns dynamically during data processing. Unlike traditional transformations that only modify existing columns, staged transforms let you add computed columns, derived fields, and calculated values without modifying your source data or schema.
```yaml
streams:
  my_stream:
    transforms:
      # first stage: replace column (using value)
      - mycol: 'coalesce(value, "N/A")'           # replace column
        email_hashed: 'hash(record.email, "md5")' # create new column
      # second stage: create new column
      - new_col: 'record.mycol * 100'             # create new column
      # final stage: cast all as string and replace accents
      - "*": 'replace_accents(cast(value, "string"))'
```

Key Benefits:
No schema changes needed: Add columns without altering source tables
Dynamic calculations: Create computed fields based on existing data
Multi-stage logic: Build complex columns using results from previous stages
Flexible data enrichment: Add metadata, hashes, flags, or derived metrics
Common Use Cases:
Data enrichment: Add calculated fields, hash values, or lookup results
Business logic: Create status flags, categories, or computed metrics
Data quality: Add validation flags, completeness scores, or data lineage
Analytics preparation: Create derived dimensions or calculated measures
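As a small illustration of the data-quality use case, a staged transform can add a validation flag as a new column. This is a sketch using functions listed above; the column names are hypothetical:

```yaml
# Sketch: add a completeness flag; column names are hypothetical.
transforms:
  - email: 'lower(trim_space(value))'        # stage 1: normalize
  - email_missing: 'is_empty(record.email)'  # stage 2: flag rows with no email
```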
Examples
Example 1: Customer Data Processing
```yaml
streams:
  customers:
    transforms:
      # Stage 1: Clean and normalize
      - first_name: "trim_space(value)"
        last_name: "trim_space(value)"
        email: "lower(value)"
      # Stage 2: Create computed columns
      - full_name: 'record.first_name + " " + record.last_name'
        email_hash: 'hash(record.email, "md5")'
      # Stage 3: Customer categorization
      - customer_type: |
          record.total_orders >= 50 ? "vip" : (
            record.total_orders >= 10 ? "regular" : "new"
          )
```

Example 2: E-commerce Order Processing
```yaml
streams:
  orders:
    transforms:
      # Stage 1: Parse and clean numeric values
      - quantity: "int_parse(value)"
        unit_price: "float_parse(value)"
        status: "lower(trim_space(value))"
      # Stage 2: Calculate totals
      - subtotal: 'record.quantity * record.unit_price'
        tax_amount: 'record.subtotal * 0.08'
        total_amount: 'record.subtotal + record.tax_amount'
      # Stage 3: Status and priority logic
      - priority: |
          record.total_amount >= 1000 ? "high" : (
            record.status == "urgent" ? "high" : "normal"
          )
        shipping_method: |
          record.priority == "high" ? "express" : "standard"
```

Example 3: Using CLI
```bash
# Using staged transforms with CLI
sling run \
  --src-conn POSTGRES \
  --src-stream my_schema.raw_data \
  --tgt-conn SNOWFLAKE \
  --tgt-object processed.clean_data \
  --transforms '[
    {"name": "trim_space(value)", "email": "lower(value)"},
    {"full_name": "record.name + \" (\" + record.email + \")\""},
    {"status": "record.active ? \"enabled\" : \"disabled\""}
  ]'
```

Other Transformation Methods
Beyond the main transform structures, Sling provides additional methods for data transformation:
Custom SQL Transformations
Use custom SELECT queries as the source stream for complex transformations that go beyond what built-in transforms can handle:
```bash
sling run \
  --src-conn STARROCKS \
  --src-stream "SELECT columnB, columnA FROM tbl WHERE columnB > 6000" \
  --tgt-conn MYSQL \
  --tgt-object mysql.tbl \
  --mode full-refresh
```

```yaml
source: STARROCKS
target: MYSQL

streams:
  my_table:
    sql: "SELECT columnB, columnA FROM tbl WHERE columnB > 6000"
    object: mysql.tbl
    mode: full-refresh
```

JSON Flattening
Automatically flatten nested JSON structures using the flatten key in source.options (see configuration docs):
```bash
sling run \
  --src-conn AWS_S3 \
  --src-stream s3://path/to/file.json \
  --tgt-object file://./target/models.csv \
  --src-options '{flatten: true}'
```

```yaml
source: AWS_S3
target: FILE

streams:
  s3://path/to/file.json:
    object: file://./target/models.csv
    source_options:
      flatten: true
```

JMESPath Transforms
Extract and transform specific data from JSON responses using JMESPath expressions via the jmespath key in source.options. Here's an example extracting models information from a DBT manifest file:
```bash
sling run \
  --src-stream file://./target/manifest.json \
  --tgt-object file://./target/models.csv \
  --src-options '{
    jmespath: "nodes.*.{resource_type: resource_type, database: database, schema: schema, name: name, relation_name: relation_name, original_file_path: original_file_path, materialized: config.materialized }",
    flatten: true
  }'
```

```yaml
source: LOCAL
target: LOCAL

streams:
  file://./target/manifest.json:
    object: file://./target/models.csv
    source_options:
      jmespath: "nodes.*.{resource_type: resource_type, database: database, schema: schema, name: name, relation_name: relation_name, original_file_path: original_file_path, materialized: config.materialized }"
      flatten: true
```

Schema Evolution
When using Sling to extract and load data incrementally, it attempts to match the columns present in both the source stream and the target table. If the source stream contains an extra column, Sling adds it to the target table. If no source columns match the target at all, the run errors out. At a minimum, the primary_key or update_key must be present in the target table.
See below for a simple example, mimicking the addition and removal of columns.
```bash
# Initial data
$ echo 'a,b,c
1,2,3
4,5,6' > test1.csv

$ sling run \
  --src-stream file://./test1.csv \
  --tgt-conn postgres \
  --tgt-object public.test1
<...log output omitted>

$ sling run \
  --src-conn postgres \
  --src-stream public.test1 \
  --stdout
a,b,c,_sling_loaded_at
1,2,3,1707869559
4,5,6,1707869559
```

```bash
# test2.csv is missing column b
$ echo 'a,c
7,8' > test2.csv

$ sling run \
  --src-stream file://./test2.csv \
  --tgt-conn postgres \
  --tgt-object public.test1 \
  --mode incremental \
  --primary-key a
<...log output omitted>

$ sling run \
  --src-conn postgres \
  --src-stream public.test1 \
  --stdout
a,b,c,_sling_loaded_at
1,2,3,1707869559
4,5,6,1707869559
7,,8,1707869689
```

```bash
# test3.csv is missing columns b, c and has extra column d
$ echo 'a,d
9,10' > test3.csv

$ sling run \
  --src-stream file://./test3.csv \
  --tgt-conn postgres \
  --tgt-object public.test1 \
  --mode incremental \
  --primary-key a
<...log output omitted>

$ sling run \
  --src-conn postgres \
  --src-stream public.test1 \
  --stdout
a,b,c,_sling_loaded_at,d
1,2,3,1707869559,
4,5,6,1707869559,
7,,8,1707869689,
9,,,1707870320,10
```

We can see that Sling handled the changes properly, in a non-destructive manner. If the source stream were a database table, the same rules would apply whenever a column disappeared or appeared.
Legacy Transform Syntax (Before v1.4.17)
Note: This is the legacy syntax for transforms used before v1.4.17. While still supported for backwards compatibility, we recommend using the new Transform Input Structures for new implementations as they provide much more powerful capabilities and access to functions.
These are legacy single-column transforms that can be applied to clean and modify data:
parse_bit: Parses binary data as bits
parse_fix: Parses FIX (Financial Information eXchange) protocol messages into JSON format
parse_uuid: Parses 16-byte UUID into string format
parse_ms_uuid: Parses 16-byte Microsoft UUID into string format
replace_0x00: Replaces null characters (0x00) with an empty string
replace_accents: Replaces accented characters with their non-accented equivalents
replace_non_printable: Replaces or removes non-printable characters
trim_space: Removes leading and trailing whitespace
empty_as_null: If value is empty, set as null
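These legacy transforms are applied with the same array or map structures shown earlier. A minimal sketch; the column name is illustrative:

```yaml
# Sketch: legacy transforms applied in sequence to one column.
transforms:
  raw_text: [replace_0x00, replace_non_printable, trim_space, empty_as_null]
```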