requests vs httpx: почему httpx?

requests — великолепная библиотека, стандарт де-факто много лет. Но у неё одна фундаментальная проблема для AI-агентов: она синхронная. Нельзя просто взять и вызвать её внутри async def — заблокируешь весь event loop.

Возможность requests httpx
Синхронные запросы
Async/await поддержка AsyncClient
HTTP/2 поддержка
Совместимый API с requests ✅ почти идентичен
Стриминг ответов ✅ sync и async
Таймауты (connect + read раздельно) ⚠️ базово ✅ детально
Используется в LLM SDK ✅ openai, anthropic, langchain
ℹ️ Когда оставить requests

Если пишешь простой одноразовый скрипт без asyncio — requests прекрасно работает и проще. Но для любого агента, LangGraph-графа или FastAPI-приложения — используй httpx.

Установка
bash
pip install httpx

# HTTP/2 поддержка (опционально, нужен h2)
pip install "httpx[http2]"

# Для retry-логики
pip install tenacity

Основы: первые запросы

Синхронные запросы

API httpx намеренно максимально близок к requests — переход почти не требует переучивания:

GET, POST, заголовки, параметры
python
import httpx

# GET-запрос
response = httpx.get("https://httpbin.org/get")
print(response.status_code)   # 200
print(response.json())        # dict из JSON-тела ответа
print(response.text)          # строка
print(response.headers)       # заголовки ответа

# GET с параметрами строки запроса (?q=python&limit=5)
response = httpx.get(
    "https://api.example.com/search",
    params={"q": "asyncio python", "limit": 5},
)
print(response.url)  # https://api.example.com/search?q=asyncio+python&limit=5

# POST с JSON-телом
response = httpx.post(
    "https://api.openai.com/v1/chat/completions",
    headers={"Authorization": "Bearer sk-..."},
    json={
        "model": "gpt-4o-mini",
        "messages": [{"role": "user", "content": "Привет!"}],
    },
)
data = response.json()
print(data["choices"][0]["message"]["content"])

# Проверка на ошибки — бросает HTTPStatusError при 4xx/5xx
response.raise_for_status()

# Или без исключения — проверяем вручную
if response.is_success:     # 2xx
    print("Успех!")
elif response.is_error:     # 4xx или 5xx
    print(f"Ошибка: {response.status_code}")

Асинхронные запросы — AsyncClient

В async-коде всегда используй AsyncClient. Это ключевое отличие от requests:

AsyncClient — правильный async HTTP
python
import asyncio
import httpx

async def fetch_page(url: str) -> str:
    # async with — автоматически закрывает соединение
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.text

async def main():
    html = await fetch_page("https://example.com")
    print(html[:200])

asyncio.run(main())
Не создавай клиент на каждый запрос

Создание AsyncClient() внутри каждой функции — расточительно: каждый раз поднимается новый пул соединений. Правильно: создай один клиент на время жизни приложения и переиспользуй его.

Жизненный цикл клиента

Клиент хранит пул соединений (connection pool), куки, заголовки сессии. Его нужно явно закрывать или использовать как контекстный менеджер:

Правильный жизненный цикл клиента
python
import httpx
import asyncio

# ── Паттерн 1: контекстный менеджер (для скриптов) ──

async def run_script():
    async with httpx.AsyncClient(
        base_url="https://api.openai.com",
        headers={"Authorization": "Bearer sk-..."},
        timeout=30.0,
    ) as client:
        # Клиент живёт внутри блока
        r1 = await client.post("/v1/chat/completions", json={...})
        r2 = await client.get("/v1/models")
    # Здесь клиент уже закрыт

# ── Паттерн 2: глобальный клиент (для приложений) ──

# http_client.py — создаём один раз
_client: httpx.AsyncClient | None = None

async def get_client() -> httpx.AsyncClient:
    global _client
    if _client is None:
        _client = httpx.AsyncClient(
            timeout=httpx.Timeout(connect=5.0, read=30.0, write=10.0, pool=5.0),
            limits=httpx.Limits(max_connections=20, max_keepalive_connections=10),
        )
    return _client

