Examples

This page provides practical examples of Sling Pipelines, demonstrating how to chain multiple steps for complex data workflows. These examples build upon the concepts from the Pipeline, Hooks and Functions documentation.

Each example includes:

  • A brief description

  • The YAML configuration

  • Key concepts demonstrated

Basic Pipeline with Logging and Replication

This simple pipeline logs the start, runs a replication, and logs the completion with runtime state.

steps:
  - type: log
    message: 'Starting pipeline execution on {date_format(now(), "%Y-%m-%d %H:%M:%S")}. Runtime state: {runtime_state}'

  - type: replication
    path: path/to/your/replication.yaml
    id: main_replication

  - type: log
    message: 'Pipeline completed. Final state: {runtime_state}'
    level: info

  - type: command
    command: 'echo "Replication status: {upper(state.main_replication.status)}"'
    print: true

Key Concepts:

  • Basic sequencing of steps

  • Using log for monitoring, with the date_format and now functions

  • Running a replication as a step

  • Accessing state from previous steps

  • Executing system commands with command, using the upper function

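The replication step above points at a separate replication config. For reference, a minimal replication.yaml might look like the sketch below; the my_postgres and my_snowflake connections and the public.orders stream are placeholders, not part of the pipeline above.

source: my_postgres
target: my_snowflake

defaults:
  mode: full-refresh

streams:
  public.orders:
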
File Processing Pipeline with Looping

This pipeline lists files from S3, copies them to Azure, and logs the process using a group for looping.

steps:
  - type: list
    id: s3_files
    location: aws_s3/your-bucket/files/
    recursive: true
    only: files

  - type: group
    loop: state.s3_files.result
    steps:
      - type: log
        message: 'Processing file {loop.index + 1}: {loop.value.name} ({loop.value.size} bytes)'

      - type: copy
        from: '{loop.value.location}'
        to: azure_storage/processed/{coalesce(loop.value.name, "unnamed_file")}.processed
        id: file_copy

      - type: log
        if: state.file_copy.bytes_written > 0
        message: 'Successfully copied {state.file_copy.bytes_written} bytes'
        level: info

  - type: log
    message: 'Processed {length(state.s3_files.result)} files'

Key Concepts:

  • Listing files with list

  • Looping with group and loop

  • Conditional logging with log

  • File transfer using copy

  • Accessing loop variables (loop.index, loop.value)

  • Using functions like coalesce and length in expressions

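Each entry in state.s3_files.result exposes the fields referenced inside the loop (name, size, location). An illustrative entry, with hypothetical values, might look like this:

- name: orders_2024.csv
  size: 10485760
  location: aws_s3/your-bucket/files/orders_2024.csv
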
Data Quality Pipeline

This pipeline runs a replication, performs quality checks via queries, and notifies if issues are found.

steps:
  - type: replication
    path: replications/data_sync.yaml
    id: data_sync

  - type: query
    connection: target_db
    query: |
      SELECT COUNT(*) as invalid_count
      FROM target_schema.my_table
      WHERE some_column IS NULL
    id: quality_check
    into: qc_results

  - type: check
    check: store.qc_results[0].invalid_count == 0
    failure_message: 'Found {store.qc_results[0].invalid_count} invalid records'
    on_failure: warn

  - type: http
    if: store.qc_results[0].invalid_count > 0
    url: https://alerts.example.com/notify
    method: POST
    payload: |
      {
        "issue": "Data quality failure",
        "details": "Invalid records: {store.qc_results[0].invalid_count}",
        "table": "target_schema.my_table",
        "checked_at": "{date_format(now(), "%Y-%m-%d")}"
      }

Key Concepts:

  • Running replications with replication

  • Executing database queries with query

  • Validation with check

  • Sending notifications via http, using the date_format function

  • Storing query results with into

  • Conditional execution

  • Accessing stored values with store.

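The check above only warns on failure so the notification step can still run. If the quality gate should halt the pipeline instead, a stricter variant would look like the sketch below (assuming on_failure also accepts abort):

  - type: check
    check: store.qc_results[0].invalid_count == 0
    failure_message: 'Found {store.qc_results[0].invalid_count} invalid records'
    on_failure: abort
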
Cleanup and Archiving Pipeline

This pipeline archives files after processing and cleans up temporary data.

steps:
  - type: list
    id: temp_files
    location: local//tmp/processing/
    only: files

  - type: group
    loop: state.temp_files.result
    steps:
      - type: copy
        from: '{loop.value.location}'
        to: 'aws_s3/archive/{timestamp.YYYY}/{timestamp.MM}/{loop.value.name}'

      - type: delete
        location: '{loop.value.location}'
        on_failure: warn

  - type: log
    message: 'Archived and deleted {length(state.temp_files.result)} files'

Key Concepts:

  • File discovery using list

  • Iterative processing with group

  • Archiving files with copy

  • Cleanup using delete

  • Logging results with log

  • Using timestamps in file paths

  • Error handling with on_failure

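The archive path above uses the built-in timestamp variables. The same dated layout can also be built with the date_format and now functions shown in earlier examples; a sketch of an equivalent copy step:

      - type: copy
        from: '{loop.value.location}'
        to: 'aws_s3/archive/{date_format(now(), "%Y/%m")}/{loop.value.name}'
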
Advanced Pipeline with Groups and Conditions

This pipeline uses nested groups, conditions, and multiple step types for a complex workflow.

steps:
  - type: group
    id: preparation
    steps:
      - type: log
        message: 'Preparing environment'

      - type: command
        command: mkdir -p /tmp/processing
        print: true

  - type: replication
    path: replications/main.yaml
    id: main_rep

  - type: group
    if: state.main_rep.status == "success"
    steps:
      - type: query
        connection: target_db
        query: VACUUM ANALYZE {state.main_rep.object.full_name}

      - type: log
        message: 'Optimization complete'

  - type: group
    if: state.main_rep.status == "error"
    steps:
      - type: log
        message: 'Error occurred: {state.main_rep.error}'
        level: error

      - type: http
        url: https://errors.example.com/report
        method: POST
        payload: '{state.main_rep}'

Key Concepts:

  • Organized workflows using group

  • Logging with log

  • System commands via command

  • Data replication with replication

  • Database optimization using query

  • Nested groups for organization

  • Conditional execution based on previous step status

  • Error handling branch

  • JSON serialization with tojson (see the sketch below)

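The error payload in this example sends the raw step state. To serialize it explicitly inside a structured JSON body, the tojson function can be used; a sketch, where the "pipeline" label is purely illustrative:

      - type: http
        url: https://errors.example.com/report
        method: POST
        payload: |
          {
            "pipeline": "nightly_sync",
            "state": {tojson(state.main_rep)}
          }
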
These examples demonstrate the flexibility of Sling Pipelines and how to use built-in functions in expressions. You can combine and extend them based on your specific needs. For more details on individual step types, refer to the Hooks documentation. For a full list of available functions, see the functions documentation.
