Spark RAPIDS: ускорение Spark‑пайплайнов на GPU

Задача страницы. Практический гид по запуску Spark RAPIDS для ускорения Spark SQL/DataFrame/ETL и части ML‑операций на GPU. Разберём архитектуру плагина (Catalyst→GPU), рекомендуемые конфиги (SQLPlugin, UCX‑shuffle, RMM‑пулы), пайплайны single‑/multi‑GPU, sizing по профилям 24/48/80 ГБ, метрики/алерты, экономику и траблшутинг. Предполагается NVMe‑кэш, режимы On‑Demand (интерактив/низкая латентность) и Interruptible (ночные батчи), контейнеризация и мониторинг.

TL;DR

  • Что ускоряем. DataFrame/Spark SQL‑операторы (scan, filter, project, join, agg, window, sort, repartition) и часть ML‑класса — переводятся в GPU‑колоннарные эквиваленты. RDD‑API и произвольные Python/Scala UDF — не ускоряются (часто фоллбэк на CPU).
  • **Включить GPU в Spark: spark.plugins = com.nvidia.spark.SQLPlugin spark.rapids.sql.enabled = true spark.executor.resource.gpu.amount = 1 spark.task.resource.gpu.amount ∈ [0.0625..0.5] (параллелизм на GPU)
  • Сетевой/Shuffle: RapidsShuffleManager + UCX для межузловых шифлов, NVMe под spill.
  • Память: RMM‑пул 70–80% HBM, pinned‑pool для fast host↔device, крупные batch‑размеры.
  • Наблюдаемость: EXPLAIN=ALL для контроля фоллбэков, gpuOpTime/OpTime, p95 job/stage/task, GPU util/HBM, spill.
  • Экономика: T_job ≈ S_data / (N_gpu × BW_eff). Cost ≈ c_gpu × T_job. Часто 3–10× ускорение ETL при меньшем числе узлов. См. https://cloudcompute.ru/solutions/cost-planner/ и https://cloudcompute.ru/solutions/throughput-vs-latency/
  • Если нужна «чистая» RAPIDS без Spark: см. соседнюю страницу https://cloudcompute.ru/solutions/rapids/

Сценарии (когда это нужно)

  • Ежедневные батчи ETL: парсинг логов/Parquet/ORC → джойны (каталог/события) → агрегаты/окна → выгрузка фичей для рексиса/ML.
  • Интерактивная аналитика: ad‑hoc SQL на GiB–TiB таблицах, быстрые выборки/группировки.
  • Подготовка фичей для рекомендаций: категоризация/хэширование/агрегации, негатив‑семплинг (in‑batch), экспорт для индексов. См. https://cloudcompute.ru/solutions/recsys/
  • Гибрид Spark↔GPU‑ML: ETL в Spark RAPIDS → обучение/инференс в Triton/cuDF/cuML. См. https://cloudcompute.ru/solutions/triton-inference-server/ и https://cloudcompute.ru/solutions/rapids/

Архитектуры и пайплайны Как работает плагин (упрощённо)

Spark SQL/DataFrame
 └─> Catalyst Optimizer
 └─> RAPIDS Plugin (rule-based)
 ├─ GPU-колоннарные операторы (scan/join/agg/...)
 └─ [Fallback] CPU-операторы (если не поддерживается)
 └─ Columnar <-> Row конверсия (минимизировать!)

1) GPU‑ETL с UCX‑shuffle (батчи)

Parquet/ORC on NVMe/Object
 └─> Spark SQL (GPU)
 ├─ Read -> Filter/Project
 ├─ Repartition/Shuffle (UCX, NVMe spill)
 ├─ Join/Aggregations/Windows
 └─ Write Parquet (partitioned)

2) Гибрид с downstream ML

Spark RAPIDS ETL (GPU) ──> Features Parquet ──> cuDF/cuML/Triton (GPU) ──> Serving

3) Интерактив на кластере

Spark Thrift/Notebook ─> SQL (GPU plan) ─> Low-latency scans/joins ─> Caching ─> BI/Ad-hoc

Профили GPU и ориентиры пропускной способности

Инженерные ориентиры для ETL‑пайплайна (filter→join→groupby на Parquet, NVMe, UCX‑shuffle). Фактические цифры зависят от схемы/кардинальностей/компрессии/сети.

