Streaming responses
LLM генерирует текст токен за токеном. Streaming позволяет показывать ответ пользователю в режиме реального времени — без ожидания полной генерации. Для агентов это критично: многошаговые рассуждения могут занимать десятки секунд.
Зачем нужен streaming
Без streaming пользователь ждёт полного ответа перед тем, как увидеть хоть что-то. При длинных ответах (анализ, code generation, reasoning) это может быть 10–30 секунд «белого экрана».
Streaming также критичен для агентов с reasoning: вы можете обрабатывать токены по мере прихода, реагировать на ключевые слова и даже прервать генерацию досрочно.
HTTP-соединение остаётся открытым. Сервер отправляет строки вида
data: {"delta": {"text": "..."}}↵↵.
SDK скрывает этот протокол за async iterator.
SSE: как выглядит протокол
Понимать SSE полезно для отладки и для написания собственных клиентов. Каждое событие — это несколько строк, разделённых пустой строкой:
event: content_block_delta
data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Чтобы"}}
# следующий фрагмент
event: content_block_delta
data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": " подключить"}}
# конец стрима
event: message_stop
data: {"type": "message_stop"}
Типы событий Anthropic:
Базовый streaming: Anthropic и OpenAI
import anthropic
client = anthropic.AsyncAnthropic()
async def stream_response(prompt: str) -> str:
"""Стримим ответ и выводим токены по мере прихода."""
full_text = ""
# stream() — контекстный менеджер, возвращает AsyncMessageStream
async with client.messages.stream(
model="claude-opus-4-6",
system="Ты — полезный ассистент.",
messages=[{"role": "user", "content": prompt}],
max_tokens=1024,
) as stream:
# text_stream — async iterator только по текстовым дельтам
async for text_chunk in stream.text_stream:
print(text_chunk, end="", flush=True)
full_text += text_chunk
print() # перевод строки после стрима
return full_text
# Запуск
import asyncio
text = asyncio.run(stream_response("Объясни, как работает asyncio event loop"))
import anthropic
from anthropic import AsyncMessageStreamManager
client = anthropic.AsyncAnthropic()
async def stream_with_events(prompt: str) -> dict:
"""Обрабатываем все типы событий вручную."""
result = {"text": "", "input_tokens": 0, "output_tokens": 0}
async with client.messages.stream(
model="claude-opus-4-6",
messages=[{"role": "user", "content": prompt}],
max_tokens=512,
) as stream:
async for event in stream:
event_type = event.type
if event_type == "message_start":
result["input_tokens"] = event.message.usage.input_tokens
elif event_type == "content_block_delta":
if hasattr(event.delta, "text"):
delta = event.delta.text
result["text"] += delta
print(delta, end="", flush=True)
elif event_type == "message_delta":
result["output_tokens"] = event.usage.output_tokens
elif event_type == "message_stop":
print() # перевод строки
# Итоговое сообщение (полностью собранное) — доступно после стрима
final_message = await stream.get_final_message()
result["stop_reason"] = final_message.stop_reason
return result
stats = await stream_with_events("Что такое backpressure в async?")
print(f"\nTokens: {stats['input_tokens']} in / {stats['output_tokens']} out")
import openai
client = openai.AsyncOpenAI()
async def openai_stream(prompt: str) -> str:
"""OpenAI streaming через async iterator."""
full_text = ""
# stream=True включает SSE
async with await client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "Ты — полезный ассистент."},
{"role": "user", "content": prompt},
],
max_tokens=1024,
stream=True,
) as stream:
async for chunk in stream:
# delta — фрагмент ответа, может быть None
delta = chunk.choices[0].delta.content
if delta is not None:
print(delta, end="", flush=True)
full_text += delta
# OpenAI: финальная статистика через stream.get_final_completion()
# (доступна в openai SDK >= 1.2)
print()
return full_text
# Универсальная обёртка для обоих провайдеров
async def stream_any(
provider: str,
system: str,
user_msg: str,
model: str,
on_token=None, # callback(str) — вызывается на каждый чанк
) -> str:
"""Стримит ответ от любого провайдера, вызывая on_token для каждого чанка."""
text = ""
if provider == "anthropic":
client = anthropic.AsyncAnthropic()
async with client.messages.stream(
model=model, system=system,
messages=[{"role": "user", "content": user_msg}],
max_tokens=1024,
) as stream:
async for chunk in stream.text_stream:
text += chunk
if on_token:
await on_token(chunk)
elif provider == "openai":
client = openai.AsyncOpenAI()
async with await client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": user_msg},
],
max_tokens=1024, stream=True,
) as stream:
async for chunk in stream:
delta = chunk.choices[0].delta.content
if delta:
text += delta
if on_token:
await on_token(delta)
return text
# Пример с callback
async def print_chunk(chunk: str):
print(chunk, end="", flush=True)
result = await stream_any(
provider="anthropic",
system="Ты — Python-наставник.",
user_msg="Объясни генераторы.",
model="claude-opus-4-6",
on_token=print_chunk,
)
Streaming в FastAPI: отдаём ответ клиенту
Агентный бэкенд обычно проксирует стрим от LLM к фронтенду.
FastAPI поддерживает StreamingResponse для SSE.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import anthropic
import json
import asyncio
app = FastAPI()
llm_client = anthropic.AsyncAnthropic()
class ChatRequest(BaseModel):
message: str
system: str = "Ты — полезный ассистент."
async def token_generator(request: ChatRequest):
"""Async generator: отдаёт SSE-события в формате text/event-stream."""
try:
async with llm_client.messages.stream(
model="claude-opus-4-6",
system=request.system,
messages=[{"role": "user", "content": request.message}],
max_tokens=2048,
) as stream:
async for text_chunk in stream.text_stream:
# SSE формат: data: \n\n
payload = json.dumps({"type": "text", "delta": text_chunk})
yield f"data: {payload}\n\n"
# Сигнал об окончании
yield f"data: {json.dumps({'type': 'done'})}\n\n"
except Exception as e:
error_payload = json.dumps({"type": "error", "message": str(e)})
yield f"data: {error_payload}\n\n"
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
return StreamingResponse(
token_generator(request),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # отключаем nginx-буферизацию
},
)
# ── Дополнительно: эндпоинт с WebSocket ──
from fastapi import WebSocket
@app.websocket("/chat/ws")
async def chat_websocket(websocket: WebSocket):
await websocket.accept()
while True:
message = await websocket.receive_text()
data = json.loads(message)
async with llm_client.messages.stream(
model="claude-opus-4-6",
messages=[{"role": "user", "content": data["message"]}],
max_tokens=1024,
) as stream:
async for chunk in stream.text_stream:
await websocket.send_json({"type": "chunk", "text": chunk})
await websocket.send_json({"type": "done"})
По умолчанию nginx буферизует ответы и клиент увидит их пачкой, а не по одному. Добавь заголовок
X-Accel-Buffering: no или в nginx config:
proxy_buffering off;
Клиент на JavaScript (EventSource / fetch)
// Способ 1: fetch + ReadableStream (для POST-запросов с body)
async function streamChat(message) {
const response = await fetch('/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop(); // последняя неполная строка — обратно в буфер
for (const line of lines) {
if (!line.startsWith('data: ')) continue;
const data = JSON.parse(line.slice(6));
if (data.type === 'text') {
document.getElementById('output').textContent += data.delta;
} else if (data.type === 'done') {
console.log('Stream complete');
} else if (data.type === 'error') {
console.error('LLM error:', data.message);
}
}
}
}
streamChat('Объясни Docker multi-stage builds');
Streaming + tool calls
Модель может использовать инструменты во время генерации. В streaming-режиме tool call приходит по частям — JSON параметров накапливается в дельтах.
import anthropic
import json
from dataclasses import dataclass, field
@dataclass
class StreamState:
"""Состояние текущего стрима."""
text: str = ""
tool_calls: list[dict] = field(default_factory=list)
current_tool: dict | None = None
input_tokens: int = 0
output_tokens: int = 0
client = anthropic.AsyncAnthropic()
TOOLS = [{
"name": "search_docs",
"description": "Поиск по документации",
"input_schema": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Поисковый запрос"},
"limit": {"type": "integer", "default": 5},
},
"required": ["query"],
}
}]
async def stream_with_tools(user_message: str) -> StreamState:
state = StreamState()
async with client.messages.stream(
model="claude-opus-4-6",
tools=TOOLS,
messages=[{"role": "user", "content": user_message}],
max_tokens=1024,
) as stream:
async for event in stream:
match event.type:
case "message_start":
state.input_tokens = event.message.usage.input_tokens
case "content_block_start":
block = event.content_block
if block.type == "tool_use":
# Начало нового tool call
state.current_tool = {
"id": block.id,
"name": block.name,
"input_raw": "", # JSON будет накапливаться в дельтах
}
print(f"\n[Tool: {block.name}] ", end="", flush=True)
case "content_block_delta":
delta = event.delta
if delta.type == "text_delta":
state.text += delta.text
print(delta.text, end="", flush=True)
elif delta.type == "input_json_delta":
# Дельты JSON параметров tool call
if state.current_tool:
state.current_tool["input_raw"] += delta.partial_json
print(".", end="", flush=True)
case "content_block_stop":
if state.current_tool:
# Парсим накопленный JSON
try:
state.current_tool["input"] = json.loads(
state.current_tool.pop("input_raw")
)
except json.JSONDecodeError:
state.current_tool["input"] = {}
state.tool_calls.append(state.current_tool)
state.current_tool = None
case "message_delta":
state.output_tokens = event.usage.output_tokens
print()
return state
state = await stream_with_tools("Найди документацию по asyncio.gather")
print(f"\nText: {state.text[:100]}...")
print(f"Tool calls: {state.tool_calls}")
print(f"Tokens: {state.input_tokens} / {state.output_tokens}")
Прерывание стрима и timeout
В агентах часто нужно отменить генерацию — пользователь нажал «Стоп», истёк timeout, или пришёл важный сигнал. Asyncio позволяет прервать стрим чисто.
import asyncio
import anthropic
client = anthropic.AsyncAnthropic()
async def stream_with_timeout(prompt: str, timeout_sec: float = 10.0) -> str:
"""Стримим с общим timeout на весь запрос."""
text = ""
try:
async with asyncio.timeout(timeout_sec):
async with client.messages.stream(
model="claude-opus-4-6",
messages=[{"role": "user", "content": prompt}],
max_tokens=2048,
) as stream:
async for chunk in stream.text_stream:
text += chunk
print(chunk, end="", flush=True)
except asyncio.TimeoutError:
print(f"\n[Timeout после {timeout_sec}с, собрано {len(text)} символов]")
return text
async def stream_cancellable(prompt: str) -> tuple[str, asyncio.Task]:
"""
Возвращает Task — можно отменить снаружи.
"""
result_holder = {"text": ""}
async def _stream():
async with client.messages.stream(
model="claude-opus-4-6",
messages=[{"role": "user", "content": prompt}],
max_tokens=2048,
) as stream:
try:
async for chunk in stream.text_stream:
result_holder["text"] += chunk
print(chunk, end="", flush=True)
except asyncio.CancelledError:
print(f"\n[Cancelled. Получено {len(result_holder['text'])} символов]")
raise # важно! пробрасываем дальше
task = asyncio.create_task(_stream())
return result_holder, task
# Пример: отмена через 2 секунды
async def demo_cancel():
result, task = await stream_cancellable(
"Напиши подробное эссе о истории Python, начиная с 1991 года"
)
await asyncio.sleep(2.0) # ждём 2 секунды
task.cancel() # отменяем
try:
await task
except asyncio.CancelledError:
pass
print(f"\nИтого собрано: {len(result['text'])} символов")
asyncio.run(demo_cancel())
Если поймал
CancelledError — выполни cleanup и сделай raise.
Проглоченный CancelledError нарушает работу event loop и других задач.
Буферизация и сборка полного текста
Чанки не гарантируют разбивку по словам или предложениям. Для некоторых задач нужно дождаться завершения паттерна.
import anthropic
import re
from collections.abc import AsyncIterator
client = anthropic.AsyncAnthropic()
async def raw_chunks(prompt: str) -> AsyncIterator[str]:
"""Низкоуровневый генератор чанков."""
async with client.messages.stream(
model="claude-opus-4-6",
messages=[{"role": "user", "content": prompt}],
max_tokens=1024,
) as stream:
async for chunk in stream.text_stream:
yield chunk
async def sentence_stream(prompt: str) -> AsyncIterator[str]:
"""
Буферизуем чанки и отдаём целые предложения.
Полезно для TTS (text-to-speech) — синтезатор работает лучше с предложениями.
"""
buffer = ""
sentence_end = re.compile(r'(?<=[.!?])\s')
async for chunk in raw_chunks(prompt):
buffer += chunk
# Ищем конец предложения
parts = sentence_end.split(buffer, maxsplit=1)
while len(parts) == 2:
yield parts[0].strip() + " "
buffer = parts[1]
parts = sentence_end.split(buffer, maxsplit=1)
# Отдаём остаток
if buffer.strip():
yield buffer.strip()
async def line_stream(prompt: str) -> AsyncIterator[str]:
"""Буферизуем по строкам (для markdown rendering)."""
buffer = ""
async for chunk in raw_chunks(prompt):
buffer += chunk
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
yield line + "\n"
if buffer:
yield buffer
# ── Демо: streaming + накопление для финального парсинга ──
async def stream_and_collect(prompt: str) -> str:
"""
Показываем пользователю стрим, но в конце получаем полный текст.
"""
async with client.messages.stream(
model="claude-opus-4-6",
messages=[{"role": "user", "content": prompt}],
max_tokens=1024,
) as stream:
async for chunk in stream.text_stream:
print(chunk, end="", flush=True)
# get_final_text() ждёт завершения и возвращает полный текст
full_text = await stream.get_final_text()
print()
return full_text
text = await stream_and_collect("Напиши JSON с тремя рецептами")
import json
# Теперь можем безопасно парсить
data = json.loads(text)
Streaming в агентном цикле
В реальном агенте streaming используется не только для UX, но и для промежуточного вывода reasoning-шагов. Вот паттерн для агентного цикла с streaming:
import anthropic
import json
import asyncio
from typing import Callable, Awaitable
client = anthropic.AsyncAnthropic()
# Имитация инструментов агента
async def tool_search(query: str, limit: int = 5) -> str:
await asyncio.sleep(0.1) # имитация задержки
return f"Результаты поиска по '{query}': [doc1, doc2, doc3]"
async def tool_calculator(expression: str) -> str:
try:
result = eval(expression, {"__builtins__": {}})
return str(result)
except Exception as e:
return f"Ошибка: {e}"
TOOLS_MAP = {
"search": tool_search,
"calculator": tool_calculator,
}
TOOLS_SCHEMA = [
{
"name": "search",
"description": "Поиск информации",
"input_schema": {
"type": "object",
"properties": {
"query": {"type": "string"},
"limit": {"type": "integer", "default": 5},
},
"required": ["query"],
}
},
{
"name": "calculator",
"description": "Вычисление математических выражений",
"input_schema": {
"type": "object",
"properties": {"expression": {"type": "string"}},
"required": ["expression"],
}
},
]
async def run_streaming_agent(
user_message: str,
on_text: Callable[[str], Awaitable[None]] | None = None,
on_tool_start: Callable[[str, dict], Awaitable[None]] | None = None,
on_tool_end: Callable[[str, str], Awaitable[None]] | None = None,
max_iterations: int = 5,
) -> str:
"""
Агентный цикл с streaming.
Коллбэки позволяют интегрировать в UI (FastAPI SSE, WebSocket).
"""
messages = [{"role": "user", "content": user_message}]
full_response = ""
for iteration in range(max_iterations):
tool_calls_in_turn: list[dict] = []
turn_text = ""
async with client.messages.stream(
model="claude-opus-4-6",
system="Используй инструменты для ответа. Думай вслух.",
tools=TOOLS_SCHEMA,
messages=messages,
max_tokens=1024,
) as stream:
async for event in stream:
if event.type == "content_block_delta":
delta = event.delta
if delta.type == "text_delta":
turn_text += delta.text
full_response += delta.text
if on_text:
await on_text(delta.text)
elif delta.type == "input_json_delta":
# Дельты tool call JSON
if tool_calls_in_turn:
tool_calls_in_turn[-1]["input_raw"] += delta.partial_json
elif event.type == "content_block_start":
if event.content_block.type == "tool_use":
tool_calls_in_turn.append({
"id": event.content_block.id,
"name": event.content_block.name,
"input_raw": "",
})
elif event.type == "content_block_stop":
if tool_calls_in_turn and tool_calls_in_turn[-1].get("input_raw") is not None:
last = tool_calls_in_turn[-1]
if "input" not in last:
try:
last["input"] = json.loads(last.pop("input_raw", "{}"))
except json.JSONDecodeError:
last["input"] = {}
final_message = await stream.get_final_message()
# Добавляем ответ ассистента в историю
messages.append({"role": "assistant", "content": final_message.content})
# Если нет tool calls — агент завершил работу
if final_message.stop_reason == "end_turn" or not tool_calls_in_turn:
break
# Выполняем tool calls параллельно
tool_results = []
for tc in tool_calls_in_turn:
tool_fn = TOOLS_MAP.get(tc["name"])
if on_tool_start:
await on_tool_start(tc["name"], tc["input"])
if tool_fn:
result = await tool_fn(**tc["input"])
else:
result = f"Инструмент '{tc['name']}' не найден"
if on_tool_end:
await on_tool_end(tc["name"], result)
tool_results.append({
"type": "tool_result",
"tool_use_id": tc["id"],
"content": result,
})
# Добавляем результаты в историю
messages.append({"role": "user", "content": tool_results})
return full_response
# ── Запуск с выводом в консоль ──
async def demo():
async def on_text(chunk: str):
print(chunk, end="", flush=True)
async def on_tool_start(name: str, inp: dict):
print(f"\n[→ Tool: {name}({inp})]", flush=True)
async def on_tool_end(name: str, result: str):
print(f"\n[← {name}: {result[:80]}...]", flush=True)
result = await run_streaming_agent(
"Найди информацию о LangGraph и посчитай 2^10",
on_text=on_text,
on_tool_start=on_tool_start,
on_tool_end=on_tool_end,
)
print(f"\n\nИтоговый ответ ({len(result)} символов)")
asyncio.run(demo())
Обработка ошибок в streaming
import anthropic
import asyncio
import logging
from anthropic import (
APIConnectionError,
APIStatusError,
APITimeoutError,
RateLimitError,
InternalServerError,
)
logger = logging.getLogger(__name__)
client = anthropic.AsyncAnthropic()
async def resilient_stream(
prompt: str,
max_retries: int = 3,
base_delay: float = 1.0,
) -> str:
"""Streaming с retry для transient errors."""
last_error = None
for attempt in range(max_retries):
text = ""
try:
async with client.messages.stream(
model="claude-opus-4-6",
messages=[{"role": "user", "content": prompt}],
max_tokens=1024,
) as stream:
async for chunk in stream.text_stream:
text += chunk
print(chunk, end="", flush=True)
print()
return text # успешно
except RateLimitError as e:
# 429 — нужно подождать дольше
delay = base_delay * (2 ** attempt) + 5.0
logger.warning(f"Rate limited (attempt {attempt+1}). Waiting {delay}s")
last_error = e
await asyncio.sleep(delay)
except (APIConnectionError, APITimeoutError) as e:
# Сетевые ошибки — retry быстрее
delay = base_delay * (2 ** attempt)
logger.warning(f"Connection error (attempt {attempt+1}): {e}. Waiting {delay}s")
last_error = e
await asyncio.sleep(delay)
except InternalServerError as e:
# 500/529 — сервер перегружен
delay = base_delay * (2 ** attempt) + 2.0
logger.warning(f"Server error {e.status_code} (attempt {attempt+1}). Waiting {delay}s")
last_error = e
await asyncio.sleep(delay)
except APIStatusError as e:
# 4xx (кроме 429) — не ретраим
if e.status_code in (400, 401, 403):
logger.error(f"Non-retryable error {e.status_code}: {e.message}")
raise
# 5xx — ретраим
delay = base_delay * (2 ** attempt)
last_error = e
await asyncio.sleep(delay)
raise RuntimeError(f"Failed after {max_retries} attempts") from last_error
| Тип ошибки | Код | Стратегия |
|---|---|---|
RateLimitError |
429 | Exponential backoff + jitter (мин. 5с) |
APIConnectionError |
— | Retry с коротким backoff |
APITimeoutError |
— | Retry, увеличить timeout |
InternalServerError |
500/529 | Retry с backoff |
APIStatusError 400/401/403 |
400–403 | Не ретраить — исправить запрос |
Проверь себя
Вопросы для самопроверки
- Какой протокол лежит под капотом LLM streaming?
- Что такое TTFB и почему streaming его улучшает?
- Чем
stream.text_streamотличается от прямой итерации по событиям? - Как отменить streaming-запрос в asyncio?
- Почему нельзя парсить JSON из tool call прямо в дельтах?
- Какой HTTP-заголовок нужен, чтобы nginx не буферизовал SSE?
Показать ответы
- Server-Sent Events (SSE) — HTTP-соединение остаётся открытым, сервер отправляет строки
data: ...разделённые пустой строкой. - TTFB (Time To First Byte) — время до получения первого байта ответа. Без streaming = время полной генерации. Со streaming = ~300–600мс (первый токен).
text_stream— отфильтрованный итератор только текстовых дельт. Прямая итерация — все события (message_start, content_block_*, message_stop и т.д.).- Вызвать
task.cancel()на asyncio Task. В обработчике пойматьCancelledError, сделать cleanup и проброситьraise. - JSON параметров приходит по частям (partial_json). Парсить нужно только после
content_block_stop, когда накоплен полный JSON. X-Accel-Buffering: noв ответе. Или в nginx config:proxy_buffering off;
Итог урока
- Streaming = SSE — HTTP-соединение открыто, сервер шлёт дельты. SDK скрывает протокол за async iterator
- TTFB со streaming ~300–600мс вместо 10–30с при blocking
- Anthropic:
client.messages.stream()→stream.text_stream. OpenAI:create(..., stream=True)→chunk.choices[0].delta.content - Tool call JSON в стриме — накапливать из
input_json_delta, парсить послеcontent_block_stop - FastAPI:
StreamingResponse+media_type="text/event-stream"+X-Accel-Buffering: no - Отмена:
asyncio.timeout()илиtask.cancel(). Всегда пробрасывайCancelledError - Ретраить: RateLimitError (backoff ≥5с), ConnectionError, TimeoutError, 5xx. Не ретраить: 400/401/403