Query

Query hooks allow you to execute SQL queries against any defined connection in your environment. This is particularly useful for pre- and post-processing tasks, data validation, and maintaining metadata.

Configuration

- type: query
  connection: target_db   # Connection name (required)
  query:      "SELECT * FROM table"  # Required
  into:       "my_results"  # Optional: Store results in variable
  transient:  false       # Optional: Use transient connection
  transaction: true      # Optional: Use transaction, or set isolation level
  on_failure: abort       # Optional: abort/warn/quiet/skip
  id:         my_id       # Optional: Auto-generated if not provided. Use a `log` hook with {runtime_state} to view state.

Properties

| Property | Required | Description |
| --- | --- | --- |
| connection | Yes | The name of the connection to execute the query against |
| query | Yes | The SQL query to execute (can also be the path of a local .sql file with file:// prefix) |
| into | No | Variable name to store the query results. If not specified, results are included in hook output. |
| transient | No | Whether to use a transient connection (default: false) |
| transaction | No | Whether to use a transaction, and optionally set the isolation level. Ideal for multiple statements. Supported values: default, true (same as default), read_uncommitted, read_committed, repeatable_read, serializable |
| on_failure | No | What to do if the query fails (abort/warn/quiet/skip) |
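As an illustration of the transient and on_failure properties, a one-off maintenance statement can run on a short-lived connection so it does not tie up the session used for the main work, and a failure can be downgraded to a warning. This is a sketch; the connection and table names are illustrative:

```yaml
hooks:
  pre:
    - type: query
      connection: target_db      # illustrative connection name
      transient: true            # run on a separate, short-lived connection
      query: "VACUUM ANALYZE staging_table"
      on_failure: warn           # log a warning instead of aborting the run
```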

Output

When the query hook executes successfully, it returns the following output that can be accessed in subsequent hooks:

status: success  # Status of the hook execution
query: "SELECT * FROM table"  # The executed query
connection: "target_db"  # The connection used
columns: ["column1", "column2", ...]  # List of column names from the result
result:  # Array of records from the query result (only if 'into' is not specified)
  - column1: value1
    column2: value2
  - column1: value3
    column2: value4

You can access these values in subsequent hooks using the following JMESPath syntax:

  • {state.hook_id.status} - Status of the hook execution

  • {state.hook_id.query} - The executed query

  • {state.hook_id.connection} - The connection used

  • {state.hook_id.columns} - List of column names

  • {state.hook_id.result} - Array of result records (only if 'into' is not specified)

  • {state.hook_id.result[0].column_name} - Access specific values from the result

  • {store.variable_name} - Stored results when using 'into' parameter
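For example, a query hook given an explicit id can feed its output into a later log hook. The hook id and column alias below are illustrative:

```yaml
hooks:
  pre:
    - type: query
      id: get_count                # explicit id so later hooks can reference this state
      connection: target_db
      query: "SELECT COUNT(*) AS cnt FROM {run.object.full_name}"

    - type: log
      message: "Row count: {state.get_count.result[0].cnt}"
```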

Examples

Store Query Results for Later Use

Execute a query and store results for use in subsequent hooks:

hooks:
  pre:
    - type: query
      connection: source_db
      query: |
        SELECT 
          table_name,
          last_updated,
          row_count
        FROM metadata_table
        WHERE table_name = '{run.stream.name}'
      into: "table_metadata"
      
    - type: log
      message: "Last updated: {store.table_metadata[0].last_updated}, Row count: {store.table_metadata[0].row_count}"
      

Transactional Session Settings

Use pre_merge and post_merge (formerly pre_sql and post_sql; available in v1.4.24+) for session-scoped settings. The example below uses the same connection session/transaction as the merge to run custom SET queries.

source: mssql
target: mssql2022

streams:
  dbo.identity_source:
    object: dbo.identity_target

    hooks:
      pre_merge:
        - type: query
          connection: '{target.name}'
          query: "SET IDENTITY_INSERT {run.object.full_name} ON"

      post_merge:
        - type: query
          connection: '{target.name}'
          query: "SET IDENTITY_INSERT {run.object.full_name} OFF"

Get Configuration Values

Retrieve configuration values from database and use them in other hooks:

hooks:
  pre:
    - type: query
      connection: config_db
      query: |
        SELECT 
          config_key,
          config_value
        FROM application_config
        WHERE environment = '{env.ENV_NAME}'
      into: "app_config"
      
    - type: write
      to: "local/temp/runtime_config.json"
      content: "{store.app_config}"

Conditional Processing Based on Query Results

Use query results to control subsequent hook execution:

hooks:
  pre:
    - type: query
      connection: target_db
      query: |
        SELECT COUNT(*) as record_count
        FROM {run.object.full_name}
        WHERE DATE(created_at) = CURRENT_DATE
      into: "daily_count"
      
    - type: query
      connection: target_db
      if: "store.daily_count[0].record_count > 1000"
      query: |
        DELETE FROM {run.object.full_name}
        WHERE DATE(created_at) = CURRENT_DATE
        ORDER BY created_at
        LIMIT 500

Update Status Table

Track when a replication starts by updating a status table:

hooks:
  pre:
    - type: query
      connection: target_db
      query: |
        INSERT INTO replication_status (
          stream_name, 
          start_time, 
          status
        ) VALUES (
          '{run.stream.name}',
          '{run.start_time}',
          'RUNNING'
        )

    - type: query
      connection: target_db
      query: file://path/to/file.sql

Data Quality Check

Verify data quality after loading and raise an alert if issues are found:

hooks:
  post:
    - type: query
      connection: target_db
      query: |
        WITH quality_check AS (
          SELECT 
            COUNT(*) as invalid_records
          FROM {run.object.full_name}
          WHERE email IS NULL 
            OR LENGTH(email) < 5 
            OR email NOT LIKE '%@%.%'
        )
        INSERT INTO data_quality_alerts (
          table_name,
          check_time,
          invalid_count,
          total_count
        )
        SELECT 
          '{run.object.full_name}',
          CURRENT_TIMESTAMP,
          invalid_records,
          {run.total_rows}
        FROM quality_check
        WHERE invalid_records > 0

Cleanup Old Data

Clean up old data before loading new data in incremental mode:

hooks:
  pre:
    - type: query
      connection: target_db
      query: |
        DELETE FROM {run.object.full_name}
        WHERE created_date < DATEADD(day, -90, CURRENT_DATE)
      on_failure: warn

Update Metadata

Update a metadata table after successful load:

hooks:
  post:
    - type: query
      connection: target_db
      if: run.status == "success"
      query: |
        MERGE INTO table_metadata t
        USING (
          SELECT 
            '{run.object.full_name}' as table_name,
            {run.total_rows} as total_rows,
            '{run.end_time}' as last_updated
        ) s
        ON t.table_name = s.table_name
        WHEN MATCHED THEN
          UPDATE SET 
            total_rows = s.total_rows,
            last_updated = s.last_updated
        WHEN NOT MATCHED THEN
          INSERT (table_name, total_rows, last_updated)
          VALUES (s.table_name, s.total_rows, s.last_updated)

Validate and Rollback

Check data consistency and rollback if issues are found:

hooks:
  post:
    - type: query
      connection: target_db
      query: |
        DO $$
        DECLARE
          duplicate_count INT;
        BEGIN
          SELECT COUNT(*) INTO duplicate_count
          FROM (
            SELECT customer_id, COUNT(*)
            FROM {run.object.full_name}
            GROUP BY customer_id
            HAVING COUNT(*) > 1
          ) t;
          
          IF duplicate_count > 0 THEN
            RAISE EXCEPTION 'Found % duplicate customer records', duplicate_count;
          END IF;
        END $$;
      on_failure: abort

Aggregate Statistics

Calculate and store aggregated statistics after data load:

hooks:
  post:
    - type: query
      connection: target_db
      if: run.status == "success"
      query: |
        INSERT INTO sales_summary (
          date,
          total_sales,
          avg_order_value,
          order_count
        )
        SELECT
          DATE(order_date),
          SUM(amount),
          AVG(amount),
          COUNT(*)
        FROM {run.object.full_name}
        WHERE order_date >= CURRENT_DATE - INTERVAL '1 day'
        GROUP BY DATE(order_date)
        ON CONFLICT (date) DO UPDATE
        SET
          total_sales = EXCLUDED.total_sales,
          avg_order_value = EXCLUDED.avg_order_value,
          order_count = EXCLUDED.order_count

Transaction Support

Execute multiple operations within a transaction with specific isolation level:

hooks:
  pre:
    - type: query
      connection: target_db
      transaction: repeatable_read  # Ensures consistent reads within transaction
      query: |
        -- Multiple operations executed as a single transaction
        DELETE FROM staging_table WHERE process_date < CURRENT_DATE - 7;
        
        INSERT INTO staging_table (id, name, process_date)
        SELECT id, name, CURRENT_DATE
        FROM source_table
        WHERE status = 'active';
        
        UPDATE process_log 
        SET last_run = CURRENT_TIMESTAMP, 
            records_processed = (SELECT COUNT(*) FROM staging_table)
        WHERE process_name = 'daily_staging';

When using transactions, if any statement fails, all operations within the transaction will be rolled back automatically.
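As a sketch of this behavior, combining transaction with on_failure: abort ensures that a failed statement both rolls back the whole batch and stops the run. The table names are illustrative:

```yaml
hooks:
  post:
    - type: query
      connection: target_db
      transaction: true      # all statements below succeed or roll back together
      on_failure: abort      # stop the run if the transaction fails
      query: |
        UPDATE accounts SET balance = balance - 100 WHERE id = 1;
        UPDATE accounts SET balance = balance + 100 WHERE id = 2;
```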
