Query

Query hooks allow you to execute SQL queries against any defined connection in your environment. This is particularly useful for pre- and post-processing tasks, data validation, and maintaining metadata.

Configuration

- type: query
  connection: target_db   # Connection name (required)
  query:      "SELECT * FROM table"  # Required
  transient:  false       # Optional: Use transient connection
  on_failure: abort       # Optional: abort/warn/quiet/skip
  id:         my_id       # Optional: generated if omitted. Use a `log` hook with {runtime_state} to view state.

Properties

| Property   | Required | Description |
|------------|----------|-------------|
| connection | Yes      | The name of the connection to execute the query against |
| query      | Yes      | The SQL query to execute (can also be the path of a local .sql file with file:// prefix) |
| transient  | No       | Whether to use a transient connection (default: false) |
| on_failure | No       | What to do if the query fails (abort/warn/quiet/skip) |
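
As an illustration of the optional properties, the sketch below sets an explicit id, reads the query from a local file, and downgrades failures to warnings. The hook id, connection name, and file path are hypothetical placeholders, not values defined by the tool.

```yaml
hooks:
  pre:
    - type: query
      id: refresh_lookup                      # hypothetical id; later hooks would use {state.refresh_lookup...}
      connection: target_db                   # assumed connection name
      query: file://sql/refresh_lookup.sql    # hypothetical path to a local .sql file
      transient: true                         # use a transient connection for this hook
      on_failure: warn                        # warn on failure rather than abort
```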

Output

When the query hook executes successfully, it returns the following output that can be accessed in subsequent hooks:

status: success  # Status of the hook execution
query: "SELECT * FROM table"  # The executed query
connection: "target_db"  # The connection used
columns: ["column1", "column2", ...]  # List of column names from the result
result:  # Array of records from the query result
  - column1: value1
    column2: value2
  - column1: value3
    column2: value4

You can access these values in subsequent hooks with the following JMESPath expressions, where hook_id is the id of the query hook:

  • {state.hook_id.status} - Status of the hook execution

  • {state.hook_id.query} - The executed query

  • {state.hook_id.connection} - The connection used

  • {state.hook_id.columns} - List of column names

  • {state.hook_id.result} - Array of result records

  • {state.hook_id.result[0].column_name} - Access specific values from the result
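
A minimal sketch of how these expressions might be used: a query hook with an explicit id stores its output, and later hooks read it. The id row_check and the column alias row_count are hypothetical, and the `message` property of the `log` hook is an assumption that should be checked against the log hook's own documentation.

```yaml
hooks:
  post:
    - type: query
      id: row_check                 # explicit id so later hooks can reference this output
      connection: target_db
      query: "SELECT COUNT(*) AS row_count FROM {run.object.full_name}"

    - type: log
      # `message` is an assumed property name for the log hook's text
      message: "Row check {state.row_check.status}: {state.row_check.result[0].row_count} rows in target"

    - type: log
      # prints the full runtime state, which also reveals generated hook ids
      message: "{runtime_state}"
```

Setting the id explicitly avoids having to look up the generated one before writing the {state...} references.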

Examples

Update Status Table

Track when a replication starts by updating a status table:

hooks:
  pre:
    - type: query
      connection: target_db
      query: |
        INSERT INTO replication_status (
          stream_name, 
          start_time, 
          status
        ) VALUES (
          '{run.stream.name}',
          '{run.start_time}',
          'RUNNING'
        )

    - type: query
      connection: target_db
      query: file://path/to/file.sql

Data Quality Check

Verify data quality after loading and record an alert row if invalid records are found:

hooks:
  post:
    - type: query
      connection: target_db
      query: |
        WITH quality_check AS (
          SELECT 
            COUNT(*) as invalid_records
          FROM {run.object.full_name}
          WHERE email IS NULL 
            OR LENGTH(email) < 5 
            OR email NOT LIKE '%@%.%'
        )
        INSERT INTO data_quality_alerts (
          table_name,
          check_time,
          invalid_count,
          total_count
        )
        SELECT 
          '{run.object.full_name}',
          CURRENT_TIMESTAMP,
          invalid_records,
          {run.total_rows}
        FROM quality_check
        WHERE invalid_records > 0

Cleanup Old Data

Clean up old data before loading new data in incremental mode:

hooks:
  pre:
    - type: query
      connection: target_db
      query: |
        DELETE FROM {run.object.full_name}
        WHERE created_date < DATEADD(day, -90, CURRENT_DATE)
      on_failure: warn
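
DATEADD is available in dialects such as SQL Server and Snowflake but not in PostgreSQL. Assuming a PostgreSQL target, the same cleanup could be sketched with interval arithmetic instead:

```yaml
hooks:
  pre:
    - type: query
      connection: target_db
      # PostgreSQL variant: interval arithmetic replaces DATEADD
      query: |
        DELETE FROM {run.object.full_name}
        WHERE created_date < CURRENT_DATE - INTERVAL '90 days'
      on_failure: warn
```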

Update Metadata

Update a metadata table after successful load:

hooks:
  post:
    - type: query
      connection: target_db
      if: run.status == "success"
      query: |
        MERGE INTO table_metadata t
        USING (
          SELECT 
            '{run.object.full_name}' as table_name,
            {run.total_rows} as row_count,
            '{run.end_time}' as last_updated
        ) s
        ON t.table_name = s.table_name
        WHEN MATCHED THEN
          UPDATE SET 
            row_count = s.row_count,
            last_updated = s.last_updated
        WHEN NOT MATCHED THEN
          INSERT (table_name, row_count, last_updated)
          VALUES (s.table_name, s.row_count, s.last_updated)

Validate and Rollback

Check data consistency and fail the hook, aborting the run, if duplicates are found:

hooks:
  post:
    - type: query
      connection: target_db
      query: |
        DO $$
        DECLARE
          duplicate_count INT;
        BEGIN
          SELECT COUNT(*) INTO duplicate_count
          FROM (
            SELECT customer_id, COUNT(*)
            FROM {run.object.full_name}
            GROUP BY customer_id
            HAVING COUNT(*) > 1
          ) t;
          
          IF duplicate_count > 0 THEN
            RAISE EXCEPTION 'Found % duplicate customer records', duplicate_count;
          END IF;
        END $$;
      on_failure: abort

Aggregate Statistics

Calculate and store aggregated statistics after data load:

hooks:
  post:
    - type: query
      connection: target_db
      if: run.status == "success"
      query: |
        INSERT INTO sales_summary (
          date,
          total_sales,
          avg_order_value,
          order_count
        )
        SELECT
          DATE(order_date),
          SUM(amount),
          AVG(amount),
          COUNT(*)
        FROM {run.object.full_name}
        WHERE order_date >= CURRENT_DATE - INTERVAL '1 day'
        GROUP BY DATE(order_date)
        ON CONFLICT (date) DO UPDATE
        SET
          total_sales = EXCLUDED.total_sales,
          avg_order_value = EXCLUDED.avg_order_value,
          order_count = EXCLUDED.order_count
