agent/state.py — полная реализация AgentStateПроблема: LLM ничего не помнит
Когда вы вызываете llm.complete(messages), модель обрабатывает
переданный список сообщений и возвращает ответ. Связи между двумя разными вызовами
нет — как если бы каждый раз разговор начинался с чистого листа.
AgentState — это «записная книжка» сессии. Всё, что происходит,
записывается туда, и при каждом вызове LLM вся история передаётся заново.
Это называется краткосрочной памятью через контекстное окно.
Структура AgentState
Поле messages — ключевое. Именно его содержимое отправляется в LLM
при каждом вызове. Посмотрим, как оно накапливается по ходу работы агента.
Почему dataclass с field(default_factory)
Хранить изменяемые объекты (списки, словари) как значения по умолчанию в датаклассах — классическая ловушка Python:
# ❌ НЕПРАВИЛЬНО — все экземпляры делят ОДИН список
@dataclass
class Bad:
items: list = [] # изменяемый объект как дефолт!
a = Bad()
b = Bad()
a.items.append(1)
print(b.items) # [1] — катастрофа! b тоже изменился
# ✅ ПРАВИЛЬНО — каждый экземпляр получает свой список
@dataclass
class Good:
items: list = field(default_factory=list)
a = Good()
b = Good()
a.items.append(1)
print(b.items) # [] — всё правильно
field(default_factory=list) вызывает list() при создании
каждого нового экземпляра, а не один раз при определении класса.
Как накапливаются сообщения
Рассмотрим реальную историю сообщений после трёх шагов агента. Каждый шаг добавляет два сообщения: ответ LLM (assistant) и результат инструмента (user).
name: "search_web"
input: {query: "RAG best practices 2024"}
content: '[{"url":"https://arxiv.org/...","title":"RAG Survey",...},...]'
name: "fetch_pages"
input: {urls: ["https://arxiv.org/...", "https://towardsdatascience.com/..."]}
content: '[{"url":"...","title":"...","content":"Chunk size selection..."}]'
name: "write_report"
input: {title: "RAG Best Practices", content: "# RAG...", sources: [...]}
content: "Report written: RAG Best Practices (842 words)"
user → assistant → user → assistant.... Результаты инструментов
не от человека, но API требует именно такой порядок. OpenAI использует
другую схему: отдельный role: "tool".
Два типа контента в сообщениях
Поле content у Message может быть двух типов:
простая строка или список блоков. Это ключевое для понимания формата Anthropic API.
@dataclass
class Message:
role: Literal["user", "assistant"]
content: str | list[Any] # строка ИЛИ список блоков
# Тип 1: простая строка (начальный запрос пользователя)
Message(role="user", content="Research topic: RAG best practices")
# Тип 2: список блоков — ответ LLM с вызовом инструмента
Message(role="assistant", content=[
{"type": "text", "text": "Сначала поищу информацию в интернете."},
{
"type": "tool_use",
"id": "tu_001", # уникальный ID для матчинга с результатом
"name": "search_web",
"input": {"query": "RAG best practices 2024", "max_results": 5}
}
])
# Тип 3: список блоков — результаты инструментов (от user)
Message(role="user", content=[
{
"type": "tool_result",
"tool_use_id": "tu_001", # ← матчится с id из tool_use
"content": '[{"url": "https://arxiv.org/...", "title": "RAG Survey"}]'
}
])
Зачем список блоков, а не одна строка?
За один шаг LLM может вызвать несколько инструментов одновременно
(parallel tool calling). Например, сразу запросить поиск по двум разным темам.
Список блоков позволяет вернуть несколько tool_use в одном ответе:
# LLM вызывает два инструмента за один шаг
Message(role="assistant", content=[
{
"type": "tool_use", "id": "tu_001",
"name": "search_web",
"input": {"query": "RAG retrieval techniques"}
},
{
"type": "tool_use", "id": "tu_002",
"name": "search_web",
"input": {"query": "RAG evaluation metrics RAGAS"}
}
])
# Результаты обоих — одним сообщением user
Message(role="user", content=[
{"type": "tool_result", "tool_use_id": "tu_001", "content": "[...]"},
{"type": "tool_result", "tool_use_id": "tu_002", "content": "[...]"},
])
tool_result блоки должны быть в одном
сообщении user. Нельзя отправить два отдельных user
сообщения подряд — API вернёт ошибку «invalid message sequence».
Принцип append-only
Один из 5 инвариантов цикла: сообщения никогда не удаляются и не изменяются. Только добавляются новые.
def append_message(self, message: Message) -> None:
"""Добавить сообщение в историю. Только добавление — никогда не удалять."""
self.messages.append(message)
# В коде нет методов: remove_message, update_message, pop_message
# Это намеренное архитектурное решение
Зачем такое ограничение?
- LLM видит полную цепочку действий. Если удалить сообщение с результатом поиска, модель «забудет» что искала и может повторить тот же запрос.
- Отладка и воспроизводимость. По полной истории сообщений можно точно воспроизвести, что делал агент на каждом шаге. Это критично при отладке.
-
Корректность Anthropic API.
API требует строгого чередования
user/assistant. Удаление или изменение сообщений нарушает этот порядок.
Источники и дедупликация
Агент может найти один и тот же URL несколько раз — при разных поисковых
запросах. Метод add_source проверяет дубли по URL:
@dataclass
class Source:
url: str
title: str
snippet: str = "" # необязательный краткий сниппет
def add_source(self, source: Source) -> None:
"""Добавить источник без дублей (по URL)."""
if not any(s.url == source.url for s in self.sources):
self.sources.append(source)
# Пример:
state.add_source(Source(url="https://arxiv.org/1", title="RAG Survey"))
state.add_source(Source(url="https://arxiv.org/1", title="RAG Survey")) # дубль!
state.add_source(Source(url="https://arxiv.org/2", title="RAG Methods"))
print(len(state.sources)) # 2, не 3
Источники накапливаются автоматически в оркестраторе: после вызова
search_web найденные URL добавляются в state.sources.
В финальном отчёте они превращаются в список «References».
Конвертация для API: to_api_messages()
LLM API ожидает список словарей [{"role": ..., "content": ...}],
не список объектов Message. Метод to_api_messages()
делает это преобразование:
def to_api_messages(self) -> list[dict[str, Any]]:
"""Конвертировать в формат Anthropic API."""
return [{"role": msg.role, "content": msg.content} for msg in self.messages]
# Вызов в Orchestrator:
response = await self.llm.complete(
messages=state.to_api_messages(), # ← конвертируем
tools=tools,
system=SYSTEM_PROMPT,
)
Управление контекстным окном
У каждой модели есть лимит контекстного окна (context window). После многих шагов
история сообщений может превысить этот лимит, и API вернёт ошибку.
В llm_client.py реализована функция _trim_history:
def _trim_history(messages: list[dict], max_tokens: int = 80_000) -> list[dict]:
"""Обрезать историю если она слишком длинная.
Стратегия: сохраняем ПЕРВОЕ сообщение (запрос пользователя),
удаляем самые старые промежуточные сообщения.
"""
if _estimate_tokens(messages) <= max_tokens:
return messages # всё влезает — возвращаем как есть
trimmed = list(messages)
while len(trimmed) > 2 and _estimate_tokens(trimmed) > max_tokens:
trimmed.pop(1) # удаляем второе сообщение (самое старое промежуточное)
return trimmed
def _estimate_tokens(messages) -> int:
# Грубая оценка: 1 токен ≈ 4 символа
return len(str(messages)) // 4
Визуализация стратегии обрезки при 5 шагах агента:
Как выглядит состояние после работы
После трёх шагов агента AgentState выглядит так:
AgentState(
query="RAG best practices",
step=3,
messages=[
# 1. Начальный запрос
Message(role="user", content="Research topic: RAG best practices"),
# 2. Шаг 1: LLM решает искать
Message(role="assistant", content=[
{"type": "tool_use", "id": "tu_001",
"name": "search_web", "input": {"query": "RAG best practices 2024"}}
]),
# 3. Результат поиска
Message(role="user", content=[
{"type": "tool_result", "tool_use_id": "tu_001",
"content": '[{"url":"https://arxiv.org/...","title":"RAG Survey"}]'}
]),
# 4. Шаг 2: LLM решает загрузить страницы
Message(role="assistant", content=[
{"type": "tool_use", "id": "tu_002",
"name": "fetch_pages", "input": {"urls": ["https://arxiv.org/..."]}}
]),
# 5. Текст страниц
Message(role="user", content=[
{"type": "tool_result", "tool_use_id": "tu_002",
"content": '[{"url":"...","title":"...","content":"RAG combines..."}]'}
]),
# 6. Шаг 3: LLM пишет отчёт
Message(role="assistant", content=[
{"type": "tool_use", "id": "tu_003",
"name": "write_report",
"input": {"title": "RAG Best Practices", "content": "# RAG...", "sources": [...]}}
]),
# 7. Подтверждение записи отчёта
Message(role="user", content=[
{"type": "tool_result", "tool_use_id": "tu_003",
"content": "Report written: RAG Best Practices (842 words)"}
]),
],
sources=[
Source(url="https://arxiv.org/abs/2312.10997", title="RAG Survey"),
Source(url="https://towardsdatascience.com/...", title="Advanced RAG"),
],
report="# RAG Best Practices\n\n## 1. Стратегии чанкинга..."
)
Попробуйте сами
AgentState можно использовать и исследовать независимо от всего агента:
from agent.state import AgentState, Message, Source
# Создаём пустое состояние
state = AgentState(query="Что такое RAG?")
print(f"Шагов: {state.step}, Сообщений: {len(state.messages)}")
# Шагов: 0, Сообщений: 0
# Добавляем сообщения
state.append_message(Message(role="user", content="Research topic: RAG"))
state.append_message(Message(role="assistant", content=[
{"type": "tool_use", "id": "tu_001", "name": "search_web",
"input": {"query": "RAG retrieval augmented generation"}}
]))
state.increment_step()
print(f"Шагов: {state.step}, Сообщений: {len(state.messages)}")
# Шагов: 1, Сообщений: 2
# Смотрим что уйдёт в API
api_msgs = state.to_api_messages()
print(api_msgs[1]["content"][0]["name"]) # search_web
# Дедупликация источников
state.add_source(Source(url="https://arxiv.org/1", title="RAG Paper"))
state.add_source(Source(url="https://arxiv.org/1", title="RAG Paper")) # дубль
state.add_source(Source(url="https://arxiv.org/2", title="RAG Survey"))
print(f"Уникальных источников: {len(state.sources)}") # 2
Шпаргалка
| Концепт | Как реализовано | Зачем |
|---|---|---|
| Память сессии | AgentState.messages | LLM видит всю историю при каждом вызове |
| Append-only | Только append_message(), нет удаления | LLM не «забывает» уже сделанное |
| tool_use блок | content: [{"type":"tool_use","id":...}] | LLM сообщает что хочет вызвать |
| tool_result блок | role="user", content: [{"type":"tool_result"}] | Результат инструмента попадает в контекст |
| Дедупликация URL | add_source() проверяет по URL | Один источник — одна запись |
| Trim history | _trim_history() в llm_client.py | Защита от переполнения context window |
| API формат | to_api_messages() → list[dict] | API ожидает dict, не dataclass |
Практическое задание
-
Напишите функцию
count_tokens(state: AgentState) -> int, которая возвращает приблизительное количество токенов в истории сообщений. Используйте эвристику из_estimate_tokens: длина строки ÷ 4. -
Запустите агента с флагом
--verboseи найдите в логах строки сtool_dispatched. Посчитайте, сколько сообщений было в AgentState на каждом шаге. -
Измените
_trim_history: вместо удаления старых промежуточных сообщений, попробуйте сохранять только первое и три последних. Как это изменит поведение агента при длинных сессиях?