Pipelines
Pipelines define ETL workflows in OpenETL, specifying data sources, targets, transformations, and execution parameters. This document covers pipeline configuration, execution modes, event handling, and advanced features.
Pipeline Definition
A pipeline is a configuration object that defines an ETL workflow, including:
- Source connector: Data extraction configuration
- Target connector: Data loading configuration
- Transformations: Data manipulation logic
- Error handling: Retry and failure behavior
- Rate limiting: API request management
- Event callbacks: Monitoring and logging hooks
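Put together, a pipeline object has roughly the following shape. This is an illustrative TypeScript sketch assembled from the configuration table and examples in this document, not the library's exact type definitions; the real Pipeline, Connector, and PipelineEvent types exported by OpenETL may carry additional members.
// Sketch only: field names are taken from the configuration table and examples below.
interface ConnectorSketch {
  adapter_id: string;                              // e.g. 'hubspot', 'postgresql'
  endpoint_id: string;                             // e.g. 'contacts', 'table_insert'
  credential_id: string;                           // resolved against the Vault
  fields?: string[];                               // fields/columns to extract
  config?: Record<string, unknown>;                // adapter-specific settings (schema, table, ...)
  transform?: { type: string; options: Record<string, unknown> }[];
}

interface PipelineEventSketch {
  type: 'start' | 'extract' | 'transform' | 'load' | 'error' | 'complete' | 'info';
  message?: string;
  dataCount?: number;                              // reported with extract events
}

interface PipelineSketch<T> {
  id: string;
  data?: T[];
  source?: ConnectorSketch;
  target?: ConnectorSketch;
  schedule?: { frequency: string; at: string };
  logging?: (event: PipelineEventSketch) => void;
  onload?: (data: T[]) => void;
  onbeforesend?: (data: T[]) => T[] | boolean | void;
  error_handling?: { max_retries: number; retry_interval: number; fail_on_error: boolean };
  rate_limiting?: { requests_per_second: number; concurrent_requests: number; max_retries_on_rate_limit: number };
}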
Pipeline Execution
The Orchestrator executes pipelines through the following steps:
- Resolve credentials from the Vault using credential_id references
- Initialize adapters with connector configurations
- Execute data operations:
  - Extract data from source (if specified)
  - Apply transformations (if specified)
  - Load data to target (if specified)
- Handle errors according to configuration
- Emit events for monitoring and logging
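These stages surface through the logging callback described later, so a quick way to watch them in order is to attach a logger to any pipeline. A minimal sketch, reusing the HubSpot source from the examples below:
const stages: string[] = [];

orchestrator.runPipeline({
  id: 'trace-stages',
  source: {
    adapter_id: 'hubspot',
    endpoint_id: 'contacts',
    credential_id: 'hs-auth',
    fields: ['email'],
  },
  // Record each stage as it is emitted, e.g. start -> extract -> complete.
  logging: event => {
    stages.push(event.type);
    console.log(`[${event.type}] ${event.message ?? ''}`);
  },
  onload: data => console.log(`extracted ${data.length} records`),
});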
Pipeline Configuration Table
Below is a detailed table of all pipeline options, their types, and descriptions:
| Option | Type | Description |
|---|---|---|
| id | string | Unique identifier for the pipeline. |
| data? | T[] | Optional raw data provided for upload-only pipelines. |
| source? | Connector | Specifies the source connector for data extraction. |
| target? | Connector | Specifies the target connector for data loading. |
| schedule? | { frequency: string; at: string } | Optional scheduling (e.g., { frequency: 'daily', at: '02:00' }). |
| logging? | (event: PipelineEvent) => void | Callback for logging pipeline events (e.g., start, error). |
| onload? | (data: T[]) => void | Callback after data extraction (e.g., for download-only pipelines). |
| onbeforesend? | (data: T[]) => T[] \| boolean \| void | Callback before uploading data, allowing modification or cancellation. |
| error_handling? | { max_retries: number; retry_interval: number; fail_on_error: boolean } | Configures retry logic and error behavior. |
| rate_limiting? | { requests_per_second: number; concurrent_requests: number; max_retries_on_rate_limit: number } | Manages rate limiting for API calls. |
Pipeline Use Cases
Pipelines can be tailored to various scenarios. Below are common use cases with practical examples.
Full ETL (Download and Upload)
Extracts data from a source, optionally transforms it, and loads it to a target.
Example: Sync HubSpot contacts to PostgreSQL.
orchestrator.runPipeline({
id: 'hs-to-pg',
source: {
adapter_id: 'hubspot',
endpoint_id: 'contacts',
credential_id: 'hs-auth',
fields: ['firstname', 'lastname'],
},
target: {
adapter_id: 'postgresql',
endpoint_id: 'table_insert',
credential_id: 'pg-auth',
config: { schema: 'public', table: 'contacts' },
},
});
Data Extraction (Download Only)
Fetches data from a source and stops—ideal for exporting or inspecting data.
Example: Extract and log HubSpot contacts.
orchestrator.runPipeline({
id: 'hs-export',
source: {
adapter_id: 'hubspot',
endpoint_id: 'contacts',
credential_id: 'hs-auth',
fields: ['email'],
},
onload: data => console.log(data),
});
Data Loading (Upload Only)
Loads provided data to a target—perfect for migrating existing datasets.
Example: Load static data into PostgreSQL.
orchestrator.runPipeline({
id: 'static-to-pg',
data: [{ firstname: 'Jane' }],
target: {
adapter_id: 'postgresql',
endpoint_id: 'table_insert',
credential_id: 'pg-auth',
config: { schema: 'public', table: 'contacts' },
},
});
Data Transformation and Loading
Transforms provided data before loading it—customize first, then save.
Example: Uppercase names before loading.
orchestrator.runPipeline({
id: 'transform-to-pg',
data: [{ name: 'john doe' }],
target: {
adapter_id: 'postgresql',
endpoint_id: 'table_insert',
credential_id: 'pg-auth',
config: { schema: 'public', table: 'contacts' },
transform: [{ type: 'uppercase', options: { field: 'name', to: 'firstname' } }],
},
});
Logging and Debugging
Pipelines support robust logging for monitoring and troubleshooting.
- Enable Logging: Pass a callback to logging to capture pipeline events.
orchestrator.runPipeline({
id: 'my-pipeline',
source: { /* ... */ },
logging: event => console.log(event),
});
- Debugging Tips:
  - Use logging to track events like start, extract, load, and error.
  - Check for authentication issues by verifying Vault credentials.
  - Break down pipelines into smaller steps to isolate failures.
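A practical way to apply these tips is to collect every event during a run and inspect the trail once the pipeline finishes. A minimal sketch:
const trail: { type: string; message?: string }[] = [];

orchestrator.runPipeline({
  id: 'debug-run',
  source: { /* ... */ },
  logging: event => {
    trail.push({ type: event.type, message: event.message });
    // Once the run ends, dump the trail to see exactly where it stopped.
    if (event.type === 'complete' || event.type === 'error') {
      console.table(trail);
    }
  },
});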
Pipeline Events
Pipelines emit events at key stages, which can be captured via the logging callback:
- start: Pipeline begins execution.
- extract: Data extraction completes (with dataCount).
- transform: Data transformation occurs.
- load: Data loading completes.
- error: An error occurs (with an error message).
- complete: Pipeline finishes successfully.
- info: Informational messages (e.g., rate limiting delays).
Example:
logging: event => {
if (event.type === 'error') {
console.error('Error:', event.message);
} else {
console.log(`${event.type}: ${event.message}`);
}
}
Advanced Usage
Pipelines support advanced configurations for complex workflows.
- Scheduling: Automate pipelines with schedule.
orchestrator.runPipeline({
id: 'daily-sync',
source: { /* ... */ },
target: { /* ... */ },
schedule: { frequency: 'daily', at: '03:00' },
});
- Rate Limiting: Manage API usage with rate_limiting.
orchestrator.runPipeline({
id: 'rate-limited-pipeline',
source: { /* ... */ },
rate_limiting: {
requests_per_second: 10,
concurrent_requests: 5,
max_retries_on_rate_limit: 3,
},
});
- Error Handling: Configure retries and failure behavior.
orchestrator.runPipeline({
id: 'retry-pipeline',
source: { /* ... */ },
error_handling: {
max_retries: 3,
retry_interval: 1000,
fail_on_error: false,
},
});
These configuration options enable robust, scalable pipeline implementations.
Additional Resources
- Transformations: Data transformation reference
- Error Handling: Error management and retry logic
- Rate Limiting: API rate limiting configuration