Architecture Overview

This section unpacks OpenETL's structure: its core components and how data flows between them. It's your high-level map to the framework.

Core Components

Vault

The Vault is your credential locker, storing auth details (e.g., OAuth tokens, API keys) as a key-value map. It keeps secrets safe and accessible.
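
In code, a vault is a plain object keyed by credential ID, the same shape used in the pipeline examples further below:

const vault = {
    'hs-auth': {
        type: 'oauth2', // Auth scheme for this credential
        credentials: { /* OAuth client ID, secret, tokens, ... */ }
    },
    'pg-auth': {
        type: 'basic', // Username/password auth
        credentials: { /* ... */ }
    },
};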

Adapter

Adapters connect to data sources such as HubSpot APIs or PostgreSQL databases. They manage the specifics for each provider and do the heavy lifting: connecting, downloading, and uploading.
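
Conceptually, an adapter wraps those provider specifics behind a common set of operations. The shape below is an illustrative sketch, not OpenETL's actual adapter contract:

// Illustrative sketch only; the real adapter contract may differ.
const sketchAdapter = {
    connect: async (credentials) => { /* authenticate with the provider */ },
    download: async (connector) => { /* fetch records per the connector */ return []; },
    upload: async (connector, data) => { /* write records to the provider */ },
};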

Connector

A Connector tells an adapter what to do—endpoint, fields, filters. It's the configuration layer linking adapters to your pipeline.
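
Concretely, a connector is the source or target block you'll see in the pipeline examples below: it names the adapter, endpoint, and Vault credential, plus the fields and filters to use:

const source = {
    adapter_id: 'hubspot',          // Which adapter handles this connection
    endpoint_id: 'contacts',        // Which of the adapter's endpoints to hit
    credential_id: 'hs-auth',       // Which Vault entry to authenticate with
    fields: ['firstname', 'email'], // Fields to pull
};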

Pipeline

The Pipeline outlines your ETL job: source, target, transformations, plus optional schedule (e.g., daily), logging (progress tracking), and events (e.g., start, finish).
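
As a sketch, a pipeline is a plain object tying these pieces together. The source and target shapes match the examples below; the schedule and event key names here are illustrative assumptions, not confirmed OpenETL API:

const pipeline = {
    id: 'daily-contacts-sync',
    source: { /* connector for the source adapter */ },
    target: { /* connector for the target adapter */ },
    schedule: 'daily',                     // Assumed key: optional run cadence
    onstart: () => console.log('start'),   // Assumed key: event hook
    onfinish: () => console.log('finish'), // Assumed key: event hook
};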

Orchestrator

The Orchestrator runs the show. It reads your pipeline, grabs credentials from the Vault, sets up Adapters via Connectors, and executes the ETL flow.

Stateless Design

OpenETL is stateless by design: it doesn't keep track of previous executions, sync states, or processed records internally. This choice offers several benefits:

  • Simplicity: No hidden state means predictable behavior and easier debugging
  • Portability: Run pipelines anywhere without needing a state database
  • Control: You determine exactly how and where state is managed
  • Scalability: Distribute workloads across multiple instances without shared state

For incremental data syncs (e.g., fetching only new sales records since last run), you'll need to:

  1. Track your own sync state: Store timestamps, IDs, or page markers of the last processed records
  2. Pass sync parameters: Include these markers in your pipeline configuration, as in the example below
  3. Update state on completion: Persist the new marker once the run finishes successfully

// Example: Incremental sync with custom state tracking
const lastSyncTime = await db.getSyncState('sales-sync'); // Your state storage

orchestrator.runPipeline({
    id: 'incremental-sales',
    source: {
        adapter_id: 'hubspot',
        endpoint_id: 'deals',
        credential_id: 'hs-auth',
        fields: ['amount', 'closedate'],
        filters: [
            // Filter shape here is an assumption; check your adapter's
            // documentation for the exact filter format it expects
            { field: 'time_inserted', operator: 'gte', value: lastSyncTime }
        ]
    },
    target: {
        adapter_id: 'postgresql',
        endpoint_id: 'table_insert',
        credential_id: 'pg-auth',
        config: {
            schema: 'public',
            table: 'sales'
        }
    },
    // Update your sync state after successful completion
    oncomplete: async (result) => {
        if (result.success) {
            const newSyncTime = new Date().toISOString();
            await db.updateSyncState('sales-sync', newSyncTime);
        }
    }
});

The stateless approach gives you flexibility in how you implement incremental syncs. You can store your sync state in your own database, file system, or configuration service based on your application's needs.
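
For instance, the db.getSyncState and db.updateSyncState helpers used above are hypothetical; here is one minimal way to back them with a JSON file using Node's built-in fs module:

import { promises as fs } from 'fs';

const STATE_FILE = './sync-state.json'; // Hypothetical location for sync markers

// Read the stored marker for a given sync job, if any
async function getSyncState(key) {
    try {
        const state = JSON.parse(await fs.readFile(STATE_FILE, 'utf8'));
        return state[key];
    } catch {
        return undefined; // No state yet: treat as a first full sync
    }
}

// Persist the new marker after a successful run
async function updateSyncState(key, value) {
    let state = {};
    try {
        state = JSON.parse(await fs.readFile(STATE_FILE, 'utf8'));
    } catch { /* no state file yet; start fresh */ }
    state[key] = value;
    await fs.writeFile(STATE_FILE, JSON.stringify(state, null, 2));
}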

How It Works

The Orchestrator adapts to your pipeline's needs—here's how:

Download and Upload

Extracts data from a source (e.g., HubSpot) via an adapter, transforms it if needed, and loads it to a target (e.g., PostgreSQL): the full ETL cycle. The example below moves contacts end-to-end, from HubSpot into a PostgreSQL table.

import Orchestrator from 'openetl';
import { hubspot } from '@openetl/hubspot';
import { postgresql } from '@openetl/postgresql';

const vault = {
    'hs-auth': {
        type: 'oauth2',
        credentials: { /* ... */ }
    },
    'pg-auth': {
        type: 'basic',
        credentials: { /* ... */ }
    },
};
const orchestrator = Orchestrator(vault, {
    hubspot,
    postgresql
});

orchestrator.runPipeline({
    id: 'hs-to-pg',
    source: {
        adapter_id: 'hubspot',
        endpoint_id: 'contacts',
        credential_id: 'hs-auth',
        fields: ['firstname']
    },
    target: {
        adapter_id: 'postgresql',
        endpoint_id: 'table_insert',
        credential_id: 'pg-auth',
        config: {
            schema: 'public',
            table: 'contacts'
        }
    },
});

Download Only

Pulls data from a source adapter and stops: ideal for exporting or inspecting data without loading it elsewhere. No target is needed; the raw output is handed to your onload callback.

orchestrator.runPipeline({
    id: 'hs-export',
    source: {
        adapter_id: 'hubspot',
        endpoint_id: 'contacts',
        credential_id: 'hs-auth',
        fields: ['email']
    },
    onload: data => console.log(data), // Handle downloaded data
});

Upload Only

Takes pre-provided data (no source adapter) and pushes it to a target: perfect for loading existing datasets without an extraction step.

orchestrator.runPipeline({
    id: 'static-to-pg',
    data: [{
        firstname: 'Jane'
    }],
    target: {
        adapter_id: 'postgresql',
        endpoint_id: 'table_insert',
        credential_id: 'pg-auth',
        config: {
            schema: 'public',
            table: 'contacts'
        }
    },
});

Data Provided to Upload

Pairs pre-provided data with transformations, reshaping it before loading to a target: customize first, then save. Below, the name field is uppercased and written to the firstname column before insertion.

orchestrator.runPipeline({
    id: 'transform-to-pg',
    data: [{
        name: 'john doe'
    }],
    target: {
        adapter_id: 'postgresql',
        endpoint_id: 'table_insert',
        credential_id: 'pg-auth',
        config: {
            schema: 'public',
            table: 'contacts'
        },
        transform: [{
            type: 'uppercase',
            options: {
                field: 'name',
                to: 'firstname'
            }
        }],
    },
});

These flows show OpenETL's versatility—adapt it to your needs!

Dig deeper in Core Concepts!