Исходный код проекта
github.com/ivanshamaev/research-agent
Папка tools/ — все инструменты и реестр
Открыть →

Что такое инструмент агента

Инструмент — это обычная async Python-функция. Ничего магического. Магия в том, как LLM узнаёт о ней и решает её вызвать.

Схема работы принципиально отличается от обычного вызова функции:

  ┌──────────────────────────────────────────────────────────────┐
  │  Обычный вызов: вы решаете, что вызвать                      │
  │                                                              │
  │  your_code → search_web(query="RAG") → результат            │
  └──────────────────────────────────────────────────────────────┘

  ┌──────────────────────────────────────────────────────────────┐
  │  Вызов через LLM: LLM решает, что вызвать                   │
  │                                                              │
  │  1. Вы: отправляете сообщение + список инструментов (схемы) │
  │  2. LLM: анализирует задачу, выбирает инструмент            │
  │  3. LLM: возвращает {"name":"search_web","input":{...}}     │
  │  4. Вы: перехватываете, вызываете реальную функцию          │
  │  5. Вы: передаёте результат обратно LLM                     │
  └──────────────────────────────────────────────────────────────┘

LLM не имеет прямого доступа к вашему коду. Она работает с JSON-описаниями инструментов и возвращает JSON-блоки с вызовами. Вы сами выполняете реальные функции и возвращаете результаты.

JSON Schema: как описать инструмент для LLM

Перед каждым вызовом LLM получает список инструментов в формате JSON Schema. На основе этих описаний модель решает, какой инструмент вызвать и с какими аргументами.

{ "name": "search_web", "description": "Search the internet for information on a topic. Returns a list of relevant URLs with titles and snippets. Use this first to discover sources.", ↑ LLM читает это и решает: вызывать или нет, и когда "input_schema": { "type": "object", "properties": { "query": { "type": "string", "description": "The search query. Be specific and include key terms." }, "max_results": { "type": "integer", "description": "Max results to return (1-10). Default: 5.", "default": 5 } }, "required": ["query"] ← max_results необязателен } }

Качество description определяет поведение агента

LLM читает description и на его основе принимает три решения: вызывать ли инструмент вообще, когда его вызывать, и какие аргументы передать. Плохое описание — непредсказуемое поведение.

❌ плохо
"Search web" ← LLM не знает когда использовать
❌ плохо
"Searches the internet" ← нет контекста и порядка
✅ хорошо
"Search the internet for information on a topic. Returns URLs with titles and snippets. Use this first to discover sources."
Три элемента хорошего description: 1) что делает инструмент, 2) что возвращает, 3) когда использовать. «Use this first» — явная инструкция для LLM о порядке использования.

Четыре инструмента проекта

🔍
search_web
DuckDuckGo поиск. Возвращает список URL с заголовками и сниппетами. Первый шаг любого исследования.
📥
fetch_pages
Загружает несколько страниц параллельно. BeautifulSoup удаляет HTML-шум. Возвращает чистый текст.
asyncio.gather
📝
summarize_page
Сжимает длинный текст через LLM-вызов. Используется если страница слишком большая.
LLM call
✍️
write_report
Оформляет финальный Markdown-отчёт. Вызов этого инструмента завершает сессию.
★ терминальный

Первый вызов в любой сессии. Ищет в DuckDuckGo без ключей API.

async def search_web(query: str, max_results: int = 5) -> list[dict[str, str]]:
    results = await asyncio.to_thread(_ddg_search, query, max_results)
    return results

def _ddg_search(query: str, max_results: int) -> list[dict]:
    try:
        raw = DDGS().text(query, max_results=max_results) or []
    except (RatelimitException, TimeoutException, DDGSException) as e:
        raise ToolError(str(e), tool_name="search_web") from e

    return [{"url": r["href"], "title": r["title"], "snippet": r["body"]} for r in raw]

Почему asyncio.to_thread?

