Модуль 01 Начинающий ⏱ 35 мин

Streaming responses

LLM генерирует текст токен за токеном. Streaming позволяет показывать ответ пользователю в режиме реального времени — без ожидания полной генерации. Для агентов это критично: многошаговые рассуждения могут занимать десятки секунд.

Что нужно знать: asyncio, httpx, структура messages

Зачем нужен streaming

Без streaming пользователь ждёт полного ответа перед тем, как увидеть хоть что-то. При длинных ответах (анализ, code generation, reasoning) это может быть 10–30 секунд «белого экрана».

Обычный запрос (blocking) TTFB = время полной генерации (~15с)
Пользователь видит ответ только здесь →
Streaming TTFB ≈ 300–600мс
Первый токен
токены идут постепенно

Streaming также критичен для агентов с reasoning: вы можете обрабатывать токены по мере прихода, реагировать на ключевые слова и даже прервать генерацию досрочно.

Как модель отдаёт текст — токен за токеном
Чтобы подключить streaming, нужно передать параметр stream=True и итерироваться по событиям.
ℹ️
Под капотом — Server-Sent Events (SSE)
HTTP-соединение остаётся открытым. Сервер отправляет строки вида data: {"delta": {"text": "..."}}↵↵. SDK скрывает этот протокол за async iterator.

SSE: как выглядит протокол

Понимать SSE полезно для отладки и для написания собственных клиентов. Каждое событие — это несколько строк, разделённых пустой строкой:

# Anthropic SSE event
event: content_block_delta
data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Чтобы"}}

# следующий фрагмент
event: content_block_delta
data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": " подключить"}}

# конец стрима
event: message_stop
data: {"type": "message_stop"}

Типы событий Anthropic:

message_start content_block_start content_block_delta content_block_stop message_delta message_stop

Базовый streaming: Anthropic и OpenAI

Python — Anthropic streaming (рекомендуемый способ)
import anthropic

client = anthropic.AsyncAnthropic()

async def stream_response(prompt: str) -> str:
    """Стримим ответ и выводим токены по мере прихода."""
    full_text = ""

    # stream() — контекстный менеджер, возвращает AsyncMessageStream
    async with client.messages.stream(
        model="claude-opus-4-6",
        system="Ты — полезный ассистент.",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=1024,
    ) as stream:
        # text_stream — async iterator только по текстовым дельтам
        async for text_chunk in stream.text_stream:
            print(text_chunk, end="", flush=True)
            full_text += text_chunk

    print()  # перевод строки после стрима
    return full_text

# Запуск
import asyncio
text = asyncio.run(stream_response("Объясни, как работает asyncio event loop"))
Python — низкоуровневый доступ к событиям Anthropic
import anthropic
from anthropic import AsyncMessageStreamManager

client = anthropic.AsyncAnthropic()

async def stream_with_events(prompt: str) -> dict:
    """Обрабатываем все типы событий вручную."""
    result = {"text": "", "input_tokens": 0, "output_tokens": 0}

    async with client.messages.stream(
        model="claude-opus-4-6",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=512,
    ) as stream:

        async for event in stream:
            event_type = event.type

            if event_type == "message_start":
                result["input_tokens"] = event.message.usage.input_tokens

            elif event_type == "content_block_delta":
                if hasattr(event.delta, "text"):
                    delta = event.delta.text
                    result["text"] += delta
                    print(delta, end="", flush=True)

            elif event_type == "message_delta":
                result["output_tokens"] = event.usage.output_tokens

            elif event_type == "message_stop":
                print()  # перевод строки

        # Итоговое сообщение (полностью собранное) — доступно после стрима
        final_message = await stream.get_final_message()
        result["stop_reason"] = final_message.stop_reason

    return result

stats = await stream_with_events("Что такое backpressure в async?")
print(f"\nTokens: {stats['input_tokens']} in / {stats['output_tokens']} out")
Python — OpenAI streaming
import openai

client = openai.AsyncOpenAI()

