Детекция аномалий на GPU: real‑time и батч

Задача страницы. Инженерный гид по построению сервисов анализa аномалий на GPU: стриминговое обнаружение (low‑latency) и офлайн‑батчи (backfill/ретроанализ), методы (Isolation Forest / kNN‑score / Autoencoder / TCN‑прогноз + остатки / правила), выборка окон, калибровка порогов, наблюдаемость, экономика и эксплуатация. Основы: локальный NVMe‑кэш, режимы On‑Demand (онлайн‑детекция) и Interruptible (обучение/пересчёт), контейнеризация, GPU‑ETL на RAPIDS/Spark RAPIDS.

TL;DR

  • **Два режима:

  • On‑Demand — стриминг событий/метрик, окно 1–60 с, цель: E2E‑латентность p95 ≤ 1–3 с.

  • Interruptible — офлайн‑обучение/калибровка/ретроанализ, чанк ≤ 120 с, ретраи.

  • **Методы (ядро): Плотностные/близостные: Isolation Forest (IF), kNN‑score (ANN на GPU). Реконструкция: Autoencoder (AE), Variational AE. Прогноз + остатки: TCN/LSTM → residual = |y - ŷ| → порог. Правила/гибрид: пороги, CUSUM, правила антифрода, ансамбль с ML.

  • Пайплайн: Ingest → GPU‑фичи → Скольжение окна → Модель (GPU) → Агрегатор → Алерт/маршрутизация → Хранилище сигналов.

  • Наблюдаемость: TTFA_alert, latency_e2e, alerts_per_min, FPR/FNR/Precision/Recall, AUC‑PR, drift PSI/KL, gpu_util/HBM, nvme_io.

  • Экономика: GPU_count = ceil(QPS × t_infer / U); Cost_per_1M = (c_gpu × 1e6 × t_infer) / (3600 × U); для батча T_batch ≈ (RTF × L) / (GPU_count × U). См. https://cloudcompute.ru/solutions/cost-planner/ и https://cloudcompute.ru/solutions/throughput-vs-latency/

  • Стек: RAPIDS/cuDF/cuML и/или Spark RAPIDS для ETL, PyTorch/ONNX/Triton для AE/TCN, ANN‑индекс на GPU для kNN. См. https://cloudcompute.ru/solutions/rapids/, https://cloudcompute.ru/solutions/spark-rapids/, https://cloudcompute.ru/solutions/triton-inference-server/

Сценарии (когда это нужно)

  • Антифрод/финтех: транзакции, переводы, аномальные паттерны, velocity‑лимиты, «мулы».
  • E‑commerce/реклама: скликивание, всплески CTR, аномальные корзины/возвраты, прайс‑ошибки.
  • Операции/IoT: телеметрия датчиков, SLO/SLA‑мониторинг, деградации сервисов, outlier‑метрики.
  • Логи/события: редкие последовательности, неожиданные комбинации атрибутов, аномальный трафик.
  • Рекомендательные стеки: аномальные item/user‑паттерны, защита от «накруток». См. https://cloudcompute.ru/solutions/recsys/

Архитектуры и пайплайны 1) Реал‑тайм детекция (низкая латентность, SSE/WebSocket)

Ingress (Kafka/WS/HTTP)
 └─> Feature Extract (cuDF, rolling stats, one-hot/hash)
 └─> Sliding Window (W=10–60 s, step=1–5 s)
 └─> GPU Model:
 ├─ IF/kNN-score (ANN)
 ├─ AE (reconstruction error)
 └─ TCN forecast (residual)
 └─> Aggregator (score→alert, hysteresis)
 └─> Stream Out (SSE/WS JSON) + Storage (Parquet)

Особенности: TTFA_alert ≤ 0.5–1.0 с, drop‑политики очереди, ring‑buffer 200–500 мс, бэктест последних N окон.

2) Батчи (обучение/ретроанализ)

Object/NVMe ─┬─> GPU‑ETL (RAPIDS / Spark RAPIDS)
 ├─> Train (AE/TCN/IF) + Hyperparam Search
 ├─> Threshold Calibration (quantile at FPR budget)
 └─> Backfill Scoring → Materialized Alerts (Parquet)
 └─> Export: model.onnx + thresholds.json → Serving

Особенности: версионирование моделей/порогов/фичей; CTAS‑шаги; idempotent‑запуски. См. https://cloudcompute.ru/solutions/rapids/, https://cloudcompute.ru/solutions/spark-rapids/

3) Гибрид «правила + ML»

Rules (hard) ─┬─> Gate → Fast Reject/Accept
ML Scores ────┘ └─> Borderline → Human/Slow path

