Write

Write hooks allow you to write content to files in any file-based storage connection. This is particularly useful for creating reports, saving processed data, generating configuration files, or writing logs.

Configuration

- type: write
  to: "connection/path/to/file.txt"     # Required: destination location
  content: "text content to write"      # Required: content to write
  on_failure: abort                     # Optional: abort/warn/quiet/skip
  id: my_id                             # Optional: auto-generated if omitted. Use a `log` hook with {runtime_state} to inspect state.

Properties

| Property | Required | Description |
| --- | --- | --- |
| `to` | Yes | The destination location string, containing the connection name and file path. |
| `content` | Yes | The content to write to the file. Supports variable substitution. Can also use `file://path/to/file` to read content from a local file. |
| `on_failure` | No | What to do if the write fails (`abort`/`warn`/`quiet`/`skip`). |

Output

When the write hook executes successfully, it returns the following output that can be accessed in subsequent hooks:

status: success  # Status of the hook execution
target_url: "s3://bucket/path/to/file.txt"  # The normalized URI of the target file
bytes_written: 1024  # Number of bytes written

You can access these values in subsequent hooks using the following JMESPath-style syntax:

- `{state.hook_id.status}` - Status of the hook execution
- `{state.hook_id.target_url}` - The normalized URI of the target file
- `{state.hook_id.bytes_written}` - Number of bytes written
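For example, a subsequent `log` hook (mentioned above) can report the result of the write. This is a minimal sketch: the hook id `my_write` is an assumed name, and it assumes the `log` hook accepts a `message` property:

hooks:
  post:
    - type: write
      to: "local/output/summary.txt"
      content: "Run completed: {run.total_rows} rows"
      id: my_write          # assumed id, referenced below

    - type: log
      message: "Wrote {state.my_write.bytes_written} bytes to {state.my_write.target_url}"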

Examples

Generate Data Processing Report

Create a summary report after data processing:

hooks:
  post:
    - type: write
      to: "s3/reports/processing_summary_{timestamp.YYYY-MM-DD}.txt"
      content: |
        Data Processing Summary
        ======================
        
        Stream: {run.stream.name}
        Start Time: {run.start_time}
        End Time: {run.end_time}
        Total Rows: {run.total_rows}
        Status: {run.status}
        
        Target: {target.connection}/{target.object}
        Total Bytes Written: {run.total_bytes}
        
        Generated on: {timestamp.YYYY-MM-DD HH:mm:ss}

Create JSON Configuration File

Generate a configuration file with runtime data:

hooks:
  pre:
    - type: write
      to: "local/config/runtime_config.json"
      content: |
        {
          "processing_date": "{timestamp.YYYY-MM-DD}",
          "source_connection": "{source.connection}",
          "target_connection": "{target.connection}",
          "stream_name": "{run.stream.name}",
          "environment": "{env.ENV_NAME}",
          "user": "{env.USER}"
        }

Write Query Results to File

Save query results as a formatted report:

hooks:
  post:
    - type: query
      connection: target_db
      query: |
        SELECT 
          COUNT(*) as total_records,
          MAX(created_at) as latest_record,
          MIN(created_at) as oldest_record
        FROM {run.object.full_name}
      id: stats_query
      
    - type: write
      to: "gcs/reports/table_stats_{run.stream.name}_{timestamp.YYYY-MM-DD}.txt"
      content: |
        Table Statistics Report
        =====================
        
        Table: {run.object.full_name}
        Total Records: {state.stats_query.result[0].total_records}
        Latest Record: {state.stats_query.result[0].latest_record}
        Oldest Record: {state.stats_query.result[0].oldest_record}
        
        Report generated: {timestamp.YYYY-MM-DD HH:mm:ss}

Create Error Log

Write error information to a log file when processing fails:

hooks:
  post:
    - type: write
      if: run.status == "error"
      to: "local/logs/error_log_{timestamp.YYYY-MM-DD}.txt"
      content: |
        ERROR LOG ENTRY
        ===============
        
        Timestamp: {timestamp.YYYY-MM-DD HH:mm:ss}
        Stream: {run.stream.name}
        Source: {source.connection}/{source.object}
        Target: {target.connection}/{target.object}
        Error: {run.error}
        
        Environment: {env.ENV_NAME}
        User: {env.USER}
        
        ---
      on_failure: warn

Generate CSV Report

Create a CSV file with processed data statistics:

hooks:
  post:
    - type: write
      to: "s3/reports/daily_stats_{timestamp.YYYY-MM-DD}.csv"
      content: |
        date,stream_name,source_connection,target_connection,rows_processed,bytes_processed,status,duration_seconds
        {timestamp.YYYY-MM-DD},{run.stream.name},{source.connection},{target.connection},{run.total_rows},{run.total_bytes},{run.status},{run.duration}

Write Content from Local File

Write content from a local file to a remote location:

hooks:
  post:
    - type: write
      to: "s3/reports/daily_report_{timestamp.YYYY-MM-DD}.html"
      content: "file://templates/report_template.html"

Write Processed Content

Process stored content and write it to a new file:

hooks:
  pre:
    - type: read
      from: "s3/templates/email_template.html"
      into: "template"
      
  post:
    - type: write
      to: "local/output/personalized_email_{timestamp.YYYY-MM-DD-HH-mm}.html"
      content: |
        {store.template | replace('{{USER_NAME}}', '{env.USER_NAME}') | replace('{{DATE}}', '{timestamp.YYYY-MM-DD}') | replace('{{ROWS_PROCESSED}}', '{run.total_rows}')}

Create Backup Metadata

Write metadata about the backup operation:

hooks:
  end:
    - type: write
      to: "s3/backups/metadata/backup_{timestamp.YYYY-MM-DD-HH-mm}.json"
      content: |
        {
          "backup_timestamp": "{timestamp.YYYY-MM-DD HH:mm:ss}",
          "source": {
            "connection": "{source.connection}",
            "object": "{source.object}",
            "total_rows": {run.total_rows}
          },
          "target": {
            "connection": "{target.connection}",
            "object": "{target.object}",
            "bytes_written": {run.total_bytes}
          },
          "status": "{run.status}",
          "duration_seconds": {run.duration},
          "environment": "{env.ENV_NAME}"
        }

Write Multi-line SQL Script

Generate a SQL script based on runtime data:

hooks:
  post:
    - type: write
      to: "local/sql/cleanup_{run.stream.name}_{timestamp.YYYY-MM-DD}.sql"
      content: |
        -- Cleanup script for {run.stream.name}
        -- Generated on {timestamp.YYYY-MM-DD HH:mm:ss}
        
        BEGIN;
        
        -- Archive old data
        CREATE TABLE {run.object.full_name}_archive_{timestamp.YYYY_MM_DD} AS
        SELECT * FROM {run.object.full_name}
        WHERE created_at < CURRENT_DATE - INTERVAL '90 days';
        
        -- Delete old data
        DELETE FROM {run.object.full_name}
        WHERE created_at < CURRENT_DATE - INTERVAL '90 days';
        
        -- Update statistics
        ANALYZE {run.object.full_name};
        
        COMMIT;
        
        -- Summary: Processed {run.total_rows} rows

Conditional Writing

Write different content based on conditions:

hooks:
  post:
    - type: write
      if: run.total_rows > 1000
      to: "s3/alerts/high_volume_{timestamp.YYYY-MM-DD}.txt"
      content: |
        HIGH VOLUME ALERT
        =================
        
        Stream: {run.stream.name}
        Rows Processed: {run.total_rows}
        Threshold: 1000
        Time: {timestamp.YYYY-MM-DD HH:mm:ss}
        
        This requires immediate attention.
      
    - type: write
      if: run.total_rows <= 1000
      to: "s3/logs/normal_volume_{timestamp.YYYY-MM-DD}.txt"
      content: |
        Normal processing completed for {run.stream.name}: {run.total_rows} rows processed.
