Зачем PostgreSQL агенту?

Прежде чем разбирать драйверы — зачем вообще PostgreSQL в системе с агентом? Не SQLite, не Redis, а именно Postgres?

Три ключевые роли PostgreSQL в AI-агентных системах:

PostgreSQL в AI-агенте

┌─────────────────────────────────────────────────────────────┐
│                                                             │
│  pgvector            Хранение embeddings, семантический      │
│                       поиск по долгосрочной памяти агента   │
│                                                             │
│  Реляционные таблицы  История сессий, факты о пользователе,  │
│                       результаты инструментов, entity memory │
│                                                             │
│  Трейсинг и логи      Все вызовы LLM, tool calls, стоимость,  │
│                       время ответа — для observability       │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Особенно важен pgvector — расширение PostgreSQL, которое добавляет тип vector и операторы для косинусного сходства. Это позволяет хранить embeddings прямо в Postgres и делать семантический поиск без отдельной векторной БД. Один PostgreSQL заменяет и реляционное хранилище, и векторную базу.

ℹ️ Почему не psycopg2?

psycopg2 — синхронный драйвер. Вызов conn.execute() блокирует event loop до завершения запроса. Пока агент ждёт ответа от БД — он не может обрабатывать другие сообщения, вызывать инструменты или отвечать пользователю. Для агентов нужны асинхронные драйверы.

asyncpg vs aiopg: что выбрать

Два главных async-драйвера для PostgreSQL решают одну задачу, но с разными компромиссами:

Параметр asyncpg aiopg
Реализация Нативный протокол PostgreSQL (Cython) Обёртка над psycopg2
Производительность ✅ Очень высокая (до 3× быстрее) Средняя
API Собственный (fetch, fetchrow, fetchval) ✅ DBAPI 2.0 совместимый
Миграция с psycopg2 Требует изменений кода ✅ Минимальные изменения
pgvector ✅ Отличная поддержка ✅ Работает
Когда выбирать Новые проекты, высокая нагрузка Порт существующего кода на async

Рекомендация для AI-агентов: используй asyncpg для новых проектов. Производительность важна, когда агент делает десятки векторных запросов за одну сессию. aiopg — если есть существующая кодовая база на psycopg2 и нужно добавить async с минимальными изменениями.

asyncpg: подключение и базовые операции

Установка

Установка
bash
pip install asyncpg
# Или через uv (рекомендуется)
uv add asyncpg

Одиночное подключение

Самый простой случай — одно подключение для скриптов и тестов:

asyncpg — одиночное подключение
python
import asyncio
import asyncpg

async def main():
    # Подключаемся к PostgreSQL
    conn = await asyncpg.connect(
        host="localhost",
        port=5432,
        database="agent_db",
        user="postgres",
        password="secret",
    )
    # Или через DSN:
    # conn = await asyncpg.connect("postgresql://postgres:secret@localhost/agent_db")

    # Создаём таблицу
    await conn.execute("""
        CREATE TABLE IF NOT EXISTS agent_memory (
            id SERIAL PRIMARY KEY,
            session_id TEXT NOT NULL,
            role TEXT NOT NULL,        -- 'user' или 'assistant'
            content TEXT NOT NULL,
            created_at TIMESTAMPTZ DEFAULT NOW()
        )
    """)

    # Вставляем запись
    await conn.execute(
        "INSERT INTO agent_memory (session_id, role, content) VALUES ($1, $2, $3)",
        "session-123", "user", "Как работает asyncio?"
    )

    # Получаем несколько строк
    rows = await conn.fetch(
        "SELECT * FROM agent_memory WHERE session_id = $1 ORDER BY created_at",
        "session-123"
    )
    for row in rows:
        print(f"[{row['role']}] {row['content']}")

    # Получаем одну строку
    row = await conn.fetchrow(
        "SELECT * FROM agent_memory WHERE id = $1", 1
    )
    print(row['content'])

    # Получаем скалярное значение
    count = await conn.fetchval(
        "SELECT COUNT(*) FROM agent_memory WHERE session_id = $1",
        "session-123"
    )
    print(f"Сообщений в сессии: {count}")

    await conn.close()

