requests vs httpx: почему httpx?
requests — великолепная библиотека, стандарт де-факто много лет. Но у неё одна фундаментальная проблема для AI-агентов: она синхронная. Нельзя просто взять и вызвать её внутри async def — заблокируешь весь event loop.
| Возможность | requests | httpx |
|---|---|---|
| Синхронные запросы | ✅ | ✅ |
| Async/await поддержка | ❌ | ✅ AsyncClient |
| HTTP/2 поддержка | ❌ | ✅ |
| Совместимый API с requests | — | ✅ почти идентичен |
| Стриминг ответов | ✅ | ✅ sync и async |
| Таймауты (connect + read раздельно) | ⚠️ базово | ✅ детально |
| Используется в LLM SDK | ❌ | ✅ openai, anthropic, langchain |
Если пишешь простой одноразовый скрипт без asyncio — requests прекрасно работает и проще. Но для любого агента, LangGraph-графа или FastAPI-приложения — используй httpx.
pip install httpx
# HTTP/2 поддержка (опционально, нужен h2)
pip install "httpx[http2]"
# Для retry-логики
pip install tenacity
Основы: первые запросы
Синхронные запросы
API httpx намеренно максимально близок к requests — переход почти не требует переучивания:
import httpx
# GET-запрос
response = httpx.get("https://httpbin.org/get")
print(response.status_code) # 200
print(response.json()) # dict из JSON-тела ответа
print(response.text) # строка
print(response.headers) # заголовки ответа
# GET с параметрами строки запроса (?q=python&limit=5)
response = httpx.get(
"https://api.example.com/search",
params={"q": "asyncio python", "limit": 5},
)
print(response.url) # https://api.example.com/search?q=asyncio+python&limit=5
# POST с JSON-телом
response = httpx.post(
"https://api.openai.com/v1/chat/completions",
headers={"Authorization": "Bearer sk-..."},
json={
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": "Привет!"}],
},
)
data = response.json()
print(data["choices"][0]["message"]["content"])
# Проверка на ошибки — бросает HTTPStatusError при 4xx/5xx
response.raise_for_status()
# Или без исключения — проверяем вручную
if response.is_success: # 2xx
print("Успех!")
elif response.is_error: # 4xx или 5xx
print(f"Ошибка: {response.status_code}")
Асинхронные запросы — AsyncClient
В async-коде всегда используй AsyncClient. Это ключевое отличие от requests:
import asyncio
import httpx
async def fetch_page(url: str) -> str:
# async with — автоматически закрывает соединение
async with httpx.AsyncClient() as client:
response = await client.get(url)
response.raise_for_status()
return response.text
async def main():
html = await fetch_page("https://example.com")
print(html[:200])
asyncio.run(main())
Создание AsyncClient() внутри каждой функции — расточительно: каждый раз поднимается новый пул соединений. Правильно: создай один клиент на время жизни приложения и переиспользуй его.
Жизненный цикл клиента
Клиент хранит пул соединений (connection pool), куки, заголовки сессии. Его нужно явно закрывать или использовать как контекстный менеджер:
import httpx
import asyncio
# ── Паттерн 1: контекстный менеджер (для скриптов) ──
async def run_script():
async with httpx.AsyncClient(
base_url="https://api.openai.com",
headers={"Authorization": "Bearer sk-..."},
timeout=30.0,
) as client:
# Клиент живёт внутри блока
r1 = await client.post("/v1/chat/completions", json={...})
r2 = await client.get("/v1/models")
# Здесь клиент уже закрыт
# ── Паттерн 2: глобальный клиент (для приложений) ──
# http_client.py — создаём один раз
_client: httpx.AsyncClient | None = None
async def get_client() -> httpx.AsyncClient:
global _client
if _client is None:
_client = httpx.AsyncClient(
timeout=httpx.Timeout(connect=5.0, read=30.0, write=10.0, pool=5.0),
limits=httpx.Limits(max_connections=20, max_keepalive_connections=10),
)
return _client
async def close_client():
global _client
if _client:
await _client.aclose()
_client = None
# В FastAPI — lifespan events:
# @asynccontextmanager
# async def lifespan(app):
# yield
# await close_client()
# ── Паттерн 3: синглтон через dependency injection ──
from functools import lru_cache
@lru_cache(maxsize=1)
def get_sync_client() -> httpx.Client:
"""Синхронный клиент — синглтон."""
return httpx.Client(timeout=30.0)
Таймауты: не зависай навсегда
LLM API может отвечать секундами. Внешние сервисы могут зависнуть. Без таймаутов агент ждёт бесконечно — это неприемлемо в production:
import httpx
# Простой общий таймаут (секунды)
client = httpx.AsyncClient(timeout=30.0)
# Детальный контроль каждой фазы
timeout = httpx.Timeout(
connect=5.0, # Время на установку TCP-соединения
read=60.0, # Время на получение ответа (большое для LLM!)
write=10.0, # Время на отправку тела запроса
pool=5.0, # Время ожидания свободного соединения из пула
)
client = httpx.AsyncClient(timeout=timeout)
# Переопределить таймаут для конкретного запроса
async with httpx.AsyncClient(timeout=30.0) as client:
# Долгий запрос — нужен большой read timeout
resp = await client.post(
"/v1/chat/completions",
json={"model": "gpt-4o", "messages": [...]},
timeout=httpx.Timeout(connect=5.0, read=120.0), # 2 минуты на чтение
)
# Быстрый health-check — короткий таймаут
health = await client.get("/health", timeout=2.0)
# Обработка таймаута
try:
resp = await client.get("https://slow-api.com/data", timeout=5.0)
except httpx.TimeoutException as e:
print(f"Таймаут: {type(e).__name__} — {e}")
# TimeoutException — базовый класс
# ConnectTimeout — не смогли подключиться
# ReadTimeout — сервер не ответил вовремя
Заголовки и аутентификация
import httpx
import os
api_key = os.environ["OPENAI_API_KEY"]
# Заголовки на уровне клиента — применяются ко всем запросам
client = httpx.AsyncClient(
base_url="https://api.openai.com/v1",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"User-Agent": "MyAgent/1.0",
},
)
# Дополнительные заголовки для конкретного запроса
# (мёрджатся с клиентскими)
response = await client.post(
"/chat/completions",
json={"model": "gpt-4o", "messages": [...]},
headers={"X-Request-Id": "req-abc-123"}, # Добавляется к общим
)
# Bearer-токен — через встроенный класс
client = httpx.AsyncClient(
auth=httpx.BearerAuth("your-token-here"),
)
# Basic auth
client = httpx.AsyncClient(
auth=("username", "password"),
)
Параллельные запросы
Главная причина использовать AsyncClient — параллельные запросы. Комбинируем с asyncio.gather() из первого урока:
import asyncio
import httpx
from pydantic import BaseModel
class SearchResult(BaseModel):
query: str
results: list[str]
source: str
async def search_source(
client: httpx.AsyncClient,
source: str,
query: str,
) -> SearchResult:
"""Поиск в одном источнике."""
response = await client.get(
f"https://{source}/search",
params={"q": query},
timeout=10.0,
)
response.raise_for_status()
data = response.json()
return SearchResult(
query=query,
results=data.get("items", []),
source=source,
)
async def multi_source_search(query: str) -> list[SearchResult]:
"""Ищем во всех источниках одновременно."""
sources = ["news-api.com", "wiki-api.com", "scholar-api.com"]
async with httpx.AsyncClient() as client:
# Один клиент — несколько параллельных запросов
# Эффективнее: переиспользуются соединения из пула
tasks = [search_source(client, src, query) for src in sources]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Фильтруем упавшие запросы
good = []
for src, r in zip(sources, results):
if isinstance(r, Exception):
print(f"Источник {src} недоступен: {r}")
else:
good.append(r)
return good
async def main():
results = await multi_source_search("asyncio python tutorial")
for r in results:
print(f"{r.source}: {len(r.results)} результатов")
asyncio.run(main())
Ограничение параллельности через Semaphore
import asyncio
import httpx
async def scrape_pages(urls: list[str], max_concurrent: int = 5) -> list[str]:
"""Скрейпим страницы, не более max_concurrent одновременно."""
semaphore = asyncio.Semaphore(max_concurrent)
async def fetch(client: httpx.AsyncClient, url: str) -> str:
async with semaphore:
try:
r = await client.get(url, timeout=15.0)
r.raise_for_status()
return r.text
except httpx.HTTPError as e:
print(f"Ошибка {url}: {e}")
return ""
async with httpx.AsyncClient(
follow_redirects=True,
headers={"User-Agent": "Mozilla/5.0 (compatible; AgentBot/1.0)"},
) as client:
tasks = [fetch(client, url) for url in urls]
return await asyncio.gather(*tasks)
async def main():
urls = [f"https://example.com/page/{i}" for i in range(20)]
pages = await scrape_pages(urls, max_concurrent=5)
print(f"Загружено {sum(1 for p in pages if p)} страниц")
Retry-логика: устойчивость к сбоям
LLM API иногда возвращает 429 Too Many Requests или 500 Internal Server Error. Агент должен повторять запрос с задержкой, а не падать:
Простой retry вручную
import asyncio
import httpx
RETRYABLE_STATUS = {429, 500, 502, 503, 504}
async def request_with_retry(
client: httpx.AsyncClient,
method: str,
url: str,
max_retries: int = 3,
**kwargs,
) -> httpx.Response:
"""HTTP-запрос с exponential backoff."""
last_exc = None
for attempt in range(max_retries + 1):
try:
response = await client.request(method, url, **kwargs)
if response.status_code in RETRYABLE_STATUS:
# 429 может вернуть Retry-After заголовок
retry_after = response.headers.get("Retry-After")
wait = float(retry_after) if retry_after else 2 ** attempt
print(f"Статус {response.status_code}, ждём {wait:.1f}с (попытка {attempt+1})")
await asyncio.sleep(wait)
continue
return response # Успех или неретраябельная ошибка
except (httpx.ConnectError, httpx.ReadTimeout) as e:
last_exc = e
if attempt == max_retries:
break
wait = 2 ** attempt # 1с, 2с, 4с...
print(f"Ошибка соединения, ждём {wait}с (попытка {attempt+1}): {e}")
await asyncio.sleep(wait)
raise last_exc or httpx.RequestError(f"Превышено число попыток: {max_retries}")
Retry через tenacity — production-ready
tenacity — мощная библиотека для retry с декораторами. Именно её используют openai и anthropic SDK под капотом:
import httpx
import asyncio
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
wait_random_exponential,
retry_if_exception_type,
retry_if_result,
before_sleep_log,
)
import logging
logger = logging.getLogger(__name__)
# Retry при сетевых ошибках и rate limit
@retry(
stop=stop_after_attempt(4), # Максимум 4 попытки
wait=wait_exponential(multiplier=1, min=1, max=30), # 1с, 2с, 4с, 8с... макс 30с
retry=retry_if_exception_type((
httpx.ConnectError,
httpx.ReadTimeout,
httpx.RemoteProtocolError,
)),
before_sleep=before_sleep_log(logger, logging.WARNING),
)
async def fetch_with_retry(url: str) -> dict:
async with httpx.AsyncClient(timeout=20.0) as client:
r = await client.get(url)
r.raise_for_status()
return r.json()
# Для LLM API — retry на 429 и 5xx
def is_retryable_status(response: httpx.Response) -> bool:
return response.status_code in {429, 500, 502, 503, 504}
@retry(
stop=stop_after_attempt(5),
# Случайный джиттер — важно для параллельных запросов!
# Без джиттера все retry происходят одновременно → thundering herd
wait=wait_random_exponential(min=1, max=60),
retry=(
retry_if_exception_type(httpx.TransportError) |
retry_if_result(is_retryable_status)
),
)
async def call_llm_api(client: httpx.AsyncClient, payload: dict) -> httpx.Response:
response = await client.post(
"https://api.openai.com/v1/chat/completions",
json=payload,
)
return response
async def main():
result = await fetch_with_retry("https://api.example.com/data")
print(result)
Стриминг: читаем ответ по частям
LLM API отдаёт токены по мере генерации — это Server-Sent Events (SSE). Стриминг позволяет показывать ответ пользователю в реальном времени, не ожидая полного завершения:
import asyncio
import httpx
import json
import os
async def stream_chat(prompt: str) -> str:
"""Стримим ответ от OpenAI и печатаем по мере поступления."""
full_response = ""
async with httpx.AsyncClient(timeout=httpx.Timeout(read=120.0)) as client:
async with client.stream(
"POST",
"https://api.openai.com/v1/chat/completions",
headers={"Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}"},
json={
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": prompt}],
"stream": True, # ← включаем стриминг
},
) as response:
response.raise_for_status()
# Читаем поток строк (SSE формат: "data: {...}\n\n")
async for line in response.aiter_lines():
if not line.startswith("data: "):
continue
data_str = line[len("data: "):]
if data_str == "[DONE]":
break # Конец стрима
try:
chunk = json.loads(data_str)
except json.JSONDecodeError:
continue
delta = chunk["choices"][0].get("delta", {})
token = delta.get("content", "")
if token:
print(token, end="", flush=True) # Печатаем токен сразу
full_response += token
print() # Перевод строки в конце
return full_response
async def main():
answer = await stream_chat("Объясни asyncio в Python за 3 предложения")
print(f"\nПолный ответ ({len(answer)} символов)")
asyncio.run(main())
Стриминг больших файлов
import httpx
import asyncio
from pathlib import Path
async def download_file(url: str, dest: Path, chunk_size: int = 8192) -> int:
"""Скачивает файл стримингом, не загружая всё в RAM."""
total = 0
async with httpx.AsyncClient(timeout=httpx.Timeout(read=300.0)) as client:
async with client.stream("GET", url, follow_redirects=True) as response:
response.raise_for_status()
file_size = int(response.headers.get("content-length", 0))
print(f"Размер файла: {file_size / 1024 / 1024:.1f} МБ")
with open(dest, "wb") as f:
async for chunk in response.aiter_bytes(chunk_size=chunk_size):
f.write(chunk)
total += len(chunk)
if file_size:
pct = total / file_size * 100
print(f"\rЗагружено: {pct:.1f}%", end="", flush=True)
print()
return total
async def main():
dest = Path("dataset.jsonl")
size = await download_file("https://example.com/large-dataset.jsonl", dest)
print(f"Загружено {size / 1024:.1f} КБ → {dest}")
asyncio.run(main())
Обработка ошибок: все случаи
import httpx
import asyncio
async def safe_request(url: str) -> dict | None:
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(url)
response.raise_for_status() # Бросает HTTPStatusError при 4xx/5xx
return response.json()
# ── Ошибки сети ──
except httpx.ConnectError:
print("Не удалось подключиться — проверь URL и сеть")
except httpx.ConnectTimeout:
print("Таймаут подключения — сервер не отвечает")
except httpx.ReadTimeout:
print("Таймаут чтения — сервер завис после подключения")
# ── HTTP ошибки ──
except httpx.HTTPStatusError as e:
code = e.response.status_code
if code == 401:
print("Неверный API-ключ (401 Unauthorized)")
elif code == 403:
print("Нет доступа к ресурсу (403 Forbidden)")
elif code == 404:
print(f"Ресурс не найден: {e.request.url}")
elif code == 422:
print(f"Невалидные параметры запроса: {e.response.json()}")
elif code == 429:
retry_after = e.response.headers.get("Retry-After", "неизвестно")
print(f"Rate limit! Повтори через {retry_after}с")
elif code >= 500:
print(f"Ошибка сервера {code} — попробуй позже")
# ── Прочие проблемы ──
except httpx.DecodingError:
print("Ответ не удалось декодировать — сервер вернул невалидные данные")
except httpx.TooManyRedirects:
print("Слишком много редиректов")
except httpx.RequestError as e:
# Базовый класс для всех сетевых ошибок — ловим остаток
print(f"Ошибка запроса: {type(e).__name__}: {e}")
return None
Клиент для AI-агента: собираем всё вместе
Типичный переиспользуемый HTTP-клиент, который используется в реальных агентах — с retry, таймаутами, логированием и rate limiting:
# http_client.py
import asyncio
import logging
import httpx
from typing import Any
from tenacity import retry, stop_after_attempt, wait_random_exponential, retry_if_exception_type
logger = logging.getLogger(__name__)
RETRYABLE = (httpx.ConnectError, httpx.ReadTimeout, httpx.RemoteProtocolError)
class AgentHttpClient:
"""
Переиспользуемый async HTTP-клиент для AI-агента.
Один экземпляр на всё приложение.
"""
def __init__(
self,
*,
base_url: str = "",
api_key: str | None = None,
timeout: float = 30.0,
max_connections: int = 20,
max_retries: int = 3,
):
headers = {"User-Agent": "AgentBot/1.0"}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
self._client = httpx.AsyncClient(
base_url=base_url,
headers=headers,
timeout=httpx.Timeout(connect=5.0, read=timeout, write=10.0),
limits=httpx.Limits(
max_connections=max_connections,
max_keepalive_connections=max_connections // 2,
),
follow_redirects=True,
)
self._semaphore = asyncio.Semaphore(max_connections)
self._max_retries = max_retries
async def get(self, url: str, **kwargs) -> httpx.Response:
return await self._request("GET", url, **kwargs)
async def post(self, url: str, **kwargs) -> httpx.Response:
return await self._request("POST", url, **kwargs)
async def get_json(self, url: str, **kwargs) -> Any:
r = await self.get(url, **kwargs)
r.raise_for_status()
return r.json()
async def post_json(self, url: str, payload: dict, **kwargs) -> Any:
r = await self.post(url, json=payload, **kwargs)
r.raise_for_status()
return r.json()
@retry(
stop=stop_after_attempt(3),
wait=wait_random_exponential(min=1, max=20),
retry=retry_if_exception_type(RETRYABLE),
)
async def _request(self, method: str, url: str, **kwargs) -> httpx.Response:
async with self._semaphore:
logger.debug(f"→ {method} {url}")
response = await self._client.request(method, url, **kwargs)
logger.debug(f"← {response.status_code} {url}")
return response
async def close(self):
await self._client.aclose()
# Поддержка async context manager
async def __aenter__(self):
return self
async def __aexit__(self, *args):
await self.close()
# Пример использования в агенте
import os
async def run_agent():
async with AgentHttpClient(
api_key=os.environ.get("TAVILY_API_KEY"),
timeout=15.0,
) as http:
# Параллельный поиск в нескольких источниках
queries = ["asyncio python", "pydantic v2", "langgraph tutorial"]
tasks = [
http.get_json(
"https://api.tavily.com/search",
params={"query": q, "max_results": 3},
)
for q in queries
]
results = await asyncio.gather(*tasks, return_exceptions=True)
for query, result in zip(queries, results):
if isinstance(result, Exception):
logger.warning(f"Ошибка для '{query}': {result}")
else:
print(f"'{query}': {len(result.get('results', []))} результатов")
requests совместимость: быстрый переход
Если у тебя уже есть код на requests и нужно переписать на httpx — вот таблица замен:
| requests | httpx (sync) | httpx (async) |
|---|---|---|
requests.get(url) |
httpx.get(url) |
await client.get(url) |
requests.post(url, json=d) |
httpx.post(url, json=d) |
await client.post(url, json=d) |
requests.Session() |
httpx.Client() |
httpx.AsyncClient() |
r.json() |
r.json() |
r.json() |
r.text |
r.text |
r.text |
r.status_code |
r.status_code |
r.status_code |
r.raise_for_status() |
r.raise_for_status() |
r.raise_for_status() |
r.iter_content() |
r.iter_bytes() |
async for c in r.aiter_bytes() |
r.iter_lines() |
r.iter_lines() |
async for l in r.aiter_lines() |
requests.exceptions.ConnectionError |
httpx.ConnectError |
|
requests.exceptions.Timeout |
httpx.TimeoutException |
|
В httpx нет автоматического raise_for_status() — его нужно вызывать явно. Также httpx по умолчанию не следует редиректам — нужно передать follow_redirects=True.
Шпаргалка
import asyncio
import httpx
# ── Быстрые запросы (без клиента) ─────────────────────
r = httpx.get("https://example.com") # sync
r = httpx.post("https://example.com", json={}) # sync
# ── Async клиент (основной паттерн) ───────────────────
async with httpx.AsyncClient(
base_url="https://api.example.com",
headers={"Authorization": "Bearer TOKEN"},
timeout=httpx.Timeout(connect=5.0, read=30.0),
follow_redirects=True,
) as client:
r = await client.get("/endpoint", params={"q": "query"})
r = await client.post("/endpoint", json={"key": "value"})
r.raise_for_status() # HTTPStatusError при 4xx/5xx
data = r.json() # dict
text = r.text # str
# ── Параллельные запросы ───────────────────────────────
async with httpx.AsyncClient() as client:
results = await asyncio.gather(
client.get("/a"),
client.get("/b"),
client.get("/c"),
)
# ── Стриминг ──────────────────────────────────────────
async with httpx.AsyncClient() as client:
async with client.stream("GET", url) as r:
async for chunk in r.aiter_bytes(): # бинарный
...
async for line in r.aiter_lines(): # SSE, NDJSON
...
# ── Ошибки ────────────────────────────────────────────
try:
r = await client.get(url)
r.raise_for_status()
except httpx.ConnectError: # Нет соединения
...
except httpx.TimeoutException: # ConnectTimeout | ReadTimeout
...
except httpx.HTTPStatusError as e:
print(e.response.status_code) # 4xx/5xx
# ── Таймауты ──────────────────────────────────────────
httpx.Timeout(5.0) # Все фазы = 5с
httpx.Timeout(connect=3.0, read=60.0) # Раздельно
# ── Retry (через tenacity) ────────────────────────────
from tenacity import retry, stop_after_attempt, wait_random_exponential
@retry(stop=stop_after_attempt(3), wait=wait_random_exponential(min=1, max=20))
async def fetch(): ...
Практическое задание
Задание: HTTP-клиент для агента с поиском
- Напиши async функцию
fetch_url(url: str) -> str, которая загружает HTML-страницу с таймаутом 10с и обработкой всех типов ошибок httpx - Добавь retry через
tenacity: 3 попытки, exponential backoff с джиттером, только при сетевых ошибках - Напиши функцию
fetch_all(urls: list[str]) -> list[str], которая загружает все страницы параллельно, но не более 3 одновременно (Semaphore) - Создай класс
SearchClientс методомsearch(query: str) -> list[dict], который обращается к любому публичному поисковому API. ИспользуйAsyncClientкак атрибут класса, инициализируй в__aenter__ - Напиши стриминговую функцию, которая читает SSE-ответ построчно и возвращает
AsyncGenerator[str, None]