Custom Adapters

Custom adapters extend OpenETL to connect to any data source, such as an API, a database, or a file. This section covers how to create, test, and contribute your own adapters.

Creating a Custom Adapter

To create a custom adapter, you need to:

  1. Define the adapter description object
  2. Implement the adapter function
  3. Export both for use

1. Define the Adapter Description

First, create a description object that defines the adapter's capabilities:

const StripeAdapter = {
  id: "stripe-adapter",
  name: "Stripe Payments Adapter",
  type: "http",
  action: ["download", "upload"],
  credential_type: "api_key",
  base_url: "https://api.stripe.com/v1",
  metadata: {
    provider: "stripe",
    description: "Adapter for Stripe payment processing API",
    version: "v1",
  },
  endpoints: [
    {
      id: "charges",
      path: "/charges",
      method: "GET",
      description: "Retrieve all charges",
      supported_actions: ["download"],
    },
    {
      id: "customers",
      path: "/customers",
      method: "GET",
      description: "Retrieve all customers",
      supported_actions: ["download"],
    },
    {
      id: "create-customer",
      path: "/customers",
      method: "POST",
      description: "Create a new customer",
      supported_actions: ["upload"],
    },
    // More endpoints...
  ],
};
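A description object is plain data, so it can be checked before the adapter is ever run. The sketch below is a hypothetical helper, not part of OpenETL: the name validateAdapterDescription and the set of keys it treats as required are assumptions based only on the fields shown above. It reports endpoints whose supported_actions are not declared in the adapter-level action list, as well as duplicate endpoint ids.

```javascript
// Hypothetical helper (not an OpenETL API): returns a list of problems found
// in an adapter description shaped like StripeAdapter above.
function validateAdapterDescription(description) {
  const problems = [];

  // Assumed minimum set of top-level keys, taken from the example description
  for (const key of ["id", "name", "type", "action", "endpoints"]) {
    if (description[key] === undefined) problems.push(`missing "${key}"`);
  }

  const actions = Array.isArray(description.action) ? description.action : [];
  const endpoints = Array.isArray(description.endpoints) ? description.endpoints : [];
  const seenIds = new Set();

  for (const endpoint of endpoints) {
    if (seenIds.has(endpoint.id)) problems.push(`duplicate endpoint id "${endpoint.id}"`);
    seenIds.add(endpoint.id);

    // Every action an endpoint supports should be declared on the adapter itself
    for (const action of endpoint.supported_actions ?? []) {
      if (!actions.includes(action)) {
        problems.push(`endpoint "${endpoint.id}" uses undeclared action "${action}"`);
      }
    }
  }
  return problems;
}

// Illustrative description with one mistake: "upload" is not declared in action
const example = {
  id: "demo-adapter",
  name: "Demo Adapter",
  type: "http",
  action: ["download"],
  endpoints: [
    { id: "items", path: "/items", method: "GET", supported_actions: ["download"] },
    { id: "create-item", path: "/items", method: "POST", supported_actions: ["upload"] },
  ],
};

console.log(validateAdapterDescription(example));
```

Running a check like this in a unit test catches a mismatch between action and supported_actions early, before it surfaces as a runtime error inside a pipeline.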

2. Implement the Adapter Function

Next, create a function that takes the connector and auth objects and returns an object with the required methods (connect, download, upload, and disconnect):

import axios from 'axios';

function stripeAdapter(connector, auth) {
  // Check if endpoint exists in the adapter description
  const endpoint = StripeAdapter.endpoints.find(e => e.id === connector.endpoint_id);
  if (!endpoint) {
    throw new Error(`Endpoint ${connector.endpoint_id} not found in Stripe adapter`);
  }
  
  // Create HTTP client
  const client = axios.create({
    baseURL: StripeAdapter.base_url,
    headers: { 
      'Authorization': `Bearer ${auth.credentials.api_key}`,
      'Content-Type': 'application/x-www-form-urlencoded'
    },
  });

  return {
    async connect() {
      // Test connection with a basic request
      await client.get('/charges', { params: { limit: 1 } });
      console.log("Successfully connected to Stripe API");
    },
    
    async download({ limit, offset }) {
      // Handle pagination based on Stripe's cursor-based approach
      const params = { limit };
      if (offset) {
        params.starting_after = offset;
      }
      
      // Add any filters from the connector configuration
      if (connector.filters) {
        // Transform OpenETL filters to Stripe filter format
        // ...
      }
      
      // Make the API request
      const response = await client.get(endpoint.path, { params });
      
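      // Process response data
      const items = response.data.data;
      
      // Return data and pagination info. Stripe sets has_more when another page exists;
      // the id of the last item becomes the next starting_after cursor.
      return { 
        data: items,
        options: { 
          nextOffset: response.data.has_more && items.length > 0 ? items[items.length - 1].id : undefined 
        }
      };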
    },
    
    async upload(data) {
      // Only implement if the endpoint supports uploads
      if (!endpoint.supported_actions.includes('upload')) {
        throw new Error(`Upload not supported for endpoint ${connector.endpoint_id}`);
      }
      
      // Process each item in the data array
      for (const item of data) {
        // Convert to URL encoded format for Stripe
        const formData = new URLSearchParams();
        for (const [key, value] of Object.entries(item)) {
          formData.append(key, value);
        }
        
        // Send to Stripe
        await client.post(endpoint.path, formData);
      }
    },
    
    async disconnect() {
      // No specific cleanup needed for Stripe
      console.log("Disconnected from Stripe adapter");
    }
  };
}
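The filter transformation inside download is left open above. One possible shape is sketched below, assuming each filter is an object with field, operator, and value (the shape used by the MySQL example later in this section). The function name toStripeParams and the supported operator set are hypothetical. Stripe accepts range filters as bracketed query parameters such as created[gte], which is what the mapping produces.

```javascript
// Hypothetical mapping from comparison operators to Stripe's bracketed
// range parameters, for example created[gte]=1700000000.
const RANGE_SUFFIX = new Map([
  [">", "gt"],
  [">=", "gte"],
  ["<", "lt"],
  ["<=", "lte"],
]);

// Converts filters of the assumed form { field, operator, value } into a
// flat object that can be merged into the request's query parameters.
function toStripeParams(filters = []) {
  const params = {};
  for (const { field, operator, value } of filters) {
    if (operator === "=") {
      params[field] = value;
    } else if (RANGE_SUFFIX.has(operator)) {
      params[`${field}[${RANGE_SUFFIX.get(operator)}]`] = value;
    } else {
      // Fail loudly instead of silently dropping a filter
      throw new Error(`Unsupported filter operator: ${operator}`);
    }
  }
  return params;
}

console.log(toStripeParams([
  { field: "customer", operator: "=", value: "cus_123" },
  { field: "created", operator: ">=", value: 1700000000 },
]));
```

Inside download, the result could be merged with Object.assign(params, toStripeParams(connector.filters)) before the request is made. Which fields an endpoint accepts as filters depends on the Stripe resource, so an adapter may also want to restrict the allowed field names per endpoint.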

3. Export the Adapter

Finally, export both the description and implementation:

export { stripeAdapter, StripeAdapter };
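To exercise an adapter by hand, outside of a pipeline, a small driver can call its methods in order and follow nextOffset until it is undefined. The sketch below is illustrative only: downloadAll and fakeAdapter are hypothetical names, and the in-memory adapter simply mimics the cursor-style contract of the Stripe example (the cursor is the id of the last item returned) so the code runs without network access.

```javascript
// Hypothetical driver: pages through an adapter until nextOffset is undefined.
async function downloadAll(adapter, pageSize) {
  const rows = [];
  let offset;
  await adapter.connect();
  try {
    do {
      const page = await adapter.download({ limit: pageSize, offset });
      rows.push(...page.data);
      offset = page.options?.nextOffset;
    } while (offset !== undefined);
  } finally {
    // Always release resources, even if a page request fails
    await adapter.disconnect();
  }
  return rows;
}

// In-memory stand-in with the same method shape as the object stripeAdapter returns.
function fakeAdapter(items) {
  return {
    async connect() {},
    async download({ limit, offset }) {
      // Resume after the item whose id matches the cursor
      const start = offset === undefined ? 0 : items.findIndex(item => item.id === offset) + 1;
      const data = items.slice(start, start + limit);
      const hasMore = start + limit < items.length;
      return { data, options: { nextOffset: hasMore ? data[data.length - 1].id : undefined } };
    },
    async disconnect() {},
  };
}

downloadAll(fakeAdapter([{ id: "a" }, { id: "b" }, { id: "c" }]), 2)
  .then(rows => console.log(rows.map(row => row.id)));
```

Replacing fakeAdapter(...) with stripeAdapter(connector, auth) would run the same loop against the real API, assuming valid test credentials.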

Testing Your Custom Adapter

Unit tests catch mistakes in connection handling, pagination, and uploads before the adapter runs in a pipeline. Here's how to set up and run them:

Setup for Testing

npm install --save-dev jest @types/jest ts-jest typescript

Configure Jest in jest.config.js:

module.exports = {
  preset: 'ts-jest',
  testEnvironment: 'node',
};

Update package.json:

{
  "scripts": {
    "test": "jest"
  }
}

Writing Unit Tests

Create a test file for your adapter:

import { stripeAdapter, StripeAdapter } from './stripe-adapter';
import axios from 'axios';

// Mock axios
jest.mock('axios');
const mockAxios = axios as jest.Mocked<typeof axios>;

describe('Stripe Adapter', () => {
  // Mock connector and auth
  const connector = { 
    endpoint_id: 'charges',
    fields: ['id', 'amount', 'status'],
    filters: []
  };
  
  const auth = { 
    type: 'api_key', 
    credentials: { api_key: 'sk_test_123' } 
  };
  
  // Mock client returned by axios.create; kept in its own variable so the
  // jest.fn() methods stay typed as mocks
  const mockClient = {
    get: jest.fn(),
    post: jest.fn()
  };
  mockAxios.create.mockReturnValue(mockClient as any);
  
  let adapter: ReturnType<typeof stripeAdapter>;
  
  beforeEach(() => {
    // Reset recorded calls so each test only sees its own requests
    jest.clearAllMocks();
    adapter = stripeAdapter(connector, auth);
  });
  
  it('connects successfully', async () => {
    mockClient.get.mockResolvedValueOnce({ data: { data: [] } });
    
    await adapter.connect();
    expect(mockClient.get).toHaveBeenCalledWith('/charges', { params: { limit: 1 } });
  });
  
  it('downloads data with pagination', async () => {
    const mockData = { 
      data: { 
        data: [
          { id: 'ch_1', amount: 1000, status: 'succeeded' },
          { id: 'ch_2', amount: 2000, status: 'succeeded' }
        ],
        has_more: true 
      } 
    };
    mockClient.get.mockResolvedValueOnce(mockData);
    
    const result = await adapter.download({ limit: 2, offset: null });
    
    expect(mockClient.get).toHaveBeenCalledWith('/charges', { params: { limit: 2 } });
    expect(result.data).toHaveLength(2);
    expect(result.options.nextOffset).toBe('ch_2');
  });
  
  // Additional tests...
});

Run tests with:

npm test

Example: MySQL Adapter

Here's another complete example of a custom adapter for MySQL:

import mysql from 'mysql2/promise';

// 1. Adapter Description
const MySQLAdapter = {
  id: "mysql-adapter",
  name: "MySQL Database Adapter",
  type: "database",
  action: ["download", "upload"],
  credential_type: "basic",
  metadata: {
    provider: "mysql",
    description: "Adapter for MySQL databases",
    version: "8.0",
  },
  endpoints: [
    {
      id: "table_query",
      description: "Query a MySQL table",
      supported_actions: ["download"],
    },
    {
      id: "table_insert",
      description: "Insert into a MySQL table",
      supported_actions: ["upload"],
    },
    {
      id: "raw_query",
      description: "Execute a raw SQL query",
      supported_actions: ["download"],
    },
  ],
};

// 2. Adapter Implementation
function mysqlAdapter(connector, auth) {
  // Create connection pool
  const pool = mysql.createPool({
    host: auth.credentials.host,
    user: auth.credentials.username,
    password: auth.credentials.password,
    database: auth.credentials.database,
    waitForConnections: true,
    connectionLimit: 10,
  });

  return {
    async connect() {
      // Test connection
      const connection = await pool.getConnection();
      connection.release();
      console.log("Successfully connected to MySQL database");
    },
    
    async download({ limit, offset }) {
      const table = connector.config?.table;
      if (!table && connector.endpoint_id !== 'raw_query') {
        throw new Error("Table name is required for MySQL adapter");
      }
      
      // Assumed operator set; adjust it to the operators your connectors use.
      // Anything else is rejected instead of being interpolated into the SQL.
      const allowedOperators = ['=', '!=', '<', '<=', '>', '>=', 'LIKE'];
      let query;
      let params = [];
      let paginated = false;
      
      // Handle different endpoint types
      switch (connector.endpoint_id) {
        case 'table_query': {
          // Build query from connector configuration.
          // ?? placeholders escape identifiers, ? placeholders escape values.
          if (connector.fields?.length) {
            query = 'SELECT ?? FROM ??';
            params.push(connector.fields, table);
          } else {
            query = 'SELECT * FROM ??';
            params.push(table);
          }
          
          // Add WHERE clauses for filters
          if (connector.filters?.length) {
            const whereClauses = connector.filters.map(filter => {
              // Simple filter handling - can be expanded
              if (!allowedOperators.includes(filter.operator)) {
                throw new Error(`Unsupported filter operator: ${filter.operator}`);
              }
              params.push(filter.field, filter.value);
              return `?? ${filter.operator} ?`;
            }).join(' AND ');
            query += ` WHERE ${whereClauses}`;
          }
          
          // Add pagination
          query += ' LIMIT ? OFFSET ?';
          params.push(limit, offset || 0);
          paginated = true;
          break;
        }
          
        case 'raw_query':
          // Use raw SQL from connector config (not paginated by the adapter)
          if (!connector.config?.query) {
            throw new Error("SQL query is required for raw_query endpoint");
          }
          query = connector.config.query;
          params = connector.config.params || [];
          break;
          
        default:
          throw new Error(`Unsupported endpoint: ${connector.endpoint_id}`);
      }
      
      // Execute query
      const [rows] = await pool.query(query, params);
      const data = Array.isArray(rows) ? rows : [rows];
      return { 
        data,
        options: {
          // A full page suggests more rows may follow; otherwise stop paginating
          nextOffset: paginated && data.length === limit ? (offset || 0) + limit : undefined
        }
      };
    },
    
    async upload(data) {
      if (connector.endpoint_id !== 'table_insert') {
        throw new Error(`Upload not supported for endpoint ${connector.endpoint_id}`);
      }
      
      const table = connector.config?.table;
      if (!table) {
        throw new Error("Table name is required for MySQL adapter upload");
      }
      
      // For each data item, build an insert query
      for (const item of data) {
        const fields = Object.keys(item);
        const placeholders = fields.map(() => '?').join(', ');
        const values = fields.map(field => item[field]);
        
        // ?? escapes the table and column identifiers; ? escapes the values
        const query = `INSERT INTO ?? (??) VALUES (${placeholders})`;
        await pool.query(query, [table, fields, ...values]);
      }
    },
    
    async disconnect() {
      await pool.end();
      console.log("Disconnected from MySQL database");
    }
  };
}

// 3. Export
export { mysqlAdapter, MySQLAdapter };
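The upload method above sends one INSERT per row. For larger batches, a single multi-row statement usually means fewer round trips. The sketch below builds such a statement, assuming all rows share the keys of the first row. It relies on the placeholder expansion that mysql2's query() inherits from the mysql driver: ?? for identifiers, and a nested array for grouped value lists. The helper name buildBulkInsert is hypothetical.

```javascript
// Hypothetical helper: builds one multi-row INSERT for pool.query().
// Assumes every row has the same keys as the first row.
function buildBulkInsert(table, rows) {
  if (rows.length === 0) {
    throw new Error("No rows to insert");
  }
  const columns = Object.keys(rows[0]);

  // Each row contributes its values in the same column order;
  // keys missing from a row are sent as NULL
  const values = rows.map(row => columns.map(column => row[column] ?? null));

  return {
    // ?? expands to escaped identifiers, and the nested array passed for the
    // final ? expands to grouped lists: ('a', 'b'), ('c', 'd')
    sql: "INSERT INTO ?? (??) VALUES ?",
    params: [table, columns, values],
  };
}

const { sql, params } = buildBulkInsert("customers", [
  { name: "Ada", email: "ada@example.com" },
  { name: "Grace", email: "grace@example.com" },
]);
console.log(sql, JSON.stringify(params));
```

Inside upload, the result would be passed along as await pool.query(sql, params). This form works with query() but not with prepared statements via execute(), which do not expand ?? or nested arrays.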

Best Practices for Custom Adapters

When creating custom adapters, follow these guidelines:

  1. Complete Description: Include comprehensive metadata in your adapter description.
  2. Error Handling: Implement robust error handling, especially for network requests and authentication.
  3. Rate Limiting: Add retry logic for rate-limited APIs.
  4. Pagination: Support the appropriate pagination method for your data source.
  5. Test Coverage: Write tests for all adapter methods and edge cases.
  6. Documentation: Document your adapter's endpoints and configuration options.
  7. Stateless Design: Keep adapters stateless; read credentials from the Vault and settings from the connector configuration instead of storing them in the adapter.
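As an illustration of the rate-limiting guideline, the following sketch retries a request when it fails with HTTP 429, doubling the wait between attempts. The helper name withRetry and its options are hypothetical. It assumes errors shaped like those axios throws, where the status code is available at error.response.status. The sleep function is injectable so the behaviour can be tested without real delays.

```javascript
// Default delay implementation based on setTimeout
function defaultSleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

// Hypothetical helper: retries fn on HTTP 429 with exponential backoff.
// Any other error, or a 429 after the last allowed retry, is rethrown.
async function withRetry(fn, { retries = 3, baseDelayMs = 500, sleep = defaultSleep } = {}) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (error) {
      const status = error.response?.status; // assumed axios-style error shape
      if (status !== 429 || attempt >= retries) throw error;
      await sleep(baseDelayMs * 2 ** attempt); // 500 ms, 1000 ms, 2000 ms, ...
    }
  }
}

// Demonstration with a function that is rate limited twice, then succeeds
async function demo() {
  let calls = 0;
  const delays = [];
  const flaky = async () => {
    calls++;
    if (calls < 3) {
      throw Object.assign(new Error("rate limited"), { response: { status: 429 } });
    }
    return "ok";
  };
  const result = await withRetry(flaky, {
    baseDelayMs: 100,
    sleep: async ms => { delays.push(ms); }, // record delays instead of waiting
  });
  return { result, calls, delays };
}

demo().then(outcome => console.log(outcome));
```

In the Stripe adapter, a request could then be wrapped as withRetry(() => client.get(endpoint.path, { params })). Some APIs also send a Retry-After header, which is worth honouring when present.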

Contributing Your Adapter

Once you've built and tested your adapter, consider contributing it to the OpenETL ecosystem:

  1. Package It: Create an npm package following the naming convention @openetl/adapter-name.
  2. Document It: Include thorough README and examples.
  3. Submit PR: Submit a pull request to the OpenETL repository.

For more information on contributing to OpenETL, see Contributing.