asyncio.run(main())
ℹ️ Параметры через $1, $2, ...

asyncpg использует позиционные параметры $1, $2 — это нативный синтаксис протокола PostgreSQL. В отличие от psycopg2 с его %s, здесь параметры передаются напрямую без подстановки строк — никакого SQL-инъекшна.

Методы получения данных

Методы asyncpg

execute(sql, *args)       → None      INSERT, UPDATE, DELETE, DDL
fetch(sql, *args)         → list[Row]  SELECT несколько строк
fetchrow(sql, *args)      → Row|None   SELECT одна строка (или None)
fetchval(sql, *args)      → Any        SELECT одно значение (COUNT, MAX...)
executemany(sql, args)    → None      Пакетная вставка

Connection Pool: ключевой паттерн для агентов

Одиночное подключение — только для скриптов. В продакшне агент обрабатывает множество запросов параллельно, и на каждый открывать новое подключение дорого (50–100 мс на handshake). Connection Pool держит пул готовых подключений и раздаёт их по запросу.

Без пула                          С пулом (asyncpg.Pool)

Запрос 1 → [connect 80ms] → query   Запрос 1 → [берём conn] → query
Запрос 2 → [connect 90ms] → query   Запрос 2 → [берём conn] → query
Запрос 3 → [connect 75ms] → query   Запрос 3 → [берём conn] → query

                                     Подключения созданы один раз при старте
asyncpg — Connection Pool для агента
python
import asyncio
import asyncpg
from contextlib import asynccontextmanager

# Глобальный пул — создаётся один раз при старте приложения
pool: asyncpg.Pool | None = None

async def init_pool():
    global pool
    pool = await asyncpg.create_pool(
        dsn="postgresql://postgres:secret@localhost/agent_db",
        min_size=2,    # Минимум 2 постоянных подключения
        max_size=10,   # Максимум 10 (под нагрузкой)
        command_timeout=30,  # Таймаут запроса в секундах
    )

async def close_pool():
    global pool
    if pool:
        await pool.close()

# Использование пула: acquire + release через context manager
async def get_session_history(session_id: str) -> list[dict]:
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """SELECT role, content, created_at
               FROM agent_memory
               WHERE session_id = $1
               ORDER BY created_at ASC""",
            session_id
        )
        return [dict(row) for row in rows]

async def save_message(session_id: str, role: str, content: str):
    async with pool.acquire() as conn:
        await conn.execute(
            "INSERT INTO agent_memory (session_id, role, content) VALUES ($1, $2, $3)",
            session_id, role, content
        )

# Транзакция: несколько операций атомарно
async def save_agent_turn(session_id: str, user_msg: str, agent_response: str):
    async with pool.acquire() as conn:
        async with conn.transaction():
            await conn.execute(
                "INSERT INTO agent_memory (session_id, role, content) VALUES ($1, $2, $3)",
                session_id, "user", user_msg
            )
            await conn.execute(
                "INSERT INTO agent_memory (session_id, role, content) VALUES ($1, $2, $3)",
                session_id, "assistant", agent_response
            )
            # Если здесь исключение — оба INSERT откатятся

# Пакетная вставка (executemany) — намного быстрее цикла
async def bulk_save_messages(messages: list[tuple[str, str, str]]):
    """messages: [(session_id, role, content), ...]"""
    async with pool.acquire() as conn:
        await conn.executemany(
            "INSERT INTO agent_memory (session_id, role, content) VALUES ($1, $2, $3)",
            messages
        )

async def main():
    await init_pool()
    try:
        await save_agent_turn("sess-1", "Привет!", "Привет! Чем могу помочь?")
        history = await get_session_history("sess-1")
        for msg in history:
            print(f"[{msg['role']}] {msg['content']}")
    finally:
        await close_pool()

asyncio.run(main())
⚠️ Не держи conn.acquire() дольше необходимого

Блок async with pool.acquire() as conn удерживает подключение из пула на всё время блока. Если внутри ты делаешь дорогие операции (вызов LLM, долгое вычисление) — ты блокируешь слот пула. Правило: бери соединение только на время работы с БД, а не на время работы всей функции.

asyncpg + pgvector: векторная память агента