Библиотека ddgs (DuckDuckGo) — синхронная. Она блокирует поток, пока ждёт ответа от сети. Агент работает в async event loop. Вызов синхронной функции напрямую заблокировал бы весь цикл.

  ❌ Без to_thread — event loop заблокирован:
  ─────────────────────────────────────────────
  event loop: ████████ [ждёт DuckDuckGo] ████████ → другие задачи стоят
              0ms                      1200ms

  ✅ С to_thread — event loop свободен:
  ─────────────────────────────────────────────
  event loop: ░░░░░░░░░░░░░░░░░░░░░░░░░░░░░ → может обрабатывать другие задачи
  thread pool: █████████ [ждёт DuckDuckGo] █
              0ms                         1200ms

Инструмент 2: fetch_pages

Загружает несколько страниц одновременно через asyncio.gather. При 5 страницах по 1 секунде каждая: последовательно — 5 сек, параллельно — ~1 сек.

async def fetch_pages(urls: list[str]) -> list[dict[str, str]]:
    async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
        raw_results = await asyncio.gather(
            *[_fetch_one(client, url) for url in urls],
            return_exceptions=True,   # ← один упавший URL не ломает остальные
        )

    output = []
    for url, result in zip(urls, raw_results):
        if isinstance(result, Exception):
            output.append({"url": url, "error": str(result)})   # LLM увидит ошибку
        else:
            output.append(result)
    return output

async def _fetch_one(client: httpx.AsyncClient, url: str) -> dict:
    response = await client.get(url)
    response.raise_for_status()

    soup = BeautifulSoup(response.text, "html.parser")

    # Удаляем навигацию и шумовые теги
    for tag in soup(["script", "style", "nav", "header", "footer", "aside"]):
        tag.decompose()

    text = soup.get_text(separator="\n", strip=True)

    # Обрезаем до 3000 символов — достаточно для анализа, не перегружает контекст
    if len(text) > 3_000:
        text = text[:3_000] + "\n... [truncated]"

    title = soup.find("title")
    return {"url": url, "title": title.text if title else url, "content": text}
Почему именно 3000 символов? 5 страниц × 3000 = 15 000 символов ≈ 3750 токенов. При 8000 символах на страницу — 40 000 символов, это больше половины контекстного окна многих моделей. 3000 — баланс между полнотой и экономией токенов.

Инструмент 3: summarize_page

Используется когда страница слишком длинная даже после обрезки, или когда нужно выделить только самое важное по конкретному аспекту.

SUMMARIZE_PROMPT = """\
Summarize the following content, focusing on: {focus}
Extract the most important facts, statistics, and insights.
Be concise — aim for 200-400 words.

Content:
{content}
"""

async def summarize_page(content: str, focus: str = "key findings") -> str:
    client = anthropic.AsyncAnthropic(api_key=settings.ANTHROPIC_API_KEY)
    prompt = SUMMARIZE_PROMPT.format(focus=focus, content=content[:8_000])

    message = await client.messages.create(
        model=settings.DEFAULT_MODEL,
        max_tokens=600,
        messages=[{"role": "user", "content": prompt}],
    )
    return message.content[0].text
Ограничение: summarize_page использует Anthropic API напрямую. При использовании другого провайдера инструмент вернёт ошибку. Оркестратор перехватит её (ToolError) и продолжит работу без суммаризации. LLM увидит сообщение об ошибке и адаптируется.

Инструмент 4: write_report (терминальный)

Единственный инструмент, вызов которого завершает сессию. LLM не пишет отчёт построчно — она формирует весь Markdown-текст и передаёт его в аргументе content при вызове.

@dataclass
class ReportResult:
    title: str
    content: str
    sources: list[dict[str, str]]
    word_count: int

