MongoDB Adapter

The MongoDB adapter provides flexible NoSQL connectivity with automatic query translation and projection support.

Installation

npm install @openetl/mongodb

Features

  • Query Translation - Automatic conversion of OpenETL filters to MongoDB operators
  • Projection Support - Select specific fields to retrieve
  • Sort and Pagination - Full control over result ordering and batching
  • Flexible Authentication - Works with or without credentials
  • Custom Queries - Execute raw MongoDB queries via JSON
  • Read and Write Support - Download (find) and Upload (insertMany) operations

Authentication

The MongoDB adapter uses basic authentication; the username and password are optional:

const vault = {
  'my-mongo': {
    id: 'my-mongo',
    type: 'basic',
    credentials: {
      host: 'localhost',       // Default: 'localhost'
      port: '27017',           // Default: 27017
      database: 'myapp',       // Required
      username: 'user',        // Optional
      password: 'secret',      // Optional
    },
  },
};

Connection strings:

  • With auth: mongodb://user:secret@localhost:27017/myapp
  • Without auth: mongodb://localhost:27017/myapp
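The adapter builds the connection string from the vault credentials. A minimal sketch of that logic (the `buildUri` helper name is an assumption for illustration, not the adapter's actual API):

```typescript
interface MongoCredentials {
  host?: string;      // Default: 'localhost'
  port?: string;      // Default: '27017'
  database: string;   // Required
  username?: string;  // Optional
  password?: string;  // Optional
}

// Assemble a MongoDB URI; auth is included only when both username
// and password are non-empty (empty strings count as "no auth").
function buildUri(c: MongoCredentials): string {
  const host = c.host ?? 'localhost';
  const port = c.port ?? '27017';
  const auth = c.username && c.password
    ? `${encodeURIComponent(c.username)}:${encodeURIComponent(c.password)}@`
    : '';
  return `mongodb://${auth}${host}:${port}/${c.database}`;
}

// buildUri({ database: 'myapp' })
// → 'mongodb://localhost:27017/myapp'
```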

Quick Start

import { Orchestrator } from 'openetl';
import { mongodb } from '@openetl/mongodb';

const vault = {
  'my-mongo': {
    id: 'my-mongo',
    type: 'basic',
    credentials: {
      host: 'localhost',
      port: '27017',
      database: 'myapp',
      username: '',
      password: '',
    },
  },
};

const etl = Orchestrator(vault, { mongodb });

const pipeline = {
  id: 'export-users',
  source: {
    id: 'users-source',
    adapter_id: 'mongodb',
    endpoint_id: 'collection_query',
    credential_id: 'my-mongo',
    config: {
      table: 'users',  // Collection name
    },
    fields: ['_id', 'email', 'name', 'createdAt'],
    filters: [
      { field: 'active', operator: '=', value: 'true' },
    ],
    sort: [{ field: 'createdAt', type: 'desc' }],
    pagination: { type: 'offset', itemsPerPage: 100 },
  },
};

const result = await etl.runPipeline(pipeline);
console.log(`Exported ${result.data.length} users`);

Endpoints

collection_query

Query documents from a collection with filters, sorting, and pagination.

Config Property | Required | Description
--------------- | -------- | ---------------
table           | Yes      | Collection name

const connector = {
  adapter_id: 'mongodb',
  endpoint_id: 'collection_query',
  credential_id: 'my-mongo',
  config: {
    table: 'users',
  },
  fields: ['_id', 'name', 'email'],  // Empty = all fields
  filters: [
    { field: 'status', operator: '=', value: 'active' },
  ],
  sort: [
    { field: 'createdAt', type: 'desc' },
  ],
  pagination: { type: 'offset', itemsPerPage: 100 },
};

Translated MongoDB Query:

db.users.find(
  { status: 'active' },
  { _id: 1, name: 1, email: 1 }
).sort({ createdAt: -1 }).skip(0).limit(100)
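With offset pagination, each page maps onto the cursor's `skip`/`limit` calls roughly as follows (a sketch; the zero-based page counter and helper name are assumptions):

```typescript
// Translate an offset-pagination state into MongoDB cursor arguments.
// Page 0 → skip(0), page 1 → skip(itemsPerPage), and so on.
function pageToCursorArgs(page: number, itemsPerPage: number): { skip: number; limit: number } {
  return { skip: page * itemsPerPage, limit: itemsPerPage };
}

// pageToCursorArgs(3, 100) → { skip: 300, limit: 100 }
```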

custom_query

Execute a custom MongoDB query using JSON.

const connector = {
  adapter_id: 'mongodb',
  endpoint_id: 'custom_query',
  credential_id: 'my-mongo',
  config: {
    table: 'users',
    custom_query: JSON.stringify({
      $and: [
        { age: { $gte: 18 } },
        { status: { $in: ['active', 'pending'] } },
        { 'address.country': 'US' }
      ]
    }),
  },
  fields: [],
};

collection_insert

Insert documents into a collection.

const connector = {
  adapter_id: 'mongodb',
  endpoint_id: 'collection_insert',
  credential_id: 'my-mongo',
  config: {
    table: 'contacts',
  },
  fields: ['email', 'name', 'company'],
};
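In practice, collection_insert sits on the target side of a pipeline. A sketch of that shape (the source connector here is a placeholder; any OpenETL source that yields matching rows works):

```typescript
// Sketch: loading rows into MongoDB via collection_insert.
const pipeline = {
  id: 'import-contacts',
  source: { /* any OpenETL source connector yielding email/name/company rows */ },
  target: {
    id: 'contacts-target',
    adapter_id: 'mongodb',
    endpoint_id: 'collection_insert',
    credential_id: 'my-mongo',
    config: { table: 'contacts' },   // Target collection
    fields: ['email', 'name', 'company'],
  },
};
```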

Query Translation

OpenETL filters are automatically translated to MongoDB query operators.

Operator Mapping

OpenETL     | MongoDB    | Example Output
----------- | ---------- | --------------
=           | (equality) | { field: value }
!=, <>      | $ne        | { field: { $ne: value } }
>           | $gt        | { field: { $gt: value } }
>=          | $gte       | { field: { $gte: value } }
<           | $lt        | { field: { $lt: value } }
<=          | $lte       | { field: { $lte: value } }
IN          | $in        | { field: { $in: [a, b] } }
NOT IN      | $nin       | { field: { $nin: [a, b] } }
LIKE        | $regex     | { field: { $regex: '.*pattern.*', $options: 'i' } }
IS NULL     | $eq: null  | { field: { $eq: null } }
IS NOT NULL | $ne: null  | { field: { $ne: null } }
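The mapping above can be sketched as a single translation function. This is a hypothetical reimplementation for illustration, not the adapter's actual source:

```typescript
type Filter = { field: string; operator: string; value: unknown };

// Translate one OpenETL filter into a MongoDB query fragment.
function translateFilter(f: Filter): Record<string, unknown> {
  switch (f.operator) {
    case '=':           return { [f.field]: f.value };
    case '!=':
    case '<>':          return { [f.field]: { $ne: f.value } };
    case '>':           return { [f.field]: { $gt: f.value } };
    case '>=':          return { [f.field]: { $gte: f.value } };
    case '<':           return { [f.field]: { $lt: f.value } };
    case '<=':          return { [f.field]: { $lte: f.value } };
    case 'IN':          return { [f.field]: { $in: f.value } };
    case 'NOT IN':      return { [f.field]: { $nin: f.value } };
    case 'LIKE':
      // SQL-style % wildcards become .* in a case-insensitive regex
      return { [f.field]: { $regex: String(f.value).replace(/%/g, '.*'), $options: 'i' } };
    case 'IS NULL':     return { [f.field]: { $eq: null } };
    case 'IS NOT NULL': return { [f.field]: { $ne: null } };
    default:
      throw new Error(`Unsupported operator: ${f.operator}`);
  }
}
```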

Filter Examples

Simple filters:

filters: [
  { field: 'status', operator: '=', value: 'active' },
  { field: 'age', operator: '>=', value: '18' },
  { field: 'email', operator: 'LIKE', value: '%@example.com' },
]

Translated to:

{
  status: 'active',
  age: { $gte: '18' },
  email: { $regex: '.*@example.com', $options: 'i' }
}

Filter Groups (AND/OR)

filters: [
  {
    op: 'OR',
    filters: [
      { field: 'role', operator: '=', value: 'admin' },
      { field: 'role', operator: '=', value: 'moderator' },
    ],
  },
]

Translated to:

{
  $or: [
    { role: 'admin' },
    { role: 'moderator' }
  ]
}

Sorting

sort: [
  { field: 'createdAt', type: 'desc' },
  { field: 'name', type: 'asc' },
]

Translated to:

{ createdAt: -1, name: 1 }
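The sort translation fits in a one-liner (the `translateSort` helper name is an assumption):

```typescript
type SortSpec = { field: string; type: 'asc' | 'desc' };

// desc → -1, asc → 1; field order is preserved, so the first
// entry is the primary sort key.
function translateSort(sort: SortSpec[]): Record<string, 1 | -1> {
  return Object.fromEntries(sort.map(s => [s.field, s.type === 'desc' ? -1 : 1]));
}
```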

Complete Example

import { Orchestrator, Pipeline } from 'openetl';
import { mongodb } from '@openetl/mongodb';
import { postgresql } from '@openetl/postgresql';

const vault = {
  'mongo-source': {
    id: 'mongo-source',
    type: 'basic',
    credentials: {
      host: 'mongodb.example.com',
      port: '27017',
      database: 'production',
      username: 'readonly',
      password: process.env.MONGO_PASSWORD,
    },
  },
  'pg-target': {
    id: 'pg-target',
    type: 'basic',
    credentials: {
      host: 'postgres.example.com',
      database: 'analytics',
      username: 'writer',
      password: process.env.PG_PASSWORD,
    },
  },
};

const etl = Orchestrator(vault, { mongodb, postgresql });

const pipeline: Pipeline = {
  id: 'sync-orders',
  source: {
    id: 'orders-source',
    adapter_id: 'mongodb',
    endpoint_id: 'collection_query',
    credential_id: 'mongo-source',
    config: {
      table: 'orders',
    },
    fields: ['orderId', 'customerId', 'total', 'status', 'createdAt'],
    filters: [
      { field: 'status', operator: '=', value: 'completed' },
    ],
    sort: [{ field: 'createdAt', type: 'asc' }],
    pagination: { type: 'offset', itemsPerPage: 1000 },
  },
  target: {
    id: 'orders-target',
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'pg-target',
    config: { schema: 'analytics', table: 'orders_archive' },
    fields: ['order_id', 'customer_id', 'total', 'status', 'created_at'],
  },
};

const result = await etl.runPipeline(pipeline);
console.log(`Synced ${result.data.length} orders`);

Related