async def openai_stream(prompt: str) -> str:
    """OpenAI streaming через async iterator."""
    full_text = ""

    # stream=True включает SSE
    async with await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Ты — полезный ассистент."},
            {"role": "user",   "content": prompt},
        ],
        max_tokens=1024,
        stream=True,
    ) as stream:
        async for chunk in stream:
            # delta — фрагмент ответа, может быть None
            delta = chunk.choices[0].delta.content
            if delta is not None:
                print(delta, end="", flush=True)
                full_text += delta

        # OpenAI: финальная статистика через stream.get_final_completion()
        # (доступна в openai SDK >= 1.2)

    print()
    return full_text

# Универсальная обёртка для обоих провайдеров
async def stream_any(
    provider: str,
    system: str,
    user_msg: str,
    model: str,
    on_token=None,   # callback(str) — вызывается на каждый чанк
) -> str:
    """Стримит ответ от любого провайдера, вызывая on_token для каждого чанка."""
    text = ""

    if provider == "anthropic":
        client = anthropic.AsyncAnthropic()
        async with client.messages.stream(
            model=model, system=system,
            messages=[{"role": "user", "content": user_msg}],
            max_tokens=1024,
        ) as stream:
            async for chunk in stream.text_stream:
                text += chunk
                if on_token:
                    await on_token(chunk)

    elif provider == "openai":
        client = openai.AsyncOpenAI()
        async with await client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": system},
                {"role": "user",   "content": user_msg},
            ],
            max_tokens=1024, stream=True,
        ) as stream:
            async for chunk in stream:
                delta = chunk.choices[0].delta.content
                if delta:
                    text += delta
                    if on_token:
                        await on_token(delta)

    return text

# Пример с callback
async def print_chunk(chunk: str):
    print(chunk, end="", flush=True)

result = await stream_any(
    provider="anthropic",
    system="Ты — Python-наставник.",
    user_msg="Объясни генераторы.",
    model="claude-opus-4-6",
    on_token=print_chunk,
)

Streaming в FastAPI: отдаём ответ клиенту

Агентный бэкенд обычно проксирует стрим от LLM к фронтенду. FastAPI поддерживает StreamingResponse для SSE.

Python — FastAPI SSE endpoint
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import anthropic
import json
import asyncio

app = FastAPI()
llm_client = anthropic.AsyncAnthropic()

class ChatRequest(BaseModel):
    message: str
    system: str = "Ты — полезный ассистент."

async def token_generator(request: ChatRequest):
    """Async generator: отдаёт SSE-события в формате text/event-stream."""
    try:
        async with llm_client.messages.stream(
            model="claude-opus-4-6",
            system=request.system,
            messages=[{"role": "user", "content": request.message}],
            max_tokens=2048,
        ) as stream:
            async for text_chunk in stream.text_stream:
                # SSE формат: data: \n\n
                payload = json.dumps({"type": "text", "delta": text_chunk})
                yield f"data: {payload}\n\n"

        # Сигнал об окончании
        yield f"data: {json.dumps({'type': 'done'})}\n\n"

    except Exception as e:
        error_payload = json.dumps({"type": "error", "message": str(e)})
        yield f"data: {error_payload}\n\n"

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    return StreamingResponse(
        token_generator(request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",   # отключаем nginx-буферизацию
        },
    )

# ── Дополнительно: эндпоинт с WebSocket ──
from fastapi import WebSocket

@app.websocket("/chat/ws")
async def chat_websocket(websocket: WebSocket):
    await websocket.accept()

    while True:
        message = await websocket.receive_text()
        data = json.loads(message)

        async with llm_client.messages.stream(
            model="claude-opus-4-6",
            messages=[{"role": "user", "content": data["message"]}],
            max_tokens=1024,
        ) as stream:
            async for chunk in stream.text_stream:
                await websocket.send_json({"type": "chunk", "text": chunk})

        await websocket.send_json({"type": "done"})
💡
Отключи nginx-буферизацию!
По умолчанию nginx буферизует ответы и клиент увидит их пачкой, а не по одному. Добавь заголовок X-Accel-Buffering: no или в nginx config: proxy_buffering off;

Клиент на JavaScript (EventSource / fetch)

JavaScript — читаем SSE из браузера
// Способ 1: fetch + ReadableStream (для POST-запросов с body)
async function streamChat(message) {
  const response = await fetch('/chat/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop(); // последняя неполная строка — обратно в буфер

    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;
      const data = JSON.parse(line.slice(6));

      if (data.type === 'text') {
        document.getElementById('output').textContent += data.delta;
      } else if (data.type === 'done') {
        console.log('Stream complete');
      } else if (data.type === 'error') {
        console.error('LLM error:', data.message);
      }
    }
  }
}