Выбор метода (чем руководствоваться)

**Данные/цель** **Рекомендуемый метод** **Примечания**
Мульти‑фичевые табличные события **Isolation Forest** / **kNN‑score** Быстрый старт; kNN требует ANN‑индекса на GPU.
Последовательности/метрики (time‑series) **TCN/LSTM прогноз + остатки** Устойчиво к трендам/сезонности; нужен «девственный» период.
Высокая нелинейность/мало меток **Autoencoder (AE)** Reconstruction error как score; калибровка по квантилю.
Жёсткие бизнес‑ограничения **Правила + ML** Комбинировать: hard‑rules → ML‑скоринг → ensemble.

GPU‑профили и ориентиры

Оценки по одновременным реал‑тайм входящим событиям (QPS) при U≈0.7. Для батчей масштабируйте по формулам в «Экономике».

**Профиль GPU** **Память** **Типичный стек** **Онлайн QPS\*** **Батч‑скоринг (evt/s)\*\*** **Комментарии**
24 ГБ (Compact) 24 ГБ RAPIDS + IF/kNN‑score 10–30 к 50–150 к Базовые фичи, окна ≤ 30 с, ANN средней ёмкости.
48 ГБ (Balanced) 48 ГБ AE/TCN + ANN на GPU 30–70 к 150–300 к Баланс качества/латентности; стабильный p95.
80 ГБ (HQ) 80 ГБ Сложные модели + богатые фичи 70–150 к 300–600 к Большие фиче‑вектора, крупные индексы/окна.

* Зависят от длины окна, сложности фичей/модели, I/O. ** Единичный сервис‑воркер; линейно масштабируется до пределов NVMe/сети. Тюнинг: https://cloudcompute.ru/solutions/performance-tuning/, https://cloudcompute.ru/solutions/throughput-vs-latency/ ## Конфиги и скелеты кода

Docker Compose (real‑time + batch)

version: "3.9"
x-env: &env
 MODELS_DIR: /models
 CACHE_DIR: /var/cache/anom
 PRECISION: "fp16" # bf16|fp16|fp32
 METHOD: "ae" # ae|tcn|if|knn
 WINDOW_SEC: "30"
 STEP_SEC: "5"
 ANN_TOPK: "50"
 THRESHOLD: "p99.5" # abs|zscore|pXX quantile
 DRIFT_MONITOR: "psi" # psi|kl|disabled
services:
 anom-rt:
 image: cloudcompute/anom-detector:latest
 environment:
 <<: *env
 SERVICE_MODE: "realtime"
 TTFA_TARGET_MS: "800"
 deploy:
 resources:
 reservations:
 devices: [{ capabilities: ["gpu"] }]
 ports: ["9020:9020"]
 volumes:
 - /nvme/models:/models
 - /nvme/anom-cache:/var/cache/anom
 command: ["python","serve_rt.py","--host=0.0.0.0","--port=9020"]
 anom-batch:
 image: cloudcompute/anom-detector:latest
 environment:
 <<: *env
 SERVICE_MODE: "batch"
 MAX_CHUNK_SECONDS: "120"
 deploy:
 resources:
 reservations:
 devices: [{ capabilities: ["gpu"] }]
 volumes:
 - /nvme/datasets:/data
 - /nvme/models:/models
 - /nvme/anom-cache:/var/cache/anom
 command: ["python","score_batch.py","--input=/data/in.parquet","--output=/data/out/"]

K8s (1 GPU/под)

apiVersion: apps/v1
kind: Deployment
metadata: { name: anom-rt }
spec:
 replicas: 3
 selector: { matchLabels: { app: anom-rt } }
 template:
 metadata: { labels: { app: anom-rt } }
 spec:
 containers:
 - name: rt
 image: cloudcompute/anom-detector:latest
 ports: [{ containerPort: 9020 }]
 env:
 - { name: METHOD, value: "ae" }
 - { name: WINDOW_SEC, value: "30" }
 - { name: STEP_SEC, value: "5" }
 - { name: PRECISION, value: "fp16" }
 volumeMounts:
 - { name: models, mountPath: /models }
 - { name: cache, mountPath: /var/cache/anom }
 resources:
 limits: { nvidia.com/gpu: 1, memory: "24Gi", cpu: "4" }
 volumes:
 - name: models ; hostPath: { path: /nvme/models }
 - name: cache ; hostPath: { path: /nvme/anom-cache }

Конфиг пайплайна (YAML)

