Getting Started

This guide covers installing and configuring OpenETL and building your first data pipeline.

Prerequisites

  • Node.js 18.0 or higher
  • npm or yarn package manager
  • TypeScript 4.7+ (optional, for TypeScript projects)
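
You can confirm the Node.js version from a terminal:

node --version   # should print v18.0.0 or later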

Installation

Install Core Package

npm install openetl
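
Or, if you prefer yarn:

yarn add openetl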

Install Adapters

Install the adapters you need for your data sources:

# Database adapters
npm install @openetl/postgresql
npm install @openetl/mysql
npm install @openetl/mongodb

# API adapters
npm install @openetl/hubspot
npm install @openetl/stripe
npm install @openetl/xero
npm install @openetl/google-ads

TypeScript Configuration

For TypeScript projects, ensure your tsconfig.json includes:

{
  "compilerOptions": {
    "target": "ES2020",
    "module": "commonjs",
    "strict": true,
    "esModuleInterop": true
  }
}
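
If you plan to run compiled JavaScript (see Using Compiled JavaScript below), also add an output directory to compilerOptions; dist is assumed here to match that command:

"outDir": "dist"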

Basic Concepts

Vault

The Vault stores credentials for your data sources:

import { Vault } from 'openetl';

const vault: Vault = {
  'my-database': {
    id: 'my-database',
    type: 'basic',
    credentials: {
      host: 'localhost',
      database: 'myapp',
      username: 'user',
      password: 'secret',
    },
  },
  'my-api': {
    id: 'my-api',
    type: 'oauth2',
    credentials: {
      client_id: 'xxx',
      client_secret: 'xxx',
      access_token: 'xxx',
      refresh_token: 'xxx',
    },
  },
};
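
Avoid committing real credentials. A common approach (not specific to OpenETL) is to keep them in environment variables, optionally loaded from a .env file with the dotenv package; the variable names below are illustrative:

import 'dotenv/config'; // assumes `npm install dotenv` and a .env file in the project root
import { Vault } from 'openetl';

const vault: Vault = {
  'my-database': {
    id: 'my-database',
    type: 'basic',
    credentials: {
      host: process.env.DB_HOST || 'localhost',
      database: process.env.DB_NAME!,
      username: process.env.DB_USER!,
      password: process.env.DB_PASSWORD!,
    },
  },
};

The first-pipeline example below follows the same pattern.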

Orchestrator

The Orchestrator manages adapters and executes pipelines:

import { Orchestrator } from 'openetl';
import { postgresql } from '@openetl/postgresql';
import { hubspot } from '@openetl/hubspot';

const etl = Orchestrator(vault, { postgresql, hubspot });

Pipeline

A Pipeline defines the data flow from source to target:

import { Pipeline } from 'openetl';

const pipeline: Pipeline = {
  id: 'my-pipeline',
  source: { /* source configuration */ },
  target: { /* target configuration */ },
};

First Pipeline

This example synchronizes HubSpot contacts to a PostgreSQL database.

Step 1: Configure Credentials

import { Orchestrator, Pipeline, Vault } from 'openetl';
import { hubspot } from '@openetl/hubspot';
import { postgresql } from '@openetl/postgresql';

const vault: Vault = {
  'hubspot': {
    id: 'hubspot',
    type: 'oauth2',
    credentials: {
      client_id: process.env.HUBSPOT_CLIENT_ID!,
      client_secret: process.env.HUBSPOT_CLIENT_SECRET!,
      access_token: process.env.HUBSPOT_ACCESS_TOKEN!,
      refresh_token: process.env.HUBSPOT_REFRESH_TOKEN!,
    },
  },
  'database': {
    id: 'database',
    type: 'basic',
    credentials: {
      host: process.env.DB_HOST || 'localhost',
      port: process.env.DB_PORT || '5432',
      database: process.env.DB_NAME!,
      username: process.env.DB_USER!,
      password: process.env.DB_PASSWORD!,
    },
  },
};
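
The non-null assertions (!) assume these environment variables are set before the script runs. For a local run you can export them in your shell first (placeholder values shown, matching the earlier Vault example):

export HUBSPOT_CLIENT_ID=...
export HUBSPOT_CLIENT_SECRET=...
export HUBSPOT_ACCESS_TOKEN=...
export HUBSPOT_REFRESH_TOKEN=...
export DB_HOST=localhost
export DB_PORT=5432
export DB_NAME=myapp
export DB_USER=user
export DB_PASSWORD=secret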

Step 2: Create Orchestrator

const etl = Orchestrator(vault, { hubspot, postgresql });

Step 3: Define Pipeline

const pipeline: Pipeline = {
  id: 'hubspot-to-postgres',
  source: {
    id: 'contacts-source',
    adapter_id: 'hubspot',
    endpoint_id: 'contacts',
    credential_id: 'hubspot',
    fields: ['email', 'firstname', 'lastname', 'company'],
    pagination: { type: 'cursor', itemsPerPage: 100 },
  },
  target: {
    id: 'contacts-target',
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'database',
    config: {
      schema: 'public',
      table: 'contacts',
    },
    fields: ['email', 'first_name', 'last_name', 'company'],
  },
  error_handling: {
    max_retries: 3,
    retry_interval: 1000,
    fail_on_error: true,
  },
};

Step 4: Execute Pipeline

async function main() {
  try {
    const result = await etl.runPipeline(pipeline);
    console.log(`Synchronized ${result.data.length} contacts`);
  } catch (error) {
    console.error('Pipeline failed:', error);
    process.exit(1);
  }
}

main();

Complete Example

import { Orchestrator, Pipeline, Vault } from 'openetl';
import { hubspot } from '@openetl/hubspot';
import { postgresql } from '@openetl/postgresql';

const vault: Vault = {
  'hubspot': {
    id: 'hubspot',
    type: 'oauth2',
    credentials: {
      client_id: process.env.HUBSPOT_CLIENT_ID!,
      client_secret: process.env.HUBSPOT_CLIENT_SECRET!,
      access_token: process.env.HUBSPOT_ACCESS_TOKEN!,
      refresh_token: process.env.HUBSPOT_REFRESH_TOKEN!,
    },
  },
  'database': {
    id: 'database',
    type: 'basic',
    credentials: {
      host: process.env.DB_HOST || 'localhost',
      database: process.env.DB_NAME!,
      username: process.env.DB_USER!,
      password: process.env.DB_PASSWORD!,
    },
  },
};

const etl = Orchestrator(vault, { hubspot, postgresql });

const pipeline: Pipeline = {
  id: 'hubspot-to-postgres',
  source: {
    id: 'contacts-source',
    adapter_id: 'hubspot',
    endpoint_id: 'contacts',
    credential_id: 'hubspot',
    fields: ['email', 'firstname', 'lastname'],
    pagination: { type: 'cursor', itemsPerPage: 100 },
  },
  target: {
    id: 'contacts-target',
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'database',
    config: { schema: 'public', table: 'contacts' },
    fields: ['email', 'first_name', 'last_name'],
  },
};

etl.runPipeline(pipeline)
  .then(result => console.log(`Synchronized ${result.data.length} contacts`))
  .catch(error => console.error('Pipeline failed:', error));

Running the Pipeline

Using TypeScript

npx tsx pipeline.ts
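
If tsx is not already installed, npx will offer to download it; you can also add it to your project as a dev dependency:

npm install --save-dev tsx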

Using Compiled JavaScript

npx tsc && node dist/pipeline.js

Validation

Use validatePipeline to check a pipeline's configuration before executing it:

import { validatePipeline } from 'openetl';

const validation = validatePipeline(pipeline, { hubspot, postgresql }, vault);

if (!validation.valid) {
  console.error('Pipeline validation failed:');
  validation.errors.forEach(error => console.error(`  - ${error}`));
  process.exit(1);
}

// Check warnings even if valid
if (validation.warnings.length > 0) {
  console.warn('Warnings:');
  validation.warnings.forEach(warning => console.warn(`  - ${warning}`));
}

The validation function checks:

  • The pipeline has either source data or a source connector
  • All adapter references exist
  • All credential references exist in the vault
  • Filter configurations are valid
  • Error handling and rate limiting settings are valid
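
Putting it together, a minimal sketch that only runs the pipeline when validation passes, reusing the orchestrator, pipeline, adapters, and vault from the complete example above:

import { validatePipeline } from 'openetl';

async function run() {
  // Validate first; abort with the reported errors if the configuration is invalid
  const validation = validatePipeline(pipeline, { hubspot, postgresql }, vault);
  if (!validation.valid) {
    validation.errors.forEach(error => console.error(`  - ${error}`));
    process.exit(1);
  }

  const result = await etl.runPipeline(pipeline);
  console.log(`Synchronized ${result.data.length} contacts`);
}

run().catch(error => {
  console.error('Pipeline failed:', error);
  process.exit(1);
});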

Next Steps