Именно здесь asyncpg становится по-настоящему важным для AI-инженера. pgvector — расширение PostgreSQL, которое добавляет тип vector и операторы поиска по сходству. Позволяет хранить embeddings и делать semantic search без отдельной векторной БД.

Инициализация pgvector

Установка pgvector и создание таблицы
python
import asyncpg
from pgvector.asyncpg import register_vector  # pip install pgvector

async def setup_vector_memory(conn: asyncpg.Connection):
    # Включаем расширение pgvector
    await conn.execute("CREATE EXTENSION IF NOT EXISTS vector")

    # Регистрируем тип vector в asyncpg (делается один раз на соединение)
    await register_vector(conn)

    # Создаём таблицу для долгосрочной памяти агента
    await conn.execute("""
        CREATE TABLE IF NOT EXISTS agent_facts (
            id SERIAL PRIMARY KEY,
            user_id TEXT NOT NULL,
            content TEXT NOT NULL,           -- Факт в текстовом виде
            embedding vector(1536),          -- OpenAI ada-002: 1536 измерений
            source TEXT,                     -- Откуда факт: 'conversation', 'document'
            created_at TIMESTAMPTZ DEFAULT NOW()
        )
    """)

    # Индекс для ускорения поиска (HNSW — лучший для production)
    await conn.execute("""
        CREATE INDEX IF NOT EXISTS agent_facts_embedding_idx
        ON agent_facts
        USING hnsw (embedding vector_cosine_ops)
    """)
asyncpg + pgvector — сохранение и семантический поиск
python
import asyncio
import asyncpg
import numpy as np
from openai import AsyncOpenAI
from pgvector.asyncpg import register_vector

client = AsyncOpenAI()

async def get_embedding(text: str) -> list[float]:
    """Получить embedding через OpenAI API"""
    response = await client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return response.data[0].embedding

async def save_fact(pool: asyncpg.Pool, user_id: str, content: str, source: str = "conversation"):
    """Сохранить факт с embedding в долгосрочную память агента"""
    embedding = await get_embedding(content)

    async with pool.acquire() as conn:
        await register_vector(conn)
        await conn.execute(
            """INSERT INTO agent_facts (user_id, content, embedding, source)
               VALUES ($1, $2, $3, $4)""",
            user_id, content, embedding, source
        )
    print(f"✅ Факт сохранён: {content[:60]}...")

async def search_memory(pool: asyncpg.Pool, user_id: str, query: str, top_k: int = 5) -> list[dict]:
    """Найти релевантные факты по семантическому сходству"""
    query_embedding = await get_embedding(query)

    async with pool.acquire() as conn:
        await register_vector(conn)
        rows = await conn.fetch(
            """SELECT content, source, created_at,
                      1 - (embedding <=> $1) AS similarity  -- cosine similarity
               FROM agent_facts
               WHERE user_id = $2
               ORDER BY embedding <=> $1   -- ORDER BY расстоянию (меньше = ближе)
               LIMIT $3""",
            query_embedding, user_id, top_k
        )
    return [dict(row) for row in rows]

async def main():
    pool = await asyncpg.create_pool("postgresql://postgres:secret@localhost/agent_db")

    async with pool.acquire() as conn:
        await setup_vector_memory(conn)

    # Сохраняем факты о пользователе
    await save_fact(pool, "user-42", "Пользователь предпочитает краткие ответы без технических деталей")
    await save_fact(pool, "user-42", "Работает в сфере маркетинга, не разработчик")
    await save_fact(pool, "user-42", "Интересуется применением AI для анализа конкурентов")

    # Ищем релевантную память перед ответом
    query = "Какой формат ответов предпочтительнее для этого пользователя?"
    results = await search_memory(pool, "user-42", query, top_k=3)

    print(f"\nРелевантные факты для запроса: '{query}'")
    for r in results:
        print(f"  [{r['similarity']:.3f}] {r['content']}")

    await pool.close()

asyncio.run(main())
Оператор <=> — косинусное расстояние

В pgvector embedding <=> $1 — косинусное расстояние (0 = одинаковые, 2 = противоположные). Similarity = 1 - distance. Для точечного произведения: <#>. Для евклидова расстояния: <->. Для семантического поиска текстов почти всегда используй косинусное.

