Зачем вообще нужна асинхронность?

Представь: твой AI-агент должен запросить данные из трёх источников — сделать поиск в интернете, обратиться к векторной базе данных и вызвать LLM API. Синхронно это выглядит так:

Запрос 1[ждём 1.2с] → результат
Запрос 2[ждём 0.8с] → результат
Запрос 3[ждём 2.1с] → результат

Итого: 1.2 + 0.8 + 2.1 = 4.1 секунды

Каждый запрос ждёт завершения предыдущего. Программа буквально ничего не делает во время ожидания — процессор простаивает.

Асинхронно — все три запроса стартуют одновременно:

Запрос 1━━━━━━━━━━━━ → результат
Запрос 2━━━━━━━━ → результат
Запрос 3━━━━━━━━━━━━━━━━━━━━━ → результат

Итого: max(1.2, 0.8, 2.1) = 2.1 секунды

Ускорение почти в 2 раза. На реальных агентах с 5–10 параллельными операциями разница ещё значительнее.

ℹ️ Важно понимать

Asyncio не делает программу быстрее в вычислительном смысле. Она решает проблему I/O-bound задач — когда программа тратит время на ожидание сети, диска, API. Для CPU-heavy задач (математика, обработка изображений) нужен multiprocessing.

AI-агенты — почти всегда I/O-bound: они ждут ответов от внешних сервисов.

Как работает asyncio: event loop

В основе asyncio — event loop (цикл событий). Это бесконечный цикл, который:

  1. Берёт задачу из очереди
  2. Выполняет её до первого await
  3. «Паркует» задачу ждать результата
  4. Берёт следующую задачу из очереди
  5. Возвращается к запаркованным задачам, как только они получили данные
Event Loop

┌─────────────────────────────────────────────────┐
│                                                 │
│  [Task A] → await запрос → ...паркуем...        │
│  [Task B] → await запрос → ...паркуем...        │
│  [Task C] → await запрос → ...паркуем...        │
│                                                 │
│  Сеть вернула ответ для Task B → возобновляем B │
│  Сеть вернула ответ для Task A → возобновляем A │
│  Сеть вернула ответ для Task C → возобновляем C │
│                                                 │
└─────────────────────────────────────────────────┘

Ключевое слово: кооперативная многозадачность. Задачи сами говорят event loop'у: «я жду, занимайся другими». Это происходит через await.

Синтаксис: async def и await

async def — объявление корутины

Обычная функция стает корутиной при добавлении async:

Обычная vs асинхронная функция
python
# Обычная функция — выполняется синхронно
def get_response():
    return "Привет!"

# Корутина — можно паузить и возобновлять
async def get_response_async():
    return "Привет!"

# ВАЖНО: вызов корутины НЕ выполняет её сразу!
result = get_response()          # Здесь: сразу вернёт строку
coro = get_response_async()      # Здесь: вернёт объект корутины (не строку!)
print(type(coro))                # <class 'coroutine'>

# Правильный вызов — через await или asyncio.run()
import asyncio
result = asyncio.run(get_response_async())  # "Привет!"
⚠️ Частая ошибка новичков

Забыть await перед вызовом async-функции. Python не выдаст ошибку — просто вернёт объект корутины вместо результата. Всегда используй await внутри async-функции.

await — точка ожидания

await говорит event loop'у: «жди здесь результата, и пока займись другими задачами».

Как работает await
python
import asyncio

async def fetch_data(name: str, delay: float) -> str:
    print(f"[{name}] Начинаем запрос...")
    await asyncio.sleep(delay)  # Имитируем сетевой запрос
    print(f"[{name}] Получили ответ!")
    return f"Данные от {name}"

async def main():
    # Последовательно — 3 секунды
    result1 = await fetch_data("API-1", 1.0)
    result2 = await fetch_data("API-2", 0.5)
    result3 = await fetch_data("API-3", 1.5)
    print(result1, result2, result3)

asyncio.run(main())

