Replication

Replication hooks allow you to trigger nested replication tasks within your workflow. This is particularly useful for orchestrating complex data pipelines, running dependent replications, or managing multi-step data transformations.

Configuration

- type: replication
  path: "path/to/replication.yaml"  # Required: Path to replication configuration file
  mode: "full-refresh"          # Optional: Override replication mode
  range: "2021-01-01,2022-01-01"    # Optional: Override backfill range for incremental mode
  streams: ["stream1", "stream2"]   # Optional: List of specific streams to run
  env:                              # Optional: Environment variables for the replication
    ENV_VAR1: "value1"
    ENV_VAR2: "value2"
  on_failure: abort                 # Optional: abort/warn/quiet/skip
  id: my_id                         # Optional: Generated if omitted. Use a `log` hook with {runtime_state} to view the state.

Properties

| Property | Required | Description |
| --- | --- | --- |
| path | Yes | Path to the replication configuration file |
| mode | No | Override the replication mode (for example, full-refresh or incremental) |
| range | No | Override the backfill range for incremental mode |
| streams | No | List of specific streams to run. If not provided, all streams are run |
| env | No | Map of environment variables to set for the replication |
| on_failure | No | What to do if the replication fails (abort/warn/quiet/skip) |
| id | No | Identifier used to reference the hook's output in later hooks. Generated if not provided |
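
As a rough sketch of how the properties combine, the snippet below shows the smallest possible hook next to one that sets a few optional overrides. It assumes the hook sits under `hooks` → `post`, as in the examples on this page; the path and stream name are placeholders.

```yaml
hooks:
  post:
    # Minimal form: only the required `path` is set, so all streams run
    - type: replication
      path: "path/to/replication.yaml"

    # Same hook with optional overrides (placeholder values)
    - type: replication
      path: "path/to/replication.yaml"
      mode: "full-refresh"
      streams: ["stream1"]
      on_failure: warn
```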

Output

When the replication hook executes successfully, it returns the following output that can be accessed in subsequent hooks:

status: success  # Status of the hook execution
path: "path/to/replication.yaml"  # The replication configuration file path
range: "2021-01-01,2022-01-01"  # The backfill range used (if specified)
streams: ["stream1", "stream2"]  # The streams that were processed

You can access these values in subsequent hooks using the following JMESPath syntax, where `hook_id` is the `id` of the replication hook:

  • {state.hook_id.status} - Status of the hook execution

  • {state.hook_id.path} - The replication configuration file path

  • {state.hook_id.range} - The backfill range used

  • {state.hook_id.streams} - The streams that were processed

Examples

Run Dependent Replication

Execute a dependent replication after successful processing:

hooks:
  post:
    - type: replication
      if: run.status == "success"
      path: "configs/dependent_replication.yaml"
      on_failure: abort

Environment-Specific Replication

Run different replication configurations based on environment:

hooks:
  post:
    - type: replication
      path: "configs/{target.environment}/transform.yaml"
      streams: ["{run.stream.name}_processed"]
      env:
        SOURCE_SCHEMA: "{run.stream.schema}"
        TARGET_TABLE: "{run.object.name}_final"
      on_failure: warn

Conditional Stream Selection

Select streams based on runtime conditions:

hooks:
  post:
    - type: replication
      path: "configs/conditional_replication.yaml"
      if: run.total_rows > 1000
      streams:
        - "{run.stream.name}_analytics"
        - "{run.stream.name}_archive"
      env:
        PROCESS_DATE: "{timestamp.date}"
        SOURCE_TABLE: "{run.object.full_name}"
