File to Database

Examples of using Sling to load data from storage systems to databases

We first need to make sure our connections are available in our environment. See Environment, Storage Connections and Database Connections for more details.

export MY_TARGET_DB='...'
export SLING_SAMPLE_SIZE=2000 # increase the sample size to infer types. Default is 900.
export SLING_THREADS=3 # run streams concurrently

$ sling conns list
+---------------+------------------+-----------------+
| CONN NAME     | CONN TYPE        | SOURCE          |
+---------------+------------------+-----------------+
| MY_S3_BUCKET  | FileSys - S3     | sling env yaml  |
| MY_TARGET_DB  | DB - PostgreSQL  | env variable    |
| MY_GS_BUCKET  | FileSys - Google | sling env yaml  |
| MY_AZURE_CONT | FileSys - Azure  | sling env yaml  |
+---------------+------------------+-----------------+
Local Storage (CSV) ⇨ Database

Using CLI Flags

sling.sh
$ cat /tmp/my_file.csv | sling run --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh

$ sling run --src-stream 'file:///tmp/my_file.csv' \
  --columns '{ "*": string }' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh

$ sling run --src-stream 'file:///tmp/my_csv_folder/' \
  --columns '{col2: string, col3: string}' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh

$ sling run --src-stream 'file:///tmp/my_csv_folder/' \
  --transforms '[remove_accents]' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh

$ sling run --src-stream 'file://C:/Temp/my_csv_folder/' \
  --transforms '[remove_accents]' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: LOCAL
target: MY_TARGET_DB

defaults:
  mode: full-refresh
  object: 'target_schema.{stream_file_folder}_{stream_file_name}'

streams:
  # a stream with many parts, all sub files will be merged into one table
  "file:///tmp/my_csv_folder/":
    columns:
      col2: string # cast `col2` as string
    transforms: [remove_accents] # Apply transforms. Here we are removing diacritics (accents) from string values.
    source_options:
      format: csv

  # expand all files into individual streams, each file will load into its own table
  "file:///tmp/my_csv_folder/*.csv":
    object: 'target_schema.{stream_file_name}'

  # consider as a single stream (don't expand into individual streams)
  "file:///tmp/my_csv_folder/prefix_*.csv":
    object: 'target_schema.my_new_table'
    single: true

  "file:///tmp/my_file.csv":
    columns:
      "*": string # cast all columns to string

  # Windows path format
  "file://C:/Temp/my_file.csv":
    columns:
      "*": string # cast all columns to string

env:
  SLING_SAMPLE_SIZE: 2000 # increase the sample size to infer types (default=900).
  SLING_STREAM_URL_COLUMN: true # adds a _sling_stream_url column with file path
  SLING_THREADS: 3 # run streams concurrently

Using Python

replication.py
from sling import Replication, ReplicationStream, SourceOptions, Mode, Format
import os

# Set environment variables
os.environ['MY_TARGET_DB'] = '...'
os.environ['SLING_SAMPLE_SIZE'] = '2000'  # increase the sample size to infer types
os.environ['SLING_THREADS'] = '3'  # run streams concurrently

# Single CSV file import
replication = Replication(
    source='LOCAL',
    target='MY_TARGET_DB',
    defaults={'mode': Mode.FULL_REFRESH},
    streams={
        'file:///tmp/my_file.csv': ReplicationStream(
            object='target_schema.target_table',
            columns={'*': 'string'}  # cast all columns to string
        )
    }
)

replication.run()

# Multiple CSV files with various options
replication = Replication(
    source='LOCAL',
    target='MY_TARGET_DB',
    defaults={
        'mode': Mode.FULL_REFRESH,
        'object': 'target_schema.{stream_file_folder}_{stream_file_name}'
    },
    streams={
        # A stream with many parts, all sub files will be merged into one table
        'file:///tmp/my_csv_folder/': ReplicationStream(
            columns={'col2': 'string'},  # cast col2 as string
            transforms=['remove_accents'],  # Apply transforms
            source_options=SourceOptions(format=Format.CSV)
        ),
        # Expand all files into individual streams
        'file:///tmp/my_csv_folder/*.csv': ReplicationStream(
            object='target_schema.{stream_file_name}'
        ),
        # Consider as a single stream (don't expand)
        'file:///tmp/my_csv_folder/prefix_*.csv': ReplicationStream(
            object='target_schema.my_new_table',
            single=True
        ),
        # Windows path format
        'file://C:/Temp/my_file.csv': ReplicationStream(
            columns={'*': 'string'}
        )
    },
    env={
        'SLING_SAMPLE_SIZE': '2000',
        'SLING_STREAM_URL_COLUMN': 'true',  # adds a _sling_stream_url column
        'SLING_THREADS': '3'
    }
)

