Hooks / Steps
Execute custom actions throughout your replication or pipeline
Hooks are powerful mechanisms in Sling that allow you to execute custom actions before (pre-hooks) or after (post-hooks) a replication stream, as well as at the start or end of the parent replication cycle (before the first stream and/or after the last stream). They enable you to extend and customize your data pipeline with operations such as data validation, notifications, file management, and custom processing.
Hooks are the same as Steps when using Sling in Pipeline mode.
Furthermore, Sling Hooks integrate seamlessly with the Sling VSCode Extension. The extension provides schema validation, auto-completion, hover documentation, and diagnostics for your hooks configurations, making it easier to author and debug complex workflows.
Some typical operations include:
Stream Level
Pre-Hooks: Execute before the replication stream run starts
Validate prerequisites
Download necessary files
Set up configurations
Perform cleanup operations
Post-Hooks: Execute after the replication stream run completes
Validate results
Send notifications
Upload processed files
Clean up temporary files
Log completion status
Pre/Post-Merge-Hooks: Execute within the transaction session, before/after data is loaded/merged into the final table
Set specific session settings and configurations
Alter the table holding the temporary data prior to the merge
Run specific SQL queries on other tables
Available Hook Types
The supported hook types are: `query`, `http`, `check`, `copy`, `delete`, `log` and `inspect`.
Hook Configuration
Hooks can be configured in two locations:
- At the `defaults` level (applies to all streams)
- At the individual `stream` level (overrides defaults)
Stream level Hooks
We can use the following structure to declare hooks with the `hooks` key, under the `defaults` branch or under any stream branch.
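For instance, a minimal sketch (the connection name and hook bodies are placeholders, not part of the original example):

```yaml
defaults:
  hooks:
    pre:
      - type: log
        message: "starting stream {run.stream.name}"
    post:
      - type: query
        connection: my_target_db   # placeholder connection name
        query: "analyze {run.object.full_name}"
        on_failure: warn
```

The same `hooks` block can be placed under an individual stream to override the defaults for that stream only.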
Replication level Hooks
We can also define hooks to run at the replication file level, meaning before any of the streams run and/or after all the streams have run. For replication-level hooks, we must declare the `start` and `end` hooks at the root of the YAML configuration.
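A hedged sketch of root-level `start`/`end` hooks (connection and stream names are illustrative):

```yaml
source: my_source_db   # placeholder
target: my_target_db   # placeholder

hooks:
  start:
    - type: log
      message: "replication {execution.file_name} starting"
  end:
    - type: log
      message: "replication finished with {execution.status.error} errored stream(s)"

streams:
  public.my_table:
```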
Common Hook Properties
All hook types share some common properties:
| Property | Description | Required |
| --- | --- | --- |
| `type` | The type of hook (`query`, `http`, `check`, `copy`, `delete`, `log`, `inspect`) | Yes |
| `if` | Optional condition to determine whether the hook should execute | No |
| `id` | A specific identifier used to refer to the hook's output data | No |
| `on_failure` | What to do if the hook fails (`abort`, `warn`, `quiet`, `skip`, `break`) | No (defaults to `abort`) |
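As an illustration, a sketch combining these properties in one hook (the `id` value and condition expressions are examples only):

```yaml
hooks:
  post:
    - type: check
      id: row_count_check          # referenced later via state["row_count_check"]
      if: run.status == "success"  # only run the check when the stream succeeded
      check: run.total_rows > 0
      on_failure: warn             # downgrade a failed check to a warning
```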
Variables Available
- `runtime_state` - Contains all state variables available
- `state.*` - All hooks' output state information (keyed by hook id)
- `store.*` - All stored values from previous hooks
- `env.*` - All variables defined in the `env`
- `timestamp.*` - Various timestamp parts information
- `execution.*` - Replication run level information
- `source.*` - Source connection information
- `target.*` - Target connection information
- `stream.*` - Current source stream info
- `object.*` - Current target object info
- `runs.*` - All runs information (keyed by stream run id)
- `run.*` - Current stream run information
Nested Fields
timestamp.* Fields
| Field | Type | Description | Example |
| --- | --- | --- | --- |
| `timestamp` | datetime | Full timestamp object | `2025-01-19T08:27:31.473303-05:00` |
| `unix` | integer | Unix epoch timestamp | `1737286051` |
| `file_name` | string | Timestamp formatted for file names | `2025_01_19_082731` |
| `rfc3339` | string | RFC3339 formatted timestamp | `2025-01-19T08:27:31-05:00` |
| `date` | string | Date only | `2025-01-19` |
| `datetime` | string | Date and time | `2025-01-19 08:27:31` |
| `YYYY` | string | Four-digit year | `2025` |
| `YY` | string | Two-digit year | `25` |
| `MMM` | string | Three-letter month abbreviation | `Jan` |
| `MM` | string | Two-digit month | `01` |
| `DD` | string | Two-digit day | `19` |
| `DDD` | string | Three-letter day abbreviation | `Sun` |
| `HH` | string | Two-digit hour (24-hour format) | `08` |
execution.* Fields
| Field | Type | Description | Example |
| --- | --- | --- | --- |
| `id` | string | Unique execution identifier | `2rxeplXz2UqdIML1NncvWKNQuwD` |
| `file_path` | string | Path to the replication configuration file | `/path/to/replication.yaml` |
| `file_name` | string | Name of the replication configuration file | `replication.yaml` |
| `total_bytes` | integer | Total bytes processed across all runs | `6050` |
| `total_rows` | integer | Total rows processed across all runs | `34` |
| `status.count` | integer | Total number of streams | `1` |
| `status.success` | integer | Number of successful streams | `1` |
| `status.running` | integer | Number of running streams | `0` |
| `status.skipped` | integer | Number of skipped streams | `0` |
| `status.cancelled` | integer | Number of cancelled streams | `0` |
| `status.warning` | integer | Number of streams with warnings | `0` |
| `status.error` | integer | Number of errored streams | `0` |
| `start_time` | datetime | Execution start time | `2025-01-19T08:27:22.988403-05:00` |
| `end_time` | datetime | Execution end time | `2025-01-19T08:27:31.472684-05:00` |
| `duration` | integer | Execution duration in seconds | `8` |
| `error` | string/null | Error message if execution failed | `null` |
source.* / target.* Connection Fields
| Field | Type | Description | Source Example | Target Example |
| --- | --- | --- | --- | --- |
| `name` | string | Connection name | `aws_s3` | `postgres` |
| `type` | string | Connection type | `s3` | `postgres` |
| `kind` | string | Connection kind | `file` | `database` |
| `bucket` | string | S3/GCS bucket name | `my-bucket-1` | |
| `container` | string | Azure container name | | |
| `database` | string | Database name | | `postgres` |
| `instance` | string | Database instance | | |
| `schema` | string | Default schema | | `public` |
stream.* Fields
| Field | Type | Description | Example |
| --- | --- | --- | --- |
| `file_folder` | string | Parent folder of the file | `update_dt_year=2018` |
| `file_name` | string | Name of the file | `update_dt_month=11` |
| `file_ext` | string | File extension | `parquet` |
| `file_path` | string | Full file path | `test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11` |
| `name` | string | Stream name pattern | `test/public_test1k_postgres_pg_parquet/{part_year}/{part_month}/` |
| `description` | string | Stream description (if provided) | |
| `schema` | string | Schema name (for database sources) | |
| `schema_lower` | string | Schema name in lowercase | |
| `schema_upper` | string | Schema name in uppercase | |
| `table` | string | Table name (for database sources) | |
| `table_lower` | string | Table name in lowercase | |
| `table_upper` | string | Table name in uppercase | |
| `full_name` | string | Full stream identifier | `s3://my-bucket-1/test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11/` |
object.* Fields
| Field | Type | Description | Example |
| --- | --- | --- | --- |
| `schema` | string | Target schema name | `public` |
| `table` | string | Target table name | `test1k_postgres_pg_parquet` |
| `name` | string | Quoted object name | `"public"."test1k_postgres_pg_parquet"` |
| `full_name` | string | Full quoted object name | `"public"."test1k_postgres_pg_parquet"` |
| `temp_schema` | string | Temporary schema name | `public` |
| `temp_table` | string | Temporary table name | `test1k_postgres_pg_parquet_tmp` |
| `temp_full_name` | string | Full temporary table name | `"public"."test1k_postgres_pg_parquet_tmp"` |
run.* Fields
| Field | Type | Description | Example |
| --- | --- | --- | --- |
| `id` | string | Run identifier | `test_public_test1k` |
| `stream.*` | object | Stream information (see stream fields above) | `{...}` |
| `object.*` | object | Object information (see object fields above) | `{...}` |
| `total_bytes` | integer | Total bytes processed in this run | `6050` |
| `total_rows` | integer | Total rows processed in this run | `34` |
| `status` | string | Run status | `success` |
| `start_time` | datetime | Run start time | `2025-01-19T08:27:22.988403-05:00` |
| `end_time` | datetime | Run end time | `2025-01-19T08:27:31.472684-05:00` |
| `duration` | integer | Run duration in seconds | `8` |
| `incremental_value` | any | The incremental value used | `2025-01-19T08:27:31.472684-05:00` |
| `range` | string | The start/end range values used | `2025-01-01,2025-02-01` |
| `error` | string/null | Error message if run failed | `null` |
| `config.mode` | string | Replication mode | `incremental` |
| `config.object` | string | Target object | `"public"."test1k_postgres_pg_parquet"` |
| `config.primary_key` | array | Primary key columns | `["id"]` |
| `config.update_key` | string | Update key column | `update_dt` |
| `config.source_options` | object | Source-specific options | `{}` |
| `config.target_options` | object | Target-specific options | `{}` |
runtime_state Payload
The best way to view the available variables is to print the `runtime_state` variable.
For example, using the log hook as shown below will print all available variables.
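A minimal sketch of such a `log` hook (the message templating shown is an assumption based on the variable syntax used throughout this page):

```yaml
hooks:
  post:
    - type: log
      message: "{runtime_state}"
```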
The output is a JSON payload containing all the variables described above.
Furthermore, we can access any data-point using a jmespath expression:
- `state["start-02"].status` - Gets the status of a hook (returns `success`)
- `store.my_key` - Gets a stored value from the store (returns `my_value`)
- `run.total_rows` - Gets the number of rows processed in the current run (returns `34`)
- `run.duration` - Gets the duration of the current run in seconds (returns `8`)
- `timestamp.unix` - The epoch/unix timestamp (returns `1737286051`)
- `source.bucket` - Gets the source S3 bucket name (returns `my-bucket-1`)
- `target.database` - Gets the target database name (returns `postgres`)
- `run.config.primary_key[0]` - Gets the first primary key column (returns `id`)
- `stream.file_path` - Gets the current stream's file path (returns `test/public_test1k_postgres_pg_parquet/update_dt_year=2018/update_dt_month=11`)
- `stream.file_ext` - Gets the file extension (returns `parquet`)
- `stream.schema_lower` - Gets the stream schema name in lowercase
- `stream.table_upper` - Gets the stream table name in uppercase
- `object.temp_full_name` - Gets the temporary table full name (returns `"public"."test1k_postgres_pg_parquet_tmp"`)
- `execution.status.error` - Gets the count of errored streams (returns `0`)
- `execution.total_bytes` - Gets the total bytes processed across all runs (returns `6050`)
- `runs["test_public_test1k"].status` - Gets the status of a specific run by ID (returns `success`)
Complete Example
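A sketch of what a complete configuration might look like, combining stream-level and replication-level hooks (connection names, the stream path, and hook bodies are illustrative assumptions, not the original example):

```yaml
source: aws_s3
target: postgres

defaults:
  mode: incremental
  hooks:
    pre:
      - type: log
        message: "starting {run.stream.name}"
    post:
      - type: check
        check: run.total_rows > 0
        on_failure: warn

hooks:
  start:
    - type: log
      message: "starting replication {execution.file_name}"
  end:
    - type: log
      if: execution.status.error > 0
      message: "replication finished with {execution.status.error} error(s)"

streams:
  "test/public_test1k_postgres_pg_parquet/{part_year}/{part_month}/":
    object: public.test1k_postgres_pg_parquet
    primary_key: [id]
    update_key: update_dt
```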
Best Practices
- Error Handling: Specify appropriate `on_failure` behavior. The default value is `abort`.
- Validation: Use `check` hooks to validate prerequisites and results
- Logging: Implement `log` hooks for better observability
- Cleanup: Use `delete` hooks to manage temporary / old files
- Modularity: Break down complex operations into multiple hooks
- Conditions: Use `if` conditions to control hook execution
- Environment Awareness: Consider different environments in hook configurations