Custom Adapters

Custom adapters extend OpenETL to support additional data sources. This guide covers the adapter interface, implementation patterns, and testing strategies.

Adapter Structure

An adapter consists of two parts:

  1. Adapter Description - Static metadata defining capabilities and endpoints
  2. Adapter Function - Implementation of the adapter interface

Adapter Description

The description object defines the adapter's metadata and available endpoints:

import { HttpAdapter } from 'openetl';

const MyAdapter: HttpAdapter = {
  id: 'my-adapter',
  name: 'My Custom Adapter',
  type: 'http',
  action: ['download', 'upload'],
  credential_type: 'api_key',
  base_url: 'https://api.example.com',
  metadata: {
    provider: 'example',
    description: 'Adapter for Example API',
    version: 'v1',
  },
  endpoints: [
    {
      id: 'records',
      path: '/records',
      method: 'GET',
      description: 'Retrieve records',
      supported_actions: ['download'],
    },
    {
      id: 'create_record',
      path: '/records',
      method: 'POST',
      description: 'Create a record',
      supported_actions: ['upload'],
    },
  ],
};

Description Properties

Property          Type                  Description
id                string                Unique adapter identifier
name              string                Human-readable name
type              'http' | 'database'   Adapter category
action            string[]              Supported actions: download, upload, sync
credential_type   string                Authentication type: api_key, oauth2, basic
base_url          string                Base URL for HTTP adapters
endpoints         Endpoint[]            Available endpoints

Adapter Function

The adapter function returns an object implementing the adapter interface:

import { Connector, AuthConfig, AdapterInstance, AdapterError } from 'openetl';

function myAdapter(connector: Connector, auth: AuthConfig): AdapterInstance {
  const endpoint = MyAdapter.endpoints.find(e => e.id === connector.endpoint_id);
  if (!endpoint) {
    throw new AdapterError(
      `Endpoint ${connector.endpoint_id} not found`,
      'INVALID_CONFIG'
    );
  }

  return {
    async connect(): Promise<void> {
      // Validate connection (optional)
    },

    async download(options: { limit: number; offset: number }): Promise<{
      data: any[];
      options?: { nextOffset?: string | number };
    }> {
      // Fetch data from source
      const response = await fetch(`${MyAdapter.base_url}${endpoint.path}`, {
        headers: {
          'Authorization': `Bearer ${auth.credentials.api_key}`,
        },
      });

      if (!response.ok) {
        throw new AdapterError(
          `API request failed: ${response.status}`,
          'DOWNLOAD_FAILED',
          response.status === 429  // Retryable if rate limited
        );
      }

      const data = await response.json();
      return {
        data: data.items,
        options: {
          nextOffset: data.nextCursor,
        },
      };
    },

    async upload(data: any[]): Promise<void> {
      // Send data to target
      for (const item of data) {
        const response = await fetch(`${MyAdapter.base_url}${endpoint.path}`, {
          method: 'POST',
          headers: {
            'Authorization': `Bearer ${auth.credentials.api_key}`,
            'Content-Type': 'application/json',
          },
          body: JSON.stringify(item),
        });

        if (!response.ok) {
          throw new AdapterError(
            `Upload failed: ${response.status}`,
            'UPLOAD_FAILED'
          );
        }
      }
    },

    async disconnect(): Promise<void> {
      // Cleanup resources (optional)
    },
  };
}

export { myAdapter, MyAdapter };

Adapter Interface

Required Methods

Method              Description
download(options)   Fetch data with pagination support

Optional Methods

Method         Description
connect()      Establish connection, validate credentials
upload(data)   Send data to target
disconnect()   Clean up resources
validate()     Test configuration validity
getSchema()    Return field/type information
count()        Return total record count
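
For sources without a metadata endpoint, one way to implement getSchema is to infer field types from a sample of downloaded records. The helper below is a hypothetical sketch, not part of the OpenETL API:

```typescript
// Hypothetical helper: infer a simple field/type map from sample records.
// A getSchema() implementation could download a small page and pass it here.
function inferSchema(samples: Record<string, unknown>[]): Record<string, string> {
  const schema: Record<string, string> = {};
  for (const record of samples) {
    for (const [field, value] of Object.entries(record)) {
      // First non-null value wins; later records only fill in null gaps.
      if (!(field in schema) || schema[field] === 'null') {
        schema[field] = value === null ? 'null' : typeof value;
      }
    }
  }
  return schema;
}
```

Sampling-based inference is approximate: a field that is null in every sampled record stays typed as 'null', so prefer a real metadata endpoint when the source offers one.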

Download Response

The download method must return:

{
  data: any[];              // Array of records
  options?: {
    nextOffset?: string | number;  // Cursor/offset for next page
    totalCount?: number;           // Total records (optional)
  };
}
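
To illustrate how this contract is consumed, here is a minimal, hypothetical driver loop that keeps calling download until nextOffset is absent. The real OpenETL orchestrator handles this internally; this sketch only shows the pagination semantics:

```typescript
// Sketch of a pagination driver against the download contract above.
interface DownloadResult {
  data: unknown[];
  options?: { nextOffset?: string | number; totalCount?: number };
}

async function downloadAll(
  download: (opts: { limit: number; offset: string | number }) => Promise<DownloadResult>,
  limit = 100
): Promise<unknown[]> {
  const all: unknown[] = [];
  let offset: string | number | undefined = 0;
  while (offset !== undefined) {
    const page = await download({ limit, offset });
    all.push(...page.data);
    // An absent nextOffset signals the final page.
    offset = page.options?.nextOffset;
  }
  return all;
}
```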

Database Adapter Example

import { Pool } from 'pg';
import { Connector, AuthConfig, AdapterInstance, AdapterError } from 'openetl';

const PostgreSQLAdapter = {
  id: 'postgresql',
  name: 'PostgreSQL Adapter',
  type: 'database' as const,
  action: ['download', 'upload'],
  credential_type: 'basic',
  endpoints: [
    { id: 'table_query', description: 'Query table data', supported_actions: ['download'] },
    { id: 'table_insert', description: 'Insert into table', supported_actions: ['upload'] },
  ],
};

function postgresql(connector: Connector, auth: AuthConfig): AdapterInstance {
  let pool: Pool | null = null;

  return {
    async connect() {
      pool = new Pool({
        host: auth.credentials.host,
        port: parseInt(auth.credentials.port || '5432', 10),
        database: auth.credentials.database,
        user: auth.credentials.username,
        password: auth.credentials.password,
      });

      // Test connection
      const client = await pool.connect();
      client.release();
    },

    async download({ limit, offset }) {
      if (!pool) {
        throw new AdapterError('Not connected', 'CONNECTION_FAILED');
      }

      const { schema, table } = connector.config;
      const fields = connector.fields?.length
        ? connector.fields.map(f => `"${f}"`).join(', ')
        : '*';

      // schema, table, and field names come from connector config; validate or
      // escape these identifiers before interpolation (see Security best practices).
      const query = `SELECT ${fields} FROM "${schema}"."${table}" LIMIT $1 OFFSET $2`;
      const result = await pool.query(query, [limit, offset]);

      return {
        data: result.rows,
        options: {
          nextOffset: result.rows.length === limit ? offset + limit : undefined,
        },
      };
    },

    async upload(data) {
      if (!pool) {
        throw new AdapterError('Not connected', 'CONNECTION_FAILED');
      }

      const { schema, table } = connector.config;
      if (data.length === 0) return;
      const fields = connector.fields?.length ? connector.fields : Object.keys(data[0]);

      for (const row of data) {
        const values = fields.map(f => row[f]);
        const placeholders = fields.map((_, i) => `$${i + 1}`).join(', ');
        const columns = fields.map(f => `"${f}"`).join(', ');

        await pool.query(
          `INSERT INTO "${schema}"."${table}" (${columns}) VALUES (${placeholders})`,
          values
        );
      }
    },

    async disconnect() {
      if (pool) {
        await pool.end();
        pool = null;
      }
    },
  };
}

export { postgresql, PostgreSQLAdapter };

HTTP Adapter with OAuth2

import { HttpAdapter, Connector, AuthConfig, AdapterInstance } from 'openetl';

const OAuthAdapter: HttpAdapter = {
  id: 'oauth-api',
  name: 'OAuth API Adapter',
  type: 'http',
  action: ['download'],
  credential_type: 'oauth2',
  base_url: 'https://api.example.com',
  endpoints: [
    { id: 'resources', path: '/resources', method: 'GET', supported_actions: ['download'] },
  ],
  // OAuth helpers for authorization flow
  helpers: {
    getCode(redirectUrl: string, clientId: string): string {
      const params = new URLSearchParams({
        client_id: clientId,
        redirect_uri: redirectUrl,
        response_type: 'code',
        scope: 'read',
      });
      return `https://api.example.com/oauth/authorize?${params}`;
    },

    async getTokens(
      redirectUrl: string,
      clientId: string,
      clientSecret: string,
      params: { code: string }
    ): Promise<{ access_token: string; refresh_token: string }> {
      const response = await fetch('https://api.example.com/oauth/token', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          grant_type: 'authorization_code',
          code: params.code,
          client_id: clientId,
          client_secret: clientSecret,
          redirect_uri: redirectUrl,
        }),
      });
      if (!response.ok) {
        throw new Error(`Token exchange failed: ${response.status}`);
      }
      return response.json();
    },
  },
};

function oauthAdapter(connector: Connector, auth: AuthConfig): AdapterInstance {
  async function refreshToken(): Promise<string> {
    const response = await fetch('https://api.example.com/oauth/token', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        grant_type: 'refresh_token',
        refresh_token: auth.credentials.refresh_token,
        client_id: auth.credentials.client_id,
        client_secret: auth.credentials.client_secret,
      }),
    });
    if (!response.ok) {
      throw new Error(`Token refresh failed: ${response.status}`);
    }
    const data = await response.json();
    auth.credentials.access_token = data.access_token;
    return data.access_token;
  }

  return {
    async download({ limit, offset }) {
      let token = auth.credentials.access_token;

      const response = await fetch(
        `${OAuthAdapter.base_url}/resources?limit=${limit}&offset=${offset}`,
        { headers: { 'Authorization': `Bearer ${token}` } }
      );

      // Handle token refresh
      if (response.status === 401) {
        token = await refreshToken();
        const retryResponse = await fetch(
          `${OAuthAdapter.base_url}/resources?limit=${limit}&offset=${offset}`,
          { headers: { 'Authorization': `Bearer ${token}` } }
        );
        return { data: (await retryResponse.json()).data };
      }

      return { data: (await response.json()).data };
    },
  };
}

export { oauthAdapter, OAuthAdapter };
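
The refresh-on-401 pattern above can be factored into a small generic helper. This is a hypothetical sketch (the fetch and refresh callbacks are stand-ins, not OpenETL APIs) that tries the request once, refreshes the token on a 401, then retries exactly once:

```typescript
// Generalized sketch of the refresh-on-401 pattern: one request, one
// token refresh, one retry. Callers inject the actual fetch and refresh logic.
interface MinimalResponse {
  status: number;
}

async function fetchWithRefresh<R extends MinimalResponse>(
  doFetch: (token: string) => Promise<R>,
  getToken: () => string,
  refreshToken: () => Promise<string>
): Promise<R> {
  const response = await doFetch(getToken());
  if (response.status !== 401) return response;
  // Token likely expired: refresh and retry once with the new token.
  const fresh = await refreshToken();
  return doFetch(fresh);
}
```

Retrying only once avoids an infinite loop when credentials are genuinely invalid rather than merely expired.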

Error Handling

Use AdapterError for consistent error reporting:

import { AdapterError } from 'openetl';

// Connection error (retryable)
throw new AdapterError(
  'Connection timeout',
  'CONNECTION_FAILED',
  true,  // retryable
  { host: 'example.com', timeout: 30000 }  // context
);

// Authentication error (not retryable)
throw new AdapterError(
  'Invalid API key',
  'AUTHENTICATION_FAILED',
  false
);

// Rate limit (retryable)
throw new AdapterError(
  'Rate limit exceeded',
  'RATE_LIMITED',
  true,
  { retryAfter: 60 }
);

Error Codes

Code                    Description             Typically Retryable
CONNECTION_FAILED       Failed to connect       Yes
AUTHENTICATION_FAILED   Invalid credentials     No
DOWNLOAD_FAILED         Error fetching data     Depends
UPLOAD_FAILED           Error sending data      Depends
RATE_LIMITED            Rate limit exceeded     Yes
TIMEOUT                 Operation timed out     Yes
INVALID_CONFIG          Invalid configuration   No
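
A caller can honor the retryable flag with a simple backoff wrapper. The sketch below uses a structural type in place of openetl's AdapterError so it stands alone; the helper name and backoff policy are illustrative, not part of OpenETL:

```typescript
// Sketch of a retry wrapper that honors the `retryable` flag on adapter
// errors. A structural type stands in for openetl's AdapterError here.
interface RetryableError extends Error {
  code?: string;
  retryable?: boolean;
}

async function withRetry<T>(
  fn: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 0 // 0 here for testability; use e.g. 1000 in practice
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      const retryable = (err as RetryableError).retryable === true;
      // Non-retryable errors (bad credentials, bad config) fail immediately.
      if (!retryable || attempt === maxAttempts) throw err;
      // Exponential backoff between attempts.
      await new Promise(r => setTimeout(r, baseDelayMs * 2 ** (attempt - 1)));
    }
  }
  throw lastError;
}
```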

Testing

Unit Tests

import { myAdapter, MyAdapter } from './my-adapter';

describe('MyAdapter', () => {
  const mockConnector = {
    id: 'test',
    endpoint_id: 'records',
    credential_id: 'test-auth',
    fields: ['id', 'name'],
  };

  const mockAuth = {
    id: 'test-auth',
    type: 'api_key' as const,
    credentials: { api_key: 'test-key' },
  };

  it('downloads data with pagination', async () => {
    // Mock fetch
    global.fetch = jest.fn().mockResolvedValue({
      ok: true,
      json: () => Promise.resolve({
        items: [{ id: 1, name: 'Test' }],
        nextCursor: 'abc123',
      }),
    });

    const adapter = myAdapter(mockConnector, mockAuth);
    const result = await adapter.download({ limit: 10, offset: 0 });

    expect(result.data).toHaveLength(1);
    expect(result.options?.nextOffset).toBe('abc123');
  });

  it('handles API errors', async () => {
    global.fetch = jest.fn().mockResolvedValue({
      ok: false,
      status: 500,
    });

    const adapter = myAdapter(mockConnector, mockAuth);

    await expect(adapter.download({ limit: 10, offset: 0 }))
      .rejects.toThrow('API request failed');
  });
});

Integration Tests

describe('MyAdapter Integration', () => {
  // Skip in CI without credentials
  const runIntegration = process.env.MY_API_KEY ? it : it.skip;

  runIntegration('fetches real data', async () => {
    const connector = {
      id: 'integration-test',
      endpoint_id: 'records',
      credential_id: 'real-auth',
      fields: [],
    };

    const auth = {
      id: 'real-auth',
      type: 'api_key' as const,
      credentials: { api_key: process.env.MY_API_KEY! },
    };

    const adapter = myAdapter(connector, auth);
    await adapter.connect?.();
    const result = await adapter.download({ limit: 10, offset: 0 });

    expect(result.data).toBeDefined();
    expect(Array.isArray(result.data)).toBe(true);

    await adapter.disconnect?.();
  });
});

Best Practices

Implementation

  1. Use AdapterError - Consistent error handling with proper codes and retryability
  2. Support pagination - Return nextOffset for large datasets
  3. Handle rate limits - Return retryable errors for HTTP 429
  4. Validate configuration - Check required config properties early
  5. Clean up resources - Implement disconnect for database connections

Security

  1. Never log credentials - Avoid logging auth objects
  2. Use parameterized queries - Prevent SQL injection in database adapters
  3. Validate operators - Whitelist allowed filter operators
  4. Escape identifiers - Properly escape table/field names
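
As a concrete illustration of point 4, identifier escaping for the PostgreSQL example above might look like the sketch below. It assumes PostgreSQL's double-quote identifier rules; other databases use different quoting, and the helper name is hypothetical:

```typescript
// Sketch of PostgreSQL identifier escaping: wrap in double quotes and
// double any embedded quotes, rejecting obviously malformed names.
function quoteIdent(name: string): string {
  if (name.length === 0 || name.includes('\0')) {
    throw new Error(`Invalid identifier: ${JSON.stringify(name)}`);
  }
  return `"${name.replace(/"/g, '""')}"`;
}
```

With this in place, the query construction becomes `SELECT ... FROM ${quoteIdent(schema)}.${quoteIdent(table)}` instead of interpolating raw config values.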

Performance

  1. Connection pooling - Reuse database connections
  2. Batch operations - Upload multiple records per request when supported
  3. Implement timeouts - Set reasonable request timeouts
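
A minimal sketch of point 2: split records into fixed-size chunks and issue one request per chunk instead of one per record. The `send` callback is a stand-in for whatever bulk endpoint the target offers; batch size and names are illustrative:

```typescript
// Sketch of batched uploads: chunk the records, then send one request
// per chunk rather than one per record.
function chunk<T>(items: T[], size: number): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

async function uploadInBatches<T>(
  data: T[],
  send: (batch: T[]) => Promise<void>,
  batchSize = 100
): Promise<void> {
  for (const batch of chunk(data, batchSize)) {
    await send(batch);
  }
}
```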

Publishing

To publish an adapter to the OpenETL ecosystem:

  1. Create npm package: @openetl/adapter-name
  2. Include TypeScript definitions
  3. Add comprehensive README with examples
  4. Include test suite
  5. Submit PR to OpenETL repository

Related