Решения

Стриминг токенов для LLM: SSE/WebSocket и тайм‑ауты

Задача страницы. Показать, как организовать потоковую отдачу токенов (SSE/WebSocket) для LLM‑сервисов: выбрать протокол, задать лимиты/тайм‑ауты, спроектировать формат чанков, учесть прокси/ingress, метрики SLA и стоимость.

TL;DR

  • Для веб‑клиентов стартуйте с SSE: просто, поддерживается прокси, легко дебажить curl.
  • Для двусторонней интерактивности и «живых» UI — WebSocket (пинги/поны, явная отмена).
  • Ключевые метрики: TTFT (время до первого токена), TBT (средняя задержка между токенами), TTLT (время до последнего токена), TPS_decode.
  • Настройте тайм‑ауты: prefill, межтокенный idle, общий лимит запроса. Обязательно — стриминг + лимиты длины.

Когда и зачем включать стриминг

  • Чаты/ассистенты: пользователь видит ответ сразу (низкое perceived latency).
  • Длинные генерации: отдаём частями, не держим большой буфер в памяти.
  • Ограниченный SLA: отдельные лимиты на TTFT/p95 и общий тайм‑аут.
  • А/В и canary: безопаснее сравнивать UX с потоковой отдачей.

Связанные разделы: /solutions/llm-inference/, /solutions/llm-inference/vllm/, /solutions/llm-inference/tgi/, /solutions/llm-inference/tensorrt-llm/, /solutions/llm-inference/sglang/, /solutions/llm-inference/llama-cpp/.

Протоколы: SSE vs WebSocket (и что выбрать)

SSE (Server‑Sent Events)

  • Однонаправленный поток «сервер → клиент», поверх HTTP/1.1 или HTTP/2.
  • Простой парсинг (data: …\n\n), автоматический ре‑коннект EventSource.
  • Хорошо проходит через корпоративные прокси и балансировщики.
  • Идеален для «выдачи токенов» без двустороннего взаимодействия.

WebSocket

  • Двунаправочное общение, «пинги/поны», явная отмена и «паузa».
  • Требует корректной поддержки Upgrade по всей цепочке прокси/ingress.
  • Подходит для насыщенных UI, коллабораций, инструментов (tool‑use).

Рекомендация: начинать с SSE; если требуется двусторонний контроль/интерактив — добавьте WebSocket как второй эндпоинт.

Базовый сервер SSE (FastAPI, поток токенов)

				
					# server_sse.py
import asyncio, json
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

async def stream_tokens(prompt: str, max_tokens: int = 256):
    # Имитация генерации токенов из модели
    # В проде здесь вызывается ваш генератор (vLLM/TGI/SGLang/…)
    try:
        yield "event: start\ndata: {}\n\n"
        for i in range(max_tokens):
            tok = f"tok{i}"
            payload = {"choices": [{"delta": {"content": tok}}]}
            yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
            await asyncio.sleep(0.01)  # эмуляция задержки decode
        yield "event: done\ndata: [DONE]\n\n"
    except asyncio.CancelledError:
        # клиент закрыл соединение — аккуратно завершаем генерацию
        yield "event: aborted\ndata: {}\n\n"
        raise

@app.post("/v1/chat/completions_stream")
async def completions_stream(req: Request):
    body = await req.json()
    messages = body.get("messages", [])
    prompt = messages[-1]["content"] if messages else ""
    max_tokens = int(body.get("max_tokens", 256))
    headers = {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no"  # для некоторых прокси
    }
    return StreamingResponse(
        stream_tokens(prompt, max_tokens),
        media_type="text/event-stream",
        headers=headers
    )

				
			

Формат чанков

  • Стандартная строка: data: {json}\n\n.
  • Рекомендуется событиями помечать фазы: start, token (по умолчанию), done, aborted.
  • Для совместимости с OpenAI‑совместимыми SDK добавляйте choices[0].delta.content.

Базовый сервер WebSocket (FastAPI)

				
					# server_ws.py
import asyncio, json
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

async def tokens(prompt, max_tokens=256):
    for i in range(max_tokens):
        yield {"delta": {"content": f"tok{i}"}}
        await asyncio.sleep(0.01)

@app.websocket("/v1/ws")
async def ws_endpoint(ws: WebSocket):
    await ws.accept()
    try:
        cfg = await ws.receive_json()     # {"prompt": "...", "max_tokens": 256}
        async for chunk in tokens(cfg.get("prompt",""), cfg.get("max_tokens",256)):
            await ws.send_json(chunk)
        await ws.send_json({"done": True})
    except WebSocketDisconnect:
        # клиент оборвал — прерываем генерацию
        return

				
			

Практика

  • Используйте ping/pong (или «heartbeat»‑сообщения) каждые N секунд.
  • Для отмены — договоритесь о сообщении: {«action»:»cancel»,»id»:»<req_id>»}.

Клиенты стриминга

SSE (браузер, EventSource)

				
					<script>
const es = new EventSource("/v1/chat/completions_stream"); // если GET
// для POST-сценариев используйте fetch с body и затем поток чтения (см. ниже)
es.addEventListener("start", e => console.log("started"));
es.onmessage = (e) => {
  const msg = JSON.parse(e.data);
  const delta = msg?.choices?.[0]?.delta?.content || "";
  // добавить токен в UI
};
es.addEventListener("done", () => es.close());
</script>

				
			

