Решения

RAPIDS на GPU: cuDF/cuML и ускорение ETL

Задача страницы. Инженерный гид по GPU‑ETL и «классическому» ML на базе RAPIDS: ускоренная обработка таблиц (cuDF), распределённые пайплайны (Dask‑CUDA), обучение/инференс «деревьев/линейных/кластеризации» (cuML). Разберём типовые архитектуры single‑/multi‑GPU, конфиги, RMM‑пулы, UCX, метрики p50/p95, экономику и траблшутинг. Все примеры ориентированы на NVMe‑кэш, режимы On‑Demand (интерактив/ноутбуки/онлайн‑фичи) и Interruptible (ночные батчи/переиндекс/фиче‑инжиниринг).

TL;DR

  • Паттерны:
    Single‑GPU — быстрый прототип/интерактив: cuDF + cuML + RMM pool.
    Multi‑GPU — батчи/большие датасеты: Dask‑cuDF + Dask‑CUDA (UCX, NVLink/IB), spill на NVMe.
  • Ключевые узлы: RMM‑пул (70–80% HBM), NVMe для временных артефактов, UCX‑транспорт, партиционирование по ключу (set_index), явное persist().
  • ETL‑операции: чтение Parquet/ORC → фильтры → merge/joingroupby/agg/windowcategorify/encode → write Parquet; для ML — train/test split → cuML (RF/GBT/LogReg/KMeans).
  • Метрики/наблюдаемость: etl_throughput_gib_s, cudf_op_seconds{op=…}, rmm_current_bytes, dask_task_duration_seconds, spill_bytes, gpu_utilization, p50/p95.
  • Экономика: T_etl ≈ S_data / (N_gpu × BW_eff), Cost ≈ c_gpu × (T_etl+T_train). Планирование — см. https://cloudcompute.ru/solutions/cost-planner/ и https://cloudcompute.ru/solutions/throughput-vs-latency/
  • Безопасность: PII‑маскирование, шифрование «на диске/в канале», TTL на временных данных, изоляция пулов по тенантам.

Сценарии (когда это нужно)

  • E‑commerce/аналитика: ежедневные батчи фичей, аггрегации, слияние логов и каталогов, быстрый A/B‑анализ.
  • Рекомендации/реклама: отрицательный семплинг, подготовка пар «user‑item», построение эмбеддингов, оффлайн‑пересчёт индексов (см. https://cloudcompute.ru/solutions/recsys/).
  • NLP/мультимедиа‑таблицы: массовая обработка метаданных, статистики, подготовка «манифестов» для downstream пайплайнов (ASR/диаризация/музыка).
  • Оперативная BI: интерактивные ноутбуки на single‑GPU, выборки и джойны GiB‑масштаба без «часовых» ожиданий.

Архитектуры и пайплайны

1) Single‑GPU: быстрый ETL + ML (ноутбук/сервис)

				
					NVMe (Parquet) ──> cuDF.read_parquet
                    └─> Filter/Project/FillNA
                         └─> Join/Merge (keys)
                              └─> GroupBy/Agg/Windows
                                   └─> Feature Encode/Scale
                                        ├─> Write Parquet (NVMe)
                                        └─> cuML fit/predict
				
			

Особенности: RMM‑pool 70–80% HBM; избегайте лишних to_pandas(); для больших merge — упорядочивать ключи и контролировать кардинальности.

2) Multi‑GPU: Dask‑cuDF + Dask‑CUDA (батчи/кластеры)

				
					Object/NVMe ──> dask_cudf.read_parquet (many partitions)
                  └─> Map/Filter   (per‑partition)
                       └─> set_index/shuffle (by key)
                            └─> Join/GroupBy (distributed)
                                 └─> Persist/Checkpoint
                                      ├─> dask_cuml fit/predict
                                      └─> write_parquet (repartition)
				
			

Особенности: protocol=»ucx», включайте NVLink/IB; контролируйте spill на NVMe, —rmm-pool-size; тяжёлые шифлы — ночью (Interruptible).

3) Гибрид: Online‑фичи + Offline‑батчи

				
					Kafka/Stream ─> Micro‑batch to NVMe ─> Dask‑cuDF ingest ─> Update Feature Store ─> Training Windows ─> Export to Serving


				
			

Профили GPU и ориентиры

Ниже — инженерные ориентиры для оценки пропускной способности ETL (на потоке операций «фильтр→джойн→аггрегация» при Parquet на NVMe). Для вашего корпуса и схемы данных обязательно снимайте фактические метрики.

Профиль GPU

Память

Типовой ETL‑стек

Оценка пропускной способности*

Комментарии

24 ГБ (Compact)

24 ГБ

cuDF single‑GPU

порядка 0.5–1.5 GiB/s

Быстрый интерактив, средние джойны.