streamChat('Объясни Docker multi-stage builds');

Streaming + tool calls

Модель может использовать инструменты во время генерации. В streaming-режиме tool call приходит по частям — JSON параметров накапливается в дельтах.

Python — streaming с отслеживанием tool calls
import anthropic
import json
from dataclasses import dataclass, field

@dataclass
class StreamState:
    """Состояние текущего стрима."""
    text: str = ""
    tool_calls: list[dict] = field(default_factory=list)
    current_tool: dict | None = None
    input_tokens: int = 0
    output_tokens: int = 0

client = anthropic.AsyncAnthropic()

TOOLS = [{
    "name": "search_docs",
    "description": "Поиск по документации",
    "input_schema": {
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "Поисковый запрос"},
            "limit": {"type": "integer", "default": 5},
        },
        "required": ["query"],
    }
}]

async def stream_with_tools(user_message: str) -> StreamState:
    state = StreamState()

    async with client.messages.stream(
        model="claude-opus-4-6",
        tools=TOOLS,
        messages=[{"role": "user", "content": user_message}],
        max_tokens=1024,
    ) as stream:

        async for event in stream:
            match event.type:

                case "message_start":
                    state.input_tokens = event.message.usage.input_tokens

                case "content_block_start":
                    block = event.content_block
                    if block.type == "tool_use":
                        # Начало нового tool call
                        state.current_tool = {
                            "id": block.id,
                            "name": block.name,
                            "input_raw": "",  # JSON будет накапливаться в дельтах
                        }
                        print(f"\n[Tool: {block.name}] ", end="", flush=True)

                case "content_block_delta":
                    delta = event.delta
                    if delta.type == "text_delta":
                        state.text += delta.text
                        print(delta.text, end="", flush=True)
                    elif delta.type == "input_json_delta":
                        # Дельты JSON параметров tool call
                        if state.current_tool:
                            state.current_tool["input_raw"] += delta.partial_json
                            print(".", end="", flush=True)

                case "content_block_stop":
                    if state.current_tool:
                        # Парсим накопленный JSON
                        try:
                            state.current_tool["input"] = json.loads(
                                state.current_tool.pop("input_raw")
                            )
                        except json.JSONDecodeError:
                            state.current_tool["input"] = {}
                        state.tool_calls.append(state.current_tool)
                        state.current_tool = None

                case "message_delta":
                    state.output_tokens = event.usage.output_tokens

    print()
    return state

state = await stream_with_tools("Найди документацию по asyncio.gather")
print(f"\nText: {state.text[:100]}...")
print(f"Tool calls: {state.tool_calls}")
print(f"Tokens: {state.input_tokens} / {state.output_tokens}")

Прерывание стрима и timeout

В агентах часто нужно отменить генерацию — пользователь нажал «Стоп», истёк timeout, или пришёл важный сигнал. Asyncio позволяет прервать стрим чисто.

Python — отмена через asyncio.CancelledError и timeout
import asyncio
import anthropic

client = anthropic.AsyncAnthropic()

async def stream_with_timeout(prompt: str, timeout_sec: float = 10.0) -> str:
    """Стримим с общим timeout на весь запрос."""
    text = ""
    try:
        async with asyncio.timeout(timeout_sec):
            async with client.messages.stream(
                model="claude-opus-4-6",
                messages=[{"role": "user", "content": prompt}],
                max_tokens=2048,
            ) as stream:
                async for chunk in stream.text_stream:
                    text += chunk
                    print(chunk, end="", flush=True)

    except asyncio.TimeoutError:
        print(f"\n[Timeout после {timeout_sec}с, собрано {len(text)} символов]")

    return text


