
Pipelines in OpenETL

Pipelines are the backbone of OpenETL, orchestrating the flow of data from extraction to loading. This section provides a deep dive into pipelines, including their role, configuration, practical use cases, logging, events, and advanced features.

What is a Pipeline?

A pipeline defines an ETL (Extract, Transform, Load) task in OpenETL, specifying:

  • The source (where data comes from),
  • The target (where data goes),
  • Optional transformations, scheduling, logging, and error handling.

Pipelines are versatile, supporting full ETL workflows, extraction-only (download), loading-only (upload), or transform-and-load tasks.

How Pipelines Work

The Orchestrator reads the pipeline configuration and:

  1. Fetches credentials from the Vault.
  2. Sets up adapters via connectors.
  3. Executes the flow:
    • Extracts data from the source (if specified).
    • Applies transformations (if specified).
    • Loads data to the target (if specified).
  4. Manages logging, events, and error handling as configured.

This flexibility allows pipelines to adapt to a variety of data integration needs.
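
As a quick illustration, the sketch below maps these steps onto a pipeline's configuration fields. It is a minimal example reusing the adapters and credential IDs from the use cases later in this section, and it assumes an orchestrator instance is already in scope.

orchestrator.runPipeline({
  id: 'example-flow',
  // Steps 1 and 2: the Orchestrator resolves 'hs-auth' and 'pg-auth' from
  // the Vault and sets up the HubSpot and PostgreSQL adapters via connectors.
  source: {
    adapter_id: 'hubspot',
    endpoint_id: 'contacts',
    credential_id: 'hs-auth',
    fields: ['email'],
  },
  // Step 3: extract from the source, then load into the target
  // (no transformations are configured here, so that step is skipped).
  target: {
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'pg-auth',
    config: { schema: 'public', table: 'contacts' },
  },
  // Step 4: logging and error handling, as configured.
  logging: event => console.log(event),
  error_handling: { max_retries: 3, retry_interval: 1000, fail_on_error: false },
});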

Pipeline Configuration Options

Below is a detailed list of all pipeline options, their types, and descriptions:

  • id (string): Unique identifier for the pipeline.
  • data? (T[]): Optional raw data provided for upload-only pipelines.
  • source? (Connector): Specifies the source connector for data extraction.
  • target? (Connector): Specifies the target connector for data loading.
  • schedule? ({ frequency: string; at: string }): Optional scheduling (e.g., { frequency: 'daily', at: '02:00' }).
  • logging? ((event: PipelineEvent) => void): Callback for logging pipeline events (e.g., start, error).
  • onload? ((data: T[]) => void): Callback after data extraction (e.g., for download-only pipelines).
  • onbeforesend? ((data: T[]) => T[] | boolean | void): Callback before uploading data, allowing modification or cancellation.
  • error_handling? ({ max_retries: number; retry_interval: number; fail_on_error: boolean }): Configures retry logic and error behavior.
  • rate_limiting? ({ requests_per_second: number; concurrent_requests: number; max_retries_on_rate_limit: number }): Manages rate limiting for API calls.
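
For TypeScript users, the options above can be summarized as a single configuration shape. The sketch below is illustrative only, derived from the list above rather than OpenETL's published type definitions; Connector and PipelineEvent are placeholder shapes standing in for the connector and event structures described elsewhere in this section.

// Illustrative only; OpenETL's actual exported types may differ.
type Connector = Record<string, unknown>; // see the connector examples below
type PipelineEvent = { type: string; message?: string; dataCount?: number };

interface PipelineConfig<T> {
  id: string;                                   // Unique identifier for the pipeline
  data?: T[];                                   // Raw data for upload-only pipelines
  source?: Connector;                           // Source connector for extraction
  target?: Connector;                           // Target connector for loading
  schedule?: { frequency: string; at: string }; // e.g., { frequency: 'daily', at: '02:00' }
  logging?: (event: PipelineEvent) => void;     // Called for each pipeline event
  onload?: (data: T[]) => void;                 // Called after data extraction
  onbeforesend?: (data: T[]) => T[] | boolean | void; // Modify or cancel the upload
  error_handling?: {
    max_retries: number;
    retry_interval: number;
    fail_on_error: boolean;
  };
  rate_limiting?: {
    requests_per_second: number;
    concurrent_requests: number;
    max_retries_on_rate_limit: number;
  };
}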

Pipeline Use Cases

Pipelines can be tailored to various scenarios. Below are common use cases with practical examples.

Full ETL (Download and Upload)

Extracts data from a source, optionally transforms it, and loads it to a target.

Example: Sync HubSpot contacts to PostgreSQL.

orchestrator.runPipeline({
  id: 'hs-to-pg',
  source: {
    adapter_id: 'hubspot',
    endpoint_id: 'contacts',
    credential_id: 'hs-auth',
    fields: ['firstname', 'lastname'],
  },
  target: {
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'pg-auth',
    config: { schema: 'public', table: 'contacts' },
  },
});

Data Extraction (Download Only)

Fetches data from a source and stops—ideal for exporting or inspecting data.

Example: Extract and log HubSpot contacts.

orchestrator.runPipeline({
  id: 'hs-export',
  source: {
    adapter_id: 'hubspot',
    endpoint_id: 'contacts',
    credential_id: 'hs-auth',
    fields: ['email'],
  },
  onload: data => console.log(data),
});

Data Loading (Upload Only)

Loads provided data to a target—perfect for migrating existing datasets.

Example: Load static data into PostgreSQL.

orchestrator.runPipeline({
  id: 'static-to-pg',
  data: [{ firstname: 'Jane' }],
  target: {
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'pg-auth',
    config: { schema: 'public', table: 'contacts' },
  },
});
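
The onbeforesend hook from the configuration options pairs naturally with upload-only pipelines: it runs before the upload and can modify the batch or cancel it. The sketch below assumes that returning an array replaces the data and that returning false cancels the upload; this matches the callback's signature, but the exact cancellation semantics are an assumption rather than documented behavior.

orchestrator.runPipeline({
  id: 'static-to-pg-filtered',
  data: [{ firstname: 'Jane' }, { firstname: '' }],
  target: {
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'pg-auth',
    config: { schema: 'public', table: 'contacts' },
  },
  // Drop empty records before uploading. Returning the filtered array is
  // assumed to replace the original batch; returning false is assumed to
  // cancel the upload entirely.
  onbeforesend: data => {
    const cleaned = data.filter(row => row.firstname);
    return cleaned.length > 0 ? cleaned : false;
  },
});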

Data Transformation and Loading

Transforms provided data before loading it—customize first, then save.

Example: Uppercase names before loading.

orchestrator.runPipeline({
  id: 'transform-to-pg',
  data: [{ name: 'john doe' }],
  target: {
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'pg-auth',
    config: { schema: 'public', table: 'contacts' },
    transform: [{ type: 'uppercase', options: { field: 'name', to: 'firstname' } }],
  },
});

Logging and Debugging

Pipelines support robust logging for monitoring and troubleshooting.

  • Enable Logging: Pass a callback as the logging option to capture pipeline events.
orchestrator.runPipeline({
  id: 'my-pipeline',
  source: { /* ... */ },
  logging: event => console.log(event),
});
  • Debugging Tips:
    • Use logging to track events like start, extract, load, and error.
    • Check for authentication issues by verifying Vault credentials.
    • Break down pipelines into smaller steps to isolate failures, as sketched below.
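
For example, to isolate a failing sync, you can run the extraction step on its own as a download-only pipeline and inspect the result before re-attaching the target. This is a minimal sketch reusing the HubSpot source from the earlier examples.

orchestrator.runPipeline({
  id: 'hs-to-pg-debug',
  // Same source as the full pipeline, but with no target configured,
  // so only the extract step runs.
  source: {
    adapter_id: 'hubspot',
    endpoint_id: 'contacts',
    credential_id: 'hs-auth',
    fields: ['firstname', 'lastname'],
  },
  logging: event => console.log(`${event.type}: ${event.message}`),
  onload: data => console.log(`extracted ${data.length} records`, data[0]),
});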

Pipeline Events

Pipelines emit events at key stages, which can be captured via the logging callback:

  • start: Pipeline begins execution.
  • extract: Data extraction completes (with dataCount).
  • transform: Data transformation occurs.
  • load: Data loading completes.
  • error: An error occurs (with an error message).
  • complete: Pipeline finishes successfully.
  • info: Informational messages (e.g., rate limiting delays).
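
Taken together, the events can be modeled roughly as the discriminated union below. This is an illustrative sketch inferred from the list above and the logging examples in this section, not OpenETL's exact event type; field names other than type are assumptions.

type PipelineEventType =
  | 'start'
  | 'extract'
  | 'transform'
  | 'load'
  | 'error'
  | 'complete'
  | 'info';

interface PipelineEvent {
  type: PipelineEventType;
  message?: string;   // human-readable detail, e.g. on error and info events
  dataCount?: number; // e.g. on extract, the number of records fetched
}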

Example:

logging: event => {
  if (event.type === 'error') {
    console.error('Error:', event.message);
  } else {
    console.log(`${event.type}: ${event.message}`);
  }
}

Advanced Usage

Pipelines support advanced configurations for complex workflows.

  • Scheduling: Automate pipelines with schedule.
orchestrator.runPipeline({
  id: 'daily-sync',
  source: { /* ... */ },
  target: { /* ... */ },
  schedule: { frequency: 'daily', at: '03:00' },
});
  • Rate Limiting: Manage API usage with rate_limiting.
orchestrator.runPipeline({
  id: 'rate-limited-pipeline',
  source: { /* ... */ },
  rate_limiting: {
    requests_per_second: 10,
    concurrent_requests: 5,
    max_retries_on_rate_limit: 3,
  },
});
  • Error Handling: Configure retries and failure behavior.
orchestrator.runPipeline({
  id: 'retry-pipeline',
  source: { /* ... */ },
  error_handling: {
    max_retries: 3,
    retry_interval: 1000,
    fail_on_error: false,
  },
});

These features ensure pipelines are robust, scalable, and tailored to your needs.

Next, explore Transformations!