48 ГБ (Balanced)

48 ГБ

Dask‑cuDF (2–4 GPU)

порядка 1–3 GiB/s

Батчи, широкие таблицы, контроль шифлов.

80 ГБ (HQ)

80 ГБ

Dask‑cuDF (4–8 GPU)

порядка 2–5 GiB/s

Тяжёлые джойны/группировки, большие окна.

* Итог зависит от кардинальностей ключей, ширины строк, компрессии, NVMe/сети и параметров шифла.

Тюнинг/профилировка: https://cloudcompute.ru/solutions/performance-tuning/https://cloudcompute.ru/solutions/throughput-vs-latency/https://cloudcompute.ru/solutions/multi-gpu/

Конфиги и скелеты кода

Docker Compose (ноутбук + Dask‑кластер)

				
					version: "3.9"

x-common-env: &env
  RMM_POOL_SIZE: "70%"            # пул памяти RMM
  DASK_PROTOCOL: "ucx"            # ucx|tcp
  CUDA_VISIBLE_DEVICES: "0,1"     # укажите ваши GPU
  UCX_TLS: "tcp,cuda_copy,cuda_ipc"
  UCX_SOCKADDR_TLS_PRIORITY: "tcp"
  WORKER_SPILL_DIR: "/nvme/spill"

services:
  rapids-notebook:
    image: cloudcompute/rapids-notebook:latest
    environment:
      <<: *env
    deploy:
      resources:
        reservations:
          devices: [{ capabilities: ["gpu"] }]
    ports: ["8888:8888"]
    volumes:
      - /nvme/datasets:/data
      - /nvme/spill:/nvme/spill
    command: ["jupyter", "lab", "--ip=0.0.0.0", "--allow-root"]

  dask-scheduler:
    image: cloudcompute/dask-cuda:latest
    environment:
      <<: *env
    ports: ["8786:8786","8787:8787"]
    command: ["dask-scheduler"]

  dask-worker:
    image: cloudcompute/dask-cuda:latest
    depends_on: [dask-scheduler]
    environment:
      <<: *env
    deploy:
      resources:
        reservations:
          devices: [{ capabilities: ["gpu"] }]
    volumes:
      - /nvme/spill:/nvme/spill
    command:
      ["dask-cuda-worker","dask-scheduler:8786",
       "--rmm-pool-size","${RMM_POOL_SIZE}",
       "--local-directory","/nvme/spill",
       "--enable-tcp-over-ucx","--enable-cuda-ipc"]


				
			

K8s (Dask‑CUDA: планировщик и воркеры с GPU)

				
					apiVersion: apps/v1
kind: Deployment
metadata: { name: dask-scheduler }
spec:
  replicas: 1
  selector: { matchLabels: { app: dask-scheduler } }
  template:
    metadata: { labels: { app: dask-scheduler } }
    spec:
      containers:
        - name: scheduler
          image: cloudcompute/dask-cuda:latest
          ports: [{ containerPort: 8786 }, { containerPort: 8787 }]
          resources: { limits: { cpu: "2", memory: "4Gi" } }

---
apiVersion: apps/v1
kind: Deployment
metadata: { name: dask-worker }
spec:
  replicas: 2
  selector: { matchLabels: { app: dask-worker } }
  template:
    metadata: { labels: { app: dask-worker } }
    spec:
      containers:
        - name: worker
          image: cloudcompute/dask-cuda:latest
          env:
            - { name: RMM_POOL_SIZE, value: "70%" }
          command:
            ["dask-cuda-worker","dask-scheduler:8786",
             "--rmm-pool-size","70%","--local-directory","/nvme/spill",
             "--enable-tcp-over-ucx","--enable-cuda-ipc"]
          volumeMounts:
            - { name: spill, mountPath: /nvme/spill }
          resources:
            limits: { nvidia.com/gpu: 1, cpu: "4", memory: "24Gi" }
      volumes:
        - name: spill
          hostPath: { path: /nvme/spill }


				
			

Конфиг пайплайна (YAML)

				
					etl:
  input:
    path: "/data/events/*.parquet"
    columns: ["user_id","item_id","ts","price","category"]
  preprocess:
    filter:
      ts_from: "2025-01-01"
    fillna:
      price: 0.0
  join:
    right_path: "/data/catalog/*.parquet"
    keys: ["item_id"]
    how: "left"
  aggregations:
    groupby: ["user_id","category"]
    metrics:
      - { col: "price", op: "sum", as: "spend_sum" }
      - { col: "item_id", op: "count", as: "views" }
  encode:
    categorify: ["category"]
  output:
    path: "/data/features/dt=2025-08-28/"
    repartition: 256

ml:
  model: "rf_classifier"    # rf_classifier|logreg|kmeans
  target: "label"
  features: ["spend_sum","views","category_enc"]
  train_ratio: 0.8


				
			