async def write_report(
    title: str,
    content: str,
    sources: list[dict[str, str]] | None = None
) -> ReportResult:
    sources = sources or []

    # Добавляем раздел References если есть источники
    if sources:
        refs = "\n\n## References\n\n"
        for i, src in enumerate(sources, 1):
            refs += f"{i}. [{src['title']}]({src['url']})\n"
        content = content.rstrip() + refs

    return ReportResult(
        title=title,
        content=content,
        sources=sources,
        word_count=len(content.split())
    )

Важно: write_report только форматирует готовый текст. LLM написала содержимое отчёта в поле content при вызове. Функция добавляет список источников и считает слова. Это делает инструмент детерминированным и легко тестируемым.

ToolRegistry: диспетчеризация вызовов

Оркестратор не знает о конкретных инструментах напрямую. Он работает только через ToolRegistry. Это позволяет добавлять новые инструменты без изменения оркестратора.

100%
колёсико — масштаб  ·  зажать и тянуть — перемещение
🤖 LLM tool_use блок name + input (JSON) dispatch() TOOL REGISTRY 1. normalize args url_list → urls 2. coerce types "5" → 5, str → list 3. func(**kwargs) 🔍 search_web → list[{url, title, snippet}] 📥 fetch_pages → list[{url, title, content}] 📝 summarize_page → str (краткое резюме) ✍️ write_report ★ → ReportResult tool_result → LLM json.dumps(result) добавляется в AgentState
class ToolRegistry:
    def __init__(self):
        self._schemas = {s["name"]: s for s in TOOL_SCHEMAS}
        self._dispatch = TOOL_DISPATCH   # dict: name → async function

    def get_schemas(self) -> list[dict]:
        """Вернуть все схемы для передачи в LLM."""
        return list(self._schemas.values())

    async def dispatch(self, tool_name: str, **kwargs) -> Any:
        """Найти и вызвать инструмент по имени."""
        if tool_name not in self._dispatch:
            raise ToolError(f"Unknown tool: {tool_name}")

        kwargs = _normalize_arg_names(tool_name, kwargs)  # Шаг 1
        kwargs = _coerce_args(kwargs, self._schemas[tool_name]["input_schema"])  # Шаг 2
        return await self._dispatch[tool_name](**kwargs)  # Шаг 3

Нормализация аргументов для открытых моделей

Коммерческие модели (Claude, GPT-4) строго следуют JSON Schema. Открытые модели через Ollama или OpenRouter иногда ошибаются в именах аргументов. Реестр содержит защиту от известных отклонений.

Шаг 1: алиасы имён аргументов

_ARG_ALIASES = {
    "fetch_pages": {
        "url_list":  "urls",     # Llama3 называет так
        "url":       "urls",     # единственное число
        "page_urls": "urls",
        "links":     "urls",
    },
    "search_web": {
        "search_query": "query", # частое отклонение
        "q":            "query",
        "keywords":     "query",
        "num_results":  "max_results",
    },
    ...
}

def _normalize_arg_names(tool_name: str, kwargs: dict) -> dict:
    aliases = _ARG_ALIASES.get(tool_name, {})
    return {aliases.get(k, k): v for k, v in kwargs.items()}

Шаг 2: приведение типов

def _coerce_args(kwargs: dict, input_schema: dict) -> dict:
    """Привести типы аргументов к ожидаемым по схеме."""
    properties = input_schema.get("properties", {})
    result = dict(kwargs)

    for key, value in list(result.items()):
        prop_type = properties.get(key, {}).get("type")

        if prop_type == "integer" and not isinstance(value, int):
            result[key] = int(value)           # "10" → 10

        elif prop_type == "array":
            if isinstance(value, str):
                try:
                    parsed = json.loads(value) # '["a","b"]' → ["a", "b"]
                    if isinstance(parsed, list):
                        result[key] = parsed
                except json.JSONDecodeError:
                    result[key] = [value]      # одна строка → список
            elif not isinstance(value, list):
                result[key] = [value]          # любой объект → список

    return result
