Architecture Overview
This section unpacks OpenETL's structure: its core pieces and how they fit together. It's your high-level map to the framework.
Core Components
Vault
The Vault is your credential locker, storing auth details (e.g., OAuth tokens, API keys) as a key-value map. It keeps secrets safe and accessible.
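A minimal sketch of what a vault can look like, keyed by credential ID. The IDs here are reused in the examples below; the inner credential fields are illustrative placeholders, since the exact shape depends on the auth type and adapter.

// Illustrative vault: credential IDs mapped to auth details
const vault = {
  'hs-auth': {
    type: 'oauth2',
    credentials: { access_token: '...' } // placeholder field; actual OAuth fields depend on the adapter
  },
  'pg-auth': {
    type: 'basic',
    credentials: { username: 'etl_user', password: '...' } // placeholder fields
  },
};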
Adapter
Adapters connect to data sources such as HubSpot APIs or PostgreSQL databases. They manage the specifics for each provider and do the heavy lifting: connecting, downloading, and uploading.
Connector
A Connector tells an adapter what to do—endpoint, fields, filters. It's the configuration layer linking adapters to your pipeline.
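Concretely, a connector is the source or target block you hand to a pipeline. A sketch using the same fields the examples in this section rely on:

// A connector: which adapter, which endpoint, which credentials, and what to fetch
const source = {
  adapter_id: 'hubspot',          // adapter that does the work
  endpoint_id: 'contacts',        // endpoint exposed by that adapter
  credential_id: 'hs-auth',       // key into the Vault
  fields: ['firstname', 'email'], // fields to pull
};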
Pipeline
The Pipeline outlines your ETL job: source, target, and transformations, plus optional schedule (e.g., daily), logging (progress tracking), and events (e.g., start, finish).
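A sketch of the overall pipeline shape, reusing the connector fields above. The onload and oncomplete hooks are the event callbacks used in the examples further down; schedule and logging options are omitted here since their exact configuration isn't shown in this section.

// Minimal pipeline: a source connector, a target connector, and optional event hooks
const pipeline = {
  id: 'hs-to-pg',
  source: {
    adapter_id: 'hubspot',
    endpoint_id: 'contacts',
    credential_id: 'hs-auth',
    fields: ['firstname']
  },
  target: {
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'pg-auth',
    config: { schema: 'public', table: 'contacts' }
  },
  onload: data => console.log(`downloaded ${data.length} records`), // fires when data arrives
  oncomplete: result => console.log('finished:', result.success),   // fires when the run ends
};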
Orchestrator
The Orchestrator runs the show. It reads your pipeline, grabs credentials from the Vault, sets up Adapters via Connectors, and executes the ETL flow.
Stateless Design
OpenETL is stateless by design: it doesn't keep track of previous executions, sync states, or processed records internally. This choice offers several benefits:
- Simplicity: No hidden state means predictable behavior and easier debugging
- Portability: Run pipelines anywhere without needing a state database
- Control: You determine exactly how and where state is managed
- Scalability: Distribute workloads across multiple instances without shared state
For incremental data syncs (e.g., fetching only new sales records since last run), you'll need to:
- Track your own sync state: Store timestamps, IDs, or page markers of the last processed records
- Pass sync parameters: Include these markers in your pipeline configuration
// Example: Incremental sync with custom state tracking
const lastSyncTime = await db.getSyncState('sales-sync'); // Your state storage

orchestrator.runPipeline({
  id: 'incremental-sales',
  source: {
    adapter_id: 'hubspot',
    endpoint_id: 'deals',
    credential_id: 'hs-auth',
    fields: ['amount', 'closedate'],
    filters: [
      { time_inserted: lastSyncTime } // only records since the last run; exact filter shape depends on the adapter
    ]
  },
  target: {
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'pg-auth',
    config: {
      schema: 'public',
      table: 'sales'
    }
  },
  // Update your sync state after successful completion
  oncomplete: async (result) => {
    if (result.success) {
      const newSyncTime = new Date().toISOString();
      await db.updateSyncState('sales-sync', newSyncTime);
    }
  }
});
The stateless approach gives you flexibility in how you implement incremental syncs. You can store your sync state in your own database, file system, or configuration service based on your application's needs.
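For instance, the db.getSyncState / db.updateSyncState calls in the incremental example above stand in for whatever storage you pick. A minimal file-based sketch of that interface (purely illustrative, not part of OpenETL):

// Illustrative file-based sync-state store matching the interface used above
import { promises as fs } from 'fs';

const STATE_FILE = './sync-state.json';

const db = {
  async getSyncState(key) {
    try {
      const state = JSON.parse(await fs.readFile(STATE_FILE, 'utf8'));
      return state[key] ?? null; // null on the first run triggers a full sync
    } catch {
      return null;               // no state file yet
    }
  },
  async updateSyncState(key, value) {
    let state = {};
    try {
      state = JSON.parse(await fs.readFile(STATE_FILE, 'utf8'));
    } catch { /* start fresh */ }
    state[key] = value;
    await fs.writeFile(STATE_FILE, JSON.stringify(state, null, 2));
  },
};

Swapping this out for a database table or a configuration service only requires keeping the same two methods.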
How It Works
The Orchestrator adapts to your pipeline's needs—here's how:
Download and Upload
Extracts data from a source (e.g., HubSpot) via an adapter, transforms it if needed, and loads it to a target (e.g., PostgreSQL). It's the full ETL cycle.
Moves data end-to-end—extracting contacts and loading them into a table.
import Orchestrator from 'openetl';
import { hubspot } from '@openetl/hubspot';
import { postgresql } from '@openetl/postgresql';

// Vault: credential IDs mapped to auth details
const vault = {
  'hs-auth': {
    type: 'oauth2',
    credentials: { /* ... */ }
  },
  'pg-auth': {
    type: 'basic',
    credentials: { /* ... */ }
  },
};

// Register the adapters the orchestrator can use
const orchestrator = Orchestrator(vault, {
  hubspot,
  postgresql
});

// Full ETL: extract contacts from HubSpot, load them into PostgreSQL
orchestrator.runPipeline({
  id: 'hs-to-pg',
  source: {
    adapter_id: 'hubspot',
    endpoint_id: 'contacts',
    credential_id: 'hs-auth',
    fields: ['firstname']
  },
  target: {
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'pg-auth',
    config: {
      schema: 'public',
      table: 'contacts'
    }
  },
});
Download Only
Pulls data from a source adapter and stops—ideal for exporting or checking data without loading it elsewhere.
Fetches data and hands it off—no target needed, just raw output.
orchestrator.runPipeline({
  id: 'hs-export',
  source: {
    adapter_id: 'hubspot',
    endpoint_id: 'contacts',
    credential_id: 'hs-auth',
    fields: ['email']
  },
  onload: data => console.log(data), // Handle downloaded data
});
Upload Only
Takes pre-provided data (no source adapter) and pushes it to a target—perfect for loading existing datasets.
Pushes your data straight to a target—skip the extraction step.
orchestrator.runPipeline({
  id: 'static-to-pg',
  data: [{
    firstname: 'Jane'
  }],
  target: {
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'pg-auth',
    config: {
      schema: 'public',
      table: 'contacts'
    }
  },
});
Data Provided to Upload
Pairs pre-provided data with transformations, tweaking it before loading to a target—customize first, then save.
Takes your data, tweaks it (e.g., uppercase), then loads it.
orchestrator.runPipeline({
  id: 'transform-to-pg',
  data: [{
    name: 'john doe'
  }],
  target: {
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'pg-auth',
    config: {
      schema: 'public',
      table: 'contacts'
    },
    transform: [{
      type: 'uppercase',
      options: {
        field: 'name',
        to: 'firstname'
      }
    }],
  },
});
These flows show OpenETL's versatility—adapt it to your needs!
Dig deeper in Core Concepts!