pipeline:
 input:
 schema: ["ts","user_id","amount","geo","category","device"]
 time_col: "ts"
 features:
 numeric: ["amount"]
 categorical: ["geo","category","device"]
 hashing_trick: { dims: 512 }
 rolling:
 window_sec: 30
 stats: ["mean","std","min","max","p90","count"]
 model:
 method: "ae" # ae|tcn|if|knn
 ae:
 hidden: [256,128,32,128,256]
 dropout: 0.1
 tcn:
 channels: [64,64,64]
 kernel: 5
 horizon: 5 # шагов вперёд
 iforest:
 trees: 200
 max_samples: 256
 knn:
 index: "gpu_ivf"
 topk: 50
 threshold:
 mode: "quantile"
 q: 0.995
 output:
 emit: ["score","is_anomaly","explain_shap"] # explain опционально

FastAPI: стриминг алертов (SSE)

from fastapi import FastAPI, Body
from sse_starlette.sse import EventSourceResponse
import json, time
app = FastAPI()
async def score_stream(event_iter, model, featurizer, thr):
 buf = []
 last_emit = time.time()
 async for evt in event_iter:
 x = featurizer.update(evt) # обновление rolling-окон
 s = float(model.score(x)) # anomaly score
 y = int(s > thr.value())
 buf.append({"t": evt["ts"], "score": s, "anomaly": y})
 now = time.time()
 if now - last_emit > 0.1: # частичные результаты
 yield json.dumps(buf); buf.clear(); last_emit = now
 if buf: yield json.dumps(buf)
@app.post("/alerts/stream")
async def alerts_stream(payload: dict = Body(...)):
 model, featurizer, thr = load_model(), load_feat(), load_thr()
 async def gen():
 async for chunk in score_stream(events_iter(payload), model, featurizer, thr):
 yield {"event": "alerts", "data": chunk}
 yield {"event": "done", "data": "{}"}
 return EventSourceResponse(gen())

PyTorch: Autoencoder (обучение + скоринг)

