# Hooks / Steps

Hooks are powerful mechanisms in Sling that allow you to execute custom actions before (pre-hooks) or after (post-hooks) a replication stream, as well as at the start or end of the parent replication cycle (before the first stream and/or after the last stream). They enable you to extend and customize your data pipeline with operations such as data validation, notifications, file management, and custom processing.

{% hint style="success" %}
Hooks are the same as Steps when using Sling in [Pipeline](/concepts/pipeline.md) mode.

Furthermore, Sling Hooks integrate seamlessly with the [Sling VSCode Extension](/sling-cli/vscode.md). The extension provides schema validation, auto-completion, hover documentation, and diagnostics for your hook configurations, making it easier to author and debug complex workflows.
{% endhint %}

Some typical operations include:

## Stream Level

**Pre-Hooks**: Execute before the replication stream run starts

* Validate prerequisites
* Download necessary files
* Set up configurations
* Perform cleanup operations

**Post-Hooks**: Execute after the replication stream run completes

* Validate results
* Send notifications
* Upload processed files
* Clean up temporary files
* Log completion status

**Pre/Post-Merge-Hooks**: Execute within the transaction session, before/after data is loaded/merged into the final table (see the sketch below)

* Set session-specific settings and configuration
* Alter the table holding the temporary data prior to the merge
* Run specific SQL queries on other tables
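
For instance, a merge-scoped hook might tune the session before the merge and refresh statistics right after it. Below is a minimal, hypothetical sketch; the stream name, session setting, and queries are illustrative (Postgres syntax), not prescriptive:

```yaml
streams:
  public.orders:
    hooks:
      # runs inside the load transaction, right after BEGIN
      pre_merge:
        - type: query
          query: SET LOCAL statement_timeout = '10min' # session-scoped setting

      # runs inside the load transaction, right before COMMIT
      post_merge:
        - type: query
          query: ANALYZE {object.full_name} # refresh stats on the freshly merged table
```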

## Available Hook Types

| Hook Type   | Description                                                | Documentation                                      |
| ----------- | ---------------------------------------------------------- | -------------------------------------------------- |
| Check       | Validate conditions and control flow                       | [Check Hook](/concepts/hooks/check.md)             |
| Command     | Run any command/process                                    | [Command Hook](/concepts/hooks/command.md)         |
| Copy        | Transfer files between local or remote storage connections | [Copy Hook](/concepts/hooks/copy.md)               |
| Delete      | Remove files from local or remote storage connections      | [Delete Hook](/concepts/hooks/delete.md)           |
| Group       | Run sequences of steps or loop over values                 | [Group Hook](/concepts/hooks/group.md)             |
| HTTP        | Make HTTP requests to external services                    | [HTTP Hook](/concepts/hooks/http.md)               |
| Inspect     | Inspect a file or folder                                   | [Inspect Hook](/concepts/hooks/inspect.md)         |
| List        | List files in a folder                                     | [List Hook](/concepts/hooks/list.md)               |
| Log         | Output custom messages and create audit trails             | [Log Hook](/concepts/hooks/log.md)                 |
| Query       | Execute SQL queries against any defined connection         | [Query Hook](/concepts/hooks/query.md)             |
| Replication | Run a Replication                                          | [Replication Hook](/concepts/hooks/replication.md) |
| Routine     | Execute reusable step sequences from external files        | [Routine Hook](/concepts/hooks/routine.md)         |
| Store       | Store values for later in-process access                   | [Store Hook](/concepts/hooks/store.md)             |
| Read        | Read contents of files from storage connections            | [Read Step](/concepts/hooks/read.md)               |
| Write       | Write content to files in storage connections              | [Write Step](/concepts/hooks/write.md)             |

## Hook Configuration

Hooks can be configured in two locations:

* At the `defaults` level (applies to all streams)
* At the individual `stream` level (overrides defaults)

**Stream-level Hooks**

We can use the following structure to declare hooks with the `hooks` key, under the `defaults` branch or under any stream branch.

```yaml
defaults:
  ...

streams:
  my_stream:
    hooks:
      # Prior to stream beginning
      pre:
        - type: log
          # hook configuration...

      # Inside session/transaction, before merge, right after BEGIN
      pre_merge:
        - type: query
          # hook configuration...

      # Inside session/transaction, after merge, right before COMMIT
      post_merge:
        - type: query
          # hook configuration...

      # After stream finishes
      post:
        - type: http
          # hook configuration...
```

**Replication-level Hooks**

We can also define hooks that run at the replication file level, meaning before any of the streams run and/or after all the streams have run. For replication-level hooks, we must declare the `start` and `end` hooks at the root of the YAML configuration.

```yaml
# replication level hooks need to be set at the root of the YAML
hooks:
  start:
    - type: query
      # hook configuration...

  end:
    - type: http
      # hook configuration...

defaults:
  ...

streams:
  ...
```

## Common Hook Properties

All hook types share some common properties:

| Property     | Description                                                                  | Required                 |
| ------------ | ---------------------------------------------------------------------------- | ------------------------ |
| `type`       | The type of hook (any of the types listed in the table above)                | Yes                      |
| `if`         | Optional condition to determine whether the hook should execute              | No                       |
| `id`         | An identifier used to reference the hook's output data                       | No                       |
| `on_failure` | What to do if the hook fails (`abort` / `warn` / `quiet` / `skip` / `break`) | No (defaults to `abort`) |
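
As a hedged illustration of these properties working together (the hook `id`, message, and condition below are illustrative):

```yaml
post:
  - type: log
    id: row_report                  # output later available as {state.row_report.status}, etc.
    if: run.total_rows > 0          # only execute when rows were processed
    message: 'Loaded {run.total_rows} rows into {object.full_name}'
    on_failure: warn                # warn instead of aborting if the hook fails
```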

## Variables Available

The following variables can be referenced in hook configurations (see the example after this list):

* `runtime_state` - Contains all state variables available
* `state.*` - Output state of all hooks (keyed by hook `id`)
* `store.*` - All stored values from previous hooks
* `env.*` - All variables defined in the `env` section
* `timestamp.*` - Various timestamp parts information
* `execution.*` - Replication run level information
* `source.*` - Source connection information
* `target.*` - Target connection information
* `stream.*` - Current source stream info
* `object.*` - Current target object info
* `runs.*` - All runs information (keyed by stream run id)
* `run.*` - Current stream run information
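
For example, a `log` hook can interpolate any of these variables directly (a minimal sketch; the message content is illustrative):

```yaml
- type: log
  message: |
    run={run.id} status={run.status} rows={run.total_rows}
    target={target.name} object={object.full_name}
    at={timestamp.datetime}
```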

### Nested Fields

<details>

<summary><code>timestamp.*</code> Fields</summary>

| Field       | Type     | Description                        | Example                            |
| ----------- | -------- | ---------------------------------- | ---------------------------------- |
| `timestamp` | datetime | Full timestamp object              | `2025-01-19T08:27:31.473303-05:00` |
| `unix`      | integer  | Unix epoch timestamp               | `1737286051`                       |
| `file_name` | string   | Timestamp formatted for file names | `2025_01_19_082731`                |
| `rfc3339`   | string   | RFC3339 formatted timestamp        | `2025-01-19T08:27:31-05:00`        |
| `date`      | string   | Date only                          | `2025-01-19`                       |
| `datetime`  | string   | Date and time                      | `2025-01-19 08:27:31`              |
| `YYYY`      | string   | Four-digit year                    | `2025`                             |
| `YY`        | string   | Two-digit year                     | `25`                               |
| `MMM`       | string   | Three-letter month abbreviation    | `Jan`                              |
| `MM`        | string   | Two-digit month                    | `01`                               |
| `DD`        | string   | Two-digit day                      | `19`                               |
| `DDD`       | string   | Three-letter day abbreviation      | `Sun`                              |
| `HH`        | string   | Two-digit hour (24-hour format)    | `08`                               |

</details>

<details>

<summary><code>execution.*</code> Fields</summary>

| Field              | Type        | Description                                | Example                            |
| ------------------ | ----------- | ------------------------------------------ | ---------------------------------- |
| `id`               | string      | Unique execution identifier                | `2rxeplXz2UqdIML1NncvWKNQuwD`      |
| `file_path`        | string      | Path to the replication configuration file | `/path/to/replication.yaml`        |
| `file_name`        | string      | Name of the replication configuration file | `replication.yaml`                 |
| `total_bytes`      | integer     | Total bytes processed across all runs      | `6050`                             |
| `total_rows`       | integer     | Total rows processed across all runs       | `34`                               |
| `status.count`     | integer     | Total number of streams                    | `1`                                |
| `status.success`   | integer     | Number of successful streams               | `1`                                |
| `status.running`   | integer     | Number of running streams                  | `0`                                |
| `status.skipped`   | integer     | Number of skipped streams                  | `0`                                |
| `status.cancelled` | integer     | Number of cancelled streams                | `0`                                |
| `status.warning`   | integer     | Number of streams with warnings            | `0`                                |
| `status.error`     | integer     | Number of errored streams                  | `0`                                |
| `start_time`       | datetime    | Execution start time                       | `2025-01-19T08:27:22.988403-05:00` |
| `end_time`         | datetime    | Execution end time                         | `2025-01-19T08:27:31.472684-05:00` |
| `duration`         | integer     | Execution duration in seconds              | `8`                                |
| `error`            | string/null | Error message if execution failed          | `null`                             |

</details>

<details>

<summary><code>source.*</code> / <code>target.*</code> Connection Fields</summary>

| Field       | Type   | Description          | Example (Source) | Example (Target) |
| ----------- | ------ | -------------------- | ---------------- | ---------------- |
| `name`      | string | Connection name      | `aws_s3`         | `postgres`       |
| `type`      | string | Connection type      | `s3`             | `postgres`       |
| `kind`      | string | Connection kind      | `file`           | `database`       |
| `bucket`    | string | S3/GCS bucket name   | `my-bucket-1`    | \`\`             |
| `container` | string | Azure container name | \`\`             | \`\`             |
| `database`  | string | Database name        | \`\`             | `postgres`       |
| `instance`  | string | Database instance    | \`\`             | \`\`             |
| `schema`    | string | Default schema       | \`\`             | `public`         |

</details>

<details>

<summary><code>stream.*</code> Fields</summary>

| Field          | Type   | Description                        | Example                                                                                           |
| -------------- | ------ | ---------------------------------- | ------------------------------------------------------------------------------------------------- |
| `file_folder`  | string | Parent folder of the file          | `update_dt_year=2018`                                                                             |
| `file_name`    | string | Name of the file                   | `update_dt_month=11`                                                                              |
| `file_ext`     | string | File extension                     | `parquet`                                                                                         |
| `file_path`    | string | Full file path                     | `test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11`                   |
| `name`         | string | Stream name pattern                | `test/public_test1k_postgres_pg_parquet/{part_year}/{part_month}/`                                |
| `description`  | string | Stream description (if provided)   | \`\`                                                                                              |
| `schema`       | string | Schema name (for database sources) | \`\`                                                                                              |
| `schema_lower` | string | Schema name in lowercase           | \`\`                                                                                              |
| `schema_upper` | string | Schema name in uppercase           | \`\`                                                                                              |
| `table`        | string | Table name (for database sources)  | \`\`                                                                                              |
| `table_lower`  | string | Table name in lowercase            | \`\`                                                                                              |
| `table_upper`  | string | Table name in uppercase            | \`\`                                                                                              |
| `full_name`    | string | Full stream identifier             | `s3://my-bucket-1/test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11/` |

</details>

<details>

<summary><code>object.*</code> Fields</summary>

| Field            | Type   | Description               | Example                                     |
| ---------------- | ------ | ------------------------- | ------------------------------------------- |
| `schema`         | string | Target schema name        | `public`                                    |
| `table`          | string | Target table name         | `test1k_postgres_pg_parquet`                |
| `name`           | string | Quoted object name        | `"public"."test1k_postgres_pg_parquet"`     |
| `full_name`      | string | Full quoted object name   | `"public"."test1k_postgres_pg_parquet"`     |
| `temp_schema`    | string | Temporary schema name     | `public`                                    |
| `temp_table`     | string | Temporary table name      | `test1k_postgres_pg_parquet_tmp`            |
| `temp_full_name` | string | Full temporary table name | `"public"."test1k_postgres_pg_parquet_tmp"` |

</details>

<details>

<summary><code>run.*</code> Fields</summary>

| Field                   | Type        | Description                                  | Example                                 |
| ----------------------- | ----------- | -------------------------------------------- | --------------------------------------- |
| `id`                    | string      | Run identifier                               | `test_public_test1k`                    |
| `stream.*`              | object      | Stream information (see stream fields above) | `{...}`                                 |
| `object.*`              | object      | Object information (see object fields above) | `{...}`                                 |
| `total_bytes`           | integer     | Total bytes processed in this run            | `6050`                                  |
| `total_rows`            | integer     | Total rows processed in this run             | `34`                                    |
| `status`                | string      | Run status                                   | `success`                               |
| `start_time`            | datetime    | Run start time                               | `2025-01-19T08:27:22.988403-05:00`      |
| `end_time`              | datetime    | Run end time                                 | `2025-01-19T08:27:31.472684-05:00`      |
| `duration`              | integer     | Run duration in seconds                      | `8`                                     |
| `incremental_value`     | any         | The incremental value used                   | `2025-01-19T08:27:31.472684-05:00`      |
| `range`                 | string      | The start/end range values used              | `2025-01-01,2025-02-01`                 |
| `error`                 | string/null | Error message if run failed                  | `null`                                  |
| `config.mode`           | string      | Replication mode                             | `incremental`                           |
| `config.object`         | string      | Target object                                | `"public"."test1k_postgres_pg_parquet"` |
| `config.primary_key`    | array       | Primary key columns                          | `["id"]`                                |
| `config.update_key`     | string      | Update key column                            | `update_dt`                             |
| `config.source_options` | object      | Source-specific options                      | `{}`                                    |
| `config.target_options` | object      | Target-specific options                      | `{}`                                    |

</details>

### `runtime_state` Payload

The best way to view any available variables is to print the `runtime_state` variable.

For example, using the `log` hook as shown below will print all available variables.

```yaml
- type: log
  message: '{runtime_state}'
```

This produces a JSON payload like the one below.

<details>

<summary><code>runtime_state</code> Payload</summary>

```json
{
  "state": {
    "end-01": {},
    "start-01": {
      "level": "info",
      "message": "{...}",
      "status": "success"
    },
    "start-02": {
      "path": "sling-state/test/r.19",
      "status": "success"
    }
  },
  "store": {
    "my_key": "my_value"
  },
  "env": {
    "RESET": "true",
    "SLING_STATE": "aws_s3/sling-state/test/r.19"
  },
  "timestamp": {
    "timestamp": "2025-01-19T08:27:31.473303-05:00",
    "unix": 1737286051,
    "file_name": "2025_01_19_082731",
    "rfc3339": "2025-01-19T08:27:31-05:00",
    "date": "2025-01-19",
    "datetime": "2025-01-19 08:27:31",
    "YYYY": "2025",
    "YY": "25",
    "MMM": "Jan",
    "MM": "01",
    "DD": "19",
    "DDD": "Sun",
    "HH": "08"
  },
  "source": {
    "name": "aws_s3",
    "type": "s3",
    "kind": "file",
    "bucket": "my-bucket-1",
    "container": "",
    "database": "",
    "instance": "",
    "schema": ""
  },
  "target": {
    "name": "postgres",
    "type": "postgres",
    "kind": "database",
    "bucket": "",
    "container": "",
    "database": "postgres",
    "instance": "",
    "schema": "public"
  },
  "stream": {
    "file_folder": "update_dt_year=2018",
    "file_name": "update_dt_month=11",
    "file_ext": "parquet",
    "file_path": "test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11",
    "name": "test/public_test1k_postgres_pg_parquet/{part_year}/{part_month}/",
    "schema": "",
    "schema_lower": "",
    "schema_upper": "",
    "table": "",
    "table_lower": "",
    "table_upper": "",
    "full_name": "s3://my-bucket-1/test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11/"
  },
  "object": {
    "schema": "public",
    "table": "test1k_postgres_pg_parquet",
    "name": "\"public\".\"test1k_postgres_pg_parquet\"",
    "full_name": "\"public\".\"test1k_postgres_pg_parquet\"",
    "temp_schema": "public",
    "temp_table": "test1k_postgres_pg_parquet_tmp",
    "temp_full_name": "\"public\".\"test1k_postgres_pg_parquet_tmp\""
  },
  "runs": {
    "test_public_test1k": {
      "id": "test_public_test1k",
      "stream": {
        "file_folder": "update_dt_year=2018",
        "file_name": "update_dt_month=11",
        "file_ext": "parquet",
        "file_path": "test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11",
        "name": "test/public_test1k_postgres_pg_parquet/{part_year}/{part_month}/",
        "schema": "",
        "schema_lower": "",
        "schema_upper": "",
        "table": "",
        "table_lower": "",
        "table_upper": "",
        "full_name": "s3://my-bucket-1/test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11/"
      },
      "object": {
        "schema": "public",
        "table": "test1k_postgres_pg_parquet",
        "name": "\"public\".\"test1k_postgres_pg_parquet\"",
        "full_name": "\"public\".\"test1k_postgres_pg_parquet\"",
        "temp_schema": "public",
        "temp_table": "test1k_postgres_pg_parquet_tmp",
        "temp_full_name": "\"public\".\"test1k_postgres_pg_parquet_tmp\""
      },
      "total_bytes": 6050,
      "total_rows": 34,
      "status": "success",
      "start_time": "2025-01-19T08:27:22.988403-05:00",
      "end_time": "2025-01-19T08:27:31.472684-05:00",
      "duration": 8,
      "error": null,
      "config": {
        "mode": "incremental",
        "object": "\"public\".\"test1k_postgres_pg_parquet\"",
        "primary_key": [
          "id"
        ],
        "update_key": "update_dt",
        "source_options": {},
        "target_options": {},
        "single": false,
        "hooks": {}
      }
    }
  },
  "execution": {
    "id": "2rxeplXz2UqdIML1NncvWKNQuwD",
    "string": "/path/to/replication.yaml",
    "total_bytes": 6050,
    "total_rows": 34,
    "status": {
      "count": 1,
      "success": 1,
      "running": 0,
      "skipped": 0,
      "cancelled": 0,
      "warning": 0,
      "error": 0
    },
    "start_time": "2025-01-19T08:27:22.988403-05:00",
    "end_time": "2025-01-19T08:27:31.472684-05:00",
    "duration": 8,
    "error": null
  },
  "run": {
    "id": "test_public_test1k",
    "stream": {
      "file_folder": "update_dt_year=2018",
      "file_name": "update_dt_month=11",
      "file_ext": "parquet",
      "file_path": "test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11",
      "name": "test/public_test1k_postgres_pg_parquet/{part_year}/{part_month}/",
      "schema": "",
      "schema_lower": "",
      "schema_upper": "",
      "table": "",
      "table_lower": "",
      "table_upper": "",
      "full_name": "s3://my-bucket-1/test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11/"
    },
    "object": {
      "schema": "public",
      "table": "test1k_postgres_pg_parquet",
      "name": "\"public\".\"test1k_postgres_pg_parquet\"",
      "full_name": "\"public\".\"test1k_postgres_pg_parquet\"",
      "temp_schema": "public",
      "temp_table": "test1k_postgres_pg_parquet_tmp",
      "temp_full_name": "\"public\".\"test1k_postgres_pg_parquet_tmp\""
    },
    "total_bytes": 6050,
    "total_rows": 34,
    "status": "success",
    "start_time": "2025-01-19T08:27:22.988403-05:00",
    "end_time": "2025-01-19T08:27:31.472684-05:00",
    "duration": 8,
    "error": null,
    "config": {
      "mode": "incremental",
      "object": "\"public\".\"test1k_postgres_pg_parquet\"",
      "primary_key": [
        "id"
      ],
      "update_key": "update_dt",
      "source_options": {},
      "target_options": {},
      "single": false,
      "hooks": {}
    }
  }
}
```

</details>

Furthermore, we can access any data point using a [`jmespath` expression](https://jmespath.org/):

* `state["start-02"].status` - Gets the status of a hook (returns `success`)
* `store.my_key` - Gets a stored value from the store (returns `my_value`)
* `run.total_rows` - Gets the number of rows processed in the current run (returns `34`)
* `run.duration` - Gets the duration of the current run in seconds (returns `8`)
* `timestamp.unix` - The epoch/unix timestamp (returns `1737286051`)
* `source.bucket` - Gets the source S3 bucket name (returns `my-bucket-1`)
* `target.database` - Gets the target database name (returns `postgres`)
* `run.config.primary_key[0]` - Gets the first primary key column (returns `id`)
* `stream.file_path` - Gets the current stream's file path (returns `test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11`)
* `stream.file_ext` - Gets the file extension (returns `parquet`)
* `stream.schema_lower` - Gets the stream schema name in lowercase
* `stream.table_upper` - Gets the stream table name in uppercase
* `object.temp_full_name` - Gets the temporary table full name (returns `"public"."test1k_postgres_pg_parquet_tmp"`)
* `execution.status.error` - Gets the count of errored streams (returns `0`)
* `execution.total_bytes` - Gets the total bytes processed across all runs (returns `6050`)
* `runs["test_public_test1k"].status` - Gets the status of a specific run by ID (returns `success`)

## Complete Example

```yaml
# replication level hooks need to be set at the root of the YAML
hooks:
  # runs in order before replication starts.
  start:
    - type: query
      query: select ....
      id: my_query # can use `{state.my_query.result[0].col1}` later
      on_failure: abort

  # runs in order after all streams have completed.
  end:
    # check for any errors. if errored, do not proceed (break)
    - type: check
      check: execution.status.error == 0
      on_failure: break

    - type: query
      query: update ....
      into: result # can use `{result.col1}` later
      on_failure: abort

defaults:
  hooks:
    pre:
      - type: query
        connection: source_db
        query: "UPDATE status SET running = true"
        on_failure: abort

    post:
      - type: check
        check: "run.total_rows > 0"
        on_failure: warn
      
      - type: http
        if: run.status == "success"
        url: "https://api.example.com/webhook"
        method: POST
        payload: | # state.my_query.result will serialize into an array of objects
          {
            "status": "complete",
            "name": "{ state.my_query.result[0].name }",
            "records": {state.my_query.result}
          }

streams:
  public.users:
    hooks:
      # runs in order before stream run
      pre:
        - type: query
          query: update ....
          on_failure: abort

      # runs in order after stream run
      post:
        - type: http
          url: https://my.webhook/path
          method: POST
          payload: |
            {"result": "{run.status}"}
```

## Best Practices

1. **Error Handling**: Specify appropriate `on_failure` behavior. The default value is `abort`.
2. **Validation**: Use `check` hooks to validate prerequisites and results
3. **Logging**: Implement `log` hooks for better observability
4. **Cleanup**: Use `delete` hooks to manage temporary / old files
5. **Modularity**: Break down complex operations into multiple hooks
6. **Conditions**: Use `if` conditions to control hook execution
7. **Environment Awareness**: Consider different environments in hook configurations (see the sketch below)
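
As a hedged example combining several of these practices (conditional execution, environment awareness, non-fatal error handling), assuming a hypothetical `SLING_ENV` variable defined in the `env` section and a hypothetical webhook endpoint:

```yaml
defaults:
  hooks:
    post:
      - type: http
        if: env.SLING_ENV == "prod" # SLING_ENV is a hypothetical env variable
        url: https://hooks.example.com/notify # hypothetical endpoint
        method: POST
        payload: |
          {"stream": "{run.id}", "status": "{run.status}"}
        on_failure: warn # a failed notification should not abort the load
```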

