RAPIDS на GPU: cuDF/cuML и ускорение ETL

Задача страницы. Инженерный гид по GPU‑ETL и «классическому» ML на базе RAPIDS: ускоренная обработка таблиц (cuDF), распределённые пайплайны (Dask‑CUDA), обучение/инференс «деревьев/линейных/кластеризации» (cuML). Разберём типовые архитектуры single‑/multi‑GPU, конфиги, RMM‑пулы, UCX, метрики p50/p95, экономику и траблшутинг. Все примеры ориентированы на NVMe‑кэш, режимы On‑Demand (интерактив/ноутбуки/онлайн‑фичи) и Interruptible (ночные батчи/переиндекс/фиче‑инжиниринг).

TL;DR

  • **Паттерны: Single‑GPU — быстрый прототип/интерактив: cuDF + cuML + RMM pool. Multi‑GPU — батчи/большие датасеты: Dask‑cuDF + Dask‑CUDA (UCX, NVLink/IB), spill на NVMe.
  • Ключевые узлы: RMM‑пул (70–80% HBM), NVMe для временных артефактов, UCX‑транспорт, партиционирование по ключу (set_index), явное persist().
  • ETL‑операции: чтение Parquet/ORC → фильтры → merge/join → groupby/agg/window → categorify/encode → write Parquet; для ML — train/test split → cuML (RF/GBT/LogReg/KMeans).
  • Метрики/наблюдаемость: etl_throughput_gib_s, cudf_op_seconds{op=...}, rmm_current_bytes, dask_task_duration_seconds, spill_bytes, gpu_utilization, p50/p95.
  • Экономика: T_etl ≈ S_data / (N_gpu × BW_eff), Cost ≈ c_gpu × (T_etl+T_train). Планирование — см. https://cloudcompute.ru/solutions/cost-planner/ и https://cloudcompute.ru/solutions/throughput-vs-latency/
  • Безопасность: PII‑маскирование, шифрование «на диске/в канале», TTL на временных данных, изоляция пулов по тенантам.

Сценарии (когда это нужно)

  • E‑commerce/аналитика: ежедневные батчи фичей, аггрегации, слияние логов и каталогов, быстрый A/B‑анализ.
  • Рекомендации/реклама: отрицательный семплинг, подготовка пар «user‑item», построение эмбеддингов, оффлайн‑пересчёт индексов (см. https://cloudcompute.ru/solutions/recsys/).
  • NLP/мультимедиа‑таблицы: массовая обработка метаданных, статистики, подготовка «манифестов» для downstream пайплайнов (ASR/диаризация/музыка).
  • Оперативная BI: интерактивные ноутбуки на single‑GPU, выборки и джойны GiB‑масштаба без «часовых» ожиданий.

Архитектуры и пайплайны 1) Single‑GPU: быстрый ETL + ML (ноутбук/сервис)

NVMe (Parquet) ──> cuDF.read_parquet
 └─> Filter/Project/FillNA
 └─> Join/Merge (keys)
 └─> GroupBy/Agg/Windows
 └─> Feature Encode/Scale
 ├─> Write Parquet (NVMe)
 └─> cuML fit/predict

Особенности: RMM‑pool 70–80% HBM; избегайте лишних to_pandas(); для больших merge — упорядочивать ключи и контролировать кардинальности.

2) Multi‑GPU: Dask‑cuDF + Dask‑CUDA (батчи/кластеры)

Object/NVMe ──> dask_cudf.read_parquet (many partitions)
 └─> Map/Filter (per‑partition)
 └─> set_index/shuffle (by key)
 └─> Join/GroupBy (distributed)
 └─> Persist/Checkpoint
 ├─> dask_cuml fit/predict
 └─> write_parquet (repartition)

Особенности: protocol="ucx", включайте NVLink/IB; контролируйте spill на NVMe, --rmm-pool-size; тяжёлые шифлы — ночью (Interruptible).

3) Гибрид: Online‑фичи + Offline‑батчи

Kafka/Stream ─> Micro‑batch to NVMe ─> Dask‑cuDF ingest ─> Update Feature Store ─> Training Windows ─> Export to Serving

Сервинг моделей см. https://cloudcompute.ru/solutions/triton-inference-server/

Профили GPU и ориентиры

