File to Database
Examples of using Sling to load data from storage systems to databases
We first need to make sure our connections are available in our environment. See Environment, Storage Connections and Database Connections for more details.
export MY_TARGET_DB='...'
export SLING_SAMPLE_SIZE=2000 # increase the sample size to infer types. Default is 900.
export SLING_THREADS=3 # run streams concurrently
$ sling conns list
+---------------+------------------+-----------------+
| CONN NAME | CONN TYPE | SOURCE |
+---------------+------------------+-----------------+
| MY_S3_BUCKET | FileSys - S3 | sling env yaml |
| MY_TARGET_DB | DB - PostgreSQL | env variable |
| MY_GS_BUCKET | FileSys - Google | sling env yaml |
| MY_AZURE_CONT | FileSys - Azure | sling env yaml |
+---------------+------------------+-----------------+# using windows Powershell
$env:MY_TARGET_DB = '...'
$env:SLING_SAMPLE_SIZE = 2000 # increase the sample size to infer types. Default is 900.
$env:SLING_THREADS = 3 # run streams concurrently
$ sling conns list
+---------------+------------------+-----------------+
| CONN NAME | CONN TYPE | SOURCE |
+---------------+------------------+-----------------+
| MY_S3_BUCKET | FileSys - S3 | sling env yaml |
| MY_TARGET_DB | DB - PostgreSQL | env variable |
| MY_GS_BUCKET | FileSys - Google | sling env yaml |
| MY_AZURE_CONT | FileSys - Azure | sling env yaml |
+---------------+------------------+-----------------+Local Storage (CSV) ⇨ Database
Using CLI Flags
$ cat /tmp/my_file.csv | sling run --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
$ sling run --src-stream 'file:///tmp/my_file.csv' \
--columns '{ "*": string }' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
$ sling run --src-stream 'file:///tmp/my_csv_folder/' \
--columns '{col2: string, col3: string}' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
$ sling run --src-stream 'file:///tmp/my_csv_folder/' \
--transforms '[remove_accents]' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \
--mode full-refresh
$ sling run --src-stream 'file://C:/Temp/my_csv_folder/' \
--transforms '[remove_accents]' \
--tgt-conn MY_TARGET_DB \
--tgt-object 'target_schema.target_table' \Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
source: LOCAL
target: MY_TARGET_DB
defaults:
mode: full-refresh
object: 'target_schema.{stream_file_folder}_{stream_file_name}'
streams:
# a stream with many parts, all sub files will be merged into one table
"file:///tmp/my_csv_folder/":
columns:
col2: string # cast `col2` as string
transforms: [remove_accents] # Apply transforms. Here we are removing diacritics (accents) from string values.
source_options:
format: csv
# expand all files into individual streams, each file will load into its own table
"file:///tmp/my_csv_folder/*.csv":
object: 'target_schema.{stream_file_name}'
# consider as a single stream (don't expand into individual streams)
"file:///tmp/my_csv_folder/prefix_*.csv":
object: 'target_schema.my_new_table'
single: true
"file:///tmp/my_file.csv":
columns:
"*": string # cast all columns to string
# Windows path format
"file://C:/Temp/my_file.csv":
columns:
"*": string # cast all columns to string
env:
SLING_SAMPLE_SIZE: 2000 # increase the sample size to infer types (default=900).
SLING_STREAM_URL_COLUMN: true # adds a _sling_stream_url column with file path
SLING_THREADS: 3 # run streams concurrentlyUsing Python
from sling import Replication, ReplicationStream, SourceOptions, Mode, Format
import os
# Set environment variables
os.environ['MY_TARGET_DB'] = '...'
os.environ['SLING_SAMPLE_SIZE'] = '2000' # increase the sample size to infer types
os.environ['SLING_THREADS'] = '3' # run streams concurrently
# Single CSV file import
replication = Replication(
source='LOCAL',
target='MY_TARGET_DB',
defaults={'mode': Mode.FULL_REFRESH},
streams={
'file:///tmp/my_file.csv': ReplicationStream(
object='target_schema.target_table',
columns={'*': 'string'} # cast all columns to string
)
}
)
replication.run()
# Multiple CSV files with various options
replication = Replication(
source='LOCAL',
target='MY_TARGET_DB',
defaults={
'mode': Mode.FULL_REFRESH,
'object': 'target_schema.{stream_file_folder}_{stream_file_name}'
},
streams={
# A stream with many parts, all sub files will be merged into one table
'file:///tmp/my_csv_folder/': ReplicationStream(
columns={'col2': 'string'}, # cast col2 as string
transforms=['remove_accents'], # Apply transforms
source_options=SourceOptions(format=Format.CSV)
),
# Expand all files into individual streams
'file:///tmp/my_csv_folder/*.csv': ReplicationStream(
object='target_schema.{stream_file_name}'
),
# Consider as a single stream (don't expand)
'file:///tmp/my_csv_folder/prefix_*.csv': ReplicationStream(
object='target_schema.my_new_table',
single=True
),
# Windows path format
'file://C:/Temp/my_file.csv': ReplicationStream(
columns={'*': 'string'}
)
},
env={
'SLING_SAMPLE_SIZE': '2000',
'SLING_STREAM_URL_COLUMN': 'true', # adds a _sling_stream_url column
'SLING_THREADS': '3'
}
)
replication.run()Local Storage (Excel) ⇨ Database
Using CLI Flags
$ sling run --src-stream 'file:///path/to/test.excel.xlsx' --src-options '{ sheet: "Sheet2!A:F" }' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refreshUsing Replication
Running with Sling: sling run -r /path/to/replication.yaml
source: LOCAL
target: MY_TARGET_DB
defaults:
mode: full-refresh
object: 'target_schema.{stream_file_folder}_{stream_file_name}'
streams:
# expand all files into a stream, each file will load into its own table
"file:///tmp/my_excel_folder/*.xlsx":
object: 'target_schema.{stream_file_name}'
source_options:
sheet: "Sheet1!A:F"
# consider as a single stream (don't expand into individual streams)
"file:///tmp/my_excel_folder/prefix_*.xlsx":
object: 'target_schema.my_new_table'
single: true
source_options:
sheet: "Sheet1!A:F"
"file:///path/to/test.excel.xlsx":
columns:
"*": string # cast all columns to string
source_options:
sheet: "Sheet2!A:F"
# Windows path format
"file://C:/Temp/my_file.xlsx":
columns:
"col2": integer # cast col2 to integer
source_options:
sheet: "Sheet2!A:F"
env:
SLING_SAMPLE_SIZE: 2000 # increase the sample size to infer types (default=900).
SLING_STREAM_URL_COLUMN: true # adds a _sling_stream_url column with file path
SLING_ROW_NUM_COLUMN: true # adds a _sling_row_num column with the row numberUsing Python
from sling import Replication, ReplicationStream, SourceOptions, Mode, Format
import os
# Set environment variables
os.environ['MY_TARGET_DB'] = '...'
# Excel file import with sheet specification
replication = Replication(
source='LOCAL',
target='MY_TARGET_DB',
defaults={
'mode': Mode.FULL_REFRESH,
'object': 'target_schema.{stream_file_folder}_{stream_file_name}'
},
streams={
# Expand all files into streams
'file:///tmp/my_excel_folder/*.xlsx': ReplicationStream(
object='target_schema.{stream_file_name}',
source_options=SourceOptions(sheet='Sheet1!A:F')
),
# Consider as a single stream (don't expand)
'file:///tmp/my_excel_folder/prefix_*.xlsx': ReplicationStream(
object='target_schema.my_new_table',
single=True,
source_options=SourceOptions(sheet='Sheet1!A:F')
),
# Single Excel file
'file:///path/to/test.excel.xlsx': ReplicationStream(
columns={'*': 'string'}, # cast all columns to string
source_options=SourceOptions(sheet='Sheet2!A:F')
),
# Windows path format
'file://C:/Temp/my_file.xlsx': ReplicationStream(
columns={'col2': 'integer'}, # cast col2 to integer
source_options=SourceOptions(sheet='Sheet2!A:F')
)
},
env={
'SLING_SAMPLE_SIZE': '2000',
'SLING_STREAM_URL_COLUMN': 'true', # adds a _sling_stream_url column
'SLING_ROW_NUM_COLUMN': 'true' # adds a _sling_row_num column
}
)
replication.run()Local Storage (JSON) ⇨ Database
Using CLI Flags
Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
Using Python
Local Storage (JSON Flattened) ⇨ Database
Using CLI Flags
Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
Using Python
Local Storage (Parquet) ⇨ Database
Using CLI Flags
Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
Using Python
Local Storage (SAS7BDAT) ⇨ Database
Using CLI Flags
Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
Using Python
SFTP Storage (CSV) ⇨ Database
Using CLI Flags
Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
Using Python
Cloud Storage (CSV) ⇨ Database
Using CLI Flags
Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
Using Python
Cloud Storage (JSON) ⇨ Database
Using CLI Flags
Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
Using Python
Cloud Storage (JSON Flattened) ⇨ Database
Using CLI Flags
Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
Using Python
Cloud Storage (Parquet) ⇨ Database
Using CLI Flags
Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
Using Python
Cloud Storage (Avro) ⇨ Database
Using CLI Flags
Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
Using Python
Cloud Storage (XML) ⇨ Database
Using CLI Flags
Using Replication
Running with Sling: sling run -r /path/to/replication.yaml
Using Python
Last updated
Was this helpful?