API to Database
Examples of using Sling to load data from APIs to databases using API specifications
Sling can extract data from REST and GraphQL APIs using YAML specification files. These specifications define how to authenticate, paginate, and extract data from API endpoints.
See API Spec for detailed information about building API specifications.
Basic REST API with Offset Pagination
This example shows a simple API with offset-based pagination.
Learn more: Pagination • Response Processing
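As a rough illustration of what offset pagination amounts to, the loop below mirrors the idea in plain Python. It is only a sketch and not Sling's implementation: `fetch_all_offset`, `fake_fetch_page`, and the in-memory `FAKE_USERS` list are hypothetical stand-ins for a real HTTP call.

```python
from typing import Callable, List


def fetch_all_offset(fetch_page: Callable[[int, int], List[dict]], limit: int = 100) -> List[dict]:
    """Collect every record from an offset-paginated source."""
    offset = 0  # start at the beginning
    records: List[dict] = []
    while True:
        page = fetch_page(offset, limit)
        records.extend(page)
        # Stop when fewer records come back than were requested
        if len(page) < limit:
            break
        # Advance the offset by the page size for the next request
        offset += limit
    return records


# Hypothetical in-memory stand-in for GET /users?limit=...&offset=...
FAKE_USERS = [{"id": i} for i in range(250)]


def fake_fetch_page(offset: int, limit: int) -> List[dict]:
    return FAKE_USERS[offset:offset + limit]


all_users = fetch_all_offset(fake_fetch_page, limit=100)
print(len(all_users))
```

When the total is an exact multiple of the page size, this loop makes one extra request that returns an empty page before stopping.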
Spec File (users_api.yaml)
name: "Users API"
defaults:
  state:
    base_url: https://api.example.com
  request:
    headers:
      Accept: "application/json"
endpoints:
  users:
    state:
      limit: 100
      offset: 0 # Start at the beginning
    request:
      url: '{state.base_url}/users'
      method: GET
      parameters:
        limit: '{state.limit}'
        offset: '{state.offset}' # Pass offset as query parameter
    pagination:
      next_state:
        # Increment offset by limit for next page
        offset: '{state.offset + state.limit}'
      # Stop when fewer records returned than requested
      stop_condition: length(response.records) < state.limit
    response:
      records:
        # Extract records array from response using JMESPath
        jmespath: "data[]"

Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_API
target: MY_TARGET_DB
defaults:
  mode: full-refresh
streams:
  users:
    object: public.users

Using Python
from sling import Replication, ReplicationStream, Mode

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'users': ReplicationStream(
            object='public.users',
            mode=Mode.FULL_REFRESH
        )
    }
)
replication.run()

Cursor-Based Pagination API
This example demonstrates cursor-based pagination similar to Stripe's API pattern.
Learn more: Pagination • Authentication
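The cursor pattern can be sketched in plain Python to show how the last record's ID drives the next request. This is an illustrative sketch under assumptions, not Sling's internals: `fetch_all_cursor`, `fake_fetch_page`, and `FAKE_TXNS` are hypothetical, and the response shape (`data`, `has_more`) is assumed to follow the Stripe-style convention.

```python
from typing import Callable, List, Optional


def fetch_all_cursor(fetch_page: Callable[[Optional[str], int], dict], limit: int = 100) -> List[dict]:
    """Collect every record from a cursor-paginated source."""
    starting_after: Optional[str] = None  # first page has no cursor
    records: List[dict] = []
    while True:
        body = fetch_page(starting_after, limit)
        page = body["data"]
        records.extend(page)
        # Stop when the API reports no more pages or returns no records
        if body.get("has_more") is False or len(page) < 1:
            break
        # Use the ID of the last record as the cursor for the next page
        starting_after = page[-1]["id"]
    return records


# Hypothetical in-memory stand-in for GET /transactions
FAKE_TXNS = [{"id": f"txn_{i:03d}"} for i in range(7)]


def fake_fetch_page(starting_after: Optional[str], limit: int) -> dict:
    start = 0
    if starting_after is not None:
        ids = [t["id"] for t in FAKE_TXNS]
        start = ids.index(starting_after) + 1
    page = FAKE_TXNS[start:start + limit]
    return {"data": page, "has_more": start + limit < len(FAKE_TXNS)}


all_txns = fetch_all_cursor(fake_fetch_page, limit=3)
print([t["id"] for t in all_txns])
```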
Spec File (transactions_api.yaml)
name: "Transactions API"
defaults:
  state:
    base_url: https://api.example.com/v1
  request:
    headers:
      Accept: "application/json"
      # Bearer token from secrets defined in env.yaml
      Authorization: "Bearer {secrets.api_key}"
endpoints:
  transactions:
    state:
      limit: 100
      starting_after: null # First page has no cursor
    request:
      url: '{state.base_url}/transactions'
      method: GET
      parameters:
        limit: '{state.limit}'
        starting_after: '{state.starting_after}'
    pagination:
      next_state:
        # Use ID of last record as cursor for next page
        starting_after: '{response.records[-1].id}'
      # Stop when API indicates no more pages or no records returned
      stop_condition: response.json.has_more == false || length(response.records) < 1
    response:
      records:
        jmespath: "data[]"

Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_API
target: MY_TARGET_DB
defaults:
  mode: full-refresh
streams:
  transactions:
    object: public.transactions

Using Python
import os

from sling import Replication, ReplicationStream, Mode

# Set API credentials
os.environ['API_KEY'] = 'your_api_key'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'transactions': ReplicationStream(
            object='public.transactions',
            mode=Mode.FULL_REFRESH
        )
    }
)
replication.run()

Incremental Sync with Timestamps
This example shows how to fetch only new or updated records using timestamp-based incremental synchronization.
Learn more: Incremental Sync • Processors
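To make the idea concrete, the sketch below imitates timestamp-based incremental sync in plain Python: pick a starting timestamp (saved value or a 30-day default), keep only newer records, and save the maximum `updated_at` for the next run. It is an assumption-laden illustration, not Sling's implementation; `run_incremental_sync`, the `state` dict, and the sample `orders` are hypothetical, and the strict "newer than" comparison is an assumption about how the API filters.

```python
from datetime import datetime, timedelta, timezone


def run_incremental_sync(source_records, saved_state, now):
    """Return records newer than the saved timestamp and update the state."""
    # Use the last synced timestamp if one was saved, else default to 30 days ago
    default_start = (now - timedelta(days=30)).isoformat()
    updated_at_min = saved_state.get("last_updated_at") or default_start
    cutoff = datetime.fromisoformat(updated_at_min)

    # Stand-in for the API-side filter on updated_at_min (assumed strict)
    fresh = [
        r for r in source_records
        if datetime.fromisoformat(r["updated_at"]) > cutoff
    ]

    # Track the maximum updated_at seen and persist it for the next run
    if fresh:
        newest = max(fresh, key=lambda r: datetime.fromisoformat(r["updated_at"]))
        saved_state["last_updated_at"] = newest["updated_at"]
    return fresh


# Hypothetical sample data
orders = [
    {"id": 1, "updated_at": "2023-12-01T00:00:00+00:00"},
    {"id": 2, "updated_at": "2024-01-10T00:00:00+00:00"},
    {"id": 3, "updated_at": "2024-01-20T00:00:00+00:00"},
]
state = {}
now = datetime(2024, 1, 31, tzinfo=timezone.utc)

first_run = run_incremental_sync(orders, state, now)
orders.append({"id": 4, "updated_at": "2024-01-25T00:00:00+00:00"})
second_run = run_incremental_sync(orders, state, now)
print([r["id"] for r in first_run], [r["id"] for r in second_run])
```

The second run only picks up the record added after the first run, because the saved timestamp replaced the 30-day default.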
Spec File (orders_api.yaml)
name: "Orders API"
defaults:
  state:
    base_url: https://api.example.com
  request:
    headers:
      Accept: "application/json"
      Authorization: "Bearer {secrets.api_token}"
endpoints:
  orders:
    state:
      # Use last sync time if available, otherwise default to 30 days ago
      updated_at_min: '{coalesce(sync.last_updated_at, date_format(date_add(now(), -30, "day"), "%Y-%m-%dT%H:%M:%S%z"))}'
      current_max_updated_at: '{state.updated_at_min}'
      limit: 100
      offset: 0
    # Persist last_updated_at for next run
    sync: [last_updated_at]
    request:
      url: '{state.base_url}/orders'
      method: GET
      parameters:
        # Filter API to only return records updated after this timestamp
        updated_at_min: '{state.updated_at_min}'
        limit: '{state.limit}'
        offset: '{state.offset}'
    pagination:
      next_state:
        offset: '{state.offset + state.limit}'
      stop_condition: length(response.records) < state.limit
    response:
      records:
        jmespath: "orders[]"
      processors:
        # Track the maximum updated_at timestamp across all records
        - expression: "record.updated_at"
          output: "state.last_updated_at"
          aggregation: "maximum" # Save highest timestamp for next sync

Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_API
target: MY_TARGET_DB
defaults:
  mode: incremental
  primary_key: [id]
  update_key: updated_at
streams:
  orders:
    object: public.orders
env:
  SLING_STATE: postgres/sling_state.my_api # use one state table per replication to persist sync state

Using Python
import os

from sling import Replication, ReplicationStream, Mode

os.environ['API_TOKEN'] = 'your_api_token'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'orders': ReplicationStream(
            object='public.orders',
            mode=Mode.INCREMENTAL,
            primary_key=['id'],
            update_key='updated_at'
        )
    },
    env={'SLING_STATE': 'postgres/sling_state.my_api'}
)
replication.run()

Nested Data Extraction with JMESPath
This example demonstrates extracting nested data from complex API responses.
Learn more: Response Processing • JMESPath Expressions
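For readers less familiar with JMESPath projections, the plain-Python sketch below does roughly what a flattening projection does: walk a nested list and reshape each item into a flat record. The `flatten_products` helper and the `sample` response are hypothetical, and the handling of missing fields here is a simplification rather than exact JMESPath semantics.

```python
from typing import List


def flatten_products(response: dict) -> List[dict]:
    """Reshape nested product items into flat records."""
    rows = []
    for item in response.get("result", {}).get("items", []):
        pricing = item.get("pricing") or {}
        metadata = item.get("metadata") or {}
        rows.append({
            "id": item.get("product_id"),
            "name": item.get("product_name"),
            "price": pricing.get("amount"),
            "currency": pricing.get("currency"),
            "category": metadata.get("category"),
            # Keep only the tag names; a missing tags list becomes [] in this sketch
            "tags": [t["name"] for t in item.get("tags") or [] if "name" in t],
        })
    return rows


# Hypothetical API response
sample = {
    "result": {
        "items": [
            {
                "product_id": 1,
                "product_name": "Mug",
                "pricing": {"amount": 9.5, "currency": "USD"},
                "metadata": {"category": "kitchen"},
                "tags": [{"name": "ceramic"}, {"name": "gift"}],
            }
        ]
    }
}

rows = flatten_products(sample)
print(rows)
```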
Spec File (products_api.yaml)
name: "Products API"
defaults:
  state:
    base_url: https://api.store.com/v2
  request:
    headers:
      Accept: "application/json"
      X-API-Key: '{secrets.api_key}'
endpoints:
  products:
    state:
      page: 1
      per_page: 50
    request:
      url: '{state.base_url}/products'
      method: GET
      parameters:
        page: '{state.page}'
        per_page: '{state.per_page}'
    pagination:
      next_state:
        page: '{state.page + 1}'
      stop_condition: length(response.records) < state.per_page
    response:
      records:
        # JMESPath projection to flatten and reshape nested data
        # Maps: product_id -> id, product_name -> name, pricing.amount -> price, etc.
        jmespath: >
          result.items[].{
            id: product_id,
            name: product_name,
            price: pricing.amount,
            currency: pricing.currency,
            category: metadata.category,
            tags: tags[].name
          }

Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_API
target: MY_TARGET_DB
defaults:
  mode: full-refresh
streams:
  products:
    object: public.products

Using Python
import os

from sling import Replication, ReplicationStream, Mode

os.environ['API_KEY'] = 'your_api_key'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'products': ReplicationStream(
            object='public.products',
            mode=Mode.FULL_REFRESH
        )
    }
)
replication.run()

POST Request with JSON Payload
This example shows how to send POST requests with a JSON body to an API.
Learn more: Request Configuration • Dynamic Values
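The sketch below shows, in plain Python, how a JSON body with a defaulted date range might be assembled: use `START_DATE` and `END_DATE` when they are set, otherwise fall back to the last 7 days. It only builds the body and sends nothing; `build_events_payload` and its `env` and `today` parameters are hypothetical names introduced for illustration.

```python
import json
import os
from datetime import date, timedelta
from typing import Mapping, Optional


def build_events_payload(env: Optional[Mapping[str, str]] = None,
                         today: Optional[date] = None) -> dict:
    """Build the JSON request body, defaulting to the last 7 days."""
    env = os.environ if env is None else env
    today = today or date.today()
    # Use env vars if available, otherwise default to the last 7 days
    start = env.get("START_DATE") or (today - timedelta(days=7)).strftime("%Y-%m-%d")
    end = env.get("END_DATE") or today.strftime("%Y-%m-%d")
    return {
        "date_range": {"start": start, "end": end},
        "metrics": ["views", "clicks", "conversions"],
        "dimensions": ["event_name", "user_id"],
    }


# Fixed inputs so the result is reproducible
payload = build_events_payload(env={}, today=date(2024, 1, 31))
body = json.dumps(payload)  # string that would be sent as the POST body
print(body)
```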
Spec File (analytics_api.yaml)
name: "Analytics API"
defaults:
  state:
    base_url: https://analytics.example.com/api
  request:
    headers:
      Content-Type: "application/json"
      Authorization: "Bearer {secrets.access_token}"
endpoints:
  events:
    state:
      # Use env vars if available, otherwise default to last 7 days
      start_date: '{coalesce(env.START_DATE, date_format(date_add(now(), -7, "day"), "%Y-%m-%d"))}'
      end_date: '{coalesce(env.END_DATE, date_format(now(), "%Y-%m-%d"))}'
    request:
      url: '{state.base_url}/events/query'
      method: POST
      payload:
        # Request body sent as JSON
        date_range:
          start: '{state.start_date}'
          end: '{state.end_date}'
        metrics: ["views", "clicks", "conversions"]
        dimensions: ["event_name", "user_id"]
    response:
      records:
        jmespath: "data.events[]"

Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
source: MY_API
target: MY_TARGET_DB
defaults:
  mode: full-refresh
streams:
  events:
    object: analytics.events
env:
  START_DATE: ${START_DATE} # passed as env var
  END_DATE: ${END_DATE} # passed as env var

Using Python
import os

from sling import Replication, ReplicationStream, Mode

os.environ['ACCESS_TOKEN'] = 'your_access_token'
os.environ['START_DATE'] = '2024-01-01'
os.environ['END_DATE'] = '2024-01-31'

replication = Replication(
    source='MY_API',
    target='MY_TARGET_DB',
    streams={
        'events': ReplicationStream(
            object='analytics.events',
            mode=Mode.FULL_REFRESH
        )
    }
)
replication.run()

Dynamic URL Path Parameters
This example demonstrates building URLs with dynamic path parameters.
Learn more: Request Configuration • State Variables
Spec File (customer_details_api.yaml)
Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
Using Python
GraphQL API
This example shows how to query a GraphQL API.
Learn more: Request Configuration • Response Processing
Spec File (github_graphql_api.yaml)
Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
Using Python
Multi-Endpoint Queue Pattern
This example demonstrates passing data between endpoints using queues.
Learn more: Queues • Iteration • Processors
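As a generic illustration of the queue idea, the plain-Python sketch below has one step collect IDs into a queue and a second step consume them one at a time. It is not taken from the spec file and does not reflect Sling's internals: the function names (`list_orders`, `get_order_items`), the IDs, and the record fields are all hypothetical.

```python
from collections import deque
from typing import List


def list_orders() -> List[dict]:
    """Hypothetical first endpoint: returns parent records."""
    return [{"id": 101}, {"id": 102}, {"id": 103}]


def get_order_items(order_id: int) -> List[dict]:
    """Hypothetical second endpoint: returns child records for one parent ID."""
    return [{"order_id": order_id, "sku": f"sku-{order_id}-{n}"} for n in range(2)]


# Step 1: the first endpoint pushes each record's ID onto the queue
order_ids = deque()
for order in list_orders():
    order_ids.append(order["id"])

# Step 2: the second endpoint iterates over the queued IDs, one request per ID
items: List[dict] = []
while order_ids:
    items.extend(get_order_items(order_ids.popleft()))

print(len(items))
```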
Spec File (ecommerce_api.yaml)
Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
Using Python