Connecting AI Agents to Data Sources with OpenETL

How to give your AI agents access to real-world data using an abstract, secure approach


Introduction

AI agents are becoming increasingly powerful, but their usefulness is limited by their ability to access real-world data. An AI assistant that can only work with information provided in the prompt is far less useful than one that can query databases, fetch CRM records, or retrieve analytics data on demand.

In this article, we'll explore how to connect AI agents to multiple data sources using OpenETL's abstract adapter pattern. This approach provides:

  • Unified interface: One API for all data sources
  • Security: Credential isolation and parameterized queries
  • Flexibility: Easy to add new data sources
  • Type safety: Compile-time validation

The Challenge

When building AI agents that interact with data, developers face several challenges:

  1. Multiple APIs: Each data source has its own API, authentication, and query format
  2. Security risks: AI-generated queries could be vulnerable to injection attacks
  3. Credential management: Keeping API keys and passwords secure
  4. Context limitations: LLMs have token limits; you can't dump entire databases into context

Architecture Overview

Here's how OpenETL fits into an AI agent architecture:

┌─────────────────────────────────────────────────────────────┐
│                        AI Agent                              │
│  ┌─────────────┐    ┌──────────────┐    ┌───────────────┐  │
│  │   LLM       │───▶│  Tool Router │───▶│  OpenETL      │  │
│  │  (GPT/Claude)│    │              │    │  Orchestrator │  │
│  └─────────────┘    └──────────────┘    └───────┬───────┘  │
└─────────────────────────────────────────────────┼───────────┘
                                                  │
                    ┌─────────────────────────────┼─────────────────────────────┐
                    │                             ▼                             │
                    │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐ │
                    │  │PostgreSQL│  │ HubSpot  │  │  Stripe  │  │   Xero   │ │
                    │  │ Adapter  │  │ Adapter  │  │ Adapter  │  │ Adapter  │ │
                    │  └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘ │
                    │       │             │             │             │        │
                    │       ▼             ▼             ▼             ▼        │
                    │    Database       CRM API      Payments     Accounting   │
                    └──────────────────────────────────────────────────────────┘

Setting Up OpenETL for AI Agents

Step 1: Install Dependencies

npm install openetl
npm install @openetl/postgresql @openetl/hubspot @openetl/stripe

Step 2: Configure the Vault

Store credentials securely, separate from your agent code:

import { Vault } from 'openetl';

const vault: Vault = {
  'company-database': {
    id: 'company-database',
    type: 'basic',
    credentials: {
      host: process.env.DB_HOST,
      port: process.env.DB_PORT,
      database: process.env.DB_NAME,
      username: process.env.DB_USER,
      password: process.env.DB_PASSWORD,
    },
  },
  'hubspot-crm': {
    id: 'hubspot-crm',
    type: 'oauth2',
    credentials: {
      client_id: process.env.HUBSPOT_CLIENT_ID,
      client_secret: process.env.HUBSPOT_CLIENT_SECRET,
      access_token: process.env.HUBSPOT_ACCESS_TOKEN,
      refresh_token: process.env.HUBSPOT_REFRESH_TOKEN,
    },
  },
  'stripe-payments': {
    id: 'stripe-payments',
    type: 'api_key',
    credentials: {
      api_key: process.env.STRIPE_SECRET_KEY,
    },
  },
};

Step 3: Create the Data Access Layer

Build a service that exposes data operations to your AI agent:

import { Orchestrator, Pipeline, validatePipeline } from 'openetl';
import { postgresql } from '@openetl/postgresql';
import { hubspot } from '@openetl/hubspot';
import { stripe } from '@openetl/stripe';

const adapters = { postgresql, hubspot, stripe };

class DataService {
  private etl;

  constructor(vault: Vault) {
    this.etl = Orchestrator(vault, adapters);
  }

  async queryDatabase(table: string, filters?: any[], limit = 100) {
    const pipeline: Pipeline = {
      id: `query-${table}-${Date.now()}`,
      source: {
        id: 'db-source',
        adapter_id: 'postgresql',
        endpoint_id: 'table_query',
        credential_id: 'company-database',
        fields: [],  // All fields
        filters,
        config: { schema: 'public', table },
        limit,
      },
    };

    const result = await this.etl.runPipeline(pipeline);
    return result.data;
  }

  async getContacts(filters?: any[], limit = 50) {
    const pipeline: Pipeline = {
      id: `contacts-${Date.now()}`,
      source: {
        id: 'hubspot-source',
        adapter_id: 'hubspot',
        endpoint_id: 'contacts',
        credential_id: 'hubspot-crm',
        fields: ['email', 'firstname', 'lastname', 'company', 'phone'],
        filters,
        pagination: { type: 'cursor', itemsPerPage: limit },
      },
    };

    const result = await this.etl.runPipeline(pipeline);
    return result.data;
  }

