# Examples

This page provides practical examples of Sling Pipelines, demonstrating how to chain multiple steps for complex data workflows. These examples build upon the concepts from the [Pipeline](/concepts/pipeline.md), [Hooks](/concepts/hooks.md) and [Functions](/concepts/functions.md) documentation.

Each example includes:

* A brief description
* The YAML configuration
* Key concepts demonstrated

## Basic Pipeline with Logging and Replication

This simple pipeline logs the start, runs a replication, logs the completion with the runtime state, and then echoes the replication status through a shell command.

```yaml
steps:
  - type: log
    message: 'Starting pipeline execution on {date_format(now(), "%Y-%m-%d %H:%M:%S")}. Runtime state: {runtime_state}'

  - type: replication
    path: path/to/your/replication.yaml
    id: main_replication

  - type: log
    message: 'Pipeline completed. Final state: {runtime_state}'
    level: info

  - type: command
    command: 'echo "Replication status: {upper(state.main_replication.status)}"'
    print: true
```

**Key Concepts:**

* Basic sequencing of steps
* Using [`log`](/concepts/hooks/log.md) for monitoring with `date_format` and `now` functions
* Running a [`replication`](/concepts/hooks/replication.md) as a step
* Accessing state from previous steps
* Executing system commands with [`command`](/concepts/hooks/command.md) using `upper` function
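
As a variation on the final log step, a message can reference a single field of a step's state instead of the whole runtime state. The sketch below reuses only the `main_replication` id, the `status` field, and the `date_format`, `now`, and `upper` functions from the example above. It is an illustration and has not been run against a live connection.

```yaml
steps:
  - type: replication
    path: path/to/your/replication.yaml
    id: main_replication

  # Report only the status field of the replication step, with a timestamp
  - type: log
    message: 'Replication {upper(state.main_replication.status)} at {date_format(now(), "%H:%M:%S")}'
    level: info
```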

## File Processing Pipeline with Looping

This pipeline lists files from S3, copies them to Azure, and logs the process using a group for looping.

```yaml
steps:
  - type: list
    id: s3_files
    location: aws_s3/your-bucket/files/
    recursive: true
    only: files

  - type: group
    loop: state.s3_files.result
    steps:
      - type: log
        message: 'Processing file {loop.index + 1}: {loop.value.name} ({loop.value.size} bytes)'

      - type: copy
        from: '{loop.value.location}'
        to: azure_storage/processed/{coalesce(loop.value.name, "unnamed_file")}.processed
        id: file_copy

      - type: log
        if: state.file_copy.bytes_written > 0
        message: 'Successfully copied {state.file_copy.bytes_written} bytes'
        level: info

  - type: log
    message: 'Processed {length(state.s3_files.result)} files'
```

**Key Concepts:**

* Listing files with [`list`](/concepts/hooks/list.md)
* Looping with [`group`](/concepts/hooks/group.md) and `loop`
* Conditional logging with [`log`](/concepts/hooks/log.md)
* File transfer using [`copy`](/concepts/hooks/copy.md)
* Accessing loop variables (`loop.index`, `loop.value`)
* Using functions like `coalesce` and `length` in expressions
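
Loop variables can also drive conditions, not just messages. The following sketch skips empty files by putting an `if` on the copy step. It assumes that `if` is accepted on `copy` steps and can read `loop.value.size`; the example above only shows `if` on a `log` step, so treat this as an untested variation.

```yaml
steps:
  - type: list
    id: s3_files
    location: aws_s3/your-bucket/files/
    only: files

  - type: group
    loop: state.s3_files.result
    steps:
      # Assumption: `if` may reference loop variables on a copy step.
      # The copy runs only when the listed size is non-zero.
      - type: copy
        if: loop.value.size > 0
        from: '{loop.value.location}'
        to: 'azure_storage/processed/{loop.value.name}'
```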

## Data Quality Pipeline

This pipeline runs a replication, performs quality checks via queries, and notifies if issues are found.

```yaml
steps:
  - type: replication
    path: replications/data_sync.yaml
    id: data_sync

  - type: query
    connection: target_db
    query: |
      SELECT COUNT(*) as invalid_count
      FROM target_schema.my_table
      WHERE some_column IS NULL
    id: quality_check
    into: qc_results

  - type: check
    check: store.qc_results[0].invalid_count == 0
    failure_message: 'Found {store.qc_results[0].invalid_count} invalid records'
    on_failure: warn

  - type: http
    if: store.qc_results[0].invalid_count > 0
    url: https://alerts.example.com/notify
    method: POST
    payload: |
      {
        "issue": "Data quality failure",
        "details": "Invalid records: {store.qc_results[0].invalid_count}",
        "table": "target_schema.my_table",
        "checked_at": "{date_format(now(), "%Y-%m-%d")}"
      }
```

**Key Concepts:**

* Running replications with [`replication`](/concepts/hooks/replication.md)
* Executing database queries with [`query`](/concepts/hooks/query.md)
* Validation with [`check`](/concepts/hooks/check.md)
* Sending notifications via [`http`](/concepts/hooks/http.md) using `date_format` function
* Storing query results with `into`
* Conditional execution
* Accessing stored values with `store.`
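
The same `query`, `into`, and `check` pattern extends to other rules. The sketch below looks for duplicate keys; the `id` column and the `dup_results` store key are hypothetical names chosen for illustration, while the connection and table are the placeholders used above.

```yaml
steps:
  - type: query
    connection: target_db
    # Count key values that appear more than once (hypothetical `id` column)
    query: |
      SELECT COUNT(*) as duplicate_count
      FROM (
        SELECT id
        FROM target_schema.my_table
        GROUP BY id
        HAVING COUNT(*) > 1
      ) d
    into: dup_results

  # Warn, rather than stop the pipeline, when duplicates are present
  - type: check
    check: store.dup_results[0].duplicate_count == 0
    failure_message: 'Found {store.dup_results[0].duplicate_count} duplicated ids'
    on_failure: warn
```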

## Cleanup and Archiving Pipeline

This pipeline lists temporary files, copies each one to a dated archive path on S3, and then deletes the local copy.

```yaml
steps:
  - type: list
    id: temp_files
    location: local//tmp/processing/
    only: files

  - type: group
    loop: state.temp_files.result
    steps:
      - type: copy
        from: '{loop.value.location}'
        to: 'aws_s3/archive/{timestamp.YYYY}/{timestamp.MM}/{loop.value.name}'

      - type: delete
        location: '{loop.value.location}'
        on_failure: warn

  - type: log
    message: 'Archived and deleted {length(state.temp_files.result)} files'
```

**Key Concepts:**

* File discovery using [`list`](/concepts/hooks/list.md)
* Iterative processing with [`group`](/concepts/hooks/group.md)
* Archiving files with [`copy`](/concepts/hooks/copy.md)
* Cleanup using [`delete`](/concepts/hooks/delete.md)
* Logging results with [`log`](/concepts/hooks/log.md)
* Using timestamps in file paths
* Error handling with `on_failure`
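
Because the delete step removes the only local copy, it can be worth guarding it on the result of the archive copy. The sketch below gives the copy step a hypothetical id, `archive_copy`, and assumes that `if` is accepted on `delete` steps and that the copy state exposes `bytes_written`, as in the file processing example. Note that under this condition an empty file is never deleted.

```yaml
steps:
  - type: list
    id: temp_files
    location: local//tmp/processing/
    only: files

  - type: group
    loop: state.temp_files.result
    steps:
      - type: copy
        id: archive_copy   # hypothetical id, used by the condition below
        from: '{loop.value.location}'
        to: 'aws_s3/archive/{timestamp.YYYY}/{timestamp.MM}/{loop.value.name}'

      # Assumption: `if` works on delete steps.
      # Delete the local file only if the archive copy wrote data.
      - type: delete
        if: state.archive_copy.bytes_written > 0
        location: '{loop.value.location}'
        on_failure: warn
```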

## Advanced Pipeline with Groups and Conditions

This pipeline combines groups, conditions, and several step types into a more complex workflow: a preparation group, a replication, and separate branches for success and error.

```yaml
steps:
  - type: group
    id: preparation
    steps:
      - type: log
        message: 'Preparing environment'

      - type: command
        command: mkdir -p /tmp/processing
        print: true

  - type: replication
    path: replications/main.yaml
    id: main_rep

  - type: group
    if: state.main_rep.status == "success"
    steps:
      - type: query
        connection: target_db
        query: VACUUM ANALYZE {state.main_rep.object.full_name}

      - type: log
        message: 'Optimization complete'

  - type: group
    if: state.main_rep.status == "error"
    steps:
      - type: log
        message: 'Error occurred: {state.main_rep.error}'
        level: error

      - type: http
        url: https://errors.example.com/report
        method: POST
        payload: '{state.main_rep}'
```

**Key Concepts:**

* Organized workflows using [`group`](/concepts/hooks/group.md)
* Logging with [`log`](/concepts/hooks/log.md)
* System commands via [`command`](/concepts/hooks/command.md)
* Data replication with [`replication`](/concepts/hooks/replication.md)
* Database optimization using [`query`](/concepts/hooks/query.md)
* Nesting steps inside groups for organization
* Conditional execution based on previous step status
* Error handling branch
* Sending a step's state as the [`http`](/concepts/hooks/http.md) payload

These examples demonstrate the flexibility of Sling Pipelines and how to use built-in functions in expressions. You can combine and extend them based on your specific needs. For more details on individual step types, refer to the [Hooks](/concepts/hooks.md) documentation. For a full list of available functions, see the [functions documentation](/concepts/functions.md).


