Examples
This page provides practical examples of Sling Pipelines, demonstrating how to chain multiple steps for complex data workflows. These examples build upon the concepts from the Pipeline, Hooks and Functions documentation.
Each example includes:
A brief description
The YAML configuration
Key concepts demonstrated
Basic Pipeline with Logging and Replication
This simple pipeline logs the start, runs a replication, and logs the completion with runtime state.
steps:
  - type: log
    message: 'Starting pipeline execution on {date_format(now(), "%Y-%m-%d %H:%M:%S")}. Runtime state: {runtime_state}'
  - type: replication
    path: path/to/your/replication.yaml
    id: main_replication
  - type: log
    message: 'Pipeline completed. Final state: {runtime_state}'
    level: info
  - type: command
    command: 'echo "Replication status: {upper(state.main_replication.status)}"'
    print: true
Key Concepts:
Basic sequencing of steps
Using log for monitoring with the date_format and now functions
Running a replication as a step
Accessing state from previous steps
Executing system commands with command, using the upper function
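As a small variation, a conditional step could log at the error level only when the replication fails. This is a sketch reusing the if, state, and level constructs from these examples; the "error" status comparison mirrors the advanced example further down this page:

  - type: log
    if: state.main_replication.status == "error"
    message: 'Replication failed. Final state: {runtime_state}'
    level: error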
File Processing Pipeline with Looping
This pipeline lists files from S3, copies them to Azure, and logs the process using a group for looping.
steps:
  - type: list
    id: s3_files
    location: aws_s3/your-bucket/files/
    recursive: true
    only: files
  - type: group
    loop: state.s3_files.result
    steps:
      - type: log
        message: 'Processing file {loop.index + 1}: {loop.value.name} ({loop.value.size} bytes)'
      - type: copy
        from: '{loop.value.location}'
        to: azure_storage/processed/{coalesce(loop.value.name, "unnamed_file")}.processed
        id: file_copy
      - type: log
        if: state.file_copy.bytes_written > 0
        message: 'Successfully copied {state.file_copy.bytes_written} bytes'
        level: info
  - type: log
    message: 'Processed {length(state.s3_files.result)} files'
Key Concepts:
Listing files with list
Looping with group and loop
Conditional logging with log
File transfer using copy
Accessing loop variables (loop.index, loop.value)
Using functions like upper, split_part, coalesce, and length in expressions (see the sketch below)
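The copy step above only uses coalesce; upper and split_part could also shape the destination path. The following is a hedged sketch of such a copy step — the split_part arguments (string, delimiter, part index) are an assumption here, so check the functions documentation for the exact signature:

      - type: copy
        from: '{loop.value.location}'
        # assumed behavior: split_part takes the portion before the first "." and upper uppercases it
        to: azure_storage/processed/{upper(split_part(loop.value.name, ".", 1))}.processed
        id: file_copy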
Data Quality Pipeline
This pipeline runs a replication, performs quality checks via queries, and notifies if issues are found.
steps:
  - type: replication
    path: replications/data_sync.yaml
    id: data_sync
  - type: query
    connection: target_db
    query: |
      SELECT COUNT(*) as invalid_count
      FROM target_schema.my_table
      WHERE some_column IS NULL
    id: quality_check
    into: qc_results
  - type: check
    check: store.qc_results[0].invalid_count == 0
    failure_message: 'Found {store.qc_results[0].invalid_count} invalid records'
    on_failure: warn
  - type: http
    if: store.qc_results[0].invalid_count > 0
    url: https://alerts.example.com/notify
    method: POST
    payload: |
      {
        "issue": "Data quality failure",
        "details": "Invalid records: {store.qc_results[0].invalid_count}",
        "table": "target_schema.my_table",
        "checked_at": "{date_format(now(), "%Y-%m-%d")}"
      }
Key Concepts:
Running replications with replication
Executing database queries with query
Validation with check
Sending notifications via http, using the date_format function
Storing query results with into
Conditional execution
Accessing stored values with store
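Additional checks can follow the same query-then-check pattern. Here is a sketch of a second, hypothetical check that verifies the table is not empty; the connection, table name, and store access style come from the example above, while the total_count alias and row_counts key are illustrative:

  - type: query
    connection: target_db
    query: SELECT COUNT(*) as total_count FROM target_schema.my_table
    into: row_counts
  - type: check
    check: store.row_counts[0].total_count > 0
    failure_message: 'target_schema.my_table returned no rows'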
Cleanup and Archiving Pipeline
This pipeline archives files after processing and cleans up temporary data.
steps:
  - type: list
    id: temp_files
    location: local//tmp/processing/
    only: files
  - type: group
    loop: state.temp_files.result
    steps:
      - type: copy
        from: '{loop.value.location}'
        to: 'aws_s3/archive/{timestamp.YYYY}/{timestamp.MM}/{loop.value.name}'
      - type: delete
        location: '{loop.value.location}'
        on_failure: warn
  - type: log
    message: 'Archived and deleted {length(state.temp_files.result)} files'
Key Concepts:
File discovery using list
Iterative processing with group
Archiving files with copy
Cleanup using delete
Logging results with log
Using timestamps in file paths
Error handling with on_failure
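Instead of relying on on_failure alone, the delete step could be guarded so it only runs when the preceding copy succeeded. This is a sketch reusing the id, if, and state constructs from the other examples; the "success" status comparison mirrors the advanced example below:

      - type: copy
        id: archive_copy
        from: '{loop.value.location}'
        to: 'aws_s3/archive/{timestamp.YYYY}/{timestamp.MM}/{loop.value.name}'
      - type: delete
        if: state.archive_copy.status == "success"
        location: '{loop.value.location}'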
Advanced Pipeline with Groups and Conditions
This pipeline uses nested groups, conditions, and multiple step types for a complex workflow.
steps:
  - type: group
    id: preparation
    steps:
      - type: log
        message: 'Preparing environment'
      - type: command
        command: mkdir -p /tmp/processing
        print: true
  - type: replication
    path: replications/main.yaml
    id: main_rep
  - type: group
    if: state.main_rep.status == "success"
    steps:
      - type: query
        connection: target_db
        query: VACUUM ANALYZE {state.main_rep.object.full_name}
      - type: log
        message: 'Optimization complete'
  - type: group
    if: state.main_rep.status == "error"
    steps:
      - type: log
        message: 'Error occurred: {state.main_rep.error}'
        level: error
      - type: http
        url: https://errors.example.com/report
        method: POST
        payload: '{state.main_rep}'
Key Concepts:
Organized workflows using group
Logging with log
System commands via command
Data replication with replication
Database optimization using query
Nested groups for organization
Conditional execution based on previous step status
Error handling branch
JSON serialization with tojson (see the sketch below)
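The error-handling http step above passes the step state directly as the payload. If an explicit JSON string is preferred, the tojson function listed above could wrap it — a sketch, assuming tojson serializes a state object:

      - type: http
        url: https://errors.example.com/report
        method: POST
        payload: '{tojson(state.main_rep)}'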
These examples demonstrate the flexibility of Sling Pipelines and how to use built-in functions in expressions. You can combine and extend them based on your specific needs. For more details on individual step types, refer to the Hooks documentation. For a full list of available functions, see the functions documentation.