MySQL Adapter

The MySQL adapter provides secure, production-ready database connectivity with full SQL support and parameterized queries.

Installation

npm install @openetl/mysql

Features

  • SQL Injection Protection - All queries use parameterized statements
  • Identifier Escaping - Database, table, and field names escaped with backticks
  • Operator Validation - Only whitelisted SQL operators allowed
  • Full CRUD Support - Download (SELECT) and Upload (INSERT) operations
  • Custom Queries - Execute raw SQL for complex use cases
  • Schema Introspection - Query table column metadata
  • Connection Pooling - Efficient connection management via mysql2

Authentication

The MySQL adapter uses basic authentication:

const vault = {
  'my-db': {
    id: 'my-db',
    type: 'basic',
    credentials: {
      host: 'localhost',      // Default: 'localhost'
      port: '3306',           // Default: 3306
      database: 'myapp',      // Required
      username: 'user',       // Required
      password: 'secret',     // Required
    },
  },
};

Quick Start

import { Orchestrator } from 'openetl';
import { mysql } from '@openetl/mysql';

const vault = {
  'my-db': {
    id: 'my-db',
    type: 'basic',
    credentials: {
      host: 'localhost',
      port: '3306',
      database: 'myapp',
      username: 'user',
      password: 'secret',
    },
  },
};

const etl = Orchestrator(vault, { mysql });

const pipeline = {
  id: 'export-users',
  source: {
    id: 'users-source',
    adapter_id: 'mysql',
    endpoint_id: 'table_query',
    credential_id: 'my-db',
    config: {
      database: 'myapp',
      table: 'users',
    },
    fields: ['id', 'email', 'name', 'created_at'],
    filters: [
      { field: 'active', operator: '=', value: '1' },
    ],
    sort: [{ field: 'created_at', type: 'desc' }],
    pagination: { type: 'offset', itemsPerPage: 100 },
  },
};

const result = await etl.runPipeline(pipeline);
console.log(`Exported ${result.data.length} users`);

Endpoints

table_query

Query data from a table with filters, sorting, and pagination.

Config Property Required Description
database Yes Database name
table Yes Table name
const connector = {
  adapter_id: 'mysql',
  endpoint_id: 'table_query',
  credential_id: 'my-db',
  config: {
    database: 'myapp',
    table: 'users',
  },
  fields: ['id', 'name', 'email'],  // Empty array = SELECT *
  filters: [
    { field: 'status', operator: '=', value: 'active' },
  ],
  sort: [
    { field: 'created_at', type: 'desc' },
  ],
  pagination: { type: 'offset', itemsPerPage: 100 },
};

Generated Query:

SELECT `id`, `name`, `email`
FROM `myapp`.`users`
WHERE `status` = ?
ORDER BY `created_at` DESC
LIMIT ? OFFSET ?

table_columns

Get column metadata for schema introspection.

const connector = {
  adapter_id: 'mysql',
  endpoint_id: 'table_columns',
  credential_id: 'my-db',
  config: {
    database: 'myapp',
    table: 'users',
  },
  fields: [],
};

Returns:

[
  { "column_name": "id", "data_type": "int", "is_nullable": "NO", "column_default": null },
  { "column_name": "email", "data_type": "varchar", "is_nullable": "NO", "column_default": null },
  { "column_name": "name", "data_type": "varchar", "is_nullable": "YES", "column_default": null }
]

custom_query

Execute a custom SQL query for complex use cases.

const connector = {
  adapter_id: 'mysql',
  endpoint_id: 'custom_query',
  credential_id: 'my-db',
  config: {
    custom_query: `
      SELECT u.id, u.name, COUNT(o.id) as order_count
      FROM users u
      LEFT JOIN orders o ON u.id = o.user_id
      WHERE u.created_at > '2024-01-01'
      GROUP BY u.id, u.name
      HAVING COUNT(o.id) > 5
    `,
  },
  fields: [],
};

Warning: Custom queries bypass parameterization. Only use with trusted input.

table_insert

Insert data into a table.

const connector = {
  adapter_id: 'mysql',
  endpoint_id: 'table_insert',
  credential_id: 'my-db',
  config: {
    database: 'myapp',
    table: 'contacts',
  },
  fields: ['email', 'name', 'company'],
};

Generated Query:

INSERT INTO `myapp`.`contacts` (`email`, `name`, `company`)
VALUES (?, ?, ?), (?, ?, ?), ...

Filtering

Simple Filters

filters: [
  { field: 'status', operator: '=', value: 'active' },
  { field: 'age', operator: '>=', value: '18' },
  { field: 'email', operator: 'LIKE', value: '%@example.com' },
  { field: 'deleted_at', operator: 'IS NULL', value: '' },
]

Filter Groups (AND/OR)

filters: [
  {
    op: 'OR',
    filters: [
      { field: 'role', operator: '=', value: 'admin' },
      { field: 'role', operator: '=', value: 'moderator' },
    ],
  },
]

Supported Operators

Operator Description
= Equal
!=, <> Not equal
<, >, <=, >= Comparison
LIKE Pattern matching (case-insensitive by default in MySQL)
NOT LIKE Negated pattern matching
IN, NOT IN Set membership
IS NULL, IS NOT NULL Null checks
BETWEEN, NOT BETWEEN Range checks

Differences from PostgreSQL

Feature MySQL PostgreSQL
Identifier escaping Backticks (`) Double quotes (")
Config property database schema
Case-insensitive LIKE Default behavior Use ILIKE
Boolean values '1' / '0' 'true' / 'false'

Security

SQL Injection Protection

All user-provided values are passed as parameters:

// User input
filters: [{ field: 'name', operator: '=', value: "Robert'; DROP TABLE users;--" }]

// Generated query (safe)
// SELECT ... WHERE `name` = ?
// Parameters: ["Robert'; DROP TABLE users;--"]

Identifier Escaping

Database, table, and field names are escaped with backticks:

config: { database: 'my`db', table: 'user data' }
// Generated: FROM `my``db`.`user data`

Complete Example

import { Orchestrator, Pipeline } from 'openetl';
import { mysql } from '@openetl/mysql';
import { postgresql } from '@openetl/postgresql';

const vault = {
  'mysql-source': {
    id: 'mysql-source',
    type: 'basic',
    credentials: {
      host: 'mysql.example.com',
      database: 'production',
      username: 'readonly',
      password: process.env.MYSQL_PASSWORD,
    },
  },
  'pg-target': {
    id: 'pg-target',
    type: 'basic',
    credentials: {
      host: 'postgres.example.com',
      database: 'analytics',
      username: 'writer',
      password: process.env.PG_PASSWORD,
    },
  },
};

const etl = Orchestrator(vault, { mysql, postgresql });

const pipeline: Pipeline = {
  id: 'mysql-to-postgres',
  source: {
    id: 'orders-source',
    adapter_id: 'mysql',
    endpoint_id: 'table_query',
    credential_id: 'mysql-source',
    config: { database: 'shop', table: 'orders' },
    fields: ['id', 'customer_id', 'total', 'status'],
    pagination: { type: 'offset', itemsPerPage: 1000 },
  },
  target: {
    id: 'orders-target',
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'pg-target',
    config: { schema: 'analytics', table: 'orders' },
    fields: ['id', 'customer_id', 'total', 'status'],
  },
};

const result = await etl.runPipeline(pipeline);
console.log(`Migrated ${result.data.length} orders`);

Related