PostgreSQL Adapter

The PostgreSQL adapter provides secure, production-ready database connectivity with full SQL support and parameterized queries.

Installation

npm install @openetl/postgresql

Features

  • SQL Injection Protection - All queries use parameterized statements
  • Identifier Escaping - Schema, table, and field names are properly escaped
  • Operator Validation - Only whitelisted SQL operators allowed
  • Read/Write Support - Download (SELECT) and Upload (INSERT) operations
  • Custom Queries - Execute raw SQL for complex use cases
  • Schema Introspection - Query table column metadata
  • Connection Pooling - Efficient connection management via pg.Pool

Authentication

The PostgreSQL adapter uses the basic credential type (username and password):

const vault = {
  'my-db': {
    id: 'my-db',
    type: 'basic',
    credentials: {
      host: 'localhost',      // Default: 'localhost'
      port: '5432',           // Default: 5432
      database: 'myapp',      // Required
      username: 'user',       // Required
      password: 'secret',     // Required
    },
  },
};
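
In practice, pull credentials from the environment instead of hard-coding them, as the complete example below does (a minimal sketch; the PG* variable names are a libpq convention, not something the adapter requires):

const vault = {
  'my-db': {
    id: 'my-db',
    type: 'basic',
    credentials: {
      host: process.env.PGHOST ?? 'localhost',
      port: process.env.PGPORT ?? '5432',
      database: process.env.PGDATABASE,  // required
      username: process.env.PGUSER,      // required
      password: process.env.PGPASSWORD,  // required
    },
  },
};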

Quick Start

import { Orchestrator } from 'openetl';
import { postgresql } from '@openetl/postgresql';

const vault = {
  'my-db': {
    id: 'my-db',
    type: 'basic',
    credentials: {
      host: 'localhost',
      port: '5432',
      database: 'myapp',
      username: 'user',
      password: 'secret',
    },
  },
};

const etl = Orchestrator(vault, { postgresql });

const pipeline = {
  id: 'export-users',
  source: {
    id: 'users-source',
    adapter_id: 'postgresql',
    endpoint_id: 'table_query',
    credential_id: 'my-db',
    config: {
      schema: 'public',
      table: 'users',
    },
    fields: ['id', 'email', 'name', 'created_at'],
    filters: [
      { field: 'active', operator: '=', value: 'true' },
    ],
    sort: [{ field: 'created_at', type: 'desc' }],
    pagination: { type: 'offset', itemsPerPage: 100 },
  },
};

const result = await etl.runPipeline(pipeline);
console.log(`Exported ${result.data.length} users`);
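
Failures (bad credentials, a rejected operator, a network error) surface from runPipeline; to handle them explicitly, wrap the call (a sketch, assuming runPipeline rejects its promise on error):

try {
  const result = await etl.runPipeline(pipeline);
  console.log(`Exported ${result.data.length} users`);
} catch (err) {
  console.error('Pipeline export-users failed:', err);
}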

Endpoints

table_query

Query data from a table with filters, sorting, and pagination.

Config Property   Required   Description
schema            Yes        Database schema name
table             Yes        Table name

const connector = {
  adapter_id: 'postgresql',
  endpoint_id: 'table_query',
  credential_id: 'my-db',
  config: {
    schema: 'public',
    table: 'users',
  },
  fields: ['id', 'name', 'email'],  // pass an empty array to SELECT *
  filters: [
    { field: 'status', operator: '=', value: 'active' },
  ],
  sort: [
    { field: 'created_at', type: 'desc' },
  ],
  pagination: { type: 'offset', itemsPerPage: 100 },
};

Generated Query:

SELECT "id", "name", "email"
FROM "public"."users"
WHERE "status" = $1
ORDER BY "created_at" DESC
LIMIT $2 OFFSET $3
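
With the connector above, the placeholders bind as follows (an illustration, assuming the offset starts at zero and advances by itemsPerPage per page):

// Parameters (first page):  ['active', 100, 0]
// Parameters (second page): ['active', 100, 100]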

table_columns

Get column metadata for schema introspection.

const connector = {
  adapter_id: 'postgresql',
  endpoint_id: 'table_columns',
  credential_id: 'my-db',
  config: {
    schema: 'public',
    table: 'users',
  },
  fields: [],
};

Returns:

[
  { "column_name": "id", "data_type": "integer", "is_nullable": "NO", "column_default": "nextval('users_id_seq')" },
  { "column_name": "email", "data_type": "character varying", "is_nullable": "NO", "column_default": null },
  { "column_name": "name", "data_type": "character varying", "is_nullable": "YES", "column_default": null }
]
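
A common follow-up is deriving a field list for a later table_query from the metadata (a sketch; it assumes the rows arrive in result.data, as in the other examples):

const result = await etl.runPipeline({ id: 'introspect-users', source: connector });
// Collect the column names from the introspection rows
const fields = result.data.map((col) => col.column_name);
// fields => ['id', 'email', 'name']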

custom_query

Execute a custom SQL query for complex use cases.

const connector = {
  adapter_id: 'postgresql',
  endpoint_id: 'custom_query',
  credential_id: 'my-db',
  config: {
    custom_query: `
      SELECT u.id, u.name, COUNT(o.id) as order_count
      FROM users u
      LEFT JOIN orders o ON u.id = o.user_id
      WHERE u.created_at > '2024-01-01'
      GROUP BY u.id, u.name
      HAVING COUNT(o.id) > 5
    `,
  },
  fields: [],
};

Warning: Custom queries bypass parameterization. Only use with trusted input.
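
If part of a custom query must vary at runtime, check it against a fixed whitelist before interpolating, since nothing downstream will escape it (a defensive sketch; `cutoff` is a hypothetical runtime input):

// custom_query is sent verbatim, so never splice untrusted input into it.
const allowedCutoffs = ['2023-01-01', '2024-01-01'];
if (!allowedCutoffs.includes(cutoff)) {
  throw new Error(`Unsupported cutoff: ${cutoff}`);
}
const custom_query = `SELECT id, name FROM users WHERE created_at > '${cutoff}'`;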

table_insert

Insert data into a table.

const connector = {
  adapter_id: 'postgresql',
  endpoint_id: 'table_insert',
  credential_id: 'my-db',
  config: {
    schema: 'public',
    table: 'contacts',
  },
  fields: ['email', 'name', 'company'],
};

Generated Query:

INSERT INTO "public"."contacts" ("email", "name", "company")
VALUES ($1, $2, $3), ($4, $5, $6), ...
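
Each row's values are appended to one flat parameter list, so for two rows the statement above binds six parameters (an illustration, assuming values bind in the order the fields are declared):

// Rows:
//   { email: 'ann@acme.com', name: 'Ann', company: 'Acme' }
//   { email: 'bob@beta.io',  name: 'Bob', company: 'Beta' }
// Parameters: ['ann@acme.com', 'Ann', 'Acme', 'bob@beta.io', 'Bob', 'Beta']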

Filtering

Simple Filters

filters: [
  { field: 'status', operator: '=', value: 'active' },
  { field: 'age', operator: '>=', value: '18' },
  { field: 'email', operator: 'LIKE', value: '%@example.com' },
  { field: 'deleted_at', operator: 'IS NULL', value: '' },
]
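
Top-level filters combine with AND (which is why the OR grouping below exists), and a null check binds no parameter, so its value is presumably ignored. For the list above, the generated clause would look like (an illustration):

// WHERE "status" = $1 AND "age" >= $2 AND "email" LIKE $3 AND "deleted_at" IS NULL
// Parameters: ['active', '18', '%@example.com']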

Filter Groups (AND/OR)

filters: [
  {
    op: 'OR',
    filters: [
      { field: 'role', operator: '=', value: 'admin' },
      { field: 'role', operator: '=', value: 'moderator' },
    ],
  },
]
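
A group compiles to a parenthesized clause that nests inside the surrounding conditions (an illustration):

// WHERE ("role" = $1 OR "role" = $2)
// Parameters: ['admin', 'moderator']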

Supported Operators

Operator               Description
=                      Equal
!=, <>                 Not equal
<, >, <=, >=           Comparison
LIKE, ILIKE            Pattern matching
NOT LIKE, NOT ILIKE    Negated pattern matching
IN, NOT IN             Set membership
IS NULL, IS NOT NULL   Null checks
BETWEEN, NOT BETWEEN   Range checks
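
Multi-value operators need more than one scalar. The value shape the adapter expects for them isn't documented above; assuming it accepts arrays, the filters would look like:

filters: [
  { field: 'status', operator: 'IN', value: ['active', 'pending'] },  // assumption: array of candidates
  { field: 'age', operator: 'BETWEEN', value: ['18', '65'] },         // assumption: [low, high] pair
]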

Security

SQL Injection Protection

All user-provided values are passed as parameters:

// User input
filters: [{ field: 'name', operator: '=', value: "Robert'; DROP TABLE users;--" }]

// Generated query (safe)
// SELECT ... WHERE "name" = $1
// Parameters: ["Robert'; DROP TABLE users;--"]

Identifier Escaping

Schema, table, and field names are escaped with double quotes:

config: { schema: 'my"schema', table: 'user data' }
// Generated: FROM "my""schema"."user data"

Operator Validation

Only whitelisted operators are allowed. Invalid operators throw an error before reaching the database.

Complete Example

import { Orchestrator, Pipeline } from 'openetl';
import { postgresql } from '@openetl/postgresql';

const vault = {
  'source-db': {
    id: 'source-db',
    type: 'basic',
    credentials: {
      host: 'source.example.com',
      database: 'production',
      username: 'readonly',
      password: process.env.SOURCE_DB_PASSWORD,
    },
  },
  'target-db': {
    id: 'target-db',
    type: 'basic',
    credentials: {
      host: 'analytics.example.com',
      database: 'analytics',
      username: 'writer',
      password: process.env.TARGET_DB_PASSWORD,
    },
  },
};

const etl = Orchestrator(vault, { postgresql });

const pipeline: Pipeline = {
  id: 'sync-orders',
  source: {
    id: 'orders-source',
    adapter_id: 'postgresql',
    endpoint_id: 'table_query',
    credential_id: 'source-db',
    config: { schema: 'public', table: 'orders' },
    fields: ['id', 'customer_id', 'total', 'status', 'created_at'],
    filters: [
      { field: 'status', operator: '=', value: 'completed' },
      { field: 'created_at', operator: '>=', value: '2024-01-01' },
    ],
    pagination: { type: 'offset', itemsPerPage: 1000 },
  },
  target: {
    id: 'orders-target',
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'target-db',
    config: { schema: 'analytics', table: 'orders_archive' },
    fields: ['id', 'customer_id', 'total', 'status', 'created_at'],
  },
  error_handling: {
    max_retries: 3,
    retry_interval: 1000,
    fail_on_error: true,
  },
};

const result = await etl.runPipeline(pipeline);
console.log(`Synced ${result.data.length} orders`);

Related