Query

Query hooks let you execute SQL queries against any connection defined in your environment. They are particularly useful for pre- and post-processing tasks, data validation, and metadata maintenance.

Configuration

- type: query
  connection: target_db   # Connection name (required)
  query:      "SELECT * FROM table"  # Required
  into:       "my_results"  # Optional: Store results in variable
  transient:  false       # Optional: Use transient connection
  on_failure: abort       # Optional: abort/warn/quiet/skip
  id:         my_id       # Optional: auto-generated if omitted. Use a `log` hook with {runtime_state} to view state.

Properties

Property     Required   Description
-----------  ---------  -----------
connection   Yes        The name of the connection to execute the query against
query        Yes        The SQL query to execute (can also be the path of a local .sql file, using the file:// prefix)
into         No         Variable name to store the query results; if not specified, results are included in the hook output
transient    No         Whether to use a transient connection (default: false)
on_failure   No         What to do if the query fails (abort/warn/quiet/skip)
id           No         Identifier for the hook; auto-generated if omitted
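
For example, a maintenance query that should not fail the run could combine these options as follows. This is a minimal sketch: the connection name, table name, and Postgres-style VACUUM statement are placeholders.

hooks:
  pre:
    - type: query
      connection: target_db
      query: "VACUUM ANALYZE my_schema.my_table"  # placeholder maintenance statement
      transient: true    # use a transient connection for this query
      on_failure: warn   # log a warning instead of aborting the run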

Output

When the query hook executes successfully, it returns the following output that can be accessed in subsequent hooks:

status: success  # Status of the hook execution
query: "SELECT * FROM table"  # The executed query
connection: "target_db"  # The connection used
columns: ["column1", "column2", ...]  # List of column names from the result
result:  # Array of records from the query result (only if 'into' is not specified)
  - column1: value1
    column2: value2
  - column1: value3
    column2: value4

You can access these values in subsequent hooks using the following JMESPath syntax:

  • {state.hook_id.status} - Status of the hook execution

  • {state.hook_id.query} - The executed query

  • {state.hook_id.connection} - The connection used

  • {state.hook_id.columns} - List of column names

  • {state.hook_id.result} - Array of result records (only if 'into' is not specified)

  • {state.hook_id.result[0].column_name} - Access specific values from the result

  • {store.variable_name} - Stored results when using 'into' parameter
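
For example, giving the hook an explicit id makes its output easy to reference from a later hook. A minimal sketch; the table and column names here are hypothetical:

hooks:
  pre:
    - type: query
      id: row_check
      connection: target_db
      query: "SELECT COUNT(*) AS cnt FROM my_table"

    - type: log
      message: "Status: {state.row_check.status}, rows: {state.row_check.result[0].cnt}"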

Examples

Store Query Results for Later Use

Execute a query and store results for use in subsequent hooks:

hooks:
  pre:
    - type: query
      connection: source_db
      query: |
        SELECT 
          table_name,
          last_updated,
          row_count
        FROM metadata_table
        WHERE table_name = '{run.stream.name}'
      into: "table_metadata"
      
    - type: log
      message: "Last updated: {store.table_metadata[0].last_updated}, Row count: {store.table_metadata[0].row_count}"
      

Get Configuration Values

Retrieve configuration values from database and use them in other hooks:

hooks:
  pre:
    - type: query
      connection: config_db
      query: |
        SELECT 
          config_key,
          config_value
        FROM application_config
        WHERE environment = '{env.ENV_NAME}'
      into: "app_config"
      
    - type: write
      to: "local/temp/runtime_config.json"
      content: "{store.app_config | tojson}"

Conditional Processing Based on Query Results

Use query results to control whether subsequent hooks execute via the if property (note that DELETE with ORDER BY and LIMIT, as shown below, is MySQL-specific):

hooks:
  pre:
    - type: query
      connection: target_db
      query: |
        SELECT COUNT(*) as record_count
        FROM {run.object.full_name}
        WHERE DATE(created_at) = CURRENT_DATE
      into: "daily_count"
      
    - type: query
      connection: target_db
      if: "store.daily_count[0].record_count > 1000"
      query: |
        DELETE FROM {run.object.full_name}
        WHERE DATE(created_at) = CURRENT_DATE
        ORDER BY created_at
        LIMIT 500

Update Status Table

Track when a replication starts by updating a status table:

hooks:
  pre:
    - type: query
      connection: target_db
      query: |
        INSERT INTO replication_status (
          stream_name, 
          start_time, 
          status
        ) VALUES (
          '{run.stream.name}',
          '{run.start_time}',
          'RUNNING'
        )

    - type: query
      connection: target_db
      query: file://path/to/file.sql

Data Quality Check

Verify data quality after loading and raise an alert if issues are found:

hooks:
  post:
    - type: query
      connection: target_db
      query: |
        WITH quality_check AS (
          SELECT 
            COUNT(*) as invalid_records
          FROM {run.object.full_name}
          WHERE email IS NULL 
            OR LENGTH(email) < 5 
            OR email NOT LIKE '%@%.%'
        )
        INSERT INTO data_quality_alerts (
          table_name,
          check_time,
          invalid_count,
          total_count
        )
        SELECT 
          '{run.object.full_name}',
          CURRENT_TIMESTAMP,
          invalid_records,
          {run.total_rows}
        FROM quality_check
        WHERE invalid_records > 0

Cleanup Old Data

Clean up old data before loading new data in incremental mode (DATEADD is SQL Server/Snowflake syntax; adjust the date arithmetic for your database):

hooks:
  pre:
    - type: query
      connection: target_db
      query: |
        DELETE FROM {run.object.full_name}
        WHERE created_date < DATEADD(day, -90, CURRENT_DATE)
      on_failure: warn

Update Metadata

Update a metadata table after successful load:

hooks:
  post:
    - type: query
      connection: target_db
      if: run.status == "success"
      query: |
        MERGE INTO table_metadata t
        USING (
          SELECT 
            '{run.object.full_name}' as table_name,
            {run.total_rows} as row_count,
            '{run.end_time}' as last_updated
        ) s
        ON t.table_name = s.table_name
        WHEN MATCHED THEN
          UPDATE SET 
            row_count = s.row_count,
            last_updated = s.last_updated
        WHEN NOT MATCHED THEN
          INSERT (table_name, row_count, last_updated)
          VALUES (s.table_name, s.row_count, s.last_updated)

Validate and Rollback

Check data consistency and abort the run if issues are found, using a PostgreSQL DO block:

hooks:
  post:
    - type: query
      connection: target_db
      query: |
        DO $$
        DECLARE
          duplicate_count INT;
        BEGIN
          SELECT COUNT(*) INTO duplicate_count
          FROM (
            SELECT customer_id, COUNT(*)
            FROM {run.object.full_name}
            GROUP BY customer_id
            HAVING COUNT(*) > 1
          ) t;
          
          IF duplicate_count > 0 THEN
            RAISE EXCEPTION 'Found % duplicate customer records', duplicate_count;
          END IF;
        END $$;
      on_failure: abort

Aggregate Statistics

Calculate and store aggregated statistics after a successful data load (the ON CONFLICT upsert is PostgreSQL syntax):

hooks:
  post:
    - type: query
      connection: target_db
      if: run.status == "success"
      query: |
        INSERT INTO sales_summary (
          date,
          total_sales,
          avg_order_value,
          order_count
        )
        SELECT
          DATE(order_date),
          SUM(amount),
          AVG(amount),
          COUNT(*)
        FROM {run.object.full_name}
        WHERE order_date >= CURRENT_DATE - INTERVAL '1 day'
        GROUP BY DATE(order_date)
        ON CONFLICT (date) DO UPDATE
        SET
          total_sales = EXCLUDED.total_sales,
          avg_order_value = EXCLUDED.avg_order_value,
          order_count = EXCLUDED.order_count