# Use Hooks Data

Hooks can be used to query data and pass it to API endpoints for iteration. This pattern is powerful when you need to:

* Fetch a list of IDs or parameters from a database
* Use those values to make corresponding API calls
* Coordinate data between multiple sources

Learn more: [Query Hook](/concepts/hooks/query.md) | [Store Hook](/concepts/hooks/store.md) | [API Iteration](/concepts/api-specs/request.md#iteration-looping-requests)

## How Hook-Driven API Iteration Works

API endpoints can iterate over data provided via hooks using `context.store`:

1. Use a `query` hook with the `into` parameter to fetch records and save them in the replication store
2. Reference the stored records in your API spec using `context.store.variable_name`
3. The API endpoint iterates over each record, making one API call per record
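4. Each record is placed in the state variable named by the iteration's `into` setting, so its fields are accessible as `state.<variable>.<field>` (for example, `state.ticker_date_record.ticker`)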

The data flow:

```
Database Query ➡️ Hook Store ➡️ context.store ➡️ API Iteration ➡️ Target Database
```
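
The flow above can be modelled in a few lines of plain Python. This is a conceptual sketch only, not Sling's implementation: `run_query_hook`, `iterate_endpoint`, and the in-memory `store` dict are hypothetical stand-ins for the query hook, the endpoint iteration, and `context.store`, and the sample rows are made up.

```python
from typing import Callable

# Hypothetical stand-in for context.store: a plain dict shared by hooks and endpoints.
store: dict[str, list[dict]] = {}


def run_query_hook(rows: list[dict], into: str) -> None:
    """Mimic a query hook with `into`: save the result rows under a store key."""
    store[into] = rows


def iterate_endpoint(over: str, call_api: Callable[[dict], dict]) -> list[dict]:
    """Mimic endpoint iteration: make one call per stored record."""
    if over not in store:
        raise KeyError(f"Must provide {over} via hook")
    return [call_api(record) for record in store[over]]


# Rows a database query might return (made-up sample data).
run_query_hook(
    [
        {"ticker": "AAA", "date": "2024-01-02"},
        {"ticker": "BBB", "date": "2024-01-02"},
    ],
    into="ticker_date_records",
)

# A fake API call that simply echoes the fields of the record it was given.
results = iterate_endpoint(
    "ticker_date_records",
    lambda record: {"symbol": record["ticker"], "from": record["date"]},
)
print(results)
```

The point of the sketch is the ordering: the hook must populate the store before the endpoint runs, which is why the examples below wrap the store lookup in `require(...)`.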

## Query-Driven Ticker Data Collection

Fetch ticker symbols from a database and retrieve market data for each one from an API.

**Spec File** (`polygon.yaml`)

{% code title="polygon.yaml" overflow="wrap" %}

```yaml
name: Polygon
description: Polygon.io provides real-time and historical market data

defaults:
  state:
    base_url: https://api.polygon.io
  request:
    headers:
      Authorization: 'Bearer {require(secrets.api_key, "Polygon API key required")}'
    rate: 10
    concurrency: 3

endpoints:
  # This endpoint iterates over records provided via context.store
  options_daily_ticker_summary:
    description: "Daily open/close summary for options tickers"
    docs: https://polygon.io/docs/rest/options/aggregates/daily-ticker-summary

    # Require the hook to provide ticker_date_records
    iterate:
      over: require(context.store.ticker_date_records, "Must provide ticker_date_records via hook")
      into: "state.ticker_date_record"
      concurrency: 10

    state:
      # Extract fields from each record
      date: '{date_format(state.ticker_date_record.date, "%Y-%m-%d")}'
      ticker: '{require(state.ticker_date_record.ticker)}'

    request:
      url: '{state.base_url}/v1/open-close/{state.ticker}/{state.date}'

    response:
      records:
        jmespath: "@"  # Single object response
        primary_key: ["symbol", "from"]

    overrides:
      mode: incremental
```

{% endcode %}

***

**Using Replication with Query Hook**

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" %}

```yaml
source: MY_POLYGON_API
target: MY_TARGET_DB

hooks:
  start:
    # Query database to get list of tickers and dates to fetch
    - type: query
      connection: MY_TARGET_DB
      query: |
        SELECT
          ticker,
          date
        FROM public.active_tickers
        WHERE date >= CURRENT_DATE - INTERVAL '7 days'
        AND data_fetched = false
        ORDER BY date DESC, ticker
        LIMIT 100
      # Store results in the store for API to consume
      into: "ticker_date_records"

streams:
  # The endpoint will iterate over each ticker/date combination
  options_daily_ticker_summary:
    object: market_data.options_daily
```

{% endcode %}

Each record from the query (ticker + date) triggers one API call. Because the query is capped with `LIMIT 100`, the endpoint makes at most 100 API calls in this example.
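
To make the one-call-per-record behaviour concrete, here is a rough Python sketch of the URLs the endpoint would request. It mirrors the `url` template and the `date_format(..., "%Y-%m-%d")` expression from the spec; `build_url` and the sample tickers are illustrative assumptions, not part of Sling or Polygon.

```python
from datetime import date

BASE_URL = "https://api.polygon.io"  # same base_url as in the spec


def build_url(record: dict) -> str:
    """Render the request URL for one ticker/date record."""
    day = record["date"].strftime("%Y-%m-%d")
    return f"{BASE_URL}/v1/open-close/{record['ticker']}/{day}"


# Made-up rows standing in for the query hook's result set.
records = [
    {"ticker": "AAA", "date": date(2024, 1, 2)},
    {"ticker": "BBB", "date": date(2024, 1, 3)},
]

# One URL (and therefore one API call) per record.
urls = [build_url(record) for record in records]
for url in urls:
    print(url)
```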

***

**Using Python**

{% code title="api\_to\_database.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream
from sling.hooks import HookQuery, HookMap

replication = Replication(
    source='MY_POLYGON_API',
    target='MY_TARGET_DB',
    streams={
        'options_daily_ticker_summary': ReplicationStream(
            object='market_data.options_daily'
        )
    },
    hooks=HookMap(
        start=[
            HookQuery(
                connection='MY_TARGET_DB',
                query='''
                    SELECT ticker, date
                    FROM public.active_tickers
                    WHERE date >= CURRENT_DATE - INTERVAL '7 days'
                    AND data_fetched = false
                    LIMIT 100
                ''',
                into='ticker_date_records'
            )
        ]
    )
)

replication.run()
```

{% endcode %}

## Date Range Generation with Store Hook

Use store hooks to build date ranges that drive API iteration.

**Spec File** (`analytics_api.yaml`)

{% code title="analytics\_api.yaml" overflow="wrap" %}

```yaml
name: "Analytics API"

defaults:
  state:
    base_url: https://api.analytics.com/v1
  request:
    headers:
      Authorization: "Bearer {secrets.api_key}"

endpoints:
  daily_metrics:
    description: "Get daily metrics for dates provided via context.store"

    iterate:
      over: require(context.store.date_list, "Must provide date_list via hook")
      into: "state.current_date"
      concurrency: 5

    state:
      date: '{date_format(state.current_date, "%Y-%m-%d")}'

    request:
      url: "{state.base_url}/metrics/daily"
      parameters:
        date: "{state.date}"

    response:
      records:
        jmespath: "data.metrics[]"
        primary_key: ["metric_id", "date"]

    overrides:
      mode: incremental
```

{% endcode %}

***

**Using Replication with Store Hook**

{% code title="replication.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

hooks:
  start:
    # Generate list of dates to process
    - type: store
      key: date_list
      value: >
        {range(
          date_format(date_add(now(), -7, "day"), "%Y-%m-%d"),
          date_format(now(), "%Y-%m-%d"),
          "1d"
        )}

streams:
  daily_metrics:
    object: analytics.daily_metrics
```

{% endcode %}

The `range()` function generates an array of dates, stored in `context.store.date_list`, which the API endpoint iterates over.
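
For intuition, the following Python sketch produces the kind of list such an expression is expected to yield. It assumes both endpoints are included, which this page does not confirm for Sling's `range()`; `daily_range` is a hypothetical helper, and the date is fixed so the result is reproducible.

```python
from datetime import date, timedelta


def daily_range(start: date, end: date) -> list[str]:
    """Return every day from start to end (inclusive, by assumption) as YYYY-MM-DD."""
    days = (end - start).days
    return [(start + timedelta(days=n)).strftime("%Y-%m-%d") for n in range(days + 1)]


today = date(2024, 1, 8)  # fixed stand-in for now()
date_list = daily_range(today - timedelta(days=7), today)
print(date_list)
```

Each element of `date_list` would then drive one request to `/metrics/daily` with a different `date` parameter.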

## Customer IDs with Record Enrichment

Query customer IDs and use them to fetch detailed customer data from an API.

**Spec File** (`customers_api.yaml`)

{% code title="customers\_api.yaml" overflow="wrap" %}

```yaml
name: "Customers API"

defaults:
  state:
    base_url: https://api.customers.com/v2
  request:
    headers:
      X-API-Key: "{secrets.api_key}"

endpoints:
  customer_details:
    description: "Get detailed customer information by ID"

    iterate:
      over: require(context.store.customer_records, "Must provide customer_records via hook")
      into: "state.customer_record"
      concurrency: 10

    state:
      customer_id: '{state.customer_record.customer_id}'
      # Can also access other fields from the query
      region: '{state.customer_record.region}'

    request:
      url: "{state.base_url}/customers/{state.customer_id}"
      parameters:
        include_details: "true"
        region: "{state.region}"

    response:
      records:
        jmespath: "@"
        primary_key: ["customer_id"]

      processors:
        # Add the region from our query to the response
        - expression: state.region
          output: record.source_region

    overrides:
      mode: incremental
```

{% endcode %}

***

**Using Replication**

{% code title="replication.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

streams:
  customer_details:
    object: public.customer_details

    hooks:
      pre:
        # Query customers that need enrichment
        - type: query
          connection: MY_TARGET_DB
          query: |
            SELECT
              c.customer_id,
              c.region,
              c.last_updated
            FROM public.customers c
            LEFT JOIN public.customer_details cd
              ON c.customer_id = cd.customer_id
            WHERE cd.customer_id IS NULL
              OR c.last_updated > cd.details_fetched_at
            LIMIT 500
          into: "customer_records"

      post:
        # Mark customers as enriched
        - type: query
          connection: MY_TARGET_DB
          if: run.status == "success"
          query: |
            UPDATE public.customer_details
            SET details_fetched_at = CURRENT_TIMESTAMP
            WHERE customer_id IN (
              SELECT DISTINCT customer_id
              FROM public.customer_details
              WHERE details_fetched_at > CURRENT_TIMESTAMP - INTERVAL '1 hour'
            )
```

{% endcode %}
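
The selection rule in the `pre` hook's `LEFT JOIN` can be read as: pick customers that have no details row yet, or whose details are older than their last update. A small Python sketch of that rule, using made-up data and a hypothetical `needs_enrichment` helper, looks like this:

```python
from datetime import datetime

# Made-up rows standing in for public.customers.
customers = [
    {"customer_id": 1, "region": "emea", "last_updated": datetime(2024, 1, 5)},
    {"customer_id": 2, "region": "apac", "last_updated": datetime(2024, 1, 1)},
    {"customer_id": 3, "region": "amer", "last_updated": datetime(2024, 1, 3)},
]

# Stand-in for public.customer_details: customer_id -> details_fetched_at.
# Customer 3 has no details row yet.
details_fetched_at = {
    1: datetime(2024, 1, 2),
    2: datetime(2024, 1, 2),
}


def needs_enrichment(customer: dict) -> bool:
    """True when details are missing or older than the customer's last update."""
    fetched = details_fetched_at.get(customer["customer_id"])
    return fetched is None or customer["last_updated"] > fetched


customer_records = [c for c in customers if needs_enrichment(c)]
print([c["customer_id"] for c in customer_records])
```

Only the selected rows reach `context.store.customer_records`, so the API is called just for customers whose details are missing or stale.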

## Processor Output to Hooks Integration

Use processor outputs (`env.*` and `context.store.*`) to pass aggregated data from API responses to hooks for validation, logging, or conditional logic.

**Spec File** (`orders_api.yaml`)

{% code title="orders\_api.yaml" overflow="wrap" %}

```yaml
name: "Orders API"

defaults:
  state:
    base_url: https://api.orders.com/v1
  request:
    headers:
      Authorization: "Bearer {secrets.api_key}"

endpoints:
  orders:
    description: "Fetch orders with metadata tracking"

    request:
      url: "{state.base_url}/orders"
      parameters:
        status: "completed"
        limit: 100

    response:
      records:
        jmespath: "data.orders[]"
        primary_key: ["order_id"]

      processors:
        # Store the maximum timestamp in an environment variable
        - expression: "record.updated_at"
          output: "env.MAX_ORDER_TIMESTAMP"
          aggregation: "maximum"

        # Store the first order ID in the replication store
        - expression: "record.order_id"
          output: "context.store.first_order_id"
          aggregation: "first"

        # Store the last order ID in the replication store
        - expression: "record.order_id"
          output: "context.store.last_order_id"
          aggregation: "last"

        # Track total order count
        - expression: "1"
          output: "context.store.order_count"
          aggregation: "collect"

    overrides:
      mode: full-refresh
```

{% endcode %}
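
As a mental model for the four processors above, the Python sketch below computes the same kinds of values over a made-up batch of records. The semantics are inferred from the aggregation names and from how the values are used later on this page (for example, `collect` is assumed to gather one value per record into a list, since the hooks call `len()` on it); the `env` and `store` dicts are stand-ins, not Sling objects.

```python
# Made-up API response records.
records = [
    {"order_id": "A-1", "updated_at": "2024-01-02T10:00:00Z"},
    {"order_id": "A-2", "updated_at": "2024-01-03T09:30:00Z"},
    {"order_id": "A-3", "updated_at": "2024-01-01T18:45:00Z"},
]

env: dict[str, str] = {}   # stand-in for env.*
store: dict = {}           # stand-in for context.store.*

# maximum: keep the largest value seen (ISO timestamps in one format sort as strings).
env["MAX_ORDER_TIMESTAMP"] = max(r["updated_at"] for r in records)

# first / last: keep the value from the first or last record processed.
store["first_order_id"] = records[0]["order_id"]
store["last_order_id"] = records[-1]["order_id"]

# collect (assumed): gather one value per record, so the length is the record count.
store["order_count"] = [1 for _ in records]

print(env["MAX_ORDER_TIMESTAMP"], store["first_order_id"],
      store["last_order_id"], len(store["order_count"]))
```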

***

**Using Replication with End Hooks**

{% code title="replication.yaml" overflow="wrap" %}

```yaml
source: MY_ORDERS_API
target: MY_TARGET_DB

streams:
  orders:
    object: sales.orders

hooks:
  start:
    - type: log
      message: "Starting order sync..."

  end:
    # Validate that we got data
    - type: check
      check: store.first_order_id != nil
      on_failure: fail
      message: "No orders were processed!"

    # Log processing summary using values from processors
    - type: log
      message: |
        Orders sync completed successfully!
        ========================================
        First Order ID: {store.first_order_id}
        Last Order ID:  {store.last_order_id}
        Total Orders:   {len(store.order_count)}
        Max Timestamp:  {env.MAX_ORDER_TIMESTAMP}

    # Update metadata table with sync information
    - type: query
      connection: MY_TARGET_DB
      if: execution.status.error == 0
      query: |
        INSERT INTO sales.sync_metadata (
          sync_date,
          first_order_id,
          last_order_id,
          order_count,
          max_timestamp
        ) VALUES (
          CURRENT_TIMESTAMP,
          '{store.first_order_id}',
          '{store.last_order_id}',
          {len(store.order_count)},
          '{env.MAX_ORDER_TIMESTAMP}'
        )

    # Conditional validation based on order count
    - type: check
      check: len(store.order_count) >= 10
      on_failure: log
      message: "Warning: Fewer than 10 orders processed ({len(store.order_count)})"
```

{% endcode %}

***

**Using Python**

{% code title="orders\_sync.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream
from sling.hooks import HookLog, HookCheck, HookQuery, HookMap

replication = Replication(
    source='MY_ORDERS_API',
    target='MY_TARGET_DB',
    streams={
        'orders': ReplicationStream(
            object='sales.orders'
        )
    },
    hooks=HookMap(
        start=[
            HookLog(message="Starting order sync...")
        ],
        end=[
            HookCheck(
                check='store.first_order_id != nil',
                on_failure='fail',
                message='No orders were processed!'
            ),
            HookLog(
                message='''Orders sync completed successfully!
First Order ID: {store.first_order_id}
Last Order ID:  {store.last_order_id}
Total Orders:   {len(store.order_count)}
Max Timestamp:  {env.MAX_ORDER_TIMESTAMP}'''
            ),
            HookQuery(
                connection='MY_TARGET_DB',
                if_='execution.status.error == 0',
                query='''INSERT INTO sales.sync_metadata
                         (sync_date, first_order_id, last_order_id, order_count, max_timestamp)
                         VALUES (CURRENT_TIMESTAMP, '{store.first_order_id}',
                                '{store.last_order_id}', {len(store.order_count)},
                                '{env.MAX_ORDER_TIMESTAMP}')'''
            )
        ]
    )
)

replication.run()
```

{% endcode %}

This pattern is useful for:

* Tracking metadata about API responses (record counts, date ranges, etc.)
* Validating data quality before committing to the target
* Logging detailed sync information
* Conditional hook execution based on aggregated values
* Updating audit or metadata tables with sync statistics

## Setting Environment Variables for API Authentication

Use store hooks with the `env.*` prefix to set environment variables that are available during API spec rendering. This is particularly useful when you need to:

* Dynamically compute authentication parameters before API calls
* Inject values into authentication blocks before spec compilation
* Pass computed values to dynamic endpoints
* Use values from database queries or previous API calls in authentication

The key advantage: environment variables set via `env.*` hooks are available **before** the API spec is compiled and rendered, making them usable in `authentication` and `dynamic_endpoints` blocks.
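
The ordering matters because placeholders are resolved when the spec is rendered. The Python sketch below illustrates the idea with a toy renderer: values are written into an `env` dict first, and only then are `{env.NAME}` placeholders substituted. The `render` function and its regular expression are illustrative assumptions and do not reflect how Sling parses specs.

```python
import re


def render(template: str, env: dict[str, str]) -> str:
    """Replace each {env.NAME} placeholder with its value; fail if it is missing."""

    def lookup(match: re.Match) -> str:
        name = match.group(1)
        if name not in env:
            raise KeyError(f"env.{name} is not set")
        return env[name]

    return re.sub(r"\{env\.([A-Za-z_][A-Za-z0-9_]*)\}", lookup, template)


env: dict[str, str] = {}

# 1. Store hooks run first and populate the environment.
env["COMPUTED_USERNAME"] = "api_user_prod"
env["DEPLOYMENT_ENV"] = "production"

# 2. Only afterwards is the spec rendered, so the placeholders resolve.
auth_block = {"type": "basic", "username": "{env.COMPUTED_USERNAME}"}
rendered = {key: render(value, env) for key, value in auth_block.items()}
print(rendered)
```

If the rendering step ran before the hooks, the lookup would fail, which is the situation the `env.*` store keys are meant to avoid.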

**Spec File** (`api_with_dynamic_auth.yaml`)

{% code title="api\_with\_dynamic\_auth.yaml" overflow="wrap" %}

```yaml
name: "API with Dynamic Auth"
description: "Demonstrates using environment variables set by hooks in authentication"

# Environment variables set by hooks are available here during rendering
authentication:
  type: basic
  username: "{env.COMPUTED_USERNAME}"
  password: "{env.COMPUTED_PASSWORD}"

defaults:
  state:
    base_url: https://api.example.com/v1
  request:
    headers:
      X-Environment: "{env.DEPLOYMENT_ENV}"

endpoints:
  users:
    description: "Fetch users with dynamically configured authentication"

    request:
      url: "{state.base_url}/users"
      parameters:
        # Can also use env vars in request parameters
        api_version: "{env.API_VERSION}"

    response:
      records:
        jmespath: "data.users[]"
        primary_key: ["user_id"]

# Environment variables are also available in dynamic endpoints
dynamic_endpoints:
  - iterate: '{split(env.RESOURCE_LIST, ",")}'
    into: "state.resource_name"
    endpoint:
      name: "data_{state.resource_name}"
      request:
        url: "{state.base_url}/{state.resource_name}"
        headers:
          X-Resource-Token: "{env.RESOURCE_TOKEN}"
      response:
        records:
          jmespath: "data[]"
```

{% endcode %}

***

**Using Pipeline with Environment Variable Setup**

Running with Sling: `sling run -p /path/to/pipeline.yaml`

{% code title="pipeline.yaml" overflow="wrap" %}

```yaml
steps:
  # Step 1: Set environment variables before replication runs
  - type: store
    key: env.COMPUTED_USERNAME
    value: "api_user_prod"

  - type: store
    key: env.COMPUTED_PASSWORD
    value: >
      {base64_encode("secure_password_123")}

  - type: store
    key: env.DEPLOYMENT_ENV
    value: "production"

  - type: store
    key: env.API_VERSION
    value: "v2"

  # Step 2: Query database to get resource list
  - type: query
    connection: MY_CONFIG_DB
    query: |
      SELECT string_agg(resource_name, ',') as resources
      FROM api_resources
      WHERE enabled = true
    into: resource_query

  # Step 3: Set resource list and token as environment variables
  - type: store
    map:
      env.RESOURCE_LIST: '{store.resource_query[0].resources}'
      env.RESOURCE_TOKEN: '{sha256(env.COMPUTED_USERNAME + "-" + now())}'

  # Step 4: Log the configuration
  - type: log
    message: |
      API Configuration:
      - Username: {env.COMPUTED_USERNAME}
      - Environment: {env.DEPLOYMENT_ENV}
      - API Version: {env.API_VERSION}
      - Resources: {env.RESOURCE_LIST}

  # Step 5: Run replication with dynamically configured API
  - type: replication
    path: /path/to/replication.yaml
```

{% endcode %}

***

**Replication File**

{% code title="replication.yaml" overflow="wrap" %}

```yaml
source: MY_API_WITH_DYNAMIC_AUTH
target: MY_TARGET_DB

streams:
  users:
    object: public.api_users

  # Dynamic endpoints will be created based on RESOURCE_LIST
  # e.g., data_customers, data_products, data_orders
```

{% endcode %}
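
To show how a comma-separated `RESOURCE_LIST` turns into endpoint names such as `data_customers`, here is a short Python sketch of the expansion. It mirrors the `split(env.RESOURCE_LIST, ",")` expression and the `data_{state.resource_name}` naming from the spec; `expand_dynamic_endpoints` is a hypothetical helper and the resource names are the examples from the comment above.

```python
def expand_dynamic_endpoints(resource_list: str, base_url: str) -> dict[str, str]:
    """Map each resource name to an endpoint name and its request URL."""
    names = resource_list.split(",")
    return {f"data_{name}": f"{base_url}/{name}" for name in names}


endpoints = expand_dynamic_endpoints(
    "customers,products,orders",
    "https://api.example.com/v1",
)
for name, url in endpoints.items():
    print(name, url)
```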

***

**Key Benefits**

1. **Dynamic Authentication**: Compute credentials at runtime (e.g., from database queries, secrets managers, or transformations)
2. **Pre-Compilation Configuration**: Environment variables set via `env.*` hooks are available during API spec rendering, allowing use in:
   * Authentication blocks
   * Dynamic endpoint definitions
   * Default configurations
3. **Separation of Concerns**: Keep sensitive or dynamic values out of spec files
4. **Database-Driven Configuration**: Query databases to determine which endpoints to call or which credentials to use
5. **Multi-Environment Support**: Dynamically configure API behavior based on deployment environment without changing spec files

**Common Use Cases:**

* Rotating API keys fetched from a secrets manager
* Environment-specific endpoints (dev/staging/prod)
* Database-driven resource lists for dynamic endpoints
* Computed authentication tokens based on current state
* Multi-tenant API configurations