**Профиль GPU** **Память** **Типичный кластер** **Эффективная скорость ETL\*** **Комментарии**
24 ГБ (Compact) 24 ГБ 2–4 executors×1 GPU **0.5–1.2 GiB/s** Малая/средняя ширина строк, ограниченные шифлы.
48 ГБ (Balanced) 48 ГБ 4–8 executors×1 GPU **1–3 GiB/s** Баланс join‑нагрузок и оконных ф‑ий.
80 ГБ (HQ) 80 ГБ 8–16 executors×1 GPU **2–5 GiB/s** Тяжёлые джойны/окна, большие партиции.

* На узел — при хорошей NVMe/сети и UCX‑shuffle; масштабируйте линейно до сетевых/IO‑потолков. Тюнинг и распределёнка: https://cloudcompute.ru/solutions/performance-tuning/, https://cloudcompute.ru/solutions/multi-gpu/

Конфиги (YAML/submit) и скелеты кода

spark-defaults.conf (минимум для GPU)

spark.plugins com.nvidia.spark.SQLPlugin
spark.rapids.sql.enabled true
spark.rapids.sql.concurrentGpuTasks 2
spark.rapids.memory.gpu.allocFraction 0.8
spark.rapids.memory.pinnedPool.size 2G
spark.rapids.sql.explain ALL # видеть NOT_ON_GPU
spark.sql.adaptive.enabled true # AQE
spark.sql.files.maxPartitionBytes 512m
spark.sql.shuffle.partitions 200
spark.shuffle.manager org.apache.spark.shuffle.rapids.RapidsShuffleManager
spark.rapids.shuffle.mode UCX
spark.rapids.shuffle.ucx.bounceBuffers.size 64m
spark.executor.resource.gpu.amount 1
spark.task.resource.gpu.amount 0.125 # 8 задач на GPU (подберите)
spark.executor.memory 12g
spark.executor.memoryOverhead 6g

spark-submit (Kubernetes; 1 GPU на executor)

spark-submit 
 --master k8s://https://$K8S_API 
 --deploy-mode cluster 
 --name rapids-etl 
 --conf spark.kubernetes.container.image=cloudcompute/spark-rapids:latest 
 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark 
 --conf spark.executor.instances=8 
 --conf spark.plugins=com.nvidia.spark.SQLPlugin 
 --conf spark.rapids.sql.enabled=true 
 --conf spark.shuffle.manager=org.apache.spark.shuffle.rapids.RapidsShuffleManager 
 --conf spark.rapids.shuffle.mode=UCX 
 --conf spark.executor.resource.gpu.amount=1 
 --conf spark.task.resource.gpu.amount=0.125 
 --conf spark.rapids.memory.gpu.allocFraction=0.8 
 --conf spark.rapids.memory.pinnedPool.size=2G 
 --conf spark.locality.wait=0s 
 --conf spark.sql.adaptive.enabled=true 
 --conf spark.sql.files.maxPartitionBytes=512m 
 --conf spark.sql.shuffle.partitions=200 
 local:///opt/app/jobs/etl.py

Пример PySpark (GPU‑friendly SQL/DataFrame)

from pyspark.sql import SparkSession, functions as F
spark = (SparkSession.builder
 .appName("rapids-etl")
 .getOrCreate())
# Важно: избегаем произвольных UDF (фоллбэк на CPU), используем встроенные функции
events = (spark.read.parquet("s3a://bucket/events/")
 .select("user_id","item_id","ts","price","category")
 .where(F.col("ts") >= "2025-01-01")
 .fillna({"price": 0.0}))
catalog = spark.read.parquet("s3a://bucket/catalog/").select("item_id","brand","category")
# Джойн по ключу + агрегации — остаются на GPU
df = (events.join(catalog.hint("broadcast"), "item_id", "left")
 .withColumn("week", F.date_trunc("week", F.col("ts")))
 .groupBy("user_id","week","category")
 .agg(F.sum("price").alias("spend_sum"),
 F.count("*").alias("views"))
 .withColumn("arpu", F.col("spend_sum") / F.greatest(F.col("views"), F.lit(1))))
# Партиционированная запись
(df.repartition(256, "week")
 .write.mode("overwrite")
 .partitionBy("week").parquet("s3a://bucket/features/"))
# Контроль плана: GPU/CPU
print(df._jdf.queryExecution().toString()) # кратко

