Use Hooks Data

Examples of using data from hooks/steps to drive API iteration

Hooks can be used to query data and pass it to API endpoints for iteration. This pattern is powerful when you need to:

  • Fetch a list of IDs or parameters from a database

  • Use those values to make corresponding API calls

  • Coordinate data between multiple sources

Learn more: Query Hook | Store Hook | API Iteration

How Hook-Driven API Iteration Works

API endpoints can iterate over data that hooks provide through context.store:

  1. Use a query hook with the into parameter to fetch records and place them in the store

  2. Reference the stored records in your API spec using context.store.variable_name

  3. The API endpoint iterates over each record, making one API call per record

  4. Each record's fields are accessible through the into variable (e.g. state.record.field_name)

The data flow:

Database Query ➡️ Hook Store ➡️ context.store ➡️ API Iteration ➡️ Target Database
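The four steps above can be sketched in plain Python (this is an illustration of the data flow, not the sling implementation; the ticker/date values are made up):

```python
# A hook stores query results; the endpoint then renders one request per record.
store = {}

# Step 1: the query hook writes its rows into the store under the "into" key.
store["ticker_date_records"] = [
    {"ticker": "AAPL", "date": "2024-01-02"},
    {"ticker": "MSFT", "date": "2024-01-02"},
]

# Steps 2-4: iterate over the stored records, binding each one into state
# and rendering one URL per record.
urls = []
for record in store["ticker_date_records"]:
    state = {"ticker": record["ticker"], "date": record["date"]}
    urls.append("/v1/open-close/{ticker}/{date}".format(**state))

print(urls)
```

Each element of `urls` corresponds to one API call, mirroring "one API call per record" in step 3.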

Query-Driven Ticker Data Collection

Fetch ticker symbols from a database and retrieve market data for each one from an API.

Spec File (polygon.yaml)

polygon.yaml
name: Polygon
description: Polygon.io provides real-time and historical market data

defaults:
  state:
    base_url: https://api.polygon.io
  request:
    headers:
      Authorization: 'Bearer {require(secrets.api_key, "Polygon API key required")}'
    rate: 10
    concurrency: 3

endpoints:
  # This endpoint iterates over records provided via context.store
  options_daily_ticker_summary:
    description: "Daily open/close summary for options tickers"
    docs: https://polygon.io/docs/rest/options/aggregates/daily-ticker-summary

    # Require the hook to provide ticker_date_records
    iterate:
      over: require(context.store.ticker_date_records, "Must provide ticker_date_records via hook")
      into: "state.ticker_date_record"
      concurrency: 10

    state:
      # Extract fields from each record
      date: '{date_format(state.ticker_date_record.date, "%Y-%m-%d")}'
      ticker: '{require(state.ticker_date_record.ticker)}'

    request:
      url: '{state.base_url}/v1/open-close/{state.ticker}/{state.date}'

    response:
      records:
        jmespath: "@"  # Single object response
        primary_key: ["symbol", "from"]

    overrides:
      mode: incremental

Using Replication with Query Hook

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_POLYGON_API
target: MY_TARGET_DB

hooks:
  start:
    # Query database to get list of tickers and dates to fetch
    - type: query
      connection: MY_TARGET_DB
      query: |
        SELECT
          ticker,
          date
        FROM public.active_tickers
        WHERE date >= CURRENT_DATE - INTERVAL '7 days'
        AND data_fetched = false
        ORDER BY date DESC, ticker
        LIMIT 100
      # Store results for the API endpoint to consume
      into: "ticker_date_records"

streams:
  # The endpoint will iterate over each ticker/date combination
  options_daily_ticker_summary:
    object: market_data.options_daily

Each record from the query (ticker + date) triggers one API call, so with LIMIT 100 this example makes up to 100 API calls.


Using Python

api_to_database.py
from sling import Replication, ReplicationStream
from sling.hooks import HookQuery, HookMap

replication = Replication(
    source='MY_POLYGON_API',
    target='MY_TARGET_DB',
    streams={
        'options_daily_ticker_summary': ReplicationStream(
            object='market_data.options_daily'
        )
    },
    hooks=HookMap(
        start=[
            HookQuery(
                connection='MY_TARGET_DB',
                query='''
                    SELECT ticker, date
                    FROM public.active_tickers
                    WHERE date >= CURRENT_DATE - INTERVAL '7 days'
                    AND data_fetched = false
                    LIMIT 100
                ''',
                into='ticker_date_records'
            )
        ]
    )
)

replication.run()

Date Range Generation with Store Hook

Use store hooks to build date ranges that drive API iteration.

Spec File (analytics_api.yaml)

analytics_api.yaml
name: "Analytics API"

defaults:
  state:
    base_url: https://api.analytics.com/v1
  request:
    headers:
      Authorization: "Bearer {secrets.api_key}"

endpoints:
  daily_metrics:
    description: "Get daily metrics for dates provided via context.store"

    iterate:
      over: require(context.store.date_list, "Must provide date_list via hook")
      into: "state.current_date"
      concurrency: 5

    state:
      date: '{date_format(state.current_date, "%Y-%m-%d")}'

    request:
      url: "{state.base_url}/metrics/daily"
      parameters:
        date: "{state.date}"

    response:
      records:
        jmespath: "data.metrics[]"
        primary_key: ["metric_id", "date"]

    overrides:
      mode: incremental

Using Replication with Store Hook

replication.yaml
source: MY_API
target: MY_TARGET_DB

hooks:
  start:
    # Generate list of dates to process
    - type: store
      key: date_list
      value: >
        {range(
          date_format(date_add(now(), -7, "day"), "%Y-%m-%d"),
          date_format(now(), "%Y-%m-%d"),
          "1d"
        )}

streams:
  daily_metrics:
    object: analytics.daily_metrics

The range() function generates an array of dates, stored in context.store.date_list, which the API endpoint iterates over.
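As a rough plain-Python equivalent of that range() expression (an assumption here is that the generated range includes both endpoints; `daily_range` is a hypothetical helper, not a sling function):

```python
# Build one "YYYY-MM-DD" string per day between two dates, endpoints inclusive.
from datetime import date, timedelta

def daily_range(start: date, end: date) -> list:
    days = (end - start).days
    return [(start + timedelta(days=i)).strftime("%Y-%m-%d")
            for i in range(days + 1)]

date_list = daily_range(date(2024, 1, 1), date(2024, 1, 7))
print(date_list[0], date_list[-1], len(date_list))
```

The endpoint then makes one call per entry in the list.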

Customer IDs with Record Enrichment

Query customer IDs and use them to fetch detailed customer data from an API.

Spec File (customers_api.yaml)

customers_api.yaml
name: "Customers API"

defaults:
  state:
    base_url: https://api.customers.com/v2
  request:
    headers:
      X-API-Key: "{secrets.api_key}"

endpoints:
  customer_details:
    description: "Get detailed customer information by ID"

    iterate:
      over: require(context.store.customer_records, "Must provide customer_records via hook")
      into: "state.customer_record"
      concurrency: 10

    state:
      customer_id: '{state.customer_record.customer_id}'
      # Can also access other fields from the query
      region: '{state.customer_record.region}'

    request:
      url: "{state.base_url}/customers/{state.customer_id}"
      parameters:
        include_details: "true"
        region: "{state.region}"

    response:
      records:
        jmespath: "@"
        primary_key: ["customer_id"]

      processors:
        # Add the region from our query to the response
        - expression: state.region
          output: record.source_region

    overrides:
      mode: incremental

Using Replication

replication.yaml
source: MY_API
target: MY_TARGET_DB

streams:
  customer_details:
    object: public.customer_details

    hooks:
      pre:
        # Query customers that need enrichment
        - type: query
          connection: MY_TARGET_DB
          query: |
            SELECT
              c.customer_id,
              c.region,
              c.last_updated
            FROM public.customers c
            LEFT JOIN public.customer_details cd
              ON c.customer_id = cd.customer_id
            WHERE cd.customer_id IS NULL
              OR c.last_updated > cd.details_fetched_at
            LIMIT 500
          into: "customer_records"

      post:
        # Mark customers as enriched
        - type: query
          connection: MY_TARGET_DB
          if: run.status == "success"
          query: |
            UPDATE public.customer_details
            SET details_fetched_at = CURRENT_TIMESTAMP
            WHERE customer_id IN (
              SELECT DISTINCT customer_id
              FROM public.customer_details
              WHERE details_fetched_at > CURRENT_TIMESTAMP - INTERVAL '1 hour'
            )

Processor Output to Hooks Integration

Use processor outputs (env.* and context.store.*) to pass aggregated data from API responses to hooks for validation, logging, or conditional logic.
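The four aggregations used in the spec below (maximum, first, last, collect) behave roughly like this plain-Python sketch over a small batch of records (the order data is invented for illustration):

```python
# Simulate processor aggregations over a batch of response records.
records = [
    {"order_id": "A-1", "updated_at": "2024-03-01T09:00:00Z"},
    {"order_id": "A-2", "updated_at": "2024-03-01T11:30:00Z"},
    {"order_id": "A-3", "updated_at": "2024-03-01T10:15:00Z"},
]

env = {
    # aggregation: maximum (ISO-8601 strings sort chronologically)
    "MAX_ORDER_TIMESTAMP": max(r["updated_at"] for r in records),
}
store = {
    "first_order_id": records[0]["order_id"],   # aggregation: first
    "last_order_id": records[-1]["order_id"],   # aggregation: last
    "order_count": [1 for _ in records],        # aggregation: collect
}

print(env["MAX_ORDER_TIMESTAMP"], store["first_order_id"],
      store["last_order_id"], len(store["order_count"]))
```

This is why the end hooks later read the count as len(store.order_count): collect accumulates a list, and its length is the record count.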

Spec File (orders_api.yaml)

orders_api.yaml
name: "Orders API"

defaults:
  state:
    base_url: https://api.orders.com/v1
  request:
    headers:
      Authorization: "Bearer {secrets.api_key}"

endpoints:
  orders:
    description: "Fetch orders with metadata tracking"

    request:
      url: "{state.base_url}/orders"
      parameters:
        status: "completed"
        limit: 100

    response:
      records:
        jmespath: "data.orders[]"
        primary_key: ["order_id"]

      processors:
        # Store the maximum timestamp in environment variable
        - expression: "record.updated_at"
          output: "env.MAX_ORDER_TIMESTAMP"
          aggregation: "maximum"

        # Store the first order ID in replication store
        - expression: "record.order_id"
          output: "context.store.first_order_id"
          aggregation: "first"

        # Store the last order ID in replication store
        - expression: "record.order_id"
          output: "context.store.last_order_id"
          aggregation: "last"

        # Collect one value per record (len() gives the total count)
        - expression: "1"
          output: "context.store.order_count"
          aggregation: "collect"

    overrides:
      mode: full-refresh

Using Replication with End Hooks

replication.yaml
source: MY_ORDERS_API
target: MY_TARGET_DB

streams:
  orders:
    object: sales.orders

hooks:
  start:
    - type: log
      message: "Starting order sync..."

  end:
    # Validate that we got data
    - type: check
      check: store.first_order_id != nil
      on_failure: fail
      message: "No orders were processed!"

    # Log processing summary using values from processors
    - type: log
      message: |
        Orders sync completed successfully!
        ========================================
        First Order ID: {store.first_order_id}
        Last Order ID:  {store.last_order_id}
        Total Orders:   {len(store.order_count)}
        Max Timestamp:  {env.MAX_ORDER_TIMESTAMP}

    # Update metadata table with sync information
    - type: query
      connection: MY_TARGET_DB
      if: execution.status.error == 0
      query: |
        INSERT INTO sales.sync_metadata (
          sync_date,
          first_order_id,
          last_order_id,
          order_count,
          max_timestamp
        ) VALUES (
          CURRENT_TIMESTAMP,
          '{store.first_order_id}',
          '{store.last_order_id}',
          {len(store.order_count)},
          '{env.MAX_ORDER_TIMESTAMP}'
        )

    # Conditional validation based on order count
    - type: check
      check: len(store.order_count) >= 10
      on_failure: log
      message: "Warning: Fewer than 10 orders processed ({len(store.order_count)})"

Using Python

orders_sync.py
from sling import Replication, ReplicationStream
from sling.hooks import HookLog, HookCheck, HookQuery, HookMap

replication = Replication(
    source='MY_ORDERS_API',
    target='MY_TARGET_DB',
    streams={
        'orders': ReplicationStream(
            object='sales.orders'
        )
    },
    hooks=HookMap(
        start=[
            HookLog(message="Starting order sync...")
        ],
        end=[
            HookCheck(
                check='store.first_order_id != nil',
                on_failure='fail',
                message='No orders were processed!'
            ),
            HookLog(
                message='''Orders sync completed successfully!
First Order ID: {store.first_order_id}
Last Order ID:  {store.last_order_id}
Total Orders:   {len(store.order_count)}
Max Timestamp:  {env.MAX_ORDER_TIMESTAMP}'''
            ),
            HookQuery(
                connection='MY_TARGET_DB',
                if_='execution.status.error == 0',
                query='''INSERT INTO sales.sync_metadata
                         (sync_date, first_order_id, last_order_id, order_count, max_timestamp)
                         VALUES (CURRENT_TIMESTAMP, '{store.first_order_id}',
                                '{store.last_order_id}', {len(store.order_count)},
                                '{env.MAX_ORDER_TIMESTAMP}')'''
            )
        ]
    )
)

replication.run()

This pattern is useful for:

  • Tracking metadata about API responses (record counts, date ranges, etc.)

  • Validating data quality before committing to the target

  • Logging detailed sync information

  • Conditional hook execution based on aggregated values

  • Updating audit or metadata tables with sync statistics

Setting Environment Variables for API Authentication

Use store hooks with env.* prefix to set environment variables that are available during API spec rendering. This is particularly powerful when you need to:

  • Dynamically compute authentication parameters before API calls

  • Inject values into authentication blocks before spec compilation

  • Pass computed values to dynamic endpoints

  • Use values from database queries or previous API calls in authentication

The key advantage: environment variables set via env.* hooks are available before the API spec is compiled/rendered, making them usable in authentication blocks and dynamic_endpoints blocks.
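Conceptually, the ordering works like this minimal sketch (plain Python, not sling's actual renderer; the template string is hypothetical):

```python
# Env vars exported before template rendering are visible when the
# authentication block is rendered.
import os

# A store hook with key env.COMPUTED_USERNAME effectively does this first:
os.environ["COMPUTED_USERNAME"] = "api_user_prod"

# ...then spec rendering sees the value as an ordinary environment variable.
auth_template = 'username: "{COMPUTED_USERNAME}"'
rendered = auth_template.format_map(os.environ)
print(rendered)
```

Because the export happens before compilation, the same mechanism covers authentication blocks, dynamic_endpoints blocks, and defaults.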

Spec File (api_with_dynamic_auth.yaml)

api_with_dynamic_auth.yaml
name: "API with Dynamic Auth"
description: "Demonstrates using environment variables set by hooks in authentication"

# Environment variables set by hooks are available here during rendering
authentication:
  type: basic
  username: "{env.COMPUTED_USERNAME}"
  password: "{env.COMPUTED_PASSWORD}"

defaults:
  state:
    base_url: https://api.example.com/v1
  request:
    headers:
      X-Environment: "{env.DEPLOYMENT_ENV}"

endpoints:
  users:
    description: "Fetch users with dynamically configured authentication"

    request:
      url: "{state.base_url}/users"
      parameters:
        # Can also use env vars in request parameters
        api_version: "{env.API_VERSION}"

    response:
      records:
        jmespath: "data.users[]"
        primary_key: ["user_id"]

# Environment variables are also available in dynamic endpoints
dynamic_endpoints:
  - iterate: '{split(env.RESOURCE_LIST, ",")}'
    into: "state.resource_name"
    endpoint:
      name: "data_{state.resource_name}"
      request:
        url: "{state.base_url}/{state.resource_name}"
        headers:
          X-Resource-Token: "{env.RESOURCE_TOKEN}"
      response:
        records:
          jmespath: "data[]"

Using Pipeline with Environment Variable Setup

Running with Sling: sling run -p /path/to/pipeline.yaml

pipeline.yaml
steps:
  # Step 1: Set environment variables before replication runs
  - type: store
    key: env.COMPUTED_USERNAME
    value: "api_user_prod"

  - type: store
    key: env.COMPUTED_PASSWORD
    value: >
      {base64_encode("secure_password_123")}

  - type: store
    key: env.DEPLOYMENT_ENV
    value: "production"

  - type: store
    key: env.API_VERSION
    value: "v2"

  # Step 2: Query database to get resource list
  - type: query
    connection: MY_CONFIG_DB
    query: |
      SELECT string_agg(resource_name, ',') as resources
      FROM api_resources
      WHERE enabled = true
    into: resource_query

  # Step 3: Set resource list/token as environment variable
  - type: store
    map:
      env.RESOURCE_LIST: '{store.resource_query[0].resources}'
      env.RESOURCE_TOKEN: '{sha256(env.COMPUTED_USERNAME + "-" + now())}'

  # Step 4: Log the configuration
  - type: log
    message: |
      API Configuration:
      - Username: {env.COMPUTED_USERNAME}
      - Environment: {env.DEPLOYMENT_ENV}
      - API Version: {env.API_VERSION}
      - Resources: {env.RESOURCE_LIST}

  # Step 5: Run replication with dynamically configured API
  - type: replication
    path: /path/to/replication.yaml
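Step 3's RESOURCE_TOKEN expression can be approximated in plain Python (assuming sha256() hex-digests its string input; this is a sketch, not the sling expression engine):

```python
# Derive a token by hashing "username-timestamp", as in Step 3.
import hashlib
from datetime import datetime, timezone

username = "api_user_prod"                          # env.COMPUTED_USERNAME
timestamp = datetime.now(timezone.utc).isoformat()  # now()
token = hashlib.sha256(f"{username}-{timestamp}".encode()).hexdigest()

print(len(token))  # hex digest length
```

Because the timestamp changes per run, the token is recomputed on every pipeline execution.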

Replication File

replication.yaml
source: MY_API_WITH_DYNAMIC_AUTH
target: MY_TARGET_DB

streams:
  users:
    object: public.api_users

  # Dynamic endpoints will be created based on RESOURCE_LIST
  # e.g., data_customers, data_products, data_orders

Key Benefits

  1. Dynamic Authentication: Compute credentials at runtime (e.g., from database queries, secrets managers, or transformations)

  2. Pre-Compilation Configuration: Environment variables set via env.* hooks are available during API spec rendering, allowing use in:

    • Authentication blocks

    • Dynamic endpoint definitions

    • Default configurations

  3. Separation of Concerns: Keep sensitive or dynamic values out of spec files

  4. Database-Driven Configuration: Query databases to determine which endpoints to call or which credentials to use

  5. Multi-Environment Support: Dynamically configure API behavior based on deployment environment without changing spec files

Common Use Cases:

  • Rotating API keys fetched from a secrets manager

  • Environment-specific endpoints (dev/staging/prod)

  • Database-driven resource lists for dynamic endpoints

  • Computed authentication tokens based on current state

  • Multi-tenant API configurations