Python: single‑GPU (cuDF + cuML)

				
					import cudf, cupy as cp
from cuml.model_selection import train_test_split
from cuml.ensemble import RandomForestClassifier
from rmm import reinitialize

# Инициализация RMM-пула (~70% HBM)
reinitialize(pool_allocator=True, initial_pool_size=int(0.7 * cp.cuda.runtime.memGetInfo()[1]))

# ETL
events = cudf.read_parquet("/data/events/*.parquet", columns=["user_id","item_id","ts","price","category"])
events = events[events["ts"] >= "2025-01-01"].fillna({"price": 0.0})
catalog = cudf.read_parquet("/data/catalog/*.parquet")[["item_id","category"]]
df = events.merge(catalog, on="item_id", how="left")
feat = (df.groupby(["user_id","category"])
          .agg({"price":"sum","item_id":"count"})
          .rename(columns={"price_sum":"spend_sum","item_id_count":"views"})
          .reset_index())
feat["category_enc"] = feat["category"].astype("category").cat.codes
feat = feat.drop(columns=["category"])
feat.to_parquet("/data/features/2025-08-28/")

# ML
X = feat[["spend_sum","views","category_enc"]].astype("float32")
y = (feat["spend_sum"] > 0).astype("int32")  # пример таргета
X_tr, X_te, y_tr, y_te = train_test_split(X, y, train_size=0.8, random_state=42)
clf = RandomForestClassifier(n_estimators=200, max_depth=12)
clf.fit(X_tr, y_tr)
print("AUC-like proxy:", float((clf.predict_proba(X_te)[:,1] > 0.5).astype("int32").mean()))


				
			

Python: Multi‑GPU (Dask‑cuDF + Dask‑CUDA + cuML)

				
					from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait
import dask_cudf, cudf
from cuml.dask.ensemble import RandomForestClassifier as DaskRF

cluster = LocalCUDACluster(protocol="ucx", enable_tcp_over_ucx=True,
                           enable_nvlink=True, rmm_pool_size="70%")
client = Client(cluster)

dfe = dask_cudf.read_parquet("/data/events/*.parquet",
                             columns=["user_id","item_id","ts","price","category"])
dfe = dfe[dfe["ts"] >= "2025-01-01"].fillna({"price": 0.0})
dfc = dask_cudf.read_parquet("/data/catalog/*.parquet")[["item_id","category"]]

# Шифл по ключу для эффективного merge
dfe = dfe.set_index("item_id", shuffle="tasks")
dfc = dfc.set_index("item_id", shuffle="tasks")
df = dfe.merge(dfc, left_index=True, right_index=True, how="left")

feat = (df.reset_index()
          .groupby(["user_id","category"])
          .agg({"price":"sum","item_id":"count"})
          .rename(columns={"price_sum":"spend_sum","item_id_count":"views"})
          .reset_index())
feat = feat.persist(); wait(feat)

# Категории → коды (per-partition)
feat["category_enc"] = feat["category"].astype("category").cat.codes
feat = feat.drop(columns=["category"])

# Разделение на обучение/тест (простое разбиение по hash user_id)
train = feat[feat["user_id"].hash_values() % 5 != 0].persist()
test  = feat[feat["user_id"].hash_values() % 5 == 0].persist(); wait([train, test])

X_tr = train[["spend_sum","views","category_enc"]].astype("float32")
y_tr = (train["spend_sum"] > 0).astype("int32")
X_te = test[["spend_sum","views","category_enc"]].astype("float32")
y_te = (test["spend_sum"] > 0).astype("int32")

rf = DaskRF(n_estimators=400, max_depth=14)
rf.fit(X_tr, y_tr)
pred = rf.predict_proba(X_te)[:,1]
print("p50_partitions:", client.run_on_scheduler(lambda: "ok"))  # заглушка метрик


				
			

Наблюдаемость, метрики, алерты

Latency/Perf (Prometheus‑стиль):

  • rapids_cudf_op_seconds{op=read|merge|groupby|write} — длительность ключевых операций (p50/p95).
  • rapids_etl_throughput_gib_s — GiB/с по пайплайну.
  • rapids_cuml_fit_seconds{model=rf|logreg|kmeans} — обучение; rapids_cuml_predict_seconds.
  • dask_task_duration_seconds{type=shuffle|map|reduce}; dask_scheduler_queue_len.
  • rmm_current_bytes, rmm_peak_bytes; spill_bytes (на NVMe).
  • gpu_utilization, gpu_memory_bytes, nvme_{read,write}_mb_s.

Quality/ML:

  • ml_auc, ml_accuracy, ml_f1, ml_calibration_ece — оффлайн.