Ниже — инженерные ориентиры для оценки пропускной способности ETL (на потоке операций «фильтр→джойн→аггрегация» при Parquet на NVMe). Для вашего корпуса и схемы данных обязательно снимайте фактические метрики.

**Профиль GPU** **Память** **Типовой ETL‑стек** **Оценка пропускной способности\*** **Комментарии**
24 ГБ (Compact) 24 ГБ cuDF single‑GPU порядка **0.5–1.5 GiB/s** Быстрый интерактив, средние джойны.
48 ГБ (Balanced) 48 ГБ Dask‑cuDF (2–4 GPU) порядка **1–3 GiB/s** Батчи, широкие таблицы, контроль шифлов.
80 ГБ (HQ) 80 ГБ Dask‑cuDF (4–8 GPU) порядка **2–5 GiB/s** Тяжёлые джойны/группировки, большие окна.

* Итог зависит от кардинальностей ключей, ширины строк, компрессии, NVMe/сети и параметров шифла.

Тюнинг/профилировка: https://cloudcompute.ru/solutions/performance-tuning/https://cloudcompute.ru/solutions/throughput-vs-latency/https://cloudcompute.ru/solutions/multi-gpu/

Конфиги и скелеты кода

Docker Compose (ноутбук + Dask‑кластер)

version: "3.9"
x-common-env: &env
 RMM_POOL_SIZE: "70%" # пул памяти RMM
 DASK_PROTOCOL: "ucx" # ucx|tcp
 CUDA_VISIBLE_DEVICES: "0,1" # укажите ваши GPU
 UCX_TLS: "tcp,cuda_copy,cuda_ipc"
 UCX_SOCKADDR_TLS_PRIORITY: "tcp"
 WORKER_SPILL_DIR: "/nvme/spill"
services:
 rapids-notebook:
 image: cloudcompute/rapids-notebook:latest
 environment:
 <<: *env
 deploy:
 resources:
 reservations:
 devices: [{ capabilities: ["gpu"] }]
 ports: ["8888:8888"]
 volumes:
 - /nvme/datasets:/data
 - /nvme/spill:/nvme/spill
 command: ["jupyter", "lab", "--ip=0.0.0.0", "--allow-root"]
 dask-scheduler:
 image: cloudcompute/dask-cuda:latest
 environment:
 <<: *env
 ports: ["8786:8786","8787:8787"]
 command: ["dask-scheduler"]
 dask-worker:
 image: cloudcompute/dask-cuda:latest
 depends_on: [dask-scheduler]
 environment:
 <<: *env
 deploy:
 resources:
 reservations:
 devices: [{ capabilities: ["gpu"] }]
 volumes:
 - /nvme/spill:/nvme/spill
 command:
 ["dask-cuda-worker","dask-scheduler:8786",
 "--rmm-pool-size","${RMM_POOL_SIZE}",
 "--local-directory","/nvme/spill",
 "--enable-tcp-over-ucx","--enable-cuda-ipc"]

K8s (Dask‑CUDA: планировщик и воркеры с GPU)

apiVersion: apps/v1
kind: Deployment
metadata: { name: dask-scheduler }
spec:
 replicas: 1
 selector: { matchLabels: { app: dask-scheduler } }
 template:
 metadata: { labels: { app: dask-scheduler } }
 spec:
 containers:
 - name: scheduler
 image: cloudcompute/dask-cuda:latest
 ports: [{ containerPort: 8786 }, { containerPort: 8787 }]
 resources: { limits: { cpu: "2", memory: "4Gi" } }
---
apiVersion: apps/v1
kind: Deployment
metadata: { name: dask-worker }
spec:
 replicas: 2
 selector: { matchLabels: { app: dask-worker } }
 template:
 metadata: { labels: { app: dask-worker } }
 spec:
 containers:
 - name: worker
 image: cloudcompute/dask-cuda:latest
 env:
 - { name: RMM_POOL_SIZE, value: "70%" }
 command:
 ["dask-cuda-worker","dask-scheduler:8786",
 "--rmm-pool-size","70%","--local-directory","/nvme/spill",
 "--enable-tcp-over-ucx","--enable-cuda-ipc"]
 volumeMounts:
 - { name: spill, mountPath: /nvme/spill }
 resources:
 limits: { nvidia.com/gpu: 1, cpu: "4", memory: "24Gi" }
 volumes:
 - name: spill
 hostPath: { path: /nvme/spill }

