RAPIDS на GPU: cuDF/cuML и ускорение ETL
Задача страницы. Инженерный гид по GPU‑ETL и «классическому» ML на базе RAPIDS: ускоренная обработка таблиц (cuDF), распределённые пайплайны (Dask‑CUDA), обучение/инференс «деревьев/линейных/кластеризации» (cuML). Разберём типовые архитектуры single‑/multi‑GPU, конфиги, RMM‑пулы, UCX, метрики p50/p95, экономику и траблшутинг. Все примеры ориентированы на NVMe‑кэш, режимы On‑Demand (интерактив/ноутбуки/онлайн‑фичи) и Interruptible (ночные батчи/переиндекс/фиче‑инжиниринг).
TL;DR
- **Паттерны: Single‑GPU — быстрый прототип/интерактив: cuDF + cuML + RMM pool. Multi‑GPU — батчи/большие датасеты: Dask‑cuDF + Dask‑CUDA (UCX, NVLink/IB), spill на NVMe.
- Ключевые узлы: RMM‑пул (70–80% HBM), NVMe для временных артефактов, UCX‑транспорт, партиционирование по ключу (set_index), явное persist().
- ETL‑операции: чтение Parquet/ORC → фильтры → merge/join → groupby/agg/window → categorify/encode → write Parquet; для ML — train/test split → cuML (RF/GBT/LogReg/KMeans).
- Метрики/наблюдаемость: etl_throughput_gib_s, cudf_op_seconds{op=...}, rmm_current_bytes, dask_task_duration_seconds, spill_bytes, gpu_utilization, p50/p95.
- Экономика: T_etl ≈ S_data / (N_gpu × BW_eff), Cost ≈ c_gpu × (T_etl+T_train). Планирование — см. https://cloudcompute.ru/solutions/cost-planner/ и https://cloudcompute.ru/solutions/throughput-vs-latency/
- Безопасность: PII‑маскирование, шифрование «на диске/в канале», TTL на временных данных, изоляция пулов по тенантам.
Сценарии (когда это нужно)
- E‑commerce/аналитика: ежедневные батчи фичей, аггрегации, слияние логов и каталогов, быстрый A/B‑анализ.
- Рекомендации/реклама: отрицательный семплинг, подготовка пар «user‑item», построение эмбеддингов, оффлайн‑пересчёт индексов (см. https://cloudcompute.ru/solutions/recsys/).
- NLP/мультимедиа‑таблицы: массовая обработка метаданных, статистики, подготовка «манифестов» для downstream пайплайнов (ASR/диаризация/музыка).
- Оперативная BI: интерактивные ноутбуки на single‑GPU, выборки и джойны GiB‑масштаба без «часовых» ожиданий.
Архитектуры и пайплайны 1) Single‑GPU: быстрый ETL + ML (ноутбук/сервис)
NVMe (Parquet) ──> cuDF.read_parquet
└─> Filter/Project/FillNA
└─> Join/Merge (keys)
└─> GroupBy/Agg/Windows
└─> Feature Encode/Scale
├─> Write Parquet (NVMe)
└─> cuML fit/predict
Особенности: RMM‑pool 70–80% HBM; избегайте лишних to_pandas(); для больших merge — упорядочивать ключи и контролировать кардинальности.
2) Multi‑GPU: Dask‑cuDF + Dask‑CUDA (батчи/кластеры)
Object/NVMe ──> dask_cudf.read_parquet (many partitions)
└─> Map/Filter (per‑partition)
└─> set_index/shuffle (by key)
└─> Join/GroupBy (distributed)
└─> Persist/Checkpoint
├─> dask_cuml fit/predict
└─> write_parquet (repartition)
Особенности: protocol="ucx", включайте NVLink/IB; контролируйте spill на NVMe, --rmm-pool-size; тяжёлые шифлы — ночью (Interruptible).
3) Гибрид: Online‑фичи + Offline‑батчи
Kafka/Stream ─> Micro‑batch to NVMe ─> Dask‑cuDF ingest ─> Update Feature Store ─> Training Windows ─> Export to Serving
Сервинг моделей см. https://cloudcompute.ru/solutions/triton-inference-server/
Профили GPU и ориентиры
Ниже — инженерные ориентиры для оценки пропускной способности ETL (на потоке операций «фильтр→джойн→аггрегация» при Parquet на NVMe). Для вашего корпуса и схемы данных обязательно снимайте фактические метрики.
| **Профиль GPU** | **Память** | **Типовой ETL‑стек** | **Оценка пропускной способности\*** | **Комментарии** |
| 24 ГБ (Compact) | 24 ГБ | cuDF single‑GPU | порядка **0.5–1.5 GiB/s** | Быстрый интерактив, средние джойны. |
| 48 ГБ (Balanced) | 48 ГБ | Dask‑cuDF (2–4 GPU) | порядка **1–3 GiB/s** | Батчи, широкие таблицы, контроль шифлов. |
| 80 ГБ (HQ) | 80 ГБ | Dask‑cuDF (4–8 GPU) | порядка **2–5 GiB/s** | Тяжёлые джойны/группировки, большие окна. |
* Итог зависит от кардинальностей ключей, ширины строк, компрессии, NVMe/сети и параметров шифла.
Тюнинг/профилировка: https://cloudcompute.ru/solutions/performance-tuning/ • https://cloudcompute.ru/solutions/throughput-vs-latency/ • https://cloudcompute.ru/solutions/multi-gpu/
Конфиги и скелеты кода
Docker Compose (ноутбук + Dask‑кластер)
version: "3.9"
x-common-env: &env
RMM_POOL_SIZE: "70%" # пул памяти RMM
DASK_PROTOCOL: "ucx" # ucx|tcp
CUDA_VISIBLE_DEVICES: "0,1" # укажите ваши GPU
UCX_TLS: "tcp,cuda_copy,cuda_ipc"
UCX_SOCKADDR_TLS_PRIORITY: "tcp"
WORKER_SPILL_DIR: "/nvme/spill"
services:
rapids-notebook:
image: cloudcompute/rapids-notebook:latest
environment:
<<: *env
deploy:
resources:
reservations:
devices: [{ capabilities: ["gpu"] }]
ports: ["8888:8888"]
volumes:
- /nvme/datasets:/data
- /nvme/spill:/nvme/spill
command: ["jupyter", "lab", "--ip=0.0.0.0", "--allow-root"]
dask-scheduler:
image: cloudcompute/dask-cuda:latest
environment:
<<: *env
ports: ["8786:8786","8787:8787"]
command: ["dask-scheduler"]
dask-worker:
image: cloudcompute/dask-cuda:latest
depends_on: [dask-scheduler]
environment:
<<: *env
deploy:
resources:
reservations:
devices: [{ capabilities: ["gpu"] }]
volumes:
- /nvme/spill:/nvme/spill
command:
["dask-cuda-worker","dask-scheduler:8786",
"--rmm-pool-size","${RMM_POOL_SIZE}",
"--local-directory","/nvme/spill",
"--enable-tcp-over-ucx","--enable-cuda-ipc"]
K8s (Dask‑CUDA: планировщик и воркеры с GPU)
apiVersion: apps/v1
kind: Deployment
metadata: { name: dask-scheduler }
spec:
replicas: 1
selector: { matchLabels: { app: dask-scheduler } }
template:
metadata: { labels: { app: dask-scheduler } }
spec:
containers:
- name: scheduler
image: cloudcompute/dask-cuda:latest
ports: [{ containerPort: 8786 }, { containerPort: 8787 }]
resources: { limits: { cpu: "2", memory: "4Gi" } }
---
apiVersion: apps/v1
kind: Deployment
metadata: { name: dask-worker }
spec:
replicas: 2
selector: { matchLabels: { app: dask-worker } }
template:
metadata: { labels: { app: dask-worker } }
spec:
containers:
- name: worker
image: cloudcompute/dask-cuda:latest
env:
- { name: RMM_POOL_SIZE, value: "70%" }
command:
["dask-cuda-worker","dask-scheduler:8786",
"--rmm-pool-size","70%","--local-directory","/nvme/spill",
"--enable-tcp-over-ucx","--enable-cuda-ipc"]
volumeMounts:
- { name: spill, mountPath: /nvme/spill }
resources:
limits: { nvidia.com/gpu: 1, cpu: "4", memory: "24Gi" }
volumes:
- name: spill
hostPath: { path: /nvme/spill }
Конфиг пайплайна (YAML)
etl:
input:
path: "/data/events/*.parquet"
columns: ["user_id","item_id","ts","price","category"]
preprocess:
filter:
ts_from: "2025-01-01"
fillna:
price: 0.0
join:
right_path: "/data/catalog/*.parquet"
keys: ["item_id"]
how: "left"
aggregations:
groupby: ["user_id","category"]
metrics:
- { col: "price", op: "sum", as: "spend_sum" }
- { col: "item_id", op: "count", as: "views" }
encode:
categorify: ["category"]
output:
path: "/data/features/dt=2025-08-28/"
repartition: 256
ml:
model: "rf_classifier" # rf_classifier|logreg|kmeans
target: "label"
features: ["spend_sum","views","category_enc"]
train_ratio: 0.8
Python: single‑GPU (cuDF + cuML)
import cudf, cupy as cp
from cuml.model_selection import train_test_split
from cuml.ensemble import RandomForestClassifier
from rmm import reinitialize
# Инициализация RMM-пула (~70% HBM)
reinitialize(pool_allocator=True, initial_pool_size=int(0.7 * cp.cuda.runtime.memGetInfo()[1]))
# ETL
events = cudf.read_parquet("/data/events/*.parquet", columns=["user_id","item_id","ts","price","category"])
events = events[events["ts"] >= "2025-01-01"].fillna({"price": 0.0})
catalog = cudf.read_parquet("/data/catalog/*.parquet")[["item_id","category"]]
df = events.merge(catalog, on="item_id", how="left")
feat = (df.groupby(["user_id","category"])
.agg({"price":"sum","item_id":"count"})
.rename(columns={"price_sum":"spend_sum","item_id_count":"views"})
.reset_index())
feat["category_enc"] = feat["category"].astype("category").cat.codes
feat = feat.drop(columns=["category"])
feat.to_parquet("/data/features/2025-08-28/")
# ML
X = feat[["spend_sum","views","category_enc"]].astype("float32")
y = (feat["spend_sum"] > 0).astype("int32") # пример таргета
X_tr, X_te, y_tr, y_te = train_test_split(X, y, train_size=0.8, random_state=42)
clf = RandomForestClassifier(n_estimators=200, max_depth=12)
clf.fit(X_tr, y_tr)
print("AUC-like proxy:", float((clf.predict_proba(X_te)[:,1] > 0.5).astype("int32").mean()))
Python: Multi‑GPU (Dask‑cuDF + Dask‑CUDA + cuML)
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait
import dask_cudf, cudf
from cuml.dask.ensemble import RandomForestClassifier as DaskRF
cluster = LocalCUDACluster(protocol="ucx", enable_tcp_over_ucx=True,
enable_nvlink=True, rmm_pool_size="70%")
client = Client(cluster)
dfe = dask_cudf.read_parquet("/data/events/*.parquet",
columns=["user_id","item_id","ts","price","category"])
dfe = dfe[dfe["ts"] >= "2025-01-01"].fillna({"price": 0.0})
dfc = dask_cudf.read_parquet("/data/catalog/*.parquet")[["item_id","category"]]
# Шифл по ключу для эффективного merge
dfe = dfe.set_index("item_id", shuffle="tasks")
dfc = dfc.set_index("item_id", shuffle="tasks")
df = dfe.merge(dfc, left_index=True, right_index=True, how="left")
feat = (df.reset_index()
.groupby(["user_id","category"])
.agg({"price":"sum","item_id":"count"})
.rename(columns={"price_sum":"spend_sum","item_id_count":"views"})
.reset_index())
feat = feat.persist(); wait(feat)
# Категории → коды (per-partition)
feat["category_enc"] = feat["category"].astype("category").cat.codes
feat = feat.drop(columns=["category"])
# Разделение на обучение/тест (простое разбиение по hash user_id)
train = feat[feat["user_id"].hash_values() % 5 != 0].persist()
test = feat[feat["user_id"].hash_values() % 5 == 0].persist(); wait([train, test])
X_tr = train[["spend_sum","views","category_enc"]].astype("float32")
y_tr = (train["spend_sum"] > 0).astype("int32")
X_te = test[["spend_sum","views","category_enc"]].astype("float32")
y_te = (test["spend_sum"] > 0).astype("int32")
rf = DaskRF(n_estimators=400, max_depth=14)
rf.fit(X_tr, y_tr)
pred = rf.predict_proba(X_te)[:,1]
print("p50_partitions:", client.run_on_scheduler(lambda: "ok")) # заглушка метрик
``` **Наблюдаемость, метрики, алерты**
**Latency/Perf (Prometheus‑стиль):**
- rapids\_cudf\_op\_seconds{op=read|merge|groupby|write} — длительность ключевых операций (p50/p95).
- rapids\_etl\_throughput\_gib\_s — GiB/с по пайплайну.
- rapids\_cuml\_fit\_seconds{model=rf|logreg|kmeans} — обучение; rapids\_cuml\_predict\_seconds.
- dask\_task\_duration\_seconds{type=shuffle|map|reduce}; dask\_scheduler\_queue\_len.
- rmm\_current\_bytes, rmm\_peak\_bytes; spill\_bytes (на NVMe).
- gpu\_utilization, gpu\_memory\_bytes, nvme\_{read,write}\_mb\_s.
**Quality/ML:**
- ml\_auc, ml\_accuracy, ml\_f1, ml\_calibration\_ece — оффлайн.
**Алерты (примеры):**
- rmm\_current\_bytes/HBM > 0.8 — увеличить spill/уменьшить партиции/снизить ширину строк.
- dask\_task\_duration\_p95{type=shuffle} ↑ — перегрузка шифла; перенести job в Interruptible, увеличить GPU/пул NVMe.
- spill\_bytes\_p95 ↑ — не хватает HBM; пересобрать RMM‑pool, оптимизировать join/категории.
- rapids\_cudf\_op\_seconds{op=merge}\_p95 ↑ — перекос ключей; ребаланс set\_index, предварительные categorify.
Дашборды и логирование: <https://cloudcompute.ru/solutions/monitoring-logging/> • <https://cloudcompute.ru/solutions/llm-inference/observability/>
## **Экономика и формулы**
Обозначения: S\_data — объём входных данных (GiB), N\_gpu — число GPU, BW\_eff — эффективная пропускная (GiB/с/GPU), c\_gpu — цена/час, U — целевая загрузка.
- **Время ETL:** T\_etl ≈ S\_data / (N\_gpu × BW\_eff).
- **Время тренинга:** T\_train ≈ N\_iter × t\_iter / (N\_gpu × U) (условно; зависит от модели).
- **Стоимость:** Cost\_total ≈ c\_gpu × (T\_etl + T\_train).
- **On‑Demand vs Interruptible:** для тяжёлых шифлов переносите в Interruptible; Cost\_per\_1T\_rows считайте через фактический etl\_throughput\_gib\_s.
Планирование бюджета: <https://cloudcompute.ru/solutions/cost-planner/> • компромиссы: <https://cloudcompute.ru/solutions/throughput-vs-latency/>
**Безопасность/политики**
- **PII/комплаенс:** маскирование полей на этапе ETL; раздельные бакеты для сырых/очищенных данных.
- **Шифрование:** «в канале» (Ingress/межузловые) и «на диске» (NVMe spill/артефакты).
- **Ретеншн:** TTL для временных файлов/спиллов; аудит доступа к фичестору.
- **Изоляция:** отдельные GPU‑пулы по тенантам/задачам; quotas на Dask‑кластер.
Подробнее: <https://cloudcompute.ru/solutions/security/> • <https://cloudcompute.ru/solutions/storage-data/>
**Траблшутинг**
<table><tbody><tr><td>**Симптом**
</td><td>**Возможная причина**
</td><td>**Решение**
</td></tr><tr><td>CUDA OOM при merge/groupby
</td><td>Узкие партиции, большие промежуточные
</td><td>Увеличить RMM‑pool, repartition, set\_index по ключу, включить spill
</td></tr><tr><td>Шифл «висит»/очень долгий
</td><td>Высокая кардинальность/скос ключей
</td><td>Пред‑аггрегация, shuffle="p2p"/задачами, ключ с меньшей кардинальностью
</td></tr><tr><td>Сплошной spill на NVMe
</td><td>Завышенный batch/ширина строк
</td><td>Уменьшить число колонок, astype к компактным типам, частичная аггрегация
</td></tr><tr><td>Нестабильные воркеры
</td><td>Перегрев HBM/пики задач
</td><td>Снизить параллелизм, лимиты по памяти, увеличить --rmm-pool-size
</td></tr><tr><td>UCX/сетевые ошибки
</td><td>Неверные флаги/сеть
</td><td>Переключиться на TCP, проверить enable\_nvlink/enable\_tcp\_over\_ucx, MTU
</td></tr><tr><td>Несогласованность типов
</td><td>Смешанные dtypes между файлами
</td><td>Явные схемы, astype перед merge/write, валидация схемы
</td></tr><tr><td>Медленный write Parquet
</td><td>Мелкие партиции/много файлов
</td><td>repartition до разумного числа, увеличить row\_group\_size
</td></tr><tr><td>Модель «переучилась»
</td><td>Данные утекли/дисбаланс
</td><td>Time‑split, регуляризация, калибровка, контроль дрифта
</td></tr></tbody></table>
Ещё по устойчивости пайплайнов: <https://cloudcompute.ru/solutions/interruptible-patterns/> • тюнинг: <https://cloudcompute.ru/solutions/performance-tuning/>
**Как запустить в cloudcompute.ru**
1. Откройте **Шаблоны запусков**: <https://cloudcompute.ru/solutions/templates/> — выберите **RAPIDS ETL (Single‑GPU)** или **RAPIDS + Dask‑CUDA (Multi‑GPU)**.
2. Выберите профиль GPU: **24/48/80 ГБ** — по объёму данных/окнам/джойнам.
3. Смонтируйте диски: /nvme/datasets, /nvme/spill, при необходимости — /nvme/checkpoints.
4. Заполните переменные окружения (RMM‑pool, протокол UCX/TCP, spill‑директория) как в docker-compose.yml.
5. Для продакшна: автоскейл по очереди/использованию, дашборды/алерты, канареечные обновления пайплайна.
**Чек‑лист перед продом**
- Замерены etl\_throughput\_gib\_s, p50/p95 ключевых операций (read/merge/groupby/write).
- RMM‑pool 70–80% HBM; spill на NVMe протестирован.
- Партиционирование/set\_index по ключам настроено; шифлы устойчивы.
- Версионирование схемы/фичей и проверка типов включены.
- Алерты по rmm\_current\_bytes, spill\_bytes, task\_duration\_p95, gpu\_mem\_peak.
- Политики PII/ретеншн/шифрование — внедрены.
- Нагрузочный прогон ≥ 30 мин с целевыми p95; план отката готов.
**Навигация**
- Хаб «Решения»: <https://cloudcompute.ru/solutions/>
- Рекомендательные системы: <https://cloudcompute.ru/solutions/recsys/>
- Хранилища и данные: <https://cloudcompute.ru/solutions/storage-data/>
- Производительность и тюнинг: <https://cloudcompute.ru/solutions/performance-tuning/>
- Throughput vs Latency: <https://cloudcompute.ru/solutions/throughput-vs-latency/>
- Multi‑GPU: <https://cloudcompute.ru/solutions/multi-gpu/>
- Планирование стоимости: <https://cloudcompute.ru/solutions/cost-planner/>
- Мониторинг и логи: <https://cloudcompute.ru/solutions/monitoring-logging/>
- Наблюдаемость инференса/ETL: <https://cloudcompute.ru/solutions/llm-inference/observability/>
- Interruptible‑паттерны: <https://cloudcompute.ru/solutions/interruptible-patterns/>
- Triton Inference Server (сервинг моделей): <https://cloudcompute.ru/solutions/triton-inference-server/>
- Gradio + FastAPI (примеры UI/эндпойнтов): <https://cloudcompute.ru/solutions/gradio-fastapi/>
- CI/CD контейнеров: <https://cloudcompute.ru/solutions/containers-ci-cd/>
Готовы запустить?
Запустить GPU-сервер