Query hooks execute SQL queries against any connection defined in your environment. They are useful for pre- and post-processing tasks, data validation, and maintaining metadata.
Configuration
- type: query
  connection: target_db        # Required: connection name
  query: "SELECT * FROM table" # Required
  transient: false             # Optional: use a transient connection
  on_failure: abort            # Optional: abort/warn/quiet/skip
  id: my_id                    # Optional: generated if omitted. Use the `log` hook with {runtime_state} to view state.
Properties
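| Property | Required | Description |
|---|---|---|
| connection | Yes | The name of the connection to execute the query against |
| query | Yes | The SQL query to execute (can also be the path of a local .sql file with a `file://` prefix) |
| transient | No | Whether to use a transient connection (default: `false`) |
| on_failure | No | What to do if the query fails: `abort`, `warn`, `quiet`, or `skip` |
| id | No | Identifier used to reference the hook's output in later hooks; generated if omitted |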
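For longer statements, the `query` property can point at a local SQL file instead of holding the SQL inline. The following is a minimal sketch. The path `file://sql/post_load_checks.sql` is hypothetical and stands in for wherever your own .sql file lives:

```yaml
hooks:
  post:
    - type: query
      connection: target_db
      # Hypothetical path: the SQL is read from this local file
      # instead of being written inline.
      query: file://sql/post_load_checks.sql
      on_failure: warn
```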
Output
When the query hook executes successfully, it returns the following output that can be accessed in subsequent hooks:
status: success # Status of the hook execution
query: "SELECT * FROM table" # The executed query
connection: "target_db" # The connection used
columns: ["column1", "column2", ...] # List of column names from the result
result: # Array of records from the query result
  - column1: value1
    column2: value2
  - column1: value3
    column2: value4
Subsequent hooks can access these values with JMESPath expressions, where `hook_id` is the `id` of the query hook:
{state.hook_id.status} - Status of the hook execution
{state.hook_id.query} - The executed query
{state.hook_id.connection} - The connection used
{state.hook_id.columns} - List of column names
{state.hook_id.result} - Array of result records
{state.hook_id.result[0].column_name} - Access specific values from the result
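To illustrate the state syntax, the sketch below gives a query hook an explicit `id` and reads a value from its first result row in a later hook. The id `row_count` is an illustrative assumption, as is the `log` hook's `message` property. Check the `log` hook documentation for its exact properties.

```yaml
hooks:
  post:
    # Explicit id so later hooks can reference this hook's output.
    - type: query
      id: row_count
      connection: target_db
      query: "SELECT COUNT(*) AS cnt FROM {run.object.full_name}"

    # Assumed log hook usage: reads column `cnt` from the first result row.
    # Keys follow the column names the database returns, so engines that
    # upper-case unquoted identifiers (e.g. Snowflake) may return `CNT`.
    - type: log
      message: "Target now holds {state.row_count.result[0].cnt} rows"
```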
Examples
Update Status Table
Track when a replication starts by updating a status table:
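The following is a minimal sketch. It assumes a hypothetical `replication_status` table with `table_name`, `status`, and `updated_at` columns. A `pre` hook runs before the data is loaded, so the inserted row marks the start:

```yaml
hooks:
  pre:
    - type: query
      connection: target_db
      # Hypothetical table: replication_status(table_name, status, updated_at)
      query: |
        INSERT INTO replication_status (table_name, status, updated_at)
        VALUES ('{run.object.full_name}', 'started', CURRENT_TIMESTAMP)
      on_failure: warn
```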
Data Quality Check
Verify data quality after loading and record an alert if issues are found:
hooks:
  post:
    - type: query
      connection: target_db
      query: |
        WITH quality_check AS (
          SELECT
            COUNT(*) as invalid_records
          FROM {run.object.full_name}
          WHERE email IS NULL
            OR LENGTH(email) < 5
            OR email NOT LIKE '%@%.%'
        )
        INSERT INTO data_quality_alerts (
          table_name,
          check_time,
          invalid_count,
          total_count
        )
        SELECT
          '{run.object.full_name}',
          CURRENT_TIMESTAMP,
          invalid_records,
          {run.total_rows}
        FROM quality_check
        WHERE invalid_records > 0
Cleanup Old Data
Clean up old data before loading new data in incremental mode (`DATEADD` as used here is Snowflake/Redshift syntax):
hooks:
  pre:
    - type: query
      connection: target_db
      query: |
        DELETE FROM {run.object.full_name}
        WHERE created_date < DATEADD(day, -90, CURRENT_DATE)
      on_failure: warn
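`DATEADD` is not available on every database. PostgreSQL, for example, has no `DATEADD`, but the same 90-day cutoff can be written with interval arithmetic. The following is a sketch that assumes the target table has a `created_date` column as above:

```yaml
hooks:
  pre:
    - type: query
      connection: target_db
      # PostgreSQL date arithmetic: CURRENT_DATE minus a 90-day interval
      query: |
        DELETE FROM {run.object.full_name}
        WHERE created_date < CURRENT_DATE - INTERVAL '90 days'
      on_failure: warn
```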
Update Metadata
Update a metadata table after a successful load:
hooks:
  post:
    - type: query
      connection: target_db
      if: run.status == "success"
      query: |
        MERGE INTO table_metadata t
        USING (
          SELECT
            '{run.object.full_name}' as table_name,
            {run.total_rows} as row_count,
            '{run.end_time}' as last_updated
        ) s
        ON t.table_name = s.table_name
        WHEN MATCHED THEN
          UPDATE SET
            row_count = s.row_count,
            last_updated = s.last_updated
        WHEN NOT MATCHED THEN
          INSERT (table_name, row_count, last_updated)
          VALUES (s.table_name, s.row_count, s.last_updated)
Validate and Rollback
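Check data consistency and raise an error if issues are found. The `DO` block is PL/pgSQL, so this example is PostgreSQL-specific. With `on_failure: abort`, the raised exception aborts the run: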
hooks:
  post:
    - type: query
      connection: target_db
      query: |
        DO $$
        DECLARE
          duplicate_count INT;
        BEGIN
          SELECT COUNT(*) INTO duplicate_count
          FROM (
            SELECT customer_id, COUNT(*)
            FROM {run.object.full_name}
            GROUP BY customer_id
            HAVING COUNT(*) > 1
          ) t;

          IF duplicate_count > 0 THEN
            RAISE EXCEPTION 'Found % duplicate customer records', duplicate_count;
          END IF;
        END $$;
      on_failure: abort
Aggregate Statistics
Calculate and store aggregate statistics after a successful load (the upsert uses PostgreSQL `ON CONFLICT` syntax):
hooks:
  post:
    - type: query
      connection: target_db
      if: run.status == "success"
      query: |
        INSERT INTO sales_summary (
          date,
          total_sales,
          avg_order_value,
          order_count
        )
        SELECT
          DATE(order_date),
          SUM(amount),
          AVG(amount),
          COUNT(*)
        FROM {run.object.full_name}
        WHERE order_date >= CURRENT_DATE - INTERVAL '1 day'
        GROUP BY DATE(order_date)
        ON CONFLICT (date) DO UPDATE
        SET
          total_sales = EXCLUDED.total_sales,
          avg_order_value = EXCLUDED.avg_order_value,
          order_count = EXCLUDED.order_count
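In PostgreSQL, `ON CONFLICT (date)` only works if `sales_summary` has a unique constraint or unique index on `date`. The following is a sketch of a `pre` hook that creates such a table when it does not exist yet. The column types are assumptions chosen for illustration:

```yaml
hooks:
  pre:
    - type: query
      connection: target_db
      # Assumed column types; the PRIMARY KEY on date provides the
      # unique constraint that ON CONFLICT (date) requires.
      query: |
        CREATE TABLE IF NOT EXISTS sales_summary (
          date DATE PRIMARY KEY,
          total_sales NUMERIC,
          avg_order_value NUMERIC,
          order_count BIGINT
        )
```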