Запусти этот код — увидишь, что запросы выполняются по очереди. Несмотря на async/await, они не параллельны. Почему? Потому что каждый await ждёт завершения, прежде чем перейти к следующему. Чтобы запустить параллельно — нужен asyncio.gather().

asyncio.gather() — параллельный запуск

Это самая важная функция для AI-агентов. gather() запускает несколько корутин одновременно и ждёт завершения всех:

asyncio.gather() — параллельное выполнение
python
import asyncio
import time

async def fetch_data(name: str, delay: float) -> str:
    print(f"[{name}] Старт")
    await asyncio.sleep(delay)
    print(f"[{name}] Готово!")
    return f"Результат от {name}"

async def main():
    start = time.time()

    # Запускаем ВСЕ три задачи одновременно
    results = await asyncio.gather(
        fetch_data("Поиск", 1.2),
        fetch_data("Vector DB", 0.8),
        fetch_data("LLM API", 2.1),
    )

    elapsed = time.time() - start
    print(f"\nВсе готово за {elapsed:.1f}с")
    print(f"Результаты: {results}")

asyncio.run(main())

# Вывод:
# [Поиск] Старт
# [Vector DB] Старт
# [LLM API] Старт
# [Vector DB] Готово!    ← первым вернулся (0.8с)
# [Поиск] Готово!        ← вторым (1.2с)
# [LLM API] Готово!      ← третьим (2.1с)
#
# Все готово за 2.1с      ← не 4.1с!
# Результаты: ['Результат от Поиск', 'Результат от Vector DB', 'Результат от LLM API']
Ключевой момент

gather() возвращает список результатов в том же порядке, что и переданные корутины — независимо от порядка завершения. Можно безопасно делать results[0], results[1] etc.

gather() со списком задач

Часто задачи формируются динамически — например, для каждого документа в списке:

Динамический список корутин
python
import asyncio

async def summarize(document: str) -> str:
    # В реальности — вызов LLM API
    await asyncio.sleep(0.5)
    return f"Краткое содержание: {document[:50]}..."

async def main():
    documents = [
        "Длинный документ 1...",
        "Длинный документ 2...",
        "Длинный документ 3...",
        "Длинный документ 4...",
    ]

    # Создаём список корутин
    tasks = [summarize(doc) for doc in documents]

    # Запускаем все параллельно
    summaries = await asyncio.gather(*tasks)

    for doc, summary in zip(documents, summaries):
        print(f"Документ: {doc[:30]}... → {summary}")

asyncio.run(main())

Реальный пример: параллельные запросы к LLM

Вот практический сценарий для агента: нужно проанализировать несколько фрагментов текста с помощью Claude или GPT-4.

Параллельные запросы к OpenAI API
python
import asyncio
from openai import AsyncOpenAI  # Обратите внимание: AsyncOpenAI, не OpenAI

client = AsyncOpenAI()  # api_key читается из OPENAI_API_KEY

async def analyze_text(text: str, question: str) -> str:
    """Анализирует текст с помощью LLM."""
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Ты — аналитик. Отвечай кратко."},
            {"role": "user", "content": f"Текст: {text}\n\nВопрос: {question}"},
        ],
        max_tokens=200,
    )
    return response.choices[0].message.content

async def analyze_all(texts: list[str], question: str) -> list[str]:
    """Анализирует все тексты параллельно."""
    tasks = [analyze_text(text, question) for text in texts]
    return await asyncio.gather(*tasks)

async def main():
    texts = [
        "Python — интерпретируемый язык программирования...",
        "Machine learning — раздел искусственного интеллекта...",
        "Blockchain — распределённый реестр данных...",
    ]

    question = "О чём этот текст? Одно предложение."

    results = await analyze_all(texts, question)

    for text, result in zip(texts, results):
        print(f"Текст: {text[:40]}...")
        print(f"Ответ: {result}\n")

asyncio.run(main())
ℹ️ AsyncOpenAI vs OpenAI