SSE через fetch (POST + ReadableStream)

				
					const resp = await fetch("/v1/chat/completions_stream", {
  method: "POST",
  headers: {"Content-Type":"application/json"},
  body: JSON.stringify({messages:[{role:"user",content:"Привет"}], max_tokens:200})
});
const reader = resp.body.getReader();
const decoder = new TextDecoder();
let buf = "";
for (;;) {
  const {value, done} = await reader.read();
  if (done) break;
  buf += decoder.decode(value, {stream:true});
  let parts = buf.split("\n\n");
  buf = parts.pop();
  for (const p of parts) {
    if (p.startsWith("data: ")) {
      const payload = JSON.parse(p.slice(6));
      const delta = payload?.choices?.[0]?.delta?.content || "";
      // обновляем UI
    }
  }
}

				
			

WebSocket (браузер)

				
					const ws = new WebSocket("wss://host/v1/ws");
ws.onopen = () => ws.send(JSON.stringify({prompt:"Старт", max_tokens:200}));
ws.onmessage = (e) => {
  const msg = JSON.parse(e.data);
  if (msg.done) ws.close();
  else {
    const delta = msg?.delta?.content || "";
    // показать токен
  }
};

				
			

Тайм‑ауты, heartbeat, отмена

Тайм‑ауты уровня запроса

  • Prefill timeout: максимальное время до первого токена (TTFT).
  • Idle timeout: максимальная пауза между токенами (если модель «замолчала»).
  • Overall timeout: общий предел на длительность стрима.

Heartbeat

  • Отправляйте пустые «пинги» каждые 10–30 с, чтобы соединение не считалось «мертвым» на прокси.
    • SSE: : ping\n\n или event: ping\ndata: {}\n\n
    • WebSocket: ping‑фрейм или {type:»ping»}

Отмена

  • SSE: закрытие соединения клиентом → серверу прилетит CancelledError — прервите генерацию и освободите ресурсы.
  • WebSocket: отдельное сообщение {«action»:»cancel»,»id»:»…»} → сервер завершает стрим и отправляет {done:true}.

Формат чанков и совместимость

Рекомендации по JSON‑чанку

				
					{
  "id": "req_123",
  "model": "llama3-8b",
  "choices": [
    { "index": 0, "delta": { "content": "токен" }, "finish_reason": null }
  ]
}

				
			
  • Отправляйте один токен или пачку токенов каждые 20–50 мс — подбор зависит от TPS и UX.
  • Финальный блок: finish_reason: «stop» | «length» | «cancelled» и/или [DONE].
  • Для RAG/инструментов добавляйте вспомогательные поля (например, tool_calls).

Прокси/ingress: что настроить

Общие настройки (SSE)

				
					# пример для reverse proxy
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_buffering off;         # важно для стрима
proxy_read_timeout 3600s;    # общий тайм-аут
proxy_send_timeout 3600s;

				
			

WebSocket

				
					proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_read_timeout 3600s;

				
			

Полезно

  • Включите X-Accel-Buffering: no на ответе SSE.
  • Проверьте лимиты keep‑alive, максимума одновременных соединений и буферизацию ответов.
  • Для публичных API — CORS (разрешённые origin), rate‑limit и квоты.

SLA и метрики стрима

Метрики

  • TTFT (time‑to‑first‑token) p50/p95.
  • TBT (межтокенная задержка, мс/токен).
  • TTLT (время до последнего токена).
  • TPS_decode, QPS, доля timeouts/errors, отмены.
  • KV‑кэш: занятость/эвикции, одновременность потоков.

Формулы

T_total ≈ TTFT + (L_out / (TPS_decode / B)) + overhead

QPS     ≈ B / T_total

Мониторинг и трейсинг: /solutions/llm-inference/observability/, логи/метрики — /solutions/monitoring-logging/.

Стоимость стриминга (как считать)

Обозначим: TPS_decode — токены/с (на батч), Num_GPU, GPU_hour_price.

Tokens_per_hour ≈ TPS_decode × 3600

Cost_per_1M     ≈ (GPU_hour_price × Num_GPU) / (Tokens_per_hour / 1e6)Снижайте цену за 1М: квантизация (/solutions/llm-inference/quantization/), ограничение max_tokens, continuous batching (в стеке сервинга), prefix‑кэш (если доступен).

Траблшутинг

  • Долгий TTFT. Сократите префикс/системные подсказки, используйте prefix‑кэш (см. vLLM), выделите пул для длинных запросов.
  • Обрывы через прокси. Отключите буферизацию, увеличьте read/send‑timeout, добавьте heartbeat.
  • «Рваный» поток в UI. Увеличьте пачку токенов/частоту чанков, включите сглаживание в клиенте.
  • GPU‑OOM на пике. Жёстче лимиты max context/max_tokens, разнесите short/long по пулам, квантизация KV/весов.
  • Высокая p95. Приоритизируйте short‑очередь, увеличьте реплики short‑пула, ограничьте длинные ответы.

Как запускать это в cloudcompute.ru

Чек‑лист перед продом

  • Выбран протокол: SSE (по умолчанию) или WebSocket (интерактив).
  • Заданы лимиты: max_tokens, max context, одновременность.
  • Настроены тайм‑ауты: prefill/idle/overall, heartbeat.
  • Прокси/ingress: без буферизации, увеличенные timeouts, upgrade для WS.
  • Метрики/алерты: TTFT/TBT/TTLT, TPS/QPS, KV‑кэш, ошибки/отмены.
  • Фоллбек‑план: деградация лимитов, перевод в другой пул/модель, повтор запроса.