async def close_client():
    global _client
    if _client:
        await _client.aclose()
        _client = None

# В FastAPI — lifespan events:
# @asynccontextmanager
# async def lifespan(app):
#     yield
#     await close_client()

# ── Паттерн 3: синглтон через dependency injection ──

from functools import lru_cache

@lru_cache(maxsize=1)
def get_sync_client() -> httpx.Client:
    """Синхронный клиент — синглтон."""
    return httpx.Client(timeout=30.0)

Таймауты: не зависай навсегда

LLM API может отвечать секундами. Внешние сервисы могут зависнуть. Без таймаутов агент ждёт бесконечно — это неприемлемо в production:

Детальные таймауты httpx
python
import httpx

# Простой общий таймаут (секунды)
client = httpx.AsyncClient(timeout=30.0)

# Детальный контроль каждой фазы
timeout = httpx.Timeout(
    connect=5.0,   # Время на установку TCP-соединения
    read=60.0,     # Время на получение ответа (большое для LLM!)
    write=10.0,    # Время на отправку тела запроса
    pool=5.0,      # Время ожидания свободного соединения из пула
)
client = httpx.AsyncClient(timeout=timeout)

# Переопределить таймаут для конкретного запроса
async with httpx.AsyncClient(timeout=30.0) as client:

    # Долгий запрос — нужен большой read timeout
    resp = await client.post(
        "/v1/chat/completions",
        json={"model": "gpt-4o", "messages": [...]},
        timeout=httpx.Timeout(connect=5.0, read=120.0),  # 2 минуты на чтение
    )

    # Быстрый health-check — короткий таймаут
    health = await client.get("/health", timeout=2.0)

# Обработка таймаута
try:
    resp = await client.get("https://slow-api.com/data", timeout=5.0)
except httpx.TimeoutException as e:
    print(f"Таймаут: {type(e).__name__} — {e}")
    # TimeoutException — базовый класс
    # ConnectTimeout — не смогли подключиться
    # ReadTimeout    — сервер не ответил вовремя

Заголовки и аутентификация

Заголовки на уровне клиента и запроса
python
import httpx
import os

api_key = os.environ["OPENAI_API_KEY"]

# Заголовки на уровне клиента — применяются ко всем запросам
client = httpx.AsyncClient(
    base_url="https://api.openai.com/v1",
    headers={
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "User-Agent": "MyAgent/1.0",
    },
)

# Дополнительные заголовки для конкретного запроса
# (мёрджатся с клиентскими)
response = await client.post(
    "/chat/completions",
    json={"model": "gpt-4o", "messages": [...]},
    headers={"X-Request-Id": "req-abc-123"},  # Добавляется к общим
)

# Bearer-токен — через встроенный класс
client = httpx.AsyncClient(
    auth=httpx.BearerAuth("your-token-here"),
)

# Basic auth
client = httpx.AsyncClient(
    auth=("username", "password"),
)

Параллельные запросы

Главная причина использовать AsyncClient — параллельные запросы. Комбинируем с asyncio.gather() из первого урока:

Параллельные запросы с одним клиентом
python
import asyncio
import httpx
from pydantic import BaseModel

class SearchResult(BaseModel):
    query: str
    results: list[str]
    source: str

async def search_source(
    client: httpx.AsyncClient,
    source: str,
    query: str,
) -> SearchResult:
    """Поиск в одном источнике."""
    response = await client.get(
        f"https://{source}/search",
        params={"q": query},
        timeout=10.0,
    )
    response.raise_for_status()
    data = response.json()
    return SearchResult(
        query=query,
        results=data.get("items", []),
        source=source,
    )

async def multi_source_search(query: str) -> list[SearchResult]:
    """Ищем во всех источниках одновременно."""
    sources = ["news-api.com", "wiki-api.com", "scholar-api.com"]

    async with httpx.AsyncClient() as client:
        # Один клиент — несколько параллельных запросов
        # Эффективнее: переиспользуются соединения из пула
        tasks = [search_source(client, src, query) for src in sources]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    # Фильтруем упавшие запросы
    good = []
    for src, r in zip(sources, results):
        if isinstance(r, Exception):
            print(f"Источник {src} недоступен: {r}")
        else:
            good.append(r)
    return good

