API Reference
Complete reference for OpenETL types, interfaces, and exported functions.
Core Types
Vault
Credential storage mapping credential IDs to authentication configurations.
type Vault = Record<string, AuthConfig>;
Example:
const vault: Vault = {
'my-db': { id: 'my-db', type: 'basic', credentials: { /* ... */ } },
'my-api': { id: 'my-api', type: 'oauth2', credentials: { /* ... */ } },
};
AuthConfig
Union type for authentication configurations.
type AuthConfig = ApiKeyAuth | OAuth2Auth | BasicAuth;
BaseAuth
Common authentication properties.
type BaseAuth = {
id: string;
type?: "api_key" | "oauth2" | "basic";
name?: string;
provider?: string;
environment?: "production" | "staging" | "development";
metadata?: {
created_by: string;
created_at: string;
updated_at: string;
};
retry?: {
attempts: number;
delay: number;
};
timeout?: number;
expires_at?: string | number;
};
ApiKeyAuth
API key authentication.
type ApiKeyAuth = BaseAuth & {
type: "api_key";
credentials: {
api_key: string;
api_secret?: string;
[key: string]: string | undefined;
};
};
OAuth2Auth
OAuth 2.0 authentication with token refresh support.
type OAuth2Auth = BaseAuth & {
type: "oauth2";
credentials: {
client_id: string;
client_secret: string;
refresh_token?: string;
access_token?: string;
};
scopes?: string[];
};
BasicAuth
Basic/database authentication.
type BasicAuth = BaseAuth & {
type: "basic";
credentials: {
username: string;
password: string;
host?: string;
port?: string;
database?: string;
[key: string]: string | undefined;
};
};
Pipeline Types
Pipeline
Main pipeline configuration interface.
interface Pipeline<T = object> {
id: string;
data?: T[];
source?: Connector;
target?: Connector;
schedule?: {
frequency: "hourly" | "daily" | "weekly";
at: string;
};
logging?: (event: PipelineEvent) => void;
onload?: (data: T[]) => void;
onbeforesend?: (data: T[]) => T[] | boolean | void;
onupload?: () => void;
error_handling?: {
max_retries: number;
retry_interval: number;
use_exponential_backoff?: boolean;
};
rate_limiting?: {
requests_per_second: number;
max_retries_on_rate_limit: number;
};
}
PipelineEvent
Event emitted during pipeline execution.
type PipelineEvent = {
type: "start" | "extract" | "transform" | "load" | "error" | "complete" | "info";
message: string;
timestamp?: string;
dataCount?: number;
};
Connector Types
Connector
Configuration for source or target connections.
interface Connector {
id: string;
adapter_id: string;
endpoint_id: string;
credential_id: string;
config?: {
headers?: Record<string, string>;
query_params?: Record<string, string>;
schema?: string;
table?: string;
custom_query?: string;
[key: string]: any;
};
fields: string[];
filters?: Array<Filter | FilterGroup>;
transform?: Transformation[];
sort?: Sort[];
limit?: number;
pagination?: Pagination;
timeout?: number;
debug?: boolean;
}
Filter
Simple filter condition.
type Filter = {
field: string;
operator: string;
value: string | number | boolean | null | Array<string | number>;
};
FilterGroup
Complex filter with AND/OR logic.
type FilterGroup = {
op: "AND" | "OR";
filters: Array<Filter | FilterGroup>;
};
Sort
Sorting configuration.
type Sort = {
type: 'asc' | 'desc';
field: string;
};
Pagination
Pagination configuration.
type Pagination = {
itemsPerPage?: number;
pageOffsetKey?: number | string;
};
Transformation Types
Transformation
Transformation configuration.
type Transformation = {
type: TransformationType;
options: TransformationOption;
};
TransformationType
Available transformation types.
type TransformationType =
| 'concat'
| 'renameKey'
| 'uppercase'
| 'lowercase'
| 'trim'
| 'split'
| 'replace'
| 'addPrefix'
| 'addSuffix'
| 'toNumber'
| 'extract'
| 'mergeObjects'
| Function;
Transformation Options
type ConcatOptions = {
properties: string[];
glue?: string;
to?: string;
};
type RenameKeyOptions = {
from: string;
to?: string;
};
type FieldTransformationOptions = {
field: string;
to?: string;
};
type SplitOptions = FieldTransformationOptions & {
delimiter: string;
};
type ReplaceOptions = FieldTransformationOptions & {
search: string;
replace: string;
};
type PrefixOptions = FieldTransformationOptions & {
prefix: string;
};
type SuffixOptions = FieldTransformationOptions & {
suffix: string;
};
type ExtractOptions = FieldTransformationOptions & {
pattern?: string;
start?: number;
end?: number;
};
type MergeObjectsOptions = {
fields: string[];
to?: string;
};
Adapter Types
BaseAdapter
Common adapter properties.
interface BaseAdapter {
id: string;
name: string;
type?: "http" | "database" | "file";
category?: string;
image?: string;
action: Array<"download" | "upload" | "sync">;
credential_type: "api_key" | "oauth2" | "basic";
config?: ConfigItem[];
metadata?: {
description?: string;
provider?: string;
version?: string;
[key: string]: any;
};
pagination?: AdapterPagination;
helpers?: Helpers;
}
HttpAdapter
HTTP/API adapter configuration.
interface HttpAdapter extends BaseAdapter {
type: "http";
base_url: string;
endpoints: HttpEndpoint[];
}
DatabaseAdapter
Database adapter configuration.
interface DatabaseAdapter extends BaseAdapter {
type: "database";
hasGetColumnsRoute?: boolean;
endpoints: DatabaseEndpoint[];
}
HttpEndpoint
HTTP endpoint configuration.
interface HttpEndpoint extends Endpoint {
path: string;
method: "GET" | "POST" | "PUT" | "DELETE";
defaultFields?: string[];
}
DatabaseEndpoint
Database endpoint configuration.
interface DatabaseEndpoint extends Endpoint {
query_type: "table" | "custom";
}
Endpoint
Base endpoint interface.
interface Endpoint {
id: string;
tool?: string;
description?: string;
supported_actions: Array<"download" | "upload" | "sync">;
settings?: EndpointSettings;
}
EndpointSettings
Endpoint-specific settings.
interface EndpointSettings {
pagination?: AdapterPagination | false;
config?: ConfigItem[];
}
ConfigItem
Configuration item definition.
interface ConfigItem {
id: string;
name: string;
required: boolean;
type?: string;
default?: any;
}
AdapterPagination
Adapter pagination settings.
type AdapterPagination = {
type: string;
maxItemsPerPage?: number;
};
AdapterInstance
Interface returned by adapter factory functions.
interface AdapterInstance {
getConfig: () => HttpAdapter | DatabaseAdapter;
connect?(): Promise<void>;
disconnect?(): Promise<void>;
download(pageOptions: { limit?: number; offset?: number | string }): Promise<{
data: any[];
options?: {
nextOffset?: string | number;
totalCount?: number;
[key: string]: any;
};
}>;
upload?(data: any[]): Promise<void>;
getOauthPermissionUrl?: (redirectUrl?: string) => string;
validate?(): Promise<ValidationResult>;
getSchema?(): Promise<SchemaInfo>;
count?(): Promise<number>;
}
ValidationResult
Result of adapter validation.
interface ValidationResult {
valid: boolean;
message?: string;
details?: Record<string, unknown>;
}
SchemaInfo
Schema information from adapter.
interface SchemaInfo {
fields: string[];
types?: Record<string, string>;
primaryKey?: string | string[];
required?: string[];
}
Error Types
AdapterError
Standardized error class for adapter operations.
class AdapterError extends Error {
readonly code: AdapterErrorCode;
readonly retryable: boolean;
readonly context?: Record<string, unknown>;
readonly cause?: Error;
constructor(
message: string,
code?: AdapterErrorCode,
retryable?: boolean,
context?: Record<string, unknown>,
cause?: Error
);
static from(
error: unknown,
code?: AdapterErrorCode,
retryable?: boolean,
context?: Record<string, unknown>
): AdapterError;
toJSON(): Record<string, unknown>;
}
AdapterErrorCode
Error code enumeration.
type AdapterErrorCode =
| 'CONNECTION_FAILED'
| 'AUTHENTICATION_FAILED'
| 'VALIDATION_FAILED'
| 'DOWNLOAD_FAILED'
| 'UPLOAD_FAILED'
| 'TIMEOUT'
| 'RATE_LIMITED'
| 'NOT_FOUND'
| 'INVALID_CONFIG'
| 'UNKNOWN';
Functions
Orchestrator
Creates an orchestrator instance for running pipelines.
function Orchestrator(vault: Vault, adapters: Adapters): {
runPipeline<T>(pipeline: Pipeline<T>): Promise<{ data: T[] }>;
};
Example:
import { Orchestrator } from 'openetl';
import { postgresql } from '@openetl/postgresql';
const etl = Orchestrator(vault, { postgresql });
const result = await etl.runPipeline(pipeline);
validatePipeline
Validates pipeline configuration before execution.
function validatePipeline<T>(
pipeline: Pipeline<T>,
adapters: Adapters,
vault: Vault
): PipelineValidationResult;
Returns:
interface PipelineValidationResult {
valid: boolean;
errors: string[];
warnings: string[];
}
Example:
import { validatePipeline } from 'openetl';
const validation = validatePipeline(pipeline, adapters, vault);
if (!validation.valid) {
console.error('Errors:', validation.errors);
}
Constants
DEFAULT_CONFIG
Default orchestrator configuration values.
const DEFAULT_CONFIG = {
ITEMS_PER_PAGE: 100,
TOTAL_ITEMS_LIMIT: 1000000,
TIMEOUT_MS: 30000,
RETRY_INTERVAL_MS: 1000,
MAX_RETRIES: 0,
MAX_BACKOFF_MS: 30000,
} as const;
Type Aliases
Adapter
Adapter factory function type.
type Adapter = (connector: Connector, auth: AuthConfig) => AdapterInstance;
Adapters
Map of registered adapters.
type Adapters = {
[key: string]: Adapter;
};
Helpers
OAuth helper functions.
type Helpers = {
getCode: (redirectUrl: string, client_id: string) => string;
getTokens: (redirectUrl: string, client_id: string, secret_id: string, queryParams: string) => object;
};
Exports
All types and functions are exported from the main package:
import {
// Core
Orchestrator,
validatePipeline,
DEFAULT_CONFIG,
// Types
Pipeline,
Connector,
Vault,
AuthConfig,
ApiKeyAuth,
OAuth2Auth,
BasicAuth,
// Adapters
BaseAdapter,
HttpAdapter,
DatabaseAdapter,
AdapterInstance,
Adapters,
Adapter,
// Endpoints
Endpoint,
HttpEndpoint,
DatabaseEndpoint,
EndpointSettings,
ConfigItem,
// Filters
Filter,
FilterGroup,
Sort,
Pagination,
// Transformations
Transformation,
TransformationType,
ConcatOptions,
RenameKeyOptions,
FieldTransformationOptions,
// Errors
AdapterError,
AdapterErrorCode,
// Events
PipelineEvent,
// Validation
ValidationResult,
SchemaInfo,
PipelineValidationResult,
} from 'openetl';