Конфиг пайплайна (YAML)

etl:
 input:
 path: "/data/events/*.parquet"
 columns: ["user_id","item_id","ts","price","category"]
 preprocess:
 filter:
 ts_from: "2025-01-01"
 fillna:
 price: 0.0
 join:
 right_path: "/data/catalog/*.parquet"
 keys: ["item_id"]
 how: "left"
 aggregations:
 groupby: ["user_id","category"]
 metrics:
 - { col: "price", op: "sum", as: "spend_sum" }
 - { col: "item_id", op: "count", as: "views" }
 encode:
 categorify: ["category"]
 output:
 path: "/data/features/dt=2025-08-28/"
 repartition: 256
ml:
 model: "rf_classifier" # rf_classifier|logreg|kmeans
 target: "label"
 features: ["spend_sum","views","category_enc"]
 train_ratio: 0.8

Python: single‑GPU (cuDF + cuML)

import cudf, cupy as cp
from cuml.model_selection import train_test_split
from cuml.ensemble import RandomForestClassifier
from rmm import reinitialize
# Инициализация RMM-пула (~70% HBM)
reinitialize(pool_allocator=True, initial_pool_size=int(0.7 * cp.cuda.runtime.memGetInfo()[1]))
# ETL
events = cudf.read_parquet("/data/events/*.parquet", columns=["user_id","item_id","ts","price","category"])
events = events[events["ts"] >= "2025-01-01"].fillna({"price": 0.0})
catalog = cudf.read_parquet("/data/catalog/*.parquet")[["item_id","category"]]
df = events.merge(catalog, on="item_id", how="left")
feat = (df.groupby(["user_id","category"])
 .agg({"price":"sum","item_id":"count"})
 .rename(columns={"price_sum":"spend_sum","item_id_count":"views"})
 .reset_index())
feat["category_enc"] = feat["category"].astype("category").cat.codes
feat = feat.drop(columns=["category"])
feat.to_parquet("/data/features/2025-08-28/")
# ML
X = feat[["spend_sum","views","category_enc"]].astype("float32")
y = (feat["spend_sum"] > 0).astype("int32") # пример таргета
X_tr, X_te, y_tr, y_te = train_test_split(X, y, train_size=0.8, random_state=42)
clf = RandomForestClassifier(n_estimators=200, max_depth=12)
clf.fit(X_tr, y_tr)
print("AUC-like proxy:", float((clf.predict_proba(X_te)[:,1] > 0.5).astype("int32").mean()))

Python: Multi‑GPU (Dask‑cuDF + Dask‑CUDA + cuML)

from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait
import dask_cudf, cudf
from cuml.dask.ensemble import RandomForestClassifier as DaskRF
cluster = LocalCUDACluster(protocol="ucx", enable_tcp_over_ucx=True,
 enable_nvlink=True, rmm_pool_size="70%")
client = Client(cluster)
dfe = dask_cudf.read_parquet("/data/events/*.parquet",
 columns=["user_id","item_id","ts","price","category"])
dfe = dfe[dfe["ts"] >= "2025-01-01"].fillna({"price": 0.0})
dfc = dask_cudf.read_parquet("/data/catalog/*.parquet")[["item_id","category"]]
# Шифл по ключу для эффективного merge
dfe = dfe.set_index("item_id", shuffle="tasks")
dfc = dfc.set_index("item_id", shuffle="tasks")
df = dfe.merge(dfc, left_index=True, right_index=True, how="left")
feat = (df.reset_index()
 .groupby(["user_id","category"])
 .agg({"price":"sum","item_id":"count"})
 .rename(columns={"price_sum":"spend_sum","item_id_count":"views"})
 .reset_index())