async def stream_cancellable(prompt: str) -> tuple[str, asyncio.Task]:
    """
    Возвращает Task — можно отменить снаружи.
    """
    result_holder = {"text": ""}

    async def _stream():
        async with client.messages.stream(
            model="claude-opus-4-6",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=2048,
        ) as stream:
            try:
                async for chunk in stream.text_stream:
                    result_holder["text"] += chunk
                    print(chunk, end="", flush=True)
            except asyncio.CancelledError:
                print(f"\n[Cancelled. Получено {len(result_holder['text'])} символов]")
                raise  # важно! пробрасываем дальше

    task = asyncio.create_task(_stream())
    return result_holder, task


# Пример: отмена через 2 секунды
async def demo_cancel():
    result, task = await stream_cancellable(
        "Напиши подробное эссе о истории Python, начиная с 1991 года"
    )
    await asyncio.sleep(2.0)    # ждём 2 секунды
    task.cancel()               # отменяем
    try:
        await task
    except asyncio.CancelledError:
        pass
    print(f"\nИтого собрано: {len(result['text'])} символов")

asyncio.run(demo_cancel())
⚠️
Всегда пробрасывай CancelledError!
Если поймал CancelledError — выполни cleanup и сделай raise. Проглоченный CancelledError нарушает работу event loop и других задач.

Буферизация и сборка полного текста

Чанки не гарантируют разбивку по словам или предложениям. Для некоторых задач нужно дождаться завершения паттерна.

Python — умная буферизация стрима
import anthropic
import re
from collections.abc import AsyncIterator

client = anthropic.AsyncAnthropic()

async def raw_chunks(prompt: str) -> AsyncIterator[str]:
    """Низкоуровневый генератор чанков."""
    async with client.messages.stream(
        model="claude-opus-4-6",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=1024,
    ) as stream:
        async for chunk in stream.text_stream:
            yield chunk


async def sentence_stream(prompt: str) -> AsyncIterator[str]:
    """
    Буферизуем чанки и отдаём целые предложения.
    Полезно для TTS (text-to-speech) — синтезатор работает лучше с предложениями.
    """
    buffer = ""
    sentence_end = re.compile(r'(?<=[.!?])\s')

    async for chunk in raw_chunks(prompt):
        buffer += chunk
        # Ищем конец предложения
        parts = sentence_end.split(buffer, maxsplit=1)
        while len(parts) == 2:
            yield parts[0].strip() + " "
            buffer = parts[1]
            parts = sentence_end.split(buffer, maxsplit=1)

    # Отдаём остаток
    if buffer.strip():
        yield buffer.strip()


async def line_stream(prompt: str) -> AsyncIterator[str]:
    """Буферизуем по строкам (для markdown rendering)."""
    buffer = ""
    async for chunk in raw_chunks(prompt):
        buffer += chunk
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            yield line + "\n"
    if buffer:
        yield buffer


# ── Демо: streaming + накопление для финального парсинга ──
async def stream_and_collect(prompt: str) -> str:
    """
    Показываем пользователю стрим, но в конце получаем полный текст.
    """
    async with client.messages.stream(
        model="claude-opus-4-6",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=1024,
    ) as stream:
        async for chunk in stream.text_stream:
            print(chunk, end="", flush=True)

        # get_final_text() ждёт завершения и возвращает полный текст
        full_text = await stream.get_final_text()

    print()
    return full_text

text = await stream_and_collect("Напиши JSON с тремя рецептами")
import json
# Теперь можем безопасно парсить
data = json.loads(text)

Streaming в агентном цикле

В реальном агенте streaming используется не только для UX, но и для промежуточного вывода reasoning-шагов. Вот паттерн для агентного цикла с streaming:

Python — агентный цикл со streaming и инструментами
import anthropic
import json
import asyncio
from typing import Callable, Awaitable

client = anthropic.AsyncAnthropic()

# Имитация инструментов агента
async def tool_search(query: str, limit: int = 5) -> str:
    await asyncio.sleep(0.1)  # имитация задержки
    return f"Результаты поиска по '{query}': [doc1, doc2, doc3]"

async def tool_calculator(expression: str) -> str:
    try:
        result = eval(expression, {"__builtins__": {}})
        return str(result)
    except Exception as e:
        return f"Ошибка: {e}"

TOOLS_MAP = {
    "search": tool_search,
    "calculator": tool_calculator,
}

