# PostgreSQL Adapter
The PostgreSQL adapter provides secure, production-ready database connectivity with full SQL support and parameterized queries.
## Installation

```bash
npm install @openetl/postgresql
```
## Features

- **SQL Injection Protection** - All queries use parameterized statements
- **Identifier Escaping** - Schema, table, and field names are properly escaped
- **Operator Validation** - Only whitelisted SQL operators are allowed
- **Full CRUD Support** - Download (SELECT) and Upload (INSERT) operations
- **Custom Queries** - Execute raw SQL for complex use cases
- **Schema Introspection** - Query table column metadata
- **Connection Pooling** - Efficient connection management via `pg.Pool` (see the sketch below)
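For context, pooling with `pg.Pool` looks roughly like the sketch below. This is an illustration of what the adapter manages for you, not its actual internals; the option names come from the `pg` library, and the exact options the adapter passes through are not documented here.

```typescript
import { Pool } from 'pg';

// A pool like this is kept per credential, so repeated pipeline
// runs reuse connections instead of opening a new one per query.
const pool = new Pool({
  host: 'localhost',
  port: 5432,
  database: 'myapp',
  user: 'user',
  password: 'secret',
});

const { rows } = await pool.query(
  'SELECT * FROM "public"."users" WHERE "id" = $1',
  [42],
);

await pool.end(); // release all pooled connections on shutdown
```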
## Authentication

The PostgreSQL adapter uses basic authentication:
```typescript
const vault = {
  'my-db': {
    id: 'my-db',
    type: 'basic',
    credentials: {
      host: 'localhost',  // Default: 'localhost'
      port: '5432',       // Default: 5432
      database: 'myapp',  // Required
      username: 'user',   // Required
      password: 'secret', // Required
    },
  },
};
```
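In production, prefer reading credentials from the environment rather than hard-coding them. This is a standard pattern, not an adapter requirement; the `PG*` variable names below follow libpq convention and are only a suggestion:

```typescript
const vault = {
  'my-db': {
    id: 'my-db',
    type: 'basic',
    credentials: {
      host: process.env.PGHOST ?? 'localhost',
      port: process.env.PGPORT ?? '5432',
      database: process.env.PGDATABASE!, // Required
      username: process.env.PGUSER!,     // Required
      password: process.env.PGPASSWORD!, // Required
    },
  },
};
```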
## Quick Start
```typescript
import { Orchestrator } from 'openetl';
import { postgresql } from '@openetl/postgresql';

const vault = {
  'my-db': {
    id: 'my-db',
    type: 'basic',
    credentials: {
      host: 'localhost',
      port: '5432',
      database: 'myapp',
      username: 'user',
      password: 'secret',
    },
  },
};

const etl = Orchestrator(vault, { postgresql });

const pipeline = {
  id: 'export-users',
  source: {
    id: 'users-source',
    adapter_id: 'postgresql',
    endpoint_id: 'table_query',
    credential_id: 'my-db',
    config: {
      schema: 'public',
      table: 'users',
    },
    fields: ['id', 'email', 'name', 'created_at'],
    filters: [
      { field: 'active', operator: '=', value: 'true' },
    ],
    sort: [{ field: 'created_at', type: 'desc' }],
    pagination: { type: 'offset', itemsPerPage: 100 },
  },
};

const result = await etl.runPipeline(pipeline);
console.log(`Exported ${result.data.length} users`);
```
## Endpoints

### table_query

Query data from a table with filters, sorting, and pagination.
| Config Property | Required | Description |
|---|---|---|
| `schema` | Yes | Database schema name |
| `table` | Yes | Table name |
```typescript
const connector = {
  adapter_id: 'postgresql',
  endpoint_id: 'table_query',
  credential_id: 'my-db',
  config: {
    schema: 'public',
    table: 'users',
  },
  fields: ['id', 'name', 'email'], // Empty array = SELECT *
  filters: [
    { field: 'status', operator: '=', value: 'active' },
  ],
  sort: [
    { field: 'created_at', type: 'desc' },
  ],
  pagination: { type: 'offset', itemsPerPage: 100 },
};
```
**Generated Query:**

```sql
SELECT "id", "name", "email"
FROM "public"."users"
WHERE "status" = $1
ORDER BY "created_at" DESC
LIMIT $2 OFFSET $3
```
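With an empty `fields` array, the adapter selects every column instead:

```sql
SELECT *
FROM "public"."users"
WHERE "status" = $1
ORDER BY "created_at" DESC
LIMIT $2 OFFSET $3
```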
### table_columns

Get column metadata for schema introspection.
```typescript
const connector = {
  adapter_id: 'postgresql',
  endpoint_id: 'table_columns',
  credential_id: 'my-db',
  config: {
    schema: 'public',
    table: 'users',
  },
  fields: [],
};
```
**Returns:**

```json
[
  { "column_name": "id", "data_type": "integer", "is_nullable": "NO", "column_default": "nextval('users_id_seq')" },
  { "column_name": "email", "data_type": "character varying", "is_nullable": "NO", "column_default": null },
  { "column_name": "name", "data_type": "character varying", "is_nullable": "YES", "column_default": null }
]
```
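One practical use is building a `table_query` field list at runtime. The sketch below assumes, as in the Quick Start, that rows come back on `result.data`:

```typescript
// Introspect the table, then query exactly the columns it reports.
const introspection = await etl.runPipeline({
  id: 'introspect-users',
  source: {
    id: 'users-columns',
    adapter_id: 'postgresql',
    endpoint_id: 'table_columns',
    credential_id: 'my-db',
    config: { schema: 'public', table: 'users' },
    fields: [],
  },
});

const columnNames = introspection.data.map(
  (col: { column_name: string }) => col.column_name,
);
// Use columnNames as the `fields` array of a table_query connector.
```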
### custom_query

Execute a custom SQL query for complex use cases.
```typescript
const connector = {
  adapter_id: 'postgresql',
  endpoint_id: 'custom_query',
  credential_id: 'my-db',
  config: {
    custom_query: `
      SELECT u.id, u.name, COUNT(o.id) as order_count
      FROM users u
      LEFT JOIN orders o ON u.id = o.user_id
      WHERE u.created_at > '2024-01-01'
      GROUP BY u.id, u.name
      HAVING COUNT(o.id) > 5
    `,
  },
  fields: [],
};
```
> **Warning:** Custom queries bypass parameterization. Only use them with trusted input.
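If part of a custom query must vary, validate it strictly before interpolating. The sketch below is one defensive pattern, not an adapter feature; `assertIsoDate` and `userSuppliedDate` are hypothetical names for illustration:

```typescript
// Never splice raw external input into custom_query. If a value
// must vary, constrain it to a strict shape first.
function assertIsoDate(input: string): string {
  if (!/^\d{4}-\d{2}-\d{2}$/.test(input)) {
    throw new Error(`Invalid date: ${input}`);
  }
  return input;
}

const userSuppliedDate = '2024-01-01'; // stand-in for external input
const since = assertIsoDate(userSuppliedDate);

const customQuery = `
  SELECT u.id, u.name
  FROM users u
  WHERE u.created_at > '${since}'
`;
```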
### table_insert

Insert data into a table.
```typescript
const connector = {
  adapter_id: 'postgresql',
  endpoint_id: 'table_insert',
  credential_id: 'my-db',
  config: {
    schema: 'public',
    table: 'contacts',
  },
  fields: ['email', 'name', 'company'],
};
```
**Generated Query:**

```sql
INSERT INTO "public"."contacts" ("email", "name", "company")
VALUES ($1, $2, $3), ($4, $5, $6), ...
```
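Rows are batched into a single multi-row `VALUES` list, with the values flattened into one parameter array in `fields` order. An illustration of that binding (not the adapter's code):

```typescript
// Two incoming rows...
const rows = [
  { email: 'a@example.com', name: 'Ada', company: 'Acme' },
  { email: 'b@example.com', name: 'Bob', company: 'Base' },
];

// ...bind as one flat parameter list in `fields` order:
// $1..$3 -> first row, $4..$6 -> second row
const params = rows.flatMap((row) => [row.email, row.name, row.company]);
// ['a@example.com', 'Ada', 'Acme', 'b@example.com', 'Bob', 'Base']
```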
## Filtering

### Simple Filters
```typescript
filters: [
  { field: 'status', operator: '=', value: 'active' },
  { field: 'age', operator: '>=', value: '18' },
  { field: 'email', operator: 'LIKE', value: '%@example.com' },
  { field: 'deleted_at', operator: 'IS NULL', value: '' },
]
```
### Filter Groups (AND/OR)
```typescript
filters: [
  {
    op: 'OR',
    filters: [
      { field: 'role', operator: '=', value: 'admin' },
      { field: 'role', operator: '=', value: 'moderator' },
    ],
  },
]
```
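Multiple top-level filters are combined with AND (the usual SQL semantics), so a plain filter can sit alongside a group. Assuming groups compose this way in your adapter version:

```typescript
// active users who are either admins or moderators
filters: [
  { field: 'active', operator: '=', value: 'true' },
  {
    op: 'OR',
    filters: [
      { field: 'role', operator: '=', value: 'admin' },
      { field: 'role', operator: '=', value: 'moderator' },
    ],
  },
]
```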
### Supported Operators

| Operator | Description |
|---|---|
| `=` | Equal |
| `!=`, `<>` | Not equal |
| `<`, `>`, `<=`, `>=` | Comparison |
| `LIKE`, `ILIKE` | Pattern matching |
| `NOT LIKE`, `NOT ILIKE` | Negated pattern matching |
| `IN`, `NOT IN` | Set membership |
| `IS NULL`, `IS NOT NULL` | Null checks |
| `BETWEEN`, `NOT BETWEEN` | Range checks |
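Values are passed as strings in the examples above. For the set and range operators the expected value shape is not documented here, so the comma-separated strings below are an assumption to verify against your adapter version:

```typescript
filters: [
  // Assumed format: comma-separated list for set membership
  { field: 'status', operator: 'IN', value: 'active,pending' },
  // Assumed format: two comma-separated bounds for ranges
  { field: 'total', operator: 'BETWEEN', value: '10,100' },
  // Null checks take an empty value, as in Simple Filters above
  { field: 'email', operator: 'IS NOT NULL', value: '' },
]
```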
## Security

### SQL Injection Protection

All user-provided values are passed as parameters:
```typescript
// User input
filters: [{ field: 'name', operator: '=', value: "Robert'; DROP TABLE users;--" }]

// Generated query (safe)
// SELECT ... WHERE "name" = $1
// Parameters: ["Robert'; DROP TABLE users;--"]
```
### Identifier Escaping

Schema, table, and field names are escaped with double quotes:
```typescript
config: { schema: 'my"schema', table: 'user data' }
// Generated: FROM "my""schema"."user data"
```
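This is the standard PostgreSQL quoting rule: wrap the identifier in double quotes and double any embedded quotes. A minimal sketch of an equivalent helper, not the adapter's actual implementation:

```typescript
function escapeIdentifier(name: string): string {
  // Double any embedded quote, then wrap the whole identifier.
  return `"${name.replace(/"/g, '""')}"`;
}

escapeIdentifier('my"schema'); // => "my""schema"
escapeIdentifier('user data'); // => "user data"
```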
### Operator Validation

Only whitelisted operators are allowed. An invalid operator throws an error before the query ever reaches the database.
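A sketch of what whitelist-style validation against the operator table above could look like (illustrative, not the adapter's source):

```typescript
const ALLOWED_OPERATORS = new Set([
  '=', '!=', '<>', '<', '>', '<=', '>=',
  'LIKE', 'ILIKE', 'NOT LIKE', 'NOT ILIKE',
  'IN', 'NOT IN', 'IS NULL', 'IS NOT NULL',
  'BETWEEN', 'NOT BETWEEN',
]);

function assertOperator(op: string): void {
  if (!ALLOWED_OPERATORS.has(op.toUpperCase())) {
    throw new Error(`Unsupported SQL operator: ${op}`);
  }
}
```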
## Complete Example
```typescript
import { Orchestrator, Pipeline } from 'openetl';
import { postgresql } from '@openetl/postgresql';

const vault = {
  'source-db': {
    id: 'source-db',
    type: 'basic',
    credentials: {
      host: 'source.example.com',
      database: 'production',
      username: 'readonly',
      password: process.env.SOURCE_DB_PASSWORD,
    },
  },
  'target-db': {
    id: 'target-db',
    type: 'basic',
    credentials: {
      host: 'analytics.example.com',
      database: 'analytics',
      username: 'writer',
      password: process.env.TARGET_DB_PASSWORD,
    },
  },
};

const etl = Orchestrator(vault, { postgresql });

const pipeline: Pipeline = {
  id: 'sync-orders',
  source: {
    id: 'orders-source',
    adapter_id: 'postgresql',
    endpoint_id: 'table_query',
    credential_id: 'source-db',
    config: { schema: 'public', table: 'orders' },
    fields: ['id', 'customer_id', 'total', 'status', 'created_at'],
    filters: [
      { field: 'status', operator: '=', value: 'completed' },
      { field: 'created_at', operator: '>=', value: '2024-01-01' },
    ],
    pagination: { type: 'offset', itemsPerPage: 1000 },
  },
  target: {
    id: 'orders-target',
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'target-db',
    config: { schema: 'analytics', table: 'orders_archive' },
    fields: ['id', 'customer_id', 'total', 'status', 'created_at'],
  },
  error_handling: {
    max_retries: 3,
    retry_interval: 1000,
    fail_on_error: true,
  },
};

const result = await etl.runPipeline(pipeline);
console.log(`Synced ${result.data.length} orders`);
```
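With `fail_on_error: true`, a run that exhausts its retries presumably surfaces as a rejected promise (the exact error shape is an assumption), so wrap the call when you need to handle failure yourself:

```typescript
try {
  const result = await etl.runPipeline(pipeline);
  console.log(`Synced ${result.data.length} orders`);
} catch (error) {
  // Retries are exhausted at this point (max_retries: 3 above).
  console.error('sync-orders pipeline failed:', error);
  process.exitCode = 1;
}
```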