Parallel Chunking ⚡

Chunking is a feature in Sling that breaks large data transfers into smaller, manageable parts. This is particularly useful for optimizing performance, managing resources, and enabling parallel processing across all replication modes. Chunking is available with Sling CLI Pro or on a Platform plan.

Supported Chunk Types

Sling supports several chunking strategies via the source_options.chunk_size, source_options.chunk_count, or source_options.chunk_expr parameters:

Time-based chunks:

  • Hours: e.g., 6h

  • Days: e.g., 7d

  • Weeks: e.g., 1w

  • Months: e.g., 1m

  • Years: e.g., 1y

Numeric chunks:

  • Integer ranges: e.g., 1000 for chunks of 1000 records

Count-based chunks (v1.4.14+):

  • Specific number of chunks: e.g., 5 to split data into 5 equal parts

Expression-based chunks (v1.4.14+):

  • Custom expressions: e.g., mod(abs(hashtext(column_name)), {chunk_count})

Each chunk is processed independently, allowing for parallel execution when combined with SLING_THREADS.
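
As a minimal sketch of where these options go (the connection names, stream names, and columns below are placeholders), each chunking option is set under a stream's source_options, while SLING_THREADS in the env block controls how many chunks run at once:

source: my_source_db
target: my_target_db

defaults:
  object: target_schema.{stream_table}

streams:
  my_schema.by_time:
    update_key: updated_at
    source_options:
      chunk_size: 7d        # time-based: one chunk per 7 days of data

  my_schema.by_id:
    update_key: id
    source_options:
      chunk_size: 1000      # numeric: 1000 records per chunk

  my_schema.by_count:
    update_key: updated_at
    source_options:
      chunk_count: 5        # count-based: 5 roughly equal chunks

  my_schema.by_expression:
    source_options:
      chunk_expr: mod(id, {chunk_count})  # expression-based: no update_key needed
      chunk_count: 4

env:
  SLING_THREADS: 4          # run up to 4 chunks in parallel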

Chunking in Different Modes

Chunking works across all replication modes (full-refresh, truncate, incremental, and backfill), helping process large datasets by breaking them into smaller batches. This is useful for memory management, progress tracking, and reducing source database load.

Time-Range Chunking

Time-range chunking splits data based on date/time columns across different modes:

source: postgres
target: snowflake

defaults:
  primary_key: id
  object: new_schema.{stream_schema}_{stream_table}

streams:
  public.orders:
    mode: full-refresh
    update_key: created_at
    source_options:
      chunk_size: 1m  # Process 1 month at a time
  
  public.events:
    mode: truncate
    update_key: event_timestamp
    source_options:
      chunk_size: 7d  # Process 1 week at a time
  
  public.logs:
    mode: incremental
    update_key: updated_at
    source_options:
      chunk_size: 1d  # Process 1 day at a time

env:
  SLING_THREADS: 4  # Enable parallel processing

Numeric-Range Chunking

Numeric-range chunking splits data based on numeric columns like IDs:

source: postgres
target: snowflake

defaults:
  primary_key: id
  object: new_schema.{stream_schema}_{stream_table}

streams:
  public.products:
    mode: full-refresh
    update_key: id
    source_options:
      chunk_size: 1000  # Process 1000 records per chunk
  
  public.customers:
    mode: truncate
    update_key: customer_id
    source_options:
      chunk_size: 500   # Process 500 customers per chunk

env:
  SLING_THREADS: 4  # Enable parallel processing

Count-based Chunking

Count-based chunking splits data into a specific number of equal chunks:

source: postgres
target: snowflake

defaults:
  primary_key: id
  object: new_schema.{stream_schema}_{stream_table}

streams:
  public.large_table:
    mode: full-refresh
    update_key: updated_at
    source_options:
      chunk_count: 10  # Split the dataset into 10 equal chunks
  
  public.historical_data:
    mode: incremental
    update_key: created_at
    source_options:
      chunk_count: 5   # Split into 5 chunks for processing

env:
  SLING_THREADS: 4  # Enable parallel processing

Chunking by Expression

Expression-based chunking allows you to define custom SQL expressions to distribute data across chunks using the chunk_expr parameter. This works across all modes and is particularly useful for:

  • Hash-based distribution for even data splitting

  • Custom partitioning logic based on specific columns

  • Complex expressions that don't rely on sequential values

  • Streams without a suitable update_key (none is required)

source: postgres
target: snowflake

defaults:
  primary_key: id
  object: new_schema.{stream_schema}_{stream_table}

streams:
  public.customers:
    mode: full-refresh
    source_options:
      chunk_expr: mod(abs(hashtext(coalesce(first_name, ''))), {chunk_count})
      chunk_count: 4  # Creates 4 chunks with hash-based distribution
  
  public.orders:
    mode: truncate
    source_options:
      chunk_expr: mod(customer_id, {chunk_count})
      chunk_count: 6  # Creates 6 chunks based on customer_id modulo

  public.events:
    mode: incremental
    update_key: created_at
    source_options:
      chunk_expr: case when event_type = 'premium' then 0 else mod(user_id, {chunk_count} - 1) + 1 end
      chunk_count: 5  # Premium events in chunk 0, others distributed in chunks 1-4
  
  public.products:
    mode: full-refresh
    source_options:
      chunk_expr: mod(abs(hashtext(category || product_name)), {chunk_count})
      chunk_count: 3  # Hash-based on category + product name

env:
  SLING_THREADS: 4  # Enable parallel processing

Mixed Chunking Strategies

You can combine different chunking strategies for different streams within the same replication:

source: postgres
target: oracle

defaults:
  mode: incremental
  primary_key: id
  object: oracle.{stream_table_lower}
  target_options:
    use_bulk: false

streams:
  public.sales_data:
    update_key: sale_date
    source_options:
      chunk_size: 1m  # Monthly chunks
  
  public.user_activities:
    update_key: activity_id
    source_options:
      chunk_size: 50000  # 50k records per chunk
  
  public.dynamic_data:
    update_key: last_modified
    source_options:
      chunk_count: 8  # Split into 8 equal parts

env:
  SLING_THREADS: 4  # Enable parallel processing

For more on incremental mode basics, see incremental.md.

Chunking in Backfill Mode

Backfill mode with chunking loads historical data in smaller ranges, which keeps large backfills manageable.

source: postgres
target: oracle

defaults:
  mode: backfill
  object: oracle.{stream_table_lower}
  primary_key: [id]

streams:
  public.orders_mariadb_pg:
    update_key: update_dt
    source_options:
      range: '2018-11-01,2018-12-01'
      chunk_size: 10d  # Process in 10-day chunks

  public.orders_sqlserver_pg:
    update_key: date
    source_options:
      range: '2019-01-01,2019-06-01'
      chunk_size: 2m   # Process in 2-month chunks

  public.orders_snowflake_pg:
    update_key: id  # same as primary key
    source_options:
      range: '1,800'
      chunk_size: 200  # Process in chunks of 200 IDs

  public.large_table_pg:
    update_key: created_at
    source_options:
      range: '2023-01-01,2023-12-31'
      chunk_count: 6  # Split into 6 equal time-based chunks

  public.user_data_pg:
    update_key: user_id
    source_options:
      range: '1000,50000'
      chunk_count: 10  # Split into 10 equal numeric chunks

env:
  SLING_THREADS: 3  # Process 3 streams concurrently

Sling splits the specified range into smaller sub-ranges based on chunk_size or chunk_count. Each sub-range is processed as a separate stream.
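
For example, the first stream above uses range: '2018-11-01,2018-12-01' with chunk_size: 10d, which works out to three sub-ranges of roughly ten days each (the exact boundaries are computed by Sling):

  • 2018-11-01 → 2018-11-11

  • 2018-11-11 → 2018-11-21

  • 2018-11-21 → 2018-12-01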

For more on backfill mode, see backfill.md.

Chunking with Custom SQL

Combine custom SQL queries with chunking using the {incremental_where_cond} variable in your SQL.

source: MY_SOURCE_DB
target: MY_TARGET_DB

defaults:
  mode: incremental
  primary_key: [id]
  update_key: updated_at

streams:
  my_schema.large_orders:
    sql: |
      select 
        o.id,
        o.order_date,
        o.customer_id,
        o.status,
        o.updated_at,
        c.name as customer_name
      from my_schema.orders o
      join my_schema.customers c on o.customer_id = c.id
      where o.finalized and ({incremental_where_cond})
      order by o.updated_at asc
    object: target_schema.enriched_orders_chunked
    source_options:
      chunk_size: 1m  # Process in monthly chunks

  my_schema.historical_events:
    sql: |
      select *
      from my_schema.events
      where type = 'init' and ({incremental_where_cond})
    object: target_schema.events_chunked
    update_key: event_timestamp
    source_options:
      chunk_count: 4  # Split into 4 equal chunks

  my_schema.user_activities:
    sql: |
      select *
      from my_schema.user_activities
      where {incremental_where_cond}
    object: target_schema.activities_chunked
    update_key: activity_id
    source_options:
      chunk_size: 10000  # Process 10,000 records per chunk
      range: '1,100000'  # Optional range for numeric chunking

  public.complex_query:
    sql: |
      select 
        u.id,
        u.username,
        u.created_at,
        p.purchase_count
      from users u
      join (
        select user_id, count(*) as purchase_count
        from purchases
        group by user_id
      ) p on u.id = p.user_id
      where u.plan = 'Pro'
        and ( {incremental_where_cond} )
      order by u.created_at asc
    object: target_schema.user_purchase_summary
    update_key: created_at
    source_options:
      chunk_expr: mod(abs(hashtext(u.username)), {chunk_count})  # Process by modulo of hash
      chunk_count: 8  # Creates 8 chunks based on username hash

env:
  SLING_THREADS: 4  # Enable parallel processing of chunks

For more on custom SQL, see custom-sql.md.

Key Benefits and Notes

  • Parallelism: Use SLING_THREADS for concurrent chunk processing

  • Error Recovery: Chunks allow resuming from a failure without restarting the entire transfer

  • Progress Tracking: Better visibility for large operations

  • Requirements: Works across all replication modes; time-based and numeric chunking need an update_key of a matching type (date/time or numeric), while expression-based chunking does not require one

  • Best Practices: Test chunk sizes for optimal performance; monitor resource usage

For complete replication mode details, see Replication Modes.
