# MongoDB

Sling supports Change Data Capture (CDC) from MongoDB using Change Streams, which are built on MongoDB's oplog (operations log). Each run reads document-level inserts, updates, replaces, and deletes from the change stream and merges them into the target table.

For general CDC concepts, the two-phase process, and all available options, see the [Change Capture overview](/concepts/change-capture.md).

## Prerequisites

### 1. Replica Set Required

MongoDB Change Streams require a replica set. **Standalone MongoDB instances are not supported.** A single-node replica set is sufficient for development and testing.

To convert a standalone instance to a single-node replica set:

1. Add the following to your `mongod.conf`:

   ```yaml
   replication:
     replSetName: "rs0"
   ```
2. Restart `mongod`, then initialize the replica set:

   ```javascript
   // In mongosh
   rs.initiate()
   ```
3. Verify the replica set is running:

   ```javascript
   rs.status()
   ```
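
The `rs.status()` output can be long, so a small check helps confirm the one thing CDC needs: a member in the `PRIMARY` state. The following is a minimal sketch, not part of Sling; `hasPrimary` and the `sample` object are hypothetical names introduced for illustration, and the sample only imitates the shape of a real `rs.status()` result.

```javascript
// Sketch: decide whether an rs.status() result describes a usable replica set.
// `status` is the plain object returned by rs.status() in mongosh.
function hasPrimary(status) {
  if (!status || status.ok !== 1 || !Array.isArray(status.members)) {
    return false;
  }
  // A member reports stateStr "PRIMARY" once the election has completed.
  return status.members.some((m) => m.stateStr === "PRIMARY");
}

// In mongosh: hasPrimary(rs.status())
// Illustrative input shaped like a single-node replica set (hypothetical values):
const sample = {
  set: "rs0",
  ok: 1,
  members: [{ name: "localhost:27017", stateStr: "PRIMARY" }],
};
console.log(hasPrimary(sample)); // true
```

Right after `rs.initiate()` the node may briefly report another state, so a `false` result immediately after initialization is worth re-checking a few seconds later.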

{% hint style="warning" %}
Change Streams are not available on standalone MongoDB instances. If your MongoDB is standalone, convert it to a single-node replica set before enabling CDC.
{% endhint %}

{% hint style="info" %}
**MongoDB Atlas**: Change Streams work out of the box on M10+ dedicated clusters (which are always deployed as replica sets). No special setup is needed — just use the standard Atlas connection string. Note that Atlas Flex Clusters do not support Change Streams.
{% endhint %}

### 2. User Permissions

The MongoDB user needs the `read` role on the source database. This role includes both the `find` and `changeStream` privileges required for CDC:

```javascript
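// Run in mongosh after switching to the database that should store the user
// (for example `use admin`). Replace the placeholder password before use.
db.createUser({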
  user: "sling_user",
  pwd: "secret",
  roles: [
    { role: "read", db: "my_database" }
  ]
})
```

To watch multiple databases, grant the `read` role on each, or use `readAnyDatabase` on the `admin` database for deployment-wide access.
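
To confirm that a user covers every database you plan to replicate, you can inspect the `roles` array returned by `db.getUser()`. The sketch below is illustrative only: `databasesMissingRead`, `slingUser`, and `other_db` are hypothetical names, and the check deliberately considers just the two roles named above.

```javascript
// Sketch: list the databases a user cannot read, based on db.getUser() output.
// Only `read` and `readAnyDatabase` are considered; other built-in or custom
// roles that also grant `find` and `changeStream` are ignored here.
function databasesMissingRead(userDoc, databases) {
  const roles = (userDoc && userDoc.roles) || [];
  const readsEverything = roles.some(
    (r) => r.role === "readAnyDatabase" && r.db === "admin"
  );
  if (readsEverything) return [];
  return databases.filter(
    (name) => !roles.some((r) => r.role === "read" && r.db === name)
  );
}

// In mongosh, from the database where the user was created:
//   databasesMissingRead(db.getUser("sling_user"), ["my_database", "other_db"])
// Illustrative input shaped like a db.getUser() result:
const slingUser = {
  user: "sling_user",
  roles: [{ role: "read", db: "my_database" }],
};
console.log(databasesMissingRead(slingUser, ["my_database", "other_db"])); // [ 'other_db' ]
```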

### 3. Post-Images (Recommended)

MongoDB 6.0+ supports Change Stream post-images, which provide the complete document state after every update. This ensures full row data for UPDATE events without an additional database read.

To enable post-images on a collection:

```javascript
db.runCommand({
  collMod: "my_collection",
  changeStreamPreAndPostImages: { enabled: true }
})
```

{% hint style="info" %}
Post-images are optional. On MongoDB versions below 6.0, or on collections without post-images enabled, Sling falls back to `updateLookup` mode, which performs a separate read to fetch the current document on each UPDATE event. For most workloads this is sufficient, but enabling post-images provides stronger consistency guarantees and avoids the extra read.
{% endhint %}
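
To check whether post-images are already enabled on a collection, you can look at the collection options that `db.getCollectionInfos()` reports. This is a minimal sketch rather than anything Sling runs itself; `postImagesEnabled`, `enabledInfo`, and `defaultInfo` are hypothetical names, and the sample objects only imitate the shape of the real output.

```javascript
// Sketch: read the post-image flag from db.getCollectionInfos() output.
// Returns false when the collection is missing or the option is not set.
function postImagesEnabled(collectionInfos) {
  const info = collectionInfos[0];
  return info?.options?.changeStreamPreAndPostImages?.enabled === true;
}

// In mongosh, from the database that holds the collection:
//   postImagesEnabled(db.getCollectionInfos({ name: "my_collection" }))
// Illustrative inputs (hypothetical values):
const enabledInfo = [
  {
    name: "my_collection",
    type: "collection",
    options: { changeStreamPreAndPostImages: { enabled: true } },
  },
];
const defaultInfo = [{ name: "my_collection", type: "collection", options: {} }];

console.log(postImagesEnabled(enabledInfo)); // true
console.log(postImagesEnabled(defaultInfo)); // false
```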

### 4. Version Requirements

| Feature                                      | Minimum Version |
| -------------------------------------------- | --------------- |
| Change Streams (collection-level)            | MongoDB 3.6     |
| Change Streams (database-level)              | MongoDB 4.0     |
| Post-images (`changeStreamPreAndPostImages`) | MongoDB 6.0     |

Sling requires **MongoDB 4.0 or later** for database-level change stream watching.

## Quick Start

```bash
# Source MongoDB (no special CDC properties needed — uses standard connection)
sling conns set MY_MONGO type=mongodb host=mongo.example.com user=sling_user password=secret port=27017

# Target PostgreSQL
sling conns set MY_POSTGRES type=postgres host=pg.example.com user=postgres password=secret database=analytics

# State store (required for CDC)
export SLING_STATE='MY_POSTGRES/sling_state'
```

```yaml
# replication.yaml
source: MY_MONGO
target: MY_POSTGRES

defaults:
  mode: change-capture
  primary_key: [_id]
  object: public.{stream_table}

streams:
  my_database.customers:
  my_database.orders:
```

```bash
# First run: performs the initial snapshot
sling run -r replication.yaml

# Subsequent runs: captures and applies changes
sling run -r replication.yaml
```

{% hint style="info" %}
No special CDC connection properties are needed for MongoDB. Sling uses the standard MongoDB connection to open Change Streams. The `primary_key` defaults to `[_id]` since every MongoDB document has an `_id` field.
{% endhint %}

## Examples

### Large Tables with Custom Chunk Size

For very large collections, adjust the chunk size to control memory usage and checkpointing frequency during the initial snapshot.

```yaml
source: MY_MONGO
target: MY_POSTGRES

defaults:
  mode: change-capture
  primary_key: [_id]
  object: analytics.{stream_table}
  change_capture_options:
    snapshot_chunk_size: 50000  # 50k documents per chunk

streams:
  my_database.transactions:
    # This 10M-document collection will be loaded in ~200 chunks
    # If interrupted, it resumes from the last completed chunk
```
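
The chunk count in the comment above is simply the collection size divided by `snapshot_chunk_size`, rounded up. A small sketch of that arithmetic, with `snapshotChunks` as a hypothetical helper name, makes it easy to try other sizes:

```javascript
// Sketch: number of snapshot chunks for a given collection size and chunk size.
function snapshotChunks(totalDocuments, chunkSize) {
  if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
    throw new RangeError("chunkSize must be a positive integer");
  }
  // The last chunk may be partially filled, hence the rounding up.
  return Math.ceil(totalDocuments / chunkSize);
}

console.log(snapshotChunks(10_000_000, 50_000)); // 200
console.log(snapshotChunks(10_000_001, 50_000)); // 201
```

Smaller chunks mean more frequent checkpoints and less rework after an interruption, at the cost of more round trips.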

### Time-Bounded Snapshots for Very Large Tables

For collections with hundreds of millions of documents, the initial snapshot can take hours. Use `snapshot_run_duration` to cap how long each run spends on the snapshot. The next run automatically resumes from the last completed chunk.

```yaml
source: MY_MONGO
target: MY_POSTGRES

defaults:
  mode: change-capture
  primary_key: [_id]
  object: analytics.{stream_table}
  change_capture_options:
    snapshot_chunk_size: 50000
    snapshot_run_duration: 30m  # spend at most 30 minutes per run on the snapshot

streams:
  my_database.huge_events:
    # 500M documents — will take multiple runs to complete the initial load
    # Each run processes ~30 minutes worth of chunks, then exits cleanly
```
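
To get a rough feel for how many runs a time-bounded snapshot might need, you can combine the chunk size and run duration with a measured load rate. The sketch below rests on assumptions that are not from this page: the throughput figure (`docsPerSecond`) is hypothetical and must come from your own measurements, and the model assumes each run completes only whole chunks. `estimateSnapshotRuns` is an illustrative helper name.

```javascript
// Sketch: estimate how many runs a time-bounded snapshot needs.
// Assumptions (not from Sling): a steady load rate and whole chunks per run.
function estimateSnapshotRuns(totalDocs, chunkSize, runSeconds, docsPerSecond) {
  const totalChunks = Math.ceil(totalDocs / chunkSize);
  // At least one chunk is assumed to complete in every run.
  const chunksPerRun = Math.max(
    1,
    Math.floor((runSeconds * docsPerSecond) / chunkSize)
  );
  return Math.ceil(totalChunks / chunksPerRun);
}

// 500M documents, 50k per chunk, 30-minute runs, assumed 20,000 docs/second:
// 10,000 chunks in total, 720 chunks per run.
console.log(estimateSnapshotRuns(500_000_000, 50_000, 30 * 60, 20_000)); // 14
```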

### High-Throughput Workloads

For collections with heavy write activity, increase `run_max_events` and `run_max_duration` so each run captures more changes.

```yaml
source: MY_MONGO
target: MY_POSTGRES

defaults:
  mode: change-capture
  primary_key: [_id]
  object: warehouse.{stream_table}
  change_capture_options:
    run_max_events: 50000  # Process up to 50k events per run
    run_max_duration: 60s  # Wait up to 60 seconds for events

streams:
  my_database.click_events:
  my_database.page_views:
```
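
One way to pick a value for `run_max_events` is to compare it with the number of events that accumulate between runs. The sketch below is a back-of-the-envelope check under stated assumptions: one change event per write, runs started at a fixed interval, and a cap that applies to the events you are counting (this page does not say whether the cap is per stream or per run). `eventsPerInterval` and `keepsUp` are hypothetical helper names.

```javascript
// Sketch: does a run cap keep pace with the write rate?
// Assumes one change event per write and runs started at a fixed interval.
function eventsPerInterval(writesPerSecond, secondsBetweenRuns) {
  return writesPerSecond * secondsBetweenRuns;
}

function keepsUp(writesPerSecond, secondsBetweenRuns, runMaxEvents) {
  return eventsPerInterval(writesPerSecond, secondsBetweenRuns) <= runMaxEvents;
}

console.log(keepsUp(500, 60, 50000)); // true: 30,000 events fit under the cap
console.log(keepsUp(1000, 60, 50000)); // false: 60,000 events exceed the cap
```

If the check fails, the backlog grows with every run, so either raise the cap or run more often.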

### Soft Deletes

Keep deleted documents in the target instead of physically removing them. Deleted rows are marked with `_sling_synced_op = 'D'`. This is useful for audit trails or when downstream queries need to detect deletions.

```yaml
source: MY_MONGO
target: MY_POSTGRES

defaults:
  mode: change-capture
  primary_key: [_id]
  object: public.{stream_table}
  change_capture_options:
    soft_delete: true

streams:
  my_database.customers:
  my_database.subscriptions:
```

When a document is deleted in MongoDB, the target row is preserved with `_sling_synced_op` set to `'D'` and `_sling_synced_at` updated to the current timestamp. A subsequent re-insert of the same `_id` restores the row with the appropriate operation type.

### Mixed Streams with Per-Stream Overrides

Different collections can have different CDC options.

```yaml
source: MY_MONGO
target: MY_POSTGRES

defaults:
  mode: change-capture
  primary_key: [_id]
  object: public.{stream_table}

streams:
  # High-volume collection: larger batches
  my_database.events:
    change_capture_options:
      run_max_events: 100000
      run_max_duration: 2m

  # Audit collection: keep soft deletes
  my_database.user_accounts:
    change_capture_options:
      soft_delete: true

  # Standard collection: uses defaults
  my_database.products:
```

### Replay / Backfill from a Point in Time

If target data becomes inconsistent, you can replay changes from an earlier position. The `replay_from` value is applied exactly once per unique value.

```yaml
source: MY_MONGO
target: MY_POSTGRES

defaults:
  mode: change-capture
  primary_key: [_id]
  object: public.{stream_table}
  change_capture_options:
    replay_from: "2025-06-01T00:00:00Z"  # Re-process all changes since June 1

streams:
  my_database.orders:
```

After the replay run completes, remove or change the `replay_from` value. Leaving it unchanged has no effect (it is only applied once).

## Replay Formats

The `replay_from` option accepts this MongoDB-specific position format:

* **RFC 3339 timestamp**: `2025-06-01T00:00:00Z` — resolved to a MongoDB `operationTime` (ClusterTime)
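
A MongoDB cluster time is a BSON `Timestamp` whose first component is seconds since the Unix epoch, so an RFC 3339 value maps onto it directly. The sketch below validates a timestamp and converts it to that seconds value. It is an illustration only: how Sling performs the resolution internally is not described here, and `toClusterTimeSeconds` is a hypothetical helper name.

```javascript
// Sketch: validate an RFC 3339 timestamp and convert it to epoch seconds,
// the `t` component of a BSON Timestamp.
function toClusterTimeSeconds(rfc3339) {
  // Require a full date, a time, and an explicit offset (Z or +hh:mm / -hh:mm).
  const pattern =
    /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:\d{2})$/;
  if (typeof rfc3339 !== "string" || !pattern.test(rfc3339)) {
    throw new TypeError(`not an RFC 3339 timestamp: ${rfc3339}`);
  }
  const millis = Date.parse(rfc3339);
  if (Number.isNaN(millis)) {
    throw new TypeError(`unparseable timestamp: ${rfc3339}`);
  }
  return Math.floor(millis / 1000);
}

console.log(toClusterTimeSeconds("2025-06-01T00:00:00Z")); // 1748736000

// In mongosh, the equivalent manual change stream would look like:
//   const t = toClusterTimeSeconds("2025-06-01T00:00:00Z");
//   db.orders.watch([], { startAtOperationTime: Timestamp({ t: t, i: 0 }) });
```

A date without a time or offset, such as `2025-06-01`, is rejected by this check.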

## Oplog Retention

MongoDB's oplog is a capped collection. If the oplog wraps around and removes entries that Sling hasn't processed, the Change Stream will be invalidated.

Check the current oplog size and retention window:

```javascript
db.getReplicationInfo()
```

Ensure the oplog window is longer than the maximum gap between CDC runs. To resize the oplog:

```javascript
// Resize oplog to 10 GB (size is given in megabytes; MongoDB 4.0+)
db.adminCommand({ replSetResizeOplog: 1, size: 10240 })
```

{% hint style="warning" %}
If the oplog wraps past Sling's saved position, the next run will detect the invalidated Change Stream and automatically perform a fresh initial snapshot.
{% endhint %}
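
To compare the oplog window with your CDC schedule, you can read `timeDiffHours` from the `db.getReplicationInfo()` result. The sketch below is a minimal check, not a Sling feature: `oplogCoversGap` is a hypothetical helper, the 2x safety factor is an assumption to adjust for your workload, and the sample object only imitates part of the real output.

```javascript
// Sketch: is the oplog window comfortably longer than the gap between CDC runs?
// `replInfo` is the object returned by db.getReplicationInfo() in mongosh.
// `safetyFactor` is an assumed margin for write bursts, not a Sling setting.
function oplogCoversGap(replInfo, maxGapHours, safetyFactor = 2) {
  return replInfo.timeDiffHours >= maxGapHours * safetyFactor;
}

// In mongosh: oplogCoversGap(db.getReplicationInfo(), 24)
// Illustrative input (hypothetical values):
const replInfo = { logSizeMB: 10240, usedMB: 5120, timeDiffHours: 48 };

console.log(oplogCoversGap(replInfo, 24)); // true: 48h window vs 24h gap * 2
console.log(oplogCoversGap(replInfo, 36)); // false: 48h window vs 36h gap * 2
```

The window shrinks when write volume rises, so it is worth re-checking after load changes.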

## Document Flattening

MongoDB documents are nested JSON. Sling flattens nested documents into columns by joining the field path with double underscores. For example, the nested field that MongoDB addresses as `address.city` becomes the column `address__city` in the target table. Arrays and deeply nested objects beyond the flattening depth are serialized as JSON strings.
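
The flattening rule can be pictured with a short sketch. This is not Sling's implementation: `flattenDocument` is a hypothetical function, the default depth of 1 is an assumption chosen for the example, and type handling is reduced to plain objects, arrays, and scalars.

```javascript
// Sketch: flatten a nested document into column names joined by "__".
// Objects deeper than `maxDepth` and all arrays become JSON strings.
function flattenDocument(doc, maxDepth = 1, sep = "__") {
  const out = {};
  const walk = (value, prefix, depth) => {
    for (const [key, v] of Object.entries(value)) {
      const column = prefix ? `${prefix}${sep}${key}` : key;
      const isObject = v !== null && typeof v === "object" && !(v instanceof Date);
      if (isObject && !Array.isArray(v) && depth < maxDepth) {
        walk(v, column, depth + 1); // descend into the nested document
      } else if (isObject) {
        out[column] = JSON.stringify(v); // array or too-deep object
      } else {
        out[column] = v; // scalar, null, or Date kept as-is
      }
    }
  };
  walk(doc, "", 0);
  return out;
}

const customer = {
  _id: 1,
  address: { city: "Lima", geo: { lat: -12.05, lon: -77.04 } },
  tags: ["a", "b"],
};
console.log(flattenDocument(customer));
// {
//   _id: 1,
//   address__city: 'Lima',
//   address__geo: '{"lat":-12.05,"lon":-77.04}',
//   tags: '["a","b"]'
// }
```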

## Troubleshooting

### "CDC not supported for \<type>"

Ensure your source connection is configured as `type=mongodb`.

### "change stream not supported"

MongoDB Change Streams require a replica set. Standalone instances are not supported. Convert to a single-node replica set:

```javascript
// In mongod.conf, add:
// replication:
//   replSetName: "rs0"

// Then restart mongod and initialize:
rs.initiate()
```

### "not authorized to run changeStream"

The MongoDB user needs the `read` role on the source database, which includes the `changeStream` privilege:

```javascript
db.grantRolesToUser("sling_user", [
  { role: "read", db: "my_database" }
])
```

### Initial snapshot keeps restarting

Ensure `SLING_STATE` is configured. Without state persistence, Sling cannot track that the snapshot completed and will restart it on every run.

### "could not resolve replay\_from position"

The `replay_from` value must be a valid RFC 3339 timestamp.

### Missing fields in UPDATE events

Enable post-images (MongoDB 6.0+) for complete document state on updates:

```javascript
db.runCommand({
  collMod: "my_collection",
  changeStreamPreAndPostImages: { enabled: true }
})
```

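Without post-images, Sling uses `updateLookup`, which reads the document's current state at lookup time rather than its state immediately after the update. This works for most cases, but when a document is updated several times in quick succession, intermediate values can be missed.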

### "change stream invalidated"

The oplog wrapped past the saved resume position. Sling will automatically re-snapshot on the next run. To prevent this, increase oplog size or run CDC more frequently:

```javascript
// Resize oplog to 10 GB (size is given in megabytes)
db.adminCommand({ replSetResizeOplog: 1, size: 10240 })
```


