File to Database

Examples of using Sling to load data from storage systems to databases

We first need to make sure our connections are available in our environment. See Environment, Storage Connections and Database Connections for more details.

export MY_TARGET_DB='...'
export SLING_SAMPLE_SIZE=2000 # increase the sample size to infer types. Default is 900.
export SLING_THREADS=3 # run streams concurrently

$ sling conns list
+---------------+------------------+-----------------+
| CONN NAME     | CONN TYPE        | SOURCE          |
+---------------+------------------+-----------------+
| MY_S3_BUCKET  | FileSys - S3     | sling env yaml  |
| MY_TARGET_DB  | DB - PostgreSQL  | env variable    |
| MY_GS_BUCKET  | FileSys - Google | sling env yaml  |
| MY_AZURE_CONT | FileSys - Azure  | sling env yaml  |
+---------------+------------------+-----------------+
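For scripting, the table printed by `sling conns list` can be parsed to confirm that a connection is registered before a run. The following is a minimal sketch that assumes the exact table layout shown above; `parse_conns_table` is a hypothetical helper and not part of Sling.

```python
def parse_conns_table(output: str) -> list[dict[str, str]]:
    """Parse the ASCII table printed by `sling conns list` into dicts."""
    rows = []
    header = None
    for line in output.splitlines():
        line = line.strip()
        if not line.startswith("|"):
            continue  # skip border lines, prompts and blank lines
        cells = [cell.strip() for cell in line.strip("|").split("|")]
        if header is None:
            header = cells  # first pipe-delimited row holds the column names
        else:
            rows.append(dict(zip(header, cells)))
    return rows


# Sample output copied from the listing above
SAMPLE = """\
+---------------+------------------+-----------------+
| CONN NAME     | CONN TYPE        | SOURCE          |
+---------------+------------------+-----------------+
| MY_S3_BUCKET  | FileSys - S3     | sling env yaml  |
| MY_TARGET_DB  | DB - PostgreSQL  | env variable    |
| MY_GS_BUCKET  | FileSys - Google | sling env yaml  |
| MY_AZURE_CONT | FileSys - Azure  | sling env yaml  |
+---------------+------------------+-----------------+
"""

conns = parse_conns_table(SAMPLE)
db_conns = [c["CONN NAME"] for c in conns if c["CONN TYPE"].startswith("DB")]
print(db_conns)  # prints ['MY_TARGET_DB']
```

In a real script, the `SAMPLE` string would be replaced by the captured output of the command.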
Local Storage (CSV) ⇨ Database

Using CLI Flags

sling.sh
$ cat /tmp/my_file.csv | sling run --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh

$ sling run --src-stream 'file:///tmp/my_file.csv' \
  --columns '{ "*": string }' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh

$ sling run --src-stream 'file:///tmp/my_csv_folder/' \
  --columns '{col2: string, col3: string}' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh

$ sling run --src-stream 'file:///tmp/my_csv_folder/' \
  --transforms '[remove_accents]' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh

$ sling run --src-stream 'file://C:/Temp/my_csv_folder/' \
  --transforms '[remove_accents]' \
  --tgt-conn MY_TARGET_DB \
  --tgt-object 'target_schema.target_table' \
  --mode full-refresh
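When these commands are launched from a script, assembling the arguments as a list avoids shell-quoting mistakes around values such as `{ "*": string }`. The sketch below uses only the flags shown in the commands above; `build_sling_command` and `run_sling` are hypothetical helper names, and the run itself is left commented out because it assumes the `sling` binary is installed.

```python
import shlex
import subprocess


def build_sling_command(src_stream, tgt_conn, tgt_object,
                        mode="full-refresh", columns=None, transforms=None):
    """Assemble the argument list for `sling run` from the flags shown above."""
    cmd = ["sling", "run", "--src-stream", src_stream]
    if columns:
        cmd += ["--columns", columns]
    if transforms:
        cmd += ["--transforms", transforms]
    cmd += ["--tgt-conn", tgt_conn, "--tgt-object", tgt_object, "--mode", mode]
    return cmd


def run_sling(cmd):
    """Run the command; raises CalledProcessError on a non-zero exit code."""
    subprocess.run(cmd, check=True)


cmd = build_sling_command(
    "file:///tmp/my_csv_folder/",
    "MY_TARGET_DB",
    "target_schema.target_table",
    columns="{col2: string, col3: string}",
)
print(shlex.join(cmd))  # shell-quoted form, handy for logging
# run_sling(cmd)  # uncomment once the `sling` binary is on PATH
```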

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: LOCAL
target: MY_TARGET_DB

defaults:
  mode: full-refresh
  object: 'target_schema.{stream_file_folder}_{stream_file_name}'

streams:
  # a stream with many parts; all files in the folder are merged into one table
  "file:///tmp/my_csv_folder/":
    columns:
      col2: string # cast `col2` as string
    transforms: [remove_accents] # Apply transforms. Here we are removing diacritics (accents) from string values.
    source_options:
      format: csv

  # expand all files into individual streams; each file loads into its own table
  "file:///tmp/my_csv_folder/*.csv":
    object: 'target_schema.{stream_file_name}'

  # consider as a single stream (don't expand into individual streams)
  "file:///tmp/my_csv_folder/prefix_*.csv":
    object: 'target_schema.my_new_table'
    single: true

  "file:///tmp/my_file.csv":
    columns:
      "*": string # cast all columns to string

  # Windows path format
  "file://C:/Temp/my_file.csv":
    columns:
      "*": string # cast all columns to string

env:
  SLING_SAMPLE_SIZE: 2000 # increase the sample size to infer types (default=900).
  SLING_STREAM_URL_COLUMN: true # adds a _sling_stream_url column with file path
  SLING_THREADS: 3 # run streams concurrently
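The difference between an expanded wildcard stream and one marked `single: true` can be illustrated with a small planning function. This is only an approximation for reasoning about the configuration, not Sling's implementation: `plan_tables` is a hypothetical name, the file list is made up, and it assumes that `{stream_file_name}` resolves to the file name without its extension.

```python
from fnmatch import fnmatchcase
from pathlib import PurePosixPath


def plan_tables(pattern, object_template, files, single=False):
    """Map target table -> source files for one wildcard stream (illustration only)."""
    matched = [f for f in files if fnmatchcase(f, pattern)]
    if single:
        # single: true -> the wildcard is treated as one stream feeding one table
        return {object_template: matched}
    plan = {}
    for f in matched:
        # ASSUMPTION: {stream_file_name} is the file name without its extension
        table = object_template.replace("{stream_file_name}", PurePosixPath(f).stem)
        plan.setdefault(table, []).append(f)
    return plan


# Hypothetical folder contents
files = [
    "file:///tmp/my_csv_folder/orders.csv",
    "file:///tmp/my_csv_folder/prefix_a.csv",
    "file:///tmp/my_csv_folder/prefix_b.csv",
]

expanded = plan_tables(
    "file:///tmp/my_csv_folder/*.csv", "target_schema.{stream_file_name}", files
)
merged = plan_tables(
    "file:///tmp/my_csv_folder/prefix_*.csv", "target_schema.my_new_table", files,
    single=True,
)
print(sorted(expanded))  # one table per matched file
print(merged)            # one table fed by both prefix_* files
```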

Using Python

replication.py
from sling import Replication, ReplicationStream, SourceOptions, Mode, Format
import os

# Set environment variables
os.environ['MY_TARGET_DB'] = '...'
os.environ['SLING_SAMPLE_SIZE'] = '2000'  # increase the sample size to infer types
os.environ['SLING_THREADS'] = '3'  # run streams concurrently

# Single CSV file import
replication = Replication(
    source='LOCAL',
    target='MY_TARGET_DB',
    defaults={'mode': Mode.FULL_REFRESH},
    streams={
        'file:///tmp/my_file.csv': ReplicationStream(
            object='target_schema.target_table',
            columns={'*': 'string'}  # cast all columns to string
        )
    }
)

replication.run()

# Multiple CSV files with various options
replication = Replication(
    source='LOCAL',
    target='MY_TARGET_DB',
    defaults={
        'mode': Mode.FULL_REFRESH,
        'object': 'target_schema.{stream_file_folder}_{stream_file_name}'
    },
    streams={
        # A stream with many parts, all sub files will be merged into one table
        'file:///tmp/my_csv_folder/': ReplicationStream(
            columns={'col2': 'string'},  # cast col2 as string
            transforms=['remove_accents'],  # Apply transforms
            source_options=SourceOptions(format=Format.CSV)
        ),
        # Expand all files into individual streams
        'file:///tmp/my_csv_folder/*.csv': ReplicationStream(
            object='target_schema.{stream_file_name}'
        ),
        # Consider as a single stream (don't expand)
        'file:///tmp/my_csv_folder/prefix_*.csv': ReplicationStream(
            object='target_schema.my_new_table',
            single=True
        ),
        # Windows path format
        'file://C:/Temp/my_file.csv': ReplicationStream(
            columns={'*': 'string'}
        )
    },
    env={
        'SLING_SAMPLE_SIZE': '2000',
        'SLING_STREAM_URL_COLUMN': 'true',  # adds a _sling_stream_url column
        'SLING_THREADS': '3'
    }
)

replication.run()
Local Storage (Excel) ⇨ Database

Using CLI Flags

sling.sh
$ sling run --src-stream 'file:///path/to/test.excel.xlsx' --src-options '{ sheet: "Sheet2!A:F" }' --tgt-conn MY_TARGET_DB --tgt-object 'target_schema.target_table' --mode full-refresh
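The `sheet` value above appears to follow the usual spreadsheet notation of a sheet name, an exclamation mark, and a cell or column range. As a minimal sketch under that assumption, the value can be split into its two parts for validation before a run; `split_sheet_option` is a hypothetical helper, and whether Sling accepts a bare sheet name without a range is not confirmed here.

```python
def split_sheet_option(value):
    """Split 'Sheet2!A:F' into ('Sheet2', 'A:F'); the range is None if absent."""
    sheet, _, cell_range = value.partition("!")
    return sheet, cell_range or None


sheet, cell_range = split_sheet_option("Sheet2!A:F")
print(sheet, cell_range)  # prints: Sheet2 A:F
```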

Using Replication

Running with Sling: sling run -r /path/to/replication.yaml

replication.yaml
source: LOCAL
target: MY_TARGET_DB

defaults:
  mode: full-refresh
  object: 'target_schema.{stream_file_folder}_{stream_file_name}'

streams:
  # expand all files into individual streams; each file will load into its own table
  "file:///tmp/my_excel_folder/*.xlsx":
    object: 'target_schema.{stream_file_name}'
    source_options:
      sheet: "Sheet1!A:F"
  
  # consider as a single stream (don't expand into individual streams)
  "file:///tmp/my_excel_folder/prefix_*.xlsx":
    object: 'target_schema.my_new_table'
    single: true
    source_options:
      sheet: "Sheet1!A:F"

  "file:///path/to/test.excel.xlsx":
    columns:
      "*": string # cast all columns to string
    source_options:
      sheet: "Sheet2!A:F"

  # Windows path format
  "file://C:/Temp/my_file.xlsx":
    columns:
      "col2": integer # cast col2 to integer
    source_options:
      sheet: "Sheet2!A:F"

env:
  SLING_SAMPLE_SIZE: 2000 # increase the sample size to infer types (default=900).
  SLING_STREAM_URL_COLUMN: true # adds a _sling_stream_url column with file path
  SLING_ROW_NUM_COLUMN: true # adds a _sling_row_num column with the row number
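A replication file like the one above can also be generated from a plain dictionary, which is convenient when stream names are computed at run time. The sketch below writes the configuration as JSON, relying on YAML 1.2 being designed as a superset of JSON; that Sling's parser accepts JSON-formatted content in `replication.yaml` is an assumption here, and `write_replication` is a hypothetical helper.

```python
import json
import tempfile
from pathlib import Path

# Same keys as the YAML above, expressed as a Python dict
config = {
    "source": "LOCAL",
    "target": "MY_TARGET_DB",
    "defaults": {"mode": "full-refresh"},
    "streams": {
        "file:///path/to/test.excel.xlsx": {
            "object": "target_schema.target_table",
            "columns": {"*": "string"},
            "source_options": {"sheet": "Sheet2!A:F"},
        },
    },
    "env": {"SLING_ROW_NUM_COLUMN": True},
}


def write_replication(config, directory):
    """Write the config as JSON (also valid YAML) and return the file path."""
    path = Path(directory) / "replication.yaml"
    path.write_text(json.dumps(config, indent=2), encoding="utf-8")
    return path


with tempfile.TemporaryDirectory() as tmp:
    path = write_replication(config, tmp)
    print(path.read_text(encoding="utf-8"))
    # next step (not executed here): sling run -r <path>
```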

Using Python

replication.py
from sling import Replication, ReplicationStream, SourceOptions, Mode
import os

# Set environment variables
os.environ['MY_TARGET_DB'] = '...'

# Excel file import with sheet specification
replication = Replication(
    source='LOCAL',
    target='MY_TARGET_DB',
    defaults={
        'mode': Mode.FULL_REFRESH,
        'object': 'target_schema.{stream_file_folder}_{stream_file_name}'
    },
    streams={
        # Expand all files into streams
        'file:///tmp/my_excel_folder/*.xlsx': ReplicationStream(
            object='target_schema.{stream_file_name}',
            source_options=SourceOptions(sheet='Sheet1!A:F')
        ),
        # Consider as a single stream (don't expand)
        'file:///tmp/my_excel_folder/prefix_*.xlsx': ReplicationStream(
            object='target_schema.my_new_table',
            single=True,
            source_options=SourceOptions(sheet='Sheet1!A:F')
        ),
        # Single Excel file
        'file:///path/to/test.excel.xlsx': ReplicationStream(
            columns={'*': 'string'},  # cast all columns to string
            source_options=SourceOptions(sheet='Sheet2!A:F')
        ),
        # Windows path format
        'file://C:/Temp/my_file.xlsx': ReplicationStream(
            columns={'col2': 'integer'},  # cast col2 to integer
            source_options=SourceOptions(sheet='Sheet2!A:F')
        )
    },
    env={
        'SLING_SAMPLE_SIZE': '2000',
        'SLING_STREAM_URL_COLUMN': 'true',  # adds a _sling_stream_url column
        'SLING_ROW_NUM_COLUMN': 'true'  # adds a _sling_row_num column
    }
)

replication.run()
Local Storage (JSON) ⇨ Database

Using CLI Flags


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

Local Storage (JSON Flattened) ⇨ Database

Using CLI Flags


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

Local Storage (Parquet) ⇨ Database

Using CLI Flags


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

Local Storage (SAS7BDAT) ⇨ Database

Using CLI Flags


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

SFTP Storage (CSV) ⇨ Database

Using CLI Flags


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

Cloud Storage (CSV) ⇨ Database

Using CLI Flags


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

Cloud Storage (JSON) ⇨ Database

Using CLI Flags


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

Cloud Storage (JSON Flattened) ⇨ Database

Using CLI Flags


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

Cloud Storage (Parquet) ⇨ Database

Using CLI Flags


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

Cloud Storage (Avro) ⇨ Database

Using CLI Flags


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python

Cloud Storage (XML) ⇨ Database

Using CLI Flags


Using Replication

Running with Sling: sling run -r /path/to/replication.yaml


Using Python
