Building Type-Safe ETL Pipelines in TypeScript
A practical guide to implementing Extract, Transform, Load workflows with full type safety
Introduction
ETL (Extract, Transform, Load) is a fundamental pattern in data engineering. Whether you're syncing CRM data to a warehouse, migrating between databases, or integrating third-party APIs, ETL pipelines are everywhere. But building them in a type-safe, maintainable way has traditionally been challenging.
In this article, we'll explore how to implement ETL pipelines in TypeScript using OpenETL, a modern framework that brings type safety and abstraction to data integration.
What is ETL?
ETL consists of three phases:
- Extract: Pull data from a source (database, API, file)
- Transform: Clean, reshape, or enrich the data
- Load: Write the data to a destination
┌─────────┐      ┌───────────┐      ┌─────────┐
│ Extract │ ──▶  │ Transform │ ──▶  │  Load   │
└─────────┘      └───────────┘      └─────────┘
     │                 │                 │
     ▼                 ▼                 ▼
  Database          Rename           Data
  API               Filter           Warehouse
  File              Merge            API
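Before reaching for a framework, it helps to see the pattern in plain TypeScript. Here's a minimal, framework-agnostic sketch (the UserRow type and function bodies are illustrative, not OpenETL code):

type UserRow = { id: number; email: string; name: string };

async function extract(): Promise<UserRow[]> {
  // Pull rows from a database, API, or file
  return [{ id: 1, email: ' ADA@EXAMPLE.COM ', name: 'Ada' }];
}

function transform(rows: UserRow[]): UserRow[] {
  // Clean and reshape: trim and lowercase emails
  return rows.map(r => ({ ...r, email: r.email.trim().toLowerCase() }));
}

async function load(rows: UserRow[]): Promise<void> {
  // Write to the destination, e.g. a warehouse table
  console.log(`Loading ${rows.length} rows`);
}

await load(transform(await extract()));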
The Challenge with Traditional ETL
Traditional ETL implementations often suffer from:
- Tight coupling: Source-specific code mixed with business logic
- No type safety: Runtime errors from schema mismatches
- Code duplication: Similar patterns repeated for each integration
- Hard to test: Database connections embedded in business logic
The Adapter Pattern Solution
OpenETL solves these problems with the adapter pattern. Each data source implements a common interface, allowing you to swap sources without changing your pipeline logic.
// The adapter interface - all sources implement this
interface AdapterInstance {
  connect(): Promise<void>;
  disconnect(): Promise<void>;
  download(options: PageOptions): Promise<{ data: any[] }>;
  upload(data: any[]): Promise<void>;
}
This abstraction means your pipeline code doesn't care whether data comes from PostgreSQL, MongoDB, or a REST API.
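To see what that buys you, here's a hypothetical sync step written purely against the interface. (A sketch: copyRows is not an OpenETL API, and the exact shape of PageOptions is assumed from the examples later in this article.)

// Illustrative only: works with any adapter pair - PostgreSQL, MongoDB, a REST API...
async function copyRows(source: AdapterInstance, target: AdapterInstance) {
  await source.connect();
  await target.connect();
  try {
    const { data } = await source.download({ limit: 100, offset: 0 });
    await target.upload(data);
  } finally {
    await source.disconnect();
    await target.disconnect();
  }
}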
Getting Started
Installation
npm install openetl
npm install @openetl/postgresql # or any adapter you need
Basic Pipeline
Here's a simple pipeline that downloads data from PostgreSQL:
import { Orchestrator, Vault, Pipeline } from 'openetl';
import { postgresql } from '@openetl/postgresql';

// 1. Define credentials in a vault
const vault: Vault = {
  'my-database': {
    id: 'my-database',
    type: 'basic',
    credentials: {
      host: 'localhost',
      port: '5432',
      database: 'myapp',
      username: 'user',
      password: 'secret',
    },
  },
};

// 2. Register adapters
const adapters = { postgresql };

// 3. Define the pipeline
const pipeline: Pipeline = {
  id: 'export-users',
  source: {
    id: 'source',
    adapter_id: 'postgresql',
    endpoint_id: 'table_query',
    credential_id: 'my-database',
    fields: ['id', 'email', 'name', 'created_at'],
    config: {
      schema: 'public',
      table: 'users',
    },
  },
};

// 4. Execute
const etl = Orchestrator(vault, adapters);
const result = await etl.runPipeline(pipeline);
console.log(`Exported ${result.data.length} users`);
Working with Multiple Data Sources
The power of abstraction becomes clear when working with multiple sources. The same pipeline structure works regardless of the source:
import { postgresql } from '@openetl/postgresql';
import { mysql } from '@openetl/mysql';
import { mongodb } from '@openetl/mongodb';
import { hubspot } from '@openetl/hubspot';

const adapters = { postgresql, mysql, mongodb, hubspot };

// PostgreSQL source
const postgresSource = {
  adapter_id: 'postgresql',
  endpoint_id: 'table_query',
  config: { schema: 'public', table: 'customers' },
};

// MySQL source - same structure, different adapter
const mysqlSource = {
  adapter_id: 'mysql',
  endpoint_id: 'table_query',
  config: { database: 'sales', table: 'customers' },
};

// MongoDB source - still the same pattern
const mongoSource = {
  adapter_id: 'mongodb',
  endpoint_id: 'collection_query',
  config: { table: 'customers' },
};

// HubSpot API - REST API, same interface
const hubspotSource = {
  adapter_id: 'hubspot',
  endpoint_id: 'contacts',
};
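Because each source is plain configuration, the same export can run against every system in a loop. (A sketch: it assumes each source object also carries the credential_id and fields entries shown in the basic pipeline, and that etl was created as above.)

// Illustrative: run the same pipeline shape against each source
const sources = [postgresSource, mysqlSource, mongoSource, hubspotSource];

for (const source of sources) {
  const result = await etl.runPipeline({ id: `export-${source.adapter_id}`, source });
  console.log(`${source.adapter_id}: ${result.data.length} rows`);
}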
Data Transformations
OpenETL includes 12 built-in transformation types, applied in sequence during pipeline execution:
const pipeline: Pipeline = {
  id: 'transform-contacts',
  source: {
    // ... source config
    transform: [
      // Combine first and last name
      {
        type: 'concat',
        options: {
          properties: ['first_name', 'last_name'],
          glue: ' ',
          to: 'full_name',
        },
      },
      // Normalize email to lowercase
      {
        type: 'lowercase',
        options: { field: 'email' },
      },
      // Remove whitespace
      {
        type: 'trim',
        options: { field: 'full_name' },
      },
      // Extract domain from email
      {
        type: 'extract',
        options: {
          field: 'email',
          pattern: '@(.+)$',
          to: 'email_domain',
        },
      },
    ],
  },
};
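To make the effect concrete, here's an illustrative before/after for a single record, assuming the transforms behave as documented above:

// Illustrative input record:
const input = { first_name: ' Grace', last_name: 'Hopper', email: 'GRACE@Example.COM' };

// Expected shape after the transform chain:
const expected = {
  first_name: ' Grace',
  last_name: 'Hopper',
  email: 'grace@example.com',   // lowercase
  full_name: 'Grace Hopper',    // concat with ' ' glue, then trim
  email_domain: 'example.com',  // extract with '@(.+)$'
};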
Available Transformations
| Type | Description | Example |
|---|---|---|
| `concat` | Combine multiple fields | `['first', 'last']` → `'full_name'` |
| `renameKey` | Copy/rename a field | `'old_field'` → `'new_field'` |
| `uppercase` | Convert to uppercase | `'hello'` → `'HELLO'` |
| `lowercase` | Convert to lowercase | `'HELLO'` → `'hello'` |
| `trim` | Remove whitespace | `' text '` → `'text'` |
| `split` | Split into array | `'a,b,c'` → `['a','b','c']` |
| `replace` | Regex replacement | `'foo'` → `'bar'` |
| `addPrefix` | Add prefix | `'123'` → `'ID-123'` |
| `addSuffix` | Add suffix | `'file'` → `'file.txt'` |
| `toNumber` | Parse as number | `'42'` → `42` |
| `extract` | Extract substring | `'user@example.com'` → `'example.com'` |
| `mergeObjects` | Combine fields into object | `{a, b}` → `{merged: {a, b}}` |
Filtering and Sorting
Apply filters and sorting at the source level for efficient data retrieval:
const pipeline: Pipeline = {
  id: 'active-premium-users',
  source: {
    // ...
    filters: [
      { field: 'status', operator: '=', value: 'active' },
      { field: 'plan', operator: '=', value: 'premium' },
      { field: 'created_at', operator: '>=', value: '2024-01-01' },
    ],
    sort: [
      { field: 'created_at', type: 'desc' },
      { field: 'name', type: 'asc' },
    ],
    limit: 1000,
  },
};
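For a relational source this corresponds roughly to the following SQL (assuming a users table; the source config is elided above):

// SQL: SELECT ... FROM users
//      WHERE status = 'active' AND plan = 'premium' AND created_at >= '2024-01-01'
//      ORDER BY created_at DESC, name ASC
//      LIMIT 1000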
Filter Groups
For complex conditions, use filter groups with AND/OR logic:
filters: [
  {
    op: 'OR',
    filters: [
      { field: 'status', operator: '=', value: 'active' },
      { field: 'status', operator: '=', value: 'pending' },
    ],
  },
  { field: 'email_verified', operator: '=', value: 'true' },
]
// SQL: WHERE (status = 'active' OR status = 'pending') AND email_verified = true
Source to Target Pipelines
Move data between systems by defining both source and target:
const pipeline: Pipeline = {
  id: 'sync-crm-to-warehouse',
  source: {
    id: 'hubspot-source',
    adapter_id: 'hubspot',
    endpoint_id: 'contacts',
    credential_id: 'hubspot-api',
    fields: ['email', 'firstname', 'lastname', 'company'],
    pagination: { type: 'cursor', itemsPerPage: 100 },
  },
  target: {
    id: 'postgres-target',
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'warehouse-db',
    fields: ['email', 'first_name', 'last_name', 'company'],
    config: {
      schema: 'analytics',
      table: 'crm_contacts',
    },
  },
};

const result = await etl.runPipeline(pipeline);
console.log(`Synced ${result.data.length} contacts to warehouse`);
Pagination Strategies
Different APIs use different pagination methods. OpenETL supports three strategies:
// Offset-based (SQL databases)
pagination: { type: 'offset', itemsPerPage: 100 }
// Cursor-based (modern APIs like HubSpot, Stripe)
pagination: { type: 'cursor', itemsPerPage: 100 }
// Page-based (traditional REST APIs)
pagination: { type: 'page', itemsPerPage: 50 }
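Conceptually, a cursor-driven download keeps requesting pages until the API stops returning a cursor. A simplified sketch of the loop (not OpenETL's actual internals; fetchPage is a stand-in for the adapter's page request):

// Simplified cursor-pagination loop - illustrative, not OpenETL internals
async function downloadAll(
  fetchPage: (cursor?: string) => Promise<{ data: any[]; nextCursor?: string }>
) {
  const all: any[] = [];
  let cursor: string | undefined;
  do {
    const page = await fetchPage(cursor);
    all.push(...page.data);
    cursor = page.nextCursor;
  } while (cursor);
  return all;
}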
Error Handling
Configure retry behavior for resilient pipelines:
const pipeline: Pipeline = {
  id: 'resilient-sync',
  source: { /* ... */ },
  error_handling: {
    max_retries: 3,
    retry_interval: 1000, // ms between retries
    fail_on_error: true,  // stop on first error
  },
};
OpenETL uses exponential backoff with jitter to prevent thundering herd problems when retrying.
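A common formulation of that strategy looks like this (a generic sketch, not OpenETL's exact code):

// Exponential backoff with "full jitter" - generic sketch
function backoffDelay(attempt: number, baseMs = 1000, maxMs = 30_000): number {
  const exp = Math.min(maxMs, baseMs * 2 ** attempt); // 1s, 2s, 4s, ... capped
  return Math.random() * exp; // random jitter spreads concurrent retries apart
}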
Pipeline Validation
Validate pipelines before execution to catch configuration errors early:
import { validatePipeline } from 'openetl';

const result = validatePipeline(pipeline, adapters, vault);

if (!result.valid) {
  console.error('Pipeline validation failed:');
  result.errors.forEach(err => console.error(`  - ${err}`));
} else {
  // Safe to run
  await etl.runPipeline(pipeline);
}
Building Custom Adapters
Create adapters for any data source by implementing the adapter interface:
import { Adapter, AdapterInstance, Connector, AuthConfig } from 'openetl';

const MyCustomAdapter: Adapter = {
  id: 'my-adapter',
  name: 'My Custom Adapter',
  type: 'http',
  action: ['download', 'upload'],
  credential_type: 'api_key',
  base_url: 'https://api.example.com',
  endpoints: [
    {
      id: 'users',
      path: '/users',
      method: 'GET',
      description: 'Fetch users',
      supported_actions: ['download'],
    },
  ],
};
function myAdapter(connector: Connector, auth: AuthConfig): AdapterInstance {
  return {
    async connect() {
      // Validate connection
    },
    async disconnect() {
      // Cleanup
    },
    async download({ limit, offset }) {
      const response = await fetch(
        `https://api.example.com/users?limit=${limit}&offset=${offset}`,
        { headers: { 'Authorization': `Bearer ${auth.credentials.api_key}` } }
      );
      const data = await response.json();
      return { data };
    },
    async upload(data) {
      // Same auth header as download; send the payload as JSON
      await fetch('https://api.example.com/users', {
        method: 'POST',
        headers: {
          'Authorization': `Bearer ${auth.credentials.api_key}`,
          'Content-Type': 'application/json',
        },
        body: JSON.stringify(data),
      });
    },
  };
}

export { myAdapter, MyCustomAdapter };
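Using the custom adapter then looks just like the bundled ones. (A sketch: it assumes the registry key should match the pipeline's adapter_id, as in the earlier examples, and the example-api vault entry is hypothetical.)

// Register alongside the bundled adapters
const adapters = { postgresql, 'my-adapter': myAdapter };

const pipeline: Pipeline = {
  id: 'fetch-example-users',
  source: {
    id: 'source',
    adapter_id: 'my-adapter',
    endpoint_id: 'users',
    credential_id: 'example-api', // a hypothetical api_key entry in your vault
    fields: ['id', 'email'],
  },
};

const result = await etl.runPipeline(pipeline);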
Type Safety Benefits
TypeScript provides compile-time checking for your pipelines:
// Type error: 'invalid' is not a valid pagination type
const pipeline: Pipeline = {
  source: {
    pagination: { type: 'invalid' }, // ❌ TypeScript error
  },
};

// Type error: missing required field
const auth: BasicAuth = {
  id: 'db',
  type: 'basic',
  credentials: {
    username: 'user',
    // password: missing // ❌ TypeScript error
  },
};
Conclusion
OpenETL brings the benefits of TypeScript to ETL development:
- Type safety catches configuration errors at compile time
- Adapter abstraction decouples pipelines from data sources
- Built-in transformations handle common data manipulation
- Flexible pagination works with any API pattern
- Error handling with exponential backoff for resilience
By using these patterns, you can build maintainable, testable ETL pipelines that scale with your data integration needs.
Resources
OpenETL is open source under the MIT license. Contributions welcome!