Custom Adapters
Custom adapters extend OpenETL to connect to any data source, such as APIs, databases, or files. This section covers how to create, test, and contribute your own adapters.
Creating a Custom Adapter
To create a custom adapter, you need to:
- Define the adapter description object
- Implement the adapter function
- Export both for use
1. Define the Adapter Description
First, create a description object that defines the adapter's capabilities:
const StripeAdapter = {
  id: "stripe-adapter",
  name: "Stripe Payments Adapter",
  type: "http",
  action: ["download", "upload"],
  credential_type: "api_key",
  base_url: "https://api.stripe.com/v1",
  metadata: {
    provider: "stripe",
    description: "Adapter for Stripe payment processing API",
    version: "v1",
  },
  endpoints: [
    {
      id: "charges",
      path: "/charges",
      method: "GET",
      description: "Retrieve all charges",
      supported_actions: ["download"],
    },
    {
      id: "customers",
      path: "/customers",
      method: "GET",
      description: "Retrieve all customers",
      supported_actions: ["download"],
    },
    {
      id: "create-customer",
      path: "/customers",
      method: "POST",
      description: "Create a new customer",
      supported_actions: ["upload"],
    },
    // More endpoints...
  ],
};
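Because each endpoint declares its own supported_actions, it can help to check a description for internal consistency before writing the implementation. The sketch below is a hypothetical helper (validateAdapterDescription is not part of OpenETL) that assumes only the field names used in the description above. It reports duplicate endpoint ids and endpoint actions that the adapter-level action array does not declare.

```javascript
// Hypothetical helper, not an OpenETL API: sanity-checks an adapter description.
function validateAdapterDescription(description) {
  const problems = [];
  const declaredActions = new Set(description.action || []);
  const seenIds = new Set();

  for (const endpoint of description.endpoints || []) {
    // Endpoint ids are used for lookup, so they should be unique
    if (seenIds.has(endpoint.id)) {
      problems.push(`Duplicate endpoint id: ${endpoint.id}`);
    }
    seenIds.add(endpoint.id);

    // Every endpoint action should also be declared at the adapter level
    for (const action of endpoint.supported_actions || []) {
      if (!declaredActions.has(action)) {
        problems.push(`Endpoint ${endpoint.id} uses undeclared action: ${action}`);
      }
    }
  }
  return problems;
}

// Usage with a trimmed-down description that forgets to declare "upload"
const exampleDescription = {
  id: "stripe-adapter",
  action: ["download"],
  endpoints: [
    { id: "charges", supported_actions: ["download"] },
    { id: "create-customer", supported_actions: ["upload"] },
  ],
};
console.log(validateAdapterDescription(exampleDescription));
```

An empty array means the description passed both checks; here the helper reports one problem for the create-customer endpoint.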
2. Implement the Adapter Function
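Next, create a function that takes the connector configuration and credentials and returns an object with the required methods (connect, download, upload, and disconnect in this example):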
import axios from 'axios';

function stripeAdapter(connector, auth) {
  // Look up the requested endpoint in the adapter description
  const endpoint = StripeAdapter.endpoints.find(e => e.id === connector.endpoint_id);
  if (!endpoint) {
    throw new Error(`Endpoint ${connector.endpoint_id} not found in Stripe adapter`);
  }

  // Create the HTTP client; Stripe expects form-encoded request bodies
  const client = axios.create({
    baseURL: StripeAdapter.base_url,
    headers: {
      'Authorization': `Bearer ${auth.credentials.api_key}`,
      'Content-Type': 'application/x-www-form-urlencoded'
    },
  });

  return {
    async connect() {
      // Test the connection and credentials with a minimal request
      await client.get('/charges', { params: { limit: 1 } });
      console.log("Successfully connected to Stripe API");
    },

    async download({ limit, offset }) {
      // Only allow downloads from endpoints that declare support for them
      if (!endpoint.supported_actions.includes('download')) {
        throw new Error(`Download not supported for endpoint ${connector.endpoint_id}`);
      }

      // Stripe paginates with cursors: offset carries the ID of the last item already fetched
      const params = { limit };
      if (offset) {
        params.starting_after = offset;
      }

      // Add any filters from the connector configuration
      if (connector.filters) {
        // Transform OpenETL filters to Stripe filter format
        // ...
      }

      // Make the API request
      const response = await client.get(endpoint.path, { params });

      // Stripe list responses wrap the items in a data array
      const items = response.data.data;

      // has_more tells us whether another page exists; guard against an empty page
      const hasMore = Boolean(response.data.has_more) && items.length > 0;

      // Return data and pagination info
      return {
        data: items,
        options: {
          nextOffset: hasMore ? items[items.length - 1].id : undefined
        }
      };
    },

    async upload(data) {
      // Only allow uploads to endpoints that declare support for them
      if (!endpoint.supported_actions.includes('upload')) {
        throw new Error(`Upload not supported for endpoint ${connector.endpoint_id}`);
      }

      // Process each item in the data array
      for (const item of data) {
        // Convert to URL-encoded format for Stripe.
        // Values are stringified as-is, so this handles flat objects only.
        const formData = new URLSearchParams();
        for (const [key, value] of Object.entries(item)) {
          formData.append(key, value);
        }

        // Send to Stripe
        await client.post(endpoint.path, formData);
      }
    },

    async disconnect() {
      // No specific cleanup needed for Stripe
      console.log("Disconnected from Stripe adapter");
    }
  };
}
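The filter transformation inside download is left open above. As one possible shape for it, here is a minimal sketch that assumes each filter is an object with field, operator, and value properties and that the operators are "=", ">", ">=", "<", and "<=". Both are assumptions about OpenETL's filter format, and toStripeParams is a hypothetical name. Stripe list endpoints accept range parameters such as created[gte], but which fields each endpoint can filter on has to be checked against Stripe's API reference.

```javascript
// Assumed OpenETL comparison operators mapped to Stripe's range suffixes.
// A Map avoids accidental matches on inherited object properties.
const RANGE_SUFFIX = new Map([
  [">", "gt"],
  [">=", "gte"],
  ["<", "lt"],
  ["<=", "lte"],
]);

// Sketch: turns [{ field, operator, value }] into Stripe-style query params.
function toStripeParams(filters) {
  const params = {};
  for (const { field, operator, value } of filters) {
    if (operator === "=") {
      // Equality becomes a plain query parameter, e.g. customer=cus_123
      params[field] = value;
    } else if (RANGE_SUFFIX.has(operator)) {
      // Ranges use bracket syntax, e.g. created[gte]=1700000000
      params[`${field}[${RANGE_SUFFIX.get(operator)}]`] = value;
    } else {
      throw new Error(`Unsupported filter operator: ${operator}`);
    }
  }
  return params;
}

const stripeParams = toStripeParams([
  { field: "customer", operator: "=", value: "cus_123" },
  { field: "created", operator: ">=", value: 1700000000 },
]);
console.log(stripeParams);
```

Inside download, the result could be merged into the request with Object.assign(params, toStripeParams(connector.filters)). Rejecting unknown operators up front keeps a misconfigured filter from being silently dropped.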
3. Export the Adapter
Finally, export both the description and implementation:
export { stripeAdapter, StripeAdapter };
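To see how the methods fit together, the following sketch shows a caller paging through an adapter until nextOffset comes back undefined. How OpenETL itself drives adapters is not covered in this section, so downloadAll and fakeAdapter are hypothetical names used only to illustrate the download contract shown above.

```javascript
// Sketch of a caller that pages through an adapter until nextOffset is undefined.
async function downloadAll(adapter, pageSize) {
  const rows = [];
  let offset;
  await adapter.connect();
  try {
    do {
      const page = await adapter.download({ limit: pageSize, offset });
      rows.push(...page.data);
      offset = page.options.nextOffset;
    } while (offset !== undefined);
  } finally {
    // Always release resources, even if a page request fails
    await adapter.disconnect();
  }
  return rows;
}

// In-memory stand-in for a real adapter, using the last item as a cursor
function fakeAdapter(items) {
  return {
    async connect() {},
    async download({ limit, offset }) {
      const start = offset === undefined ? 0 : items.indexOf(offset) + 1;
      const data = items.slice(start, start + limit);
      const hasMore = start + limit < items.length;
      return {
        data,
        options: { nextOffset: hasMore ? data[data.length - 1] : undefined },
      };
    },
    async disconnect() {},
  };
}

downloadAll(fakeAdapter(["a", "b", "c", "d", "e"]), 2).then(rows => console.log(rows));
```

The loop treats the offset as opaque: it passes back whatever the adapter returned, so the same caller works for cursor-based and numeric pagination.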
Testing Your Custom Adapter
Unit tests let you verify an adapter's behavior without calling the real service. Here's how to set up and run them:
Setup for Testing
npm install --save-dev jest @types/jest ts-jest typescript
Configure Jest in jest.config.js:
module.exports = {
  preset: 'ts-jest',
  testEnvironment: 'node',
};
Update package.json:
{
  "scripts": {
    "test": "jest"
  }
}
Writing Unit Tests
Create a test file for your adapter:
import { stripeAdapter, StripeAdapter } from './stripe-adapter';
import axios from 'axios';

// Replace axios with an automatic mock so no real HTTP requests are made
jest.mock('axios');
const mockAxios = axios as jest.Mocked<typeof axios>;

describe('Stripe Adapter', () => {
  // Mock connector and auth
  const connector = {
    endpoint_id: 'charges',
    fields: ['id', 'amount', 'status'],
    filters: []
  };
  const auth = {
    type: 'api_key',
    credentials: { api_key: 'sk_test_123' }
  };

  // Stub HTTP client that axios.create hands to the adapter
  const mockClient = {
    get: jest.fn(),
    post: jest.fn()
  };

  let adapter: ReturnType<typeof stripeAdapter>;

  beforeEach(() => {
    // Start each test with a clean call history, then re-install the stub client
    jest.resetAllMocks();
    mockAxios.create.mockReturnValue(mockClient as any);
    adapter = stripeAdapter(connector, auth);
  });

  it('connects successfully', async () => {
    mockClient.get.mockResolvedValueOnce({ data: { data: [] } });

    await adapter.connect();

    expect(mockClient.get).toHaveBeenCalledWith('/charges', { params: { limit: 1 } });
  });

  it('downloads data with pagination', async () => {
    const mockData = {
      data: {
        data: [
          { id: 'ch_1', amount: 1000, status: 'succeeded' },
          { id: 'ch_2', amount: 2000, status: 'succeeded' }
        ],
        has_more: true
      }
    };
    mockClient.get.mockResolvedValueOnce(mockData);

    const result = await adapter.download({ limit: 2, offset: null });

    expect(mockClient.get).toHaveBeenCalledWith('/charges', { params: { limit: 2 } });
    expect(result.data).toHaveLength(2);
    // The ID of the last item becomes the cursor for the next page
    expect(result.options.nextOffset).toBe('ch_2');
  });

  // Additional tests...
});
Run tests with:
npm test
Example: MySQL Adapter
Here's another complete example of a custom adapter for MySQL:
import mysql from 'mysql2/promise';

// 1. Adapter Description
const MySQLAdapter = {
  id: "mysql-adapter",
  name: "MySQL Database Adapter",
  type: "database",
  action: ["download", "upload"],
  credential_type: "basic",
  metadata: {
    provider: "mysql",
    description: "Adapter for MySQL databases",
    version: "8.0",
  },
  endpoints: [
    {
      id: "table_query",
      description: "Query a MySQL table",
      supported_actions: ["download"],
    },
    {
      id: "table_insert",
      description: "Insert into a MySQL table",
      supported_actions: ["upload"],
    },
    {
      id: "raw_query",
      description: "Execute a raw SQL query",
      supported_actions: ["download"],
    },
  ],
};
// 2. Adapter Implementation
function mysqlAdapter(connector, auth) {
  // Create connection pool
  const pool = mysql.createPool({
    host: auth.credentials.host,
    user: auth.credentials.username,
    password: auth.credentials.password,
    database: auth.credentials.database,
    waitForConnections: true,
    connectionLimit: 10,
  });

  return {
    async connect() {
      // Test the connection by borrowing one from the pool and returning it
      const connection = await pool.getConnection();
      connection.release();
      console.log("Successfully connected to MySQL database");
    },

    async download({ limit, offset }) {
      const table = connector.config?.table;
      if (!table && connector.endpoint_id !== 'raw_query') {
        throw new Error("Table name is required for MySQL adapter");
      }

      let query;
      let params = [];

      // Handle different endpoint types
      switch (connector.endpoint_id) {
        case 'table_query': {
          // Build query from connector configuration.
          // Table, field, and operator names are interpolated as-is, so they
          // must come from trusted configuration; only values are parameterized.
          const fields = connector.fields?.length ? connector.fields.join(', ') : '*';
          query = `SELECT ${fields} FROM ${table}`;

          // Add WHERE clauses for filters
          if (connector.filters?.length) {
            const whereClauses = connector.filters.map(filter => {
              // Simple filter handling - can be expanded
              params.push(filter.value);
              return `${filter.field} ${filter.operator} ?`;
            }).join(' AND ');
            query += ` WHERE ${whereClauses}`;
          }

          // Add pagination
          query += ' LIMIT ? OFFSET ?';
          params.push(limit, offset || 0);
          break;
        }
        case 'raw_query':
          // Use raw SQL from connector config
          if (!connector.config?.query) {
            throw new Error("SQL query is required for raw_query endpoint");
          }
          query = connector.config.query;
          params = connector.config.params || [];
          break;
        default:
          throw new Error(`Unsupported endpoint: ${connector.endpoint_id}`);
      }

      // Execute query
      const [rows] = await pool.query(query, params);
      const data = Array.isArray(rows) ? rows : [rows];

      // Only a paginated table query that filled its page can have more rows;
      // an undefined nextOffset signals that the download is complete
      const hasMore = connector.endpoint_id === 'table_query' && data.length === limit;
      return {
        data,
        options: {
          nextOffset: hasMore ? (offset || 0) + limit : undefined
        }
      };
    },
    async upload(data) {
      if (connector.endpoint_id !== 'table_insert') {
        throw new Error(`Upload not supported for endpoint ${connector.endpoint_id}`);
      }
      const table = connector.config?.table;
      if (!table) {
        throw new Error("Table name is required for MySQL adapter upload");
      }

      // For each data item, build an insert query
      for (const item of data) {
        const fields = Object.keys(item);
        const placeholders = fields.map(() => '?').join(', ');
        const values = fields.map(field => item[field]);
        const query = `INSERT INTO ${table} (${fields.join(', ')}) VALUES (${placeholders})`;
        await pool.query(query, values);
      }
    },

    async disconnect() {
      await pool.end();
      console.log("Disconnected from MySQL database");
    }
  };
}

// 3. Export
export { mysqlAdapter, MySQLAdapter };
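The MySQL example places table, field, and operator names directly into the SQL text, which is only safe when the connector configuration is trusted. If it might not be, one option is to validate identifiers and restrict operators before building the query. The sketch below is a hypothetical variation, not part of OpenETL: quoteIdentifier, buildSelect, and the allowed operator set are all assumptions, and the identifier pattern deliberately accepts only plain names.

```javascript
// Operators this sketch allows; anything else is rejected (assumed set).
const ALLOWED_OPERATORS = new Set(["=", "!=", "<", "<=", ">", ">=", "LIKE"]);

// Accept only plain identifiers so names cannot carry SQL fragments
function quoteIdentifier(name) {
  if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(name)) {
    throw new Error(`Invalid identifier: ${name}`);
  }
  return `\`${name}\``;
}

// Builds a parameterized SELECT from validated names and filters
function buildSelect(table, fields, filters) {
  const columns = fields.length ? fields.map(quoteIdentifier).join(", ") : "*";
  let sql = `SELECT ${columns} FROM ${quoteIdentifier(table)}`;
  const params = [];

  if (filters.length) {
    const clauses = filters.map(({ field, operator, value }) => {
      const op = String(operator).toUpperCase();
      if (!ALLOWED_OPERATORS.has(op)) {
        throw new Error(`Unsupported operator: ${operator}`);
      }
      // Values still travel as parameters, never as SQL text
      params.push(value);
      return `${quoteIdentifier(field)} ${op} ?`;
    });
    sql += ` WHERE ${clauses.join(" AND ")}`;
  }
  return { sql, params };
}

const { sql, params } = buildSelect(
  "orders",
  ["id", "total"],
  [{ field: "total", operator: ">", value: 100 }]
);
console.log(sql, params);
```

The returned sql and params can be passed to pool.query in place of the hand-built string, with the LIMIT and OFFSET placeholders appended as in the example above.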
Best Practices for Custom Adapters
When creating custom adapters, follow these guidelines:
- Complete Description: Include comprehensive metadata in your adapter description.
- Error Handling: Implement robust error handling, especially for network requests and authentication.
- Rate Limiting: Add retry logic for rate-limited APIs.
- Pagination: Support the appropriate pagination method for your data source.
- Test Coverage: Write tests for all adapter methods and edge cases.
- Documentation: Document your adapter's endpoints and configuration options.
- Stateless Design: Keep adapters stateless, relying on Vault and connector configurations.
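As an illustration of the rate-limiting guideline, here is a minimal retry sketch with exponential backoff. The names withRetry, backoffDelay, and isRateLimited are hypothetical, the default delays are arbitrary, and the retry check assumes axios-style errors that expose the HTTP status at error.response.status.

```javascript
// Exponential backoff delay in milliseconds for a zero-based attempt number
function backoffDelay(attempt, baseMs = 200, maxMs = 5000) {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Assumes axios-style errors, where the HTTP status sits on error.response.status
function isRateLimited(error) {
  return error?.response?.status === 429;
}

// Retries request() while isRetryable(error) holds, up to maxAttempts calls in total
async function withRetry(request, { maxAttempts = 4, baseMs = 200, isRetryable = isRateLimited } = {}) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await request();
    } catch (error) {
      // Give up on the last attempt or on errors that retrying cannot fix
      if (attempt + 1 >= maxAttempts || !isRetryable(error)) throw error;
      await new Promise(resolve => setTimeout(resolve, backoffDelay(attempt, baseMs)));
    }
  }
}

// Demonstration with a function that is rate limited twice before succeeding
let calls = 0;
const flakyRequest = async () => {
  calls += 1;
  if (calls < 3) {
    throw Object.assign(new Error("rate limited"), { response: { status: 429 } });
  }
  return "ok";
};
withRetry(flakyRequest, { baseMs: 1 }).then(result => console.log(result, calls));
```

In an HTTP adapter, a request could be wrapped as withRetry(() => client.get(endpoint.path, { params })). Errors other than rate limiting are rethrown immediately, so authentication failures still surface on the first attempt.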
Contributing Your Adapter
Once you've built and tested your adapter, consider contributing it to the OpenETL ecosystem:
- Package It: Create an npm package following the naming convention @openetl/adapter-name.
- Document It: Include a thorough README and examples.
- Submit PR: Submit a pull request to the OpenETL repository.
For more information on contributing to OpenETL, see Contributing.