# Query
Query hooks allow you to execute SQL queries against any defined connection in your environment. This is particularly useful for pre- and post-processing tasks, data validation, and maintaining metadata.
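For example, a minimal pre-hook that runs a single statement before the replication starts (a sketch; the connection and table names are illustrative):

```yaml
hooks:
  pre:
    - type: query
      connection: target_db                      # any defined connection
      query: "TRUNCATE TABLE staging_orders"     # illustrative statement
```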
## Configuration

```yaml
- type: query
  connection: target_db        # Connection name (required)
  query: "SELECT * FROM table" # Required
  into: "my_results"           # Optional: store results in a variable
  transient: false             # Optional: use a transient connection
  transaction: true            # Optional: use a transaction, or set an isolation level
  on_failure: abort            # Optional: abort/warn/quiet/skip
  id: my_id                    # Optional. Will be generated. Use the `log` hook with {runtime_state} to view state.
```

## Properties
| Property | Required | Description |
| --- | --- | --- |
| `connection` | Yes | The name of the connection to execute the query against. |
| `query` | Yes | The SQL query to execute (can also be the path of a local `.sql` file, using the `file://` prefix). |
| `into` | No | Variable name to store the query results. If not specified, results are included in the hook output. |
| `transient` | No | Whether to use a transient connection (default: `false`). See the sketch after this table. |
| `transaction` | No | Whether to use a transaction, and optionally set the isolation level. Ideal for multiple statements. Supported values: `default`, `true` (same as `default`), `read_uncommitted`, `read_committed`, `repeatable_read`, `serializable`. |
| `on_failure` | No | What to do if the query fails (`abort` / `warn` / `quiet` / `skip`). |
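As a sketch of the optional flags (the connection, table, and statement are illustrative, not prescribed), a hook that runs on a transient connection and only warns on failure:

```yaml
hooks:
  post:
    - type: query
      connection: audit_db      # illustrative connection name
      transient: true           # use a short-lived, separate connection
      on_failure: warn          # log a warning instead of aborting the run
      query: "INSERT INTO audit_log (event) VALUES ('load complete')"
```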
## Output
When the query hook executes successfully, it returns the following output that can be accessed in subsequent hooks:
```yaml
status: success                      # Status of the hook execution
query: "SELECT * FROM table"         # The executed query
connection: "target_db"              # The connection used
columns: ["column1", "column2", ...] # List of column names from the result
result:                              # Array of records from the query result (only if 'into' is not specified)
  - column1: value1
    column2: value2
  - column1: value3
    column2: value4
```

You can access these values in subsequent hooks using the following syntax (JMESPath):

- `{state.hook_id.status}`: Status of the hook execution
- `{state.hook_id.query}`: The executed query
- `{state.hook_id.connection}`: The connection used
- `{state.hook_id.columns}`: List of column names
- `{state.hook_id.result}`: Array of result records (only if `into` is not specified)
- `{state.hook_id.result[0].column_name}`: Access specific values from the result
- `{store.variable_name}`: Stored results when using the `into` parameter
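For example, a hook with an explicit `id` whose output a subsequent `log` hook reads (a sketch; the id `row_check` and the column alias `cnt` are illustrative):

```yaml
hooks:
  post:
    - type: query
      id: row_check           # explicit id makes the output addressable via state
      connection: target_db
      query: "SELECT COUNT(*) AS cnt FROM {run.object.full_name}"

    - type: log
      message: "Row count after load: {state.row_check.result[0].cnt}"
```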
## Examples

### Store Query Results for Later Use
Execute a query and store results for use in subsequent hooks:
```yaml
hooks:
  pre:
    - type: query
      connection: source_db
      query: |
        SELECT
          table_name,
          last_updated,
          row_count
        FROM metadata_table
        WHERE table_name = '{run.stream.name}'
      into: "table_metadata"

    - type: log
      message: "Last updated: {store.table_metadata[0].last_updated}, Row count: {store.table_metadata[0].row_count}"
```
### Transactional Session Settings

Use `pre_merge` and `post_merge` (formerly `pre_sql` and `post_sql`), available in v1.4.24+, for tight session settings. The example below uses the same connection session/transaction to run custom SET queries.
```yaml
source: mssql
target: mssql2022

streams:
  dbo.identity_source:
    object: dbo.identity_target
    hooks:
      pre_merge:
        - type: query
          connection: '{target.name}'
          query: "SET IDENTITY_INSERT {run.object.full_name} ON"
      post_merge:
        - type: query
          connection: '{target.name}'
          query: "SET IDENTITY_INSERT {run.object.full_name} OFF"
```

### Get Configuration Values
Retrieve configuration values from a database and use them in other hooks:
```yaml
hooks:
  pre:
    - type: query
      connection: config_db
      query: |
        SELECT
          config_key,
          config_value
        FROM application_config
        WHERE environment = '{env.ENV_NAME}'
      into: "app_config"

    - type: write
      to: "local/temp/runtime_config.json"
      content: "{store.app_config}"
```

### Conditional Processing Based on Query Results
Use query results to control subsequent hook execution:
```yaml
hooks:
  pre:
    - type: query
      connection: target_db
      query: |
        SELECT COUNT(*) as record_count
        FROM {run.object.full_name}
        WHERE DATE(created_at) = CURRENT_DATE
      into: "daily_count"

    - type: query
      connection: target_db
      if: "store.daily_count[0].record_count > 1000"
      query: |
        DELETE FROM {run.object.full_name}
        WHERE DATE(created_at) = CURRENT_DATE
        ORDER BY created_at
        LIMIT 500
```

### Update Status Table
Track when a replication starts by updating a status table:
```yaml
hooks:
  pre:
    - type: query
      connection: target_db
      query: |
        INSERT INTO replication_status (
          stream_name,
          start_time,
          status
        ) VALUES (
          '{run.stream.name}',
          '{run.start_time}',
          'RUNNING'
        )

    - type: query
      connection: target_db
      query: file://path/to/file.sql
```

### Data Quality Check
Verify data quality after loading and raise an alert if issues are found:
```yaml
hooks:
  post:
    - type: query
      connection: target_db
      query: |
        WITH quality_check AS (
          SELECT
            COUNT(*) as invalid_records
          FROM {run.object.full_name}
          WHERE email IS NULL
            OR LENGTH(email) < 5
            OR email NOT LIKE '%@%.%'
        )
        INSERT INTO data_quality_alerts (
          table_name,
          check_time,
          invalid_count,
          total_count
        )
        SELECT
          '{run.object.full_name}',
          CURRENT_TIMESTAMP,
          invalid_records,
          {run.total_rows}
        FROM quality_check
        WHERE invalid_records > 0
```

### Cleanup Old Data
Clean up old data before loading new data in incremental mode:
```yaml
hooks:
  pre:
    - type: query
      connection: target_db
      query: |
        DELETE FROM {run.object.full_name}
        WHERE created_date < DATEADD(day, -90, CURRENT_DATE)
      on_failure: warn
```

### Update Metadata
Update a metadata table after successful load:
```yaml
hooks:
  post:
    - type: query
      connection: target_db
      if: run.status == "success"
      query: |
        MERGE INTO table_metadata t
        USING (
          SELECT
            '{run.object.full_name}' as table_name,
            {run.total_rows} as total_rows,
            '{run.end_time}' as last_updated
        ) s
        ON t.table_name = s.table_name
        WHEN MATCHED THEN
          UPDATE SET
            total_rows = s.total_rows,
            last_updated = s.last_updated
        WHEN NOT MATCHED THEN
          INSERT (table_name, total_rows, last_updated)
          VALUES (s.table_name, s.total_rows, s.last_updated)
```

### Validate and Rollback
Check data consistency and rollback if issues are found:
```yaml
hooks:
  post:
    - type: query
      connection: target_db
      query: |
        DO $$
        DECLARE
          duplicate_count INT;
        BEGIN
          SELECT COUNT(*) INTO duplicate_count
          FROM (
            SELECT customer_id, COUNT(*)
            FROM {run.object.full_name}
            GROUP BY customer_id
            HAVING COUNT(*) > 1
          ) t;

          IF duplicate_count > 0 THEN
            RAISE EXCEPTION 'Found % duplicate customer records', duplicate_count;
          END IF;
        END $$;
      on_failure: abort
```

### Aggregate Statistics
Calculate and store aggregated statistics after data load:
```yaml
hooks:
  post:
    - type: query
      connection: target_db
      if: run.status == "success"
      query: |
        INSERT INTO sales_summary (
          date,
          total_sales,
          avg_order_value,
          order_count
        )
        SELECT
          DATE(order_date),
          SUM(amount),
          AVG(amount),
          COUNT(*)
        FROM {run.object.full_name}
        WHERE order_date >= CURRENT_DATE - INTERVAL '1 day'
        GROUP BY DATE(order_date)
        ON CONFLICT (date) DO UPDATE
        SET
          total_sales = EXCLUDED.total_sales,
          avg_order_value = EXCLUDED.avg_order_value,
          order_count = EXCLUDED.order_count
```

### Transaction Support
Execute multiple operations within a transaction with specific isolation level:
```yaml
hooks:
  pre:
    - type: query
      connection: target_db
      transaction: repeatable_read # Ensures consistent reads within the transaction
      query: |
        -- Multiple operations executed as a single transaction
        DELETE FROM staging_table WHERE process_date < CURRENT_DATE - 7;

        INSERT INTO staging_table (id, name, process_date)
        SELECT id, name, CURRENT_DATE
        FROM source_table
        WHERE status = 'active';

        UPDATE process_log
        SET last_run = CURRENT_TIMESTAMP,
            records_processed = (SELECT COUNT(*) FROM staging_table)
        WHERE process_name = 'daily_staging';
```

When using transactions, if any statement fails, all operations within the transaction will be rolled back automatically.