Большинство SDK для LLM имеют асинхронный вариант клиента: AsyncOpenAI, anthropic.AsyncAnthropic. Используй именно его в async-коде — иначе получишь блокирующий вызов внутри event loop, что нейтрализует все преимущества asyncio.

То же с Anthropic Claude

Параллельные запросы к Claude API
python
import asyncio
import anthropic

# Асинхронный клиент Anthropic
client = anthropic.AsyncAnthropic()

async def ask_claude(prompt: str) -> str:
    message = await client.messages.create(
        model="claude-haiku-4-5-20251001",  # Быстрая модель для простых задач
        max_tokens=300,
        messages=[{"role": "user", "content": prompt}],
    )
    return message.content[0].text

async def main():
    prompts = [
        "Что такое coroutine в Python? Два предложения.",
        "Что такое event loop? Два предложения.",
        "Зачем нужен asyncio.gather? Два предложения.",
    ]

    # Все три запроса идут параллельно
    answers = await asyncio.gather(*[ask_claude(p) for p in prompts])

    for prompt, answer in zip(prompts, answers):
        print(f"Q: {prompt}")
        print(f"A: {answer}\n")

asyncio.run(main())

asyncio.create_task() — фоновые задачи

Иногда нужно запустить задачу в фоне и не ждать её прямо сейчас. Для этого есть create_task():

create_task() — задача в фоне
python
import asyncio

async def heavy_task(name: str) -> str:
    print(f"{name}: начинаем...")
    await asyncio.sleep(2)
    print(f"{name}: готово!")
    return f"Результат {name}"

async def quick_task() -> str:
    print("Быстрая задача: старт")
    await asyncio.sleep(0.1)
    return "Быстрый результат"

async def main():
    # Запускаем heavy_task в фоне — не ждём сразу
    bg_task = asyncio.create_task(heavy_task("Фоновая задача"))

    # Пока фоновая задача работает — делаем другое
    quick_result = await quick_task()
    print(f"Получили быстрый результат: {quick_result}")

    # Только теперь ждём фоновую задачу
    bg_result = await bg_task
    print(f"Получили фоновый результат: {bg_result}")

asyncio.run(main())

# Вывод:
# Фоновая задача: начинаем...
# Быстрая задача: старт
# Быстрый результат: Быстрый результат   ← через 0.1с
# Фоновая задача: готово!                ← через 2с
# Фоновый результат: Результат Фоновая задача

В AI-агентах create_task() используют, например, чтобы начать загрузку данных пока агент планирует следующие шаги.

Таймауты и отмена задач

Агенты должны быть устойчивы к зависаниям. Если LLM API не отвечает 30 секунд — нужно отменить запрос:

asyncio.timeout() — ограничение времени ожидания
python
import asyncio

async def slow_llm_call() -> str:
    await asyncio.sleep(10)  # Имитируем зависший API
    return "Ответ"

async def main():
    # Способ 1: asyncio.timeout() (Python 3.11+)
    try:
        async with asyncio.timeout(5.0):  # 5 секунд максимум
            result = await slow_llm_call()
    except asyncio.TimeoutError:
        print("Таймаут! LLM API не ответил за 5 секунд")
        result = "Запасной ответ"

    print(f"Результат: {result}")

    # Способ 2: asyncio.wait_for() (Python 3.8+)
    try:
        result = await asyncio.wait_for(
            slow_llm_call(),
            timeout=5.0
        )
    except asyncio.TimeoutError:
        print("Таймаут через wait_for!")

asyncio.run(main())

Semaphore — контроль параллельности

Проблема: если запустить 100 запросов к LLM API одновременно, получишь 429 Rate Limit. Semaphore ограничивает максимальное число одновременных запросов:

asyncio.Semaphore — ограничение concurrent запросов
python
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

# Не более 5 одновременных запросов к API
semaphore = asyncio.Semaphore(5)

async def process_document(doc: str, index: int) -> str:
    async with semaphore:  # Занимаем слот
        print(f"Обрабатываем документ {index}...")
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": f"Summarize: {doc}"}],
            max_tokens=100,
        )
        return response.choices[0].message.content
    # Слот освобождается автоматически при выходе из with

