# Getting Started
This guide covers installing OpenETL, configuring it, and implementing your first data pipeline.

## Prerequisites

- Node.js 18.0 or higher
- npm or yarn package manager
- TypeScript 4.7+ (optional, for TypeScript projects)
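
You can verify the Node.js and npm versions from the command line:

```bash
node --version   # should report v18.0.0 or higher
npm --version
```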

## Installation

### Install Core Package

```bash
npm install openetl
```
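
Or, if you prefer yarn:

```bash
yarn add openetl
```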

### Install Adapters

Install the adapters you need for your data sources:
```bash
# Database adapters
npm install @openetl/postgresql
npm install @openetl/mysql
npm install @openetl/mongodb

# API adapters
npm install @openetl/hubspot
npm install @openetl/stripe
npm install @openetl/xero
npm install @openetl/google-ads
```

### TypeScript Configuration

For TypeScript projects, ensure your `tsconfig.json` includes:

```json
{
  "compilerOptions": {
    "target": "ES2020",
    "module": "commonjs",
    "strict": true,
    "esModuleInterop": true,
    "outDir": "dist"
  }
}
```

The `outDir` setting matches the `dist/pipeline.js` path used in Running the Pipeline below.

## Basic Concepts

### Vault

The Vault stores credentials for your data sources:

```typescript
import { Vault } from 'openetl';

const vault: Vault = {
  'my-database': {
    id: 'my-database',
    type: 'basic',
    credentials: {
      host: 'localhost',
      database: 'myapp',
      username: 'user',
      password: 'secret',
    },
  },
  'my-api': {
    id: 'my-api',
    type: 'oauth2',
    credentials: {
      client_id: 'xxx',
      client_secret: 'xxx',
      access_token: 'xxx',
      refresh_token: 'xxx',
    },
  },
};
```
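
In a real project, avoid hardcoding secrets in source files. A common pattern (not specific to OpenETL) is to read them from environment variables, for example via the `dotenv` package; a minimal sketch, assuming you have run `npm install dotenv` and created a `.env` file:

```typescript
import 'dotenv/config'; // populates process.env from a local .env file
import { Vault } from 'openetl';

const vault: Vault = {
  'my-database': {
    id: 'my-database',
    type: 'basic',
    credentials: {
      host: process.env.DB_HOST || 'localhost',
      database: process.env.DB_NAME!,
      username: process.env.DB_USER!,
      password: process.env.DB_PASSWORD!,
    },
  },
};
```

The First Pipeline example below follows the same pattern.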

### Orchestrator

The Orchestrator manages adapters and executes pipelines:

```typescript
import { Orchestrator } from 'openetl';
import { postgresql } from '@openetl/postgresql';
import { hubspot } from '@openetl/hubspot';

const etl = Orchestrator(vault, { postgresql, hubspot });
```

### Pipeline

A Pipeline defines the data flow from source to target:

```typescript
import { Pipeline } from 'openetl';

const pipeline: Pipeline = {
  id: 'my-pipeline',
  source: { /* source configuration */ },
  target: { /* target configuration */ },
};
```

## First Pipeline

This example synchronizes HubSpot contacts to a PostgreSQL database.

### Step 1: Configure Credentials

```typescript
import { Orchestrator, Pipeline, Vault } from 'openetl';
import { hubspot } from '@openetl/hubspot';
import { postgresql } from '@openetl/postgresql';

const vault: Vault = {
  'hubspot': {
    id: 'hubspot',
    type: 'oauth2',
    credentials: {
      client_id: process.env.HUBSPOT_CLIENT_ID!,
      client_secret: process.env.HUBSPOT_CLIENT_SECRET!,
      access_token: process.env.HUBSPOT_ACCESS_TOKEN!,
      refresh_token: process.env.HUBSPOT_REFRESH_TOKEN!,
    },
  },
  'database': {
    id: 'database',
    type: 'basic',
    credentials: {
      host: process.env.DB_HOST || 'localhost',
      port: process.env.DB_PORT || '5432',
      database: process.env.DB_NAME!,
      username: process.env.DB_USER!,
      password: process.env.DB_PASSWORD!,
    },
  },
};
```
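
For reference, a matching `.env` file might look like this (all values are placeholders):

```bash
HUBSPOT_CLIENT_ID=your-client-id
HUBSPOT_CLIENT_SECRET=your-client-secret
HUBSPOT_ACCESS_TOKEN=your-access-token
HUBSPOT_REFRESH_TOKEN=your-refresh-token
DB_HOST=localhost
DB_PORT=5432
DB_NAME=myapp
DB_USER=user
DB_PASSWORD=secret
```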

### Step 2: Create Orchestrator

```typescript
const etl = Orchestrator(vault, { hubspot, postgresql });
```

### Step 3: Define Pipeline

```typescript
const pipeline: Pipeline = {
  id: 'hubspot-to-postgres',
  source: {
    id: 'contacts-source',
    adapter_id: 'hubspot',
    endpoint_id: 'contacts',
    credential_id: 'hubspot',
    fields: ['email', 'firstname', 'lastname', 'company'],
    pagination: { type: 'cursor', itemsPerPage: 100 },
  },
  target: {
    id: 'contacts-target',
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'database',
    config: {
      schema: 'public',
      table: 'contacts',
    },
    fields: ['email', 'first_name', 'last_name', 'company'],
  },
  error_handling: {
    max_retries: 3,
    retry_interval: 1000,
    fail_on_error: true,
  },
};
```
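
Note that the source lists HubSpot property names (`firstname`, `lastname`) while the target lists the corresponding PostgreSQL column names (`first_name`, `last_name`). The pipeline also assumes the target table already exists; a plausible schema matching the target fields (column types are illustrative, adjust them to your data):

```sql
CREATE TABLE IF NOT EXISTS public.contacts (
  email      TEXT,
  first_name TEXT,
  last_name  TEXT,
  company    TEXT
);
```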

### Step 4: Execute Pipeline

```typescript
async function main() {
  try {
    const result = await etl.runPipeline(pipeline);
    console.log(`Synchronized ${result.data.length} contacts`);
  } catch (error) {
    console.error('Pipeline failed:', error);
    process.exit(1);
  }
}

main();
```

### Complete Example

```typescript
import { Orchestrator, Pipeline, Vault } from 'openetl';
import { hubspot } from '@openetl/hubspot';
import { postgresql } from '@openetl/postgresql';

const vault: Vault = {
  'hubspot': {
    id: 'hubspot',
    type: 'oauth2',
    credentials: {
      client_id: process.env.HUBSPOT_CLIENT_ID!,
      client_secret: process.env.HUBSPOT_CLIENT_SECRET!,
      access_token: process.env.HUBSPOT_ACCESS_TOKEN!,
      refresh_token: process.env.HUBSPOT_REFRESH_TOKEN!,
    },
  },
  'database': {
    id: 'database',
    type: 'basic',
    credentials: {
      host: process.env.DB_HOST || 'localhost',
      port: process.env.DB_PORT || '5432',
      database: process.env.DB_NAME!,
      username: process.env.DB_USER!,
      password: process.env.DB_PASSWORD!,
    },
  },
};

const etl = Orchestrator(vault, { hubspot, postgresql });

const pipeline: Pipeline = {
  id: 'hubspot-to-postgres',
  source: {
    id: 'contacts-source',
    adapter_id: 'hubspot',
    endpoint_id: 'contacts',
    credential_id: 'hubspot',
    fields: ['email', 'firstname', 'lastname', 'company'],
    pagination: { type: 'cursor', itemsPerPage: 100 },
  },
  target: {
    id: 'contacts-target',
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'database',
    config: { schema: 'public', table: 'contacts' },
    fields: ['email', 'first_name', 'last_name', 'company'],
  },
  error_handling: {
    max_retries: 3,
    retry_interval: 1000,
    fail_on_error: true,
  },
};

etl.runPipeline(pipeline)
  .then(result => console.log(`Synchronized ${result.data.length} contacts`))
  .catch(error => console.error('Pipeline failed:', error));
```

## Running the Pipeline

### Using TypeScript

```bash
npx tsx pipeline.ts
```

### Using Compiled JavaScript

```bash
npx tsc && node dist/pipeline.js
```
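
To avoid retyping these commands, you can wire them into `package.json` scripts (a minimal sketch; the script names are arbitrary and it assumes `tsx` and `typescript` are installed as dev dependencies):

```json
{
  "scripts": {
    "pipeline": "tsx pipeline.ts",
    "build": "tsc",
    "start": "node dist/pipeline.js"
  }
}
```

Then run the pipeline with `npm run pipeline`.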

## Validation

Use `validatePipeline` to check a pipeline's configuration before execution:

```typescript
import { validatePipeline } from 'openetl';

const validation = validatePipeline(pipeline, { hubspot, postgresql }, vault);

if (!validation.valid) {
  console.error('Pipeline validation failed:');
  validation.errors.forEach(error => console.error(` - ${error}`));
  process.exit(1);
}

// Check warnings even if the pipeline is valid
if (validation.warnings.length > 0) {
  console.warn('Warnings:');
  validation.warnings.forEach(warning => console.warn(` - ${warning}`));
}
```

The validation function checks that:
- The pipeline has either source data or a source connector
- All referenced adapters exist
- All referenced credentials exist in the vault
- Filter configurations are valid
- Error-handling and rate-limiting settings are valid
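
Putting it together, you can gate execution on a successful validation (a minimal sketch that reuses `pipeline`, `etl`, `vault`, and the adapter imports from the examples above):

```typescript
import { validatePipeline } from 'openetl';

async function main() {
  // Fail fast on configuration problems before contacting any data source
  const validation = validatePipeline(pipeline, { hubspot, postgresql }, vault);
  if (!validation.valid) {
    validation.errors.forEach(error => console.error(` - ${error}`));
    process.exit(1);
  }

  const result = await etl.runPipeline(pipeline);
  console.log(`Synchronized ${result.data.length} contacts`);
}

main();
```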

## Next Steps

- Adapters - Available adapters and their configuration
- Pipeline Configuration - Pipeline options, callbacks, and transformations
- Transformations - Data transformation reference
- Error Handling - AdapterError, retry logic, and debugging
- Security - SQL injection protection and credential management
- Custom Adapters - Building custom adapters
- API Reference - Complete TypeScript API reference