MongoDB Adapter
The MongoDB adapter provides flexible NoSQL connectivity with automatic query translation and projection support.
Installation
npm install @openetl/mongodb
Features
- Query Translation - Automatic conversion of OpenETL filters to MongoDB operators
- Projection Support - Select specific fields to retrieve
- Sort and Pagination - Full control over result ordering and batching
- Flexible Authentication - Works with or without credentials
- Custom Queries - Execute raw MongoDB queries via JSON
- Full CRUD Support - Download (find) and Upload (insertMany) operations
Authentication
The MongoDB adapter uses basic authentication (credentials are optional):
const vault = {
'my-mongo': {
id: 'my-mongo',
type: 'basic',
credentials: {
host: 'localhost', // Default: 'localhost'
port: '27017', // Default: 27017
database: 'myapp', // Required
username: 'user', // Optional
password: 'secret', // Optional
},
},
};
Connection strings:
- With auth:
mongodb://user:secret@localhost:27017/myapp - Without auth:
mongodb://localhost:27017/myapp
Quick Start
import { Orchestrator } from 'openetl';
import { mongodb } from '@openetl/mongodb';
const vault = {
'my-mongo': {
id: 'my-mongo',
type: 'basic',
credentials: {
host: 'localhost',
port: '27017',
database: 'myapp',
username: '',
password: '',
},
},
};
const etl = Orchestrator(vault, { mongodb });
const pipeline = {
id: 'export-users',
source: {
id: 'users-source',
adapter_id: 'mongodb',
endpoint_id: 'collection_query',
credential_id: 'my-mongo',
config: {
table: 'users', // Collection name
},
fields: ['_id', 'email', 'name', 'createdAt'],
filters: [
{ field: 'active', operator: '=', value: 'true' },
],
sort: [{ field: 'createdAt', type: 'desc' }],
pagination: { type: 'offset', itemsPerPage: 100 },
},
};
const result = await etl.runPipeline(pipeline);
console.log(`Exported ${result.data.length} users`);
Endpoints
collection_query
Query documents from a collection with filters, sorting, and pagination.
| Config Property | Required | Description |
|---|---|---|
table |
Yes | Collection name |
const connector = {
adapter_id: 'mongodb',
endpoint_id: 'collection_query',
credential_id: 'my-mongo',
config: {
table: 'users',
},
fields: ['_id', 'name', 'email'], // Empty = all fields
filters: [
{ field: 'status', operator: '=', value: 'active' },
],
sort: [
{ field: 'createdAt', type: 'desc' },
],
pagination: { type: 'offset', itemsPerPage: 100 },
};
Translated MongoDB Query:
db.users.find(
{ status: 'active' },
{ _id: 1, name: 1, email: 1 }
).sort({ createdAt: -1 }).skip(0).limit(100)
custom_query
Execute a custom MongoDB query using JSON.
const connector = {
adapter_id: 'mongodb',
endpoint_id: 'custom_query',
credential_id: 'my-mongo',
config: {
table: 'users',
custom_query: JSON.stringify({
$and: [
{ age: { $gte: 18 } },
{ status: { $in: ['active', 'pending'] } },
{ 'address.country': 'US' }
]
}),
},
fields: [],
};
collection_insert
Insert documents into a collection.
const connector = {
adapter_id: 'mongodb',
endpoint_id: 'collection_insert',
credential_id: 'my-mongo',
config: {
table: 'contacts',
},
fields: ['email', 'name', 'company'],
};
Query Translation
OpenETL filters are automatically translated to MongoDB query operators.
Operator Mapping
| OpenETL | MongoDB | Example Output |
|---|---|---|
= |
Equality | { field: value } |
!=, <> |
$ne |
{ field: { $ne: value } } |
> |
$gt |
{ field: { $gt: value } } |
>= |
$gte |
{ field: { $gte: value } } |
< |
$lt |
{ field: { $lt: value } } |
<= |
$lte |
{ field: { $lte: value } } |
IN |
$in |
{ field: { $in: [a, b] } } |
NOT IN |
$nin |
{ field: { $nin: [a, b] } } |
LIKE |
$regex |
{ field: { $regex: '.*pattern.*', $options: 'i' } } |
IS NULL |
$eq: null |
{ field: { $eq: null } } |
IS NOT NULL |
$ne: null |
{ field: { $ne: null } } |
Filter Examples
Simple filters:
filters: [
{ field: 'status', operator: '=', value: 'active' },
{ field: 'age', operator: '>=', value: '18' },
{ field: 'email', operator: 'LIKE', value: '%@example.com' },
]
Translated to:
{
status: 'active',
age: { $gte: '18' },
email: { $regex: '.*@example.com', $options: 'i' }
}
Filter Groups (AND/OR)
filters: [
{
op: 'OR',
filters: [
{ field: 'role', operator: '=', value: 'admin' },
{ field: 'role', operator: '=', value: 'moderator' },
],
},
]
Translated to:
{
$or: [
{ role: 'admin' },
{ role: 'moderator' }
]
}
Sorting
sort: [
{ field: 'createdAt', type: 'desc' },
{ field: 'name', type: 'asc' },
]
Translated to:
{ createdAt: -1, name: 1 }
Complete Example
import { Orchestrator, Pipeline } from 'openetl';
import { mongodb } from '@openetl/mongodb';
import { postgresql } from '@openetl/postgresql';
const vault = {
'mongo-source': {
id: 'mongo-source',
type: 'basic',
credentials: {
host: 'mongodb.example.com',
port: '27017',
database: 'production',
username: 'readonly',
password: process.env.MONGO_PASSWORD,
},
},
'pg-target': {
id: 'pg-target',
type: 'basic',
credentials: {
host: 'postgres.example.com',
database: 'analytics',
username: 'writer',
password: process.env.PG_PASSWORD,
},
},
};
const etl = Orchestrator(vault, { mongodb, postgresql });
const pipeline: Pipeline = {
id: 'sync-orders',
source: {
id: 'orders-source',
adapter_id: 'mongodb',
endpoint_id: 'collection_query',
credential_id: 'mongo-source',
config: {
table: 'orders',
},
fields: ['orderId', 'customerId', 'total', 'status', 'createdAt'],
filters: [
{ field: 'status', operator: '=', value: 'completed' },
],
sort: [{ field: 'createdAt', type: 'asc' }],
pagination: { type: 'offset', itemsPerPage: 1000 },
},
target: {
id: 'orders-target',
adapter_id: 'postgresql',
endpoint_id: 'table_insert',
credential_id: 'pg-target',
config: { schema: 'analytics', table: 'orders_archive' },
fields: ['order_id', 'customer_id', 'total', 'status', 'created_at'],
},
};
const result = await etl.runPipeline(pipeline);
console.log(`Synced ${result.data.length} orders`);