Custom Adapters
Custom adapters extend OpenETL to support additional data sources. This guide covers the adapter interface, implementation patterns, and testing strategies.
Adapter Structure
An adapter consists of two parts:
- Adapter Description - Static metadata defining capabilities and endpoints
- Adapter Function - Implementation of the adapter interface
Adapter Description
The description object defines the adapter's metadata and available endpoints:
import { HttpAdapter } from 'openetl';
const MyAdapter: HttpAdapter = {
id: 'my-adapter',
name: 'My Custom Adapter',
type: 'http',
action: ['download', 'upload'],
credential_type: 'api_key',
base_url: 'https://api.example.com',
metadata: {
provider: 'example',
description: 'Adapter for Example API',
version: 'v1',
},
endpoints: [
{
id: 'records',
path: '/records',
method: 'GET',
description: 'Retrieve records',
supported_actions: ['download'],
},
{
id: 'create_record',
path: '/records',
method: 'POST',
description: 'Create a record',
supported_actions: ['upload'],
},
],
};
Description Properties
| Property | Type | Description |
|---|---|---|
id |
string | Unique adapter identifier |
name |
string | Human-readable name |
type |
'http' | 'database' | Adapter category |
action |
string[] | Supported actions: download, upload, sync |
credential_type |
string | Authentication type: api_key, oauth2, basic |
base_url |
string | Base URL for HTTP adapters |
endpoints |
Endpoint[] | Available endpoints |
Adapter Function
The adapter function returns an object implementing the adapter interface:
import { Connector, AuthConfig, AdapterInstance, AdapterError } from 'openetl';
function myAdapter(connector: Connector, auth: AuthConfig): AdapterInstance {
const endpoint = MyAdapter.endpoints.find(e => e.id === connector.endpoint_id);
if (!endpoint) {
throw new AdapterError(
`Endpoint ${connector.endpoint_id} not found`,
'INVALID_CONFIG'
);
}
return {
async connect(): Promise<void> {
// Validate connection (optional)
},
async download(options: { limit: number; offset: number }): Promise<{
data: any[];
options?: { nextOffset?: string | number };
}> {
// Fetch data from source
const response = await fetch(`${MyAdapter.base_url}${endpoint.path}`, {
headers: {
'Authorization': `Bearer ${auth.credentials.api_key}`,
},
});
if (!response.ok) {
throw new AdapterError(
`API request failed: ${response.status}`,
'DOWNLOAD_FAILED',
response.status === 429 // Retryable if rate limited
);
}
const data = await response.json();
return {
data: data.items,
options: {
nextOffset: data.nextCursor,
},
};
},
async upload(data: any[]): Promise<void> {
// Send data to target
for (const item of data) {
const response = await fetch(`${MyAdapter.base_url}${endpoint.path}`, {
method: 'POST',
headers: {
'Authorization': `Bearer ${auth.credentials.api_key}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(item),
});
if (!response.ok) {
throw new AdapterError(
`Upload failed: ${response.status}`,
'UPLOAD_FAILED'
);
}
}
},
async disconnect(): Promise<void> {
// Cleanup resources (optional)
},
};
}
export { myAdapter, MyAdapter };
Adapter Interface
Required Methods
| Method | Description |
|---|---|
download(options) |
Fetch data with pagination support |
Optional Methods
| Method | Description |
|---|---|
connect() |
Establish connection, validate credentials |
upload(data) |
Send data to target |
disconnect() |
Clean up resources |
validate() |
Test configuration validity |
getSchema() |
Return field/type information |
count() |
Return total record count |
Download Response
The download method must return:
{
data: any[]; // Array of records
options?: {
nextOffset?: string | number; // Cursor/offset for next page
totalCount?: number; // Total records (optional)
};
}
Database Adapter Example
import { Pool } from 'pg';
import { Connector, AuthConfig, AdapterInstance, AdapterError } from 'openetl';
const PostgreSQLAdapter = {
id: 'postgresql',
name: 'PostgreSQL Adapter',
type: 'database' as const,
action: ['download', 'upload'],
credential_type: 'basic',
endpoints: [
{ id: 'table_query', description: 'Query table data', supported_actions: ['download'] },
{ id: 'table_insert', description: 'Insert into table', supported_actions: ['upload'] },
],
};
function postgresql(connector: Connector, auth: AuthConfig): AdapterInstance {
let pool: Pool | null = null;
return {
async connect() {
pool = new Pool({
host: auth.credentials.host,
port: parseInt(auth.credentials.port || '5432'),
database: auth.credentials.database,
user: auth.credentials.username,
password: auth.credentials.password,
});
// Test connection
const client = await pool.connect();
client.release();
},
async download({ limit, offset }) {
if (!pool) {
throw new AdapterError('Not connected', 'CONNECTION_FAILED');
}
const { schema, table } = connector.config;
const fields = connector.fields?.length
? connector.fields.map(f => `"${f}"`).join(', ')
: '*';
const query = `SELECT ${fields} FROM "${schema}"."${table}" LIMIT $1 OFFSET $2`;
const result = await pool.query(query, [limit, offset]);
return {
data: result.rows,
options: {
nextOffset: result.rows.length === limit ? offset + limit : undefined,
},
};
},
async upload(data) {
if (!pool) {
throw new AdapterError('Not connected', 'CONNECTION_FAILED');
}
const { schema, table } = connector.config;
const fields = connector.fields || Object.keys(data[0]);
for (const row of data) {
const values = fields.map(f => row[f]);
const placeholders = fields.map((_, i) => `$${i + 1}`).join(', ');
const columns = fields.map(f => `"${f}"`).join(', ');
await pool.query(
`INSERT INTO "${schema}"."${table}" (${columns}) VALUES (${placeholders})`,
values
);
}
},
async disconnect() {
if (pool) {
await pool.end();
pool = null;
}
},
};
}
export { postgresql, PostgreSQLAdapter };
HTTP Adapter with OAuth2
import { HttpAdapter, Connector, AuthConfig, AdapterInstance } from 'openetl';
const OAuthAdapter: HttpAdapter = {
id: 'oauth-api',
name: 'OAuth API Adapter',
type: 'http',
action: ['download'],
credential_type: 'oauth2',
base_url: 'https://api.example.com',
endpoints: [
{ id: 'resources', path: '/resources', method: 'GET', supported_actions: ['download'] },
],
// OAuth helpers for authorization flow
helpers: {
getCode(redirectUrl: string, clientId: string): string {
const params = new URLSearchParams({
client_id: clientId,
redirect_uri: redirectUrl,
response_type: 'code',
scope: 'read',
});
return `https://api.example.com/oauth/authorize?${params}`;
},
async getTokens(
redirectUrl: string,
clientId: string,
clientSecret: string,
params: { code: string }
): Promise<{ access_token: string; refresh_token: string }> {
const response = await fetch('https://api.example.com/oauth/token', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
grant_type: 'authorization_code',
code: params.code,
client_id: clientId,
client_secret: clientSecret,
redirect_uri: redirectUrl,
}),
});
return response.json();
},
},
};
function oauthAdapter(connector: Connector, auth: AuthConfig): AdapterInstance {
async function refreshToken(): Promise<string> {
const response = await fetch('https://api.example.com/oauth/token', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
grant_type: 'refresh_token',
refresh_token: auth.credentials.refresh_token,
client_id: auth.credentials.client_id,
client_secret: auth.credentials.client_secret,
}),
});
const data = await response.json();
auth.credentials.access_token = data.access_token;
return data.access_token;
}
return {
async download({ limit, offset }) {
let token = auth.credentials.access_token;
const response = await fetch(
`${OAuthAdapter.base_url}/resources?limit=${limit}&offset=${offset}`,
{ headers: { 'Authorization': `Bearer ${token}` } }
);
// Handle token refresh
if (response.status === 401) {
token = await refreshToken();
const retryResponse = await fetch(
`${OAuthAdapter.base_url}/resources?limit=${limit}&offset=${offset}`,
{ headers: { 'Authorization': `Bearer ${token}` } }
);
return { data: (await retryResponse.json()).data };
}
return { data: (await response.json()).data };
},
};
}
export { oauthAdapter, OAuthAdapter };
Error Handling
Use AdapterError for consistent error reporting:
import { AdapterError } from 'openetl';
// Connection error (retryable)
throw new AdapterError(
'Connection timeout',
'CONNECTION_FAILED',
true, // retryable
{ host: 'example.com', timeout: 30000 } // context
);
// Authentication error (not retryable)
throw new AdapterError(
'Invalid API key',
'AUTHENTICATION_FAILED',
false
);
// Rate limit (retryable)
throw new AdapterError(
'Rate limit exceeded',
'RATE_LIMITED',
true,
{ retryAfter: 60 }
);
Error Codes
| Code | Description | Typically Retryable |
|---|---|---|
CONNECTION_FAILED |
Failed to connect | Yes |
AUTHENTICATION_FAILED |
Invalid credentials | No |
DOWNLOAD_FAILED |
Error fetching data | Depends |
UPLOAD_FAILED |
Error sending data | Depends |
RATE_LIMITED |
Rate limit exceeded | Yes |
TIMEOUT |
Operation timed out | Yes |
INVALID_CONFIG |
Invalid configuration | No |
Testing
Unit Tests
import { myAdapter, MyAdapter } from './my-adapter';
describe('MyAdapter', () => {
const mockConnector = {
id: 'test',
endpoint_id: 'records',
credential_id: 'test-auth',
fields: ['id', 'name'],
};
const mockAuth = {
id: 'test-auth',
type: 'api_key' as const,
credentials: { api_key: 'test-key' },
};
it('downloads data with pagination', async () => {
// Mock fetch
global.fetch = jest.fn().mockResolvedValue({
ok: true,
json: () => Promise.resolve({
items: [{ id: 1, name: 'Test' }],
nextCursor: 'abc123',
}),
});
const adapter = myAdapter(mockConnector, mockAuth);
const result = await adapter.download({ limit: 10, offset: 0 });
expect(result.data).toHaveLength(1);
expect(result.options?.nextOffset).toBe('abc123');
});
it('handles API errors', async () => {
global.fetch = jest.fn().mockResolvedValue({
ok: false,
status: 500,
});
const adapter = myAdapter(mockConnector, mockAuth);
await expect(adapter.download({ limit: 10, offset: 0 }))
.rejects.toThrow('API request failed');
});
});
Integration Tests
describe('MyAdapter Integration', () => {
// Skip in CI without credentials
const runIntegration = process.env.MY_API_KEY ? it : it.skip;
runIntegration('fetches real data', async () => {
const connector = {
id: 'integration-test',
endpoint_id: 'records',
credential_id: 'real-auth',
fields: [],
};
const auth = {
id: 'real-auth',
type: 'api_key' as const,
credentials: { api_key: process.env.MY_API_KEY! },
};
const adapter = myAdapter(connector, auth);
await adapter.connect?.();
const result = await adapter.download({ limit: 10, offset: 0 });
expect(result.data).toBeDefined();
expect(Array.isArray(result.data)).toBe(true);
await adapter.disconnect?.();
});
});
Best Practices
Implementation
- Use AdapterError - Consistent error handling with proper codes and retryability
- Support pagination - Return
nextOffsetfor large datasets - Handle rate limits - Return retryable errors for HTTP 429
- Validate configuration - Check required config properties early
- Clean up resources - Implement
disconnectfor database connections
Security
- Never log credentials - Avoid logging auth objects
- Use parameterized queries - Prevent SQL injection in database adapters
- Validate operators - Whitelist allowed filter operators
- Escape identifiers - Properly escape table/field names
Performance
- Connection pooling - Reuse database connections
- Batch operations - Upload multiple records per request when supported
- Implement timeouts - Set reasonable request timeouts
Publishing
To publish an adapter to the OpenETL ecosystem:
- Create npm package:
@openetl/adapter-name - Include TypeScript definitions
- Add comprehensive README with examples
- Include test suite
- Submit PR to OpenETL repository
Related
- Adapters - Official adapter documentation
- Error Handling - Error handling patterns
- Contributing - Contribution guidelines