Pipelines

Pipelines define ETL workflows in OpenETL, specifying data sources, targets, transformations, and execution parameters. This document covers pipeline configuration, execution modes, event handling, and advanced features.

Pipeline Definition

A pipeline is a configuration object that defines an ETL workflow, including:

  • Source connector: Data extraction configuration
  • Target connector: Data loading configuration
  • Transformations: Data manipulation logic
  • Error handling: Retry and failure behavior
  • Rate limiting: API request management
  • Event callbacks: Monitoring and logging hooks
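
For instance, a single configuration can combine most of these pieces. The sketch below is assembled from the examples later in this page; the adapter, endpoint, and credential IDs are the ones used in those examples.

orchestrator.runPipeline({
  id: 'contacts-sync',
  // Source connector: where data is extracted from
  source: {
    adapter_id: 'hubspot',
    endpoint_id: 'contacts',
    credential_id: 'hs-auth',
    fields: ['firstname', 'lastname'],
  },
  // Target connector: where data is loaded, including a transformation step
  target: {
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'pg-auth',
    config: { schema: 'public', table: 'contacts' },
    transform: [{ type: 'uppercase', options: { field: 'firstname', to: 'firstname' } }],
  },
  // Error handling: retry and failure behavior
  error_handling: { max_retries: 3, retry_interval: 1000, fail_on_error: false },
  // Rate limiting: API request management
  rate_limiting: { requests_per_second: 10, concurrent_requests: 5, max_retries_on_rate_limit: 3 },
  // Event callback: monitoring and logging hook
  logging: event => console.log(`${event.type}: ${event.message}`),
});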

Pipeline Execution

The Orchestrator executes pipelines through the following steps:

  1. Resolve credentials from the Vault using credential_id references
  2. Initialize adapters with connector configurations
  3. Execute data operations:
    • Extract data from source (if specified)
    • Apply transformations (if specified)
    • Load data to target (if specified)
  4. Handle errors according to configuration
  5. Emit events for monitoring and logging
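
Conceptually, the flow corresponds to something like the sketch below. This is an illustration only, assuming hypothetical helpers (vault.resolve, createAdapter, applyTransforms, retryOrFail, emit); it is not the Orchestrator's actual code.

// Illustrative sketch of the execution flow; all helper names are hypothetical.
async function runPipelineSketch(pipeline) {
  // 1. Resolve credentials from the Vault via credential_id references
  const sourceAuth = pipeline.source && vault.resolve(pipeline.source.credential_id);
  const targetAuth = pipeline.target && vault.resolve(pipeline.target.credential_id);
  // 2. Initialize adapters with the connector configurations
  const sourceAdapter = pipeline.source && createAdapter(pipeline.source, sourceAuth);
  const targetAdapter = pipeline.target && createAdapter(pipeline.target, targetAuth);
  try {
    emit({ type: 'start', message: 'Pipeline started' }); // 5. events are emitted throughout
    // 3. Extract (or use provided data), transform, then load
    let data = pipeline.data ?? (sourceAdapter ? await sourceAdapter.download() : []);
    data = applyTransforms(data, pipeline);
    if (targetAdapter) await targetAdapter.upload(data);
    emit({ type: 'complete', message: 'Pipeline finished' });
  } catch (err) {
    // 4. Handle errors according to error_handling (retry or fail)
    retryOrFail(err, pipeline.error_handling);
  }
}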

Pipeline Configuration Options

Each pipeline accepts the following options, listed with their types and descriptions:

  • id (string): Unique identifier for the pipeline.
  • data? (T[]): Optional raw data provided for upload-only pipelines.
  • source? (Connector): Specifies the source connector for data extraction.
  • target? (Connector): Specifies the target connector for data loading.
  • schedule? ({ frequency: string; at: string }): Optional scheduling (e.g., { frequency: 'daily', at: '02:00' }).
  • logging? ((event: PipelineEvent) => void): Callback for logging pipeline events (e.g., start, error).
  • onload? ((data: T[]) => void): Callback after data extraction (e.g., for download-only pipelines).
  • onbeforesend? ((data: T[]) => T[] | boolean | void): Callback before uploading data, allowing modification or cancellation.
  • error_handling? ({ max_retries: number; retry_interval: number; fail_on_error: boolean }): Configures retry logic and error behavior.
  • rate_limiting? ({ requests_per_second: number; concurrent_requests: number; max_retries_on_rate_limit: number }): Manages rate limiting for API calls.
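
Taken together, these options correspond roughly to the following TypeScript shape. This is a sketch assembled from the options above; the actual type names exported by OpenETL may differ.

// Sketch of the pipeline configuration shape, assembled from the options listed above.
interface PipelineConfigSketch<T> {
  id: string;
  data?: T[];
  source?: Connector;
  target?: Connector;
  schedule?: { frequency: string; at: string };
  logging?: (event: PipelineEvent) => void;
  onload?: (data: T[]) => void;
  onbeforesend?: (data: T[]) => T[] | boolean | void;
  error_handling?: { max_retries: number; retry_interval: number; fail_on_error: boolean };
  rate_limiting?: { requests_per_second: number; concurrent_requests: number; max_retries_on_rate_limit: number };
}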

Pipeline Use Cases

Pipelines can be tailored to various scenarios. Below are common use cases with practical examples.

Full ETL (Download and Upload)

Extracts data from a source, optionally transforms it, and loads it to a target.

Example: Sync HubSpot contacts to PostgreSQL.

orchestrator.runPipeline({
  id: 'hs-to-pg',
  source: {
    adapter_id: 'hubspot',
    endpoint_id: 'contacts',
    credential_id: 'hs-auth',
    fields: ['firstname', 'lastname'],
  },
  target: {
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'pg-auth',
    config: { schema: 'public', table: 'contacts' },
  },
});

Data Extraction (Download Only)

Fetches data from a source and stops—ideal for exporting or inspecting data.

Example: Extract and log HubSpot contacts.

orchestrator.runPipeline({
  id: 'hs-export',
  source: {
    adapter_id: 'hubspot',
    endpoint_id: 'contacts',
    credential_id: 'hs-auth',
    fields: ['email'],
  },
  onload: data => console.log(data),
});

Data Loading (Upload Only)

Loads provided data to a target—perfect for migrating existing datasets.

Example: Load static data into PostgreSQL.

orchestrator.runPipeline({
  id: 'static-to-pg',
  data: [{ firstname: 'Jane' }],
  target: {
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'pg-auth',
    config: { schema: 'public', table: 'contacts' },
  },
});

Data Transformation and Loading

Transforms provided data before loading it—customize first, then save.

Example: Uppercase names before loading.

orchestrator.runPipeline({
  id: 'transform-to-pg',
  data: [{ name: 'john doe' }],
  target: {
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'pg-auth',
    config: { schema: 'public', table: 'contacts' },
    transform: [{ type: 'uppercase', options: { field: 'name', to: 'firstname' } }],
  },
});
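
The onbeforesend callback from the configuration options provides a related hook: it can adjust the payload just before upload, or (given its boolean return type) presumably cancel the upload by returning false. A minimal sketch with illustrative trimming logic:

orchestrator.runPipeline({
  id: 'guarded-upload',
  data: [{ firstname: ' Jane ' }, {}],
  target: {
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'pg-auth',
    config: { schema: 'public', table: 'contacts' },
  },
  // Drop empty rows and trim whitespace before upload; an empty result cancels the upload.
  onbeforesend: data => {
    const rows = data
      .filter(row => row.firstname)
      .map(row => ({ ...row, firstname: row.firstname.trim() }));
    return rows.length > 0 ? rows : false;
  },
});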

Logging and Debugging

Pipelines support robust logging for monitoring and troubleshooting.

  • Enable Logging: Pass a callback to logging to capture pipeline events.
orchestrator.runPipeline({
  id: 'my-pipeline',
  source: { /* ... */ },
  logging: event => console.log(event),
});
  • Debugging Tips:
    • Use logging to track events like start, extract, load, and error.
    • Check for authentication issues by verifying Vault credentials.
    • Break down pipelines into smaller steps to isolate failures, as shown in the sketch below.
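
For example, a failing full ETL pipeline can be split into a download-only run and an upload-only run to see which half misbehaves. The sketch below reuses the connectors from the examples above.

// Step 1: download only, to confirm extraction and credentials work.
orchestrator.runPipeline({
  id: 'debug-download',
  source: {
    adapter_id: 'hubspot',
    endpoint_id: 'contacts',
    credential_id: 'hs-auth',
    fields: ['email'],
  },
  onload: data => console.log('extracted rows:', data.length),
  logging: event => console.log(event),
});

// Step 2: upload only, with a small known payload, to confirm loading works.
orchestrator.runPipeline({
  id: 'debug-upload',
  data: [{ firstname: 'Test' }],
  target: {
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'pg-auth',
    config: { schema: 'public', table: 'contacts' },
  },
  logging: event => console.log(event),
});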

Pipeline Events

Pipelines emit events at key stages, which can be captured via the logging callback:

  • start: Pipeline begins execution.
  • extract: Data extraction completes (with dataCount).
  • transform: Data transformation occurs.
  • load: Data loading completes.
  • error: An error occurs (with error message).
  • complete: Pipeline finishes successfully.
  • info: Informational messages (e.g., rate limiting delays).
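
Based on these descriptions, an event can be treated roughly as the shape below. This is a sketch inferred from the list above; the actual PipelineEvent type may carry additional fields.

// Sketch of the event shape, inferred from the event list above.
type PipelineEventSketch = {
  type: 'start' | 'extract' | 'transform' | 'load' | 'error' | 'complete' | 'info';
  message: string;    // human-readable description; error events carry the error message
  dataCount?: number; // present on extract events
};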

Example:

logging: event => {
  if (event.type === 'error') {
    console.error('Error:', event.message);
  } else {
    console.log(`${event.type}: ${event.message}`);
  }
}

Advanced Usage

Pipelines support advanced configurations for complex workflows.

  • Scheduling: Automate pipelines with schedule.
orchestrator.runPipeline({
  id: 'daily-sync',
  source: { /* ... */ },
  target: { /* ... */ },
  schedule: { frequency: 'daily', at: '03:00' },
});
  • Rate Limiting: Manage API usage with rate_limiting.
orchestrator.runPipeline({
  id: 'rate-limited-pipeline',
  source: { /* ... */ },
  rate_limiting: {
    requests_per_second: 10,
    concurrent_requests: 5,
    max_retries_on_rate_limit: 3,
  },
});
  • Error Handling: Configure retries and failure behavior.
orchestrator.runPipeline({
  id: 'retry-pipeline',
  source: { /* ... */ },
  error_handling: {
    max_retries: 3,
    retry_interval: 1000,
    fail_on_error: false,
  },
});

These configuration options enable robust, scalable pipeline implementations.

Additional Resources