Hooks / Steps

Execute custom actions throughout your replication or pipeline

Hooks are powerful mechanisms in Sling that let you execute custom actions before (pre-hooks) or after (post-hooks) a replication stream runs, as well as at the start or end of the parent replication cycle (before the first stream and/or after the last stream). They enable you to extend and customize your data pipeline with operations such as data validation, notifications, file management, and custom processing.

Some typical operations, illustrated by the sketch after this list, include:

Pre-Hooks: Execute before the replication stream run starts

  • Validate prerequisites

  • Download necessary files

  • Set up configurations

  • Perform cleanup operations

Post-Hooks: Execute after the replication stream run completes

  • Validate results

  • Send notifications

  • Upload processed files

  • Clean up temporary files

  • Log completion status
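For a quick taste, here is a minimal sketch of a stream with one pre-hook and one post-hook. The stream name my_stream is a placeholder, and the pre-hook assumes an env variable named RESET is defined:

streams:
  my_stream:
    hooks:
      pre:
        # validate a prerequisite before the stream runs
        - type: check
          check: env.RESET == "true"
          on_failure: abort

      post:
        # log completion status after the stream runs
        - type: log
          message: 'stream {run.stream.name} finished with status {run.status}'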

Available Hook Types

| Hook Type   | Description                                                |
|-------------|------------------------------------------------------------|
| Check       | Validate conditions and control flow                       |
| Command     | Run any command/process                                    |
| Copy        | Transfer files between local or remote storage connections |
| Delete      | Remove files from local or remote storage connections      |
| Group       | Run sequences of steps or loop over values                 |
| HTTP        | Make HTTP requests to external services                    |
| Inspect     | Inspect a file or folder                                   |
| List        | List files in a folder                                     |
| Log         | Output custom messages and create audit trails             |
| Query       | Execute SQL queries against any defined connection         |
| Replication | Run a replication                                          |

Hook Configuration

Hooks can be configured in two locations:

  • At the defaults level (applies to all streams)

  • At the individual stream level (overrides defaults — see the sketch below)
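For instance, in the sketch below the defaults-level post hook applies to public.orders, while public.users declares its own hooks and therefore overrides the defaults (stream names are placeholders):

defaults:
  hooks:
    post:
      - type: log
        message: 'completed {run.stream.name}'

streams:
  public.orders: # inherits the defaults-level post hook

  public.users: # declares its own hooks, overriding the defaults
    hooks:
      post:
        - type: log
          message: 'users stream loaded {run.total_rows} rows'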

Stream-level Hooks

We can use the following structure to declare hooks with the hooks key, under the defaults branch or under any stream branch.

defaults:
  ...

streams:
  my_stream:
    hooks:
      pre:
        - type: query
          # hook configuration...

      post:
        - type: http
          # hook configuration...

Replication-level Hooks

We can also define hooks that run at the replication file level, meaning before any of the streams run and/or after all the streams have run. For replication-level hooks, we must declare the start and end hooks at the root of the YAML configuration.

# replication level hooks need to be set at the root of the YAML
hooks:
  start:
    - type: query
      # hook configuration...

  end:
    - type: http
      # hook configuration...

defaults:
  ...

streams:
  ...

Common Hook Properties

All hook types share some common properties:

| Property   | Description                                                             | Required               |
|------------|-------------------------------------------------------------------------|------------------------|
| type       | The type of hook (e.g. query, http, check, copy, delete, log, inspect) | Yes                    |
| if         | Optional condition that determines whether the hook executes            | No                     |
| id         | An identifier used to refer to the hook's output data                   | No                     |
| on_failure | What to do if the hook fails (abort, warn, quiet, skip)                 | No (defaults to abort) |
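Putting the common properties together, a single hook might look like this sketch (the connection name target_db and the query itself are placeholders):

- type: query
  id: row_check # output is later available under {state.row_check...}
  if: run.status == "success" # only execute when the stream succeeded
  connection: target_db
  query: select count(1) as cnt from public.users
  on_failure: warn # warn instead of aborting if the query fails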

Variables Available

  • runtime_state - Contains all state variables available

  • env.* - All variables defined in the env

  • timestamp.* - Various timestamp parts information

  • source.* - Source connection information

  • execution.* - Replication run-level information

  • target.* - Target connection information

  • stream.* - Current source stream info

  • object.* - Current target object info

  • state.* - All hooks output state information

  • runs.* - All runs information

  • run.* - Current stream run information

The best way to see all available variables is to print the runtime_state variable.

For example, using the log hook as shown below will print all available variables.

- type: log
  message: '{runtime_state}'

It outputs something like the JSON payload below.

runtime_state Payload
{
  "state": {
    "end-01": {},
    "start-01": {
      "level": "info",
      "message": "{...}",
      "status": "success"
    },
    "start-02": {
      "path": "sling-state/test/r.19",
      "status": "success"
    }
  },
  "env": {
    "RESET": "true",
    "SLING_STATE": "aws_s3/sling-state/test/r.19"
  },
  "timestamp": {
    "timestamp": "2025-01-19T08:27:31.473303-05:00",
    "unix": 1737286051,
    "file_name": "2025_01_19_082731",
    "rfc3339": "2025-01-19T08:27:31-05:00",
    "date": "2025-01-19",
    "datetime": "2025-01-19 08:27:31",
    "YYYY": "2025",
    "YY": "25",
    "MMM": "Jan",
    "MM": "01",
    "DD": "19",
    "DDD": "Sun",
    "HH": "08"
  },
  "source": {
    "name": "aws_s3",
    "type": "s3",
    "kind": "file",
    "bucket": "my-bucket-1"
  },
  "target": {
    "name": "postgres",
    "type": "postgres",
    "kind": "database",
    "database": "postgres",
    "schema": "public"
  },
  "stream": {
    "file_folder": "update_dt_year=2018",
    "file_name": "update_dt_month=11",
    "file_path": "test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11",
    "name": "test/public_test1k_postgres_pg_parquet/{part_year}/{part_month}/",
    "full_name": "s3://my-bucket-1/test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11/"
  },
  "object": {
    "schema": "public",
    "table": "test1k_postgres_pg_parquet",
    "name": "\"public\".\"test1k_postgres_pg_parquet\"",
    "full_name": "\"public\".\"test1k_postgres_pg_parquet\""
  },
  "runs": {
    "test_public_test1k": {
      "stream": {
        "file_folder": "update_dt_year=2018",
        "file_name": "update_dt_month=11",
        "file_path": "test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11",
        "name": "test/public_test1k_postgres_pg_parquet/{part_year}/{part_month}/",
        "full_name": "s3://my-bucket-1/test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11/"
      },
      "object": {
        "schema": "public",
        "table": "test1k_postgres_pg_parquet",
        "name": "\"public\".\"test1k_postgres_pg_parquet\"",
        "full_name": "\"public\".\"test1k_postgres_pg_parquet\""
      },
      "total_bytes": 6050,
      "total_rows": 34,
      "status": "success",
      "start_time": "2025-01-19T08:27:22.988403-05:00",
      "end_time": "2025-01-19T08:27:31.472684-05:00",
      "config": {
        "mode": "incremental",
        "object": "\"public\".\"test1k_postgres_pg_parquet\"",
        "primary_key": [
          "id"
        ],
        "update_key": "update_dt",
        "source_options": {},
        "target_options": {},
        "single": false,
        "hooks": {}
      }
    }
  },
  "execution": {
    "id": "2rxeplXz2UqdIML1NncvWKNQuwD",
    "status": {
      "count": 1,
      "success": 1,
      "running": 0,
      "skipped": 0,
      "cancelled": 0,
      "warning": 0,
      "error": 0
    },
    "start_time": "2025-01-19T08:27:22.988403-05:00",
    "end_time": "2025-01-19T08:27:31.472684-05:00",
    "duration": 2
  },
  "run": {
    "stream": {
      "file_folder": "update_dt_year=2018",
      "file_name": "update_dt_month=11",
      "file_path": "test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11",
      "name": "test/public_test1k_postgres_pg_parquet/{part_year}/{part_month}/",
      "full_name": "s3://my-bucket-1/test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11/"
    },
    "object": {
      "schema": "public",
      "table": "test1k_postgres_pg_parquet",
      "name": "\"public\".\"test1k_postgres_pg_parquet\"",
      "full_name": "\"public\".\"test1k_postgres_pg_parquet\""
    },
    "total_bytes": 6050,
    "total_rows": 34,
    "status": "success",
    "start_time": "2025-01-19T08:27:22.988403-05:00",
    "end_time": "2025-01-19T08:27:31.472684-05:00",
    "config": {
      "mode": "incremental",
      "object": "\"public\".\"test1k_postgres_pg_parquet\"",
      "primary_key": [
        "id"
      ],
      "update_key": "update_dt",
      "source_options": {},
      "target_options": {},
      "single": false,
      "hooks": {}
    }
  }
}

Furthermore, we can access any data point using a JMESPath expression, as in the examples below (a usage sketch follows the list):

  • state["start-02"].status - Gets the status of a hook (returns success)

  • run.total_rows - Gets the number of rows processed in the current run (returns 34)

  • timestamp.unix - The epoch/unix timestamp (returns 1737286051)

  • source.bucket - Gets the source S3 bucket name (returns my-bucket-1)

  • run.config.primary_key[0] - Gets the first primary key column (returns id)

  • stream.file_path - Gets the current stream's file path (returns test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11)
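Any of these expressions can be embedded in hook fields using curly braces. As a sketch, a log hook could combine several of the variables shown in the payload above:

- type: log
  message: 'loaded {run.total_rows} rows from s3://{source.bucket}/{stream.file_path} into {object.full_name}'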

Complete Example

# replication level hooks need to be set at the root of the YAML
hooks:
  # runs in order before replication starts.
  start:
      - type: query
        query: select ....
        id: my_query # can use `{state.my_query.result[0].col1}` later
        on_failure: abort
      
  # runs in order after all streams have completed.
  end:
      - type: query
        query: update ....
        into: result # can use `{result.col1}` later
        on_failure: abort

defaults:
  hooks:
    pre:
      - type: query
        connection: source_db
        query: "UPDATE status SET running = true"
        on_failure: abort

    post:
      - type: check
        check: "run.total_rows > 0"
        on_failure: warn
      
      - type: http
        if: run.status == "success"
        url: "https://api.example.com/webhook"
        method: POST
        payload: | # state.my_query.result will serialize into an array of objects
          {
            "status": "complete",
            "name": "{ state.my_query.result[0].name }",
            "records": { state.my_query.result }
          }

streams:
  public.users:
    hooks:
      # runs in order before stream run
      pre:
        - type: query
          query: update ....
          on_failure: abort

      # runs in order after stream run
      post:
        - type: http
          url: https://my.webhook/path
          method: POST
          payload: |
            {"result": "{run.status}"}

Best Practices

  1. Error Handling: Specify appropriate on_failure behavior. The default value is abort.

  2. Validation: Use check hooks to validate prerequisites and results

  3. Logging: Implement log hooks for better observability

  4. Cleanup: Use delete hooks to manage temporary / old files

  5. Modularity: Break down complex operations into multiple hooks

  6. Conditions: Use if conditions to control hook execution

  7. Environment Awareness: Consider different environments in hook configurations, as in the sketch below
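As an illustration of points 1, 6 and 7, the sketch below only fires in production and downgrades failures to warnings. ENVIRONMENT is a hypothetical variable you would define in your env:

- type: http
  if: env.ENVIRONMENT == "prod" # hypothetical env variable; only notify from production
  url: "https://api.example.com/webhook"
  method: POST
  payload: |
    {"stream": "{run.stream.name}", "status": "{run.status}"}
  on_failure: warn # a failed notification should not abort the replication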
