Pipelines in OpenETL
Pipelines are the backbone of OpenETL, orchestrating the flow of data from extraction to loading. This section provides a deep dive into pipelines, including their role, configuration, practical use cases, logging, events, and advanced features.
What is a Pipeline?
A pipeline defines an ETL (Extract, Transform, Load) task in OpenETL, specifying:
- The source (where data comes from),
- The target (where data goes),
- Optional transformations, scheduling, logging, and error handling.
Pipelines are versatile, supporting full ETL workflows as well as extraction-only, load-only, and transform-then-load tasks.
How Pipelines Work
The Orchestrator reads the pipeline configuration and:
- Fetches credentials from the Vault.
- Sets up adapters via connectors.
- Executes the flow:
  - Extracts data from the source (if specified).
  - Applies transformations (if specified).
  - Loads data to the target (if specified).
- Manages logging, events, and error handling as configured.
This flexibility allows pipelines to adapt to a variety of data integration needs.
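To make this sequence concrete, the sketch below walks through the same extract, transform, and load steps in order. It is an illustration only, not OpenETL's actual Orchestrator code: the `runFlowSketch` helper, its parameters, and the `Step` type are assumptions introduced here purely to show the control flow.

```typescript
// Illustrative sketch of the extract -> transform -> load sequence described
// above. NOT OpenETL's Orchestrator implementation; names and types are assumed.
type Step<T> = (data: T[]) => Promise<T[]> | T[];

async function runFlowSketch<T>(opts: {
  extract?: () => Promise<T[]>;        // stands in for the source connector
  transform?: Step<T>;                 // optional transformation step
  load?: (data: T[]) => Promise<void>; // stands in for the target connector
  data?: T[];                          // raw data for upload-only pipelines
  log?: (message: string) => void;     // stand-in for the logging callback
}): Promise<T[]> {
  const log = opts.log ?? (() => {});

  // 1. Extract from the source, or start from provided raw data.
  let data = opts.data ?? [];
  if (opts.extract) {
    data = await opts.extract();
    log(`extract: ${data.length} records`);
  }

  // 2. Apply transformations, if any.
  if (opts.transform) {
    data = await opts.transform(data);
    log('transform: done');
  }

  // 3. Load into the target, if one is configured.
  if (opts.load) {
    await opts.load(data);
    log('load: done');
  }

  return data;
}
```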
Pipeline Configuration Table
Below is a detailed table of all pipeline options, their types, and descriptions:
Option | Type | Description |
---|---|---|
`id` | `string` | Unique identifier for the pipeline. |
`data?` | `T[]` | Optional raw data provided for upload-only pipelines. |
`source?` | `Connector` | Specifies the source connector for data extraction. |
`target?` | `Connector` | Specifies the target connector for data loading. |
`schedule?` | `{ frequency: string; at: string }` | Optional scheduling (e.g., `{ frequency: 'daily', at: '02:00' }`). |
`logging?` | `(event: PipelineEvent) => void` | Callback for logging pipeline events (e.g., start, error). |
`onload?` | `(data: T[]) => void` | Callback after data extraction (e.g., for download-only pipelines). |
`onbeforesend?` | `(data: T[]) => T[] \| boolean \| void` | Callback before uploading data, allowing modification or cancellation. |
`error_handling?` | `{ max_retries: number; retry_interval: number; fail_on_error: boolean }` | Configures retry logic and error behavior. |
`rate_limiting?` | `{ requests_per_second: number; concurrent_requests: number; max_retries_on_rate_limit: number }` | Manages rate limiting for API calls. |
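Taken together, the options above correspond roughly to a configuration object shaped like the sketch below. The interface name and the placeholder `Connector` and `PipelineEvent` aliases are assumptions for illustration; OpenETL's own exported types are authoritative.

```typescript
// Rough, consolidated shape of the options in the table above. Placeholder
// aliases are used so the sketch is self-contained; they are not OpenETL's types.
type Connector = Record<string, unknown>;   // placeholder for the connector type
interface PipelineEvent {                   // placeholder; see "Pipeline Events"
  type: string;
  message?: string;
  dataCount?: number;
}

interface PipelineConfigSketch<T> {
  id: string;                                         // unique pipeline identifier
  data?: T[];                                         // raw data for upload-only pipelines
  source?: Connector;                                 // connector used for extraction
  target?: Connector;                                 // connector used for loading
  schedule?: { frequency: string; at: string };       // e.g. { frequency: 'daily', at: '02:00' }
  logging?: (event: PipelineEvent) => void;           // receives pipeline events
  onload?: (data: T[]) => void;                       // called after extraction
  onbeforesend?: (data: T[]) => T[] | boolean | void; // modify or cancel the upload
  error_handling?: {
    max_retries: number;
    retry_interval: number;
    fail_on_error: boolean;
  };
  rate_limiting?: {
    requests_per_second: number;
    concurrent_requests: number;
    max_retries_on_rate_limit: number;
  };
}
```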
Pipeline Use Cases
Pipelines can be tailored to various scenarios. Below are common use cases with practical examples.
Full ETL (Download and Upload)
Extracts data from a source, optionally transforms it, and loads it to a target.
Example: Sync HubSpot contacts to PostgreSQL.
orchestrator.runPipeline({
id: 'hs-to-pg',
source: {
adapter_id: 'hubspot',
endpoint_id: 'contacts',
credential_id: 'hs-auth',
fields: ['firstname', 'lastname'],
},
target: {
adapter_id: 'postgresql',
endpoint_id: 'table_insert',
credential_id: 'pg-auth',
config: { schema: 'public', table: 'contacts' },
},
});
Data Extraction (Download Only)
Fetches data from a source and stops—ideal for exporting or inspecting data.
Example: Extract and log HubSpot contacts.
orchestrator.runPipeline({
id: 'hs-export',
source: {
adapter_id: 'hubspot',
endpoint_id: 'contacts',
credential_id: 'hs-auth',
fields: ['email'],
},
onload: data => console.log(data),
});
Data Loading (Upload Only)
Loads provided data to a target—perfect for migrating existing datasets.
Example: Load static data into PostgreSQL.
orchestrator.runPipeline({
id: 'static-to-pg',
data: [{ firstname: 'Jane' }],
target: {
adapter_id: 'postgresql',
endpoint_id: 'table_insert',
credential_id: 'pg-auth',
config: { schema: 'public', table: 'contacts' },
},
});
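The `onbeforesend` option from the configuration table pairs naturally with upload-only pipelines: it runs right before the upload and, per its signature, may return a modified array (or a boolean, presumably to allow or cancel the send). Below is a hedged sketch that reuses the PostgreSQL target above; the `static-to-pg-validated` id and the filtering logic are illustrative assumptions.

```typescript
orchestrator.runPipeline({
  id: 'static-to-pg-validated',
  data: [{ firstname: 'Jane' }, { firstname: '' }],
  target: {
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'pg-auth',
    config: { schema: 'public', table: 'contacts' },
  },
  // Drop records with an empty firstname before they are uploaded.
  onbeforesend: data => data.filter(row => row.firstname.trim() !== ''),
});
```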
Data Transformation and Loading
Transforms provided data before loading it—customize first, then save.
Example: Uppercase names before loading.
orchestrator.runPipeline({
id: 'transform-to-pg',
data: [{ name: 'john doe' }],
target: {
adapter_id: 'postgresql',
endpoint_id: 'table_insert',
credential_id: 'pg-auth',
config: { schema: 'public', table: 'contacts' },
transform: [{ type: 'uppercase', options: { field: 'name', to: 'firstname' } }],
},
});
Logging and Debugging
Pipelines support robust logging for monitoring and troubleshooting.
- Enable Logging: Pass a callback to `logging` to capture pipeline events.
orchestrator.runPipeline({
id: 'my-pipeline',
source: { /* ... */ },
logging: event => console.log(event),
});
- Debugging Tips:
  - Use `logging` to track events like `start`, `extract`, `load`, and `error`.
  - Check for authentication issues by verifying Vault credentials.
  - Break down pipelines into smaller steps to isolate failures.
Pipeline Events
Pipelines emit events at key stages, which can be captured via the `logging` callback:
- `start`: Pipeline begins execution.
- `extract`: Data extraction completes (with `dataCount`).
- `transform`: Data transformation occurs.
- `load`: Data loading completes.
- `error`: An error occurs (with error message).
- `complete`: Pipeline finishes successfully.
- `info`: Informational messages (e.g., rate limiting delays).
Example:
logging: event => {
if (event.type === 'error') {
console.error('Error:', event.message);
} else {
console.log(`${event.type}: ${event.message}`);
}
}
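This section does not show the exact event type, but based on the list above, the object passed to `logging` can be pictured roughly as follows. This is an assumed sketch rather than the library's exported definition.

```typescript
// Assumed shape, reconstructed from the events listed above; check OpenETL's
// exported types for the real definition.
type PipelineEventType =
  | 'start'
  | 'extract'
  | 'transform'
  | 'load'
  | 'error'
  | 'complete'
  | 'info';

interface PipelineEventSketch {
  type: PipelineEventType; // which stage emitted the event
  message?: string;        // human-readable details (e.g. the error message)
  dataCount?: number;      // e.g. number of records extracted
}
```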
Advanced Usage
Pipelines support advanced configurations for complex workflows.
- Scheduling: Automate pipelines with `schedule`.
orchestrator.runPipeline({
id: 'daily-sync',
source: { /* ... */ },
target: { /* ... */ },
schedule: { frequency: 'daily', at: '03:00' },
});
- Rate Limiting: Manage API usage with `rate_limiting`.
orchestrator.runPipeline({
id: 'rate-limited-pipeline',
source: { /* ... */ },
rate_limiting: {
requests_per_second: 10,
concurrent_requests: 5,
max_retries_on_rate_limit: 3,
},
});
- Error Handling: Configure retries and failure behavior.
orchestrator.runPipeline({
id: 'retry-pipeline',
source: { /* ... */ },
error_handling: {
max_retries: 3,
retry_interval: 1000,
fail_on_error: false,
},
});
These features ensure pipelines are robust, scalable, and tailored to your needs.
Next, explore Transformations!