Connecting AI Agents to Data Sources with OpenETL
How to give your AI agents access to real-world data using an abstract, secure approach
Introduction
AI agents are becoming increasingly powerful, but their usefulness is limited by their ability to access real-world data. An AI assistant that can only work with information provided in the prompt is far less useful than one that can query databases, fetch CRM records, or retrieve analytics data on demand.
In this article, we'll explore how to connect AI agents to multiple data sources using OpenETL's abstract adapter pattern. This approach provides:
- Unified interface: One API for all data sources
- Security: Credential isolation and parameterized queries
- Flexibility: Easy to add new data sources
- Type safety: Compile-time validation
The Challenge
When building AI agents that interact with data, developers face several challenges:
- Multiple APIs: Each data source has its own API, authentication, and query format
- Security risks: AI-generated queries could be vulnerable to injection attacks
- Credential management: Keeping API keys and passwords secure
- Context limitations: LLMs have token limits; you can't dump entire databases into context
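The last challenge is worth solving at the boundary: every tool result should be bounded before it is serialized back into the model's context. A minimal sketch of such a guard (the row and character caps here are illustrative defaults, not OpenETL settings):

```typescript
// Cap a tool result before it re-enters the LLM context.
// maxRows and maxChars are arbitrary illustrative limits.
function boundResult<T>(rows: T[], maxRows = 50, maxChars = 8000): string {
  const truncated = rows.slice(0, maxRows);
  let json = JSON.stringify({
    rows: truncated,
    total: rows.length,          // tell the model how much data exists
    truncated: rows.length > maxRows,
  });
  if (json.length > maxChars) {
    json = json.slice(0, maxChars); // hard character cap as a last resort
  }
  return json;
}
```

Returning the total count alongside the truncated rows lets the model say "showing 50 of 12,000 records" instead of silently reasoning over partial data.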
Architecture Overview
Here's how OpenETL fits into an AI agent architecture:
┌─────────────────────────────────────────────────────────────┐
│ AI Agent │
│ ┌─────────────┐ ┌──────────────┐ ┌───────────────┐ │
│ │ LLM │───▶│ Tool Router │───▶│ OpenETL │ │
│ │ (GPT/Claude)│ │ │ │ Orchestrator │ │
│ └─────────────┘ └──────────────┘ └───────┬───────┘ │
└─────────────────────────────────────────────────┼───────────┘
│
┌─────────────────────────────┼─────────────────────────────┐
│ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │PostgreSQL│ │ HubSpot │ │ Stripe │ │ Xero │ │
│ │ Adapter │ │ Adapter │ │ Adapter │ │ Adapter │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ Database CRM API Payments Accounting │
└──────────────────────────────────────────────────────────┘
Setting Up OpenETL for AI Agents
Step 1: Install Dependencies
npm install openetl
npm install @openetl/postgresql @openetl/hubspot @openetl/stripe
Step 2: Configure the Vault
Store credentials securely, separate from your agent code:
import { Vault } from 'openetl';
const vault: Vault = {
'company-database': {
id: 'company-database',
type: 'basic',
credentials: {
host: process.env.DB_HOST,
port: process.env.DB_PORT,
database: process.env.DB_NAME,
username: process.env.DB_USER,
password: process.env.DB_PASSWORD,
},
},
'hubspot-crm': {
id: 'hubspot-crm',
type: 'oauth2',
credentials: {
client_id: process.env.HUBSPOT_CLIENT_ID,
client_secret: process.env.HUBSPOT_CLIENT_SECRET,
access_token: process.env.HUBSPOT_ACCESS_TOKEN,
refresh_token: process.env.HUBSPOT_REFRESH_TOKEN,
},
},
'stripe-payments': {
id: 'stripe-payments',
type: 'api_key',
credentials: {
api_key: process.env.STRIPE_SECRET_KEY,
},
},
};
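Since every credential above comes from the environment, it pays to fail fast at startup when a variable is missing, rather than letting a pipeline fail mid-conversation. A small guard (this helper is our own, not part of OpenETL):

```typescript
// Throw at startup if any required environment variable is unset.
function requireEnv(names: string[]): Record<string, string> {
  const missing = names.filter((n) => !process.env[n]);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(', ')}`);
  }
  return Object.fromEntries(names.map((n) => [n, process.env[n] as string]));
}
```

Call it once before building the vault, e.g. `requireEnv(['DB_HOST', 'DB_PORT', 'DB_NAME', 'DB_USER', 'DB_PASSWORD'])`.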
Step 3: Create the Data Access Layer
Build a service that exposes data operations to your AI agent:
import { Orchestrator, Pipeline } from 'openetl';
import { postgresql } from '@openetl/postgresql';
import { hubspot } from '@openetl/hubspot';
import { stripe } from '@openetl/stripe';
const adapters = { postgresql, hubspot, stripe };
class DataService {
private etl: ReturnType<typeof Orchestrator>;
constructor(vault: Vault) {
this.etl = Orchestrator(vault, adapters);
}
async queryDatabase(table: string, filters?: any[], limit = 100) {
const pipeline: Pipeline = {
id: `query-${table}-${Date.now()}`,
source: {
id: 'db-source',
adapter_id: 'postgresql',
endpoint_id: 'table_query',
credential_id: 'company-database',
fields: [], // All fields
filters,
config: { schema: 'public', table },
limit,
},
};
const result = await this.etl.runPipeline(pipeline);
return result.data;
}
async getContacts(filters?: any[], limit = 50) {
const pipeline: Pipeline = {
id: `contacts-${Date.now()}`,
source: {
id: 'hubspot-source',
adapter_id: 'hubspot',
endpoint_id: 'contacts',
credential_id: 'hubspot-crm',
fields: ['email', 'firstname', 'lastname', 'company', 'phone'],
filters,
pagination: { type: 'cursor', itemsPerPage: limit },
},
};
const result = await this.etl.runPipeline(pipeline);
return result.data;
}
async getPayments(customerId?: string, limit = 20) {
const filters = customerId
? [{ field: 'customer', operator: '=', value: customerId }]
: undefined;
const pipeline: Pipeline = {
id: `payments-${Date.now()}`,
source: {
id: 'stripe-source',
adapter_id: 'stripe',
endpoint_id: 'charges',
credential_id: 'stripe-payments',
fields: ['id', 'amount', 'currency', 'status', 'created'],
filters,
limit,
},
};
const result = await this.etl.runPipeline(pipeline);
return result.data;
}
}
Integrating with AI Agents
Tool Definitions
Define tools that your AI agent can use. Here's an example for OpenAI's function calling:
const tools = [
{
type: 'function',
function: {
name: 'query_database',
description: 'Query the company database for records',
parameters: {
type: 'object',
properties: {
table: {
type: 'string',
description: 'Table name (users, orders, products)',
},
filters: {
type: 'array',
items: {
type: 'object',
properties: {
field: { type: 'string' },
operator: { type: 'string', enum: ['=', '!=', '>', '<', '>=', '<=', 'LIKE'] },
value: { type: 'string' },
},
},
description: 'Filter conditions',
},
limit: {
type: 'number',
description: 'Maximum records to return (default 100)',
},
},
required: ['table'],
},
},
},
{
type: 'function',
function: {
name: 'get_crm_contacts',
description: 'Fetch contacts from HubSpot CRM',
parameters: {
type: 'object',
properties: {
email: {
type: 'string',
description: 'Filter by email address',
},
company: {
type: 'string',
description: 'Filter by company name',
},
limit: {
type: 'number',
description: 'Maximum contacts to return',
},
},
},
},
},
{
type: 'function',
function: {
name: 'get_payments',
description: 'Fetch payment records from Stripe',
parameters: {
type: 'object',
properties: {
customer_id: {
type: 'string',
description: 'Stripe customer ID to filter by',
},
limit: {
type: 'number',
description: 'Maximum payments to return',
},
},
},
},
},
];
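Model-generated arguments do not always conform to the declared schema, so it is worth validating them before they reach the data layer. A real implementation might use a JSON Schema validator such as Ajv; the hand-rolled sketch below only covers required fields and primitive types:

```typescript
// A simplified view of the parameter schemas defined above.
type ParamSchema = {
  type: 'object';
  properties: Record<string, { type: string }>;
  required?: string[];
};

// Return a list of validation errors; an empty list means the args are usable.
function validateArgs(schema: ParamSchema, args: Record<string, unknown>): string[] {
  const errors: string[] = [];
  for (const name of schema.required ?? []) {
    if (args[name] === undefined) errors.push(`missing required argument: ${name}`);
  }
  for (const [name, value] of Object.entries(args)) {
    const spec = schema.properties[name];
    if (!spec) {
      errors.push(`unexpected argument: ${name}`);
    } else if (spec.type !== 'array' && spec.type !== 'object' && typeof value !== spec.type) {
      // Primitive types map directly onto typeof; arrays/objects need deeper checks.
      errors.push(`argument ${name} should be ${spec.type}`);
    }
  }
  return errors;
}
```

Returning the errors as a tool result (rather than throwing) lets the model correct its own call on the next turn.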
Tool Execution Handler
async function executeTool(
toolName: string,
args: any,
dataService: DataService
): Promise<string> {
try {
let result;
switch (toolName) {
case 'query_database':
result = await dataService.queryDatabase(
args.table,
args.filters,
args.limit
);
break;
case 'get_crm_contacts': {
const contactFilters: any[] = [];
if (args.email) {
contactFilters.push({ field: 'email', operator: '=', value: args.email });
}
if (args.company) {
contactFilters.push({ field: 'company', operator: 'LIKE', value: `%${args.company}%` });
}
result = await dataService.getContacts(
contactFilters.length > 0 ? contactFilters : undefined,
args.limit
);
break;
}
case 'get_payments':
result = await dataService.getPayments(args.customer_id, args.limit);
break;
default:
return JSON.stringify({ error: `Unknown tool: ${toolName}` });
}
return JSON.stringify(result);
} catch (error) {
return JSON.stringify({
error: error instanceof Error ? error.message : 'Unknown error',
});
}
}
Complete Agent Example
import OpenAI from 'openai';
const openai = new OpenAI();
const dataService = new DataService(vault);
async function runAgent(userMessage: string) {
const messages: any[] = [
{
role: 'system',
content: `You are a helpful assistant with access to company data.
You can query the database, fetch CRM contacts, and retrieve payment information.
Always use the available tools to get accurate, up-to-date information.`,
},
{ role: 'user', content: userMessage },
];
while (true) {
const response = await openai.chat.completions.create({
model: 'gpt-4',
messages,
tools,
tool_choice: 'auto',
});
const message = response.choices[0].message;
messages.push(message);
// Check if we need to execute tools
if (message.tool_calls && message.tool_calls.length > 0) {
for (const toolCall of message.tool_calls) {
const args = JSON.parse(toolCall.function.arguments);
const result = await executeTool(
toolCall.function.name,
args,
dataService
);
messages.push({
role: 'tool',
tool_call_id: toolCall.id,
content: result,
});
}
} else {
// No more tool calls, return the final response
return message.content;
}
}
}
// Usage
const answer = await runAgent(
"How many orders did customer [email protected] place last month?"
);
console.log(answer);
Security Considerations
SQL Injection Protection
OpenETL's database adapters use parameterized queries, protecting against SQL injection:
// User input is safely parameterized
const filters = [
{ field: 'email', operator: '=', value: userProvidedEmail }
];
// Generated query uses placeholders:
// SELECT * FROM users WHERE "email" = $1
// Values: [userProvidedEmail]
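To make this concrete, here is roughly how a filter list compiles into a placeholder-based WHERE clause. This is a sketch of the general technique, not OpenETL's actual internals:

```typescript
type Filter = { field: string; operator: string; value: string | number };

// Compile filters into a parameterized WHERE clause.
// Values never appear in the SQL text; they travel separately as parameters.
function buildWhere(filters: Filter[]): { sql: string; values: (string | number)[] } {
  const clauses = filters.map(
    // Quote identifiers and escape embedded quotes; values become $1, $2, ...
    (f, i) => `"${f.field.replace(/"/g, '""')}" ${f.operator} $${i + 1}`
  );
  return {
    sql: clauses.length ? `WHERE ${clauses.join(' AND ')}` : '',
    values: filters.map((f) => f.value),
  };
}
```

Note that the operator is still interpolated into the SQL text, which is exactly why it must be checked against a whitelist.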
Operator Whitelisting
Only approved operators are allowed:
const ALLOWED_OPERATORS = [
'=', '!=', '<>', '<', '>', '<=', '>=',
'LIKE', 'ILIKE', 'NOT LIKE',
'IN', 'NOT IN', 'IS NULL', 'IS NOT NULL',
];
// Invalid operators throw errors before reaching the database
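The corresponding guard can run before any SQL is generated; anything an LLM smuggles into the operator field is rejected up front. The function name here is illustrative:

```typescript
const ALLOWED_OPERATORS = new Set([
  '=', '!=', '<>', '<', '>', '<=', '>=',
  'LIKE', 'ILIKE', 'NOT LIKE',
  'IN', 'NOT IN', 'IS NULL', 'IS NOT NULL',
]);

// Throw before query generation if the operator is not whitelisted.
function assertOperator(operator: string): void {
  if (!ALLOWED_OPERATORS.has(operator.toUpperCase())) {
    throw new Error(`Operator not allowed: ${operator}`);
  }
}
```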
Credential Isolation
Credentials are stored in the vault, never exposed to the AI agent:
// The AI only sees credential IDs, never actual secrets
const pipeline = {
source: {
credential_id: 'company-database', // Just an ID
// The actual password is never in the pipeline
},
};
Error Handling and Retries
Handle transient failures with retry configuration; the retry_interval between attempts also acts as a simple throttle:
const pipeline: Pipeline = {
id: 'agent-query',
source: { /* ... */ },
error_handling: {
max_retries: 3,
retry_interval: 1000,
},
};
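The retry configuration above handles failures; for hard rate limits (say, capping how many queries an agent may issue per minute) you can put an application-level limiter in front of DataService. A minimal sliding-window sketch, with arbitrary defaults:

```typescript
// Allow at most `max` calls per `windowMs` milliseconds per key.
class RateLimiter {
  private calls = new Map<string, number[]>();

  constructor(private max = 30, private windowMs = 60_000) {}

  allow(key: string, now = Date.now()): boolean {
    // Keep only timestamps still inside the window.
    const recent = (this.calls.get(key) ?? []).filter((t) => now - t < this.windowMs);
    if (recent.length >= this.max) {
      this.calls.set(key, recent);
      return false; // over the limit; reject this call
    }
    recent.push(now);
    this.calls.set(key, recent);
    return true;
  }
}
```

Checking `limiter.allow(sessionId)` at the top of executeTool and returning an error envelope when it fails keeps a runaway agent from hammering your data sources.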
Advanced Patterns
Schema Discovery
Let the AI agent discover available tables and fields. Since this uses this.etl, add it as a method on DataService:
// Inside the DataService class
async getTableSchema(table: string) {
const pipeline: Pipeline = {
id: `schema-${table}`,
source: {
id: 'schema-source',
adapter_id: 'postgresql',
endpoint_id: 'table_columns',
credential_id: 'company-database',
fields: [],
config: { schema: 'public', table },
},
};
const result = await this.etl.runPipeline(pipeline);
return result.data; // Returns column names and types
}
Add a tool for schema discovery:
{
type: 'function',
function: {
name: 'get_table_schema',
description: 'Get the columns and data types for a database table',
parameters: {
type: 'object',
properties: {
table: { type: 'string', description: 'Table name' },
},
required: ['table'],
},
},
}
Cross-Source Queries
Combine data from multiple sources:
async function getCustomerOverview(email: string) {
// Get contact from CRM
const [contact] = await dataService.getContacts(
[{ field: 'email', operator: '=', value: email }],
1
);
// Get orders from database
const orders = await dataService.queryDatabase('orders', [
{ field: 'customer_email', operator: '=', value: email },
]);
// Get payments from Stripe
const payments = await dataService.getPayments(contact?.stripe_id);
return {
contact,
orders,
payments,
summary: {
totalOrders: orders.length,
totalSpent: payments.reduce((sum, p) => sum + p.amount, 0) / 100, // Stripe amounts are in cents
},
};
}
Caching for Performance
Implement caching to reduce API calls:
import NodeCache from 'node-cache';
const cache = new NodeCache({ stdTTL: 300 }); // 5 minute TTL
async function cachedQuery(key: string, queryFn: () => Promise<any>) {
const cached = cache.get(key);
if (cached) return cached;
const result = await queryFn();
cache.set(key, result);
return result;
}
// Usage
const contacts = await cachedQuery(
`contacts-${company}`,
() => dataService.getContacts([{ field: 'company', operator: '=', value: company }])
);
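One caveat: a key like `contacts-${company}` collides as soon as two calls differ only in their filters or limits. A deterministic key derived from the full argument object avoids this (assuming flat argument objects, as in the tool definitions above):

```typescript
import { createHash } from 'node:crypto';

// Derive a stable cache key from a tool name and its arguments.
// Sorting the keys makes the key independent of property order.
function cacheKey(tool: string, args: Record<string, unknown>): string {
  const canonical = JSON.stringify(args, Object.keys(args).sort());
  return `${tool}:${createHash('sha256').update(canonical).digest('hex').slice(0, 16)}`;
}
```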
Example Conversations
Customer Lookup
User: "Find all contacts from Acme Corp and show their recent orders"
Agent: Uses get_crm_contacts with company filter, then query_database for orders
"I found 3 contacts from Acme Corp:
- John Smith ([email protected]) - 5 orders totaling $2,450
- Jane Doe ([email protected]) - 2 orders totaling $890
- Bob Wilson ([email protected]) - No orders yet"
Payment Investigation
User: "Why did the payment for order #12345 fail?"
Agent: Uses query_database to get order details, then get_payments with customer ID
"Order #12345 for customer [email protected] had a payment failure. The Stripe charge (ch_xxx) was declined with reason 'insufficient_funds'. The customer has a total of 3 failed payment attempts this month."
Conclusion
By combining AI agents with OpenETL's abstract data access layer, you can build powerful assistants that:
- Access multiple data sources through a unified interface
- Maintain security with parameterized queries and credential isolation
- Scale easily by adding new adapters without changing agent code
- Stay type-safe with TypeScript validation
This pattern keeps your AI agent focused on reasoning and natural language, while OpenETL handles the complexity of data integration.
Resources
OpenETL is open source under the MIT license.