TOOLS_SCHEMA = [
    {
        "name": "search",
        "description": "Поиск информации",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "limit": {"type": "integer", "default": 5},
            },
            "required": ["query"],
        }
    },
    {
        "name": "calculator",
        "description": "Вычисление математических выражений",
        "input_schema": {
            "type": "object",
            "properties": {"expression": {"type": "string"}},
            "required": ["expression"],
        }
    },
]

async def run_streaming_agent(
    user_message: str,
    on_text: Callable[[str], Awaitable[None]] | None = None,
    on_tool_start: Callable[[str, dict], Awaitable[None]] | None = None,
    on_tool_end: Callable[[str, str], Awaitable[None]] | None = None,
    max_iterations: int = 5,
) -> str:
    """
    Агентный цикл с streaming.
    Коллбэки позволяют интегрировать в UI (FastAPI SSE, WebSocket).
    """
    messages = [{"role": "user", "content": user_message}]
    full_response = ""

    for iteration in range(max_iterations):
        tool_calls_in_turn: list[dict] = []
        turn_text = ""

        async with client.messages.stream(
            model="claude-opus-4-6",
            system="Используй инструменты для ответа. Думай вслух.",
            tools=TOOLS_SCHEMA,
            messages=messages,
            max_tokens=1024,
        ) as stream:

            async for event in stream:
                if event.type == "content_block_delta":
                    delta = event.delta
                    if delta.type == "text_delta":
                        turn_text += delta.text
                        full_response += delta.text
                        if on_text:
                            await on_text(delta.text)

                    elif delta.type == "input_json_delta":
                        # Дельты tool call JSON
                        if tool_calls_in_turn:
                            tool_calls_in_turn[-1]["input_raw"] += delta.partial_json

                elif event.type == "content_block_start":
                    if event.content_block.type == "tool_use":
                        tool_calls_in_turn.append({
                            "id": event.content_block.id,
                            "name": event.content_block.name,
                            "input_raw": "",
                        })

                elif event.type == "content_block_stop":
                    if tool_calls_in_turn and tool_calls_in_turn[-1].get("input_raw") is not None:
                        last = tool_calls_in_turn[-1]
                        if "input" not in last:
                            try:
                                last["input"] = json.loads(last.pop("input_raw", "{}"))
                            except json.JSONDecodeError:
                                last["input"] = {}

            final_message = await stream.get_final_message()

        # Добавляем ответ ассистента в историю
        messages.append({"role": "assistant", "content": final_message.content})

        # Если нет tool calls — агент завершил работу
        if final_message.stop_reason == "end_turn" or not tool_calls_in_turn:
            break

        # Выполняем tool calls параллельно
        tool_results = []
        for tc in tool_calls_in_turn:
            tool_fn = TOOLS_MAP.get(tc["name"])
            if on_tool_start:
                await on_tool_start(tc["name"], tc["input"])

            if tool_fn:
                result = await tool_fn(**tc["input"])
            else:
                result = f"Инструмент '{tc['name']}' не найден"

            if on_tool_end:
                await on_tool_end(tc["name"], result)

            tool_results.append({
                "type": "tool_result",
                "tool_use_id": tc["id"],
                "content": result,
            })

        # Добавляем результаты в историю
        messages.append({"role": "user", "content": tool_results})

    return full_response


# ── Запуск с выводом в консоль ──
async def demo():
    async def on_text(chunk: str):
        print(chunk, end="", flush=True)

    async def on_tool_start(name: str, inp: dict):
        print(f"\n[→ Tool: {name}({inp})]", flush=True)

    async def on_tool_end(name: str, result: str):
        print(f"\n[← {name}: {result[:80]}...]", flush=True)

    result = await run_streaming_agent(
        "Найди информацию о LangGraph и посчитай 2^10",
        on_text=on_text,
        on_tool_start=on_tool_start,
        on_tool_end=on_tool_end,
    )
    print(f"\n\nИтоговый ответ ({len(result)} символов)")

asyncio.run(demo())

Обработка ошибок в streaming

Python — retry и обработка ошибок стрима
import anthropic
import asyncio
import logging
from anthropic import (
    APIConnectionError,
    APIStatusError,
    APITimeoutError,
    RateLimitError,
    InternalServerError,
)

logger = logging.getLogger(__name__)
client = anthropic.AsyncAnthropic()

