Routine
Routine hooks allow you to execute reusable step sequences loaded from external YAML files. This is particularly useful for creating shared templates, standardizing common workflows, and maintaining DRY (Don't Repeat Yourself) principles across your data pipelines.
Configuration
```yaml
- type: routine
  routine: "my_routine_name" # Required: Name of the routine to execute
  params: # Optional: Parameters to pass to the routine
    param1: "value1"
    param2: "value2"
  env: # Optional: Environment variables for all steps
    ENV_VAR1: "value1"
    ENV_VAR2: "value2"
  on_failure: abort # Optional: abort/warn/quiet/skip
  id: my_id # Optional. Will be generated if omitted. Use `log` hook with {runtime_state} to view state.
```
Properties
| Property | Required | Description |
| --- | --- | --- |
| `routine` | Yes | Name of the routine to execute (must exist in a routine file) |
| `params` | No | Map of parameters to pass to the routine steps |
| `env` | No | Map of environment variables available to all steps in the routine |
| `on_failure` | No | What to do if any step fails (`abort`/`warn`/`quiet`/`skip`) |
Routine File Structure
Routines are loaded from YAML files in the directory specified by the SLING_ROUTINES_DIR environment variable. Each file can contain multiple named routines:
```yaml
# Example: /path/to/routines/common_tasks.yaml
routines:
  # required_params: [table_name, connection]
  validate_and_log:
    - type: log
      message: "Starting validation for {params.table_name}"
    - type: query
      connection: "{params.connection}"
      query: "SELECT COUNT(*) as count FROM {params.table_name}"
      id: count_check
    - type: log
      message: "Table has {count_check.result[0].count} rows"

  cleanup_temp_tables:
    - type: log
      message: "Cleaning up temporary tables"
    - type: query
      connection: target_db
      query: "DROP TABLE IF EXISTS temp_staging"
    - type: query
      connection: target_db
      query: "DROP TABLE IF EXISTS temp_backup"
```
Required Parameters
You can specify required parameters for a routine using a comment above the routine name. This ensures that all necessary parameters are provided when the routine is called:
```yaml
routines:
  # required_params: [table_name, connection, backup_path]
  backup_table:
    - type: log
      message: "Backing up {params.table_name} to {params.backup_path}"
    - type: copy
      from: "{params.connection}/{params.table_name}"
      to: "{params.backup_path}/{params.table_name}"
```
If a required parameter is not provided when calling the routine, execution will fail with an error message listing the missing parameters:
```yaml
# This will fail because 'backup_path' is missing
- type: routine
  routine: "backup_table"
  params:
    table_name: "customers"
    connection: "prod_db"
    # Missing: backup_path
```
Error message:
```
routine (backup_table) requires params that were not provided: ["backup_path"]
```
Notes about required_params:
- Must be specified as a YAML comment directly above the routine name
- Use YAML array syntax: `# required_params: [param1, param2, param3]`
- Parameter names must match exactly what you reference in the routine steps
- Validation happens before any routine steps are executed
- This helps prevent runtime errors and gives clear feedback about missing configuration
Now call the routine in a replication or pipeline:
```yaml
# replication
hooks:
  post:
    - type: routine
      routine: "validate_and_log"
      params:
        table_name: "{run.object.name}"
        connection: "{target.name}"
      on_failure: abort
env:
  SLING_ROUTINES_DIR: /path/to/routines

---
# pipeline
steps:
  - type: routine
    routine: cleanup_temp_tables
env:
  SLING_ROUTINES_DIR: /path/to/routines
```
Important Notes:
- The `SLING_ROUTINES_DIR` environment variable must be set
- Only `.yaml` and `.yml` files are considered
- Routine names must be unique across all files in the directory
- The directory is scanned recursively, so feel free to create sub-folders
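For example, a routines directory might be laid out like this (a hypothetical structure; file and folder names are illustrative):

```
/path/to/routines/
├── common_tasks.yaml      # validate_and_log, cleanup_temp_tables
├── data_quality.yml       # check_data_quality
└── maintenance/
    └── warehouse.yaml     # routine names here must not collide with the files above
```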
You can set the env var in the env section of a replication or pipeline, or as a regular environment variable before running sling:
```bash
# Mac/Unix
export SLING_ROUTINES_DIR=path/to/dir

# Windows PowerShell
$env:SLING_ROUTINES_DIR = 'C:/path/to/dir'
```
Output
When the routine hook executes successfully, it returns the following output that can be accessed in subsequent hooks:
```yaml
status: success # Status of the hook execution
routine: "my_routine" # The routine name that was executed
```
You can access these values in subsequent hooks using the following syntax (JMESPath):

- `{state.hook_id.status}` - Status of the hook execution
- `{state.hook_id.routine}` - The routine name that was executed
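For example, a follow-up `log` hook could report the outcome (a minimal sketch; `routine_run` is a hypothetical hook id):

```yaml
hooks:
  post:
    - type: routine
      id: routine_run
      routine: "cleanup_temp_tables"
    # Reference the routine hook's output via its id
    - type: log
      message: "Routine {state.routine_run.routine} finished with status {state.routine_run.status}"
```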
Returning Custom Output from Routines
Routines can return custom output values using the `store` hook with the `output.` prefix. Any key starting with `output.` will be accessible to the caller after the routine completes:
In the routine definition:
```yaml
# File: /path/to/routines/calculations.yaml
routines:
  # required_params: [input_value, connection, table_name]
  calculate_metrics:
    - type: log
      message: "Processing input: {params.input_value}"
      id: process_log
    - type: query
      connection: "{params.connection}"
      query: "SELECT COUNT(*) as total_count FROM {params.table_name}"
      id: count_query
    # Store output values that will be returned to the caller
    - type: store
      map:
        output.log_message: '{state.process_log.message}'
        output.row_count: '{state.count_query.result[0].total_count}'
        output.timestamp: '{timestamp.iso}'
```
Calling the routine and accessing output:
```yaml
steps:
  - type: routine
    id: metrics_run
    routine: "calculate_metrics"
    params:
      input_value: "test_data"
      connection: "postgres"
      table_name: "public.customers"

  # Access the custom output values
  - type: log
    message: "Routine returned: {state.metrics_run.log_message}"
  - type: log
    message: "Total rows: {state.metrics_run.row_count}"
  - type: check
    check: state.metrics_run.row_count > 0
    failure_message: "Expected rows to be greater than 0"

  # Use output values in subsequent steps
  - type: http
    url: "https://api.example.com/metrics"
    method: POST
    payload:
      row_count: "{state.metrics_run.row_count}"
      processed_at: "{state.metrics_run.timestamp}"
```
Key Points:
- Use the `store` hook with `key: output.<name>` to define return values
- Access via `{state.routine_id.<name>}` after the routine completes
- Any data type can be returned (strings, numbers, objects, arrays)
- Multiple output values can be set using multiple `store` hooks
- Output values are only accessible if the routine completes successfully
Within routine steps, you can access:
- `{params.param_name}` - Parameters passed to the routine
- `{env.ENV_VAR}` - Environment variables set for the routine
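For example, a routine step might reference both (a minimal sketch; `archive_table`, `table_name`, and `BACKUP_DATE` are hypothetical names supplied by the caller):

```yaml
routines:
  archive_table:
    # Combines a caller-provided param with an env var set on the routine hook
    - type: log
      message: "Archiving {params.table_name} for date {env.BACKUP_DATE}"
```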
Examples
Basic Routine Usage
Execute a simple routine without parameters:
```yaml
# In your pipeline/replication hooks:
hooks:
  post:
    - type: routine
      routine: "cleanup_temp_tables"
      on_failure: warn
```
Routine with Parameters
Pass dynamic parameters to a reusable routine:
```yaml
# In your pipeline/replication hooks:
hooks:
  post:
    - type: routine
      routine: "validate_and_log"
      params:
        table_name: "{run.object.name}"
        connection: "{target.name}"
      on_failure: abort
```
Routine with Environment Variables
Set environment variables for all steps in the routine:
```yaml
hooks:
  post:
    - type: routine
      routine: "backup_and_archive"
      params:
        source_table: "{run.stream.name}"
      env:
        BACKUP_DATE: "{timestamp.date}"
        ENVIRONMENT: "{target.environment}"
      on_failure: abort
```
Conditional Routine Execution
Execute a routine based on conditions:
```yaml
hooks:
  post:
    - type: routine
      if: run.total_rows > 10000
      routine: "large_table_optimization"
      params:
        table_name: "{run.object.name}"
        row_count: "{run.total_rows}"
```
Chaining Routines
Execute multiple routines in sequence:
```yaml
hooks:
  post:
    - type: routine
      id: validation
      routine: "validate_data_quality"
      params:
        table_name: "{run.object.name}"

    - type: routine
      if: validation.status == "success"
      routine: "send_success_notification"
      params:
        table_name: "{run.object.name}"
        validated_at: "{timestamp.iso}"
```
Routine with Output Values
Create a routine that returns custom values for use in subsequent steps:
Define the routine:
```yaml
# File: /path/to/routines/data_quality.yaml
routines:
  # required_params: [connection, table_name]
  check_data_quality:
    - type: query
      connection: "{params.connection}"
      query: "SELECT COUNT(*) as total_rows FROM {params.table_name}"
      id: count_check
    - type: query
      connection: "{params.connection}"
      query: "SELECT COUNT(*) as null_rows FROM {params.table_name} WHERE primary_key IS NULL"
      id: null_check

    # Calculate quality score
    - type: store
      key: output.total_rows
      value: '{state.count_check.result[0].total_rows}'
    - type: store
      key: output.null_rows
      value: '{state.null_check.result[0].null_rows}'
    - type: store
      key: output.quality_passed
      value: '{state.null_check.result[0].null_rows == 0}'
    - type: log
      message: "Quality check: {state.output.total_rows} total rows, {state.output.null_rows} null rows"
```
Use the routine and its outputs:
```yaml
steps:
  - type: routine
    id: quality_check
    routine: "check_data_quality"
    params:
      connection: "prod_db"
      table_name: "customers"

  # Use the output values in conditional logic
  - type: log
    if: quality_check.quality_passed == true
    message: "✓ Quality check passed! Processed {quality_check.total_rows} rows"
  - type: log
    if: quality_check.quality_passed == false
    message: "✗ Quality check failed! Found {quality_check.null_rows} rows with null primary keys"

  # Fail the pipeline if quality check didn't pass
  - type: check
    check: state.quality_check.quality_passed == true
    failure_message: "Data quality check failed: {state.quality_check.null_rows} rows have null primary keys"

  # Send notification with quality metrics
  - type: http
    url: "https://api.slack.com/webhooks/your-webhook"
    method: POST
    payload:
      text: "Data quality report for customers table"
      blocks:
        - type: section
          text:
            type: mrkdwn
            text: |
              *Total Rows:* {state.quality_check.total_rows}
              *Null Rows:* {state.quality_check.null_rows}
              *Status:* {state.quality_check.quality_passed}
```
Complex Routine Example
Create a comprehensive routine file for data processing:
```yaml
# File: /path/to/routines/data_processing.yaml
routines:
  # required_params: [table_name, target_connection, replication_config, stream_name]
  full_table_refresh:
    - type: log
      message: "Starting full refresh for {params.table_name}"
    - type: query
      connection: "{params.target_connection}"
      query: "TRUNCATE TABLE {params.table_name}"
      id: truncate_step
    - type: replication
      path: "{params.replication_config}"
      streams: ["{params.stream_name}"]
      mode: "full-refresh"
      id: load_step
    - type: query
      connection: "{params.target_connection}"
      query: |
        UPDATE metadata.refresh_log
        SET last_refresh = CURRENT_TIMESTAMP,
            row_count = (SELECT COUNT(*) FROM {params.table_name})
        WHERE table_name = '{params.table_name}'
      id: update_metadata
    - type: log
      message: "Refresh complete: {update_metadata.result[0].row_count} rows loaded"

  # required_params: [target_connection, replication_config, stream_name, table_name, primary_key]
  incremental_with_dedup:
    - type: log
      message: "Starting incremental load with deduplication"
    - type: query
      connection: "{params.target_connection}"
      query: "CREATE TABLE IF NOT EXISTS {params.table_name}_staging LIKE {params.table_name}"
    - type: replication
      path: "{params.replication_config}"
      streams: ["{params.stream_name}"]
      mode: "incremental"
      id: incremental_load
    - type: query
      connection: "{params.target_connection}"
      query: |
        DELETE FROM {params.table_name}
        WHERE {params.primary_key} IN (
          SELECT {params.primary_key} FROM {params.table_name}_staging
        )
    - type: query
      connection: "{params.target_connection}"
      query: |
        INSERT INTO {params.table_name}
        SELECT * FROM {params.table_name}_staging
    - type: query
      connection: "{params.target_connection}"
      query: "DROP TABLE {params.table_name}_staging"
```
Usage:
```yaml
hooks:
  post:
    - type: routine
      routine: "full_table_refresh"
      params:
        table_name: "customers"
        target_connection: "prod_db"
        replication_config: "configs/customers.yaml"
        stream_name: "public.customers"
```