# API to Database

Sling can extract data from REST and GraphQL APIs using YAML specification files. These specifications define how to authenticate, paginate, and extract data from API endpoints.

See [API Spec](/concepts/api-specs.md) for detailed information about building API specifications.

<details>

<summary>Basic REST API with Offset Pagination</summary>

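This example pages through a simple `/users` endpoint with offset-based pagination: each request advances `offset` by `limit`, and the run stops when a page returns fewer than `limit` records.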

Learn more: [Pagination](/concepts/api-specs/advanced.md#pagination) • [Response Processing](/concepts/api-specs/response.md)

**Spec File** (`users_api.yaml`)

{% code title="users\_api.yaml" overflow="wrap" %}

```yaml
name: "Users API"

defaults:
  state:
    base_url: https://api.example.com
  request:
    headers:
      Accept: "application/json"

endpoints:
  users:
    state:
      limit: 100
      offset: 0  # Start at the beginning

    request:
      url: '{state.base_url}/users'
      method: GET
      parameters:
        limit: '{state.limit}'
        offset: '{state.offset}'  # Pass offset as query parameter

    pagination:
      next_state:
        # Increment offset by limit for next page
        offset: '{state.offset + state.limit}'
      # Stop when fewer records returned than requested
      stop_condition: length(response.records) < state.limit

    response:
      records:
        # Extract records array from response using JMESPath
        jmespath: "data[]"
```

{% endcode %}
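
The loop that this spec describes can be sketched in plain Python. This is only an illustration of the offset logic, not how Sling is implemented; `fake_fetch_page`, `fetch_all_offset`, and `FAKE_USERS` are hypothetical names, and the in-memory list stands in for the real endpoint.

```python
from typing import Callable, Dict, List

# Hypothetical in-memory data standing in for the /users endpoint.
FAKE_USERS: List[Dict] = [{"id": i} for i in range(250)]


def fake_fetch_page(limit: int, offset: int) -> List[Dict]:
    """Pretend to call GET /users?limit=...&offset=... and return one page."""
    return FAKE_USERS[offset:offset + limit]


def fetch_all_offset(
    fetch_page: Callable[[int, int], List[Dict]], limit: int = 100
) -> List[Dict]:
    """Walk pages by offset until a page comes back shorter than `limit`."""
    offset = 0
    records: List[Dict] = []
    while True:
        page = fetch_page(limit, offset)
        records.extend(page)
        # Same idea as: stop_condition: length(response.records) < state.limit
        if len(page) < limit:
            break
        # Same idea as: next_state.offset = state.offset + state.limit
        offset += limit
    return records


users = fetch_all_offset(fake_fetch_page, limit=100)
print(len(users))  # 250
```

In this sketch, when the total record count is an exact multiple of `limit`, the loop makes one extra request that returns an empty page before it stops.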

***

**Using Replication**

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  users:
    object: public.users
```

{% endcode %}

***

**Using Python**

{% code title="api\_to\_database.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream, Mode

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'users': ReplicationStream(
            object='public.users',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()
```

{% endcode %}

</details>

<details>

<summary>Cursor-Based Pagination API</summary>

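This example demonstrates cursor-based pagination, similar to the pattern used by Stripe's API: the ID of the last record on each page becomes the `starting_after` cursor for the next request.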

Learn more: [Pagination](/concepts/api-specs/advanced.md#pagination) • [Authentication](/concepts/api-specs/authentication.md)

**Spec File** (`transactions_api.yaml`)

{% code title="transactions\_api.yaml" overflow="wrap" %}

```yaml
name: "Transactions API"

defaults:
  state:
    base_url: https://api.example.com/v1
  request:
    headers:
      Accept: "application/json"
      # Bearer token from secrets defined in env.yaml
      Authorization: "Bearer {secrets.api_key}"

endpoints:
  transactions:
    state:
      limit: 100
      starting_after: null  # First page has no cursor

    request:
      url: '{state.base_url}/transactions'
      method: GET
      parameters:
        limit: '{state.limit}'
        starting_after: '{state.starting_after}'

    pagination:
      next_state:
        # Use ID of last record as cursor for next page
        starting_after: '{response.records[-1].id}'
      # Stop when API indicates no more pages or no records returned
      stop_condition: response.json.has_more == false || length(response.records) < 1

    response:
      records:
        jmespath: "data[]"
```

{% endcode %}
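
As a rough illustration of the cursor logic above, the following plain-Python sketch walks a fake API the same way. It is not Sling code; `fake_fetch_page`, `fetch_all_cursor`, and `FAKE_TXNS` are hypothetical, and the response shape (`data` plus `has_more`) is assumed from the spec.

```python
from typing import Callable, Dict, List, Optional

# Hypothetical in-memory data standing in for the /transactions endpoint.
FAKE_TXNS: List[Dict] = [{"id": f"txn_{i:03d}"} for i in range(7)]


def fake_fetch_page(limit: int, starting_after: Optional[str]) -> Dict:
    """Return the page that follows the record whose ID is `starting_after`."""
    start = 0
    if starting_after is not None:
        ids = [txn["id"] for txn in FAKE_TXNS]
        start = ids.index(starting_after) + 1
    page = FAKE_TXNS[start:start + limit]
    return {"data": page, "has_more": start + limit < len(FAKE_TXNS)}


def fetch_all_cursor(
    fetch_page: Callable[[int, Optional[str]], Dict], limit: int = 100
) -> List[Dict]:
    """Follow the cursor until the API reports no more pages."""
    starting_after: Optional[str] = None  # First page has no cursor
    records: List[Dict] = []
    while True:
        body = fetch_page(limit, starting_after)
        page = body["data"]
        records.extend(page)
        # Same idea as: has_more == false || length(response.records) < 1
        if body.get("has_more") is False or len(page) < 1:
            break
        # Same idea as: next_state.starting_after = response.records[-1].id
        starting_after = page[-1]["id"]
    return records


transactions = fetch_all_cursor(fake_fetch_page, limit=3)
print([txn["id"] for txn in transactions])
```

Checking for an empty page in addition to `has_more` guards against reading `page[-1]` when no records were returned.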

***

**Using Replication**

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  transactions:
    object: public.transactions
```

{% endcode %}

***

**Using Python**

{% code title="api\_to\_database.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream, Mode
import os

# Set API credentials
os.environ['API_KEY'] = 'your_api_key'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'transactions': ReplicationStream(
            object='public.transactions',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()
```

{% endcode %}

</details>

<details>

<summary>Incremental Sync with Timestamps</summary>

This example shows how to fetch only new or updated records using timestamp-based incremental synchronization.

Learn more: [Incremental Sync](/concepts/api-specs/advanced.md#incremental-synchronization) • [Processors](/concepts/api-specs/advanced.md#data-processors)

**Spec File** (`orders_api.yaml`)

{% code title="orders\_api.yaml" overflow="wrap" %}

```yaml
name: "Orders API"

defaults:
  state:
    base_url: https://api.example.com
  request:
    headers:
      Accept: "application/json"
      Authorization: "Bearer {secrets.api_token}"

endpoints:
  orders:
    state:
      # Use last sync time if available, otherwise default to 30 days ago
      updated_at_min: '{coalesce(sync.last_updated_at, date_format(date_add(now(), -30, "day"), "%Y-%m-%dT%H:%M:%S%z"))}'
      current_max_updated_at: '{state.updated_at_min}'
      limit: 100
      offset: 0

    # Persist last_updated_at for next run
    sync: [last_updated_at]

    request:
      url: '{state.base_url}/orders'
      method: GET
      parameters:
        # Filter API to only return records updated after this timestamp
        updated_at_min: '{state.updated_at_min}'
        limit: '{state.limit}'
        offset: '{state.offset}'

    pagination:
      next_state:
        offset: '{state.offset + state.limit}'
      stop_condition: length(response.records) < state.limit

    response:
      records:
        jmespath: "orders[]"
      processors:
        # Track the maximum updated_at timestamp across all records
        - expression: "record.updated_at"
          output: "state.last_updated_at"
          aggregation: "maximum"  # Save highest timestamp for next sync
```

{% endcode %}
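
The two moving parts of this spec, the 30-day fallback and the `maximum` aggregation, can be sketched in plain Python. This is a hedged illustration rather than Sling's implementation; `resolve_updated_at_min` and `max_updated_at` are hypothetical helpers, and the sample timestamps are assumed to be ISO 8601 strings that `datetime.fromisoformat` can parse.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Iterable, Optional


def resolve_updated_at_min(last_synced: Optional[str], now: datetime) -> str:
    """Use the persisted value if present, otherwise fall back to 30 days ago."""
    if last_synced is not None:
        return last_synced
    return (now - timedelta(days=30)).strftime("%Y-%m-%dT%H:%M:%S%z")


def max_updated_at(records: Iterable[Dict], current: Optional[str] = None) -> Optional[str]:
    """Track the highest updated_at seen, like aggregation: "maximum"."""
    best = current
    for record in records:
        candidate = record["updated_at"]
        if best is None or datetime.fromisoformat(candidate) > datetime.fromisoformat(best):
            best = candidate
    return best


# Fixed clock so the example is reproducible.
now = datetime(2024, 1, 31, 12, 0, 0, tzinfo=timezone.utc)

# First run: nothing persisted yet, so the filter starts 30 days back.
first_run_min = resolve_updated_at_min(None, now)

orders = [
    {"id": 1, "updated_at": "2024-01-10T08:00:00+00:00"},
    {"id": 2, "updated_at": "2024-01-20T09:30:00+00:00"},
    {"id": 3, "updated_at": "2024-01-15T23:59:59+00:00"},
]

# The value that would be persisted for the next run.
last_updated_at = max_updated_at(orders)

# Next run: the persisted value wins over the 30-day default.
next_run_min = resolve_updated_at_min(last_updated_at, now)
print(first_run_min, last_updated_at, next_run_min)
```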

***

**Using Replication**

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: incremental
  primary_key: [id]
  update_key: updated_at

streams:
  orders:
    object: public.orders

env:
  SLING_STATE: postgres/sling_state.my_api # persists sync state between runs; use one state table per replication
```

{% endcode %}

***

**Using Python**

{% code title="api\_to\_database.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream, Mode
import os

os.environ['API_TOKEN'] = 'your_api_token'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'orders': ReplicationStream(
            object='public.orders',
            mode=Mode.INCREMENTAL,
            primary_key=['id'],
            update_key='updated_at'
        )
    },
    env={'SLING_STATE': 'postgres/sling_state.my_api'}
)

replication.run()
```

{% endcode %}

</details>

<details>

<summary>Nested Data Extraction with JMESPath</summary>

This example demonstrates extracting nested data from complex API responses.

Learn more: [Response Processing](/concepts/api-specs/response.md) • [JMESPath Expressions](/concepts/api-specs/response.md#projection-and-transformation)

**Spec File** (`products_api.yaml`)

{% code title="products\_api.yaml" overflow="wrap" %}

```yaml
name: "Products API"

defaults:
  state:
    base_url: https://api.store.com/v2
  request:
    headers:
      Accept: "application/json"
      X-API-Key: '{secrets.api_key}'

endpoints:
  products:
    state:
      page: 1
      per_page: 50

    request:
      url: '{state.base_url}/products'
      method: GET
      parameters:
        page: '{state.page}'
        per_page: '{state.per_page}'

    pagination:
      next_state:
        page: '{state.page + 1}'
      stop_condition: length(response.records) < state.per_page

    response:
      records:
        # JMESPath projection to flatten and reshape nested data
        # Maps: product_id -> id, product_name -> name, pricing.amount -> price, etc.
        jmespath: >
          result.items[].{
            id: product_id,
            name: product_name,
            price: pricing.amount,
            currency: pricing.currency,
            category: metadata.category,
            tags: tags[].name
          }
```

{% endcode %}
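
To make the reshaping concrete, here is a plain-Python approximation of what the projection above produces for a sample response. It does not use a JMESPath library; `reshape_products`, `project_tags`, and the sample payload are hypothetical, and missing fields are mapped to `None` to mimic how JMESPath yields `null`.

```python
from typing import Any, Dict, List, Optional


def project_tags(tags: Any) -> Optional[List[Any]]:
    """Approximate `tags[].name`: null if tags is not a list, else the names."""
    if not isinstance(tags, list):
        return None
    return [t["name"] for t in tags if isinstance(t, dict) and t.get("name") is not None]


def reshape_products(body: Dict) -> List[Dict]:
    """Approximate the `result.items[].{...}` projection from the spec."""
    items = (body.get("result") or {}).get("items") or []
    rows: List[Dict] = []
    for item in items:
        pricing = item.get("pricing") or {}
        metadata = item.get("metadata") or {}
        rows.append({
            "id": item.get("product_id"),
            "name": item.get("product_name"),
            "price": pricing.get("amount"),
            "currency": pricing.get("currency"),
            "category": metadata.get("category"),
            "tags": project_tags(item.get("tags")),
        })
    return rows


# Hypothetical response body with the nested shape the spec expects.
sample = {
    "result": {
        "items": [
            {
                "product_id": "p1",
                "product_name": "Mug",
                "pricing": {"amount": 9.5, "currency": "USD"},
                "metadata": {"category": "kitchen"},
                "tags": [{"name": "ceramic"}, {"name": "gift"}],
            },
            {
                "product_id": "p2",
                "product_name": "Pen",
                "pricing": {"amount": 1.2, "currency": "USD"},
                "metadata": {},
            },
        ]
    }
}

rows = reshape_products(sample)
print(rows[0])
```

Each output row is flat apart from `tags`, which remains a list of names.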

***

**Using Replication**

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  products:
    object: public.products
```

{% endcode %}

***

**Using Python**

{% code title="api\_to\_database.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream, Mode
import os

os.environ['API_KEY'] = 'your_api_key'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'products': ReplicationStream(
            object='public.products',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()
```

{% endcode %}

</details>

<details>

<summary>POST Request with JSON Payload</summary>

This example shows how to send POST requests with a JSON body to an API.

Learn more: [Request Configuration](/concepts/api-specs/request.md) • [Dynamic Values](/concepts/api-specs/structure.md#state-variables)

**Spec File** (`analytics_api.yaml`)

{% code title="analytics\_api.yaml" overflow="wrap" %}

```yaml
name: "Analytics API"

defaults:
  state:
    base_url: https://analytics.example.com/api
  request:
    headers:
      Content-Type: "application/json"
      Authorization: "Bearer {secrets.access_token}"

endpoints:
  events:
    state:
      # Use env vars if available, otherwise default to last 7 days
      start_date: '{coalesce(env.START_DATE, date_format(date_add(now(), -7, "day"), "%Y-%m-%d"))}'
      end_date: '{coalesce(env.END_DATE, date_format(now(), "%Y-%m-%d"))}'

    request:
      url: '{state.base_url}/events/query'
      method: POST
      payload:
        # Request body sent as JSON
        date_range:
          start: '{state.start_date}'
          end: '{state.end_date}'
        metrics: ["views", "clicks", "conversions"]
        dimensions: ["event_name", "user_id"]

    response:
      records:
        jmespath: "data.events[]"
```

{% endcode %}
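
The request body that results from this spec can be previewed with a small plain-Python sketch. It is an illustration under assumptions, not Sling code: `coalesce` and `build_events_payload` are hypothetical helpers, `env` is an ordinary dictionary, and a missing variable is treated as `None`.

```python
import json
from datetime import date, timedelta
from typing import Any, Dict


def coalesce(*values: Any) -> Any:
    """Return the first value that is not None, or None if all are."""
    for value in values:
        if value is not None:
            return value
    return None


def build_events_payload(env: Dict[str, str], today: date) -> Dict:
    """Build the JSON body, defaulting to the last 7 days when env is empty."""
    start = coalesce(env.get("START_DATE"), (today - timedelta(days=7)).strftime("%Y-%m-%d"))
    end = coalesce(env.get("END_DATE"), today.strftime("%Y-%m-%d"))
    return {
        "date_range": {"start": start, "end": end},
        "metrics": ["views", "clicks", "conversions"],
        "dimensions": ["event_name", "user_id"],
    }


# Fixed date so the example is reproducible.
today = date(2024, 1, 31)

default_payload = build_events_payload({}, today)
explicit_payload = build_events_payload(
    {"START_DATE": "2024-01-01", "END_DATE": "2024-01-31"}, today
)

print(json.dumps(default_payload, indent=2))
```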

***

**Using Replication**

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  events:
    object: analytics.events

env:
  START_DATE: ${START_DATE}  # passed as env var
  END_DATE: ${END_DATE}      # passed as env var
```

{% endcode %}

***

**Using Python**

{% code title="api\_to\_database.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream, Mode
import os

os.environ['ACCESS_TOKEN'] = 'your_access_token'
os.environ['START_DATE'] = '2024-01-01'
os.environ['END_DATE'] = '2024-01-31'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'events': ReplicationStream(
            object='analytics.events',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()
```

{% endcode %}

</details>

<details>

<summary>Dynamic URL Path Parameters</summary>

This example demonstrates building URLs with dynamic path parameters.

Learn more: [Request Configuration](/concepts/api-specs/request.md) • [State Variables](/concepts/api-specs/structure.md#state-variables)

**Spec File** (`customer_details_api.yaml`)

{% code title="customer\_details\_api.yaml" overflow="wrap" %}

```yaml
name: "Customer Details API"

defaults:
  state:
    base_url: https://api.crm.com/v1
  request:
    headers:
      Accept: "application/json"
      Authorization: "Bearer {secrets.api_key}"

endpoints:
  customer_profile:
    state:
      # require() will fail if env var is not set
      customer_id: '{require(env.CUSTOMER_ID)}'

    request:
      # Use state variable as part of URL path
      url: '{state.base_url}/customers/{state.customer_id}/profile'
      method: GET

    response:
      records:
        # Wrap single object in array
        jmespath: "[customer]"
      processors:
        # Add customer_id to the record for reference
        - expression: "state.customer_id"
          output: "record.customer_id"
```

{% endcode %}
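
The URL templating above amounts to a fail-fast lookup followed by string interpolation. A minimal plain-Python sketch, assuming a missing variable is represented as `None`; `require` here is a hypothetical stand-in for the spec function of the same name, and the percent-encoding step is an addition of this sketch rather than documented Sling behavior.

```python
from typing import Mapping, Optional
from urllib.parse import quote


def require(value: Optional[str], name: str) -> str:
    """Fail fast when a mandatory value is missing."""
    if value is None:
        raise ValueError(f"required value {name} is not set")
    return value


def customer_profile_url(base_url: str, env: Mapping[str, str]) -> str:
    """Interpolate the customer ID into the URL path."""
    customer_id = require(env.get("CUSTOMER_ID"), "CUSTOMER_ID")
    # Percent-encode the ID so characters such as "/" cannot alter the path.
    return f"{base_url}/customers/{quote(customer_id, safe='')}/profile"


url = customer_profile_url("https://api.crm.com/v1", {"CUSTOMER_ID": "cust_12345"})
print(url)  # https://api.crm.com/v1/customers/cust_12345/profile
```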

***

**Using Replication**

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  customer_profile:
    object: public.customer_profiles

env:
  CUSTOMER_ID: 'cust_12345'
```

{% endcode %}

***

**Using Python**

{% code title="api\_to\_database.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream, Mode
import os

os.environ['API_KEY'] = 'your_api_key'
os.environ['CUSTOMER_ID'] = 'cust_12345'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'customer_profile': ReplicationStream(
            object='public.customer_profiles',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()
```

{% endcode %}

</details>

<details>

<summary>GraphQL API</summary>

This example shows how to query a GraphQL API.

Learn more: [Request Configuration](/concepts/api-specs/request.md) • [Response Processing](/concepts/api-specs/response.md)

**Spec File** (`github_graphql_api.yaml`)

{% code title="github\_graphql\_api.yaml" overflow="wrap" %}

```yaml
name: "GitHub GraphQL API"

defaults:
  state:
    base_url: https://api.github.com
  request:
    headers:
      Content-Type: "application/json"
      Authorization: "Bearer {secrets.github_token}"

endpoints:
  repositories:
    state:
      cursor: null  # First page has no cursor
      org_name: '{require(env.GITHUB_ORG)}'

    request:
      url: '{state.base_url}/graphql'
      method: POST
      payload:
        # GraphQL query as a string
        query: |
          query($org: String!, $cursor: String) {
            organization(login: $org) {
              repositories(first: 50, after: $cursor) {
                pageInfo {
                  hasNextPage
                  endCursor
                }
                nodes {
                  name
                  description
                  stargazerCount
                  forkCount
                  createdAt
                  updatedAt
                }
              }
            }
          }
        # GraphQL variables from state
        variables:
          org: '{state.org_name}'
          cursor: '{state.cursor}'

    pagination:
      next_state:
        # Extract next cursor from GraphQL response
        cursor: '{jmespath("response.json", "data.organization.repositories.pageInfo.endCursor")}'
      # GraphQL provides hasNextPage in pageInfo
      stop_condition: 'jmespath("response.json", "data.organization.repositories.pageInfo.hasNextPage") == false'

    response:
      records:
        # Extract nodes array from GraphQL response
        jmespath: "data.organization.repositories.nodes[]"
```

{% endcode %}
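
The `pageInfo` loop above can be sketched in plain Python against a fake executor. This is an illustration only; `fake_execute`, `build_payload`, `fetch_all_repositories`, and `FAKE_REPOS` are hypothetical, the fake ignores the query text, and its cursors are simple stringified indexes rather than real GitHub cursors.

```python
from typing import Callable, Dict, List, Optional

QUERY = (
    "query($org: String!, $cursor: String) { organization(login: $org) { "
    "repositories(first: 2, after: $cursor) { "
    "pageInfo { hasNextPage endCursor } nodes { name } } } }"
)

# Hypothetical in-memory data standing in for an organization's repositories.
FAKE_REPOS: List[Dict] = [{"name": f"repo-{i}"} for i in range(5)]
PAGE_SIZE = 2


def build_payload(query: str, org: str, cursor: Optional[str]) -> Dict:
    """Build the POST body: the query string plus its variables."""
    return {"query": query, "variables": {"org": org, "cursor": cursor}}


def fake_execute(payload: Dict) -> Dict:
    """Pretend to POST to /graphql; the cursor is a stringified list index."""
    cursor = payload["variables"]["cursor"]
    start = 0 if cursor is None else int(cursor)
    end = start + PAGE_SIZE
    return {"data": {"organization": {"repositories": {
        "pageInfo": {"hasNextPage": end < len(FAKE_REPOS), "endCursor": str(end)},
        "nodes": FAKE_REPOS[start:end],
    }}}}


def fetch_all_repositories(execute: Callable[[Dict], Dict], query: str, org: str) -> List[Dict]:
    """Follow endCursor until hasNextPage is false."""
    cursor: Optional[str] = None  # First page has no cursor
    nodes: List[Dict] = []
    while True:
        body = execute(build_payload(query, org, cursor))
        repos = body["data"]["organization"]["repositories"]
        nodes.extend(repos["nodes"])
        if repos["pageInfo"]["hasNextPage"] is False:
            break
        cursor = repos["pageInfo"]["endCursor"]
    return nodes


repositories = fetch_all_repositories(fake_execute, QUERY, "your_organization")
print([repo["name"] for repo in repositories])
```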

***

**Using Replication**

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  repositories:
    object: public.github_repositories

env:
  GITHUB_ORG: 'your_organization'
```

{% endcode %}

***

**Using Python**

{% code title="api\_to\_database.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream, Mode
import os

os.environ['GITHUB_TOKEN'] = 'your_github_token'
os.environ['GITHUB_ORG'] = 'your_organization'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'repositories': ReplicationStream(
            object='public.github_repositories',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()
```

{% endcode %}

</details>

<details>

<summary>Multi-Endpoint Queue Pattern</summary>

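This example demonstrates passing data between endpoints using queues: `list_orders` sends each order ID to the `order_ids` queue, and `order_details` iterates over that queue to fetch one detail record per ID.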

Learn more: [Queues](/concepts/api-specs/queues.md) • [Iteration](/concepts/api-specs/request.md#iteration-looping-requests) • [Processors](/concepts/api-specs/advanced.md#data-processors)

**Spec File** (`ecommerce_api.yaml`)

{% code title="ecommerce\_api.yaml" overflow="wrap" %}

```yaml
name: "E-commerce API"

# Declare queues to pass data between endpoints
queues:
  - order_ids

defaults:
  state:
    base_url: https://api.shop.com/v1
  request:
    headers:
      Accept: "application/json"
      Authorization: "Bearer {secrets.api_key}"

endpoints:
  # Step 1: List all orders and collect their IDs
  list_orders:
    state:
      page: 1
      limit: 100

    request:
      url: '{state.base_url}/orders'
      method: GET
      parameters:
        page: '{state.page}'
        limit: '{state.limit}'

    pagination:
      next_state:
        page: '{state.page + 1}'
      stop_condition: length(response.records) < state.limit

    response:
      records:
        jmespath: "orders[]"
      processors:
        # Send each order ID to the queue for the next endpoint
        - expression: "record.id"
          output: "queue.order_ids"

  # Step 2: Fetch details for each order from the queue
  order_details:
    description: "Fetch detailed information for each order"

    iterate:
      # Process each order ID from the queue
      over: "queue.order_ids"
      into: "state.order_id"
      concurrency: 10  # Process 10 orders concurrently

    request:
      # Use the current order ID from iteration in URL
      url: '{state.base_url}/orders/{state.order_id}/details'
      method: GET

    response:
      records:
        jmespath: "[order_detail]"
      processors:
        # Add the order_id to the detail record for reference
        - expression: "state.order_id"
          output: "record.order_id"
```

{% endcode %}
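
The two-step flow above resembles a producer that collects IDs and a pool of workers that fetch details for them. The plain-Python sketch below is only an analogy: how Sling schedules the iteration internally is not described here, and `list_order_ids`, `fetch_order_detail`, and `FAKE_ORDERS` are hypothetical stand-ins for the two endpoints.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

# Hypothetical in-memory data standing in for the /orders endpoint.
FAKE_ORDERS: List[Dict] = [{"id": i, "total": i * 10} for i in range(1, 6)]


def list_order_ids() -> List[int]:
    """Step 1: stand-in for list_orders, which queues each record's ID."""
    return [order["id"] for order in FAKE_ORDERS]


def fetch_order_detail(order_id: int) -> Dict:
    """Step 2: stand-in for GET /orders/{order_id}/details."""
    body = {"order_detail": {"status": "shipped", "item_count": order_id}}
    # Unwrap the single object, then attach order_id for reference.
    record = dict(body["order_detail"])
    record["order_id"] = order_id
    return record


order_ids = list_order_ids()

# Up to 10 detail requests in flight at once, like `concurrency: 10`.
with ThreadPoolExecutor(max_workers=10) as pool:
    details = list(pool.map(fetch_order_detail, order_ids))

print([detail["order_id"] for detail in details])  # [1, 2, 3, 4, 5]
```

`pool.map` returns results in input order, so each detail record lines up with the ID that produced it even though the calls run concurrently.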

***

**Using Replication**

Running with Sling: `sling run -r /path/to/replication.yaml`

{% code title="replication.yaml" overflow="wrap" %}

```yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  list_orders:
    object: public.orders

  order_details:
    object: public.order_details

env:
  SLING_THREADS: 2
```

{% endcode %}

***

**Using Python**

{% code title="api\_to\_database.py" overflow="wrap" %}

```python
from sling import Replication, ReplicationStream, Mode
import os

os.environ['API_KEY'] = 'your_api_key'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'list_orders': ReplicationStream(
            object='public.orders',
            mode=Mode.FULL_REFRESH
        ),
        'order_details': ReplicationStream(
            object='public.order_details',
            mode=Mode.FULL_REFRESH
        )
    },
    env={'SLING_THREADS': '2'}
)

replication.run()
```

{% endcode %}

</details>

