Зачем вообще нужна асинхронность?
Представь: твой AI-агент должен запросить данные из трёх источников — сделать поиск в интернете, обратиться к векторной базе данных и вызвать LLM API. Синхронно это выглядит так:
Запрос 1 → [ждём 1.2с] → результат Запрос 2 → [ждём 0.8с] → результат Запрос 3 → [ждём 2.1с] → результат Итого: 1.2 + 0.8 + 2.1 = 4.1 секунды
Каждый запрос ждёт завершения предыдущего. Программа буквально ничего не делает во время ожидания — процессор простаивает.
Асинхронно — все три запроса стартуют одновременно:
Запрос 1 → ━━━━━━━━━━━━ → результат Запрос 2 → ━━━━━━━━ → результат Запрос 3 → ━━━━━━━━━━━━━━━━━━━━━ → результат Итого: max(1.2, 0.8, 2.1) = 2.1 секунды
Ускорение почти в 2 раза. На реальных агентах с 5–10 параллельными операциями разница ещё значительнее.
Asyncio не делает программу быстрее в вычислительном смысле. Она решает проблему I/O-bound задач — когда программа тратит время на ожидание сети, диска, API. Для CPU-heavy задач (математика, обработка изображений) нужен multiprocessing.
AI-агенты — почти всегда I/O-bound: они ждут ответов от внешних сервисов.
Как работает asyncio: event loop
В основе asyncio — event loop (цикл событий). Это бесконечный цикл, который:
- Берёт задачу из очереди
- Выполняет её до первого
await - «Паркует» задачу ждать результата
- Берёт следующую задачу из очереди
- Возвращается к запаркованным задачам, как только они получили данные
Event Loop
┌─────────────────────────────────────────────────┐
│ │
│ [Task A] → await запрос → ...паркуем... │
│ [Task B] → await запрос → ...паркуем... │
│ [Task C] → await запрос → ...паркуем... │
│ │
│ Сеть вернула ответ для Task B → возобновляем B │
│ Сеть вернула ответ для Task A → возобновляем A │
│ Сеть вернула ответ для Task C → возобновляем C │
│ │
└─────────────────────────────────────────────────┘
Ключевое слово: кооперативная многозадачность. Задачи сами говорят event loop'у: «я жду, занимайся другими». Это происходит через await.
Синтаксис: async def и await
async def — объявление корутины
Обычная функция стает корутиной при добавлении async:
# Обычная функция — выполняется синхронно
def get_response():
return "Привет!"
# Корутина — можно паузить и возобновлять
async def get_response_async():
return "Привет!"
# ВАЖНО: вызов корутины НЕ выполняет её сразу!
result = get_response() # Здесь: сразу вернёт строку
coro = get_response_async() # Здесь: вернёт объект корутины (не строку!)
print(type(coro)) # <class 'coroutine'>
# Правильный вызов — через await или asyncio.run()
import asyncio
result = asyncio.run(get_response_async()) # "Привет!"
Забыть await перед вызовом async-функции. Python не выдаст ошибку — просто вернёт объект корутины вместо результата. Всегда используй await внутри async-функции.
await — точка ожидания
await говорит event loop'у: «жди здесь результата, и пока займись другими задачами».
import asyncio
async def fetch_data(name: str, delay: float) -> str:
print(f"[{name}] Начинаем запрос...")
await asyncio.sleep(delay) # Имитируем сетевой запрос
print(f"[{name}] Получили ответ!")
return f"Данные от {name}"
async def main():
# Последовательно — 3 секунды
result1 = await fetch_data("API-1", 1.0)
result2 = await fetch_data("API-2", 0.5)
result3 = await fetch_data("API-3", 1.5)
print(result1, result2, result3)
asyncio.run(main())
Запусти этот код — увидишь, что запросы выполняются по очереди. Несмотря на async/await, они не параллельны. Почему? Потому что каждый await ждёт завершения, прежде чем перейти к следующему. Чтобы запустить параллельно — нужен asyncio.gather().
asyncio.gather() — параллельный запуск
Это самая важная функция для AI-агентов. gather() запускает несколько корутин одновременно и ждёт завершения всех:
import asyncio
import time
async def fetch_data(name: str, delay: float) -> str:
print(f"[{name}] Старт")
await asyncio.sleep(delay)
print(f"[{name}] Готово!")
return f"Результат от {name}"
async def main():
start = time.time()
# Запускаем ВСЕ три задачи одновременно
results = await asyncio.gather(
fetch_data("Поиск", 1.2),
fetch_data("Vector DB", 0.8),
fetch_data("LLM API", 2.1),
)
elapsed = time.time() - start
print(f"\nВсе готово за {elapsed:.1f}с")
print(f"Результаты: {results}")
asyncio.run(main())
# Вывод:
# [Поиск] Старт
# [Vector DB] Старт
# [LLM API] Старт
# [Vector DB] Готово! ← первым вернулся (0.8с)
# [Поиск] Готово! ← вторым (1.2с)
# [LLM API] Готово! ← третьим (2.1с)
#
# Все готово за 2.1с ← не 4.1с!
# Результаты: ['Результат от Поиск', 'Результат от Vector DB', 'Результат от LLM API']
gather() возвращает список результатов в том же порядке, что и переданные корутины — независимо от порядка завершения. Можно безопасно делать results[0], results[1] etc.
gather() со списком задач
Часто задачи формируются динамически — например, для каждого документа в списке:
import asyncio
async def summarize(document: str) -> str:
# В реальности — вызов LLM API
await asyncio.sleep(0.5)
return f"Краткое содержание: {document[:50]}..."
async def main():
documents = [
"Длинный документ 1...",
"Длинный документ 2...",
"Длинный документ 3...",
"Длинный документ 4...",
]
# Создаём список корутин
tasks = [summarize(doc) for doc in documents]
# Запускаем все параллельно
summaries = await asyncio.gather(*tasks)
for doc, summary in zip(documents, summaries):
print(f"Документ: {doc[:30]}... → {summary}")
asyncio.run(main())
Реальный пример: параллельные запросы к LLM
Вот практический сценарий для агента: нужно проанализировать несколько фрагментов текста с помощью Claude или GPT-4.
import asyncio
from openai import AsyncOpenAI # Обратите внимание: AsyncOpenAI, не OpenAI
client = AsyncOpenAI() # api_key читается из OPENAI_API_KEY
async def analyze_text(text: str, question: str) -> str:
"""Анализирует текст с помощью LLM."""
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "Ты — аналитик. Отвечай кратко."},
{"role": "user", "content": f"Текст: {text}\n\nВопрос: {question}"},
],
max_tokens=200,
)
return response.choices[0].message.content
async def analyze_all(texts: list[str], question: str) -> list[str]:
"""Анализирует все тексты параллельно."""
tasks = [analyze_text(text, question) for text in texts]
return await asyncio.gather(*tasks)
async def main():
texts = [
"Python — интерпретируемый язык программирования...",
"Machine learning — раздел искусственного интеллекта...",
"Blockchain — распределённый реестр данных...",
]
question = "О чём этот текст? Одно предложение."
results = await analyze_all(texts, question)
for text, result in zip(texts, results):
print(f"Текст: {text[:40]}...")
print(f"Ответ: {result}\n")
asyncio.run(main())
Большинство SDK для LLM имеют асинхронный вариант клиента: AsyncOpenAI, anthropic.AsyncAnthropic. Используй именно его в async-коде — иначе получишь блокирующий вызов внутри event loop, что нейтрализует все преимущества asyncio.
То же с Anthropic Claude
import asyncio
import anthropic
# Асинхронный клиент Anthropic
client = anthropic.AsyncAnthropic()
async def ask_claude(prompt: str) -> str:
message = await client.messages.create(
model="claude-haiku-4-5-20251001", # Быстрая модель для простых задач
max_tokens=300,
messages=[{"role": "user", "content": prompt}],
)
return message.content[0].text
async def main():
prompts = [
"Что такое coroutine в Python? Два предложения.",
"Что такое event loop? Два предложения.",
"Зачем нужен asyncio.gather? Два предложения.",
]
# Все три запроса идут параллельно
answers = await asyncio.gather(*[ask_claude(p) for p in prompts])
for prompt, answer in zip(prompts, answers):
print(f"Q: {prompt}")
print(f"A: {answer}\n")
asyncio.run(main())
asyncio.create_task() — фоновые задачи
Иногда нужно запустить задачу в фоне и не ждать её прямо сейчас. Для этого есть create_task():
import asyncio
async def heavy_task(name: str) -> str:
print(f"{name}: начинаем...")
await asyncio.sleep(2)
print(f"{name}: готово!")
return f"Результат {name}"
async def quick_task() -> str:
print("Быстрая задача: старт")
await asyncio.sleep(0.1)
return "Быстрый результат"
async def main():
# Запускаем heavy_task в фоне — не ждём сразу
bg_task = asyncio.create_task(heavy_task("Фоновая задача"))
# Пока фоновая задача работает — делаем другое
quick_result = await quick_task()
print(f"Получили быстрый результат: {quick_result}")
# Только теперь ждём фоновую задачу
bg_result = await bg_task
print(f"Получили фоновый результат: {bg_result}")
asyncio.run(main())
# Вывод:
# Фоновая задача: начинаем...
# Быстрая задача: старт
# Быстрый результат: Быстрый результат ← через 0.1с
# Фоновая задача: готово! ← через 2с
# Фоновый результат: Результат Фоновая задача
В AI-агентах create_task() используют, например, чтобы начать загрузку данных пока агент планирует следующие шаги.
Таймауты и отмена задач
Агенты должны быть устойчивы к зависаниям. Если LLM API не отвечает 30 секунд — нужно отменить запрос:
import asyncio
async def slow_llm_call() -> str:
await asyncio.sleep(10) # Имитируем зависший API
return "Ответ"
async def main():
# Способ 1: asyncio.timeout() (Python 3.11+)
try:
async with asyncio.timeout(5.0): # 5 секунд максимум
result = await slow_llm_call()
except asyncio.TimeoutError:
print("Таймаут! LLM API не ответил за 5 секунд")
result = "Запасной ответ"
print(f"Результат: {result}")
# Способ 2: asyncio.wait_for() (Python 3.8+)
try:
result = await asyncio.wait_for(
slow_llm_call(),
timeout=5.0
)
except asyncio.TimeoutError:
print("Таймаут через wait_for!")
asyncio.run(main())
Semaphore — контроль параллельности
Проблема: если запустить 100 запросов к LLM API одновременно, получишь 429 Rate Limit. Semaphore ограничивает максимальное число одновременных запросов:
import asyncio
from openai import AsyncOpenAI
client = AsyncOpenAI()
# Не более 5 одновременных запросов к API
semaphore = asyncio.Semaphore(5)
async def process_document(doc: str, index: int) -> str:
async with semaphore: # Занимаем слот
print(f"Обрабатываем документ {index}...")
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": f"Summarize: {doc}"}],
max_tokens=100,
)
return response.choices[0].message.content
# Слот освобождается автоматически при выходе из with
async def main():
# 20 документов, но не более 5 одновременных запросов
documents = [f"Document {i} content..." for i in range(20)]
tasks = [process_document(doc, i) for i, doc in enumerate(documents)]
results = await asyncio.gather(*tasks)
print(f"Обработано {len(results)} документов")
asyncio.run(main())
Паттерн для AI-агента
Соберём всё вместе в типичный паттерн async-агента:
import asyncio
import httpx
from openai import AsyncOpenAI
client = AsyncOpenAI()
search_semaphore = asyncio.Semaphore(3) # Макс 3 поиска одновременно
# ---- Инструменты агента ----
async def web_search(query: str) -> str:
"""Поиск в интернете."""
async with search_semaphore:
async with httpx.AsyncClient() as http:
# В реальности — API Tavily, SerpAPI и т.п.
await asyncio.sleep(0.5) # Имитируем запрос
return f"Результаты поиска по запросу: {query}"
async def vector_search(query: str) -> list[str]:
"""Поиск в векторной базе."""
await asyncio.sleep(0.3)
return [f"Документ 1 о {query}", f"Документ 2 о {query}"]
# ---- Ядро агента ----
async def gather_context(user_query: str) -> dict:
"""Собираем контекст параллельно из нескольких источников."""
web_task = asyncio.create_task(web_search(user_query))
vec_task = asyncio.create_task(vector_search(user_query))
web_results, vec_results = await asyncio.gather(web_task, vec_task)
return {
"web": web_results,
"documents": vec_results,
}
async def generate_answer(query: str, context: dict) -> str:
"""Генерируем финальный ответ через LLM."""
context_text = f"Веб: {context['web']}\nДокументы: {', '.join(context['documents'])}"
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "Ты помощник. Отвечай на основе предоставленного контекста."},
{"role": "user", "content": f"Вопрос: {query}\n\nКонтекст:\n{context_text}"},
],
)
return response.choices[0].message.content
async def agent(query: str) -> str:
"""Главная функция агента."""
print(f"Обрабатываем запрос: {query}")
# Шаг 1: Параллельный сбор контекста
context = await gather_context(query)
print(f"Контекст собран: {len(context)} источника")
# Шаг 2: Генерация ответа
answer = await generate_answer(query, context)
return answer
async def main():
# Несколько запросов параллельно
queries = [
"Что такое asyncio в Python?",
"Как работает LangGraph?",
"Что такое RAG в LLM?",
]
answers = await asyncio.gather(*[agent(q) for q in queries])
for query, answer in zip(queries, answers):
print(f"\n{'='*50}")
print(f"Вопрос: {query}")
print(f"Ответ: {answer}")
asyncio.run(main())
Типичные ошибки
Если использовать синхронный requests или time.sleep() внутри async def — весь event loop заблокируется. Никакие другие задачи не будут выполняться во время ожидания.
import asyncio
import time
import requests
import httpx
# ❌ НЕПРАВИЛЬНО — блокирует весь event loop
async def bad_fetch(url: str) -> str:
time.sleep(1) # Блокирует!
response = requests.get(url) # Блокирует!
return response.text
# ✅ ПРАВИЛЬНО — не блокирует event loop
async def good_fetch(url: str) -> str:
await asyncio.sleep(1) # Отдаёт управление
async with httpx.AsyncClient() as c:
response = await c.get(url) # Неблокирующий
return response.text
# ❌ НЕПРАВИЛЬНО — синхронный OpenAI клиент
from openai import OpenAI
sync_client = OpenAI()
async def bad_llm_call(prompt: str) -> str:
response = sync_client.chat.completions.create(...) # Блокирует!
return response.choices[0].message.content
# ✅ ПРАВИЛЬНО — асинхронный клиент
from openai import AsyncOpenAI
async_client = AsyncOpenAI()
async def good_llm_call(prompt: str) -> str:
response = await async_client.chat.completions.create(...) # Ок!
return response.choices[0].message.content
Python выдаст предупреждение RuntimeWarning: coroutine was never awaited, но не ошибку. Функция просто не выполнится.
async def get_data():
await asyncio.sleep(1)
return "данные"
async def main():
# ❌ НЕПРАВИЛЬНО
result = get_data() # result — объект корутины, не "данные"!
print(result) # <coroutine object get_data at 0x...>
# ✅ ПРАВИЛЬНО
result = await get_data() # result — "данные"
print(result) # данные
asyncio.run() создаёт новый event loop. Если вызвать его внутри уже работающего event loop — получишь ошибку RuntimeError: This event loop is already running.
async def inner():
return "результат"
# ❌ НЕПРАВИЛЬНО — asyncio.run() внутри async-функции
async def outer():
result = asyncio.run(inner()) # RuntimeError!
# ✅ ПРАВИЛЬНО — используй await внутри async
async def outer():
result = await inner() # Правильно
# asyncio.run() — только на верхнем уровне программы
if __name__ == "__main__":
asyncio.run(outer())
Когда НЕ нужен asyncio
| Ситуация | Asyncio | Что использовать |
|---|---|---|
| Много параллельных HTTP-запросов / LLM API | ✅ Отлично | asyncio + httpx/async клиент |
| Чтение/запись файлов (много операций) | ✅ Хорошо | asyncio + aiofiles |
| Работа с PostgreSQL/MongoDB | ✅ Хорошо | asyncpg, motor, sqlalchemy async |
| Тяжёлые вычисления (numpy, torch) | ❌ Не поможет | multiprocessing / threading |
| Простой скрипт с одним запросом | ❌ Оверкилл | Синхронный код |
| CPU-heavy обработка изображений/аудио | ❌ Не поможет | multiprocessing |
Шпаргалка: async/await одним взглядом
import asyncio
# 1. Объявление async-функции
async def my_coroutine():
return "result"
# 2. Запуск из синхронного кода (точка входа)
asyncio.run(my_coroutine())
# 3. await — ждём результат внутри async-функции
async def main():
result = await my_coroutine()
# 4. Параллельный запуск нескольких корутин
results = await asyncio.gather(coro1(), coro2(), coro3())
# 5. Параллельный запуск списка
tasks = [my_coroutine() for _ in range(10)]
results = await asyncio.gather(*tasks)
# 6. Фоновая задача
task = asyncio.create_task(my_coroutine())
# ... делаем другое ...
result = await task # Ждём когда нужно
# 7. Таймаут
try:
async with asyncio.timeout(5.0):
result = await slow_function()
except asyncio.TimeoutError:
result = "default"
# 8. Ограничение параллельности
sem = asyncio.Semaphore(5)
async def limited():
async with sem:
await my_coroutine()
# 9. Сон без блокировки
await asyncio.sleep(1.0) # Не time.sleep()!
# 10. Проверка: корутина или нет?
import inspect
inspect.iscoroutinefunction(my_coroutine) # True
Практическое задание
Попробуй реализовать следующее самостоятельно:
Задание: Async новостной агрегатор
- Создай функцию
fetch_headlines(source: str) -> list[str], которая имитирует получение заголовков новостей (простоasyncio.sleep+ возврат тестовых данных) - Создай список из 5 источников новостей
- Запусти сбор со всех источников параллельно через
asyncio.gather() - Добавь таймаут 3 секунды для каждого источника
- Добавь
Semaphore(2)чтобы одновременно обращаться не более чем к 2 источникам - Выведи все собранные заголовки