# Hooks / Steps

Hooks are powerful mechanisms in Sling that allow you to execute custom actions before (pre-hooks) or after (post-hooks) a replication stream, as well as at the start or end of the replication parent cycle (before the first stream and/or after the last stream). They enable you to extend and customize your data pipeline with various operations such as data validation, notifications, file management, and custom processing.

{% hint style="success" %}
Hooks are the same as Steps when using Sling in [Pipeline](https://docs.slingdata.io/concepts/pipeline) mode.

Furthermore, Sling Hooks integrate seamlessly with the [Sling VSCode Extension](https://docs.slingdata.io/sling-cli/vscode). The extension provides schema validation, auto-completion, hover documentation, and diagnostics for your hooks configurations, making it easier to author and debug complex workflows.
{% endhint %}

Some typical operations include:

## Stream Level

**Pre-Hooks**: Execute before the replication stream run starts

* Validate prerequisites
* Download necessary files
* Set up configurations
* Perform cleanup operations

**Post-Hooks**: Execute after the replication stream run completes

* Validate results
* Send notifications
* Upload processed files
* Clean up temporary files
* Log completion status

**Pre/Post-Merge-Hooks**: Execute within the transaction session, before/after data is loaded/merged into the final table (see the sketch after this list)

* Set specific session settings and configuration
* Alter the table holding the temporary data prior to the merge
* Run specific SQL queries on other tables
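
For example, a minimal sketch of merge-level hooks (the SQL statements are illustrative placeholders, assuming a Postgres-like target):

```yaml
streams:
  my_table:
    hooks:
      # inside the load transaction, before data is merged
      pre_merge:
        - type: query
          query: "set local work_mem = '256MB'" # illustrative session setting

      # inside the load transaction, after data is merged
      post_merge:
        - type: query
          query: "analyze {object.full_name}" # illustrative maintenance statement
```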

## Available Hook Types

| Hook Type   | Description                                                | Documentation                                                            |
| ----------- | ---------------------------------------------------------- | ------------------------------------------------------------------------ |
| Check       | Validate conditions and control flow                       | [Check Hook](https://docs.slingdata.io/concepts/hooks/check)             |
| Command     | Run any command/process                                    | [Command Hook](https://docs.slingdata.io/concepts/hooks/command)         |
| Copy        | Transfer files between local or remote storage connections | [Copy Hook](https://docs.slingdata.io/concepts/hooks/copy)               |
| Delete      | Remove files from local or remote storage connections      | [Delete Hook](https://docs.slingdata.io/concepts/hooks/delete)           |
| Group       | Run sequences of steps or loop over values                 | [Group Hook](https://docs.slingdata.io/concepts/hooks/group)             |
| HTTP        | Make HTTP requests to external services                    | [HTTP Hook](https://docs.slingdata.io/concepts/hooks/http)               |
| Inspect     | Inspect a file or folder                                   | [Inspect Hook](https://docs.slingdata.io/concepts/hooks/inspect)         |
| List        | List files in folder                                       | [List Hook](https://docs.slingdata.io/concepts/hooks/list)               |
| Log         | Output custom messages and create audit trails             | [Log Hook](https://docs.slingdata.io/concepts/hooks/log)                 |
| Query       | Execute SQL queries against any defined connection         | [Query Hook](https://docs.slingdata.io/concepts/hooks/query)             |
| Replication | Run a Replication                                          | [Replication Hook](https://docs.slingdata.io/concepts/hooks/replication) |
| Routine     | Execute reusable step sequences from external files        | [Routine Hook](https://docs.slingdata.io/concepts/hooks/routine)         |
| Store       | Store values for later in-process access                   | [Store Hook](https://docs.slingdata.io/concepts/hooks/store)             |
| Read        | Read contents of files from storage connections            | [Read Step](https://docs.slingdata.io/concepts/hooks/read)               |
| Write       | Write content to files in storage connections              | [Write Step](https://docs.slingdata.io/concepts/hooks/write)             |

## Hook Configuration

Hooks can be configured in two locations:

* At the `defaults` level (applies to all streams)
* At the individual `stream` level (overrides defaults)

**Stream level Hooks**

We can use the following structure to declare hooks with the `hooks` key, under the `defaults` branch or under any stream branch.

```yaml
defaults:
  ...

streams:
  my_stream:
    hooks:
      # Prior to stream beginning
      pre:
        - type: log
          # hook configuration...

      # Inside session/transaction, before merge, right after BEGIN
      pre_merge:
        - type: query
          # hook configuration...

      # Inside session/transaction, after merge, right before COMMIT
      post_merge:
        - type: query
          # hook configuration...

      # After stream finishes
      post:
        - type: http
          # hook configuration...
```

**Replication level Hooks**

We can also define hooks to run at the replication file level, meaning before any of the streams run and/or after all the streams have run. For replication level hooks, we must declare the `start` and `end` hooks at the root of the YAML configuration.

```yaml
# replication level hooks need to be set at the root of the YAML
hooks:
  start:
    - type: query
      # hook configuration...

  end:
    - type: http
      # hook configuration...

defaults:
  ...

streams:
  ...
```

## Common Hook Properties

All hook types share some common properties:

| Property     | Description                                                                       | Required                 |
| ------------ | --------------------------------------------------------------------------------- | ------------------------ |
| `type`       | The type of hook, any of the types listed above (`query`, `http`, `check`, `copy`, `delete`, `log`, `inspect`, etc.) | Yes                      |
| `if`         | Optional condition to determine if the hook should execute                        | No                       |
| `id`         | A specific identifier used to reference the hook's output data                     | No                       |
| `on_failure` | What to do if the hook fails (`abort`/ `warn`/ `quiet`/`skip`/`break`)            | No (defaults to `abort`) |
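
For instance, a minimal sketch combining these common properties (the connection name, SQL, and messages are placeholders):

```yaml
hooks:
  post:
    - type: query
      id: row_check                 # output is later accessible via {state.row_check...}
      connection: target_db         # assumed connection name
      query: "select count(*) as cnt from {object.full_name}"
      on_failure: warn              # continue the stream even if this query fails

    - type: log
      if: run.status == "success"   # only log when the stream run succeeded
      message: "Loaded {run.total_rows} rows into {object.full_name}"
```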

## Variables Available

* `runtime_state` - Contains all state variables available
* `state.*` - State information output by each hook (keyed by hook id)
* `store.*` - All stored values from previous hooks
* `env.*` - All variables defined in the `env`
* `timestamp.*` - Various timestamp parts information
* `execution.*` - Replication run level information
* `source.*` - Source connection information
* `target.*` - Target connection information
* `stream.*` - Current source stream info
* `object.*` - Current target object info
* `runs.*` - All runs information (keyed by stream run id)
* `run.*` - Current stream run information
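
Any of these variables can be interpolated directly inside a hook property. For example, a minimal sketch using the `log` hook:

```yaml
- type: log
  message: |
    stream {stream.name} -> {object.full_name}
    rows: {run.total_rows} | bytes: {run.total_bytes} | status: {run.status}
    started {timestamp.datetime} (execution {execution.id})
```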

### Nested Fields

<details>

<summary><code>timestamp.*</code> Fields</summary>

| Field       | Type     | Description                        | Example                            |
| ----------- | -------- | ---------------------------------- | ---------------------------------- |
| `timestamp` | datetime | Full timestamp object              | `2025-01-19T08:27:31.473303-05:00` |
| `unix`      | integer  | Unix epoch timestamp               | `1737286051`                       |
| `file_name` | string   | Timestamp formatted for file names | `2025_01_19_082731`                |
| `rfc3339`   | string   | RFC3339 formatted timestamp        | `2025-01-19T08:27:31-05:00`        |
| `date`      | string   | Date only                          | `2025-01-19`                       |
| `datetime`  | string   | Date and time                      | `2025-01-19 08:27:31`              |
| `YYYY`      | string   | Four-digit year                    | `2025`                             |
| `YY`        | string   | Two-digit year                     | `25`                               |
| `MMM`       | string   | Three-letter month abbreviation    | `Jan`                              |
| `MM`        | string   | Two-digit month                    | `01`                               |
| `DD`        | string   | Two-digit day                      | `19`                               |
| `DDD`       | string   | Three-letter day abbreviation      | `Sun`                              |
| `HH`        | string   | Two-digit hour (24-hour format)    | `08`                               |

</details>

<details>

<summary><code>execution.*</code> Fields</summary>

| Field              | Type        | Description                                | Example                            |
| ------------------ | ----------- | ------------------------------------------ | ---------------------------------- |
| `id`               | string      | Unique execution identifier                | `2rxeplXz2UqdIML1NncvWKNQuwD`      |
| `file_path`        | string      | Path to the replication configuration file | `/path/to/replication.yaml`        |
| `file_name`        | string      | Name of the replication configuration file | `replication.yaml`                 |
| `total_bytes`      | integer     | Total bytes processed across all runs      | `6050`                             |
| `total_rows`       | integer     | Total rows processed across all runs       | `34`                               |
| `status.count`     | integer     | Total number of streams                    | `1`                                |
| `status.success`   | integer     | Number of successful streams               | `1`                                |
| `status.running`   | integer     | Number of running streams                  | `0`                                |
| `status.skipped`   | integer     | Number of skipped streams                  | `0`                                |
| `status.cancelled` | integer     | Number of cancelled streams                | `0`                                |
| `status.warning`   | integer     | Number of streams with warnings            | `0`                                |
| `status.error`     | integer     | Number of errored streams                  | `0`                                |
| `start_time`       | datetime    | Execution start time                       | `2025-01-19T08:27:22.988403-05:00` |
| `end_time`         | datetime    | Execution end time                         | `2025-01-19T08:27:31.472684-05:00` |
| `duration`         | integer     | Execution duration in seconds              | `8`                                |
| `error`            | string/null | Error message if execution failed          | `null`                             |

</details>

<details>

<summary><code>source.*</code> / <code>target.*</code> Connection Fields</summary>

| Field       | Type   | Description          | Example (Source) | Example (Target) |
| ----------- | ------ | -------------------- | ---------------- | ---------------- |
| `name`      | string | Connection name      | `aws_s3`         | `postgres`       |
| `type`      | string | Connection type      | `s3`             | `postgres`       |
| `kind`      | string | Connection kind      | `file`           | `database`       |
| `bucket`    | string | S3/GCS bucket name   | `my-bucket-1`    | \`\`             |
| `container` | string | Azure container name | \`\`             | \`\`             |
| `database`  | string | Database name        | \`\`             | `postgres`       |
| `instance`  | string | Database instance    | \`\`             | \`\`             |
| `schema`    | string | Default schema       | \`\`             | `public`         |

</details>

<details>

<summary><code>stream.*</code> Fields</summary>

| Field          | Type   | Description                        | Example                                                                                           |
| -------------- | ------ | ---------------------------------- | ------------------------------------------------------------------------------------------------- |
| `file_folder`  | string | Parent folder of the file          | `update_dt_year=2018`                                                                             |
| `file_name`    | string | Name of the file                   | `update_dt_month=11`                                                                              |
| `file_ext`     | string | File extension                     | `parquet`                                                                                         |
| `file_path`    | string | Full file path                     | `test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11`                   |
| `name`         | string | Stream name pattern                | `test/public_test1k_postgres_pg_parquet/{part_year}/{part_month}/`                                |
| `description`  | string | Stream description (if provided)   | \`\`                                                                                              |
| `schema`       | string | Schema name (for database sources) | \`\`                                                                                              |
| `schema_lower` | string | Schema name in lowercase           | \`\`                                                                                              |
| `schema_upper` | string | Schema name in uppercase           | \`\`                                                                                              |
| `table`        | string | Table name (for database sources)  | \`\`                                                                                              |
| `table_lower`  | string | Table name in lowercase            | \`\`                                                                                              |
| `table_upper`  | string | Table name in uppercase            | \`\`                                                                                              |
| `full_name`    | string | Full stream identifier             | `s3://my-bucket-1/test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11/` |

</details>

<details>

<summary><code>object.*</code> Fields</summary>

| Field            | Type   | Description               | Example                                     |
| ---------------- | ------ | ------------------------- | ------------------------------------------- |
| `schema`         | string | Target schema name        | `public`                                    |
| `table`          | string | Target table name         | `test1k_postgres_pg_parquet`                |
| `name`           | string | Quoted object name        | `"public"."test1k_postgres_pg_parquet"`     |
| `full_name`      | string | Full quoted object name   | `"public"."test1k_postgres_pg_parquet"`     |
| `temp_schema`    | string | Temporary schema name     | `public`                                    |
| `temp_table`     | string | Temporary table name      | `test1k_postgres_pg_parquet_tmp`            |
| `temp_full_name` | string | Full temporary table name | `"public"."test1k_postgres_pg_parquet_tmp"` |

</details>

<details>

<summary><code>run.*</code> Fields</summary>

| Field                   | Type        | Description                                  | Example                                 |
| ----------------------- | ----------- | -------------------------------------------- | --------------------------------------- |
| `id`                    | string      | Run identifier                               | `test_public_test1k`                    |
| `stream.*`              | object      | Stream information (see stream fields above) | `{...}`                                 |
| `object.*`              | object      | Object information (see object fields above) | `{...}`                                 |
| `total_bytes`           | integer     | Total bytes processed in this run            | `6050`                                  |
| `total_rows`            | integer     | Total rows processed in this run             | `34`                                    |
| `status`                | string      | Run status                                   | `success`                               |
| `start_time`            | datetime    | Run start time                               | `2025-01-19T08:27:22.988403-05:00`      |
| `end_time`              | datetime    | Run end time                                 | `2025-01-19T08:27:31.472684-05:00`      |
| `duration`              | integer     | Run duration in seconds                      | `8`                                     |
| `incremental_value`     | any         | The incremental value used                   | `2025-01-19T08:27:31.472684-05:00`      |
| `range`                 | string      | The start/end range values used              | `2025-01-01,2025-02-01`                 |
| `error`                 | string/null | Error message if run failed                  | `null`                                  |
| `config.mode`           | string      | Replication mode                             | `incremental`                           |
| `config.object`         | string      | Target object                                | `"public"."test1k_postgres_pg_parquet"` |
| `config.primary_key`    | array       | Primary key columns                          | `["id"]`                                |
| `config.update_key`     | string      | Update key column                            | `update_dt`                             |
| `config.source_options` | object      | Source-specific options                      | `{}`                                    |
| `config.target_options` | object      | Target-specific options                      | `{}`                                    |

</details>

### `runtime_state` Payload

The best way to view any available variables is to print the `runtime_state` variable.

For example, using the `log` hook as shown below will print all available variables.

```yaml
- type: log
  message: '{runtime_state}'
```

This outputs something like the JSON payload below.

<details>

<summary><code>runtime_state</code> Payload</summary>

```json
{
  "state": {
    "end-01": {},
    "start-01": {
      "level": "info",
      "message": "{...}",
      "status": "success"
    },
    "start-02": {
      "path": "sling-state/test/r.19",
      "status": "success"
    }
  },
  "store": {
    "my_key": "my_value"
  },
  "env": {
    "RESET": "true",
    "SLING_STATE": "aws_s3/sling-state/test/r.19"
  },
  "timestamp": {
    "timestamp": "2025-01-19T08:27:31.473303-05:00",
    "unix": 1737286051,
    "file_name": "2025_01_19_082731",
    "rfc3339": "2025-01-19T08:27:31-05:00",
    "date": "2025-01-19",
    "datetime": "2025-01-19 08:27:31",
    "YYYY": "2025",
    "YY": "25",
    "MMM": "Jan",
    "MM": "01",
    "DD": "19",
    "DDD": "Sun",
    "HH": "08"
  },
  "source": {
    "name": "aws_s3",
    "type": "s3",
    "kind": "file",
    "bucket": "my-bucket-1",
    "container": "",
    "database": "",
    "instance": "",
    "schema": ""
  },
  "target": {
    "name": "postgres",
    "type": "postgres",
    "kind": "database",
    "bucket": "",
    "container": "",
    "database": "postgres",
    "instance": "",
    "schema": "public"
  },
  "stream": {
    "file_folder": "update_dt_year=2018",
    "file_name": "update_dt_month=11",
    "file_ext": "parquet",
    "file_path": "test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11",
    "name": "test/public_test1k_postgres_pg_parquet/{part_year}/{part_month}/",
    "schema": "",
    "schema_lower": "",
    "schema_upper": "",
    "table": "",
    "table_lower": "",
    "table_upper": "",
    "full_name": "s3://my-bucket-1/test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11/"
  },
  "object": {
    "schema": "public",
    "table": "test1k_postgres_pg_parquet",
    "name": "\"public\".\"test1k_postgres_pg_parquet\"",
    "full_name": "\"public\".\"test1k_postgres_pg_parquet\"",
    "temp_schema": "public",
    "temp_table": "test1k_postgres_pg_parquet_tmp",
    "temp_full_name": "\"public\".\"test1k_postgres_pg_parquet_tmp\""
  },
  "runs": {
    "test_public_test1k": {
      "id": "test_public_test1k",
      "stream": {
        "file_folder": "update_dt_year=2018",
        "file_name": "update_dt_month=11",
        "file_ext": "parquet",
        "file_path": "test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11",
        "name": "test/public_test1k_postgres_pg_parquet/{part_year}/{part_month}/",
        "schema": "",
        "schema_lower": "",
        "schema_upper": "",
        "table": "",
        "table_lower": "",
        "table_upper": "",
        "full_name": "s3://my-bucket-1/test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11/"
      },
      "object": {
        "schema": "public",
        "table": "test1k_postgres_pg_parquet",
        "name": "\"public\".\"test1k_postgres_pg_parquet\"",
        "full_name": "\"public\".\"test1k_postgres_pg_parquet\"",
        "temp_schema": "public",
        "temp_table": "test1k_postgres_pg_parquet_tmp",
        "temp_full_name": "\"public\".\"test1k_postgres_pg_parquet_tmp\""
      },
      "total_bytes": 6050,
      "total_rows": 34,
      "status": "success",
      "start_time": "2025-01-19T08:27:22.988403-05:00",
      "end_time": "2025-01-19T08:27:31.472684-05:00",
      "duration": 8,
      "error": null,
      "config": {
        "mode": "incremental",
        "object": "\"public\".\"test1k_postgres_pg_parquet\"",
        "primary_key": [
          "id"
        ],
        "update_key": "update_dt",
        "source_options": {},
        "target_options": {},
        "single": false,
        "hooks": {}
      }
    }
  },
  "execution": {
    "id": "2rxeplXz2UqdIML1NncvWKNQuwD",
    "string": "/path/to/replication.yaml",
    "total_bytes": 6050,
    "total_rows": 34,
    "status": {
      "count": 1,
      "success": 1,
      "running": 0,
      "skipped": 0,
      "cancelled": 0,
      "warning": 0,
      "error": 0
    },
    "start_time": "2025-01-19T08:27:22.988403-05:00",
    "end_time": "2025-01-19T08:27:31.472684-05:00",
    "duration": 8,
    "error": null
  },
  "run": {
    "id": "test_public_test1k",
    "stream": {
      "file_folder": "update_dt_year=2018",
      "file_name": "update_dt_month=11",
      "file_ext": "parquet",
      "file_path": "test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11",
      "name": "test/public_test1k_postgres_pg_parquet/{part_year}/{part_month}/",
      "schema": "",
      "schema_lower": "",
      "schema_upper": "",
      "table": "",
      "table_lower": "",
      "table_upper": "",
      "full_name": "s3://my-bucket-1/test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11/"
    },
    "object": {
      "schema": "public",
      "table": "test1k_postgres_pg_parquet",
      "name": "\"public\".\"test1k_postgres_pg_parquet\"",
      "full_name": "\"public\".\"test1k_postgres_pg_parquet\"",
      "temp_schema": "public",
      "temp_table": "test1k_postgres_pg_parquet_tmp",
      "temp_full_name": "\"public\".\"test1k_postgres_pg_parquet_tmp\""
    },
    "total_bytes": 6050,
    "total_rows": 34,
    "status": "success",
    "start_time": "2025-01-19T08:27:22.988403-05:00",
    "end_time": "2025-01-19T08:27:31.472684-05:00",
    "duration": 8,
    "error": null,
    "config": {
      "mode": "incremental",
      "object": "\"public\".\"test1k_postgres_pg_parquet\"",
      "primary_key": [
        "id"
      ],
      "update_key": "update_dt",
      "source_options": {},
      "target_options": {},
      "single": false,
      "hooks": {}
    }
  }
}
```

</details>

Furthermore, we can access any data point using a [`jmespath` expression](https://jmespath.org/):

* `state["start-02"].status` - Gets the status of a hook (returns `success`)
* `store.my_key` - Gets a stored value from the store (returns `my_value`)
* `run.total_rows` - Gets the number of rows processed in the current run (returns `34`)
* `run.duration` - Gets the duration of the current run in seconds (returns `8`)
* `timestamp.unix` - The epoch/unix timestamp (returns `1737286051`)
* `source.bucket` - Gets the source S3 bucket name (returns `my-bucket-1`)
* `target.database` - Gets the target database name (returns `postgres`)
* `run.config.primary_key[0]` - Gets the first primary key column (returns `id`)
* `stream.file_path` - Gets the current stream's file path (returns `test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11`)
* `stream.file_ext` - Gets the file extension (returns `parquet`)
* `stream.schema_lower` - Gets the stream schema name in lowercase
* `stream.table_upper` - Gets the stream table name in uppercase
* `object.temp_full_name` - Gets the temporary table full name (returns `"public"."test1k_postgres_pg_parquet_tmp"`)
* `execution.status.error` - Gets the count of errored streams (returns `0`)
* `execution.total_bytes` - Gets the total bytes processed across all runs (returns `6050`)
* `runs["test_public_test1k"].status` - Gets the status of a specific run by ID (returns `success`)

## Complete Example

```yaml
# replication level hooks need to be set at the root of the YAML
hooks:
  # runs in order before replication starts.
  start:
    - type: query
      query: select ....
      id: my_query # can use `{state.my_query.result[0].col1}` later
      on_failure: abort

  # runs in order after all streams have completed.
  end:
    # check for any errors. if errored, do not proceed (break)
    - type: check
      check: execution.status.error == 0
      on_failure: break

    - type: query
      query: update ....
      into: result # can use `{result.col1}` later
      on_failure: abort

defaults:
  hooks:
    pre:
      - type: query
        connection: source_db
        query: "UPDATE status SET running = true"
        on_failure: abort

    post:
      - type: check
        check: "run.total_rows > 0"
        on_failure: warn
      
      - type: http
        if: run.status == "success"
        url: "https://api.example.com/webhook"
        method: POST
        payload: | # my_query.result will serialize into an array of objects
          {
            "status": "complete",
            "name": "{ my_query.result[0].name }"
            "records": {my_query.result}}
          }

streams:
  public.users:
    hooks:
      # runs in order before stream run
      pre:
        - type: query
          query: update ....
          on_failure: abort

      # runs in order after stream run
      post:
        - type: http
          url: https://my.webhook/path
          method: POST
          payload: |
            {"result": "{run.status}"}
```

## Best Practices

1. **Error Handling**: Specify appropriate `on_failure` behavior. The default value is `abort`.
2. **Validation**: Use `check` hooks to validate prerequisites and results
3. **Logging**: Implement `log` hooks for better observability
4. **Cleanup**: Use `delete` hooks to manage temporary / old files
5. **Modularity**: Break down complex operations into multiple hooks
6. **Conditions**: Use `if` conditions to control hook execution
7. **Environment Awareness**: Consider different environments in hook configurations
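
A minimal sketch tying a few of these practices together (the `ENVIRONMENT` variable and the messages are assumptions for illustration):

```yaml
defaults:
  hooks:
    post:
      # validate results; warn rather than abort on failure
      - type: check
        check: run.total_rows > 0
        on_failure: warn

      # log for observability, only when an assumed ENVIRONMENT env variable is "production"
      - type: log
        if: env.ENVIRONMENT == "production"
        message: 'stream {run.id} finished with status {run.status} in {run.duration}s'
```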