  async getPayments(customerId?: string, limit = 20) {
    const filters = customerId
      ? [{ field: 'customer', operator: '=', value: customerId }]
      : undefined;

    const pipeline: Pipeline = {
      id: `payments-${Date.now()}`,
      source: {
        id: 'stripe-source',
        adapter_id: 'stripe',
        endpoint_id: 'charges',
        credential_id: 'stripe-payments',
        fields: ['id', 'amount', 'currency', 'status', 'created'],
        filters,
        limit,
      },
    };

    const result = await this.etl.runPipeline(pipeline);
    return result.data;
  }
}

Integrating with AI Agents

Tool Definitions

Define tools that your AI agent can use. Here's an example for OpenAI's function calling:

const tools = [
  {
    type: 'function',
    function: {
      name: 'query_database',
      description: 'Query the company database for records',
      parameters: {
        type: 'object',
        properties: {
          table: {
            type: 'string',
            description: 'Table name (users, orders, products)',
          },
          filters: {
            type: 'array',
            items: {
              type: 'object',
              properties: {
                field: { type: 'string' },
                operator: { type: 'string', enum: ['=', '!=', '>', '<', '>=', '<=', 'LIKE'] },
                value: { type: 'string' },
              },
            },
            description: 'Filter conditions',
          },
          limit: {
            type: 'number',
            description: 'Maximum records to return (default 100)',
          },
        },
        required: ['table'],
      },
    },
  },
  {
    type: 'function',
    function: {
      name: 'get_crm_contacts',
      description: 'Fetch contacts from HubSpot CRM',
      parameters: {
        type: 'object',
        properties: {
          email: {
            type: 'string',
            description: 'Filter by email address',
          },
          company: {
            type: 'string',
            description: 'Filter by company name',
          },
          limit: {
            type: 'number',
            description: 'Maximum contacts to return',
          },
        },
      },
    },
  },
  {
    type: 'function',
    function: {
      name: 'get_payments',
      description: 'Fetch payment records from Stripe',
      parameters: {
        type: 'object',
        properties: {
          customer_id: {
            type: 'string',
            description: 'Stripe customer ID to filter by',
          },
          limit: {
            type: 'number',
            description: 'Maximum payments to return',
          },
        },
      },
    },
  },
];

Tool Execution Handler

async function executeTool(
  toolName: string,
  args: any,
  dataService: DataService
): Promise<string> {
  try {
    let result;

    switch (toolName) {
      case 'query_database':
        result = await dataService.queryDatabase(
          args.table,
          args.filters,
          args.limit
        );
        break;

      case 'get_crm_contacts':
        const contactFilters = [];
        if (args.email) {
          contactFilters.push({ field: 'email', operator: '=', value: args.email });
        }
        if (args.company) {
          contactFilters.push({ field: 'company', operator: 'LIKE', value: `%${args.company}%` });
        }
        result = await dataService.getContacts(
          contactFilters.length > 0 ? contactFilters : undefined,
          args.limit
        );
        break;

      case 'get_payments':
        result = await dataService.getPayments(args.customer_id, args.limit);
        break;

      default:
        return JSON.stringify({ error: `Unknown tool: ${toolName}` });
    }

    return JSON.stringify(result);
  } catch (error) {
    return JSON.stringify({
      error: error instanceof Error ? error.message : 'Unknown error',
    });
  }
}

Complete Agent Example

import OpenAI from 'openai';

const openai = new OpenAI();
const dataService = new DataService(vault);

async function runAgent(userMessage: string) {
  const messages: any[] = [
    {
      role: 'system',
      content: `You are a helpful assistant with access to company data.
You can query the database, fetch CRM contacts, and retrieve payment information.
Always use the available tools to get accurate, up-to-date information.`,
    },
    { role: 'user', content: userMessage },
  ];

  while (true) {
    const response = await openai.chat.completions.create({
      model: 'gpt-4',
      messages,
      tools,
      tool_choice: 'auto',
    });

    const message = response.choices[0].message;
    messages.push(message);

    // Check if we need to execute tools
    if (message.tool_calls && message.tool_calls.length > 0) {
      for (const toolCall of message.tool_calls) {
        const args = JSON.parse(toolCall.function.arguments);
        const result = await executeTool(
          toolCall.function.name,
          args,
          dataService
        );

        messages.push({
          role: 'tool',
          tool_call_id: toolCall.id,
          content: result,
        });
      }
    } else {
      // No more tool calls, return the final response
      return message.content;
    }
  }
}

