API to Database

Examples of using Sling to load data from APIs to databases using API specifications

Sling can extract data from REST and GraphQL APIs using YAML specification files. These specifications define how to authenticate, paginate, and extract data from API endpoints.

See API Spec for detailed information about building API specifications.

Basic REST API with Offset Pagination

This example shows a simple API with offset-based pagination.

Learn more: PaginationResponse Processing

Spec File (users_api.yaml)

users_api.yaml
name: "Users API"

defaults:
  state:
    base_url: https://api.example.com
  request:
    headers:
      Accept: "application/json"

endpoints:
  users:
    state:
      limit: 100
      offset: 0  # Start at the beginning

    request:
      url: '{state.base_url}/users'
      method: GET
      parameters:
        limit: '{state.limit}'
        offset: '{state.offset}'  # Pass offset as query parameter

    pagination:
      next_state:
        # Increment offset by limit for next page
        offset: '{state.offset + state.limit}'
      # Stop when fewer records returned than requested
      stop_condition: length(response.records) < state.limit

    response:
      records:
        # Extract records array from response using JMESPath
        jmespath: "data[]"

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  users:
    object: public.users

Using Python

api_to_database.py
from sling import Replication, ReplicationStream, Mode

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'users': ReplicationStream(
            object='public.users',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()
Cursor-Based Pagination API

This example demonstrates cursor-based pagination similar to Stripe's API pattern.

Learn more: PaginationAuthentication

Spec File (transactions_api.yaml)

transactions_api.yaml
name: "Transactions API"

defaults:
  state:
    base_url: https://api.example.com/v1
  request:
    headers:
      Accept: "application/json"
      # Bearer token from secrets defined in env.yaml
      Authorization: "Bearer {secrets.api_key}"

endpoints:
  transactions:
    state:
      limit: 100
      starting_after: null  # First page has no cursor

    request:
      url: '{state.base_url}/transactions'
      method: GET
      parameters:
        limit: '{state.limit}'
        starting_after: '{state.starting_after}'

    pagination:
      next_state:
        # Use ID of last record as cursor for next page
        starting_after: '{response.records[-1].id}'
      # Stop when API indicates no more pages or no records returned
      stop_condition: response.json.has_more == false || length(response.records) < 1

    response:
      records:
        jmespath: "data[]"

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  transactions:
    object: public.transactions

Using Python

api_to_database.py
from sling import Replication, ReplicationStream, Mode
import os

# Set API credentials
os.environ['API_KEY'] = 'your_api_key'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'transactions': ReplicationStream(
            object='public.transactions',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()
Incremental Sync with Timestamps

This example shows how to fetch only new or updated records using timestamp-based incremental synchronization.

Learn more: Incremental SyncProcessors

Spec File (orders_api.yaml)

orders_api.yaml
name: "Orders API"

defaults:
  state:
    base_url: https://api.example.com
  request:
    headers:
      Accept: "application/json"
      Authorization: "Bearer {secrets.api_token}"

endpoints:
  orders:
    state:
      # Use last sync time if available, otherwise default to 30 days ago
      updated_at_min: '{coalesce(sync.last_updated_at, date_format(date_add(now(), -30, "day"), "%Y-%m-%dT%H:%M:%S%z"))}'
      current_max_updated_at: '{state.updated_at_min}'
      limit: 100
      offset: 0

    # Persist last_updated_at for next run
    sync: [last_updated_at]

    request:
      url: '{state.base_url}/orders'
      method: GET
      parameters:
        # Filter API to only return records updated after this timestamp
        updated_at_min: '{state.updated_at_min}'
        limit: '{state.limit}'
        offset: '{state.offset}'

    pagination:
      next_state:
        offset: '{state.offset + state.limit}'
      stop_condition: length(response.records) < state.limit

    response:
      records:
        jmespath: "orders[]"
      processors:
        # Track the maximum updated_at timestamp across all records
        - expression: "record.updated_at"
          output: "state.last_updated_at"
          aggregation: "maximum"  # Save highest timestamp for next sync

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: incremental
  primary_key: [id]
  update_key: updated_at

streams:
  orders:
    object: public.orders

env:
  SLING_STATE: postgres/sling_state.my_api # one state table per replication to persist

Using Python

api_to_database.py
from sling import Replication, ReplicationStream, Mode
import os

os.environ['API_TOKEN'] = 'your_api_token'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'orders': ReplicationStream(
            object='public.orders',
            mode=Mode.INCREMENTAL,
            primary_key=['id'],
            update_key='updated_at'
        )
    },
    env={'SLING_STATE': 'postgres/sling_state.my_api'}
)

