Architecture Overview
This document describes OpenETL's architecture, including core components, execution workflows, and design principles.
Core Components
Vault
The Vault stores authentication credentials (OAuth tokens, API keys, database credentials) as a key-value map. Credentials are referenced by ID in pipeline configurations.
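For example, a vault can be a plain object keyed by credential ID. The credential field names below are illustrative; the exact shape depends on the credential type and adapter:
// Example vault: credential IDs mapped to credential objects.
// The inner field names (client_id, refresh_token, host, ...) are
// illustrative; the exact shape depends on the credential type.
const vault = {
  'hs-auth': {
    type: 'oauth2',
    credentials: {
      client_id: '...',
      client_secret: '...',
      refresh_token: '...'
    }
  },
  'pg-auth': {
    type: 'basic',
    credentials: {
      username: 'etl_user',
      password: '...',
      host: 'localhost',
      database: 'analytics'
    }
  }
};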
Adapter
Adapters implement interfaces to specific data sources (APIs, databases). Each adapter provides the following methods (a sketch follows the list):
- connect(): Establishing connections and authenticating
- download(): Extracting data with pagination support
- upload(): Loading data to the target system
- disconnect(): Cleaning up resources (optional)
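A custom adapter is essentially an object implementing these methods. The sketch below is illustrative only; the factory signature and return shapes are assumptions, and the Adapters reference defines the real contract:
// Illustrative adapter skeleton. The factory signature and return shapes
// are assumptions for illustration, not the exact OpenETL interface.
function exampleRestAdapter(connector, auth) {
  return {
    async connect() {
      // Open connections and authenticate using `auth` resolved from the Vault.
    },
    async download({ limit, offset }) {
      // Fetch one page of records from the endpoint named by `connector`.
      return { data: [] };
    },
    async upload(data) {
      // Write a batch of records to the target system.
    },
    async disconnect() {
      // Optional: release connections or other held resources.
    }
  };
}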
Connector
Connectors are configuration objects that specify the following (an example follows the list):
- Adapter ID and endpoint
- Credential reference
- Field selection
- Filters and transformations
- Pagination settings
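For example, a source connector might look like this. The filter and pagination shapes below are assumptions for illustration; see the connector reference for the exact schema:
// A source connector for the HubSpot adapter used later in this document.
// The filter and pagination shapes are assumptions for illustration.
const source = {
  adapter_id: 'hubspot',
  endpoint_id: 'contacts',
  credential_id: 'hs-auth',
  fields: ['firstname', 'email'],
  filters: [
    { field: 'lifecyclestage', operator: '=', value: 'customer' }
  ],
  pagination: { itemsPerPage: 100 }
};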
Pipeline
Pipelines define ETL workflows, including the following (a sketch follows the list):
- Source connector (data extraction)
- Target connector (data loading)
- Transformation chain
- Error handling configuration
- Rate limiting settings
- Event callbacks (logging, completion hooks)
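The execution-mode examples later in this document show source, target, data, and transform in use; the sketch below illustrates where error handling, rate limiting, and event callbacks would sit. The error_handling, rate_limiting, and logging option names here are assumptions, so consult the Pipelines reference for the exact schema:
// Illustrative pipeline skeleton. The error_handling, rate_limiting, and
// logging option names are assumptions, not the verified schema; the
// oncomplete callback mirrors the one used later in this document.
const pipeline = {
  id: 'hs-to-pg',
  source: { /* source connector */ },
  target: { /* target connector */ },
  error_handling: { max_retries: 3, retry_interval: 300 },
  rate_limiting: { requests_per_second: 1 },
  logging: event => console.log(event),
  oncomplete: result => console.log('pipeline finished:', result.success)
};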
Orchestrator
The Orchestrator executes pipelines by:
- Resolving credentials from the Vault
- Initializing adapters with connector configurations
- Executing the data flow (extract, transform, load)
- Managing error handling and retries
- Emitting events for monitoring
Stateless Design
OpenETL does not maintain internal state between pipeline executions. The framework does not track:
- Previous execution timestamps
- Processed record IDs
- Sync checkpoints
- Execution history
This design provides:
- Predictable execution: No hidden state affecting pipeline behavior
- Deployment flexibility: No state database required
- User-controlled state: Application manages state according to requirements
- Horizontal scalability: Multiple instances can run independently
For incremental synchronization, applications must:
- Implement external state storage for sync markers
- Pass state parameters to pipeline configurations
Incremental Sync Implementation
// Incremental synchronization with external state management
const lastSyncTime = await db.getSyncState('sales-sync'); // Your state storage
orchestrator.runPipeline({
id: 'incremental-sales',
source: {
adapter_id: 'hubspot',
endpoint_id: 'deals',
credential_id: 'hs-auth',
fields: ['amount', 'closedate'],
    filters: [
      // Only records inserted since the last sync. The filter object shape
      // is shown for illustration; see the connector filter reference.
      { field: 'time_inserted', operator: '>=', value: lastSyncTime }
    ]
},
target: {
adapter_id: 'postgresql',
endpoint_id: 'table_insert',
credential_id: 'pg-auth',
config: {
schema: 'public',
table: 'sales'
}
},
oncomplete: async (result) => {
if (result.success) {
await db.updateSyncState('sales-sync', new Date().toISOString());
}
}
});
State storage options include databases, file systems, or external configuration services, depending on application requirements.
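As a concrete illustration, the db.getSyncState / db.updateSyncState helpers used above are hypothetical; a minimal file-backed version might look like this:
// Hypothetical file-backed implementation of the `db` helper used above.
// Any durable store (database table, object storage, config service) works
// the same way: read the marker before the run, write it after success.
import { promises as fs } from 'fs';

const STATE_FILE = './sync-state.json';

const db = {
  async getSyncState(key) {
    try {
      const state = JSON.parse(await fs.readFile(STATE_FILE, 'utf8'));
      return state[key] ?? null; // null on first run triggers a full sync
    } catch {
      return null;
    }
  },
  async updateSyncState(key, value) {
    let state = {};
    try {
      state = JSON.parse(await fs.readFile(STATE_FILE, 'utf8'));
    } catch {
      // no existing state file; start fresh
    }
    state[key] = value;
    await fs.writeFile(STATE_FILE, JSON.stringify(state, null, 2));
  }
};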
Pipeline Execution Modes
The Orchestrator supports multiple execution modes based on pipeline configuration:
Full ETL (Extract, Transform, Load)
Executes the complete data flow: extraction from source, optional transformation, and loading to target.
import Orchestrator from 'openetl';
import { hubspot } from '@openetl/hubspot';
import { postgresql } from '@openetl/postgresql';
const vault = {
'hs-auth': {
type: 'oauth2',
credentials: { /* ... */ }
},
'pg-auth': {
type: 'basic',
credentials: { /* ... */ }
},
};
const orchestrator = Orchestrator(vault, {
hubspot,
postgresql
});
orchestrator.runPipeline({
id: 'hs-to-pg',
source: {
adapter_id: 'hubspot',
endpoint_id: 'contacts',
credential_id: 'hs-auth',
fields: ['firstname']
},
target: {
adapter_id: 'postgresql',
endpoint_id: 'table_insert',
credential_id: 'pg-auth',
config: {
schema: 'public',
table: 'contacts'
}
},
});
Extract Only
Executes only the extraction phase. Data is returned via the onload callback without loading to a target.
orchestrator.runPipeline({
id: 'hs-export',
source: {
adapter_id: 'hubspot',
endpoint_id: 'contacts',
credential_id: 'hs-auth',
fields: ['email']
},
onload: data => console.log(data),
});
Load Only
Loads pre-existing data to a target without extraction. Data is provided directly in the pipeline configuration.
orchestrator.runPipeline({
id: 'static-to-pg',
data: [{
firstname: 'Jane'
}],
target: {
adapter_id: 'postgresql',
endpoint_id: 'table_insert',
credential_id: 'pg-auth',
config: {
schema: 'public',
table: 'contacts'
}
},
});
Transform and Load
Applies transformations to provided data before loading to a target.
orchestrator.runPipeline({
id: 'transform-to-pg',
data: [{
name: 'john doe'
}],
target: {
adapter_id: 'postgresql',
endpoint_id: 'table_insert',
credential_id: 'pg-auth',
config: {
schema: 'public',
table: 'contacts'
},
transform: [{
type: 'uppercase',
options: {
field: 'name',
to: 'firstname'
}
}],
},
});
Additional Resources
- Adapters: Adapter implementation and usage
- Pipelines: Pipeline configuration reference
- Authentication: Vault and credential management