Hooks / Steps
Execute custom actions throughout your replication or pipeline
Hooks are powerful mechanisms in Sling that allow you to execute custom actions before (pre-hooks) or after (post-hooks) a replication stream, as well as at the start or end of the replication parent cycle (before first stream and/or after last stream). They enable you to extend and customize your data pipeline with various operations such as data validation, notifications, file management, and custom processing.
Hooks are the same as Steps when using Sling in Pipeline mode.
Furthermore, Sling Hooks integrate seamlessly with the Sling VSCode Extension. The extension provides schema validation, auto-completion, hover documentation, and diagnostics for your hook configurations, making it easier to author and debug complex workflows.
Some typical operations include:
Stream Level
Pre-Hooks: Execute before the replication stream run starts
Validate prerequisites
Download necessary files
Set up configurations
Perform cleanup operations
Post-Hooks: Execute after the replication stream run completes
Validate results
Send notifications
Upload processed files
Clean up temporary files
Log completion status
Pre/Post-Merge-Hooks: Execute within the transaction session, before/after data is loaded/merged into the final table
Set session-specific settings and configuration
Alter the table holding the temporary data, prior to the merge
Run specific SQL queries on other tables
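As a minimal sketch, a pre_merge hook can adjust a session setting inside the merge transaction. The setting shown here is illustrative (a PostgreSQL-style timeout); use whatever your target database supports:

```yaml
streams:
  my_stream:
    hooks:
      pre_merge:
        - type: query
          # runs inside the session/transaction, right after BEGIN
          query: "SET LOCAL statement_timeout = '30min'"
```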
Available Hook Types
Hook Configuration
Hooks can be configured in two locations:
At the defaults level (applies to all streams)
At the individual stream level (overrides defaults)
Stream level Hooks
We can use the following structure to declare hooks with the hooks key, under the defaults branch or under any stream branch.
defaults:
...
streams:
my_stream:
hooks:
# Prior to stream beginning
pre:
- type: log
# hook configuration...
# Inside session/transaction, before merge, right after BEGIN
pre_merge:
- type: query
# hook configuration...
# Inside session/transaction, after merge, right before COMMIT
post_merge:
- type: query
# hook configuration...
# After stream finishes
post:
- type: http
# hook configuration...
Replication level Hooks
We can also define hooks to run at the replication file level, meaning before any of the streams run and/or after all the streams have run. For replication-level hooks, we must declare the start and end hooks at the root of the YAML configuration.
# replication level hooks need to be set at the root of the YAML
hooks:
start:
- type: query
# hook configuration...
end:
- type: http
# hook configuration...
defaults:
...
streams:
...
Common Hook Properties
All hook types share some common properties:
type
The type of hook (query / http / check / copy / delete / log / inspect)
Yes
if
Optional condition to determine whether the hook should execute
No
id
A specific identifier used to refer to the hook's output data
No
on_failure
What to do if the hook fails (abort / warn / quiet / skip / break)
No (defaults to abort)
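A sketch showing these common properties together on a single check hook (the id name row_check is illustrative):

```yaml
- type: check
  id: row_check                  # referenceable later via {state.row_check}
  if: run.status == "success"    # only evaluate when the stream run succeeded
  check: run.total_rows > 0
  on_failure: warn               # downgrade a failed check to a warning
```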
Variables Available
runtime_state - Contains all state variables available
state.* - All hooks' output state information (keyed by hook id)
store.* - All stored values from previous hooks
env.* - All variables defined in the env
timestamp.* - Various timestamp parts information
execution.* - Replication run level information
source.* - Source connection information
target.* - Target connection information
stream.* - Current source stream info
object.* - Current target object info
runs.* - All runs information (keyed by stream run id)
run.* - Current stream run information
Nested Fields
runtime_state Payload
The best way to view any available variables is to print the runtime_state variable.
For example, using the log hook as shown below will print all available variables.
- type: log
message: '{runtime_state}'
This shows a JSON payload similar to the one below.
Furthermore, we can access any data point using a JMESPath expression:
state["start-02"].status - Gets the status of a hook (returns success)
store.my_key - Gets a stored value from the store (returns my_value)
run.total_rows - Gets the number of rows processed in the current run (returns 34)
run.duration - Gets the duration of the current run in seconds (returns 8)
timestamp.unix - The epoch/unix timestamp (returns 1737286051)
source.bucket - Gets the source S3 bucket name (returns my-bucket-1)
target.database - Gets the target database name (returns postgres)
run.config.primary_key[0] - Gets the first primary key column (returns id)
stream.file_path - Gets the current stream's file path (returns test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11)
stream.file_ext - Gets the file extension (returns parquet)
stream.schema_lower - Gets the stream schema name in lowercase
stream.table_upper - Gets the stream table name in uppercase
object.temp_full_name - Gets the temporary table full name (returns "public"."test1k_postgres_pg_parquet_tmp")
execution.status.error - Gets the count of errored streams (returns 0)
execution.total_bytes - Gets the total bytes processed across all runs (returns 6050)
runs["test_public_test1k"].status - Gets the status of a specific run by ID (returns success)
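For instance, a log hook can interpolate several of these variables into a single message. This sketch uses only the variables listed above:

```yaml
- type: log
  message: 'status={run.status} rows={run.total_rows} duration={run.duration}s'
```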
Complete Example
# replication level hooks need to be set at the root of the YAML
hooks:
# runs in order before replication starts.
start:
- type: query
query: select ....
id: my_query # can use `{state.my_query.result[0].col1}` later
on_failure: abort
# runs in order after all streams have completed.
end:
# check for any errors. if errored, do not proceed (break)
- type: check
check: execution.status.error == 0
on_failure: break
- type: query
query: update ....
into: result # can use `{result.col1}` later
on_failure: abort
defaults:
hooks:
pre:
- type: query
connection: source_db
query: "UPDATE status SET running = true"
on_failure: abort
post:
- type: check
check: "run.total_rows > 0"
on_failure: warn
- type: http
if: run.status == "success"
url: "https://api.example.com/webhook"
method: POST
payload: | # my_query.result will serialize into an array of objects
{
"status": "complete",
"name": "{ my_query.result[0].name }",
"records": {my_query.result}
}
streams:
public.users:
hooks:
# runs in order before stream run
pre:
- type: query
query: update ....
on_failure: abort
# runs in order after stream run
post:
- type: http
url: https://my.webhook/path
method: POST
payload: |
{"result": "{run.status}"}
Best Practices
Error Handling: Specify appropriate on_failure behavior. The default value is abort.
Validation: Use check hooks to validate prerequisites and results
Logging: Implement log hooks for better observability
Cleanup: Use delete hooks to manage temporary / old files
Modularity: Break down complex operations into multiple hooks
Conditions: Use if conditions to control hook execution
Environment Awareness: Consider different environments in hook configurations
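As a sketch combining several of these practices, the hook below gates a notification behind an if condition on an environment variable and downgrades failures to warnings. Both ENVIRONMENT and WEBHOOK_URL are assumed to be defined in the env section; they are illustrative names, not built-ins:

```yaml
- type: http
  if: env.ENVIRONMENT == "production"   # ENVIRONMENT is an assumed env variable
  url: '{env.WEBHOOK_URL}'              # WEBHOOK_URL is an assumed env variable
  method: POST
  on_failure: warn                      # a failed notification should not abort the run
```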