async def main():
    # 20 документов, но не более 5 одновременных запросов
    documents = [f"Document {i} content..." for i in range(20)]
    tasks = [process_document(doc, i) for i, doc in enumerate(documents)]

    results = await asyncio.gather(*tasks)
    print(f"Обработано {len(results)} документов")

asyncio.run(main())

Паттерн для AI-агента

Соберём всё вместе в типичный паттерн async-агента:

Структура async AI-агента
python
import asyncio
import httpx
from openai import AsyncOpenAI

client = AsyncOpenAI()
search_semaphore = asyncio.Semaphore(3)  # Макс 3 поиска одновременно

# ---- Инструменты агента ----

async def web_search(query: str) -> str:
    """Поиск в интернете."""
    async with search_semaphore:
        async with httpx.AsyncClient() as http:
            # В реальности — API Tavily, SerpAPI и т.п.
            await asyncio.sleep(0.5)  # Имитируем запрос
            return f"Результаты поиска по запросу: {query}"

async def vector_search(query: str) -> list[str]:
    """Поиск в векторной базе."""
    await asyncio.sleep(0.3)
    return [f"Документ 1 о {query}", f"Документ 2 о {query}"]

# ---- Ядро агента ----

async def gather_context(user_query: str) -> dict:
    """Собираем контекст параллельно из нескольких источников."""
    web_task = asyncio.create_task(web_search(user_query))
    vec_task = asyncio.create_task(vector_search(user_query))

    web_results, vec_results = await asyncio.gather(web_task, vec_task)

    return {
        "web": web_results,
        "documents": vec_results,
    }

async def generate_answer(query: str, context: dict) -> str:
    """Генерируем финальный ответ через LLM."""
    context_text = f"Веб: {context['web']}\nДокументы: {', '.join(context['documents'])}"

    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Ты помощник. Отвечай на основе предоставленного контекста."},
            {"role": "user", "content": f"Вопрос: {query}\n\nКонтекст:\n{context_text}"},
        ],
    )
    return response.choices[0].message.content

async def agent(query: str) -> str:
    """Главная функция агента."""
    print(f"Обрабатываем запрос: {query}")

    # Шаг 1: Параллельный сбор контекста
    context = await gather_context(query)
    print(f"Контекст собран: {len(context)} источника")

    # Шаг 2: Генерация ответа
    answer = await generate_answer(query, context)
    return answer

async def main():
    # Несколько запросов параллельно
    queries = [
        "Что такое asyncio в Python?",
        "Как работает LangGraph?",
        "Что такое RAG в LLM?",
    ]

    answers = await asyncio.gather(*[agent(q) for q in queries])

    for query, answer in zip(queries, answers):
        print(f"\n{'='*50}")
        print(f"Вопрос: {query}")
        print(f"Ответ: {answer}")

asyncio.run(main())

Типичные ошибки

Ошибка 1: Блокирующий вызов внутри async-функции

Если использовать синхронный requests или time.sleep() внутри async def — весь event loop заблокируется. Никакие другие задачи не будут выполняться во время ожидания.

Неправильно vs правильно
python
import asyncio
import time
import requests
import httpx

# ❌ НЕПРАВИЛЬНО — блокирует весь event loop
async def bad_fetch(url: str) -> str:
    time.sleep(1)                          # Блокирует!
    response = requests.get(url)           # Блокирует!
    return response.text

# ✅ ПРАВИЛЬНО — не блокирует event loop
async def good_fetch(url: str) -> str:
    await asyncio.sleep(1)                 # Отдаёт управление
    async with httpx.AsyncClient() as c:
        response = await c.get(url)        # Неблокирующий
    return response.text

# ❌ НЕПРАВИЛЬНО — синхронный OpenAI клиент
from openai import OpenAI
sync_client = OpenAI()

async def bad_llm_call(prompt: str) -> str:
    response = sync_client.chat.completions.create(...)  # Блокирует!
    return response.choices[0].message.content