// Usage
const answer = await runAgent(
  "How many orders did customer [email protected] place last month?"
);
console.log(answer);

Security Considerations

SQL Injection Protection

OpenETL's database adapters use parameterized queries, protecting against SQL injection:

// User input is safely parameterized
const filters = [
  { field: 'email', operator: '=', value: userProvidedEmail }
];

// Generated query uses placeholders:
// SELECT * FROM users WHERE "email" = $1
// Values: [userProvidedEmail]

Operator Whitelisting

Only approved operators are allowed:

const ALLOWED_OPERATORS = [
  '=', '!=', '<>', '<', '>', '<=', '>=',
  'LIKE', 'ILIKE', 'NOT LIKE',
  'IN', 'NOT IN', 'IS NULL', 'IS NOT NULL',
];

// Invalid operators throw errors before reaching the database

Credential Isolation

Credentials are stored in the vault, never exposed to the AI agent:

// The AI only sees credential IDs, never actual secrets
const pipeline = {
  source: {
    credential_id: 'company-database',  // Just an ID
    // The actual password is never in the pipeline
  },
};

Rate Limiting

Prevent abuse with built-in rate limiting:

const pipeline: Pipeline = {
  id: 'agent-query',
  source: { /* ... */ },
  error_handling: {
    max_retries: 3,
    retry_interval: 1000,
  },
};

Advanced Patterns

Schema Discovery

Let the AI agent discover available tables and fields:

async function getTableSchema(table: string) {
  const pipeline: Pipeline = {
    id: `schema-${table}`,
    source: {
      id: 'schema-source',
      adapter_id: 'postgresql',
      endpoint_id: 'table_columns',
      credential_id: 'company-database',
      fields: [],
      config: { schema: 'public', table },
    },
  };

  const result = await this.etl.runPipeline(pipeline);
  return result.data;  // Returns column names and types
}

Add a tool for schema discovery:

{
  type: 'function',
  function: {
    name: 'get_table_schema',
    description: 'Get the columns and data types for a database table',
    parameters: {
      type: 'object',
      properties: {
        table: { type: 'string', description: 'Table name' },
      },
      required: ['table'],
    },
  },
}

Cross-Source Queries

Combine data from multiple sources:

async function getCustomerOverview(email: string) {
  // Get contact from CRM
  const [contact] = await dataService.getContacts(
    [{ field: 'email', operator: '=', value: email }],
    1
  );

  // Get orders from database
  const orders = await dataService.queryDatabase('orders', [
    { field: 'customer_email', operator: '=', value: email },
  ]);

  // Get payments from Stripe
  const payments = await dataService.getPayments(contact?.stripe_id);

  return {
    contact,
    orders,
    payments,
    summary: {
      totalOrders: orders.length,
      totalSpent: payments.reduce((sum, p) => sum + p.amount, 0) / 100,
    },
  };
}

Caching for Performance

Implement caching to reduce API calls:

import NodeCache from 'node-cache';

const cache = new NodeCache({ stdTTL: 300 }); // 5 minute TTL

async function cachedQuery(key: string, queryFn: () => Promise<any>) {
  const cached = cache.get(key);
  if (cached) return cached;

  const result = await queryFn();
  cache.set(key, result);
  return result;
}

// Usage
const contacts = await cachedQuery(
  `contacts-${company}`,
  () => dataService.getContacts([{ field: 'company', operator: '=', value: company }])
);

Example Conversations

Customer Lookup

User: "Find all contacts from Acme Corp and show their recent orders"

Agent: Uses get_crm_contacts with company filter, then query_database for orders

"I found 3 contacts from Acme Corp:

Payment Investigation

User: "Why did the payment for order #12345 fail?"

Agent: Uses query_database to get order details, then get_payments with customer ID

"Order #12345 for customer [email protected] had a payment failure. The Stripe charge (ch_xxx) was declined with reason 'insufficient_funds'. The customer has a total of 3 failed payment attempts this month."

Conclusion

By combining AI agents with OpenETL's abstract data access layer, you can build powerful assistants that:

  • Access multiple data sources through a unified interface
  • Maintain security with parameterized queries and credential isolation
  • Scale easily by adding new adapters without changing agent code
  • Stay type-safe with TypeScript validation

This pattern keeps your AI agent focused on reasoning and natural language, while OpenETL handles the complexity of data integration.

Resources


OpenETL is open source under the MIT license.