Error Handling

OpenETL provides structured error handling for pipeline execution. This document covers the AdapterError class, retry logic with exponential backoff, error events, and debugging strategies.

Overview

Error handling in OpenETL manages failures during pipeline execution, including network errors, authentication failures, rate limiting, and data validation errors. The framework provides:

  • AdapterError - Standardized error class with error codes and retry information
  • Configurable Retries - Linear or exponential backoff strategies
  • Error Events - Callbacks for logging and monitoring
  • Graceful Degradation - Continue or halt based on configuration

AdapterError Class

The AdapterError class provides structured error information for adapter operations:

import { AdapterError } from 'openetl';

try {
  const result = await adapter.download({ limit: 100, offset: 0 });
} catch (error) {
  if (error instanceof AdapterError) {
    console.log(error.code);      // Error code
    console.log(error.retryable); // Whether operation can be retried
    console.log(error.context);   // Additional context
    console.log(error.cause);     // Original error if wrapped
  }
}

Error Codes

Code                    Description                       Typically Retryable
CONNECTION_FAILED       Failed to connect to service      Yes
AUTHENTICATION_FAILED   Invalid credentials               No
VALIDATION_FAILED       Invalid data or configuration     No
DOWNLOAD_FAILED         Error fetching data               Depends
UPLOAD_FAILED           Error sending data                Depends
TIMEOUT                 Operation timed out               Yes
RATE_LIMITED            Rate limit exceeded (HTTP 429)    Yes
NOT_FOUND               Resource not found (HTTP 404)     No
INVALID_CONFIG          Invalid adapter configuration     No
UNKNOWN                 Unclassified error                No
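
When an error reaches code that has only the error code to go on, the table above can serve as a fallback for deciding whether to retry. The following is a minimal sketch, not part of OpenETL: the `ErrorCode` union, the `TYPICAL_RETRYABILITY` map, and the `shouldRetry` helper are hypothetical names, and treating "Depends" as "do not retry unless the error says so" is an assumption.

```typescript
// Error codes copied from the table above.
type ErrorCode =
  | 'CONNECTION_FAILED' | 'AUTHENTICATION_FAILED' | 'VALIDATION_FAILED'
  | 'DOWNLOAD_FAILED' | 'UPLOAD_FAILED' | 'TIMEOUT' | 'RATE_LIMITED'
  | 'NOT_FOUND' | 'INVALID_CONFIG' | 'UNKNOWN';

// 'depends' marks codes whose retryability the table leaves to the specific failure.
type Retryability = 'yes' | 'no' | 'depends';

const TYPICAL_RETRYABILITY: Record<ErrorCode, Retryability> = {
  CONNECTION_FAILED: 'yes',
  AUTHENTICATION_FAILED: 'no',
  VALIDATION_FAILED: 'no',
  DOWNLOAD_FAILED: 'depends',
  UPLOAD_FAILED: 'depends',
  TIMEOUT: 'yes',
  RATE_LIMITED: 'yes',
  NOT_FOUND: 'no',
  INVALID_CONFIG: 'no',
  UNKNOWN: 'no',
};

// Hypothetical helper: prefer the error's own `retryable` flag and fall back
// to the table only when no flag is available.
function shouldRetry(code: ErrorCode, retryableFlag?: boolean): boolean {
  if (retryableFlag !== undefined) return retryableFlag;
  // Assumption: 'depends' is treated as not retryable without an explicit flag.
  return TYPICAL_RETRYABILITY[code] === 'yes';
}
```

In practice the `retryable` property on the AdapterError instance should take precedence, since the adapter that threw the error knows more about the failure than a static table does.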

Creating AdapterError

import { AdapterError } from 'openetl';

// Basic error
throw new AdapterError(
  'Connection timeout',
  'CONNECTION_FAILED',
  true  // retryable
);

// Error with context
throw new AdapterError(
  'Rate limit exceeded',
  'RATE_LIMITED',
  true,
  { retryAfter: 60, endpoint: '/api/contacts' }
);

// Wrap existing error
throw new AdapterError(
  'Download failed',
  'DOWNLOAD_FAILED',
  false,
  { table: 'users' },
  originalError  // cause
);

Creating from Unknown Errors

The AdapterError.from() static method creates AdapterError instances from unknown error types, automatically detecting HTTP status codes:

import axios from 'axios';
import { AdapterError } from 'openetl';

try {
  const response = await axios.get(url);
} catch (error) {
  // Automatically detects 401, 404, 429, 5xx status codes
  throw AdapterError.from(error, 'DOWNLOAD_FAILED', true, { url });
}
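
To make the status-code detection concrete, here is a rough sketch of how such a classification could work. It is not the library's implementation: `statusOf` and `classifyByStatus` are hypothetical helpers, the 401/404/429 mappings follow the error code table, and the handling of 5xx (keep the caller's code, mark as retryable) is an assumption.

```typescript
type ErrorCode =
  | 'AUTHENTICATION_FAILED' | 'NOT_FOUND' | 'RATE_LIMITED'
  | 'DOWNLOAD_FAILED' | 'UPLOAD_FAILED' | 'UNKNOWN';

interface Classification {
  code: ErrorCode;
  retryable: boolean;
}

// Extract an HTTP status from an unknown thrown value shaped like an axios
// error (error.response.status). Returns undefined when none is present.
function statusOf(error: unknown): number | undefined {
  if (typeof error === 'object' && error !== null && 'response' in error) {
    const response = (error as { response?: { status?: unknown } }).response;
    return typeof response?.status === 'number' ? response.status : undefined;
  }
  return undefined;
}

// Hypothetical classifier: known statuses override the caller's defaults.
function classifyByStatus(status: number | undefined, fallback: Classification): Classification {
  if (status === 401) return { code: 'AUTHENTICATION_FAILED', retryable: false };
  if (status === 404) return { code: 'NOT_FOUND', retryable: false };
  if (status === 429) return { code: 'RATE_LIMITED', retryable: true };
  // Assumption: server errors keep the caller's code but count as transient.
  if (status !== undefined && status >= 500 && status <= 599) {
    return { code: fallback.code, retryable: true };
  }
  return fallback;
}
```

The caller-supplied code and retryable flag act as defaults, which mirrors the shape of the `AdapterError.from(error, code, retryable, context)` call shown above.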

Serialization

AdapterError includes a toJSON() method for logging:

const error = new AdapterError('Connection failed', 'CONNECTION_FAILED', true);
console.log(JSON.stringify(error.toJSON(), null, 2));
// {
//   "name": "AdapterError",
//   "message": "Connection failed",
//   "code": "CONNECTION_FAILED",
//   "retryable": true,
//   "stack": "..."
// }
// context and cause are undefined here, so JSON.stringify omits them.

Retry Configuration

Basic Configuration

error_handling: {
  max_retries: 3,          // Number of retry attempts
  retry_interval: 1000,    // Base delay between retries (ms)
}

Exponential Backoff with Jitter

Enable exponential backoff for more robust retry behavior:

error_handling: {
  max_retries: 5,
  retry_interval: 1000,           // Base delay: 1 second
  use_exponential_backoff: true,  // Enable exponential backoff
}

How it works:

  1. First retry: ~1 second (1000ms + jitter)
  2. Second retry: ~2 seconds (2000ms + jitter)
  3. Third retry: ~4 seconds (4000ms + jitter)
  4. Fourth retry: ~8 seconds (8000ms + jitter)
  5. Fifth retry: ~16 seconds (16000ms + jitter)

The jitter (random variation up to 30%) prevents thundering herd problems when multiple clients retry simultaneously.

Backoff formula:

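backoff = min(baseDelay * 2^attempt, maxDelay)
delay   = backoff + random(0, 0.3 * backoff)

Here baseDelay is retry_interval and attempt is 0 for the first retry, which matches the sequence listed above.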
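
The formula above can be sketched as a small function. This is an illustration rather than OpenETL's internal code: `backoffDelay` is a hypothetical name, `maxDelay` does not appear in the documented configuration options so it defaults to no cap here, and the `random` parameter exists only to make the jitter testable.

```typescript
// Compute the delay before a retry, in milliseconds.
// attempt is zero-based: 0 for the first retry, 1 for the second, and so on.
function backoffDelay(
  attempt: number,
  baseDelay: number,
  maxDelay: number = Number.POSITIVE_INFINITY, // assumption: no cap unless given
  random: () => number = Math.random,          // returns a value in [0, 1)
): number {
  // Exponential growth, capped at maxDelay.
  const backoff = Math.min(baseDelay * Math.pow(2, attempt), maxDelay);
  // Jitter adds up to 30% of the capped backoff on top.
  const jitter = random() * 0.3 * backoff;
  return backoff + jitter;
}

// With retry_interval = 1000 and jitter disabled, the sequence matches the
// list above: 1000, 2000, 4000, 8000, 16000.
const schedule = [0, 1, 2, 3, 4].map(attempt => backoffDelay(attempt, 1000, undefined, () => 0));
```

Because the jitter is computed from the capped backoff, the actual wait always falls between the nominal delay and 1.3 times that value.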

Configuration Options

Property                  Type      Default   Description
max_retries               number    0         Maximum retry attempts
retry_interval            number    1000      Base delay between retries (ms)
use_exponential_backoff   boolean   false     Use exponential backoff
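
To show how these three options interact, the following sketch wraps an async operation in a retry loop. It is a simplified stand-in, not the orchestrator's implementation: `withRetries`, the `ErrorHandling` interface, and the injectable `wait` function are hypothetical, the non-exponential mode is assumed to wait a fixed `retry_interval`, and jitter is left out for brevity.

```typescript
interface ErrorHandling {
  max_retries?: number;
  retry_interval?: number;
  use_exponential_backoff?: boolean;
}

const sleep = (ms: number): Promise<void> =>
  new Promise<void>(resolve => setTimeout(resolve, ms));

async function withRetries<T>(
  operation: () => Promise<T>,
  config: ErrorHandling = {},
  wait: (ms: number) => Promise<void> = sleep,
): Promise<T> {
  // Defaults follow the table above.
  const maxRetries = config.max_retries ?? 0;
  const base = config.retry_interval ?? 1000;

  for (let attempt = 0; ; attempt++) {
    try {
      return await operation();
    } catch (error) {
      // Errors that explicitly set retryable: false are never retried.
      const retryable = (error as { retryable?: boolean } | null)?.retryable !== false;
      if (attempt >= maxRetries || !retryable) throw error;
      const delay = config.use_exponential_backoff ? base * Math.pow(2, attempt) : base;
      await wait(delay);
    }
  }
}
```

With the default `max_retries` of 0 the operation runs exactly once, so retries are opt-in.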

Error Scenarios

Network Failures

  • Retries according to max_retries configuration
  • Uses exponential backoff if enabled
  • Emits error event for each failure

// Recommended configuration for network errors
error_handling: {
  max_retries: 3,
  retry_interval: 1000,
  use_exponential_backoff: true,
}

Rate Limiting (HTTP 429)

  • Uses rate_limiting.max_retries_on_rate_limit if configured
  • Falls back to error_handling.max_retries otherwise
  • Respects Retry-After header when provided by the API
  • Applies exponential backoff between retries

rate_limiting: {
  requests_per_second: 10,
  max_retries_on_rate_limit: 5,
}
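
The fallback order and the Retry-After handling described above can be sketched as two small helpers. Both `rateLimitRetryBudget` and `retryAfterMs` are hypothetical names used for illustration, and the sketch assumes the header may carry either a number of seconds or an HTTP date, as the HTTP specification allows.

```typescript
interface RetryConfig {
  error_handling?: { max_retries?: number };
  rate_limiting?: { max_retries_on_rate_limit?: number };
}

// Prefer the rate-limit-specific budget, then the general one, then no retries.
function rateLimitRetryBudget(config: RetryConfig): number {
  return config.rate_limiting?.max_retries_on_rate_limit
    ?? config.error_handling?.max_retries
    ?? 0;
}

// Convert a Retry-After header value into a wait time in milliseconds.
// Returns undefined when the header is missing or cannot be parsed.
function retryAfterMs(header: string | undefined, now: number = Date.now()): number | undefined {
  if (header === undefined) return undefined;
  const value = header.trim();
  // Form 1: delay in whole seconds, e.g. "60".
  if (/^\d+$/.test(value)) return Number(value) * 1000;
  // Form 2: an HTTP date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT".
  const date = Date.parse(value);
  return Number.isNaN(date) ? undefined : Math.max(0, date - now);
}
```

A caller would typically use the parsed header value when it is defined and fall back to the computed backoff delay otherwise.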

Authentication Errors (HTTP 401)

  • Attempts OAuth2 token refresh if applicable
  • Retries with refreshed credentials
  • Fails permanently if the refresh is unsuccessful
  • Otherwise not retryable by default (invalid credentials won't become valid)
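
The refresh-then-retry behavior described above follows a common pattern, sketched here under stated assumptions. `withTokenRefresh` and `isUnauthorized` are hypothetical helpers, not OpenETL functions, and the sketch assumes a 401 surfaces as a thrown value with a `status` property.

```typescript
// Hypothetical check: treat any thrown value carrying status 401 as an
// authentication failure.
function isUnauthorized(error: unknown): boolean {
  return typeof error === 'object' && error !== null
    && (error as { status?: unknown }).status === 401;
}

async function withTokenRefresh<T>(
  request: (accessToken: string) => Promise<T>,
  accessToken: string,
  refresh: () => Promise<string>,
): Promise<T> {
  try {
    return await request(accessToken);
  } catch (error) {
    // Anything other than a 401 is not an authentication problem.
    if (!isUnauthorized(error)) throw error;
    // If the refresh itself rejects, that rejection is the permanent failure.
    const refreshed = await refresh();
    // Exactly one retry: a second 401 propagates to the caller.
    return request(refreshed);
  }
}
```

Limiting the flow to a single retry avoids looping forever when the refreshed token is also rejected.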

Timeout Errors

  • Configurable per-connector via timeout option
  • Retryable by default (transient network issues)

source: {
  adapter_id: 'postgresql',
  endpoint_id: 'table_query',
  credential_id: 'db-auth',
  timeout: 60000,  // 60 second timeout
  fields: [],
}
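
For intuition about what a per-connector timeout does, the sketch below races an operation against a timer and rejects with a retryable error when the timer wins. This is a generic pattern, not OpenETL's implementation: `withTimeout` is a hypothetical helper, and the `code` and `retryable` properties attached to the error only mimic the AdapterError shape.

```typescript
function withTimeout<T>(operation: Promise<T>, timeoutMs: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;

  // A promise that never resolves and rejects once the deadline passes.
  const deadline = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => {
      const error = Object.assign(new Error(`Operation timed out after ${timeoutMs}ms`), {
        code: 'TIMEOUT',
        retryable: true,
      });
      reject(error);
    }, timeoutMs);
  });

  // Whichever settles first wins; the timer is cleared in both cases.
  return Promise.race([operation, deadline]).then(
    value => { clearTimeout(timer); return value; },
    error => { clearTimeout(timer); throw error; },
  );
}
```

Note that this pattern only stops waiting; it does not cancel the underlying operation, which may still complete in the background.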

Error Events

Pipeline events include error information for monitoring:

orchestrator.runPipeline({
  id: 'my-pipeline',
  source: { /* ... */ },
  logging: event => {
    if (event.type === 'error') {
      console.error(`Error: ${event.message}`);
      // Send to monitoring service
      monitoring.recordError(event);
    }
  },
});

Event Types

Type        Description
start       Pipeline begins execution
extract     Data extraction completes
transform   Data transformation occurs
load        Data loading completes
error       An error occurs
complete    Pipeline finishes successfully
info        Informational messages
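
One way to use these event types for monitoring is to tally them in the logging callback. The sketch below is illustrative only: `EventType`, `PipelineEvent`, and `createEventTally` are hypothetical local definitions that model just the `type` and `message` fields used in the examples in this document.

```typescript
// Event types copied from the table above.
type EventType = 'start' | 'extract' | 'transform' | 'load' | 'error' | 'complete' | 'info';

// Minimal event shape assumed for this sketch.
interface PipelineEvent {
  type: EventType;
  message: string;
}

function createEventTally() {
  const counts: Record<EventType, number> = {
    start: 0, extract: 0, transform: 0, load: 0, error: 0, complete: 0, info: 0,
  };
  // Pass this function as the pipeline's logging callback.
  const logging = (event: PipelineEvent): void => {
    counts[event.type] += 1;
  };
  return { logging, counts };
}

const tally = createEventTally();
tally.logging({ type: 'start', message: 'Pipeline started' });
tally.logging({ type: 'error', message: 'Connection failed' });
tally.logging({ type: 'complete', message: 'Pipeline finished' });
```

Because an error event is emitted for each failed attempt, a non-zero error count alongside a complete event indicates a run that succeeded after retries.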

Debugging Strategies

Enable Debug Logging

Set debug: true on connectors for verbose logging:

source: {
  adapter_id: 'hubspot',
  endpoint_id: 'contacts',
  credential_id: 'hs-auth',
  fields: ['email'],
  debug: true,  // Enable debug logging
}

Use Pipeline Events

orchestrator.runPipeline({
  id: 'debug-pipeline',
  source: { /* ... */ },
  logging: event => {
    console.log(`[${event.timestamp}] ${event.type}: ${event.message}`);
    if (event.dataCount) {
      console.log(`  Records: ${event.dataCount}`);
    }
  },
});

Validate Configuration

Use validatePipeline to catch configuration errors before execution:

import { validatePipeline } from 'openetl';

const validation = validatePipeline(pipeline, adapters, vault);

if (!validation.valid) {
  console.error('Validation errors:');
  validation.errors.forEach(e => console.error(`  - ${e}`));
}

if (validation.warnings.length > 0) {
  console.warn('Warnings:');
  validation.warnings.forEach(w => console.warn(`  - ${w}`));
}

Verification Steps

  1. Credential Validation: Verify credential_id references exist in Vault
  2. Adapter Registration: Ensure adapters are registered with Orchestrator
  3. Endpoint Validation: Check endpoint_id matches adapter endpoints
  4. Component Isolation: Test source and target adapters independently
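
The first two verification steps can also be checked by hand with a few lines of code. The sketch below is a simplified illustration and not a replacement for validatePipeline: `checkReferences` and the `Connector` interface are hypothetical, and endpoint validation is omitted because it depends on the adapter's internal structure.

```typescript
interface Connector {
  adapter_id: string;
  endpoint_id: string;
  credential_id: string;
}

// Return a list of human-readable problems; an empty list means both
// references resolve.
function checkReferences(
  connector: Connector,
  vault: Record<string, unknown>,
  adapters: Record<string, unknown>,
): string[] {
  const has = (obj: Record<string, unknown>, key: string): boolean =>
    Object.prototype.hasOwnProperty.call(obj, key);

  const problems: string[] = [];
  if (!has(vault, connector.credential_id)) {
    problems.push(`credential_id "${connector.credential_id}" not found in vault`);
  }
  if (!has(adapters, connector.adapter_id)) {
    problems.push(`adapter_id "${connector.adapter_id}" is not registered`);
  }
  return problems;
}

const problems = checkReferences(
  { adapter_id: 'hubspot', endpoint_id: 'contacts', credential_id: 'hs-auth' },
  { 'pg-auth': {} },  // vault is missing 'hs-auth'
  { hubspot: {} },
);
```

Running the same check against the source and the target connector separately helps with component isolation, since it shows which side of the pipeline is misconfigured.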

Complete Example

import { Orchestrator, Pipeline, validatePipeline, AdapterError } from 'openetl';
import { hubspot } from '@openetl/hubspot';
import { postgresql } from '@openetl/postgresql';

const vault = {
  'hs-auth': {
    id: 'hs-auth',
    type: 'oauth2',
    credentials: {
      client_id: process.env.HUBSPOT_CLIENT_ID!,
      client_secret: process.env.HUBSPOT_CLIENT_SECRET!,
      access_token: process.env.HUBSPOT_ACCESS_TOKEN!,
      refresh_token: process.env.HUBSPOT_REFRESH_TOKEN!,
    },
  },
  'pg-auth': {
    id: 'pg-auth',
    type: 'basic',
    credentials: {
      host: 'localhost',
      database: 'myapp',
      username: 'user',
      password: 'secret',
    },
  },
};

const adapters = { hubspot, postgresql };
const orchestrator = Orchestrator(vault, adapters);

const pipeline: Pipeline = {
  id: 'contacts-sync',
  source: {
    id: 'hs-source',
    adapter_id: 'hubspot',
    endpoint_id: 'contacts',
    credential_id: 'hs-auth',
    fields: ['email', 'firstname', 'lastname'],
    debug: true,
  },
  target: {
    id: 'pg-target',
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'pg-auth',
    config: { schema: 'public', table: 'contacts' },
    fields: ['email', 'first_name', 'last_name'],
  },
  error_handling: {
    max_retries: 3,
    retry_interval: 1000,
    use_exponential_backoff: true,
  },
  rate_limiting: {
    requests_per_second: 10,
    max_retries_on_rate_limit: 5,
  },
  logging: event => {
    const timestamp = new Date().toISOString();
    if (event.type === 'error') {
      console.error(`[${timestamp}] ERROR: ${event.message}`);
    } else {
      console.log(`[${timestamp}] ${event.type}: ${event.message}`);
    }
  },
};

// Validate first
const validation = validatePipeline(pipeline, adapters, vault);
if (!validation.valid) {
  console.error('Pipeline validation failed:', validation.errors);
  process.exit(1);
}

// Execute with error handling
try {
  const result = await orchestrator.runPipeline(pipeline);
  console.log(`Successfully synchronized ${result.data.length} contacts`);
} catch (error) {
  if (error instanceof AdapterError) {
    console.error(`Adapter error [${error.code}]: ${error.message}`);
    if (error.retryable) {
      console.log('This error may be retryable after resolving the issue');
    }
  } else {
    console.error('Unexpected error:', error);
  }
  process.exit(1);
}

Best Practices

  1. Use Exponential Backoff - Enable use_exponential_backoff for production pipelines
  2. Set Appropriate Timeouts - Configure timeout on connectors to prevent indefinite hangs
  3. Validate Before Execution - Use validatePipeline to catch configuration errors early
  4. Log Pipeline Events - Use the logging callback for observability
  5. Handle AdapterError - Check retryable property before manual retry attempts
  6. Use Debug Mode - Enable debug: true during development

Additional Resources