API to Database
Examples of using Sling to load data from APIs into databases with API specifications
Sling can extract data from REST and GraphQL APIs using YAML specification files. These specifications define how to authenticate, paginate, and extract data from API endpoints.
See API Spec for detailed information about building API specifications.
Basic REST API with Offset Pagination
This example shows a simple API with offset-based pagination.
Learn more: Pagination • Response Processing
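To make the pagination rules concrete, the loop below is a plain-Python sketch of what the next_state and stop_condition settings in the spec that follows express. It is not Sling's implementation: fake_get_users and its 250-record dataset are hypothetical stand-ins for the real endpoint.

```python
from typing import Any

# Hypothetical in-memory stand-in for GET /users?limit=...&offset=...
USERS = [{"id": i} for i in range(250)]


def fake_get_users(limit: int, offset: int) -> dict[str, Any]:
    """Return one page shaped like {"data": [...]}."""
    return {"data": USERS[offset:offset + limit]}


def fetch_all_offset(limit: int = 100) -> list[dict[str, Any]]:
    # Initial endpoint state, as in the spec's state block
    state = {"limit": limit, "offset": 0}
    records: list[dict[str, Any]] = []
    while True:
        response = fake_get_users(state["limit"], state["offset"])
        page = response["data"]  # same selection as jmespath "data[]"
        records.extend(page)
        # stop_condition: length(response.records) < state.limit
        if len(page) < state["limit"]:
            break
        # next_state: offset = state.offset + state.limit
        state["offset"] = state["offset"] + state["limit"]
    return records


print(len(fetch_all_offset()))  # prints 250 (three requests: 100 + 100 + 50)
```

When the total is an exact multiple of limit, this rule costs one extra request that returns an empty page, and that empty page is what ends the loop.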
Spec File (users_api.yaml)
name: "Users API"

defaults:
  state:
    base_url: https://api.example.com
  request:
    headers:
      Accept: "application/json"

endpoints:
  users:
    state:
      limit: 100
      offset: 0 # Start at the beginning
    request:
      url: '{state.base_url}/users'
      method: GET
      parameters:
        limit: '{state.limit}'
        offset: '{state.offset}' # Pass offset as query parameter
    pagination:
      next_state:
        # Increment offset by limit for next page
        offset: '{state.offset + state.limit}'
      # Stop when fewer records returned than requested
      stop_condition: length(response.records) < state.limit
    response:
      records:
        # Extract records array from response using JMESPath
        jmespath: "data[]"

Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  users:
    object: public.users

Using Python
from sling import Replication, ReplicationStream, Mode

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'users': ReplicationStream(
            object='public.users',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()

Cursor-Based Pagination API
This example demonstrates cursor-based pagination similar to Stripe's API pattern.
Learn more: Pagination • Authentication
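The cursor variant differs only in how next_state is computed: the id of the last record becomes the starting_after value of the next request. The sketch below mimics that loop in plain Python; fake_list_transactions and its seven records are hypothetical, and the has_more flag is modeled on the Stripe-style response the spec assumes.

```python
from typing import Any, Optional

# Hypothetical in-memory stand-in for GET /transactions?limit=...&starting_after=...
TRANSACTIONS = [{"id": f"txn_{i:03d}"} for i in range(7)]


def fake_list_transactions(limit: int, starting_after: Optional[str]) -> dict[str, Any]:
    start = 0
    if starting_after is not None:
        ids = [t["id"] for t in TRANSACTIONS]
        start = ids.index(starting_after) + 1  # resume after the cursor record
    page = TRANSACTIONS[start:start + limit]
    return {"data": page, "has_more": start + limit < len(TRANSACTIONS)}


def fetch_all_cursor(limit: int = 100) -> list[dict[str, Any]]:
    # First page has no cursor, as in the spec's state block
    state: dict[str, Any] = {"limit": limit, "starting_after": None}
    records: list[dict[str, Any]] = []
    while True:
        body = fake_list_transactions(state["limit"], state["starting_after"])
        page = body["data"]  # jmespath "data[]"
        records.extend(page)
        # stop_condition: has_more == false || length(response.records) < 1
        if body["has_more"] is False or len(page) < 1:
            break
        # next_state: starting_after = id of the last record on this page
        state["starting_after"] = page[-1]["id"]
    return records


print([t["id"] for t in fetch_all_cursor(limit=3)])  # txn_000 through txn_006
```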
Spec File (transactions_api.yaml)
name: "Transactions API"

defaults:
  state:
    base_url: https://api.example.com/v1
  request:
    headers:
      Accept: "application/json"
      # Bearer token from secrets defined in env.yaml
      Authorization: "Bearer {secrets.api_key}"

endpoints:
  transactions:
    state:
      limit: 100
      starting_after: null # First page has no cursor
    request:
      url: '{state.base_url}/transactions'
      method: GET
      parameters:
        limit: '{state.limit}'
        starting_after: '{state.starting_after}'
    pagination:
      next_state:
        # Use ID of last record as cursor for next page
        starting_after: '{response.records[-1].id}'
      # Stop when API indicates no more pages or no records returned
      stop_condition: response.json.has_more == false || length(response.records) < 1
    response:
      records:
        jmespath: "data[]"

Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  transactions:
    object: public.transactions

Using Python
from sling import Replication, ReplicationStream, Mode
import os

# Set API credentials
os.environ['API_KEY'] = 'your_api_key'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'transactions': ReplicationStream(
            object='public.transactions',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()

Incremental Sync with Timestamps
This example shows how to fetch only new or updated records using timestamp-based incremental synchronization.
Learn more: Incremental Sync • Processors
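How the state carries over between runs can be hard to picture from the spec alone. The sketch below simulates two runs in plain Python: the first falls back to 30 days ago, a processor-style max() records the newest updated_at, and the second run starts from that saved value. run_once, the in-memory orders list and the dict standing in for the SLING_STATE table are all hypothetical, and the inclusive (>=) filter is an assumption about how the API treats updated_at_min.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Optional

FMT = "%Y-%m-%dT%H:%M:%S%z"  # same format string as the spec's date_format()


def parse(ts: str) -> datetime:
    return datetime.strptime(ts, FMT)


def resolve_updated_at_min(last_updated_at: Optional[str], now: datetime) -> str:
    # coalesce(sync.last_updated_at, now minus 30 days)
    if last_updated_at is not None:
        return last_updated_at
    return (now - timedelta(days=30)).strftime(FMT)


def run_once(orders: list[dict[str, Any]], sync_state: dict[str, str],
             now: datetime) -> list[dict[str, Any]]:
    updated_at_min = parse(resolve_updated_at_min(sync_state.get("last_updated_at"), now))
    # Hypothetical server-side filter applied by the API
    fetched = [o for o in orders if parse(o["updated_at"]) >= updated_at_min]
    if fetched:
        # Processor with aggregation "maximum", persisted via sync: [last_updated_at]
        newest = max(fetched, key=lambda o: parse(o["updated_at"]))
        sync_state["last_updated_at"] = newest["updated_at"]
    return fetched


now = datetime(2024, 6, 1, tzinfo=timezone.utc)
orders = [
    {"id": 1, "updated_at": "2024-04-01T00:00:00+0000"},  # older than 30 days
    {"id": 2, "updated_at": "2024-05-20T08:00:00+0000"},
    {"id": 3, "updated_at": "2024-05-25T09:30:00+0000"},
]
saved: dict[str, str] = {}
first = run_once(orders, saved, now)   # ids 2 and 3
orders.append({"id": 4, "updated_at": "2024-05-28T00:00:00+0000"})
second = run_once(orders, saved, now)  # ids 3 and 4
```

Because the filter is inclusive, the newest record from the previous run is fetched again; the replication's primary_key: [id] should let the incremental load treat it as an update rather than a duplicate row.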
Spec File (orders_api.yaml)
name: "Orders API"

defaults:
  state:
    base_url: https://api.example.com
  request:
    headers:
      Accept: "application/json"
      Authorization: "Bearer {secrets.api_token}"

endpoints:
  orders:
    state:
      # Use last sync time if available, otherwise default to 30 days ago
      updated_at_min: '{coalesce(sync.last_updated_at, date_format(date_add(now(), -30, "day"), "%Y-%m-%dT%H:%M:%S%z"))}'
      current_max_updated_at: '{state.updated_at_min}'
      limit: 100
      offset: 0
    # Persist last_updated_at for next run
    sync: [last_updated_at]
    request:
      url: '{state.base_url}/orders'
      method: GET
      parameters:
        # Filter API to only return records updated after this timestamp
        updated_at_min: '{state.updated_at_min}'
        limit: '{state.limit}'
        offset: '{state.offset}'
    pagination:
      next_state:
        offset: '{state.offset + state.limit}'
      stop_condition: length(response.records) < state.limit
    response:
      records:
        jmespath: "orders[]"
      processors:
        # Track the maximum updated_at timestamp across all records
        - expression: "record.updated_at"
          output: "state.last_updated_at"
          aggregation: "maximum" # Save highest timestamp for next sync

Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: incremental
  primary_key: [id]
  update_key: updated_at

streams:
  orders:
    object: public.orders

env:
  SLING_STATE: postgres/sling_state.my_api # table where sync state persists between runs; use one per replication

Using Python
from sling import Replication, ReplicationStream, Mode
import os

os.environ['API_TOKEN'] = 'your_api_token'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'orders': ReplicationStream(
            object='public.orders',
            mode=Mode.INCREMENTAL,
            primary_key=['id'],
            update_key='updated_at'
        )
    },
    env={'SLING_STATE': 'postgres/sling_state.my_api'}
)

replication.run()

Nested Data Extraction with JMESPath
This example demonstrates extracting nested data from complex API responses.
Learn more: Response Processing • JMESPath Expressions
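For readers less familiar with JMESPath projections, the function below is a rough plain-Python approximation of the result.items[].{...} expression in the spec that follows: each item is reshaped into a flat record, and a missing nested object yields None, much as JMESPath yields null. It does not use the jmespath library, and the sample response is hypothetical.

```python
from typing import Any, Optional


def _get(obj: Any, key: str) -> Any:
    # Field access that returns None for missing keys or non-objects, like JMESPath
    return obj.get(key) if isinstance(obj, dict) else None


def _tag_names(tags: Any) -> Optional[list[Any]]:
    # tags[].name: None unless tags is a list; entries without a name are dropped
    if not isinstance(tags, list):
        return None
    return [t["name"] for t in tags if _get(t, "name") is not None]


def project_products(body: dict[str, Any]) -> list[dict[str, Any]]:
    items = _get(_get(body, "result"), "items")
    if not isinstance(items, list):
        return []
    return [
        {
            "id": _get(item, "product_id"),
            "name": _get(item, "product_name"),
            "price": _get(_get(item, "pricing"), "amount"),
            "currency": _get(_get(item, "pricing"), "currency"),
            "category": _get(_get(item, "metadata"), "category"),
            "tags": _tag_names(_get(item, "tags")),
        }
        for item in items
    ]


# Hypothetical response body from GET /products
sample = {"result": {"items": [
    {"product_id": "p1", "product_name": "Mug",
     "pricing": {"amount": 12.5, "currency": "USD"},
     "metadata": {"category": "kitchen"},
     "tags": [{"name": "ceramic"}, {"name": "gift"}]},
    {"product_id": "p2", "product_name": "Poster",
     "pricing": {"amount": 5, "currency": "EUR"}},
]}}
rows = project_products(sample)
```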
Spec File (products_api.yaml)
name: "Products API"

defaults:
  state:
    base_url: https://api.store.com/v2
  request:
    headers:
      Accept: "application/json"
      X-API-Key: '{secrets.api_key}'

endpoints:
  products:
    state:
      page: 1
      per_page: 50
    request:
      url: '{state.base_url}/products'
      method: GET
      parameters:
        page: '{state.page}'
        per_page: '{state.per_page}'
    pagination:
      next_state:
        page: '{state.page + 1}'
      stop_condition: length(response.records) < state.per_page
    response:
      records:
        # JMESPath projection to flatten and reshape nested data
        # Maps: product_id -> id, product_name -> name, pricing.amount -> price, etc.
        jmespath: >
          result.items[].{
            id: product_id,
            name: product_name,
            price: pricing.amount,
            currency: pricing.currency,
            category: metadata.category,
            tags: tags[].name
          }

Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  products:
    object: public.products

Using Python
from sling import Replication, ReplicationStream, Mode
import os

os.environ['API_KEY'] = 'your_api_key'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'products': ReplicationStream(
            object='public.products',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()

POST Request with JSON Payload
This example shows how to send POST requests with a JSON body to an API.
Learn more: Request Configuration • Dynamic Values
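The payload block in the spec that follows is sent as the JSON request body. As a rough illustration, the sketch below builds the same body with the same date fallbacks and wraps it in a POST request object using only the Python standard library; build_events_payload and build_request are hypothetical helpers, and the request is constructed but never sent.

```python
import json
import os
import urllib.request
from datetime import date, timedelta
from typing import Any, Mapping, Optional


def build_events_payload(today: date, env: Optional[Mapping[str, str]] = None) -> dict[str, Any]:
    env = os.environ if env is None else env
    # coalesce(env.START_DATE, today minus 7 days) and coalesce(env.END_DATE, today)
    start = env.get("START_DATE")
    if start is None:
        start = (today - timedelta(days=7)).strftime("%Y-%m-%d")
    end = env.get("END_DATE")
    if end is None:
        end = today.strftime("%Y-%m-%d")
    return {
        "date_range": {"start": start, "end": end},
        "metrics": ["views", "clicks", "conversions"],
        "dimensions": ["event_name", "user_id"],
    }


def build_request(base_url: str, token: str, payload: dict[str, Any]) -> urllib.request.Request:
    # Serialize the payload as JSON and attach the headers from the spec's defaults
    return urllib.request.Request(
        url=f"{base_url}/events/query",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "Authorization": f"Bearer {token}"},
        method="POST",
    )


payload = build_events_payload(date(2024, 1, 31), env={})
req = build_request("https://analytics.example.com/api", "your_access_token", payload)
print(req.get_method(), req.full_url)
```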
Spec File (analytics_api.yaml)
name: "Analytics API"

defaults:
  state:
    base_url: https://analytics.example.com/api
  request:
    headers:
      Content-Type: "application/json"
      Authorization: "Bearer {secrets.access_token}"

endpoints:
  events:
    state:
      # Use env vars if available, otherwise default to last 7 days
      start_date: '{coalesce(env.START_DATE, date_format(date_add(now(), -7, "day"), "%Y-%m-%d"))}'
      end_date: '{coalesce(env.END_DATE, date_format(now(), "%Y-%m-%d"))}'
    request:
      url: '{state.base_url}/events/query'
      method: POST
      payload:
        # Request body sent as JSON
        date_range:
          start: '{state.start_date}'
          end: '{state.end_date}'
        metrics: ["views", "clicks", "conversions"]
        dimensions: ["event_name", "user_id"]
    response:
      records:
        jmespath: "data.events[]"

Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  events:
    object: analytics.events

env:
  START_DATE: ${START_DATE} # passed as env var
  END_DATE: ${END_DATE} # passed as env var

Using Python
from sling import Replication, ReplicationStream, Mode
import os

os.environ['ACCESS_TOKEN'] = 'your_access_token'
os.environ['START_DATE'] = '2024-01-01'
os.environ['END_DATE'] = '2024-01-31'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'events': ReplicationStream(
            object='analytics.events',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()

Dynamic URL Path Parameters
This example demonstrates building URLs with dynamic path parameters.
Learn more: Request Configuration • State Variables
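Path parameters are plain string substitution into the URL template. The sketch below reproduces the three steps of the endpoint that follows in Python: a require()-style check on CUSTOMER_ID, URL construction, and the "[customer]" wrap plus the processor that copies customer_id onto the record. All function names are hypothetical, and here only a missing value (not an empty one) counts as an error.

```python
from typing import Any, Mapping, Optional


def require(value: Optional[str], name: str) -> str:
    # Rough analogue of the spec's require(): fail fast when the value is missing
    if value is None:
        raise ValueError(f"{name} is required but not set")
    return value


def profile_url(base_url: str, env: Mapping[str, str]) -> str:
    customer_id = require(env.get("CUSTOMER_ID"), "CUSTOMER_ID")
    # '{state.base_url}/customers/{state.customer_id}/profile'
    return f"{base_url}/customers/{customer_id}/profile"


def to_records(body: dict[str, Any], customer_id: str) -> list[dict[str, Any]]:
    # jmespath "[customer]": wrap the single customer object in a one-element list
    records = [dict(body["customer"])]
    # processor: state.customer_id -> record.customer_id
    for record in records:
        record["customer_id"] = customer_id
    return records


print(profile_url("https://api.crm.com/v1", {"CUSTOMER_ID": "cust_12345"}))
```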
Spec File (customer_details_api.yaml)
name: "Customer Details API"

defaults:
  state:
    base_url: https://api.crm.com/v1
  request:
    headers:
      Accept: "application/json"
      Authorization: "Bearer {secrets.api_key}"

endpoints:
  customer_profile:
    state:
      # require() will fail if env var is not set
      customer_id: '{require(env.CUSTOMER_ID)}'
    request:
      # Use state variable as part of URL path
      url: '{state.base_url}/customers/{state.customer_id}/profile'
      method: GET
    response:
      records:
        # Wrap single object in array
        jmespath: "[customer]"
      processors:
        # Add customer_id to the record for reference
        - expression: "state.customer_id"
          output: "record.customer_id"

Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  customer_profile:
    object: public.customer_profiles

env:
  CUSTOMER_ID: 'cust_12345'

Using Python
from sling import Replication, ReplicationStream, Mode
import os

os.environ['API_KEY'] = 'your_api_key'
os.environ['CUSTOMER_ID'] = 'cust_12345'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'customer_profile': ReplicationStream(
            object='public.customer_profiles',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()

GraphQL API
This example shows how to query a GraphQL API.
Learn more: Request Configuration • Response Processing
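A GraphQL call is an ordinary HTTP POST whose JSON body carries a query string and a variables object; pagination then follows pageInfo.endCursor until hasNextPage is false. The sketch below imitates that loop against a hypothetical in-memory fake_graphql function that returns GitHub-shaped pages of two repositories. Its numeric cursors are placeholders; real GitHub cursors are opaque strings.

```python
import json
from typing import Any, Optional

# Trimmed version of the query in the spec; only the variables matter here
QUERY = """query($org: String!, $cursor: String) {
  organization(login: $org) {
    repositories(first: 2, after: $cursor) {
      pageInfo { hasNextPage endCursor }
      nodes { name }
    }
  }
}"""

REPOS = [{"name": f"repo-{i}"} for i in range(5)]  # hypothetical data


def graphql_body(org: str, cursor: Optional[str]) -> str:
    # GraphQL over HTTP: a JSON object with "query" and "variables"
    return json.dumps({"query": QUERY, "variables": {"org": org, "cursor": cursor}})


def fake_graphql(body: str, page_size: int = 2) -> dict[str, Any]:
    variables = json.loads(body)["variables"]
    start = int(variables["cursor"]) if variables["cursor"] is not None else 0
    nodes = REPOS[start:start + page_size]
    end = start + len(nodes)
    return {"data": {"organization": {"repositories": {
        "pageInfo": {"hasNextPage": end < len(REPOS), "endCursor": str(end)},
        "nodes": nodes,
    }}}}


def fetch_repositories(org: str) -> list[dict[str, Any]]:
    cursor: Optional[str] = None  # first page has no cursor
    records: list[dict[str, Any]] = []
    while True:
        response = fake_graphql(graphql_body(org, cursor))
        repos = response["data"]["organization"]["repositories"]
        records.extend(repos["nodes"])  # data.organization.repositories.nodes[]
        # stop_condition: pageInfo.hasNextPage == false
        if repos["pageInfo"]["hasNextPage"] is False:
            break
        # next_state: cursor = pageInfo.endCursor
        cursor = repos["pageInfo"]["endCursor"]
    return records


print([r["name"] for r in fetch_repositories("your_organization")])
```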
Spec File (github_graphql_api.yaml)
name: "GitHub GraphQL API"

defaults:
  state:
    base_url: https://api.github.com
  request:
    headers:
      Content-Type: "application/json"
      Authorization: "Bearer {secrets.github_token}"

endpoints:
  repositories:
    state:
      cursor: null # First page has no cursor
      org_name: '{require(env.GITHUB_ORG)}'
    request:
      url: '{state.base_url}/graphql'
      method: POST
      payload:
        # GraphQL query as a string
        query: |
          query($org: String!, $cursor: String) {
            organization(login: $org) {
              repositories(first: 50, after: $cursor) {
                pageInfo {
                  hasNextPage
                  endCursor
                }
                nodes {
                  name
                  description
                  stargazerCount
                  forkCount
                  createdAt
                  updatedAt
                }
              }
            }
          }
        # GraphQL variables from state
        variables:
          org: '{state.org_name}'
          cursor: '{state.cursor}'
    pagination:
      next_state:
        # Extract next cursor from GraphQL response
        cursor: '{jmespath("response.json", "data.organization.repositories.pageInfo.endCursor")}'
      # GraphQL provides hasNextPage in pageInfo
      stop_condition: 'jmespath("response.json", "data.organization.repositories.pageInfo.hasNextPage") == false'
    response:
      records:
        # Extract nodes array from GraphQL response
        jmespath: "data.organization.repositories.nodes[]"

Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  repositories:
    object: public.github_repositories

env:
  GITHUB_ORG: 'your_organization'

Using Python
from sling import Replication, ReplicationStream, Mode
import os

os.environ['GITHUB_TOKEN'] = 'your_github_token'
os.environ['GITHUB_ORG'] = 'your_organization'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'repositories': ReplicationStream(
            object='public.github_repositories',
            mode=Mode.FULL_REFRESH
        )
    }
)

replication.run()

Multi-Endpoint Queue Pattern
This example demonstrates passing data between endpoints using queues.
Learn more: Queues • Iteration • Processors
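Conceptually, the queue turns the two endpoints in the spec that follows into a producer and a consumer: list_orders pushes every order id, and order_details makes one request per id, up to concurrency requests at a time. The sketch below models that flow in plain Python with a list as the queue and a ThreadPoolExecutor for the concurrency setting; the fake endpoint functions and data are hypothetical, and Sling's actual scheduling may differ.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any

# Hypothetical in-memory stand-ins for the two endpoints
ORDERS = [{"id": i, "total": i * 10} for i in range(1, 8)]


def fake_list_orders(page: int, limit: int) -> list[dict[str, Any]]:
    start = (page - 1) * limit
    return ORDERS[start:start + limit]


def fake_order_details(order_id: int) -> dict[str, Any]:
    return {"order_detail": {"status": "shipped", "items": order_id % 3 + 1}}


def list_orders(order_ids: list[int], limit: int = 100) -> list[dict[str, Any]]:
    page, records = 1, []
    while True:
        batch = fake_list_orders(page, limit)
        records.extend(batch)
        # processor: record.id -> queue.order_ids
        order_ids.extend(r["id"] for r in batch)
        if len(batch) < limit:  # stop_condition
            break
        page += 1  # next_state: page + 1
    return records


def order_details(order_ids: list[int], concurrency: int = 10) -> list[dict[str, Any]]:
    def fetch_one(order_id: int) -> dict[str, Any]:
        # jmespath "[order_detail]" yields one record per request
        record = dict(fake_order_details(order_id)["order_detail"])
        record["order_id"] = order_id  # processor: state.order_id -> record.order_id
        return record

    # iterate over the queue, running up to `concurrency` requests in parallel
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        return list(pool.map(fetch_one, order_ids))


queue_order_ids: list[int] = []
orders = list_orders(queue_order_ids, limit=3)
details = order_details(queue_order_ids, concurrency=10)
```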
Spec File (ecommerce_api.yaml)
name: "E-commerce API"

# Declare queues to pass data between endpoints
queues:
  - order_ids

defaults:
  state:
    base_url: https://api.shop.com/v1
  request:
    headers:
      Accept: "application/json"
      Authorization: "Bearer {secrets.api_key}"

endpoints:
  # Step 1: List all orders and collect their IDs
  list_orders:
    state:
      page: 1
      limit: 100
    request:
      url: '{state.base_url}/orders'
      method: GET
      parameters:
        page: '{state.page}'
        limit: '{state.limit}'
    pagination:
      next_state:
        page: '{state.page + 1}'
      stop_condition: length(response.records) < state.limit
    response:
      records:
        jmespath: "orders[]"
      processors:
        # Send each order ID to the queue for the next endpoint
        - expression: "record.id"
          output: "queue.order_ids"

  # Step 2: Fetch details for each order from the queue
  order_details:
    description: "Fetch detailed information for each order"
    iterate:
      # Process each order ID from the queue
      over: "queue.order_ids"
      into: "state.order_id"
      concurrency: 10 # Process 10 orders concurrently
    request:
      # Use the current order ID from iteration in URL
      url: '{state.base_url}/orders/{state.order_id}/details'
      method: GET
    response:
      records:
        jmespath: "[order_detail]"
      processors:
        # Add the order_id to the detail record for reference
        - expression: "state.order_id"
          output: "record.order_id"

Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_API
target: MY_TARGET_DB

defaults:
  mode: full-refresh

streams:
  list_orders:
    object: public.orders
  order_details:
    object: public.order_details

env:
  SLING_THREADS: 2

Using Python
from sling import Replication, ReplicationStream, Mode
import os

os.environ['API_KEY'] = 'your_api_key'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'list_orders': ReplicationStream(
            object='public.orders',
            mode=Mode.FULL_REFRESH
        ),
        'order_details': ReplicationStream(
            object='public.order_details',
            mode=Mode.FULL_REFRESH
        )
    },
    env={'SLING_THREADS': '2'}
)

replication.run()