import torch, torch.nn as nn, torch.nn.functional as F
from torch.cuda.amp import autocast, GradScaler
class AE(nn.Module):
 def __init__(self, dims=[256,128,32,128,256]):
 super().__init__()
 e = [nn.Linear(dims[i], dims[i+1]) for i in range(len(dims)//2)]
 d = [nn.Linear(dims[-i-1], dims[-i-2]) for i in range(len(dims)//2)]
 self.enc, self.dec = nn.ModuleList(e), nn.ModuleList(d)
 def forward(self, x):
 h = x
 for lin in self.enc: h = F.relu(lin(h))
 for lin in self.dec[:-1]: h = F.relu(lin(h))
 return self.dec[-1](h)
def train_ae(loader, dims, epochs=5, lr=1e-3, device="cuda"):
 model, opt, scaler = AE(dims).to(device), torch.optim.AdamW(AE(dims).parameters(), lr=lr), GradScaler()
 for _ in range(epochs):
 for xb in loader:
 xb = xb.to(device, non_blocking=True)
 opt.zero_grad(set_to_none=True)
 with autocast():
 recon = model(xb)
 loss = F.mse_loss(recon, xb)
 scaler.scale(loss).backward(); scaler.step(opt); scaler.update()
 return model
def score_ae(model, x):
 with torch.no_grad(), autocast():
 r = model(x)
 return ((r - x)**2).mean(dim=-1) # reconstruction error

Наблюдаемость/метрики/алерты

Latency/Perf:

  • anom_ttfa_seconds — до первого алерта (цель p95 ≤ 0.5–1.0 с).
  • anom_latency_e2e_seconds — от события до алерта (p50/p95).
  • anom_qps, anom_queue_depth, anom_drop_total.
  • gpu_utilization, gpu_memory_bytes, gpu_mem_peak_bytes, nvme_{read,write}_mb_s.

Quality:

  • anom_precision, anom_recall, anom_fpr, anom_fnr, anom_auc_pr.
  • Калибровка: anom_threshold@target_fpr (например, FPR ≤ 0.1%).
  • Дрифт: anom_psi/anom_kl по ключевым фичам.

Ops:

  • alerts_per_min, suppressed_per_min (гистерезис), explain_rate.
  • Версии: model_version, threshold_version, feature_schema_version.

Алерты (примеры):

  • anom_latency_e2e_p95 > SLA — уменьшить окно/шаг, упростить фичи/индекс, увеличить GPU.
  • anom_fpr > бюджет — поднять порог/перекалибровать на свежем корпусе, добавить правила.
  • anom_psi > порог — дрифт фичей; переобучение/обновление порога.
  • gpu_mem_peak/HBM > 0.9 — уменьшить размер окна/батча, перейти на FP16, разделить сервисы (ANN/модель).

Подробнее: https://cloudcompute.ru/solutions/monitoring-logging/https://cloudcompute.ru/solutions/llm-inference/observability/

Экономика и формулы

Обозначения: c_gpu — цена/час, U — целевая загрузка, t_infer — среднее время инференса на событие, QPS — входной поток.

  • **Онлайн (сколько GPU): GPU_count = ceil( (QPS × t_infer) / U ).
  • **Стоимость 1 млн событий: Cost_per_1M = (c_gpu × 1e6 × t_infer) / (3600 × U).
  • **Батч (время партии): T_batch ≈ (RTF × L_events) / (GPU_count × U).
  • **Окно и шаг: Увеличение WINDOW_SEC повышает качество, но растит задержку и HBM. Балансируйте окно/шаг/порог через AUC‑PR и p95 latency.

Компромиссы throughput↔latency — см. https://cloudcompute.ru/solutions/throughput-vs-latency/. Планирование бюджета — https://cloudcompute.ru/solutions/cost-planner/

Безопасность/политики

  • PII/комплаенс: маскирование/хэширование идентификаторов; минимизация фичей; шифрование «на диске/в канале».
  • Ретеншн: TTL на сырые события и временные окна; хранить только агрегаты/сигналы, необходимые для аудита.
  • Изоляция: раздельные пулы On‑Demand/Interruptible; роли на модели/индексы; журналирование экспорта.
  • Объяснимость: храните threshold_version, подпись корпуса калибровки и метрики качества.

См. также: https://cloudcompute.ru/solutions/security/https://cloudcompute.ru/solutions/storage-data/

Траблшутинг

**Симптом** **Возможная причина** **Решение**
Высокий FPR (много ложных) Недокалиброванный порог/дрифт Перекалибровка на свежем корпусе, PSI/KL‑мониторинг, гибрид с правилами
Пропущенные аномалии (FNR↑) Окно слишком короткое/агрессивная фильтрация Увеличить WINDOW\_SEC, добавить фичи, снизить порог/усилить модель
Латентность «скачет» Узкое I/O/ANN/фичестор NVMe‑кэш, уменьшить TOP‑K, упростить фичи, повысить параллельность
VRAM OOM Слишком длинные окна/большие батчи FP16/BF16, сократить окно/батч, разделить сервисы (ANN/модель), offload
GPU простаивает Самплер/фичи на CPU Pinned/zero‑copy, cuDF‑фичи на GPU, предвычисление агрегатов
Алерты «дребезжат» Нет гистерезиса/дебаунса Ввести enter/exit‑порог, min‑duration, экспоненциальное сглаживание
Качество деградировало после релиза Дрифт фичей/смена схемы Версионировать схемы/модели/пороги, канареечный деплой, откат
ANN индекс «распух» Старые точки/дубликаты TTL на векторы, периодический rebuild/консолидация

См. также: https://cloudcompute.ru/solutions/interruptible-patterns/https://cloudcompute.ru/solutions/performance-tuning/

Как запустить в cloudcompute.ru

  1. Откройте Шаблоны запусков: https://cloudcompute.ru/solutions/templates/ — выберите Anomaly Detection (Realtime) и/или Anomaly Detection (Batch).
  2. Профиль GPU: 24/48/80 ГБ — по сложности фичей/модели и SLA.
  3. Диски: /nvme/models, /nvme/anom-cache, для батчей — /nvme/datasets.
  4. Задайте переменные окружения из docker-compose.yml (метод, окна, шаг, порог, DRIFT‑мониторинг).
  5. Продакшн: автоскейл по U/QPS/latency, дашборды и алерты, раздельные пулы On‑Demand/Interruptible, канареечный деплой.

Чек‑лист перед продом

  • Замерены TTFA_alert p50/p95, latency_e2e p50/p95, qps, HBM.
  • Достигнуты целевые AUC‑PR/Precision/Recall на контрольном корпусе; пороги зафиксированы.
  • Настроен мониторинг дрифта (PSI/KL) и политика перекалибровки.
  • NVMe‑кэш/индексы/фичестор прогреты; ANN‑индекс обслуживается.
  • Алерты и дашборды: latency/FPR/FNR/HBM/IO/queue.
  • Политики PII/ретеншн/шифрование внедрены; роли и журнал экспорта.
  • Канареечный релиз и откат готовы; нагрузочный прогон ≥ 30 мин с целевыми p95.

Навигация

Готовы запустить?

Запустить GPU-сервер