Spark‑Operator (опционально): SparkApplication (K8s)

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata: { name: rapids-etl }
spec:
 type: Scala
 mode: cluster
 image: cloudcompute/spark-rapids:latest
 imagePullPolicy: IfNotPresent
 mainClass: com.company.jobs.Etl
 mainApplicationFile: local:///opt/app/jobs/etl.jar
 sparkVersion: "3.5.1"
 restartPolicy: { type: OnFailure }
 driver:
 cores: 1
 memory: 4g
 serviceAccount: spark
 env:
 - name: SPARK_CONF_DIR
 value: /opt/spark/conf
 executor:
 instances: 8
 cores: 4
 memory: 12g
 memoryOverhead: 6g
 gpu:
 name: nvidia.com/gpu
 quantity: 1
 deps: {}
``` **Наблюдаемость/метрики/алерты**

**Spark + RAPIDS (Prometheus/лог‑метрики):**

- **План/ускорение:** rapids\_explain\_not\_on\_gpu\_count, rapids\_plan\_gpu\_operators\_total, rapids\_gpuOpTime\_ms, rapids\_opTime\_ms — доля GPU‑времени.
- **Latency/Perf:** spark\_job\_duration\_seconds, spark\_stage\_duration\_seconds{type=shuffle|agg|join}, spark\_task\_duration\_seconds (p50/p95).
- **Shuffle/IO:** spark\_shuffle\_read\_bytes, spark\_shuffle\_write\_bytes, rapids\_spill\_bytes{to=host|disk}, nvme\_{read,write}\_mb\_s.
- **GPU:** gpu\_utilization, gpu\_memory\_bytes, gpu\_mem\_peak\_bytes.
- **AQE:** spark\_sql\_adaptive\_skewed\_partitions, spark\_sql\_exchange\_reuse (эффективность адаптива).

**Алерты (примеры):**

- rapids\_explain\_not\_on\_gpu\_count &gt; 0 — есть CPU‑операторы: переписать UDF/выражения, проверить типы.
- rapids\_spill\_bytes{to=disk}\_p95 ↑ — не хватает HBM/host‑пула: увеличить RMM/pinned, уменьшить batch, оптимизировать join.
- spark\_stage\_duration\_p95{type=shuffle} ↑ — UCX/сеть/NVMe узкие; увеличить параллелизм/ресурсы, перенести джобы в Interruptible.
- gpu\_mem\_peak/HBM &gt; 0.9 — снизить concurrentGpuTasks, batch, увеличить HBM‑профиль.

Детали по мониторингу: <https://cloudcompute.ru/solutions/monitoring-logging/> • <https://cloudcompute.ru/solutions/llm-inference/observability/>

## **Экономика и формулы**

Обозначения: S\_data — объём данных (GiB), N\_gpu — число GPU‑executors, BW\_eff — эффективная скорость GPU‑ETL (GiB/с/узел), c\_gpu — цена/час, U — целевая загрузка.

- **Время джобы:
 T\_job ≈ S\_data / (N\_gpu × BW\_eff).
- **Стоимость:
 Cost ≈ c\_gpu × T\_job = c\_gpu × S\_data / (N\_gpu × BW\_eff).
- **Сравнение CPU vs GPU (оценка):
 Пусть BW\_cpu — GiB/с на CPU‑узел, BW\_gpu — на GPU‑узел.
 Выигрыш по стоимости при одинаковом S\_data:
 Cost\_ratio ≈ (c\_gpu / BW\_gpu) / (c\_cpu / BW\_cpu).
 Обычно BW\_gpu &gt;&gt; BW\_cpu, и при сопоставимых тарифах выигрывает GPU‑кластер.

Планирование: <https://cloudcompute.ru/solutions/cost-planner/> • компромисс throughput↔latency: <https://cloudcompute.ru/solutions/throughput-vs-latency/>

**Безопасность/политики**

- **PII/комплаенс:** маскирование и минимизация колонок ещё в ETL; разделение «сырых/очищенных» бакетов.
- **Шифрование:** транспорт межузлов (UCX/TCP) и хранилище (NVMe/object).
- **Ретеншн:** TTL на spill/временные артефакты, аудит доступа.
- **Изоляция:** пулы On‑Demand/Interruptible, квоты на executors, ограничение дорогих SQL‑паттернов.

См. также: <https://cloudcompute.ru/solutions/security/> • <https://cloudcompute.ru/solutions/storage-data/>

**Траблшутинг**

<table><tbody><tr><td>**Симптом**

</td><td>**Возможная причина**

</td><td>**Решение**

</td></tr><tr><td>В плане много **NOT\_ON\_GPU**

</td><td>UDF/неподдерж. выражения/типы

</td><td>Переписать на встроенные функции, привести типы, отключить проблемные шаги

</td></tr><tr><td>Низкая утилизация GPU

</td><td>Малые batch/узкие партиции

</td><td>Увеличить maxPartitionBytes, concurrentGpuTasks, батч‑размеры

</td></tr><tr><td>Высокий spill на диск

</td><td>Нехватка HBM/host‑пула

</td><td>rmm.allocFraction↑, pinnedPool↑, уменьшить конкуренцию на GPU

</td></tr><tr><td>Долгие shuffle‑стадии

</td><td>Нет UCX/узкая сеть/NVMe

</td><td>Включить UCX, проверить NVMe/сеть, уменьшить shuffle.partitions после AQE

</td></tr><tr><td>Перекос ключей (skew)

</td><td>Немного «тяжёлых» ключей

</td><td>AQE включить, salt‑техники, предварит. агрегация

</td></tr><tr><td>OOM на драйвере/экзекьюторах

</td><td>Узкая memoryOverhead/широкие строки

</td><td>Увеличить overhead, привести типы к компактным (int/decimal), меньше колоночных взрывов

</td></tr><tr><td>Резкое падение скорости на write

</td><td>Много мелких файлов

</td><td>repartition перед write, настроить row\_group\_size, группировать по партициям

</td></tr><tr><td>Латентность растёт волнами

</td><td>AQE/обновление плана/reuse

</td><td>Поиграть advisoryPartitionSize, кэшировать стабильные источники

</td></tr></tbody></table>

Больше паттернов: <https://cloudcompute.ru/solutions/interruptible-patterns/> • тюнинг: <https://cloudcompute.ru/solutions/performance-tuning/>

**Как запустить в cloudcompute.ru**

1. Откройте **Шаблоны запусков**: <https://cloudcompute.ru/solutions/templates/> — выберите **Spark RAPIDS (K8s)** или **Spark RAPIDS (Standalone)**.
2. Выберите профиль GPU: **24/48/80 ГБ** — по ширине строк/джойнам/окнам и целевому времени окна батча.
3. Подключите диски: /nvme/spill и /nvme/datasets; объектное хранилище — по вашей схеме.
4. Заполните spark-defaults.conf и spark-submit как в примерах (SQLPlugin, UCX‑shuffle, RMM/pinned‑pool, ресурсы GPU).
5. Для продакшна: дашборды (GPU/Spill/Shuffle/EXPLAIN), алерты на p95/спилл/NOT\_ON\_GPU, пулы On‑Demand/Interruptible, канареечные апдейты.

**Чек‑лист перед продом**

- spark.plugins=SQLPlugin, rapids.sql.enabled=true, UCX‑shuffle включён.
- EXPLAIN=ALL чистый (нет критичных NOT\_ON\_GPU), UDF переписаны.
- RMM‑пул 70–80% HBM, pinnedPool.size задан; spill на NVMe протестирован.
- AQE включён, перекос ключей обработан (salt/agg).
- p95 job/stage/shuffle в целевых пределах, GPU util стабильна.
- Партиционирование и размер файлов при write откалиброваны.
- Политики PII/ретеншн/шифрование внедрены.
- Канареечный релиз/откат и шаблоны деплоя готовы.

**Навигация**

- Хаб «Решения»: <https://cloudcompute.ru/solutions/>
- RAPIDS (cuDF/cuML): <https://cloudcompute.ru/solutions/rapids/>
- Рекомендательные системы: <https://cloudcompute.ru/solutions/recsys/>
- Хранилища и данные: <https://cloudcompute.ru/solutions/storage-data/>
- Производительность и тюнинг: <https://cloudcompute.ru/solutions/performance-tuning/>
- Throughput vs Latency: <https://cloudcompute.ru/solutions/throughput-vs-latency/>
- Multi‑GPU/распределёнка: <https://cloudcompute.ru/solutions/multi-gpu/>
- Планирование стоимости: <https://cloudcompute.ru/solutions/cost-planner/>
- Мониторинг и логи: <https://cloudcompute.ru/solutions/monitoring-logging/>
- Наблюдаемость пайплайнов: <https://cloudcompute.ru/solutions/llm-inference/observability/>
- Interruptible‑паттерны: <https://cloudcompute.ru/solutions/interruptible-patterns/>
- Triton Inference Server: <https://cloudcompute.ru/solutions/triton-inference-server/>
- CI/CD контейнеров: <https://cloudcompute.ru/solutions/containers-ci-cd/>

Готовы запустить?

Запустить GPU-сервер