Решения
RAPIDS на GPU: cuDF/cuML и ускорение ETL
Задача страницы. Инженерный гид по GPU‑ETL и «классическому» ML на базе RAPIDS: ускоренная обработка таблиц (cuDF), распределённые пайплайны (Dask‑CUDA), обучение/инференс «деревьев/линейных/кластеризации» (cuML). Разберём типовые архитектуры single‑/multi‑GPU, конфиги, RMM‑пулы, UCX, метрики p50/p95, экономику и траблшутинг. Все примеры ориентированы на NVMe‑кэш, режимы On‑Demand (интерактив/ноутбуки/онлайн‑фичи) и Interruptible (ночные батчи/переиндекс/фиче‑инжиниринг).
TL;DR
- Паттерны:
Single‑GPU — быстрый прототип/интерактив: cuDF + cuML + RMM pool.
Multi‑GPU — батчи/большие датасеты: Dask‑cuDF + Dask‑CUDA (UCX, NVLink/IB), spill на NVMe. - Ключевые узлы: RMM‑пул (70–80% HBM), NVMe для временных артефактов, UCX‑транспорт, партиционирование по ключу (set_index), явное persist().
- ETL‑операции: чтение Parquet/ORC → фильтры → merge/join → groupby/agg/window → categorify/encode → write Parquet; для ML — train/test split → cuML (RF/GBT/LogReg/KMeans).
- Метрики/наблюдаемость: etl_throughput_gib_s, cudf_op_seconds{op=…}, rmm_current_bytes, dask_task_duration_seconds, spill_bytes, gpu_utilization, p50/p95.
- Экономика: T_etl ≈ S_data / (N_gpu × BW_eff), Cost ≈ c_gpu × (T_etl+T_train). Планирование — см. https://cloudcompute.ru/solutions/cost-planner/ и https://cloudcompute.ru/solutions/throughput-vs-latency/
- Безопасность: PII‑маскирование, шифрование «на диске/в канале», TTL на временных данных, изоляция пулов по тенантам.
Сценарии (когда это нужно)
- E‑commerce/аналитика: ежедневные батчи фичей, аггрегации, слияние логов и каталогов, быстрый A/B‑анализ.
- Рекомендации/реклама: отрицательный семплинг, подготовка пар «user‑item», построение эмбеддингов, оффлайн‑пересчёт индексов (см. https://cloudcompute.ru/solutions/recsys/).
- NLP/мультимедиа‑таблицы: массовая обработка метаданных, статистики, подготовка «манифестов» для downstream пайплайнов (ASR/диаризация/музыка).
- Оперативная BI: интерактивные ноутбуки на single‑GPU, выборки и джойны GiB‑масштаба без «часовых» ожиданий.
Архитектуры и пайплайны

1) Single‑GPU: быстрый ETL + ML (ноутбук/сервис)
NVMe (Parquet) ──> cuDF.read_parquet
└─> Filter/Project/FillNA
└─> Join/Merge (keys)
└─> GroupBy/Agg/Windows
└─> Feature Encode/Scale
├─> Write Parquet (NVMe)
└─> cuML fit/predict
Особенности: RMM‑pool 70–80% HBM; избегайте лишних to_pandas(); для больших merge — упорядочивать ключи и контролировать кардинальности.
2) Multi‑GPU: Dask‑cuDF + Dask‑CUDA (батчи/кластеры)
Object/NVMe ──> dask_cudf.read_parquet (many partitions)
└─> Map/Filter (per‑partition)
└─> set_index/shuffle (by key)
└─> Join/GroupBy (distributed)
└─> Persist/Checkpoint
├─> dask_cuml fit/predict
└─> write_parquet (repartition)
Особенности: protocol=»ucx», включайте NVLink/IB; контролируйте spill на NVMe, —rmm-pool-size; тяжёлые шифлы — ночью (Interruptible).
3) Гибрид: Online‑фичи + Offline‑батчи
Kafka/Stream ─> Micro‑batch to NVMe ─> Dask‑cuDF ingest ─> Update Feature Store ─> Training Windows ─> Export to Serving
Сервинг моделей см. https://cloudcompute.ru/solutions/triton-inference-server/
Профили GPU и ориентиры
Ниже — инженерные ориентиры для оценки пропускной способности ETL (на потоке операций «фильтр→джойн→аггрегация» при Parquet на NVMe). Для вашего корпуса и схемы данных обязательно снимайте фактические метрики.
Профиль GPU | Память | Типовой ETL‑стек | Оценка пропускной способности* | Комментарии |
24 ГБ (Compact) | 24 ГБ | cuDF single‑GPU | порядка 0.5–1.5 GiB/s | Быстрый интерактив, средние джойны. |
48 ГБ (Balanced) | 48 ГБ | Dask‑cuDF (2–4 GPU) | порядка 1–3 GiB/s | Батчи, широкие таблицы, контроль шифлов. |
80 ГБ (HQ) | 80 ГБ | Dask‑cuDF (4–8 GPU) | порядка 2–5 GiB/s | Тяжёлые джойны/группировки, большие окна. |
* Итог зависит от кардинальностей ключей, ширины строк, компрессии, NVMe/сети и параметров шифла.
Тюнинг/профилировка: https://cloudcompute.ru/solutions/performance-tuning/ • https://cloudcompute.ru/solutions/throughput-vs-latency/ • https://cloudcompute.ru/solutions/multi-gpu/
Конфиги и скелеты кода
Docker Compose (ноутбук + Dask‑кластер)
version: "3.9"
x-common-env: &env
RMM_POOL_SIZE: "70%" # пул памяти RMM
DASK_PROTOCOL: "ucx" # ucx|tcp
CUDA_VISIBLE_DEVICES: "0,1" # укажите ваши GPU
UCX_TLS: "tcp,cuda_copy,cuda_ipc"
UCX_SOCKADDR_TLS_PRIORITY: "tcp"
WORKER_SPILL_DIR: "/nvme/spill"
services:
rapids-notebook:
image: cloudcompute/rapids-notebook:latest
environment:
<<: *env
deploy:
resources:
reservations:
devices: [{ capabilities: ["gpu"] }]
ports: ["8888:8888"]
volumes:
- /nvme/datasets:/data
- /nvme/spill:/nvme/spill
command: ["jupyter", "lab", "--ip=0.0.0.0", "--allow-root"]
dask-scheduler:
image: cloudcompute/dask-cuda:latest
environment:
<<: *env
ports: ["8786:8786","8787:8787"]
command: ["dask-scheduler"]
dask-worker:
image: cloudcompute/dask-cuda:latest
depends_on: [dask-scheduler]
environment:
<<: *env
deploy:
resources:
reservations:
devices: [{ capabilities: ["gpu"] }]
volumes:
- /nvme/spill:/nvme/spill
command:
["dask-cuda-worker","dask-scheduler:8786",
"--rmm-pool-size","${RMM_POOL_SIZE}",
"--local-directory","/nvme/spill",
"--enable-tcp-over-ucx","--enable-cuda-ipc"]
K8s (Dask‑CUDA: планировщик и воркеры с GPU)
apiVersion: apps/v1
kind: Deployment
metadata: { name: dask-scheduler }
spec:
replicas: 1
selector: { matchLabels: { app: dask-scheduler } }
template:
metadata: { labels: { app: dask-scheduler } }
spec:
containers:
- name: scheduler
image: cloudcompute/dask-cuda:latest
ports: [{ containerPort: 8786 }, { containerPort: 8787 }]
resources: { limits: { cpu: "2", memory: "4Gi" } }
---
apiVersion: apps/v1
kind: Deployment
metadata: { name: dask-worker }
spec:
replicas: 2
selector: { matchLabels: { app: dask-worker } }
template:
metadata: { labels: { app: dask-worker } }
spec:
containers:
- name: worker
image: cloudcompute/dask-cuda:latest
env:
- { name: RMM_POOL_SIZE, value: "70%" }
command:
["dask-cuda-worker","dask-scheduler:8786",
"--rmm-pool-size","70%","--local-directory","/nvme/spill",
"--enable-tcp-over-ucx","--enable-cuda-ipc"]
volumeMounts:
- { name: spill, mountPath: /nvme/spill }
resources:
limits: { nvidia.com/gpu: 1, cpu: "4", memory: "24Gi" }
volumes:
- name: spill
hostPath: { path: /nvme/spill }
Конфиг пайплайна (YAML)
etl:
input:
path: "/data/events/*.parquet"
columns: ["user_id","item_id","ts","price","category"]
preprocess:
filter:
ts_from: "2025-01-01"
fillna:
price: 0.0
join:
right_path: "/data/catalog/*.parquet"
keys: ["item_id"]
how: "left"
aggregations:
groupby: ["user_id","category"]
metrics:
- { col: "price", op: "sum", as: "spend_sum" }
- { col: "item_id", op: "count", as: "views" }
encode:
categorify: ["category"]
output:
path: "/data/features/dt=2025-08-28/"
repartition: 256
ml:
model: "rf_classifier" # rf_classifier|logreg|kmeans
target: "label"
features: ["spend_sum","views","category_enc"]
train_ratio: 0.8
Python: single‑GPU (cuDF + cuML)
import cudf, cupy as cp
from cuml.model_selection import train_test_split
from cuml.ensemble import RandomForestClassifier
from rmm import reinitialize
# Инициализация RMM-пула (~70% HBM)
reinitialize(pool_allocator=True, initial_pool_size=int(0.7 * cp.cuda.runtime.memGetInfo()[1]))
# ETL
events = cudf.read_parquet("/data/events/*.parquet", columns=["user_id","item_id","ts","price","category"])
events = events[events["ts"] >= "2025-01-01"].fillna({"price": 0.0})
catalog = cudf.read_parquet("/data/catalog/*.parquet")[["item_id","category"]]
df = events.merge(catalog, on="item_id", how="left")
feat = (df.groupby(["user_id","category"])
.agg({"price":"sum","item_id":"count"})
.rename(columns={"price_sum":"spend_sum","item_id_count":"views"})
.reset_index())
feat["category_enc"] = feat["category"].astype("category").cat.codes
feat = feat.drop(columns=["category"])
feat.to_parquet("/data/features/2025-08-28/")
# ML
X = feat[["spend_sum","views","category_enc"]].astype("float32")
y = (feat["spend_sum"] > 0).astype("int32") # пример таргета
X_tr, X_te, y_tr, y_te = train_test_split(X, y, train_size=0.8, random_state=42)
clf = RandomForestClassifier(n_estimators=200, max_depth=12)
clf.fit(X_tr, y_tr)
print("AUC-like proxy:", float((clf.predict_proba(X_te)[:,1] > 0.5).astype("int32").mean()))
Python: Multi‑GPU (Dask‑cuDF + Dask‑CUDA + cuML)
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait
import dask_cudf, cudf
from cuml.dask.ensemble import RandomForestClassifier as DaskRF
cluster = LocalCUDACluster(protocol="ucx", enable_tcp_over_ucx=True,
enable_nvlink=True, rmm_pool_size="70%")
client = Client(cluster)
dfe = dask_cudf.read_parquet("/data/events/*.parquet",
columns=["user_id","item_id","ts","price","category"])
dfe = dfe[dfe["ts"] >= "2025-01-01"].fillna({"price": 0.0})
dfc = dask_cudf.read_parquet("/data/catalog/*.parquet")[["item_id","category"]]
# Шифл по ключу для эффективного merge
dfe = dfe.set_index("item_id", shuffle="tasks")
dfc = dfc.set_index("item_id", shuffle="tasks")
df = dfe.merge(dfc, left_index=True, right_index=True, how="left")
feat = (df.reset_index()
.groupby(["user_id","category"])
.agg({"price":"sum","item_id":"count"})
.rename(columns={"price_sum":"spend_sum","item_id_count":"views"})
.reset_index())
feat = feat.persist(); wait(feat)
# Категории → коды (per-partition)
feat["category_enc"] = feat["category"].astype("category").cat.codes
feat = feat.drop(columns=["category"])
# Разделение на обучение/тест (простое разбиение по hash user_id)
train = feat[feat["user_id"].hash_values() % 5 != 0].persist()
test = feat[feat["user_id"].hash_values() % 5 == 0].persist(); wait([train, test])
X_tr = train[["spend_sum","views","category_enc"]].astype("float32")
y_tr = (train["spend_sum"] > 0).astype("int32")
X_te = test[["spend_sum","views","category_enc"]].astype("float32")
y_te = (test["spend_sum"] > 0).astype("int32")
rf = DaskRF(n_estimators=400, max_depth=14)
rf.fit(X_tr, y_tr)
pred = rf.predict_proba(X_te)[:,1]
print("p50_partitions:", client.run_on_scheduler(lambda: "ok")) # заглушка метрик


