Зачем PostgreSQL агенту?
Прежде чем разбирать драйверы — зачем вообще PostgreSQL в системе с агентом? Не SQLite, не Redis, а именно Postgres?
Три ключевые роли PostgreSQL в AI-агентных системах:
PostgreSQL в AI-агенте ┌─────────────────────────────────────────────────────────────┐ │ │ │ pgvector Хранение embeddings, семантический │ │ поиск по долгосрочной памяти агента │ │ │ │ Реляционные таблицы История сессий, факты о пользователе, │ │ результаты инструментов, entity memory │ │ │ │ Трейсинг и логи Все вызовы LLM, tool calls, стоимость, │ │ время ответа — для observability │ │ │ └─────────────────────────────────────────────────────────────┘
Особенно важен pgvector — расширение PostgreSQL, которое добавляет тип vector и операторы для косинусного сходства. Это позволяет хранить embeddings прямо в Postgres и делать семантический поиск без отдельной векторной БД. Один PostgreSQL заменяет и реляционное хранилище, и векторную базу.
psycopg2 — синхронный драйвер. Вызов conn.execute() блокирует event loop до завершения запроса. Пока агент ждёт ответа от БД — он не может обрабатывать другие сообщения, вызывать инструменты или отвечать пользователю. Для агентов нужны асинхронные драйверы.
asyncpg vs aiopg: что выбрать
Два главных async-драйвера для PostgreSQL решают одну задачу, но с разными компромиссами:
| Параметр | asyncpg | aiopg |
|---|---|---|
| Реализация | Нативный протокол PostgreSQL (Cython) | Обёртка над psycopg2 |
| Производительность | ✅ Очень высокая (до 3× быстрее) | Средняя |
| API | Собственный (fetch, fetchrow, fetchval) | ✅ DBAPI 2.0 совместимый |
| Миграция с psycopg2 | Требует изменений кода | ✅ Минимальные изменения |
| pgvector | ✅ Отличная поддержка | ✅ Работает |
| Когда выбирать | Новые проекты, высокая нагрузка | Порт существующего кода на async |
Рекомендация для AI-агентов: используй asyncpg для новых проектов. Производительность важна, когда агент делает десятки векторных запросов за одну сессию. aiopg — если есть существующая кодовая база на psycopg2 и нужно добавить async с минимальными изменениями.
asyncpg: подключение и базовые операции
Установка
pip install asyncpg
# Или через uv (рекомендуется)
uv add asyncpg
Одиночное подключение
Самый простой случай — одно подключение для скриптов и тестов:
import asyncio
import asyncpg
async def main():
# Подключаемся к PostgreSQL
conn = await asyncpg.connect(
host="localhost",
port=5432,
database="agent_db",
user="postgres",
password="secret",
)
# Или через DSN:
# conn = await asyncpg.connect("postgresql://postgres:secret@localhost/agent_db")
# Создаём таблицу
await conn.execute("""
CREATE TABLE IF NOT EXISTS agent_memory (
id SERIAL PRIMARY KEY,
session_id TEXT NOT NULL,
role TEXT NOT NULL, -- 'user' или 'assistant'
content TEXT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
)
""")
# Вставляем запись
await conn.execute(
"INSERT INTO agent_memory (session_id, role, content) VALUES ($1, $2, $3)",
"session-123", "user", "Как работает asyncio?"
)
# Получаем несколько строк
rows = await conn.fetch(
"SELECT * FROM agent_memory WHERE session_id = $1 ORDER BY created_at",
"session-123"
)
for row in rows:
print(f"[{row['role']}] {row['content']}")
# Получаем одну строку
row = await conn.fetchrow(
"SELECT * FROM agent_memory WHERE id = $1", 1
)
print(row['content'])
# Получаем скалярное значение
count = await conn.fetchval(
"SELECT COUNT(*) FROM agent_memory WHERE session_id = $1",
"session-123"
)
print(f"Сообщений в сессии: {count}")
await conn.close()
asyncio.run(main())
asyncpg использует позиционные параметры $1, $2 — это нативный синтаксис протокола PostgreSQL. В отличие от psycopg2 с его %s, здесь параметры передаются напрямую без подстановки строк — никакого SQL-инъекшна.
Методы получения данных
Методы asyncpg execute(sql, *args) → None INSERT, UPDATE, DELETE, DDL fetch(sql, *args) → list[Row] SELECT несколько строк fetchrow(sql, *args) → Row|None SELECT одна строка (или None) fetchval(sql, *args) → Any SELECT одно значение (COUNT, MAX...) executemany(sql, args) → None Пакетная вставка
Connection Pool: ключевой паттерн для агентов
Одиночное подключение — только для скриптов. В продакшне агент обрабатывает множество запросов параллельно, и на каждый открывать новое подключение дорого (50–100 мс на handshake). Connection Pool держит пул готовых подключений и раздаёт их по запросу.
Без пула С пулом (asyncpg.Pool) Запрос 1 → [connect 80ms] → query Запрос 1 → [берём conn] → query Запрос 2 → [connect 90ms] → query Запрос 2 → [берём conn] → query Запрос 3 → [connect 75ms] → query Запрос 3 → [берём conn] → query Подключения созданы один раз при старте
import asyncio
import asyncpg
from contextlib import asynccontextmanager
# Глобальный пул — создаётся один раз при старте приложения
pool: asyncpg.Pool | None = None
async def init_pool():
global pool
pool = await asyncpg.create_pool(
dsn="postgresql://postgres:secret@localhost/agent_db",
min_size=2, # Минимум 2 постоянных подключения
max_size=10, # Максимум 10 (под нагрузкой)
command_timeout=30, # Таймаут запроса в секундах
)
async def close_pool():
global pool
if pool:
await pool.close()
# Использование пула: acquire + release через context manager
async def get_session_history(session_id: str) -> list[dict]:
async with pool.acquire() as conn:
rows = await conn.fetch(
"""SELECT role, content, created_at
FROM agent_memory
WHERE session_id = $1
ORDER BY created_at ASC""",
session_id
)
return [dict(row) for row in rows]
async def save_message(session_id: str, role: str, content: str):
async with pool.acquire() as conn:
await conn.execute(
"INSERT INTO agent_memory (session_id, role, content) VALUES ($1, $2, $3)",
session_id, role, content
)
# Транзакция: несколько операций атомарно
async def save_agent_turn(session_id: str, user_msg: str, agent_response: str):
async with pool.acquire() as conn:
async with conn.transaction():
await conn.execute(
"INSERT INTO agent_memory (session_id, role, content) VALUES ($1, $2, $3)",
session_id, "user", user_msg
)
await conn.execute(
"INSERT INTO agent_memory (session_id, role, content) VALUES ($1, $2, $3)",
session_id, "assistant", agent_response
)
# Если здесь исключение — оба INSERT откатятся
# Пакетная вставка (executemany) — намного быстрее цикла
async def bulk_save_messages(messages: list[tuple[str, str, str]]):
"""messages: [(session_id, role, content), ...]"""
async with pool.acquire() as conn:
await conn.executemany(
"INSERT INTO agent_memory (session_id, role, content) VALUES ($1, $2, $3)",
messages
)
async def main():
await init_pool()
try:
await save_agent_turn("sess-1", "Привет!", "Привет! Чем могу помочь?")
history = await get_session_history("sess-1")
for msg in history:
print(f"[{msg['role']}] {msg['content']}")
finally:
await close_pool()
asyncio.run(main())
Блок async with pool.acquire() as conn удерживает подключение из пула на всё время блока. Если внутри ты делаешь дорогие операции (вызов LLM, долгое вычисление) — ты блокируешь слот пула. Правило: бери соединение только на время работы с БД, а не на время работы всей функции.
asyncpg + pgvector: векторная память агента
Именно здесь asyncpg становится по-настоящему важным для AI-инженера. pgvector — расширение PostgreSQL, которое добавляет тип vector и операторы поиска по сходству. Позволяет хранить embeddings и делать semantic search без отдельной векторной БД.
Инициализация pgvector
import asyncpg
from pgvector.asyncpg import register_vector # pip install pgvector
async def setup_vector_memory(conn: asyncpg.Connection):
# Включаем расширение pgvector
await conn.execute("CREATE EXTENSION IF NOT EXISTS vector")
# Регистрируем тип vector в asyncpg (делается один раз на соединение)
await register_vector(conn)
# Создаём таблицу для долгосрочной памяти агента
await conn.execute("""
CREATE TABLE IF NOT EXISTS agent_facts (
id SERIAL PRIMARY KEY,
user_id TEXT NOT NULL,
content TEXT NOT NULL, -- Факт в текстовом виде
embedding vector(1536), -- OpenAI ada-002: 1536 измерений
source TEXT, -- Откуда факт: 'conversation', 'document'
created_at TIMESTAMPTZ DEFAULT NOW()
)
""")
# Индекс для ускорения поиска (HNSW — лучший для production)
await conn.execute("""
CREATE INDEX IF NOT EXISTS agent_facts_embedding_idx
ON agent_facts
USING hnsw (embedding vector_cosine_ops)
""")
Сохранение и поиск по embedding
import asyncio
import asyncpg
import numpy as np
from openai import AsyncOpenAI
from pgvector.asyncpg import register_vector
client = AsyncOpenAI()
async def get_embedding(text: str) -> list[float]:
"""Получить embedding через OpenAI API"""
response = await client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
async def save_fact(pool: asyncpg.Pool, user_id: str, content: str, source: str = "conversation"):
"""Сохранить факт с embedding в долгосрочную память агента"""
embedding = await get_embedding(content)
async with pool.acquire() as conn:
await register_vector(conn)
await conn.execute(
"""INSERT INTO agent_facts (user_id, content, embedding, source)
VALUES ($1, $2, $3, $4)""",
user_id, content, embedding, source
)
print(f"✅ Факт сохранён: {content[:60]}...")
async def search_memory(pool: asyncpg.Pool, user_id: str, query: str, top_k: int = 5) -> list[dict]:
"""Найти релевантные факты по семантическому сходству"""
query_embedding = await get_embedding(query)
async with pool.acquire() as conn:
await register_vector(conn)
rows = await conn.fetch(
"""SELECT content, source, created_at,
1 - (embedding <=> $1) AS similarity -- cosine similarity
FROM agent_facts
WHERE user_id = $2
ORDER BY embedding <=> $1 -- ORDER BY расстоянию (меньше = ближе)
LIMIT $3""",
query_embedding, user_id, top_k
)
return [dict(row) for row in rows]
async def main():
pool = await asyncpg.create_pool("postgresql://postgres:secret@localhost/agent_db")
async with pool.acquire() as conn:
await setup_vector_memory(conn)
# Сохраняем факты о пользователе
await save_fact(pool, "user-42", "Пользователь предпочитает краткие ответы без технических деталей")
await save_fact(pool, "user-42", "Работает в сфере маркетинга, не разработчик")
await save_fact(pool, "user-42", "Интересуется применением AI для анализа конкурентов")
# Ищем релевантную память перед ответом
query = "Какой формат ответов предпочтительнее для этого пользователя?"
results = await search_memory(pool, "user-42", query, top_k=3)
print(f"\nРелевантные факты для запроса: '{query}'")
for r in results:
print(f" [{r['similarity']:.3f}] {r['content']}")
await pool.close()
asyncio.run(main())
В pgvector embedding <=> $1 — косинусное расстояние (0 = одинаковые, 2 = противоположные). Similarity = 1 - distance. Для точечного произведения: <#>. Для евклидова расстояния: <->. Для семантического поиска текстов почти всегда используй косинусное.
aiopg: когда нужна совместимость с psycopg2
Если у тебя есть существующий код на psycopg2 и нужно перевести его на async с минимальными изменениями — aiopg твой выбор. API почти идентичен psycopg2.
import asyncio
import aiopg # pip install aiopg
DSN = "dbname=agent_db user=postgres password=secret host=localhost"
async def main():
# Создаём пул
async with aiopg.create_pool(DSN, minsize=2, maxsize=10) as pool:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
# API cursor-based (как psycopg2)
await cur.execute(
"SELECT role, content FROM agent_memory WHERE session_id = %s", # %s, не $1
("session-123",)
)
rows = await cur.fetchall()
for row in rows:
print(f"[{row[0]}] {row[1]}")
# Вставка
await cur.execute(
"INSERT INTO agent_memory (session_id, role, content) VALUES (%s, %s, %s)",
("session-123", "user", "Привет!")
)
asyncio.run(main())
- Параметры через
%s(psycopg2 стиль), а не$1 - Cursor-based API: нужно явно
await cur.execute(), потомawait cur.fetchall() - Нет встроенных
fetch()/fetchrow()— нужно работать через cursor - Медленнее asyncpg, но легче мигрировать с psycopg2
Практические паттерны для агентов
FastAPI: пул в lifespan
В FastAPI-агентах пул инициализируется при старте приложения и закрывается при остановке через lifespan:
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
import asyncpg
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup: создаём пул
app.state.db = await asyncpg.create_pool(
dsn="postgresql://postgres:secret@localhost/agent_db",
min_size=2,
max_size=10,
)
yield
# Shutdown: закрываем пул
await app.state.db.close()
app = FastAPI(lifespan=lifespan)
@app.post("/chat/{session_id}")
async def chat(session_id: str, message: str, request: Request):
pool: asyncpg.Pool = request.app.state.db
# Сохраняем сообщение пользователя
async with pool.acquire() as conn:
await conn.execute(
"INSERT INTO agent_memory (session_id, role, content) VALUES ($1, $2, $3)",
session_id, "user", message
)
# Получаем историю для контекста
history = await conn.fetch(
"SELECT role, content FROM agent_memory WHERE session_id = $1 ORDER BY created_at DESC LIMIT 20",
session_id
)
# ... вызов LLM с историей ...
return {"status": "ok"}
Параллельные запросы к БД
asyncpg + asyncio.gather = одновременные запросы к нескольким таблицам:
import asyncio
import asyncpg
async def load_agent_context(pool: asyncpg.Pool, user_id: str, session_id: str, query: str):
"""
Загружаем всё нужное параллельно:
- история текущей сессии
- релевантные факты из долгосрочной памяти
- профиль пользователя
"""
async def get_history():
async with pool.acquire() as conn:
return await conn.fetch(
"SELECT role, content FROM agent_memory WHERE session_id = $1 ORDER BY created_at DESC LIMIT 10",
session_id
)
async def get_relevant_facts():
embedding = await get_embedding(query) # API-запрос к OpenAI
async with pool.acquire() as conn:
from pgvector.asyncpg import register_vector
await register_vector(conn)
return await conn.fetch(
"SELECT content FROM agent_facts WHERE user_id = $1 ORDER BY embedding <=> $2 LIMIT 5",
user_id, embedding
)
async def get_user_profile():
async with pool.acquire() as conn:
return await conn.fetchrow(
"SELECT name, preferences, language FROM users WHERE id = $1",
user_id
)
# Все три запроса выполняются одновременно
history, facts, profile = await asyncio.gather(
get_history(),
get_relevant_facts(),
get_user_profile(),
)
return {
"history": [dict(r) for r in history],
"facts": [r["content"] for r in facts],
"profile": dict(profile) if profile else {},
}
Типичные ошибки
# ❌ НЕЛЬЗЯ — блокирует event loop!
import psycopg2
conn = psycopg2.connect(...)
cursor = conn.cursor()
cursor.execute("SELECT ...") # Блокирует всё приложение
# ✅ НУЖНО — async драйвер
async with pool.acquire() as conn:
await conn.fetch("SELECT ...")
# ❌ Очень медленно: каждый запрос создаёт TCP-соединение (~80ms)
async def bad_handler(session_id: str):
conn = await asyncpg.connect(dsn=DSN) # новое подключение каждый раз!
rows = await conn.fetch(...)
await conn.close()
# ✅ Используй пул: подключения переиспользуются (~0ms overhead)
async def good_handler(session_id: str, pool: asyncpg.Pool):
async with pool.acquire() as conn:
rows = await conn.fetch(...)
# ❌ asyncpg не знает тип vector без регистрации
async with pool.acquire() as conn:
await conn.fetch("SELECT embedding FROM agent_facts") # Exception!
# ✅ Регистрируй на каждом новом соединении
from pgvector.asyncpg import register_vector
async with pool.acquire() as conn:
await register_vector(conn)
await conn.fetch("SELECT embedding FROM agent_facts") # OK
Шпаргалка
import asyncpg
# --- Подключение ---
conn = await asyncpg.connect(dsn="postgresql://user:pass@host/db")
pool = await asyncpg.create_pool(dsn=..., min_size=2, max_size=10)
# --- Запросы ---
await conn.execute("INSERT INTO t VALUES ($1, $2)", val1, val2)
rows = await conn.fetch("SELECT * FROM t WHERE id = $1", id) # → list[Record]
row = await conn.fetchrow("SELECT * FROM t WHERE id = $1", id) # → Record | None
val = await conn.fetchval("SELECT COUNT(*) FROM t") # → Any
await conn.executemany("INSERT INTO t VALUES ($1)", [(1,), (2,)])
# --- Пул ---
async with pool.acquire() as conn:
rows = await conn.fetch(...)
# --- Транзакция ---
async with pool.acquire() as conn:
async with conn.transaction():
await conn.execute(...)
await conn.execute(...) # откат если исключение
# --- pgvector ---
from pgvector.asyncpg import register_vector
async with pool.acquire() as conn:
await register_vector(conn)
# Сохранить
await conn.execute("INSERT INTO t (emb) VALUES ($1)", [0.1, 0.2, ...])
# Поиск по косинусному сходству
rows = await conn.fetch(
"SELECT *, 1 - (emb <=> $1) AS sim FROM t ORDER BY emb <=> $1 LIMIT 5",
query_embedding
)
# --- aiopg (psycopg2 стиль) ---
import aiopg
async with aiopg.create_pool(dsn) as pool:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT * FROM t WHERE id = %s", (1,))
rows = await cur.fetchall() # list[tuple]
Практическое задание
Закрепи тему самостоятельно:
Задание: Агент с долгосрочной памятью на pgvector
- Разверни PostgreSQL локально (через Docker:
docker run -e POSTGRES_PASSWORD=secret -p 5432:5432 ankane/pgvector) - Создай таблицы:
sessions(история сообщений) иuser_facts(embeddings долгосрочной памяти) - Напиши функцию
remember_fact(user_id, text)— сохраняет текст с его embedding через OpenAI API - Напиши функцию
recall(user_id, query, top_k=3)— возвращает топ-3 релевантных факта по запросу - Напиши консольный чат-агент, который перед каждым ответом загружает историю сессии и релевантные факты параллельно через
asyncio.gather() - *Дополнительно: добавь HNSW-индекс и замерь скорость поиска с ним и без него на 10 000 записях