feat = feat.persist(); wait(feat)
# Категории → коды (per-partition)
feat["category_enc"] = feat["category"].astype("category").cat.codes
feat = feat.drop(columns=["category"])
# Разделение на обучение/тест (простое разбиение по hash user_id)
train = feat[feat["user_id"].hash_values() % 5 != 0].persist()
test = feat[feat["user_id"].hash_values() % 5 == 0].persist(); wait([train, test])
X_tr = train[["spend_sum","views","category_enc"]].astype("float32")
y_tr = (train["spend_sum"] > 0).astype("int32")
X_te = test[["spend_sum","views","category_enc"]].astype("float32")
y_te = (test["spend_sum"] > 0).astype("int32")
rf = DaskRF(n_estimators=400, max_depth=14)
rf.fit(X_tr, y_tr)
pred = rf.predict_proba(X_te)[:,1]
print("p50_partitions:", client.run_on_scheduler(lambda: "ok")) # заглушка метрик
``` **Наблюдаемость, метрики, алерты**

**Latency/Perf (Prometheus‑стиль):**

- rapids\_cudf\_op\_seconds{op=read|merge|groupby|write} — длительность ключевых операций (p50/p95).
- rapids\_etl\_throughput\_gib\_s — GiB/с по пайплайну.
- rapids\_cuml\_fit\_seconds{model=rf|logreg|kmeans} — обучение; rapids\_cuml\_predict\_seconds.
- dask\_task\_duration\_seconds{type=shuffle|map|reduce}; dask\_scheduler\_queue\_len.
- rmm\_current\_bytes, rmm\_peak\_bytes; spill\_bytes (на NVMe).
- gpu\_utilization, gpu\_memory\_bytes, nvme\_{read,write}\_mb\_s.

**Quality/ML:**

- ml\_auc, ml\_accuracy, ml\_f1, ml\_calibration\_ece — оффлайн.

**Алерты (примеры):**

- rmm\_current\_bytes/HBM &gt; 0.8 — увеличить spill/уменьшить партиции/снизить ширину строк.
- dask\_task\_duration\_p95{type=shuffle} ↑ — перегрузка шифла; перенести job в Interruptible, увеличить GPU/пул NVMe.
- spill\_bytes\_p95 ↑ — не хватает HBM; пересобрать RMM‑pool, оптимизировать join/категории.
- rapids\_cudf\_op\_seconds{op=merge}\_p95 ↑ — перекос ключей; ребаланс set\_index, предварительные categorify.

Дашборды и логирование: <https://cloudcompute.ru/solutions/monitoring-logging/> • <https://cloudcompute.ru/solutions/llm-inference/observability/>

## **Экономика и формулы**

Обозначения: S\_data — объём входных данных (GiB), N\_gpu — число GPU, BW\_eff — эффективная пропускная (GiB/с/GPU), c\_gpu — цена/час, U — целевая загрузка.

- **Время ETL:** T\_etl ≈ S\_data / (N\_gpu × BW\_eff).
- **Время тренинга:** T\_train ≈ N\_iter × t\_iter / (N\_gpu × U) (условно; зависит от модели).
- **Стоимость:** Cost\_total ≈ c\_gpu × (T\_etl + T\_train).
- **On‑Demand vs Interruptible:** для тяжёлых шифлов переносите в Interruptible; Cost\_per\_1T\_rows считайте через фактический etl\_throughput\_gib\_s.

Планирование бюджета: <https://cloudcompute.ru/solutions/cost-planner/> • компромиссы: <https://cloudcompute.ru/solutions/throughput-vs-latency/>

**Безопасность/политики**

- **PII/комплаенс:** маскирование полей на этапе ETL; раздельные бакеты для сырых/очищенных данных.
- **Шифрование:** «в канале» (Ingress/межузловые) и «на диске» (NVMe spill/артефакты).
- **Ретеншн:** TTL для временных файлов/спиллов; аудит доступа к фичестору.
- **Изоляция:** отдельные GPU‑пулы по тенантам/задачам; quotas на Dask‑кластер.

Подробнее: <https://cloudcompute.ru/solutions/security/> • <https://cloudcompute.ru/solutions/storage-data/>

**Траблшутинг**

<table><tbody><tr><td>**Симптом**

</td><td>**Возможная причина**

</td><td>**Решение**

</td></tr><tr><td>CUDA OOM при merge/groupby

</td><td>Узкие партиции, большие промежуточные

</td><td>Увеличить RMM‑pool, repartition, set\_index по ключу, включить spill

</td></tr><tr><td>Шифл «висит»/очень долгий

</td><td>Высокая кардинальность/скос ключей

</td><td>Пред‑аггрегация, shuffle="p2p"/задачами, ключ с меньшей кардинальностью

</td></tr><tr><td>Сплошной spill на NVMe

</td><td>Завышенный batch/ширина строк

</td><td>Уменьшить число колонок, astype к компактным типам, частичная аггрегация

</td></tr><tr><td>Нестабильные воркеры

</td><td>Перегрев HBM/пики задач

</td><td>Снизить параллелизм, лимиты по памяти, увеличить --rmm-pool-size

</td></tr><tr><td>UCX/сетевые ошибки

</td><td>Неверные флаги/сеть

</td><td>Переключиться на TCP, проверить enable\_nvlink/enable\_tcp\_over\_ucx, MTU

</td></tr><tr><td>Несогласованность типов

</td><td>Смешанные dtypes между файлами

</td><td>Явные схемы, astype перед merge/write, валидация схемы

</td></tr><tr><td>Медленный write Parquet

</td><td>Мелкие партиции/много файлов

</td><td>repartition до разумного числа, увеличить row\_group\_size

</td></tr><tr><td>Модель «переучилась»

</td><td>Данные утекли/дисбаланс

</td><td>Time‑split, регуляризация, калибровка, контроль дрифта

</td></tr></tbody></table>

Ещё по устойчивости пайплайнов: <https://cloudcompute.ru/solutions/interruptible-patterns/> • тюнинг: <https://cloudcompute.ru/solutions/performance-tuning/>

**Как запустить в cloudcompute.ru**

1. Откройте **Шаблоны запусков**: <https://cloudcompute.ru/solutions/templates/> — выберите **RAPIDS ETL (Single‑GPU)** или **RAPIDS + Dask‑CUDA (Multi‑GPU)**.
2. Выберите профиль GPU: **24/48/80 ГБ** — по объёму данных/окнам/джойнам.
3. Смонтируйте диски: /nvme/datasets, /nvme/spill, при необходимости — /nvme/checkpoints.
4. Заполните переменные окружения (RMM‑pool, протокол UCX/TCP, spill‑директория) как в docker-compose.yml.
5. Для продакшна: автоскейл по очереди/использованию, дашборды/алерты, канареечные обновления пайплайна.

**Чек‑лист перед продом**

- Замерены etl\_throughput\_gib\_s, p50/p95 ключевых операций (read/merge/groupby/write).
- RMM‑pool 70–80% HBM; spill на NVMe протестирован.
- Партиционирование/set\_index по ключам настроено; шифлы устойчивы.
- Версионирование схемы/фичей и проверка типов включены.
- Алерты по rmm\_current\_bytes, spill\_bytes, task\_duration\_p95, gpu\_mem\_peak.
- Политики PII/ретеншн/шифрование — внедрены.
- Нагрузочный прогон ≥ 30 мин с целевыми p95; план отката готов.

**Навигация**

- Хаб «Решения»: <https://cloudcompute.ru/solutions/>
- Рекомендательные системы: <https://cloudcompute.ru/solutions/recsys/>
- Хранилища и данные: <https://cloudcompute.ru/solutions/storage-data/>
- Производительность и тюнинг: <https://cloudcompute.ru/solutions/performance-tuning/>
- Throughput vs Latency: <https://cloudcompute.ru/solutions/throughput-vs-latency/>
- Multi‑GPU: <https://cloudcompute.ru/solutions/multi-gpu/>
- Планирование стоимости: <https://cloudcompute.ru/solutions/cost-planner/>
- Мониторинг и логи: <https://cloudcompute.ru/solutions/monitoring-logging/>
- Наблюдаемость инференса/ETL: <https://cloudcompute.ru/solutions/llm-inference/observability/>
- Interruptible‑паттерны: <https://cloudcompute.ru/solutions/interruptible-patterns/>
- Triton Inference Server (сервинг моделей): <https://cloudcompute.ru/solutions/triton-inference-server/>
- Gradio + FastAPI (примеры UI/эндпойнтов): <https://cloudcompute.ru/solutions/gradio-fastapi/>
- CI/CD контейнеров: <https://cloudcompute.ru/solutions/containers-ci-cd/>

Готовы запустить?

Запустить GPU-сервер