Routine

Routine hooks allow you to execute reusable step sequences loaded from external YAML files. This is particularly useful for creating shared templates, standardizing common workflows, and maintaining DRY (Don't Repeat Yourself) principles across your data pipelines.

Configuration

- type: routine
  routine: "my_routine_name"    # Required: Name of the routine to execute
  params:                       # Optional: Parameters to pass to the routine
    param1: "value1"
    param2: "value2"
  env:                         # Optional: Environment variables for all steps
    ENV_VAR1: "value1"
    ENV_VAR2: "value2"
  on_failure: abort            # Optional: abort/warn/quiet/skip
  id: my_id                    # Optional: auto-generated if omitted. Use a `log` hook with {runtime_state} to view state.

Properties

Property     Required   Description
routine      Yes        Name of the routine to execute (must exist in a routine file)
params       No         Map of parameters to pass to the routine steps
env          No         Map of environment variables available to all steps in the routine
on_failure   No         What to do if any step fails (abort/warn/quiet/skip)
id           No         Identifier for the hook; auto-generated if omitted

Routine File Structure

Routines are loaded from YAML files in the directory specified by the SLING_ROUTINES_DIR environment variable. Each file can contain multiple named routines:

# Example: /path/to/routines/common_tasks.yaml
routines:
  # required_params: [table_name, connection]
  validate_and_log:
    - type: log
      message: "Starting validation for {params.table_name}"
    - type: query
      connection: "{params.connection}"
      query: "SELECT COUNT(*) as count FROM {params.table_name}"
      id: count_check
    - type: log
      message: "Table has {count_check.result[0].count} rows"

  cleanup_temp_tables:
    - type: log
      message: "Cleaning up temporary tables"
    - type: query
      connection: target_db
      query: "DROP TABLE IF EXISTS temp_staging"
    - type: query
      connection: target_db
      query: "DROP TABLE IF EXISTS temp_backup"

Required Parameters

You can specify required parameters for a routine using a comment above the routine name. This ensures that all necessary parameters are provided when the routine is called:

routines:
  # required_params: [table_name, connection, backup_path]
  backup_table:
    - type: log
      message: "Backing up {params.table_name} to {params.backup_path}"
    - type: copy
      from: "{params.connection}/{params.table_name}"
      to: "{params.backup_path}/{params.table_name}"

If a required parameter is not provided when calling the routine, the execution will fail with an error message listing the missing parameters:

# This will fail because 'backup_path' is missing
- type: routine
  routine: "backup_table"
  params:
    table_name: "customers"
    connection: "prod_db"
    # Missing: backup_path

Error message:

routine (backup_table) requires params that were not provided: ["backup_path"]
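
Providing all required parameters resolves the error (the backup_path value below is illustrative):

- type: routine
  routine: "backup_table"
  params:
    table_name: "customers"
    connection: "prod_db"
    backup_path: "s3://backups/daily"   # illustrative destination path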

Notes about required_params:

  • Must be specified as a YAML comment directly above the routine name

  • Use YAML array syntax: # required_params: [param1, param2, param3]

  • Parameter names should match exactly what you reference in the routine steps

  • Validation happens before the routine steps are executed

  • Helps prevent runtime errors and provides clear feedback about missing configuration

You can then call a routine from a replication (as a hook) or a pipeline (as a step):

# replication
hooks:
  post:
    - type: routine
      routine: "validate_and_log"
      params:
        table_name: "{run.object.name}"
        connection: "{target.name}"
      on_failure: abort

env:
  SLING_ROUTINES_DIR: /path/to/routines
---

# pipeline
steps:
  - type: routine
    routine: cleanup_temp_tables

env:
  SLING_ROUTINES_DIR: /path/to/routines

Important Notes:

  • The SLING_ROUTINES_DIR environment variable must be set

  • Only .yaml and .yml files are considered

  • Routine names must be unique across all files in the directory

  • The directory is scanned recursively, so feel free to create sub-folders
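
For example, a routines directory using sub-folders might look like this (file and folder names are illustrative):

/path/to/routines/
├── common_tasks.yaml       # validate_and_log, cleanup_temp_tables
├── calculations.yaml       # calculate_metrics
└── finance/
    └── month_end.yaml      # routine names must still be unique across all files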

You can set the env var in the env section of a replication or pipeline, or as a regular environment variable before running sling:

# Mac/Unix
export SLING_ROUTINES_DIR=path/to/dir

# Windows PowerShell
$env:SLING_ROUTINES_DIR = 'C:/path/to/dir'

Output

When the routine hook executes successfully, it returns the following output, which can be accessed in subsequent hooks:

status: success      # Status of the hook execution
routine: "my_routine"  # The routine name that was executed

You can access these values in subsequent hooks using the following syntax (JMESPath):

  • {state.hook_id.status} - Status of the hook execution

  • {state.hook_id.routine} - The routine name that was executed
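
For example, assuming the routine hook was given id: my_routine_hook (a hypothetical id), a later hook could branch on its status:

- type: log
  if: state.my_routine_hook.status == "success"
  message: "Routine {state.my_routine_hook.routine} completed successfully"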

Returning Custom Output from Routines

Routines can return custom output values using the store hook with the output. prefix. Any key starting with output. will be accessible to the caller after the routine completes:

In the routine definition:

# File: /path/to/routines/calculations.yaml
routines:
  # required_params: [input_value]
  calculate_metrics:
    - type: log
      message: "Processing input: {params.input_value}"
      id: process_log
    
    - type: query
      connection: "{params.connection}"
      query: "SELECT COUNT(*) as total_count FROM {params.table_name}"
      id: count_query
    
    # Store output values that will be returned to caller
    - type: store
      map:
        output.log_message: '{state.process_log.message}'
        output.row_count: '{state.count_query.result[0].total_count}'
        output.timestamp: '{timestamp.iso}'

Calling the routine and accessing output:

steps:
  - type: routine
    id: metrics_run
    routine: "calculate_metrics"
    params:
      input_value: "test_data"
      connection: "postgres"
      table_name: "public.customers"
  
  # Access the custom output values
  - type: log
    message: "Routine returned: {state.metrics_run.log_message}"
  
  - type: log
    message: "Total rows: {state.metrics_run.row_count}"
  
  - type: check
    check: state.metrics_run.row_count > 0
    failure_message: "Expected rows to be greater than 0"
  
  # Use output values in subsequent steps
  - type: http
    url: "https://api.example.com/metrics"
    method: POST
    payload:
      row_count: "{state.metrics_run.row_count}"
      processed_at: "{state.metrics_run.timestamp}"

Key Points:

  • Use the store hook with key: output.<name> to define return values

  • Access via {state.routine_id.<name>} after the routine completes

  • Any data type can be returned (strings, numbers, objects, arrays)

  • Multiple output values can be set using multiple store hooks

  • Output values are only accessible if the routine completes successfully

Within routine steps, you can access:

  • {params.param_name} - Parameters passed to the routine

  • {env.ENV_VAR} - Environment variables set for the routine
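
A minimal sketch showing both scopes (the routine name, parameter, and variable below are illustrative):

routines:
  # required_params: [table_name]
  show_context:
    - type: log
      message: "Processing {params.table_name} in {env.ENVIRONMENT}"

The caller supplies both values:

- type: routine
  routine: "show_context"
  params:
    table_name: "customers"
  env:
    ENVIRONMENT: "production"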

Examples

Basic Routine Usage

Execute a simple routine without parameters:

# In your pipeline/replication hooks:
hooks:
  post:
    - type: routine
      routine: "cleanup_temp_tables"
      on_failure: warn

Routine with Parameters

Pass dynamic parameters to a reusable routine:

# In your pipeline/replication hooks:
hooks:
  post:
    - type: routine
      routine: "validate_and_log"
      params:
        table_name: "{run.object.name}"
        connection: "{target.name}"
      on_failure: abort

Routine with Environment Variables

Set environment variables for all steps in the routine:

hooks:
  post:
    - type: routine
      routine: "backup_and_archive"
      params:
        source_table: "{run.stream.name}"
      env:
        BACKUP_DATE: "{timestamp.date}"
        ENVIRONMENT: "{target.environment}"
      on_failure: abort

Conditional Routine Execution

Execute a routine based on conditions:

hooks:
  post:
    - type: routine
      if: run.total_rows > 10000
      routine: "large_table_optimization"
      params:
        table_name: "{run.object.name}"
        row_count: "{run.total_rows}"

Chaining Routines

Execute multiple routines in sequence:

hooks:
  post:
    - type: routine
      id: validation
      routine: "validate_data_quality"
      params:
        table_name: "{run.object.name}"

    - type: routine
      if: state.validation.status == "success"
      routine: "send_success_notification"
      params:
        table_name: "{run.object.name}"
        validated_at: "{timestamp.iso}"

Routine with Output Values

Create a routine that returns custom values for use in subsequent steps:

Define the routine:

# File: /path/to/routines/data_quality.yaml
routines:
  # required_params: [connection, table_name]
  check_data_quality:
    - type: query
      connection: "{params.connection}"
      query: "SELECT COUNT(*) as total_rows FROM {params.table_name}"
      id: count_check
    
    - type: query
      connection: "{params.connection}"
      query: "SELECT COUNT(*) as null_rows FROM {params.table_name} WHERE primary_key IS NULL"
      id: null_check
    
    # Calculate quality score
    - type: store
      key: output.total_rows
      value: '{state.count_check.result[0].total_rows}'
    
    - type: store
      key: output.null_rows
      value: '{state.null_check.result[0].null_rows}'
    
    - type: store
      key: output.quality_passed
      value: '{state.null_check.result[0].null_rows == 0}'
    
    - type: log
      message: "Quality check: {state.output.total_rows} total rows, {state.output.null_rows} null rows"

Use the routine and its outputs:

steps:
  - type: routine
    id: quality_check
    routine: "check_data_quality"
    params:
      connection: "prod_db"
      table_name: "customers"
  
  # Use the output values in conditional logic
  - type: log
    if: state.quality_check.quality_passed == true
    message: "✓ Quality check passed! Processed {state.quality_check.total_rows} rows"
  
  - type: log
    if: state.quality_check.quality_passed == false
    message: "✗ Quality check failed! Found {state.quality_check.null_rows} rows with null primary keys"
  
  # Fail the pipeline if quality check didn't pass
  - type: check
    check: state.quality_check.quality_passed == true
    failure_message: "Data quality check failed: {state.quality_check.null_rows} rows have null primary keys"
  
  # Send notification with quality metrics
  - type: http
    url: "https://api.slack.com/webhooks/your-webhook"
    method: POST
    payload:
      text: "Data quality report for customers table"
      blocks:
        - type: section
          text:
            type: mrkdwn
            text: |
              *Total Rows:* {state.quality_check.total_rows}
              *Null Rows:* {state.quality_check.null_rows}
              *Status:* {state.quality_check.quality_passed}

Complex Routine Example

Create a comprehensive routine file for data processing:

# File: /path/to/routines/data_processing.yaml
routines:
  # required_params: [table_name, target_connection, replication_config, stream_name]
  full_table_refresh:
    - type: log
      message: "Starting full refresh for {params.table_name}"

    - type: query
      connection: "{params.target_connection}"
      query: "TRUNCATE TABLE {params.table_name}"
      id: truncate_step

    - type: replication
      path: "{params.replication_config}"
      streams: ["{params.stream_name}"]
      mode: "full-refresh"
      id: load_step

    - type: query
      connection: "{params.target_connection}"
      query: |
        UPDATE metadata.refresh_log
        SET last_refresh = CURRENT_TIMESTAMP,
            row_count = (SELECT COUNT(*) FROM {params.table_name})
        WHERE table_name = '{params.table_name}'
      id: update_metadata

    - type: log
      message: "Refresh complete: {update_metadata.result[0].row_count} rows loaded"

  # required_params: [target_connection, replication_config, stream_name, table_name, primary_key]
  incremental_with_dedup:
    - type: log
      message: "Starting incremental load with deduplication"

    - type: query
      connection: "{params.target_connection}"
      query: "CREATE TABLE IF NOT EXISTS {params.table_name}_staging LIKE {params.table_name}"

    - type: replication
      path: "{params.replication_config}"
      streams: ["{params.stream_name}"]
      mode: "incremental"
      id: incremental_load

    - type: query
      connection: "{params.target_connection}"
      query: |
        DELETE FROM {params.table_name}
        WHERE {params.primary_key} IN (
          SELECT {params.primary_key} FROM {params.table_name}_staging
        )

    - type: query
      connection: "{params.target_connection}"
      query: |
        INSERT INTO {params.table_name}
        SELECT * FROM {params.table_name}_staging

    - type: query
      connection: "{params.target_connection}"
      query: "DROP TABLE {params.table_name}_staging"

Usage:

hooks:
  post:
    - type: routine
      routine: "full_table_refresh"
      params:
        table_name: "customers"
        target_connection: "prod_db"
        replication_config: "configs/customers.yaml"
        stream_name: "public.customers"
