List

List hooks allow you to retrieve file and directory listings from any supported filesystem connection. This is particularly useful for discovering files, validating directory contents, or preparing for batch operations.

Configuration

- type: list
  location: "aws_s3/path/to/directory"  # Required: Location string
  recursive: false      # Optional: List files/folders recursively (default: false)
  only: files | folders  # Optional: List only files or only folders
  into: my_variable    # Optional: Store results in store or env (e.g., "file_list" or "env.FILE_LIST")
  on_failure: abort    # Optional: abort/warn/quiet/skip
  id: my_id            # Optional; auto-generated if omitted. Use a `log` hook with {runtime_state} to inspect state.
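
For example, a minimal sketch of a pre-hook that lists a folder and logs the outcome (the path data/inbox/ and the id dir_list are illustrative):

hooks:
  pre:
    - type: list
      id: dir_list
      location: "aws_s3/data/inbox/"  # non-recursive by default

    - type: log
      message: "Listing finished with status: {state.dir_list.status}"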

Properties

| Property | Required | Description |
| --- | --- | --- |
| `location` | Yes | The location string, containing the connection name and path. |
| `recursive` | No | Whether to list files recursively in subdirectories (default: `false`). |
| `only` | No | Filter to list only `files` or only `folders` (see the sketch after this table). |
| `into` | No | Store the result array in the replication store or environment variables. Use `store.variable_name` (or just `variable_name`) for the store, or `env.VARIABLE_NAME` for environment variables. |
| `on_failure` | No | What to do if the listing fails (`abort`/`warn`/`quiet`/`skip`). |
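
As a sketch, `only` and `on_failure` can be combined to list just the top-level folders and merely warn if the path is missing (the connection gcs and the id folder_list are illustrative):

hooks:
  pre:
    - type: list
      id: folder_list
      location: "gcs/landing/"
      only: folders      # return folders only, skipping files
      on_failure: warn   # warn instead of aborting if the listing fails

    - type: log
      message: "Folders found: {state.folder_list.result}"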

Output

When the list hook executes successfully, it returns the following output that can be accessed in subsequent hooks:

status: success  # Status of the hook execution
result:  # Array of file/directory entries
  - name: "file1.txt"  # Name of the file/directory
    path: "path/to/file1.txt"  # Full path
    location: "my_conn/path/to/file1.txt"  # Location string
    uri: "s3://bucket/path/to/file1.txt"  # Full URI
    is_file: true  # Whether entry is a file
    is_dir: false  # Whether entry is a directory
    size: 1024  # Size in bytes
    created_at: "2023-01-01T00:00:00Z"  # Creation timestamp if available
    created_at_unix: 1672531200  # Creation unix timestamp if available
    updated_at: "2023-01-02T00:00:00Z"  # Last modified timestamp if available
    updated_at_unix: 1672617600  # Last modified unix timestamp if available
path: "path/to/directory"  # The listed path
connection: "aws_s3"  # The connection used

You can access these values in subsequent hooks using JMESPath syntax (see the example after this list):

  • {state.hook_id.status} - Status of the hook execution

  • {state.hook_id.result} - Array of file/directory entries

  • {state.hook_id.path} - The listed path

  • {state.hook_id.connection} - The connection used
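
For instance, assuming a preceding list hook with id: file_list, a log hook could report these values directly:

- type: log
  message: "Listed {state.file_list.path} on {state.file_list.connection} (status: {state.file_list.status})"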

Examples

Process Files in Directory

List files and process them in a group:

hooks:
  pre:
    - type: list
      id: file_list
      location: "aws_s3/data/{run.stream.name}/"
      recursive: true

    - type: group
      loop: state.file_list.result
      steps:
        - type: log
          if: loop.value.is_file
          message: "Processing file: {loop.value.name}"

Archive Old Files

List files and archive those older than seven days:

hooks:
  post:
    - type: list
      id: old_files
      location: "gcs/temp/{run.stream.name}/"
      recursive: true

    - type: group
      loop: state.old_files.result
      steps:
        - type: copy
          if: loop.value.updated_at_unix < timestamp.unix - 7*24*60*60  # 7 days old
          from: "{loop.value.location}"
          to: "gcs/archive/{timestamp.year}/{timestamp.month}/{loop.value.name}"

Size-based Processing

Process files based on their size:

hooks:
  pre:
    - type: list
      id: large_files
      location: "aws_s3/uploads/"

    - type: group
      loop: state.large_files.result
      steps:
        - type: log
          if: loop.value.size > 1024*1024  # > 1MB
          message: "Large file detected: {loop.value.name} ({loop.value.size} bytes)"

Store Results for Later Use

Use the into parameter to store list results in the store or environment variables for use across pipeline steps:

steps:
  # List files and store in replication store
  - type: list
    location: "s3/data/inbox/"
    recursive: true
    only: files
    into: inbox_files  # Store in replication store

  # Use stored results in subsequent step
  - type: log
    message: "Found {len(store.inbox_files)} files in inbox"

  # Process each file
  - type: group
    loop: store.inbox_files
    steps:
      - type: log
        message: "Processing: {loop.value.name} ({loop.value.size} bytes)"

Store Results as Environment Variable

Store list results as a JSON environment variable for use in subsequent pipeline steps or replications:

steps:
  # List files and store as environment variable
  - type: list
    location: "local/exports/"
    only: files
    into: env.EXPORT_FILES  # Store as environment variable (JSON string)

  # The env var is now available in subsequent steps
  - type: log
    message: "Export files available: {env.EXPORT_FILES}"

  # Run replication that can access the env var
  - type: replication
    path: /path/to/replication.yaml

Store File Paths for API Iteration

List files and use their paths to drive API endpoint iteration:

steps:
  # List CSV files to process
  - type: list
    location: "s3/data/uploads/"
    recursive: false
    only: files
    into: upload_files

  # Store just the file paths
  - type: store
    key: env.FILE_PATHS
    value: >
      {join(map(store.upload_files, "location"), ",")}

  # Log the files that will be processed
  - type: log
    message: "Will process files: {env.FILE_PATHS}"

  # Run replication using file list
  - type: replication
    path: /path/to/process_files.yaml

Combined with Query Results

Combine list results with database queries:

steps:
  # List available data files
  - type: list
    location: "s3/data/raw/"
    recursive: true
    only: files
    into: raw_files

  # Query database for already processed files
  - type: query
    connection: MY_DB
    query: |
      SELECT filename
      FROM processed_files
      WHERE processed_date > CURRENT_DATE - INTERVAL '7 days'
    into: processed_files

  # Store count of new files to process
  - type: store
    key: new_file_count
    value: >
      {len(store.raw_files) - len(store.processed_files)}

  # Log processing status
  - type: log
    message: |
      File Processing Status:
      - Total raw files: {len(store.raw_files)}
      - Already processed: {len(store.processed_files)}
      - New files to process: {store.new_file_count}

  # Process files (runs only when new files exist)
  - type: group
    if: store.new_file_count > 0
    loop: store.raw_files
    steps:
      - type: log
        message: "Processing new file: {loop.value.name}"

Notes

  • Not all filesystems provide all metadata fields

  • Timestamps may be zero if not supported by the filesystem

  • Directory sizes are typically reported as 0

  • The hook will not fail if the path doesn't exist or is empty

  • When using into, the result array is stored directly without the wrapping object (no need to access .result); see the sketch after these notes

  • Use into: "variable_name" for replication store (accessible via {store.variable_name})

  • Use into: "env.VARIABLE_NAME" for environment variables (accessible via {env.VARIABLE_NAME}, stored as JSON string)