async def resilient_stream(
    prompt: str,
    max_retries: int = 3,
    base_delay: float = 1.0,
) -> str:
    """Streaming с retry для transient errors."""
    last_error = None

    for attempt in range(max_retries):
        text = ""
        try:
            async with client.messages.stream(
                model="claude-opus-4-6",
                messages=[{"role": "user", "content": prompt}],
                max_tokens=1024,
            ) as stream:
                async for chunk in stream.text_stream:
                    text += chunk
                    print(chunk, end="", flush=True)

            print()
            return text  # успешно

        except RateLimitError as e:
            # 429 — нужно подождать дольше
            delay = base_delay * (2 ** attempt) + 5.0
            logger.warning(f"Rate limited (attempt {attempt+1}). Waiting {delay}s")
            last_error = e
            await asyncio.sleep(delay)

        except (APIConnectionError, APITimeoutError) as e:
            # Сетевые ошибки — retry быстрее
            delay = base_delay * (2 ** attempt)
            logger.warning(f"Connection error (attempt {attempt+1}): {e}. Waiting {delay}s")
            last_error = e
            await asyncio.sleep(delay)

        except InternalServerError as e:
            # 500/529 — сервер перегружен
            delay = base_delay * (2 ** attempt) + 2.0
            logger.warning(f"Server error {e.status_code} (attempt {attempt+1}). Waiting {delay}s")
            last_error = e
            await asyncio.sleep(delay)

        except APIStatusError as e:
            # 4xx (кроме 429) — не ретраим
            if e.status_code in (400, 401, 403):
                logger.error(f"Non-retryable error {e.status_code}: {e.message}")
                raise

            # 5xx — ретраим
            delay = base_delay * (2 ** attempt)
            last_error = e
            await asyncio.sleep(delay)

    raise RuntimeError(f"Failed after {max_retries} attempts") from last_error
Тип ошибки Код Стратегия
RateLimitError 429 Exponential backoff + jitter (мин. 5с)
APIConnectionError Retry с коротким backoff
APITimeoutError Retry, увеличить timeout
InternalServerError 500/529 Retry с backoff
APIStatusError 400/401/403 400–403 Не ретраить — исправить запрос

Проверь себя

Вопросы для самопроверки

  1. Какой протокол лежит под капотом LLM streaming?
  2. Что такое TTFB и почему streaming его улучшает?
  3. Чем stream.text_stream отличается от прямой итерации по событиям?
  4. Как отменить streaming-запрос в asyncio?
  5. Почему нельзя парсить JSON из tool call прямо в дельтах?
  6. Какой HTTP-заголовок нужен, чтобы nginx не буферизовал SSE?
Показать ответы
  1. Server-Sent Events (SSE) — HTTP-соединение остаётся открытым, сервер отправляет строки data: ... разделённые пустой строкой.
  2. TTFB (Time To First Byte) — время до получения первого байта ответа. Без streaming = время полной генерации. Со streaming = ~300–600мс (первый токен).
  3. text_stream — отфильтрованный итератор только текстовых дельт. Прямая итерация — все события (message_start, content_block_*, message_stop и т.д.).
  4. Вызвать task.cancel() на asyncio Task. В обработчике поймать CancelledError, сделать cleanup и пробросить raise.
  5. JSON параметров приходит по частям (partial_json). Парсить нужно только после content_block_stop, когда накоплен полный JSON.
  6. X-Accel-Buffering: no в ответе. Или в nginx config: proxy_buffering off;

Итог урока

  • Streaming = SSE — HTTP-соединение открыто, сервер шлёт дельты. SDK скрывает протокол за async iterator
  • TTFB со streaming ~300–600мс вместо 10–30с при blocking
  • Anthropic: client.messages.stream()stream.text_stream. OpenAI: create(..., stream=True)chunk.choices[0].delta.content
  • Tool call JSON в стриме — накапливать из input_json_delta, парсить после content_block_stop
  • FastAPI: StreamingResponse + media_type="text/event-stream" + X-Accel-Buffering: no
  • Отмена: asyncio.timeout() или task.cancel(). Всегда пробрасывай CancelledError
  • Ретраить: RateLimitError (backoff ≥5с), ConnectionError, TimeoutError, 5xx. Не ретраить: 400/401/403