Пример: Llama3 передаёт fetch_pages(url_list='["https://a.com"]') 1. normalize: url_list urls # alias lookup {urls: '["https://a.com"]'} 2. coerce: '["https://a.com"]' ["https://a.com"] # json.loads {urls: ["https://a.com"]} 3. call: fetch_pages(urls=["https://a.com"]) # реальная функция

Обработка ошибок: ToolError

Все ошибки в инструментах должны бросать именно ToolError — не ValueError и не стандартные исключения. Оркестратор перехватывает ToolError и передаёт сообщение об ошибке обратно LLM. Цикл продолжается.

from tools.registry import ToolError

# ❌ НЕПРАВИЛЬНО — оркестратор упадёт с необработанным исключением
async def bad_tool(query: str) -> list:
    if not query:
        raise ValueError("Query is empty")  # оркестратор не знает об этом

# ✅ ПРАВИЛЬНО — оркестратор перехватит, LLM увидит ошибку и адаптируется
async def good_tool(query: str) -> list:
    if not query:
        raise ToolError("Query is empty", tool_name="good_tool")

# В оркестраторе (invariant 3):
try:
    result = await self.registry.dispatch(tool_name, **tool_input)
    return json.dumps(result, ...)
except ToolError as e:
    log.warning("tool_error", tool=tool_name, error=str(e))
    return f"Error executing {tool_name}: {e}"  # ← строка уходит в контекст LLM

Как добавить новый инструмент

Добавление инструмента занимает ~15 минут и не требует изменений в оркестраторе или LLM-клиенте. Разберём на примере инструмента extract_keywords.

1
Создать файл tools/extract_keywords.py с функцией async def extract_keywords(...)
2
Добавить JSON Schema в список TOOL_SCHEMAS в tools/registry.py
3
Зарегистрировать функцию в TOOL_DISPATCH в той же registry.py
4
Написать тест в tests/test_tools.py — хотя бы 2 кейса
5
Проверить: python3 -c "from tools.registry import ToolRegistry; print(ToolRegistry().list_tools())"

Шаг 1: файл инструмента

# tools/extract_keywords.py
from __future__ import annotations
import re
from collections import Counter
import structlog
from tools.registry import ToolError

log = structlog.get_logger(__name__)

async def extract_keywords(text: str, max_keywords: int = 10) -> list[str]:
    """Извлечь ключевые слова из текста по частоте.

    Args:
        text: Текст для анализа.
        max_keywords: Количество ключевых слов (1-20). Default: 10.

    Returns:
        Список ключевых слов по убыванию частоты.
    """
    if not text or len(text.strip()) < 10:
        raise ToolError("Text is too short", tool_name="extract_keywords")

    # Только слова длиннее 3 символов, без стоп-слов
    words = re.findall(r'\b[a-zA-Zа-яА-Я]{4,}\b', text.lower())
    stop_words = {"this", "that", "with", "from", "have", "been", "they"}
    filtered = [w for w in words if w not in stop_words]

    if not filtered:
        raise ToolError("No keywords found", tool_name="extract_keywords")

    keywords = [word for word, _ in Counter(filtered).most_common(max_keywords)]
    log.info("keywords_extracted", count=len(keywords))
    return keywords

Шаг 2: JSON Schema в registry.py

# В tools/registry.py, добавить в список TOOL_SCHEMAS:
{
    "name": "extract_keywords",
    "description": (
        "Extract key terms and concepts from text content. "
        "Use this to identify main topics before writing the report."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "text": {
                "type": "string",
                "description": "The text content to analyze.",
            },
            "max_keywords": {
                "type": "integer",
                "description": "Max keywords to return (1-20). Default: 10.",
                "default": 10,
            },
        },
        "required": ["text"],
    },
},

Шаг 3: регистрация в TOOL_DISPATCH