replication.run()
Local Storage (Excel) ⇨ Database

Using CLI Flags

sling.sh
$ sling run --src-stream 'file:///path/to/test.excel.xlsx' --src-options '{ sheet: "Sheet2!A:F" }' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: LOCAL
target: MY_TARGET_DB

defaults:
  mode: full-refresh
  object: 'target_schema.{stream_file_folder}_{stream_file_name}'

streams:
  # expand all files into a stream, each file will load into its own table
  "file:///tmp/my_excel_folder/*.xlsx":
    object: 'target_schema.{stream_file_name}'
    source_options:
      sheet: "Sheet1!A:F"
  
   # consider as a single stream (don't expand into individual streams)
  "file:///tmp/my_excel_folder/prefix_*.xlsx":
    object: 'target_schema.my_new_table'
    single: true
    source_options:
      sheet: "Sheet1!A:F"

  "file:///path/to/test.excel.xlsx":
    columns:
      "*": string # cast all columns to string
    source_options:
      sheet: "Sheet2!A:F"

  # Windows path format
  "file://C:/Temp/my_file.xlsx":
    columns:
      "col2": integer # cast col2 to integer
    source_options:
      sheet: "Sheet2!A:F"

env:
  SLING_SAMPLE_SIZE: 2000 # increase the sample size to infer types (default=900).
  SLING_STREAM_URL_COLUMN: true # adds a _sling_stream_url column with file path
  SLING_ROW_NUM_COLUMN: true # adds a _sling_row_num column with the row number

Using Python

replication.py
from sling import Replication, ReplicationStream, SourceOptions, Mode, Format
import os

# Set environment variables
os.environ['MY_TARGET_DB'] = '...'

# Excel file import with sheet specification
replication = Replication(
    source='LOCAL',
    target='MY_TARGET_DB',
    defaults={
        'mode': Mode.FULL_REFRESH,
        'object': 'target_schema.{stream_file_folder}_{stream_file_name}'
    },
    streams={
        # Expand all files into streams
        'file:///tmp/my_excel_folder/*.xlsx': ReplicationStream(
            object='target_schema.{stream_file_name}',
            source_options=SourceOptions(sheet='Sheet1!A:F')
        ),
        # Consider as a single stream (don't expand)
        'file:///tmp/my_excel_folder/prefix_*.xlsx': ReplicationStream(
            object='target_schema.my_new_table',
            single=True,
            source_options=SourceOptions(sheet='Sheet1!A:F')
        ),
        # Single Excel file
        'file:///path/to/test.excel.xlsx': ReplicationStream(
            columns={'*': 'string'},  # cast all columns to string
            source_options=SourceOptions(sheet='Sheet2!A:F')
        ),
        # Windows path format
        'file://C:/Temp/my_file.xlsx': ReplicationStream(
            columns={'col2': 'integer'},  # cast col2 to integer
            source_options=SourceOptions(sheet='Sheet2!A:F')
        )
    },
    env={
        'SLING_SAMPLE_SIZE': '2000',
        'SLING_STREAM_URL_COLUMN': 'true',  # adds a _sling_stream_url column
        'SLING_ROW_NUM_COLUMN': 'true'  # adds a _sling_row_num column
    }
)

replication.run()
Local Storage (JSON) ⇨ Database

Using CLI Flags


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

Local Storage (JSON Flattened) ⇨ Database

Using CLI Flags


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

Local Storage (Parquet) ⇨ Database

Using CLI Flags


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

Local Storage (SAS7BDAT) ⇨ Database

Using CLI Flags


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

SFTP Storage (CSV) ⇨ Database

Using CLI Flags


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

Cloud Storage (CSV) ⇨ Database

Using CLI Flags


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

Cloud Storage (JSON) ⇨ Database

Using CLI Flags


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

Cloud Storage (JSON Flattened) ⇨ Database

Using CLI Flags


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

Cloud Storage (Parquet) ⇨ Database

Using CLI Flags


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

Cloud Storage (Avro) ⇨ Database

Using CLI Flags


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

Cloud Storage (XML) ⇨ Database

Using CLI Flags


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

Last updated

Was this helpful?