aiopg: когда нужна совместимость с psycopg2

Если у тебя есть существующий код на psycopg2 и нужно перевести его на async с минимальными изменениями — aiopg твой выбор. API почти идентичен psycopg2.

aiopg — пул подключений
python
import asyncio
import aiopg  # pip install aiopg

DSN = "dbname=agent_db user=postgres password=secret host=localhost"

async def main():
    # Создаём пул
    async with aiopg.create_pool(DSN, minsize=2, maxsize=10) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                # API cursor-based (как psycopg2)
                await cur.execute(
                    "SELECT role, content FROM agent_memory WHERE session_id = %s",  # %s, не $1
                    ("session-123",)
                )
                rows = await cur.fetchall()
                for row in rows:
                    print(f"[{row[0]}] {row[1]}")

                # Вставка
                await cur.execute(
                    "INSERT INTO agent_memory (session_id, role, content) VALUES (%s, %s, %s)",
                    ("session-123", "user", "Привет!")
                )

asyncio.run(main())
ℹ️ Ключевые отличия aiopg от asyncpg
  • Параметры через %s (psycopg2 стиль), а не $1
  • Cursor-based API: нужно явно await cur.execute(), потом await cur.fetchall()
  • Нет встроенных fetch() / fetchrow() — нужно работать через cursor
  • Медленнее asyncpg, но легче мигрировать с psycopg2

Практические паттерны для агентов

FastAPI: пул в lifespan

В FastAPI-агентах пул инициализируется при старте приложения и закрывается при остановке через lifespan:

FastAPI + asyncpg pool через lifespan
python
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
import asyncpg

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: создаём пул
    app.state.db = await asyncpg.create_pool(
        dsn="postgresql://postgres:secret@localhost/agent_db",
        min_size=2,
        max_size=10,
    )
    yield
    # Shutdown: закрываем пул
    await app.state.db.close()

app = FastAPI(lifespan=lifespan)

@app.post("/chat/{session_id}")
async def chat(session_id: str, message: str, request: Request):
    pool: asyncpg.Pool = request.app.state.db

    # Сохраняем сообщение пользователя
    async with pool.acquire() as conn:
        await conn.execute(
            "INSERT INTO agent_memory (session_id, role, content) VALUES ($1, $2, $3)",
            session_id, "user", message
        )
        # Получаем историю для контекста
        history = await conn.fetch(
            "SELECT role, content FROM agent_memory WHERE session_id = $1 ORDER BY created_at DESC LIMIT 20",
            session_id
        )

    # ... вызов LLM с историей ...
    return {"status": "ok"}

Параллельные запросы к БД

asyncpg + asyncio.gather = одновременные запросы к нескольким таблицам:

Параллельная загрузка контекста агента
python
import asyncio
import asyncpg

async def load_agent_context(pool: asyncpg.Pool, user_id: str, session_id: str, query: str):
    """
    Загружаем всё нужное параллельно:
    - история текущей сессии
    - релевантные факты из долгосрочной памяти
    - профиль пользователя
    """

    async def get_history():
        async with pool.acquire() as conn:
            return await conn.fetch(
                "SELECT role, content FROM agent_memory WHERE session_id = $1 ORDER BY created_at DESC LIMIT 10",
                session_id
            )

    async def get_relevant_facts():
        embedding = await get_embedding(query)  # API-запрос к OpenAI
        async with pool.acquire() as conn:
            from pgvector.asyncpg import register_vector
            await register_vector(conn)
            return await conn.fetch(
                "SELECT content FROM agent_facts WHERE user_id = $1 ORDER BY embedding <=> $2 LIMIT 5",
                user_id, embedding
            )

    async def get_user_profile():
        async with pool.acquire() as conn:
            return await conn.fetchrow(
                "SELECT name, preferences, language FROM users WHERE id = $1",
                user_id
            )

    # Все три запроса выполняются одновременно
    history, facts, profile = await asyncio.gather(
        get_history(),
        get_relevant_facts(),
        get_user_profile(),
    )

    return {
        "history": [dict(r) for r in history],
        "facts": [r["content"] for r in facts],
        "profile": dict(profile) if profile else {},
    }

