Transforms

Using Sling transforms

Sling provides powerful data transformation capabilities that allow you to clean, modify, and enhance your data during the replication process. Transformations run inline, with no need for separate ETL tools or complex SQL queries.

Version 1.4.17 introduced Functions and Staged Transforms, a major enhancement that provides access to 50+ built-in functions and flexible expression-based transformations. Earlier versions offered only a limited set of legacy transforms.

Transform Input Structures

Sling supports three different input structures for transforms, each offering a different balance of simplicity and flexibility:

Array of Strings - Simple Global Transforms

Apply transform functions in sequence to all column values. Only functions that accept a single parameter (the column value) can be used.

# Apply to all columns globally
transforms: ["trim_space", "remove_diacritics"]
# CLI usage
sling run --transforms '["trim_space", "remove_diacritics"]' ...

Use case: When you need the same transformation applied to every column in your dataset.

Map/Object - Column-Specific Transforms

Apply transform functions in sequence to specific columns. Only functions that accept a single parameter (the column value) can be used.

# Apply to specific columns
transforms:
  name: ["trim_space", "upper"]
  email: ["lower"]
  customer_id: ["hash"]
  target: ["bool_parse"]
  text: ["replace_non_printable"]
# CLI usage
sling run --transforms '{"name": ["trim_space", "upper"], "email": ["lower"]}' ...

Use case: When you need different transformations for different columns, or want to combine global and column-specific transforms.
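For example, a single map can mix a global rule with column-specific ones. This is a sketch that assumes the "*" wildcard key, shown later with staged transforms, is also accepted in the map form to target all columns:

# Sketch: combine a global transform with column-specific ones
# (assumes "*" applies to every column, as it does in staged transforms)
transforms:
  "*": ["trim_space"]
  email: ["lower"]
  customer_id: ["hash"]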

Array of Objects - Staged Transforms

Multi-stage transformations with expressions and functions, evaluated in order. Each stage can modify existing columns or create new ones.

transforms:
  # Stage 1: Clean text fields
  - text_field: "trim_space(value)"
    email: "lower(value)"
  
  # Stage 2: Create new columns using record references
  - full_name: 'record.first_name + " " + record.last_name'
    email_hash: 'hash(record.email, "md5")'
  
  # Stage 3: Conditional logic
  - category: 'record.amount >= 1000 ? "premium" : "standard"'

Use case: When you need complex, multi-step transformations with conditional logic, cross-column references, or computed fields.

Staged Transforms

Staged transforms provide the most powerful and flexible transformation capabilities in Sling. They offer:

  • Multi-stage processing: Apply transformations in sequential stages

  • Cross-column references: Use record.<column> to reference other column values

  • Conditional logic: Apply if/then/else logic based on multiple conditions

  • Create new columns: Generate computed columns based on existing data

  • Access to 50+ functions: Use string, numeric, date, and utility functions

Syntax

Staged transforms use an array of transformation stages. Each stage can modify existing columns or create new ones using expressions parsed by Goval, a powerful Go expression evaluator:

transforms:
  # Stage 1: Clean text fields
  - text_field: "trim_space(value)"
    email: "lower(value)"
  
  # Stage 2: Create new columns using record references
  - full_name: 'record.first_name + " " + record.last_name'
    email_hash: 'hash(record.email, "md5")'
  
  # Stage 3: Conditional logic
  - category: 'record.amount >= 1000 ? "premium" : "standard"'

Key Features

1. Value Transformations

Use value to reference the current column's value:

transforms:
  - name: 'upper(value)'          # Convert to uppercase
    age: 'cast(value, "int")'     # Convert to integer

  - "*": "trim_space(value)"      # Then, apply to all columns

2. Record References

Use record.<column> to reference other columns in the same row:

transforms:
  - full_name: 'record.first_name + " " + record.last_name'
    total_amount: 'record.quantity * record.price'
    display_name: 'record.last_name + ", " + record.first_name'

3. Multi-stage Processing

Each stage can build upon the results of previous stages:

transforms:
  # Stage 1: Clean the data
  - name: "trim_space(value)"
    email: "lower(value)"
  
  # Stage 2: Use cleaned data from stage 1
  - full_name: 'record.name + " (" + record.email + ")"'
  
  # Stage 3: Use results from previous stages  
  - summary: 'record.full_name + " - processed"'

4. Conditional Transformations

Use ternary operators and conditional logic:

transforms:
  - status: 'record.amount >= 1000 ? "premium" : (record.amount >= 500 ? "standard" : "basic")'
    discount: 'record.status == "premium" ? record.amount * 0.15 : record.amount * 0.05'
    message: 'record.age >= 65 ? "Senior discount available" : "Standard pricing"'

5. Available Functions

Access to 50+ built-in functions including:

  • String functions: upper(), lower(), trim_space(), replace(), substring()

  • Numeric functions: int_parse(), float_parse(), greatest(), least()

  • Date functions: now(), date_parse(), date_format(), date_add()

  • Utility functions: hash(), coalesce(), cast(), uuid()

  • Conditional functions: if(), equals(), is_null(), is_empty()

See the complete Functions Reference for all available functions.
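As a quick sketch of how these functions compose inside a staged transform, the example below uses only signatures that already appear on this page (coalesce, trim_space, lower, hash, cast); the column names are hypothetical:

transforms:
  # Clean first, then derive new columns
  - notes: 'coalesce(value, "N/A")'            # fall back to "N/A" when the value is null
    email: 'lower(trim_space(value))'          # normalize before hashing
  - email_hash: 'hash(record.email, "md5")'    # stable key for joins or deduplication
    amount_str: 'cast(record.amount, "string")'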

6. Creating New Columns On-The-Fly

One of the most powerful features of staged transforms is the ability to create new columns dynamically during data processing. Unlike traditional transformations that only modify existing columns, staged transforms let you add computed columns, derived fields, and calculated values without modifying your source data or schema.

streams:
  my_stream:
    transforms:
      # first stage: replace column (using value)
      - mycol: 'coalesce(value, "N/A")'     # replace column
        email_hashed: 'hash(record.email, "md5")'  # create new column
      
      # second stage: create new column  
      - new_col: 'record.mycol * 100'       # create new column
      
      # final stage: cast all as string and replace accents
      - "*": 'replace_accents(cast(value, "string"))'        

Key Benefits:

  • No schema changes needed: Add columns without altering source tables

  • Dynamic calculations: Create computed fields based on existing data

  • Multi-stage logic: Build complex columns using results from previous stages

  • Flexible data enrichment: Add metadata, hashes, flags, or derived metrics

Common Use Cases:

  • Data enrichment: Add calculated fields, hash values, or lookup results

  • Business logic: Create status flags, categories, or computed metrics

  • Data quality: Add validation flags, completeness scores, or data lineage

  • Analytics preparation: Create derived dimensions or calculated measures
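As an illustrative sketch of the data-quality use case (the column names here are hypothetical), a single stage can add validation flags and a processing timestamp using the documented ternary syntax and the now() function:

transforms:
  # Hypothetical data-quality columns added on the fly
  - email_missing: 'record.email == "" ? true : false'    # completeness flag
    amount_valid: 'record.amount >= 0 ? true : false'     # basic sanity check
    processed_at: 'now()'                                  # simple lineage marker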

Examples

Example 1: Customer Data Processing

streams:
  customers:
    transforms:
      # Stage 1: Clean and normalize
      - first_name: "trim_space(value)"
        last_name: "trim_space(value)"
        email: "lower(value)"
      
      # Stage 2: Create computed columns
      - full_name: 'record.first_name + " " + record.last_name'
        email_hash: 'hash(record.email, "md5")'
      
      # Stage 3: Customer categorization
      - customer_type: |
          record.total_orders >= 50 ? "vip" : (
            record.total_orders >= 10 ? "regular" : "new"
          )

Example 2: E-commerce Order Processing

streams:
  orders:
    transforms:
      # Stage 1: Parse and clean numeric values
      - quantity: "int_parse(value)"
        unit_price: "float_parse(value)"
        status: "lower(trim_space(value))"
      
      # Stage 2: Calculate totals
      - subtotal: 'record.quantity * record.unit_price'
        tax_amount: 'record.subtotal * 0.08'
        total_amount: 'record.subtotal + record.tax_amount'
      
      # Stage 3: Status and priority logic
      - priority: |
          record.total_amount >= 1000 ? "high" : (
            record.status == "urgent" ? "high" : "normal"
          )
        shipping_method: |
          record.priority == "high" ? "express" : "standard"

Example 3: Using CLI

# Using staged transforms with CLI
sling run \
  --src-conn POSTGRES \
  --src-stream my_schema.raw_data \
  --tgt-conn SNOWFLAKE \
  --tgt-object processed.clean_data \
  --transforms '[
    {"name": "trim_space(value)", "email": "lower(value)"},
    {"full_name": "record.name + \" (\" + record.email + \")\""},
    {"status": "record.active ? \"enabled\" : \"disabled\""}
  ]'

Other Transformation Methods

Beyond the main transform structures, Sling provides additional methods for data transformation:

Custom SQL Transformations

Use custom SELECT queries as the source stream for complex transformations that go beyond what built-in transforms can handle:

sling run \
  --src-conn STARROCKS \
  --src-stream "SELECT columnB, columnA FROM tbl WHERE columnB > 6000" \
  --tgt-conn MYSQL \
  --tgt-object mysql.tbl \
  --mode full-refresh

# Equivalent replication YAML
source: STARROCKS
target: MYSQL

streams:
  my_table:
    sql: "SELECT columnB, columnA FROM tbl WHERE columnB > 6000"
    object: mysql.tbl
    mode: full-refresh

JSON Flattening

Automatically flatten nested JSON structures using the flatten key in source.options (see configuration docs):

sling run \
  --src-conn AWS_S3 \
  --src-stream s3://path/to/file.json \
  --tgt-object file://./target/models.csv \
  --src-options '{flatten: true}'

# Equivalent replication YAML
source: AWS_S3
target: FILE

streams:
  s3://path/to/file.json:
    object: file://./target/models.csv
    source_options:
      flatten: true

JMESPath Transforms

Extract and transform specific data from JSON responses using JMESPath expressions via the jmespath key in source.options. Here's an example extracting model information from a dbt manifest file:

sling run \
  --src-stream file://./target/manifest.json \
  --tgt-object file://./target/models.csv \
  --src-options '{
      jmespath: "nodes.*.{resource_type: resource_type, database: database, schema: schema, name: name, relation_name: relation_name, original_file_path: original_file_path, materialized: config.materialized }",
      flatten: true
    }'

# Equivalent replication YAML
source: LOCAL
target: LOCAL

streams:
  file://./target/manifest.json:
    object: file://./target/models.csv
    source_options:
      jmespath: "nodes.*.{resource_type: resource_type, database: database, schema: schema, name: name, relation_name: relation_name, original_file_path: original_file_path, materialized: config.materialized }"
      flatten: true

Schema Evolution

When Sling extracts and loads data incrementally, it attempts to match the columns present in both the source stream and the target table. If the source stream contains an extra column, Sling adds it to the target table. If no source columns match at all, the run errors out. At a minimum, the primary_key or update_key must be present in the target table.

See below for a simple example, mimicking the addition and removal of columns.

# Initial data

$ echo 'a,b,c
1,2,3
4,5,6' > test1.csv

$ sling run \
  --src-stream file://./test1.csv \
  --tgt-conn postgres \
  --tgt-object public.test1

<...log output omitted>

$ sling run \
  --src-conn postgres \
  --src-stream public.test1 \
  --stdout

a,b,c,_sling_loaded_at
1,2,3,1707869559
4,5,6,1707869559

# test2.csv is missing column b

$ echo 'a,c
7,8' > test2.csv

$ sling run \
  --src-stream file://./test2.csv \
  --tgt-conn postgres \
  --tgt-object public.test1 \
  --mode incremental \
  --primary-key a

<...log output omitted>

$ sling run \
  --src-conn postgres \
  --src-stream public.test1 \
  --stdout

a,b,c,_sling_loaded_at
1,2,3,1707869559
4,5,6,1707869559
7,,8,1707869689

# test3.csv is missing columns b and c, and has an extra column d

$ echo 'a,d
9,10' > test3.csv

$ sling run \
  --src-stream file://./test3.csv \
  --tgt-conn postgres \
  --tgt-object public.test1 \
  --mode incremental \
  --primary-key a

<...log output omitted>

$ sling run \
  --src-conn postgres \
  --src-stream public.test1 \
  --stdout

a,b,c,_sling_loaded_at,d
1,2,3,1707869559,
4,5,6,1707869559,
7,,8,1707869689,
9,,,1707870320,10

We can see that Sling handled the changes properly and non-destructively. If the source stream were a database table, the same rules would apply whenever a column disappears or appears.
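The same incremental load can also be expressed as a replication YAML. This is a minimal sketch, assuming the primary_key stream key mirrors the --primary-key CLI flag used above:

source: LOCAL
target: postgres

streams:
  file://./test3.csv:
    object: public.test1
    mode: incremental
    primary_key: [a]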

Legacy Transform Syntax (Before v1.4.17)

Note: This is the legacy syntax for transforms used before v1.4.17. While still supported for backwards compatibility, we recommend using the new Transform Input Structures for new implementations as they provide much more powerful capabilities and access to functions.

These are legacy single-column transforms that can be applied to clean and modify data:

  • parse_bit: Parses binary data as bits

  • parse_fix: Parses FIX (Financial Information eXchange) protocol messages into JSON format

  • parse_uuid: Parses a 16-byte UUID into string format

  • parse_ms_uuid: Parses a 16-byte Microsoft UUID into string format

  • replace_0x00: Replaces null characters (0x00) with an empty string

  • replace_accents: Replaces accented characters with their non-accented equivalents

  • replace_non_printable: Replaces or removes non-printable characters

  • trim_space: Removes leading and trailing whitespace

  • empty_as_null: If value is empty, set as null
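Legacy transforms use the same array and map input structures shown earlier. A minimal sketch with hypothetical column names:

transforms:
  raw_payload: ["replace_0x00", "replace_non_printable"]
  comment: ["trim_space", "empty_as_null"]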