replication.run()
Nested Data Extraction with JMESPath

This example demonstrates extracting nested data from complex API responses.

Learn more: Response ProcessingJMESPath Expressions

Spec File (products_api.yaml)

products_api.yaml
name: "Products API"

defaults:
  state:
    base_url: https://api.store.com/v2
  request:
    headers:
      Accept: "application/json"
      X-API-Key: '{secrets.api_key}'

endpoints:
  products:
    state:
      page: 1
      per_page: 50

    request:
      url: '{state.base_url}/products'
      method: GET
      parameters:
        page: '{state.page}'
        per_page: '{state.per_page}'

    pagination:
      next_state:
        page: '{state.page + 1}'
      stop_condition: length(response.records) < state.per_page

    response:
      records:
        # JMESPath projection to flatten and reshape nested data
        # Maps: product_id -> id, product_name -> name, pricing.amount -> price, etc.
        jmespath: >
          result.items[].{
            id: product_id,
            name: product_name,
            price: pricing.amount,
            currency: pricing.currency,
            category: metadata.category,
            tags: tags[].name
          }

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  products:
    object: public.products

Using Python

api_to_database.py
from sling import Replication, ReplicationStream, Mode
import os

os.environ['API_KEY'] = 'your_api_key'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'products': ReplicationStream(
            object='public.products',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()
POST Request with JSON Payload

This example shows how to send POST requests with a JSON body to an API.

Learn more: Request ConfigurationDynamic Values

Spec File (analytics_api.yaml)

analytics_api.yaml
name: "Analytics API"

defaults:
  state:
    base_url: https://analytics.example.com/api
  request:
    headers:
      Content-Type: "application/json"
      Authorization: "Bearer {secrets.access_token}"

endpoints:
  events:
    state:
      # Use env vars if available, otherwise default to last 7 days
      start_date: '{coalesce(env.START_DATE, date_format(date_add(now(), -7, "day"), "%Y-%m-%d"))}'
      end_date: '{coalesce(env.END_DATE, date_format(now(), "%Y-%m-%d"))}'

    request:
      url: '{state.base_url}/events/query'
      method: POST
      payload:
        # Request body sent as JSON
        date_range:
          start: '{state.start_date}'
          end: '{state.end_date}'
        metrics: ["views", "clicks", "conversions"]
        dimensions: ["event_name", "user_id"]

    response:
      records:
        jmespath: "data.events[]"

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  events:
    object: analytics.events

env:
  START_DATE: ${START_DATE}  # passed as env var
  END_DATE: ${END_DATE}      # passed as env var

Using Python

api_to_database.py
from sling import Replication, ReplicationStream, Mode
import os

os.environ['ACCESS_TOKEN'] = 'your_access_token'
os.environ['START_DATE'] = '2024-01-01'
os.environ['END_DATE'] = '2024-01-31'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'events': ReplicationStream(
            object='analytics.events',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()
Dynamic URL Path Parameters

This example demonstrates building URLs with dynamic path parameters.

Learn more: Request ConfigurationState Variables

Spec File (customer_details_api.yaml)


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

GraphQL API

This example shows how to query a GraphQL API.

Learn more: Request ConfigurationResponse Processing

Spec File (github_graphql_api.yaml)


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

Multi-Endpoint Queue Pattern

This example demonstrates passing data between endpoints using queues.

Learn more: QueuesIterationProcessors

Spec File (ecommerce_api.yaml)


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

Last updated

Was this helpful?