Architecture Overview

This document describes OpenETL's architecture, including core components, execution workflows, and design principles.

Core Components

Vault

The Vault stores authentication credentials (OAuth tokens, API keys, database credentials) as a key-value map. Credentials are referenced by ID in pipeline configurations.
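
For illustration, a vault is a plain object keyed by credential ID. The entry types below mirror the examples later in this document; the individual credential fields are assumptions and depend on the auth type and adapter.

const vault = {
    'hs-auth': {
        type: 'oauth2',
        credentials: {
            // Illustrative fields; the required keys depend on the adapter
            client_id: '...',
            client_secret: '...',
            refresh_token: '...'
        }
    },
    'pg-auth': {
        type: 'basic',
        credentials: {
            username: 'etl_user',
            password: '...'
        }
    }
};

Pipeline configurations then reference these entries by ID, e.g. credential_id: 'hs-auth'.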

Adapter

Adapters implement interfaces to specific data sources (APIs, databases). Each adapter provides the following methods (see the sketch after this list):

  • connect(): Establishing connections and authenticating
  • download(): Extracting data with pagination support
  • upload(): Loading data to the target system
  • disconnect(): Cleaning up resources (optional)
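
As a sketch, an adapter can be modeled as an object implementing an interface like the one below. The method signatures are assumptions for illustration, not OpenETL's published types.

// Hypothetical adapter interface; actual OpenETL types may differ
interface Adapter {
    // Establish the connection and authenticate against the source
    connect(): Promise<void>;
    // Extract one page of records, honoring pagination settings
    download(options: { limit: number; offset: number }): Promise<{ data: unknown[] }>;
    // Load a batch of records into the target system
    upload(data: unknown[]): Promise<void>;
    // Optional resource cleanup
    disconnect?(): Promise<void>;
}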

Connector

Connectors are configuration objects that specify the following (an example follows the list):

  • Adapter ID and endpoint
  • Credential reference
  • Field selection
  • Filters and transformations
  • Pagination settings
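
Concretely, a connector is the object passed as source or target in a pipeline. The shape below mirrors the examples later in this document; the filter and pagination shapes are assumptions.

const contactsSource = {
    adapter_id: 'hubspot',      // which adapter to use
    endpoint_id: 'contacts',    // which endpoint that adapter exposes
    credential_id: 'hs-auth',   // vault entry used to authenticate
    fields: ['firstname', 'email'],
    // Assumed filter shape; check your adapter's documentation
    filters: [{ field: 'lifecyclestage', operator: '=', value: 'customer' }],
    // Assumed pagination settings
    pagination: { itemsPerPage: 100 }
};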

Pipeline

Pipelines define ETL workflows, including the following (a combined sketch follows the list):

  • Source connector (data extraction)
  • Target connector (data loading)
  • Transformation chain
  • Error handling configuration
  • Rate limiting settings
  • Event callbacks (logging, completion hooks)
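
Put together, a pipeline is a plain configuration object. The sketch below combines the fields used in the examples later in this document; error_handling and rate_limiting are hypothetical names for the settings listed above.

const pipeline = {
    id: 'hs-to-pg',
    source: {
        adapter_id: 'hubspot',
        endpoint_id: 'contacts',
        credential_id: 'hs-auth',
        fields: ['email']
    },
    target: {
        adapter_id: 'postgresql',
        endpoint_id: 'table_insert',
        credential_id: 'pg-auth',
        config: { schema: 'public', table: 'contacts' }
    },
    error_handling: { max_retries: 3 },        // hypothetical field name
    rate_limiting: { requests_per_second: 5 }, // hypothetical field name
    onload: data => console.log(`received ${data.length} records`),
    oncomplete: result => console.log('finished:', result.success)
};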

Orchestrator

The Orchestrator executes pipelines by:

  1. Resolving credentials from the Vault
  2. Initializing adapters with connector configurations
  3. Executing the data flow (extract, transform, load)
  4. Managing error handling and retries
  5. Emitting events for monitoring

Stateless Design

OpenETL does not maintain internal state between pipeline executions. The framework does not track:

  • Previous execution timestamps
  • Processed record IDs
  • Sync checkpoints
  • Execution history

This design provides:

  • Predictable execution: No hidden state affecting pipeline behavior
  • Deployment flexibility: No state database required
  • User-controlled state: Application manages state according to requirements
  • Horizontal scalability: Multiple instances can run independently

For incremental synchronization, applications must:

  1. Implement external state storage for sync markers
  2. Pass state parameters to pipeline configurations

Incremental Sync Implementation

// Incremental synchronization with external state management
const lastSyncTime = await db.getSyncState('sales-sync'); // Your state storage

orchestrator.runPipeline({
    id: 'incremental-sales',
    source: {
        adapter_id: 'hubspot',
        endpoint_id: 'deals',
        credential_id: 'hs-auth',
        fields: ['amount', 'closedate'],
        filters: [
            // Assumed filter shape; check your adapter's documentation
            { field: 'time_inserted', operator: '>', value: lastSyncTime }
        ]
    },
    target: {
        adapter_id: 'postgresql',
        endpoint_id: 'table_insert',
        credential_id: 'pg-auth',
        config: {
            schema: 'public',
            table: 'sales'
        }
    },
    oncomplete: async (result) => {
        if (result.success) {
            await db.updateSyncState('sales-sync', new Date().toISOString());
        }
    }
});

State storage options include databases, file systems, or external configuration services, depending on application requirements.
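
As one minimal option, the db.getSyncState and db.updateSyncState helpers used in the example above could be backed by a JSON file. This is a sketch of application-side code, not part of OpenETL.

import { promises as fs } from 'fs';

const STATE_FILE = './sync-state.json';

// File-backed implementation of the hypothetical helpers used above
const db = {
    async getSyncState(key) {
        try {
            const state = JSON.parse(await fs.readFile(STATE_FILE, 'utf8'));
            return state[key] ?? null;
        } catch {
            return null; // no state yet: treat as a first full sync
        }
    },
    async updateSyncState(key, value) {
        let state = {};
        try {
            state = JSON.parse(await fs.readFile(STATE_FILE, 'utf8'));
        } catch { /* first write */ }
        state[key] = value;
        await fs.writeFile(STATE_FILE, JSON.stringify(state, null, 2));
    }
};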

Pipeline Execution Modes

The Orchestrator supports multiple execution modes based on pipeline configuration:

Full ETL (Extract, Transform, Load)

Executes the complete data flow: extraction from source, optional transformation, and loading to target.

import Orchestrator from 'openetl';
import { hubspot } from '@openetl/hubspot';
import { postgresql } from '@openetl/postgresql';

const vault = {
    'hs-auth': {
        type: 'oauth2',
        credentials: { /* ... */ }
    },
    'pg-auth': {
        type: 'basic',
        credentials: { /* ... */ }
    },
};
const orchestrator = Orchestrator(vault, {
    hubspot,
    postgresql
});

orchestrator.runPipeline({
    id: 'hs-to-pg',
    source: {
        adapter_id: 'hubspot',
        endpoint_id: 'contacts',
        credential_id: 'hs-auth',
        fields: ['firstname']
    },
    target: {
        adapter_id: 'postgresql',
        endpoint_id: 'table_insert',
        credential_id: 'pg-auth',
        config: {
            schema: 'public',
            table: 'contacts'
        }
    },
});

Extract Only

Executes only the extraction phase. Data is returned via the onload callback without loading to a target.

orchestrator.runPipeline({
    id: 'hs-export',
    source: {
        adapter_id: 'hubspot',
        endpoint_id: 'contacts',
        credential_id: 'hs-auth',
        fields: ['email']
    },
    onload: data => console.log(data),
});

Load Only

Loads pre-existing data to a target without extraction. Data is provided directly in the pipeline configuration.

orchestrator.runPipeline({
    id: 'static-to-pg',
    data: [{
        firstname: 'Jane'
    }],
    target: {
        adapter_id: 'postgresql',
        endpoint_id: 'table_insert',
        credential_id: 'pg-auth',
        config: {
            schema: 'public',
            table: 'contacts'
        }
    },
});

Transform and Load

Applies transformations to provided data before loading to a target.

orchestrator.runPipeline({
    id: 'transform-to-pg',
    data: [{
        name: 'john doe'
    }],
    target: {
        adapter_id: 'postgresql',
        endpoint_id: 'table_insert',
        credential_id: 'pg-auth',
        config: {
            schema: 'public',
            table: 'contacts'
        },
        transform: [{
            type: 'uppercase',
            options: {
                field: 'name',
                to: 'firstname'
            }
        }],
    },
});

Additional Resources