# В tools/registry.py, в функции _register_tools():
def _register_tools() -> None:
    from tools.search import search_web
    from tools.fetch import fetch_pages
    from tools.summarize import summarize_page
    from tools.report import write_report
    from tools.extract_keywords import extract_keywords   # ← добавить

    TOOL_DISPATCH.update({
        "search_web":       search_web,
        "fetch_pages":      fetch_pages,
        "summarize_page":   summarize_page,
        "write_report":     write_report,
        "extract_keywords": extract_keywords,             # ← добавить
    })
Почему импорты внутри функции? registry.py импортирует из tools/search.py, который импортирует ToolError из registry.py. Это циклический импорт. Отложенный импорт внутри функции разрывает цикл: к моменту вызова _register_tools() все модули уже загружены.

Шаг 4: тест

# В tests/test_tools.py:
import pytest
from tools.registry import ToolRegistry, ToolError

@pytest.mark.anyio
async def test_extract_keywords_returns_list():
    from tools.extract_keywords import extract_keywords
    result = await extract_keywords(
        "Retrieval-Augmented Generation combines retrieval with generation models.",
        max_keywords=5
    )
    assert isinstance(result, list)
    assert len(result) <= 5
    assert all(isinstance(k, str) for k in result)

@pytest.mark.anyio
async def test_extract_keywords_raises_on_empty():
    from tools.extract_keywords import extract_keywords
    with pytest.raises(ToolError):
        await extract_keywords("")

@pytest.mark.anyio
async def test_extract_keywords_via_registry():
    registry = ToolRegistry()
    result = await registry.dispatch("extract_keywords", text="Machine learning models.")
    assert isinstance(result, list)

Проверяем, что всё работает:

python3 -c "from tools.registry import ToolRegistry; print(ToolRegistry().list_tools())"
# ['search_web', 'fetch_pages', 'summarize_page', 'write_report', 'extract_keywords']

pytest tests/test_tools.py -v -k "keyword"
# test_extract_keywords_returns_list PASSED
# test_extract_keywords_raises_on_empty PASSED
# test_extract_keywords_via_registry PASSED
Оркестратор узнаёт о новом инструменте автоматически. При каждом вызове оркестратор делает self.registry.get_schemas() — он получит актуальный список включая новый инструмент. Никаких изменений в orchestrator.py не нужно.

Шпаргалка

КонцептГдеСуть
JSON SchemaTOOL_SCHEMAS в registry.pyОписание инструмента: name, description, input_schema
descriptionПоле в схемеLLM читает и решает когда использовать — пишите тщательно
requiredinput_schema.requiredТолько обязательные аргументы; необязательные с default в description
TOOL_DISPATCHDict в registry.pyname → async функция; именно отсюда вызывается реальный код
_normalize_arg_namesregistry.pyАлиасы для opensource-моделей: url_list → urls
_coerce_argsregistry.pyПриведение типов: "5" → 5, "[]" → []
ToolErrorregistry.pyЕдинственный тип ошибок в инструментах; цикл не ломается
asyncio.to_threadtools/search.pyСинхронный код в пуле потоков без блокировки event loop
asyncio.gathertools/fetch.pyПараллельная загрузка нескольких страниц
return_exceptions=Trueasyncio.gatherОдин упавший URL не ломает остальные

Практическое задание

  1. Добавьте инструмент count_words(text: str) -> dict, который считает количество слов, предложений и уникальных слов в тексте. Пройдите все 5 шагов чеклиста. Убедитесь, что тесты проходят.
  2. Проверьте description. Измените description у search_web: уберите фразу «Use this first». Запустите агента дважды — изменилось ли поведение? Когда теперь LLM решает начать с поиска?
  3. Сломайте нормализацию. В _ARG_ALIASES удалите алиас url_list → urls. Если у вас есть доступ к Ollama с Llama — попробуйте запустить агента. Что происходит? Если нет — напишите тест, который эмулирует передачу url_list.