Типичные ошибки

⚠️ Синхронный psycopg2 в async коде
# ❌ НЕЛЬЗЯ — блокирует event loop!
import psycopg2
conn = psycopg2.connect(...)
cursor = conn.cursor()
cursor.execute("SELECT ...")  # Блокирует всё приложение

# ✅ НУЖНО — async драйвер
async with pool.acquire() as conn:
    await conn.fetch("SELECT ...")
⚠️ Создание нового подключения на каждый запрос
# ❌ Очень медленно: каждый запрос создаёт TCP-соединение (~80ms)
async def bad_handler(session_id: str):
    conn = await asyncpg.connect(dsn=DSN)  # новое подключение каждый раз!
    rows = await conn.fetch(...)
    await conn.close()

# ✅ Используй пул: подключения переиспользуются (~0ms overhead)
async def good_handler(session_id: str, pool: asyncpg.Pool):
    async with pool.acquire() as conn:
        rows = await conn.fetch(...)
⚠️ Забытый register_vector при работе с pgvector
# ❌ asyncpg не знает тип vector без регистрации
async with pool.acquire() as conn:
    await conn.fetch("SELECT embedding FROM agent_facts")  # Exception!

# ✅ Регистрируй на каждом новом соединении
from pgvector.asyncpg import register_vector
async with pool.acquire() as conn:
    await register_vector(conn)
    await conn.fetch("SELECT embedding FROM agent_facts")  # OK

Шпаргалка

asyncpg cheatsheet
python
import asyncpg

# --- Подключение ---
conn = await asyncpg.connect(dsn="postgresql://user:pass@host/db")
pool = await asyncpg.create_pool(dsn=..., min_size=2, max_size=10)

# --- Запросы ---
await conn.execute("INSERT INTO t VALUES ($1, $2)", val1, val2)
rows = await conn.fetch("SELECT * FROM t WHERE id = $1", id)       # → list[Record]
row  = await conn.fetchrow("SELECT * FROM t WHERE id = $1", id)    # → Record | None
val  = await conn.fetchval("SELECT COUNT(*) FROM t")               # → Any
await conn.executemany("INSERT INTO t VALUES ($1)", [(1,), (2,)])

# --- Пул ---
async with pool.acquire() as conn:
    rows = await conn.fetch(...)

# --- Транзакция ---
async with pool.acquire() as conn:
    async with conn.transaction():
        await conn.execute(...)
        await conn.execute(...)  # откат если исключение

# --- pgvector ---
from pgvector.asyncpg import register_vector
async with pool.acquire() as conn:
    await register_vector(conn)
    # Сохранить
    await conn.execute("INSERT INTO t (emb) VALUES ($1)", [0.1, 0.2, ...])
    # Поиск по косинусному сходству
    rows = await conn.fetch(
        "SELECT *, 1 - (emb <=> $1) AS sim FROM t ORDER BY emb <=> $1 LIMIT 5",
        query_embedding
    )

# --- aiopg (psycopg2 стиль) ---
import aiopg
async with aiopg.create_pool(dsn) as pool:
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT * FROM t WHERE id = %s", (1,))
            rows = await cur.fetchall()  # list[tuple]

Практическое задание

Закрепи тему самостоятельно:

Задание: Агент с долгосрочной памятью на pgvector

  1. Разверни PostgreSQL локально (через Docker: docker run -e POSTGRES_PASSWORD=secret -p 5432:5432 ankane/pgvector)
  2. Создай таблицы: sessions (история сообщений) и user_facts (embeddings долгосрочной памяти)
  3. Напиши функцию remember_fact(user_id, text) — сохраняет текст с его embedding через OpenAI API
  4. Напиши функцию recall(user_id, query, top_k=3) — возвращает топ-3 релевантных факта по запросу
  5. Напиши консольный чат-агент, который перед каждым ответом загружает историю сессии и релевантные факты параллельно через asyncio.gather()
  6. *Дополнительно: добавь HNSW-индекс и замерь скорость поиска с ним и без него на 10 000 записях

Что дальше