Наблюдаемость, метрики, алерты
Latency/Perf (Prometheus‑стиль):
- rapids_cudf_op_seconds{op=read|merge|groupby|write} — длительность ключевых операций (p50/p95).
- rapids_etl_throughput_gib_s — GiB/с по пайплайну.
- rapids_cuml_fit_seconds{model=rf|logreg|kmeans} — обучение; rapids_cuml_predict_seconds.
- dask_task_duration_seconds{type=shuffle|map|reduce}; dask_scheduler_queue_len.
- rmm_current_bytes, rmm_peak_bytes; spill_bytes (на NVMe).
- gpu_utilization, gpu_memory_bytes, nvme_{read,write}_mb_s.
Quality/ML:
- ml_auc, ml_accuracy, ml_f1, ml_calibration_ece — оффлайн.
Алерты (примеры):
- rmm_current_bytes/HBM > 0.8 — увеличить spill/уменьшить партиции/снизить ширину строк.
- dask_task_duration_p95{type=shuffle} ↑ — перегрузка шифла; перенести job в Interruptible, увеличить GPU/пул NVMe.
- spill_bytes_p95 ↑ — не хватает HBM; пересобрать RMM‑pool, оптимизировать join/категории.
- rapids_cudf_op_seconds{op=merge}_p95 ↑ — перекос ключей; ребаланс set_index, предварительные categorify.
Дашборды и логирование: https://cloudcompute.ru/solutions/monitoring-logging/ • https://cloudcompute.ru/solutions/llm-inference/observability/
Экономика и формулы
Обозначения: S_data — объём входных данных (GiB), N_gpu — число GPU, BW_eff — эффективная пропускная (GiB/с/GPU), c_gpu — цена/час, U — целевая загрузка.
- Время ETL: T_etl ≈ S_data / (N_gpu × BW_eff).
- Время тренинга: T_train ≈ N_iter × t_iter / (N_gpu × U) (условно; зависит от модели).
- Стоимость: Cost_total ≈ c_gpu × (T_etl + T_train).
- On‑Demand vs Interruptible: для тяжёлых шифлов переносите в Interruptible; Cost_per_1T_rows считайте через фактический etl_throughput_gib_s.
Планирование бюджета: https://cloudcompute.ru/solutions/cost-planner/ • компромиссы: https://cloudcompute.ru/solutions/throughput-vs-latency/
Безопасность/политики
- PII/комплаенс: маскирование полей на этапе ETL; раздельные бакеты для сырых/очищенных данных.
- Шифрование: «в канале» (Ingress/межузловые) и «на диске» (NVMe spill/артефакты).
- Ретеншн: TTL для временных файлов/спиллов; аудит доступа к фичестору.
- Изоляция: отдельные GPU‑пулы по тенантам/задачам; quotas на Dask‑кластер.
Подробнее: https://cloudcompute.ru/solutions/security/ • https://cloudcompute.ru/solutions/storage-data/
Траблшутинг
Симптом | Возможная причина | Решение |
CUDA OOM при merge/groupby | Узкие партиции, большие промежуточные | Увеличить RMM‑pool, repartition, set_index по ключу, включить spill |
Шифл «висит»/очень долгий | Высокая кардинальность/скос ключей | Пред‑аггрегация, shuffle=»p2p»/задачами, ключ с меньшей кардинальностью |
Сплошной spill на NVMe | Завышенный batch/ширина строк | Уменьшить число колонок, astype к компактным типам, частичная аггрегация |
Нестабильные воркеры | Перегрев HBM/пики задач | Снизить параллелизм, лимиты по памяти, увеличить —rmm-pool-size |
UCX/сетевые ошибки | Неверные флаги/сеть | Переключиться на TCP, проверить enable_nvlink/enable_tcp_over_ucx, MTU |
Несогласованность типов | Смешанные dtypes между файлами | Явные схемы, astype перед merge/write, валидация схемы |
Медленный write Parquet | Мелкие партиции/много файлов | repartition до разумного числа, увеличить row_group_size |
Модель «переучилась» | Данные утекли/дисбаланс | Time‑split, регуляризация, калибровка, контроль дрифта |
Ещё по устойчивости пайплайнов: https://cloudcompute.ru/solutions/interruptible-patterns/ • тюнинг: https://cloudcompute.ru/solutions/performance-tuning/
Как запустить в cloudcompute.ru
- Откройте Шаблоны запусков: https://cloudcompute.ru/solutions/templates/ — выберите RAPIDS ETL (Single‑GPU) или RAPIDS + Dask‑CUDA (Multi‑GPU).
- Выберите профиль GPU: 24/48/80 ГБ — по объёму данных/окнам/джойнам.
- Смонтируйте диски: /nvme/datasets, /nvme/spill, при необходимости — /nvme/checkpoints.
- Заполните переменные окружения (RMM‑pool, протокол UCX/TCP, spill‑директория) как в docker-compose.yml.
- Для продакшна: автоскейл по очереди/использованию, дашборды/алерты, канареечные обновления пайплайна.
Чек‑лист перед продом
- Замерены etl_throughput_gib_s, p50/p95 ключевых операций (read/merge/groupby/write).
- RMM‑pool 70–80% HBM; spill на NVMe протестирован.
- Партиционирование/set_index по ключам настроено; шифлы устойчивы.
- Версионирование схемы/фичей и проверка типов включены.
- Алерты по rmm_current_bytes, spill_bytes, task_duration_p95, gpu_mem_peak.
- Политики PII/ретеншн/шифрование — внедрены.
- Нагрузочный прогон ≥ 30 мин с целевыми p95; план отката готов.
Навигация
- Хаб «Решения»: https://cloudcompute.ru/solutions/
- Рекомендательные системы: https://cloudcompute.ru/solutions/recsys/
- Хранилища и данные: https://cloudcompute.ru/solutions/storage-data/
- Производительность и тюнинг: https://cloudcompute.ru/solutions/performance-tuning/
- Throughput vs Latency: https://cloudcompute.ru/solutions/throughput-vs-latency/
- Multi‑GPU: https://cloudcompute.ru/solutions/multi-gpu/
- Планирование стоимости: https://cloudcompute.ru/solutions/cost-planner/
- Мониторинг и логи: https://cloudcompute.ru/solutions/monitoring-logging/
- Наблюдаемость инференса/ETL: https://cloudcompute.ru/solutions/llm-inference/observability/
- Interruptible‑паттерны: https://cloudcompute.ru/solutions/interruptible-patterns/
- Triton Inference Server (сервинг моделей): https://cloudcompute.ru/solutions/triton-inference-server/
- Gradio + FastAPI (примеры UI/эндпойнтов): https://cloudcompute.ru/solutions/gradio-fastapi/
- CI/CD контейнеров: https://cloudcompute.ru/solutions/containers-ci-cd/