Hooks / Steps

Execute custom actions throughout your replication or pipeline

Hooks are powerful mechanisms in Sling that allow you to execute custom actions before (pre-hooks) or after (post-hooks) a replication stream, as well as at the start or end of the parent replication run (before the first stream and/or after the last stream). They enable you to extend and customize your data pipeline with operations such as data validation, notifications, file management, and custom processing.

Some typical operations include:

Stream Level

Pre-Hooks: Execute before the replication stream run starts

  • Validate prerequisites

  • Download necessary files

  • Set up configurations

  • Perform cleanup operations

Post-Hooks: Execute after the replication stream run completes

  • Validate results

  • Send notifications

  • Upload processed files

  • Clean up temporary files

  • Log completion status

Pre/Post-Merge-Hooks: Execute inside the transaction session, before/after data is loaded/merged into the final table (see the sketch after this list)

  • Set specific session settings and configuration

  • Alter the table holding the temporary data, prior to the merge

  • Run specific SQL queries on other tables
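
For instance, a minimal sketch of merge-stage hooks might look like this; the session setting and the ANALYZE statement are illustrative assumptions, not prescribed steps:

streams:
  my_table:
    hooks:
      pre_merge:
        - type: query
          # hypothetical session setting applied inside the merge transaction
          query: "SET statement_timeout = '300s'"
      post_merge:
        - type: query
          # hypothetical maintenance statement on the freshly merged table
          query: "ANALYZE {object.full_name}"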

Available Hook Types

| Hook Type | Description |
| --- | --- |
| Check | Validate conditions and control flow |
| Command | Run any command/process |
| Copy | Transfer files between local or remote storage connections |
| Delete | Remove files from local or remote storage connections |
| Group | Run sequences of steps or loop over values |
| HTTP | Make HTTP requests to external services |
| Inspect | Inspect a file or folder |
| List | List files in a folder |
| Log | Output custom messages and create audit trails |
| Query | Execute SQL queries against any defined connection |
| Replication | Run a Replication |
| Routine | Execute reusable step sequences from external files |
| Store | Store values for later in-process access |
| Read | Read contents of files from storage connections |
| Write | Write content to files in storage connections |

Hook Configuration

Hooks can be configured in two locations:

  • At the defaults level (applies to all streams)

  • At the individual stream level (overrides defaults)

Stream level Hooks

We can use the following structure to declare hooks with the hooks key, under the defaults branch or under any stream branch.

defaults:
  ...

streams:
  my_stream:
    hooks:
      # Prior to stream beginning
      pre:
        - type: log
          # hook configuration...

      # Inside session/transaction, before merge, right after BEGIN
      pre_merge:
        - type: query
          # hook configuration...

      # Inside session/transaction, after merge, right before COMMIT
      post_merge:
        - type: query
          # hook configuration...

      # After stream finishes
      post:
        - type: http
          # hook configuration...

Replication level Hooks

We can also define hooks to run at the replication file level, meaning before any of the streams run and/or after all the streams have run. For replication-level hooks, we must declare the start and end hooks at the root of the YAML configuration.

# replication level hooks need to be set at the root of the YAML
hooks:
  start:
    - type: query
      # hook configuration...

  end:
    - type: http
      # hook configuration...

defaults:
  ...

streams:
  ...

Common Hook Properties

All hook types share some common properties:

| Property | Description | Required |
| --- | --- | --- |
| type | The type of hook (e.g. query, http, check, copy, delete, log, inspect) | Yes |
| if | Optional condition to determine whether the hook should execute | No |
| id | An identifier used to refer to the hook's output data | No |
| on_failure | What to do if the hook fails (abort, warn, quiet, skip, break) | No (defaults to abort) |
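
Putting these together, a hook using all of the common properties might look like the sketch below (the RUN_CHECKS environment variable and the query itself are assumptions for illustration):

- type: query
  id: row_check                   # output later available under {state.row_check.*}
  if: env.RUN_CHECKS == "true"    # hypothetical env variable gating execution
  query: "select count(1) as cnt from {object.full_name}"
  on_failure: warn                # do not abort the stream if this hook fails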

Variables Available

  • runtime_state - Contains all state variables available

  • state.* - Output state information of all hooks (keyed by hook id)

  • store.* - All stored values from previous hooks

  • env.* - All variables defined in the env

  • timestamp.* - Various timestamp parts information

  • execution.* - Replication run level information

  • source.* - Source connection information

  • target.* - Target connection information

  • stream.* - Current source stream info

  • object.* - Current target object info

  • runs.* - All runs information (keyed by stream run id)

  • run.* - Current stream run information
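
For example, several of these variables can be interpolated directly into a log hook message; a minimal sketch:

- type: log
  message: |
    stream {stream.name} -> {object.full_name}
    rows: {run.total_rows}, status: {run.status}, duration: {run.duration}s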

Nested Fields

timestamp.* Fields

| Field | Type | Description | Example |
| --- | --- | --- | --- |
| timestamp | datetime | Full timestamp object | 2025-01-19T08:27:31.473303-05:00 |
| unix | integer | Unix epoch timestamp | 1737286051 |
| file_name | string | Timestamp formatted for file names | 2025_01_19_082731 |
| rfc3339 | string | RFC3339 formatted timestamp | 2025-01-19T08:27:31-05:00 |
| date | string | Date only | 2025-01-19 |
| datetime | string | Date and time | 2025-01-19 08:27:31 |
| YYYY | string | Four-digit year | 2025 |
| YY | string | Two-digit year | 25 |
| MMM | string | Three-letter month abbreviation | Jan |
| MM | string | Two-digit month | 01 |
| DD | string | Two-digit day | 19 |
| DDD | string | Three-letter day abbreviation | Sun |
| HH | string | Two-digit hour (24-hour format) | 08 |

execution.* Fields

| Field | Type | Description | Example |
| --- | --- | --- | --- |
| id | string | Unique execution identifier | 2rxeplXz2UqdIML1NncvWKNQuwD |
| string | string | Path to the replication configuration file | /path/to/replication.yaml |
| total_bytes | integer | Total bytes processed across all runs | 6050 |
| total_rows | integer | Total rows processed across all runs | 34 |
| status.count | integer | Total number of streams | 1 |
| status.success | integer | Number of successful streams | 1 |
| status.running | integer | Number of running streams | 0 |
| status.skipped | integer | Number of skipped streams | 0 |
| status.cancelled | integer | Number of cancelled streams | 0 |
| status.warning | integer | Number of streams with warnings | 0 |
| status.error | integer | Number of errored streams | 0 |
| start_time | datetime | Execution start time | 2025-01-19T08:27:22.988403-05:00 |
| end_time | datetime | Execution end time | 2025-01-19T08:27:31.472684-05:00 |
| duration | integer | Execution duration in seconds | 8 |
| error | string/null | Error message if execution failed | null |

source.* / target.* Connection Fields

| Field | Type | Description | Example (Source) | Example (Target) |
| --- | --- | --- | --- | --- |
| name | string | Connection name | aws_s3 | postgres |
| type | string | Connection type | s3 | postgres |
| kind | string | Connection kind | file | database |
| bucket | string | S3/GCS bucket name | my-bucket-1 | |
| container | string | Azure container name | | |
| database | string | Database name | | postgres |
| instance | string | Database instance | | |
| schema | string | Default schema | | public |

stream.* Fields

| Field | Type | Description | Example |
| --- | --- | --- | --- |
| file_folder | string | Parent folder of the file | update_dt_year=2018 |
| file_name | string | Name of the file | update_dt_month=11 |
| file_ext | string | File extension | parquet |
| file_path | string | Full file path | test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11 |
| name | string | Stream name pattern | test/public_test1k_postgres_pg_parquet/{part_year}/{part_month}/ |
| schema | string | Schema name (for database sources) | |
| schema_lower | string | Schema name in lowercase | |
| schema_upper | string | Schema name in uppercase | |
| table | string | Table name (for database sources) | |
| table_lower | string | Table name in lowercase | |
| table_upper | string | Table name in uppercase | |
| full_name | string | Full stream identifier | s3://my-bucket-1/test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11/ |

object.* Fields

| Field | Type | Description | Example |
| --- | --- | --- | --- |
| schema | string | Target schema name | public |
| table | string | Target table name | test1k_postgres_pg_parquet |
| name | string | Quoted object name | "public"."test1k_postgres_pg_parquet" |
| full_name | string | Full quoted object name | "public"."test1k_postgres_pg_parquet" |
| temp_schema | string | Temporary schema name | public |
| temp_table | string | Temporary table name | test1k_postgres_pg_parquet_tmp |
| temp_full_name | string | Full temporary table name | "public"."test1k_postgres_pg_parquet_tmp" |

run.* Fields

| Field | Type | Description | Example |
| --- | --- | --- | --- |
| id | string | Run identifier | test_public_test1k |
| stream.* | object | Stream information (see stream fields above) | {...} |
| object.* | object | Object information (see object fields above) | {...} |
| total_bytes | integer | Total bytes processed in this run | 6050 |
| total_rows | integer | Total rows processed in this run | 34 |
| status | string | Run status | success |
| start_time | datetime | Run start time | 2025-01-19T08:27:22.988403-05:00 |
| end_time | datetime | Run end time | 2025-01-19T08:27:31.472684-05:00 |
| duration | integer | Run duration in seconds | 8 |
| incremental_value | any | The incremental value used | 2025-01-19T08:27:31.472684-05:00 |
| range | string | The start/end range values used | 2025-01-01,2025-02-01 |
| error | string/null | Error message if run failed | null |
| config.mode | string | Replication mode | incremental |
| config.object | string | Target object | "public"."test1k_postgres_pg_parquet" |
| config.primary_key | array | Primary key columns | ["id"] |
| config.update_key | string | Update key column | update_dt |
| config.source_options | object | Source-specific options | {} |
| config.target_options | object | Target-specific options | {} |

runtime_state Payload

The best way to view any available variables is to print the runtime_state variable.

For example, using the log hook as shown below will print all available variables.

- type: log
  message: '{runtime_state}'

This prints a JSON payload similar to the one below.

{
  "state": {
    "end-01": {},
    "start-01": {
      "level": "info",
      "message": "{...}",
      "status": "success"
    },
    "start-02": {
      "path": "sling-state/test/r.19",
      "status": "success"
    }
  },
  "store": {
    "my_key": "my_value"
  },
  "env": {
    "RESET": "true",
    "SLING_STATE": "aws_s3/sling-state/test/r.19"
  },
  "timestamp": {
    "timestamp": "2025-01-19T08:27:31.473303-05:00",
    "unix": 1737286051,
    "file_name": "2025_01_19_082731",
    "rfc3339": "2025-01-19T08:27:31-05:00",
    "date": "2025-01-19",
    "datetime": "2025-01-19 08:27:31",
    "YYYY": "2025",
    "YY": "25",
    "MMM": "Jan",
    "MM": "01",
    "DD": "19",
    "DDD": "Sun",
    "HH": "08"
  },
  "source": {
    "name": "aws_s3",
    "type": "s3",
    "kind": "file",
    "bucket": "my-bucket-1",
    "container": "",
    "database": "",
    "instance": "",
    "schema": ""
  },
  "target": {
    "name": "postgres",
    "type": "postgres",
    "kind": "database",
    "bucket": "",
    "container": "",
    "database": "postgres",
    "instance": "",
    "schema": "public"
  },
  "stream": {
    "file_folder": "update_dt_year=2018",
    "file_name": "update_dt_month=11",
    "file_ext": "parquet",
    "file_path": "test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11",
    "name": "test/public_test1k_postgres_pg_parquet/{part_year}/{part_month}/",
    "schema": "",
    "schema_lower": "",
    "schema_upper": "",
    "table": "",
    "table_lower": "",
    "table_upper": "",
    "full_name": "s3://my-bucket-1/test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11/"
  },
  "object": {
    "schema": "public",
    "table": "test1k_postgres_pg_parquet",
    "name": "\"public\".\"test1k_postgres_pg_parquet\"",
    "full_name": "\"public\".\"test1k_postgres_pg_parquet\"",
    "temp_schema": "public",
    "temp_table": "test1k_postgres_pg_parquet_tmp",
    "temp_full_name": "\"public\".\"test1k_postgres_pg_parquet_tmp\""
  },
  "runs": {
    "test_public_test1k": {
      "id": "test_public_test1k",
      "stream": {
        "file_folder": "update_dt_year=2018",
        "file_name": "update_dt_month=11",
        "file_ext": "parquet",
        "file_path": "test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11",
        "name": "test/public_test1k_postgres_pg_parquet/{part_year}/{part_month}/",
        "schema": "",
        "schema_lower": "",
        "schema_upper": "",
        "table": "",
        "table_lower": "",
        "table_upper": "",
        "full_name": "s3://my-bucket-1/test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11/"
      },
      "object": {
        "schema": "public",
        "table": "test1k_postgres_pg_parquet",
        "name": "\"public\".\"test1k_postgres_pg_parquet\"",
        "full_name": "\"public\".\"test1k_postgres_pg_parquet\"",
        "temp_schema": "public",
        "temp_table": "test1k_postgres_pg_parquet_tmp",
        "temp_full_name": "\"public\".\"test1k_postgres_pg_parquet_tmp\""
      },
      "total_bytes": 6050,
      "total_rows": 34,
      "status": "success",
      "start_time": "2025-01-19T08:27:22.988403-05:00",
      "end_time": "2025-01-19T08:27:31.472684-05:00",
      "duration": 8,
      "error": null,
      "config": {
        "mode": "incremental",
        "object": "\"public\".\"test1k_postgres_pg_parquet\"",
        "primary_key": [
          "id"
        ],
        "update_key": "update_dt",
        "source_options": {},
        "target_options": {},
        "single": false,
        "hooks": {}
      }
    }
  },
  "execution": {
    "id": "2rxeplXz2UqdIML1NncvWKNQuwD",
    "string": "/path/to/replication.yaml",
    "total_bytes": 6050,
    "total_rows": 34,
    "status": {
      "count": 1,
      "success": 1,
      "running": 0,
      "skipped": 0,
      "cancelled": 0,
      "warning": 0,
      "error": 0
    },
    "start_time": "2025-01-19T08:27:22.988403-05:00",
    "end_time": "2025-01-19T08:27:31.472684-05:00",
    "duration": 8,
    "error": null
  },
  "run": {
    "id": "test_public_test1k",
    "stream": {
      "file_folder": "update_dt_year=2018",
      "file_name": "update_dt_month=11",
      "file_ext": "parquet",
      "file_path": "test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11",
      "name": "test/public_test1k_postgres_pg_parquet/{part_year}/{part_month}/",
      "schema": "",
      "schema_lower": "",
      "schema_upper": "",
      "table": "",
      "table_lower": "",
      "table_upper": "",
      "full_name": "s3://my-bucket-1/test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11/"
    },
    "object": {
      "schema": "public",
      "table": "test1k_postgres_pg_parquet",
      "name": "\"public\".\"test1k_postgres_pg_parquet\"",
      "full_name": "\"public\".\"test1k_postgres_pg_parquet\"",
      "temp_schema": "public",
      "temp_table": "test1k_postgres_pg_parquet_tmp",
      "temp_full_name": "\"public\".\"test1k_postgres_pg_parquet_tmp\""
    },
    "total_bytes": 6050,
    "total_rows": 34,
    "status": "success",
    "start_time": "2025-01-19T08:27:22.988403-05:00",
    "end_time": "2025-01-19T08:27:31.472684-05:00",
    "duration": 8,
    "error": null,
    "config": {
      "mode": "incremental",
      "object": "\"public\".\"test1k_postgres_pg_parquet\"",
      "primary_key": [
        "id"
      ],
      "update_key": "update_dt",
      "source_options": {},
      "target_options": {},
      "single": false,
      "hooks": {}
    }
  }
}

Furthermore, we can access any data point using a JMESPath expression:

  • state["start-02"].status - Gets the status of a hook (returns success)

  • store.my_key - Gets a stored value from the store (returns my_value)

  • run.total_rows - Gets the number of rows processed in the current run (returns 34)

  • run.duration - Gets the duration of the current run in seconds (returns 8)

  • timestamp.unix - The epoch/unix timestamp (returns 1737286051)

  • source.bucket - Gets the source S3 bucket name (returns my-bucket-1)

  • target.database - Gets the target database name (returns postgres)

  • run.config.primary_key[0] - Gets the first primary key column (returns id)

  • stream.file_path - Gets the current stream's file path (returns test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11)

  • stream.file_ext - Gets the file extension (returns parquet)

  • stream.schema_lower - Gets the stream schema name in lowercase

  • stream.table_upper - Gets the stream table name in uppercase

  • object.temp_full_name - Gets the temporary table full name (returns "public"."test1k_postgres_pg_parquet_tmp")

  • execution.status.error - Gets the count of errored streams (returns 0)

  • execution.total_bytes - Gets the total bytes processed across all runs (returns 6050)

  • runs["test_public_test1k"].status - Gets the status of a specific run by ID (returns success)

Complete Example

# replication level hooks need to be set at the root of the YAML
hooks:
  # runs in order before replication starts.
  start:
      - type: query
        query: select ....
        id: my_query # can use `{state.my_query.result[0].col1}` later
        on_failure: abort
      
  # runs in order after all streams have completed.
  end:
      # check for any errors. if errored, do not proceed (break)
      - type: check
        check: execution.status.error == 0
        on_failure: break
      
      - type: query
        query: update ....
        into: result # can use `{result.col1}` later
        on_failure: abort

defaults:
  hooks:
    pre:
      - type: query
        connection: source_db
        query: "UPDATE status SET running = true"
        on_failure: abort

    post:
      - type: check
        check: "run.total_rows > 0"
        on_failure: warn
      
      - type: http
        if: run.status == "success"
        url: "https://api.example.com/webhook"
        method: POST
        payload: | # my_query.result will serialize into an array of objects
          {
            "status": "complete",
            "name": "{ my_query.result[0].name }",
            "records": {my_query.result}
          }

streams:
  public.users:
    hooks:
      # runs in order before stream run
      pre:
        - type: query
          query: update ....
          on_failure: abort

      # runs in order after stream run
      post:
        - type: http
          url: https://my.webhook/path
          method: POST
          payload: |
            {"result": "{run.status}"}

Best Practices

  1. Error Handling: Specify appropriate on_failure behavior. The default value is abort.

  2. Validation: Use check hooks to validate prerequisites and results

  3. Logging: Implement log hooks for better observability

  4. Cleanup: Use delete hooks to manage temporary or old files

  5. Modularity: Break down complex operations into multiple hooks

  6. Conditions: Use if conditions to control hook execution

  7. Environment Awareness: Consider different environments in hook configurations (see the sketch below)
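
As an example of points 6 and 7, a post-hook can be gated on an environment variable; a minimal sketch, assuming a hypothetical SLING_ENV value defined in the env section:

- type: http
  if: env.SLING_ENV == "prod"    # SLING_ENV is an assumed variable, not a built-in
  url: https://my.webhook/path
  method: POST
  payload: |
    {"stream": "{run.id}", "status": "{run.status}"}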
