Error Handling
OpenETL provides comprehensive error handling for pipeline execution. This document covers the AdapterError class, retry logic with exponential backoff, and debugging strategies.
Overview
Error handling in OpenETL manages failures during pipeline execution, including network errors, authentication failures, rate limiting, and data validation errors. The framework provides:
- AdapterError - Standardized error class with error codes and retry information
- Configurable Retries - Linear or exponential backoff strategies
- Error Events - Callbacks for logging and monitoring
- Graceful Degradation - Continue or halt based on configuration
AdapterError Class
The AdapterError class provides structured error information for adapter operations:
import { AdapterError } from 'openetl';
try {
const result = await adapter.download({ limit: 100, offset: 0 });
} catch (error) {
if (error instanceof AdapterError) {
console.log(error.code); // Error code
console.log(error.retryable); // Whether operation can be retried
console.log(error.context); // Additional context
console.log(error.cause); // Original error if wrapped
}
}
Error Codes
| Code | Description | Typically Retryable |
|---|---|---|
| `CONNECTION_FAILED` | Failed to connect to service | Yes |
| `AUTHENTICATION_FAILED` | Invalid credentials | No |
| `VALIDATION_FAILED` | Invalid data or configuration | No |
| `DOWNLOAD_FAILED` | Error fetching data | Depends |
| `UPLOAD_FAILED` | Error sending data | Depends |
| `TIMEOUT` | Operation timed out | Yes |
| `RATE_LIMITED` | Rate limit exceeded (HTTP 429) | Yes |
| `NOT_FOUND` | Resource not found (HTTP 404) | No |
| `INVALID_CONFIG` | Invalid adapter configuration | No |
| `UNKNOWN` | Unclassified error | No |
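When handling these errors manually, the documented `retryable` flag is usually the only signal you need. The `shouldRetry` helper below is an illustrative sketch, not part of the OpenETL API:
import { AdapterError } from 'openetl';
// Illustrative helper (not part of OpenETL): decide whether a failed operation
// is worth retrying, based on the documented `retryable` flag.
function shouldRetry(error: unknown, attempt: number, maxRetries: number): boolean {
  if (!(error instanceof AdapterError)) return false; // only adapter errors carry retry info
  if (attempt >= maxRetries) return false;            // retry budget exhausted
  return error.retryable;                             // e.g. CONNECTION_FAILED, TIMEOUT, RATE_LIMITED
}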
Creating AdapterError
import { AdapterError } from 'openetl';
// Basic error
throw new AdapterError(
'Connection timeout',
'CONNECTION_FAILED',
true // retryable
);
// Error with context
throw new AdapterError(
'Rate limit exceeded',
'RATE_LIMITED',
true,
{ retryAfter: 60, endpoint: '/api/contacts' }
);
// Wrap existing error
throw new AdapterError(
'Download failed',
'DOWNLOAD_FAILED',
false,
{ table: 'users' },
originalError // cause
);
Creating from Unknown Errors
The AdapterError.from() static method creates AdapterError instances from unknown error types, automatically detecting HTTP status codes:
try {
const response = await axios.get(url);
} catch (error) {
// Automatically detects 401, 404, 429, 5xx status codes
throw AdapterError.from(error, 'DOWNLOAD_FAILED', true, { url });
}
Serialization
AdapterError includes a toJSON() method for logging:
const error = new AdapterError('Connection failed', 'CONNECTION_FAILED', true);
console.log(JSON.stringify(error.toJSON(), null, 2));
// {
// "name": "AdapterError",
// "message": "Connection failed",
// "code": "CONNECTION_FAILED",
// "retryable": true,
// "context": undefined,
// "cause": undefined,
// "stack": "..."
// }
Retry Configuration
Basic Configuration
error_handling: {
max_retries: 3, // Number of retry attempts
retry_interval: 1000, // Base delay between retries (ms)
}
Exponential Backoff with Jitter
Enable exponential backoff for more robust retry behavior:
error_handling: {
max_retries: 5,
retry_interval: 1000, // Base delay: 1 second
use_exponential_backoff: true, // Enable exponential backoff
}
How it works:
- First retry: ~1 second (1000ms + jitter)
- Second retry: ~2 seconds (2000ms + jitter)
- Third retry: ~4 seconds (4000ms + jitter)
- Fourth retry: ~8 seconds (8000ms + jitter)
- Fifth retry: ~16 seconds (16000ms + jitter)
The jitter (a random variation of up to 30%) prevents the thundering herd problem when multiple clients retry simultaneously.
Backoff formula:
base = min(baseDelay * 2^attempt, maxDelay)
delay = base + random(0, 0.3 * base)
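A minimal TypeScript sketch of that calculation (the 30% jitter factor follows the formula above; the 30-second `maxDelay` cap and the function name are assumptions for illustration, not OpenETL internals):
// Illustrative backoff calculation matching the formula above.
function backoffDelay(attempt: number, baseDelay = 1000, maxDelay = 30000): number {
  const base = Math.min(baseDelay * 2 ** attempt, maxDelay); // exponential growth, capped
  const jitter = Math.random() * 0.3 * base;                 // random variation of up to 30%
  return base + jitter;
}
// attempt 0 -> ~1s, attempt 1 -> ~2s, attempt 2 -> ~4s, and so on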
Configuration Options
| Property | Type | Default | Description |
|---|---|---|---|
| `max_retries` | number | 0 | Maximum retry attempts |
| `retry_interval` | number | 1000 | Base delay between retries (ms) |
| `use_exponential_backoff` | boolean | false | Use exponential backoff |
Error Scenarios
Network Failures
- Retries according to the `max_retries` configuration
- Uses exponential backoff if enabled
- Emits an error event for each failure
// Recommended configuration for network errors
error_handling: {
max_retries: 3,
retry_interval: 1000,
use_exponential_backoff: true,
}
Rate Limiting (HTTP 429)
- Uses `rate_limiting.max_retries_on_rate_limit` if configured
- Falls back to `error_handling.max_retries` otherwise
- Respects the `Retry-After` header when provided by the API (see the sketch below)
- Applies exponential backoff between retries
rate_limiting: {
requests_per_second: 10,
max_retries_on_rate_limit: 5,
}
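As an illustration of the behavior described above (not OpenETL's internal implementation), a retry delay might prefer the server-provided Retry-After value when it is available, assuming the adapter places it in `error.context.retryAfter` as in the earlier example:
import { AdapterError } from 'openetl';
// Sketch: wait before retrying, honoring Retry-After for rate-limit errors.
async function waitBeforeRetry(error: AdapterError, attempt: number, baseDelay = 1000): Promise<void> {
  const retryAfter = error.code === 'RATE_LIMITED'
    ? (error.context as { retryAfter?: number } | undefined)?.retryAfter
    : undefined;
  const delayMs = retryAfter !== undefined
    ? retryAfter * 1000                          // seconds from the Retry-After header
    : Math.min(baseDelay * 2 ** attempt, 30000); // fall back to exponential backoff
  await new Promise(resolve => setTimeout(resolve, delayMs));
}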
Authentication Errors (HTTP 401)
- Attempts OAuth2 token refresh if applicable
- Retries with refreshed credentials
- Fails permanently if refresh unsuccessful
- Not retryable by default (invalid credentials won't become valid)
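In a custom adapter, a 401 that persists after a refresh attempt is typically surfaced as a non-retryable error; a sketch using the documented constructor (the function name and refresh logic are adapter-specific and illustrative):
import { AdapterError } from 'openetl';
// Sketch: report a 401 that persists after token refresh as non-retryable.
function reportUnauthorized(endpoint: string, originalError: Error): never {
  throw new AdapterError(
    'Authentication failed and token refresh was unsuccessful',
    'AUTHENTICATION_FAILED',
    false,               // not retryable: credentials will not become valid on retry
    { endpoint },        // context for debugging
    originalError        // preserve the underlying error as the cause
  );
}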
Timeout Errors
- Configurable per connector via the `timeout` option
- Retryable by default (transient network issues)
source: {
adapter_id: 'postgresql',
endpoint_id: 'table_query',
credential_id: 'db-auth',
timeout: 60000, // 60 second timeout
fields: [],
}
Error Events
Pipeline events include error information for monitoring:
orchestrator.runPipeline({
id: 'my-pipeline',
source: { /* ... */ },
logging: event => {
if (event.type === 'error') {
console.error(`Error: ${event.message}`);
// Send to monitoring service
monitoring.recordError(event);
}
},
});
Event Types
| Type | Description |
|---|---|
| `start` | Pipeline begins execution |
| `extract` | Data extraction completes |
| `transform` | Data transformation occurs |
| `load` | Data loading completes |
| `error` | An error occurs |
| `complete` | Pipeline finishes successfully |
| `info` | Informational messages |
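A sketch of a logging callback that branches on these event types, using only the event fields shown elsewhere in this document (`type`, `message`, `dataCount`); the inline type is a simplification of the event shape your OpenETL version exposes:
// Illustrative logging callback reacting to the event types above.
const logging = (event: { type: string; message: string; dataCount?: number }) => {
  switch (event.type) {
    case 'error':
      console.error(`Pipeline error: ${event.message}`);
      break;
    case 'extract':
    case 'load':
      console.log(`${event.type}: ${event.message} (${event.dataCount ?? 0} records)`);
      break;
    default:
      console.log(`${event.type}: ${event.message}`);
  }
};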
Debugging Strategies
Enable Debug Logging
Set debug: true on connectors for verbose logging:
source: {
adapter_id: 'hubspot',
endpoint_id: 'contacts',
credential_id: 'hs-auth',
fields: ['email'],
debug: true, // Enable debug logging
}
Use Pipeline Events
orchestrator.runPipeline({
id: 'debug-pipeline',
source: { /* ... */ },
logging: event => {
console.log(`[${event.timestamp}] ${event.type}: ${event.message}`);
if (event.dataCount) {
console.log(` Records: ${event.dataCount}`);
}
},
});
Validate Configuration
Use validatePipeline to catch configuration errors before execution:
import { validatePipeline } from 'openetl';
const validation = validatePipeline(pipeline, adapters, vault);
if (!validation.valid) {
console.error('Validation errors:');
validation.errors.forEach(e => console.error(` - ${e}`));
}
if (validation.warnings.length > 0) {
console.warn('Warnings:');
validation.warnings.forEach(w => console.warn(` - ${w}`));
}
Verification Steps
- Credential Validation: Verify that `credential_id` references exist in the Vault
- Adapter Registration: Ensure adapters are registered with the Orchestrator
- Endpoint Validation: Check that `endpoint_id` matches the adapter's endpoints
- Component Isolation: Test source and target adapters independently
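The first two checks can be scripted against the same objects you pass to the Orchestrator; `validatePipeline` already performs these (and more), so the helper below is only a manual illustration:
// Manual pre-flight check of pipeline wiring (validatePipeline covers this and more).
function checkWiring(
  pipeline: { source: { adapter_id: string; credential_id: string } },
  adapters: Record<string, unknown>,
  vault: Record<string, unknown>
): string[] {
  const problems: string[] = [];
  if (!(pipeline.source.credential_id in vault)) {
    problems.push(`Unknown credential_id: ${pipeline.source.credential_id}`);
  }
  if (!(pipeline.source.adapter_id in adapters)) {
    problems.push(`Unregistered adapter_id: ${pipeline.source.adapter_id}`);
  }
  return problems;
}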
Complete Example
import { Orchestrator, Pipeline, validatePipeline, AdapterError } from 'openetl';
import { hubspot } from '@openetl/hubspot';
import { postgresql } from '@openetl/postgresql';
const vault = {
'hs-auth': {
id: 'hs-auth',
type: 'oauth2',
credentials: {
client_id: process.env.HUBSPOT_CLIENT_ID!,
client_secret: process.env.HUBSPOT_CLIENT_SECRET!,
access_token: process.env.HUBSPOT_ACCESS_TOKEN!,
refresh_token: process.env.HUBSPOT_REFRESH_TOKEN!,
},
},
'pg-auth': {
id: 'pg-auth',
type: 'basic',
credentials: {
host: 'localhost',
database: 'myapp',
username: 'user',
password: 'secret',
},
},
};
const adapters = { hubspot, postgresql };
const orchestrator = Orchestrator(vault, adapters);
const pipeline: Pipeline = {
id: 'contacts-sync',
source: {
id: 'hs-source',
adapter_id: 'hubspot',
endpoint_id: 'contacts',
credential_id: 'hs-auth',
fields: ['email', 'firstname', 'lastname'],
debug: true,
},
target: {
id: 'pg-target',
adapter_id: 'postgresql',
endpoint_id: 'table_insert',
credential_id: 'pg-auth',
config: { schema: 'public', table: 'contacts' },
fields: ['email', 'first_name', 'last_name'],
},
error_handling: {
max_retries: 3,
retry_interval: 1000,
use_exponential_backoff: true,
},
rate_limiting: {
requests_per_second: 10,
max_retries_on_rate_limit: 5,
},
logging: event => {
const timestamp = new Date().toISOString();
if (event.type === 'error') {
console.error(`[${timestamp}] ERROR: ${event.message}`);
} else {
console.log(`[${timestamp}] ${event.type}: ${event.message}`);
}
},
};
// Validate first
const validation = validatePipeline(pipeline, adapters, vault);
if (!validation.valid) {
console.error('Pipeline validation failed:', validation.errors);
process.exit(1);
}
// Execute with error handling
try {
const result = await orchestrator.runPipeline(pipeline);
console.log(`Successfully synchronized ${result.data.length} contacts`);
} catch (error) {
if (error instanceof AdapterError) {
console.error(`Adapter error [${error.code}]: ${error.message}`);
if (error.retryable) {
console.log('This error may be retryable after resolving the issue');
}
} else {
console.error('Unexpected error:', error);
}
process.exit(1);
}
Best Practices
- Use Exponential Backoff - Enable `use_exponential_backoff` for production pipelines
- Set Appropriate Timeouts - Configure `timeout` on connectors to prevent indefinite hangs
- Validate Before Execution - Use `validatePipeline` to catch configuration errors early
- Log Pipeline Events - Use the `logging` callback for observability
- Handle AdapterError - Check the `retryable` property before manual retry attempts
- Use Debug Mode - Enable `debug: true` during development
Additional Resources
- Pipelines: Pipeline configuration reference
- Rate Limiting: Rate limit configuration
- Adapters: Adapter error handling requirements
- Custom Adapters: Implementing error handling in custom adapters