Алерты (примеры):

  • rmm_current_bytes/HBM > 0.8 — увеличить spill/уменьшить партиции/снизить ширину строк.
  • dask_task_duration_p95{type=shuffle} ↑ — перегрузка шифла; перенести job в Interruptible, увеличить GPU/пул NVMe.
  • spill_bytes_p95 ↑ — не хватает HBM; пересобрать RMM‑pool, оптимизировать join/категории.
  • rapids_cudf_op_seconds{op=merge}_p95 ↑ — перекос ключей; ребаланс set_index, предварительные categorify.

Дашборды и логирование: https://cloudcompute.ru/solutions/monitoring-logging/https://cloudcompute.ru/solutions/llm-inference/observability/

Экономика и формулы

Обозначения: S_data — объём входных данных (GiB), N_gpu — число GPU, BW_eff — эффективная пропускная (GiB/с/GPU), c_gpu — цена/час, U — целевая загрузка.

  • Время ETL: T_etl ≈ S_data / (N_gpu × BW_eff).
  • Время тренинга: T_train ≈ N_iter × t_iter / (N_gpu × U) (условно; зависит от модели).
  • Стоимость: Cost_total ≈ c_gpu × (T_etl + T_train).
  • On‑Demand vs Interruptible: для тяжёлых шифлов переносите в Interruptible; Cost_per_1T_rows считайте через фактический etl_throughput_gib_s.

Планирование бюджета: https://cloudcompute.ru/solutions/cost-planner/ • компромиссы: https://cloudcompute.ru/solutions/throughput-vs-latency/

Безопасность/политики

  • PII/комплаенс: маскирование полей на этапе ETL; раздельные бакеты для сырых/очищенных данных.
  • Шифрование: «в канале» (Ingress/межузловые) и «на диске» (NVMe spill/артефакты).
  • Ретеншн: TTL для временных файлов/спиллов; аудит доступа к фичестору.
  • Изоляция: отдельные GPU‑пулы по тенантам/задачам; quotas на Dask‑кластер.

Подробнее: https://cloudcompute.ru/solutions/security/https://cloudcompute.ru/solutions/storage-data/

Траблшутинг

Симптом

Возможная причина

Решение

CUDA OOM при merge/groupby

Узкие партиции, большие промежуточные

Увеличить RMM‑pool, repartition, set_index по ключу, включить spill

Шифл «висит»/очень долгий

Высокая кардинальность/скос ключей

Пред‑аггрегация, shuffle=»p2p»/задачами, ключ с меньшей кардинальностью

Сплошной spill на NVMe

Завышенный batch/ширина строк

Уменьшить число колонок, astype к компактным типам, частичная аггрегация

Нестабильные воркеры

Перегрев HBM/пики задач

Снизить параллелизм, лимиты по памяти, увеличить —rmm-pool-size

UCX/сетевые ошибки

Неверные флаги/сеть

Переключиться на TCP, проверить enable_nvlink/enable_tcp_over_ucx, MTU

Несогласованность типов

Смешанные dtypes между файлами

Явные схемы, astype перед merge/write, валидация схемы

Медленный write Parquet

Мелкие партиции/много файлов

repartition до разумного числа, увеличить row_group_size

Модель «переучилась»

Данные утекли/дисбаланс

Time‑split, регуляризация, калибровка, контроль дрифта

Ещё по устойчивости пайплайнов: https://cloudcompute.ru/solutions/interruptible-patterns/ • тюнинг: https://cloudcompute.ru/solutions/performance-tuning/

Как запустить в cloudcompute.ru

  1. Откройте Шаблоны запусков: https://cloudcompute.ru/solutions/templates/ — выберите RAPIDS ETL (Single‑GPU) или RAPIDS + Dask‑CUDA (Multi‑GPU).
  2. Выберите профиль GPU: 24/48/80 ГБ — по объёму данных/окнам/джойнам.
  3. Смонтируйте диски: /nvme/datasets, /nvme/spill, при необходимости — /nvme/checkpoints.
  4. Заполните переменные окружения (RMM‑pool, протокол UCX/TCP, spill‑директория) как в docker-compose.yml.
  5. Для продакшна: автоскейл по очереди/использованию, дашборды/алерты, канареечные обновления пайплайна.

Чек‑лист перед продом

  • Замерены etl_throughput_gib_s, p50/p95 ключевых операций (read/merge/groupby/write).
  • RMM‑pool 70–80% HBM; spill на NVMe протестирован.
  • Партиционирование/set_index по ключам настроено; шифлы устойчивы.
  • Версионирование схемы/фичей и проверка типов включены.
  • Алерты по rmm_current_bytes, spill_bytes, task_duration_p95, gpu_mem_peak.
  • Политики PII/ретеншн/шифрование — внедрены.
  • Нагрузочный прогон ≥ 30 мин с целевыми p95; план отката готов.

Навигация