# ✅ ПРАВИЛЬНО — асинхронный клиент
from openai import AsyncOpenAI
async_client = AsyncOpenAI()

async def good_llm_call(prompt: str) -> str:
    response = await async_client.chat.completions.create(...)  # Ок!
    return response.choices[0].message.content
Ошибка 2: Забыть await

Python выдаст предупреждение RuntimeWarning: coroutine was never awaited, но не ошибку. Функция просто не выполнится.

Забытый await
python
async def get_data():
    await asyncio.sleep(1)
    return "данные"

async def main():
    # ❌ НЕПРАВИЛЬНО
    result = get_data()   # result — объект корутины, не "данные"!
    print(result)         # <coroutine object get_data at 0x...>

    # ✅ ПРАВИЛЬНО
    result = await get_data()   # result — "данные"
    print(result)               # данные
Ошибка 3: asyncio.run() внутри async-функции

asyncio.run() создаёт новый event loop. Если вызвать его внутри уже работающего event loop — получишь ошибку RuntimeError: This event loop is already running.

asyncio.run() — только на верхнем уровне
python
async def inner():
    return "результат"

# ❌ НЕПРАВИЛЬНО — asyncio.run() внутри async-функции
async def outer():
    result = asyncio.run(inner())  # RuntimeError!

# ✅ ПРАВИЛЬНО — используй await внутри async
async def outer():
    result = await inner()  # Правильно

# asyncio.run() — только на верхнем уровне программы
if __name__ == "__main__":
    asyncio.run(outer())

Когда НЕ нужен asyncio

Ситуация Asyncio Что использовать
Много параллельных HTTP-запросов / LLM API ✅ Отлично asyncio + httpx/async клиент
Чтение/запись файлов (много операций) ✅ Хорошо asyncio + aiofiles
Работа с PostgreSQL/MongoDB ✅ Хорошо asyncpg, motor, sqlalchemy async
Тяжёлые вычисления (numpy, torch) ❌ Не поможет multiprocessing / threading
Простой скрипт с одним запросом ❌ Оверкилл Синхронный код
CPU-heavy обработка изображений/аудио ❌ Не поможет multiprocessing

Шпаргалка: async/await одним взглядом

Asyncio cheatsheet
python
import asyncio

# 1. Объявление async-функции
async def my_coroutine():
    return "result"

# 2. Запуск из синхронного кода (точка входа)
asyncio.run(my_coroutine())

# 3. await — ждём результат внутри async-функции
async def main():
    result = await my_coroutine()

# 4. Параллельный запуск нескольких корутин
results = await asyncio.gather(coro1(), coro2(), coro3())

# 5. Параллельный запуск списка
tasks = [my_coroutine() for _ in range(10)]
results = await asyncio.gather(*tasks)

# 6. Фоновая задача
task = asyncio.create_task(my_coroutine())
# ... делаем другое ...
result = await task  # Ждём когда нужно

# 7. Таймаут
try:
    async with asyncio.timeout(5.0):
        result = await slow_function()
except asyncio.TimeoutError:
    result = "default"

# 8. Ограничение параллельности
sem = asyncio.Semaphore(5)
async def limited():
    async with sem:
        await my_coroutine()

# 9. Сон без блокировки
await asyncio.sleep(1.0)  # Не time.sleep()!

# 10. Проверка: корутина или нет?
import inspect
inspect.iscoroutinefunction(my_coroutine)  # True

Практическое задание

Попробуй реализовать следующее самостоятельно:

Задание: Async новостной агрегатор

  1. Создай функцию fetch_headlines(source: str) -> list[str], которая имитирует получение заголовков новостей (просто asyncio.sleep + возврат тестовых данных)
  2. Создай список из 5 источников новостей
  3. Запусти сбор со всех источников параллельно через asyncio.gather()
  4. Добавь таймаут 3 секунды для каждого источника
  5. Добавь Semaphore(2) чтобы одновременно обращаться не более чем к 2 источникам
  6. Выведи все собранные заголовки

Что дальше