API to Database

Examples of using Sling to load data from APIs into databases with API specifications.

Sling can extract data from REST and GraphQL APIs using YAML specification files. These specifications define how to authenticate, paginate, and extract data from API endpoints.

See API Spec for detailed information about building API specifications.

Basic REST API with Offset Pagination

This example shows a simple API with offset-based pagination.

Learn more: Pagination, Response Processing

Spec File (users_api.yaml)

users_api.yaml
name: "Users API"

defaults:
  state:
    base_url: https://api.example.com
  request:
    headers:
      Accept: "application/json"

endpoints:
  users:
    state:
      limit: 100
      offset: 0  # Start at the beginning

    request:
      url: '{state.base_url}/users'
      method: GET
      parameters:
        limit: '{state.limit}'
        offset: '{state.offset}'  # Pass offset as query parameter

    pagination:
      next_state:
        # Increment offset by limit for next page
        offset: '{state.offset + state.limit}'
      # Stop when fewer records returned than requested
      stop_condition: length(response.records) < state.limit

    response:
      records:
        # Extract records array from response using JMESPath
        jmespath: "data[]"
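
To make the pagination rules above concrete, here is a minimal Python sketch of the loop they describe. It is an illustration only, not Sling's implementation: `fetch_all_offset`, `fake_fetch_page`, and the 250-record `USERS` list are hypothetical stand-ins for a real API.

```python
from typing import Callable, Dict, List


def fetch_all_offset(fetch_page: Callable[[int, int], List[Dict]], limit: int = 100) -> List[Dict]:
    """Collect every record from an offset-paginated source."""
    records: List[Dict] = []
    offset = 0  # mirrors state.offset starting at 0
    while True:
        page = fetch_page(offset, limit)
        records.extend(page)
        # Mirrors stop_condition: length(response.records) < state.limit
        if len(page) < limit:
            break
        # Mirrors next_state: offset = state.offset + state.limit
        offset += limit
    return records


# Hypothetical in-memory "API" holding 250 users
USERS = [{"id": i} for i in range(250)]


def fake_fetch_page(offset: int, limit: int) -> List[Dict]:
    return USERS[offset:offset + limit]


all_users = fetch_all_offset(fake_fetch_page, limit=100)
print(len(all_users))
```

Note that when the total record count is an exact multiple of the limit, this loop makes one extra request that returns an empty page before stopping.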

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  users:
    object: public.users

Using Python

api_to_database.py
from sling import Replication, ReplicationStream, Mode

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'users': ReplicationStream(
            object='public.users',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()

Cursor-Based Pagination API

This example demonstrates cursor-based pagination similar to Stripe's API pattern.

Learn more: Pagination, Authentication

Spec File (transactions_api.yaml)

transactions_api.yaml
name: "Transactions API"

defaults:
  state:
    base_url: https://api.example.com/v1
  request:
    headers:
      Accept: "application/json"
      # Bearer token from secrets defined in env.yaml
      Authorization: "Bearer {secrets.api_key}"

endpoints:
  transactions:
    state:
      limit: 100
      starting_after: null  # First page has no cursor

    request:
      url: '{state.base_url}/transactions'
      method: GET
      parameters:
        limit: '{state.limit}'
        starting_after: '{state.starting_after}'

    pagination:
      next_state:
        # Use ID of last record as cursor for next page
        starting_after: '{response.records[-1].id}'
      # Stop when API indicates no more pages or no records returned
      stop_condition: response.json.has_more == false || length(response.records) < 1

    response:
      records:
        jmespath: "data[]"
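
The cursor logic in this spec can be sketched in plain Python as follows. This is a simplified model under stated assumptions, not Sling's internals: `fetch_all_cursor`, `fake_fetch_page`, and the `TRANSACTIONS` data are hypothetical, and the fake API returns a `has_more` flag alongside `data` in the way the spec expects.

```python
from typing import Callable, Dict, List, Optional


def fetch_all_cursor(fetch_page: Callable[[Optional[str], int], Dict], limit: int = 100) -> List[Dict]:
    """Collect every record from a cursor-paginated source."""
    records: List[Dict] = []
    starting_after: Optional[str] = None  # first page has no cursor
    while True:
        body = fetch_page(starting_after, limit)
        page = body["data"]
        records.extend(page)
        # Mirrors stop_condition: has_more == false || length(records) < 1
        if body["has_more"] is False or len(page) < 1:
            break
        # Mirrors next_state: the ID of the last record becomes the cursor
        starting_after = page[-1]["id"]
    return records


# Hypothetical in-memory "API" with seven transactions
TRANSACTIONS = [{"id": f"txn_{i:03d}"} for i in range(7)]


def fake_fetch_page(starting_after: Optional[str], limit: int) -> Dict:
    ids = [t["id"] for t in TRANSACTIONS]
    start = 0 if starting_after is None else ids.index(starting_after) + 1
    page = TRANSACTIONS[start:start + limit]
    return {"data": page, "has_more": start + limit < len(TRANSACTIONS)}


all_txns = fetch_all_cursor(fake_fetch_page, limit=3)
print([t["id"] for t in all_txns])
```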

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  transactions:
    object: public.transactions

Using Python

api_to_database.py
from sling import Replication, ReplicationStream, Mode
import os

# Set API credentials
os.environ['API_KEY'] = 'your_api_key'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'transactions': ReplicationStream(
            object='public.transactions',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()

Incremental Sync with Timestamps

This example shows how to fetch only new or updated records using timestamp-based incremental synchronization.

Learn more: Incremental Sync, Processors

Spec File (orders_api.yaml)

orders_api.yaml
name: "Orders API"

defaults:
  state:
    base_url: https://api.example.com
  request:
    headers:
      Accept: "application/json"
      Authorization: "Bearer {secrets.api_token}"

endpoints:
  orders:
    state:
      # Use last sync time if available, otherwise default to 30 days ago
      updated_at_min: '{coalesce(sync.last_updated_at, date_format(date_add(now(), -30, "day"), "%Y-%m-%dT%H:%M:%S%z"))}'
      current_max_updated_at: '{state.updated_at_min}'
      limit: 100
      offset: 0

    # Persist last_updated_at for next run
    sync: [last_updated_at]

    request:
      url: '{state.base_url}/orders'
      method: GET
      parameters:
        # Filter API to only return records updated after this timestamp
        updated_at_min: '{state.updated_at_min}'
        limit: '{state.limit}'
        offset: '{state.offset}'

    pagination:
      next_state:
        offset: '{state.offset + state.limit}'
      stop_condition: length(response.records) < state.limit

    response:
      records:
        jmespath: "orders[]"
      processors:
        # Track the maximum updated_at timestamp across all records
        - expression: "record.updated_at"
          output: "state.last_updated_at"
          aggregation: "maximum"  # Save highest timestamp for next sync
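
The idea behind the `sync` key and the `maximum` aggregation can be illustrated with a small Python model. This is a sketch, not how Sling stores state: `run_sync`, the `persisted_state` dict, and the sample `ORDERS` are hypothetical, and it assumes all timestamps share one ISO 8601 format and timezone so that string comparison matches chronological order.

```python
from typing import Dict, List


def run_sync(orders: List[Dict], persisted_state: Dict, default_since: str) -> List[Dict]:
    """Return orders updated since the last run and advance the stored high-water mark."""
    # Mirrors coalesce(sync.last_updated_at, <default>)
    since = persisted_state.get("last_updated_at") or default_since
    fresh = [o for o in orders if o["updated_at"] > since]
    if fresh:
        # Mirrors the processor with aggregation "maximum"
        persisted_state["last_updated_at"] = max(o["updated_at"] for o in fresh)
    return fresh


ORDERS = [
    {"id": 1, "updated_at": "2024-01-01T00:00:00Z"},
    {"id": 2, "updated_at": "2024-01-05T00:00:00Z"},
]
state: Dict = {}  # stands in for the persisted sync state

first_run = run_sync(ORDERS, state, default_since="2023-12-01T00:00:00Z")

# A new order arrives before the second run
ORDERS.append({"id": 3, "updated_at": "2024-01-09T00:00:00Z"})
second_run = run_sync(ORDERS, state, default_since="2023-12-01T00:00:00Z")

print([o["id"] for o in first_run], [o["id"] for o in second_run])
```

The second run only picks up order 3 because the stored timestamp from the first run replaces the 30-day default.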

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: incremental
  primary_key: [id]
  update_key: updated_at

streams:
  orders:
    object: public.orders

env:
  SLING_STATE: postgres/sling_state.my_api  # persists sync state; use one state table per replication

Using Python

api_to_database.py
from sling import Replication, ReplicationStream, Mode
import os

os.environ['API_TOKEN'] = 'your_api_token'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'orders': ReplicationStream(
            object='public.orders',
            mode=Mode.INCREMENTAL,
            primary_key=['id'],
            update_key='updated_at'
        )
    },
    env={'SLING_STATE': 'postgres/sling_state.my_api'}
)

replication.run()

Nested Data Extraction with JMESPath

This example demonstrates extracting nested data from complex API responses.

Learn more: Response Processing, JMESPath Expressions

Spec File (products_api.yaml)

products_api.yaml
name: "Products API"

defaults:
  state:
    base_url: https://api.store.com/v2
  request:
    headers:
      Accept: "application/json"
      X-API-Key: '{secrets.api_key}'

endpoints:
  products:
    state:
      page: 1
      per_page: 50

    request:
      url: '{state.base_url}/products'
      method: GET
      parameters:
        page: '{state.page}'
        per_page: '{state.per_page}'

    pagination:
      next_state:
        page: '{state.page + 1}'
      stop_condition: length(response.records) < state.per_page

    response:
      records:
        # JMESPath projection to flatten and reshape nested data
        # Maps: product_id -> id, product_name -> name, pricing.amount -> price, etc.
        jmespath: >
          result.items[].{
            id: product_id,
            name: product_name,
            price: pricing.amount,
            currency: pricing.currency,
            category: metadata.category,
            tags: tags[].name
          }
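
For readers less familiar with JMESPath, the projection above is roughly equivalent to the plain-Python reshaping below. It is an approximation for illustration: `reshape_products` and the sample payload are hypothetical, and edge cases (such as how missing keys are handled) may differ from a real JMESPath engine.

```python
from typing import Any, Dict, List


def reshape_products(response_json: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Flatten result.items[] into one flat row per product."""
    rows = []
    for item in (response_json.get("result") or {}).get("items") or []:
        pricing = item.get("pricing") or {}
        metadata = item.get("metadata") or {}
        tags = item.get("tags")
        rows.append({
            "id": item.get("product_id"),
            "name": item.get("product_name"),
            "price": pricing.get("amount"),
            "currency": pricing.get("currency"),
            "category": metadata.get("category"),
            # tags[].name: keep only the name of each tag object
            "tags": [t["name"] for t in tags if t.get("name") is not None]
            if isinstance(tags, list) else None,
        })
    return rows


# Hypothetical API response shaped like the spec expects
sample = {
    "result": {
        "items": [
            {
                "product_id": "p1",
                "product_name": "Mug",
                "pricing": {"amount": 9.5, "currency": "USD"},
                "metadata": {"category": "kitchen"},
                "tags": [{"name": "ceramic"}, {"name": "gift"}],
            }
        ]
    }
}

rows = reshape_products(sample)
print(rows)
```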

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  products:
    object: public.products

Using Python

api_to_database.py
from sling import Replication, ReplicationStream, Mode
import os

os.environ['API_KEY'] = 'your_api_key'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'products': ReplicationStream(
            object='public.products',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()

POST Request with JSON Payload

This example shows how to send POST requests with a JSON body to an API.

Learn more: Request Configuration, Dynamic Values

Spec File (analytics_api.yaml)

analytics_api.yaml
name: "Analytics API"

defaults:
  state:
    base_url: https://analytics.example.com/api
  request:
    headers:
      Content-Type: "application/json"
      Authorization: "Bearer {secrets.access_token}"

endpoints:
  events:
    state:
      # Use env vars if available, otherwise default to last 7 days
      start_date: '{coalesce(env.START_DATE, date_format(date_add(now(), -7, "day"), "%Y-%m-%d"))}'
      end_date: '{coalesce(env.END_DATE, date_format(now(), "%Y-%m-%d"))}'

    request:
      url: '{state.base_url}/events/query'
      method: POST
      payload:
        # Request body sent as JSON
        date_range:
          start: '{state.start_date}'
          end: '{state.end_date}'
        metrics: ["views", "clicks", "conversions"]
        dimensions: ["event_name", "user_id"]

    response:
      records:
        jmespath: "data.events[]"
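
The date defaults and the JSON body in this spec can be pictured with the following Python sketch. It only models the fallback logic; `build_payload` is a hypothetical helper, and `today` is passed in explicitly so the result is reproducible.

```python
import json
from datetime import date, timedelta
from typing import Dict


def build_payload(env: Dict[str, str], today: date) -> Dict:
    """Build the request body, falling back to the last 7 days when env vars are unset."""
    # Mirrors coalesce(env.START_DATE, <today minus 7 days>)
    start = env.get("START_DATE") or (today - timedelta(days=7)).strftime("%Y-%m-%d")
    # Mirrors coalesce(env.END_DATE, <today>)
    end = env.get("END_DATE") or today.strftime("%Y-%m-%d")
    return {
        "date_range": {"start": start, "end": end},
        "metrics": ["views", "clicks", "conversions"],
        "dimensions": ["event_name", "user_id"],
    }


default_payload = build_payload({}, today=date(2024, 1, 31))
explicit_payload = build_payload(
    {"START_DATE": "2024-01-01", "END_DATE": "2024-01-31"}, today=date(2024, 1, 31)
)

print(json.dumps(default_payload["date_range"]))
print(json.dumps(explicit_payload["date_range"]))
```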

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  events:
    object: analytics.events

env:
  START_DATE: ${START_DATE}  # passed as env var
  END_DATE: ${END_DATE}      # passed as env var

Using Python

api_to_database.py
from sling import Replication, ReplicationStream, Mode
import os

os.environ['ACCESS_TOKEN'] = 'your_access_token'
os.environ['START_DATE'] = '2024-01-01'
os.environ['END_DATE'] = '2024-01-31'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'events': ReplicationStream(
            object='analytics.events',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()

Dynamic URL Path Parameters

This example demonstrates building URLs with dynamic path parameters.

Learn more: Request Configuration, State Variables

Spec File (customer_details_api.yaml)

customer_details_api.yaml
name: "Customer Details API"

defaults:
  state:
    base_url: https://api.crm.com/v1
  request:
    headers:
      Accept: "application/json"
      Authorization: "Bearer {secrets.api_key}"

endpoints:
  customer_profile:
    state:
      # require() will fail if env var is not set
      customer_id: '{require(env.CUSTOMER_ID)}'

    request:
      # Use state variable as part of URL path
      url: '{state.base_url}/customers/{state.customer_id}/profile'
      method: GET

    response:
      records:
        # Wrap single object in array
        jmespath: "[customer]"
      processors:
        # Add customer_id to the record for reference
        - expression: "state.customer_id"
          output: "record.customer_id"
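
As a rough mental model of this endpoint, the Python sketch below fails fast on a missing variable, builds the URL, wraps the single object in a list, and stamps the customer ID onto the record. All names here (`require`, `fetch_profile_records`, `fake_get_json`) are hypothetical and only imitate the behavior described in the spec comments.

```python
from typing import Callable, Dict, List, Optional


def require(value: Optional[str], name: str) -> str:
    """Return value, or raise if it is missing or empty."""
    if value is None or value == "":
        raise ValueError(f"required value is not set: {name}")
    return value


def fetch_profile_records(
    env: Dict[str, str],
    get_json: Callable[[str], Dict],
    base_url: str = "https://api.crm.com/v1",
) -> List[Dict]:
    customer_id = require(env.get("CUSTOMER_ID"), "CUSTOMER_ID")
    # The state variable becomes part of the URL path
    url = f"{base_url}/customers/{customer_id}/profile"
    body = get_json(url)
    # jmespath "[customer]": wrap the single object in a list
    records = [body["customer"]]
    # Processor: copy the customer ID onto each record
    return [{**record, "customer_id": customer_id} for record in records]


def fake_get_json(url: str) -> Dict:
    # Hypothetical response; echoes the URL so the example is easy to inspect
    return {"customer": {"name": "Ada", "requested_url": url}}


records = fetch_profile_records({"CUSTOMER_ID": "cust_12345"}, fake_get_json)
print(records)
```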

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  customer_profile:
    object: public.customer_profiles

env:
  CUSTOMER_ID: 'cust_12345'

Using Python

api_to_database.py
from sling import Replication, ReplicationStream, Mode
import os

os.environ['API_KEY'] = 'your_api_key'
os.environ['CUSTOMER_ID'] = 'cust_12345'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'customer_profile': ReplicationStream(
            object='public.customer_profiles',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()

GraphQL API

This example shows how to query a GraphQL API.

Learn more: Request Configuration, Response Processing

Spec File (github_graphql_api.yaml)

github_graphql_api.yaml
name: "GitHub GraphQL API"

defaults:
  state:
    base_url: https://api.github.com
  request:
    headers:
      Content-Type: "application/json"
      Authorization: "Bearer {secrets.github_token}"

endpoints:
  repositories:
    state:
      cursor: null  # First page has no cursor
      org_name: '{require(env.GITHUB_ORG)}'

    request:
      url: '{state.base_url}/graphql'
      method: POST
      payload:
        # GraphQL query as a string
        query: |
          query($org: String!, $cursor: String) {
            organization(login: $org) {
              repositories(first: 50, after: $cursor) {
                pageInfo {
                  hasNextPage
                  endCursor
                }
                nodes {
                  name
                  description
                  stargazerCount
                  forkCount
                  createdAt
                  updatedAt
                }
              }
            }
          }
        # GraphQL variables from state
        variables:
          org: '{state.org_name}'
          cursor: '{state.cursor}'

    pagination:
      next_state:
        # Extract next cursor from GraphQL response
        cursor: '{jmespath("response.json", "data.organization.repositories.pageInfo.endCursor")}'
      # GraphQL provides hasNextPage in pageInfo
      stop_condition: 'jmespath("response.json", "data.organization.repositories.pageInfo.hasNextPage") == false'

    response:
      records:
        # Extract nodes array from GraphQL response
        jmespath: "data.organization.repositories.nodes[]"
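
The GraphQL pagination above follows the usual `pageInfo` pattern, which the Python sketch below walks through against a fake endpoint. It is a simplified illustration: `fetch_all_repos`, `fake_post_graphql`, the shortened `QUERY` string, and the page size of 2 are all assumptions made for the example.

```python
from typing import Callable, Dict, List, Optional

# Shortened stand-in for the query in the spec
QUERY = "query($org: String!, $cursor: String) { organization(login: $org) { ... } }"


def fetch_all_repos(post_graphql: Callable[[Dict], Dict], org: str) -> List[Dict]:
    """Follow endCursor until hasNextPage is false."""
    repos: List[Dict] = []
    cursor: Optional[str] = None  # first page has no cursor
    while True:
        payload = {"query": QUERY, "variables": {"org": org, "cursor": cursor}}
        body = post_graphql(payload)
        connection = body["data"]["organization"]["repositories"]
        repos.extend(connection["nodes"])
        # Mirrors stop_condition: pageInfo.hasNextPage == false
        if connection["pageInfo"]["hasNextPage"] is False:
            break
        # Mirrors next_state: cursor = pageInfo.endCursor
        cursor = connection["pageInfo"]["endCursor"]
    return repos


# Hypothetical in-memory "API" with five repositories, two per page
REPOS = [{"name": f"repo-{i}"} for i in range(5)]


def fake_post_graphql(payload: Dict) -> Dict:
    cursor = payload["variables"]["cursor"]
    start = 0 if cursor is None else int(cursor)
    end = start + 2
    page_info = {"hasNextPage": end < len(REPOS), "endCursor": str(end)}
    return {"data": {"organization": {"repositories": {
        "pageInfo": page_info, "nodes": REPOS[start:end]}}}}


all_repos = fetch_all_repos(fake_post_graphql, org="your_organization")
print([r["name"] for r in all_repos])
```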

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  repositories:
    object: public.github_repositories

env:
  GITHUB_ORG: 'your_organization'

Using Python

api_to_database.py
from sling import Replication, ReplicationStream, Mode
import os

os.environ['GITHUB_TOKEN'] = 'your_github_token'
os.environ['GITHUB_ORG'] = 'your_organization'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'repositories': ReplicationStream(
            object='public.github_repositories',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()

Multi-Endpoint Queue Pattern

This example demonstrates passing data between endpoints using queues.

Learn more: Queues, Iteration, Processors

Spec File (ecommerce_api.yaml)

ecommerce_api.yaml
name: "E-commerce API"

# Declare queues to pass data between endpoints
queues:
  - order_ids

defaults:
  state:
    base_url: https://api.shop.com/v1
  request:
    headers:
      Accept: "application/json"
      Authorization: "Bearer {secrets.api_key}"

endpoints:
  # Step 1: List all orders and collect their IDs
  list_orders:
    state:
      page: 1
      limit: 100

    request:
      url: '{state.base_url}/orders'
      method: GET
      parameters:
        page: '{state.page}'
        limit: '{state.limit}'

    pagination:
      next_state:
        page: '{state.page + 1}'
      stop_condition: length(response.records) < state.limit

    response:
      records:
        jmespath: "orders[]"
      processors:
        # Send each order ID to the queue for the next endpoint
        - expression: "record.id"
          output: "queue.order_ids"

  # Step 2: Fetch details for each order from the queue
  order_details:
    description: "Fetch detailed information for each order"

    iterate:
      # Process each order ID from the queue
      over: "queue.order_ids"
      into: "state.order_id"
      concurrency: 10  # Process 10 orders concurrently

    request:
      # Use the current order ID from iteration in URL
      url: '{state.base_url}/orders/{state.order_id}/details'
      method: GET

    response:
      records:
        jmespath: "[order_detail]"
      processors:
        # Add the order_id to the detail record for reference
        - expression: "state.order_id"
          output: "record.order_id"
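
Conceptually, the two endpoints form a producer and a consumer joined by a queue. The Python sketch below imitates that flow with a thread pool. It is an analogy rather than Sling's execution model: `list_orders`, `fetch_order_detail`, and the sample `ORDERS` are hypothetical, and the pool size of 10 simply echoes the `concurrency` setting.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

# Hypothetical orders returned by the list endpoint
ORDERS = [{"id": i, "total": 10 * i} for i in range(1, 6)]


def list_orders() -> List[Dict]:
    """Step 1: list all orders."""
    return ORDERS


def fetch_order_detail(order_id: int) -> Dict:
    """Step 2: fetch one order's details and tag the record with its order ID."""
    detail = {"status": "shipped"}  # stands in for the API response
    return {**detail, "order_id": order_id}


# Processor in step 1: push each record's id into the queue
order_ids = [order["id"] for order in list_orders()]

# Iterate over the queue, up to 10 requests in flight at once
with ThreadPoolExecutor(max_workers=10) as pool:
    order_details = list(pool.map(fetch_order_detail, order_ids))

print(order_details)
```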

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  list_orders:
    object: public.orders

  order_details:
    object: public.order_details

env:
  SLING_THREADS: 2

Using Python

api_to_database.py
from sling import Replication, ReplicationStream, Mode
import os

os.environ['API_KEY'] = 'your_api_key'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'list_orders': ReplicationStream(
            object='public.orders',
            mode=Mode.FULL_REFRESH
        ),
        'order_details': ReplicationStream(
            object='public.order_details',
            mode=Mode.FULL_REFRESH
        )
    },
    env={'SLING_THREADS': '2'}
)

replication.run()