async def main():
    results = await multi_source_search("asyncio python tutorial")
    for r in results:
        print(f"{r.source}: {len(r.results)} результатов")

asyncio.run(main())

Ограничение параллельности через Semaphore

Semaphore + httpx — контроль нагрузки на API
python
import asyncio
import httpx

async def scrape_pages(urls: list[str], max_concurrent: int = 5) -> list[str]:
    """Скрейпим страницы, не более max_concurrent одновременно."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch(client: httpx.AsyncClient, url: str) -> str:
        async with semaphore:
            try:
                r = await client.get(url, timeout=15.0)
                r.raise_for_status()
                return r.text
            except httpx.HTTPError as e:
                print(f"Ошибка {url}: {e}")
                return ""

    async with httpx.AsyncClient(
        follow_redirects=True,
        headers={"User-Agent": "Mozilla/5.0 (compatible; AgentBot/1.0)"},
    ) as client:
        tasks = [fetch(client, url) for url in urls]
        return await asyncio.gather(*tasks)

async def main():
    urls = [f"https://example.com/page/{i}" for i in range(20)]
    pages = await scrape_pages(urls, max_concurrent=5)
    print(f"Загружено {sum(1 for p in pages if p)} страниц")

Retry-логика: устойчивость к сбоям

LLM API иногда возвращает 429 Too Many Requests или 500 Internal Server Error. Агент должен повторять запрос с задержкой, а не падать:

Простой retry вручную

Exponential backoff без библиотек
python
import asyncio
import httpx

RETRYABLE_STATUS = {429, 500, 502, 503, 504}

async def request_with_retry(
    client: httpx.AsyncClient,
    method: str,
    url: str,
    max_retries: int = 3,
    **kwargs,
) -> httpx.Response:
    """HTTP-запрос с exponential backoff."""
    last_exc = None

    for attempt in range(max_retries + 1):
        try:
            response = await client.request(method, url, **kwargs)

            if response.status_code in RETRYABLE_STATUS:
                # 429 может вернуть Retry-After заголовок
                retry_after = response.headers.get("Retry-After")
                wait = float(retry_after) if retry_after else 2 ** attempt
                print(f"Статус {response.status_code}, ждём {wait:.1f}с (попытка {attempt+1})")
                await asyncio.sleep(wait)
                continue

            return response  # Успех или неретраябельная ошибка

        except (httpx.ConnectError, httpx.ReadTimeout) as e:
            last_exc = e
            if attempt == max_retries:
                break
            wait = 2 ** attempt  # 1с, 2с, 4с...
            print(f"Ошибка соединения, ждём {wait}с (попытка {attempt+1}): {e}")
            await asyncio.sleep(wait)

    raise last_exc or httpx.RequestError(f"Превышено число попыток: {max_retries}")

Retry через tenacity — production-ready

tenacity — мощная библиотека для retry с декораторами. Именно её используют openai и anthropic SDK под капотом:

tenacity — декларативный retry
python
import httpx
import asyncio
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    wait_random_exponential,
    retry_if_exception_type,
    retry_if_result,
    before_sleep_log,
)
import logging

logger = logging.getLogger(__name__)

# Retry при сетевых ошибках и rate limit
@retry(
    stop=stop_after_attempt(4),                           # Максимум 4 попытки
    wait=wait_exponential(multiplier=1, min=1, max=30),   # 1с, 2с, 4с, 8с... макс 30с
    retry=retry_if_exception_type((
        httpx.ConnectError,
        httpx.ReadTimeout,
        httpx.RemoteProtocolError,
    )),
    before_sleep=before_sleep_log(logger, logging.WARNING),
)
async def fetch_with_retry(url: str) -> dict:
    async with httpx.AsyncClient(timeout=20.0) as client:
        r = await client.get(url)
        r.raise_for_status()
        return r.json()

# Для LLM API — retry на 429 и 5xx
def is_retryable_status(response: httpx.Response) -> bool:
    return response.status_code in {429, 500, 502, 503, 504}

@retry(
    stop=stop_after_attempt(5),
    # Случайный джиттер — важно для параллельных запросов!
    # Без джиттера все retry происходят одновременно → thundering herd
    wait=wait_random_exponential(min=1, max=60),
    retry=(
        retry_if_exception_type(httpx.TransportError) |
        retry_if_result(is_retryable_status)
    ),
)
async def call_llm_api(client: httpx.AsyncClient, payload: dict) -> httpx.Response:
    response = await client.post(
        "https://api.openai.com/v1/chat/completions",
        json=payload,
    )
    return response

async def main():
    result = await fetch_with_retry("https://api.example.com/data")
    print(result)

Стриминг: читаем ответ по частям

LLM API отдаёт токены по мере генерации — это Server-Sent Events (SSE). Стриминг позволяет показывать ответ пользователю в реальном времени, не ожидая полного завершения:

Стриминг SSE от OpenAI API
python
import asyncio
import httpx
import json
import os

async def stream_chat(prompt: str) -> str:
    """Стримим ответ от OpenAI и печатаем по мере поступления."""
    full_response = ""

    async with httpx.AsyncClient(timeout=httpx.Timeout(read=120.0)) as client:
        async with client.stream(
            "POST",
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}"},
            json={
                "model": "gpt-4o-mini",
                "messages": [{"role": "user", "content": prompt}],
                "stream": True,   # ← включаем стриминг
            },
        ) as response:
            response.raise_for_status()

            # Читаем поток строк (SSE формат: "data: {...}\n\n")
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue

                data_str = line[len("data: "):]

                if data_str == "[DONE]":
                    break  # Конец стрима

                try:
                    chunk = json.loads(data_str)
                except json.JSONDecodeError:
                    continue

                delta = chunk["choices"][0].get("delta", {})
                token = delta.get("content", "")

                if token:
                    print(token, end="", flush=True)  # Печатаем токен сразу
                    full_response += token

    print()  # Перевод строки в конце
    return full_response

async def main():
    answer = await stream_chat("Объясни asyncio в Python за 3 предложения")
    print(f"\nПолный ответ ({len(answer)} символов)")

asyncio.run(main())

Стриминг больших файлов

Скачивание файлов без загрузки в память
python
import httpx
import asyncio
from pathlib import Path

async def download_file(url: str, dest: Path, chunk_size: int = 8192) -> int:
    """Скачивает файл стримингом, не загружая всё в RAM."""
    total = 0
    async with httpx.AsyncClient(timeout=httpx.Timeout(read=300.0)) as client:
        async with client.stream("GET", url, follow_redirects=True) as response:
            response.raise_for_status()

            file_size = int(response.headers.get("content-length", 0))
            print(f"Размер файла: {file_size / 1024 / 1024:.1f} МБ")

            with open(dest, "wb") as f:
                async for chunk in response.aiter_bytes(chunk_size=chunk_size):
                    f.write(chunk)
                    total += len(chunk)

                    if file_size:
                        pct = total / file_size * 100
                        print(f"\rЗагружено: {pct:.1f}%", end="", flush=True)
    print()
    return total

async def main():
    dest = Path("dataset.jsonl")
    size = await download_file("https://example.com/large-dataset.jsonl", dest)
    print(f"Загружено {size / 1024:.1f} КБ → {dest}")

asyncio.run(main())

Обработка ошибок: все случаи

Иерархия исключений httpx и правильная обработка
python
import httpx
import asyncio

async def safe_request(url: str) -> dict | None:
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url)
            response.raise_for_status()  # Бросает HTTPStatusError при 4xx/5xx
            return response.json()

    # ── Ошибки сети ──
    except httpx.ConnectError:
        print("Не удалось подключиться — проверь URL и сеть")
    except httpx.ConnectTimeout:
        print("Таймаут подключения — сервер не отвечает")
    except httpx.ReadTimeout:
        print("Таймаут чтения — сервер завис после подключения")

    # ── HTTP ошибки ──
    except httpx.HTTPStatusError as e:
        code = e.response.status_code

        if code == 401:
            print("Неверный API-ключ (401 Unauthorized)")
        elif code == 403:
            print("Нет доступа к ресурсу (403 Forbidden)")
        elif code == 404:
            print(f"Ресурс не найден: {e.request.url}")
        elif code == 422:
            print(f"Невалидные параметры запроса: {e.response.json()}")
        elif code == 429:
            retry_after = e.response.headers.get("Retry-After", "неизвестно")
            print(f"Rate limit! Повтори через {retry_after}с")
        elif code >= 500:
            print(f"Ошибка сервера {code} — попробуй позже")

    # ── Прочие проблемы ──
    except httpx.DecodingError:
        print("Ответ не удалось декодировать — сервер вернул невалидные данные")
    except httpx.TooManyRedirects:
        print("Слишком много редиректов")
    except httpx.RequestError as e:
        # Базовый класс для всех сетевых ошибок — ловим остаток
        print(f"Ошибка запроса: {type(e).__name__}: {e}")

    return None

Клиент для AI-агента: собираем всё вместе

Типичный переиспользуемый HTTP-клиент, который используется в реальных агентах — с retry, таймаутами, логированием и rate limiting:

AgentHttpClient — production-ready клиент
python
# http_client.py
import asyncio
import logging
import httpx
from typing import Any
from tenacity import retry, stop_after_attempt, wait_random_exponential, retry_if_exception_type

logger = logging.getLogger(__name__)

RETRYABLE = (httpx.ConnectError, httpx.ReadTimeout, httpx.RemoteProtocolError)


class AgentHttpClient:
    """
    Переиспользуемый async HTTP-клиент для AI-агента.
    Один экземпляр на всё приложение.
    """

    def __init__(
        self,
        *,
        base_url: str = "",
        api_key: str | None = None,
        timeout: float = 30.0,
        max_connections: int = 20,
        max_retries: int = 3,
    ):
        headers = {"User-Agent": "AgentBot/1.0"}
        if api_key:
            headers["Authorization"] = f"Bearer {api_key}"

        self._client = httpx.AsyncClient(
            base_url=base_url,
            headers=headers,
            timeout=httpx.Timeout(connect=5.0, read=timeout, write=10.0),
            limits=httpx.Limits(
                max_connections=max_connections,
                max_keepalive_connections=max_connections // 2,
            ),
            follow_redirects=True,
        )
        self._semaphore = asyncio.Semaphore(max_connections)
        self._max_retries = max_retries

    async def get(self, url: str, **kwargs) -> httpx.Response:
        return await self._request("GET", url, **kwargs)

    async def post(self, url: str, **kwargs) -> httpx.Response:
        return await self._request("POST", url, **kwargs)

    async def get_json(self, url: str, **kwargs) -> Any:
        r = await self.get(url, **kwargs)
        r.raise_for_status()
        return r.json()

    async def post_json(self, url: str, payload: dict, **kwargs) -> Any:
        r = await self.post(url, json=payload, **kwargs)
        r.raise_for_status()
        return r.json()

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_random_exponential(min=1, max=20),
        retry=retry_if_exception_type(RETRYABLE),
    )
    async def _request(self, method: str, url: str, **kwargs) -> httpx.Response:
        async with self._semaphore:
            logger.debug(f"→ {method} {url}")
            response = await self._client.request(method, url, **kwargs)
            logger.debug(f"← {response.status_code} {url}")
            return response

    async def close(self):
        await self._client.aclose()

    # Поддержка async context manager
    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        await self.close()


# Пример использования в агенте
import os

async def run_agent():
    async with AgentHttpClient(
        api_key=os.environ.get("TAVILY_API_KEY"),
        timeout=15.0,
    ) as http:

        # Параллельный поиск в нескольких источниках
        queries = ["asyncio python", "pydantic v2", "langgraph tutorial"]
        tasks = [
            http.get_json(
                "https://api.tavily.com/search",
                params={"query": q, "max_results": 3},
            )
            for q in queries
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        for query, result in zip(queries, results):
            if isinstance(result, Exception):
                logger.warning(f"Ошибка для '{query}': {result}")
            else:
                print(f"'{query}': {len(result.get('results', []))} результатов")

requests совместимость: быстрый переход

Если у тебя уже есть код на requests и нужно переписать на httpx — вот таблица замен:

requests httpx (sync) httpx (async)
requests.get(url) httpx.get(url) await client.get(url)
requests.post(url, json=d) httpx.post(url, json=d) await client.post(url, json=d)
requests.Session() httpx.Client() httpx.AsyncClient()
r.json() r.json() r.json()
r.text r.text r.text
r.status_code r.status_code r.status_code
r.raise_for_status() r.raise_for_status() r.raise_for_status()
r.iter_content() r.iter_bytes() async for c in r.aiter_bytes()
r.iter_lines() r.iter_lines() async for l in r.aiter_lines()
requests.exceptions.ConnectionError httpx.ConnectError
requests.exceptions.Timeout httpx.TimeoutException
⚠️ Ключевые различия requests vs httpx

В httpx нет автоматического raise_for_status() — его нужно вызывать явно. Также httpx по умолчанию не следует редиректам — нужно передать follow_redirects=True.

Шпаргалка

httpx cheatsheet для AI-инженера
python
import asyncio
import httpx

# ── Быстрые запросы (без клиента) ─────────────────────
r = httpx.get("https://example.com")           # sync
r = httpx.post("https://example.com", json={}) # sync

# ── Async клиент (основной паттерн) ───────────────────
async with httpx.AsyncClient(
    base_url="https://api.example.com",
    headers={"Authorization": "Bearer TOKEN"},
    timeout=httpx.Timeout(connect=5.0, read=30.0),
    follow_redirects=True,
) as client:
    r = await client.get("/endpoint", params={"q": "query"})
    r = await client.post("/endpoint", json={"key": "value"})
    r.raise_for_status()   # HTTPStatusError при 4xx/5xx
    data = r.json()        # dict
    text = r.text          # str

# ── Параллельные запросы ───────────────────────────────
async with httpx.AsyncClient() as client:
    results = await asyncio.gather(
        client.get("/a"),
        client.get("/b"),
        client.get("/c"),
    )

# ── Стриминг ──────────────────────────────────────────
async with httpx.AsyncClient() as client:
    async with client.stream("GET", url) as r:
        async for chunk in r.aiter_bytes():   # бинарный
            ...
        async for line in r.aiter_lines():    # SSE, NDJSON
            ...

# ── Ошибки ────────────────────────────────────────────
try:
    r = await client.get(url)
    r.raise_for_status()
except httpx.ConnectError:       # Нет соединения
    ...
except httpx.TimeoutException:   # ConnectTimeout | ReadTimeout
    ...
except httpx.HTTPStatusError as e:
    print(e.response.status_code)  # 4xx/5xx

# ── Таймауты ──────────────────────────────────────────
httpx.Timeout(5.0)                              # Все фазы = 5с
httpx.Timeout(connect=3.0, read=60.0)          # Раздельно

# ── Retry (через tenacity) ────────────────────────────
from tenacity import retry, stop_after_attempt, wait_random_exponential
@retry(stop=stop_after_attempt(3), wait=wait_random_exponential(min=1, max=20))
async def fetch(): ...

Практическое задание

Задание: HTTP-клиент для агента с поиском

  1. Напиши async функцию fetch_url(url: str) -> str, которая загружает HTML-страницу с таймаутом 10с и обработкой всех типов ошибок httpx
  2. Добавь retry через tenacity: 3 попытки, exponential backoff с джиттером, только при сетевых ошибках
  3. Напиши функцию fetch_all(urls: list[str]) -> list[str], которая загружает все страницы параллельно, но не более 3 одновременно (Semaphore)
  4. Создай класс SearchClient с методом search(query: str) -> list[dict], который обращается к любому публичному поисковому API. Используй AsyncClient как атрибут класса, инициализируй в __aenter__
  5. Напиши стриминговую функцию, которая читает SSE-ответ построчно и возвращает AsyncGenerator[str, None]

Что дальше