Spark RAPIDS: ускорение Spark‑пайплайнов на GPU
Задача страницы. Практический гид по запуску Spark RAPIDS для ускорения Spark SQL/DataFrame/ETL и части ML‑операций на GPU. Разберём архитектуру плагина (Catalyst→GPU), рекомендуемые конфиги (SQLPlugin, UCX‑shuffle, RMM‑пулы), пайплайны single‑/multi‑GPU, sizing по профилям 24/48/80 ГБ, метрики/алерты, экономику и траблшутинг. Предполагается NVMe‑кэш, режимы On‑Demand (интерактив/низкая латентность) и Interruptible (ночные батчи), контейнеризация и мониторинг.
TL;DR
- Что ускоряем. DataFrame/Spark SQL‑операторы (scan, filter, project, join, agg, window, sort, repartition) и часть ML‑класса — переводятся в GPU‑колоннарные эквиваленты. RDD‑API и произвольные Python/Scala UDF — не ускоряются (часто фоллбэк на CPU).
- **Включить GPU в Spark: spark.plugins = com.nvidia.spark.SQLPlugin spark.rapids.sql.enabled = true spark.executor.resource.gpu.amount = 1 spark.task.resource.gpu.amount ∈ [0.0625..0.5] (параллелизм на GPU)
- Сетевой/Shuffle: RapidsShuffleManager + UCX для межузловых шифлов, NVMe под spill.
- Память: RMM‑пул 70–80% HBM, pinned‑pool для fast host↔device, крупные batch‑размеры.
- Наблюдаемость: EXPLAIN=ALL для контроля фоллбэков, gpuOpTime/OpTime, p95 job/stage/task, GPU util/HBM, spill.
- Экономика: T_job ≈ S_data / (N_gpu × BW_eff). Cost ≈ c_gpu × T_job. Часто 3–10× ускорение ETL при меньшем числе узлов. См. https://cloudcompute.ru/solutions/cost-planner/ и https://cloudcompute.ru/solutions/throughput-vs-latency/
- Если нужна «чистая» RAPIDS без Spark: см. соседнюю страницу https://cloudcompute.ru/solutions/rapids/
Сценарии (когда это нужно)
- Ежедневные батчи ETL: парсинг логов/Parquet/ORC → джойны (каталог/события) → агрегаты/окна → выгрузка фичей для рексиса/ML.
- Интерактивная аналитика: ad‑hoc SQL на GiB–TiB таблицах, быстрые выборки/группировки.
- Подготовка фичей для рекомендаций: категоризация/хэширование/агрегации, негатив‑семплинг (in‑batch), экспорт для индексов. См. https://cloudcompute.ru/solutions/recsys/
- Гибрид Spark↔GPU‑ML: ETL в Spark RAPIDS → обучение/инференс в Triton/cuDF/cuML. См. https://cloudcompute.ru/solutions/triton-inference-server/ и https://cloudcompute.ru/solutions/rapids/
Архитектуры и пайплайны Как работает плагин (упрощённо)
Spark SQL/DataFrame
└─> Catalyst Optimizer
└─> RAPIDS Plugin (rule-based)
├─ GPU-колоннарные операторы (scan/join/agg/...)
└─ [Fallback] CPU-операторы (если не поддерживается)
└─ Columnar <-> Row конверсия (минимизировать!)
1) GPU‑ETL с UCX‑shuffle (батчи)
Parquet/ORC on NVMe/Object
└─> Spark SQL (GPU)
├─ Read -> Filter/Project
├─ Repartition/Shuffle (UCX, NVMe spill)
├─ Join/Aggregations/Windows
└─ Write Parquet (partitioned)
2) Гибрид с downstream ML
Spark RAPIDS ETL (GPU) ──> Features Parquet ──> cuDF/cuML/Triton (GPU) ──> Serving
3) Интерактив на кластере
Spark Thrift/Notebook ─> SQL (GPU plan) ─> Low-latency scans/joins ─> Caching ─> BI/Ad-hoc
Профили GPU и ориентиры пропускной способности
Инженерные ориентиры для ETL‑пайплайна (filter→join→groupby на Parquet, NVMe, UCX‑shuffle). Фактические цифры зависят от схемы/кардинальностей/компрессии/сети.
| **Профиль GPU** | **Память** | **Типичный кластер** | **Эффективная скорость ETL\*** | **Комментарии** |
| 24 ГБ (Compact) | 24 ГБ | 2–4 executors×1 GPU | **0.5–1.2 GiB/s** | Малая/средняя ширина строк, ограниченные шифлы. |
| 48 ГБ (Balanced) | 48 ГБ | 4–8 executors×1 GPU | **1–3 GiB/s** | Баланс join‑нагрузок и оконных ф‑ий. |
| 80 ГБ (HQ) | 80 ГБ | 8–16 executors×1 GPU | **2–5 GiB/s** | Тяжёлые джойны/окна, большие партиции. |
* На узел — при хорошей NVMe/сети и UCX‑shuffle; масштабируйте линейно до сетевых/IO‑потолков. Тюнинг и распределёнка: https://cloudcompute.ru/solutions/performance-tuning/, https://cloudcompute.ru/solutions/multi-gpu/
Конфиги (YAML/submit) и скелеты кода
spark-defaults.conf (минимум для GPU)
spark.plugins com.nvidia.spark.SQLPlugin
spark.rapids.sql.enabled true
spark.rapids.sql.concurrentGpuTasks 2
spark.rapids.memory.gpu.allocFraction 0.8
spark.rapids.memory.pinnedPool.size 2G
spark.rapids.sql.explain ALL # видеть NOT_ON_GPU
spark.sql.adaptive.enabled true # AQE
spark.sql.files.maxPartitionBytes 512m
spark.sql.shuffle.partitions 200
spark.shuffle.manager org.apache.spark.shuffle.rapids.RapidsShuffleManager
spark.rapids.shuffle.mode UCX
spark.rapids.shuffle.ucx.bounceBuffers.size 64m
spark.executor.resource.gpu.amount 1
spark.task.resource.gpu.amount 0.125 # 8 задач на GPU (подберите)
spark.executor.memory 12g
spark.executor.memoryOverhead 6g
spark-submit (Kubernetes; 1 GPU на executor)
spark-submit
--master k8s://https://$K8S_API
--deploy-mode cluster
--name rapids-etl
--conf spark.kubernetes.container.image=cloudcompute/spark-rapids:latest
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark
--conf spark.executor.instances=8
--conf spark.plugins=com.nvidia.spark.SQLPlugin
--conf spark.rapids.sql.enabled=true
--conf spark.shuffle.manager=org.apache.spark.shuffle.rapids.RapidsShuffleManager
--conf spark.rapids.shuffle.mode=UCX
--conf spark.executor.resource.gpu.amount=1
--conf spark.task.resource.gpu.amount=0.125
--conf spark.rapids.memory.gpu.allocFraction=0.8
--conf spark.rapids.memory.pinnedPool.size=2G
--conf spark.locality.wait=0s
--conf spark.sql.adaptive.enabled=true
--conf spark.sql.files.maxPartitionBytes=512m
--conf spark.sql.shuffle.partitions=200
local:///opt/app/jobs/etl.py
Пример PySpark (GPU‑friendly SQL/DataFrame)
from pyspark.sql import SparkSession, functions as F
spark = (SparkSession.builder
.appName("rapids-etl")
.getOrCreate())
# Важно: избегаем произвольных UDF (фоллбэк на CPU), используем встроенные функции
events = (spark.read.parquet("s3a://bucket/events/")
.select("user_id","item_id","ts","price","category")
.where(F.col("ts") >= "2025-01-01")
.fillna({"price": 0.0}))
catalog = spark.read.parquet("s3a://bucket/catalog/").select("item_id","brand","category")
# Джойн по ключу + агрегации — остаются на GPU
df = (events.join(catalog.hint("broadcast"), "item_id", "left")
.withColumn("week", F.date_trunc("week", F.col("ts")))
.groupBy("user_id","week","category")
.agg(F.sum("price").alias("spend_sum"),
F.count("*").alias("views"))
.withColumn("arpu", F.col("spend_sum") / F.greatest(F.col("views"), F.lit(1))))
# Партиционированная запись
(df.repartition(256, "week")
.write.mode("overwrite")
.partitionBy("week").parquet("s3a://bucket/features/"))
# Контроль плана: GPU/CPU
print(df._jdf.queryExecution().toString()) # кратко
Spark‑Operator (опционально): SparkApplication (K8s)
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata: { name: rapids-etl }
spec:
type: Scala
mode: cluster
image: cloudcompute/spark-rapids:latest
imagePullPolicy: IfNotPresent
mainClass: com.company.jobs.Etl
mainApplicationFile: local:///opt/app/jobs/etl.jar
sparkVersion: "3.5.1"
restartPolicy: { type: OnFailure }
driver:
cores: 1
memory: 4g
serviceAccount: spark
env:
- name: SPARK_CONF_DIR
value: /opt/spark/conf
executor:
instances: 8
cores: 4
memory: 12g
memoryOverhead: 6g
gpu:
name: nvidia.com/gpu
quantity: 1
deps: {}
``` **Наблюдаемость/метрики/алерты**
**Spark + RAPIDS (Prometheus/лог‑метрики):**
- **План/ускорение:** rapids\_explain\_not\_on\_gpu\_count, rapids\_plan\_gpu\_operators\_total, rapids\_gpuOpTime\_ms, rapids\_opTime\_ms — доля GPU‑времени.
- **Latency/Perf:** spark\_job\_duration\_seconds, spark\_stage\_duration\_seconds{type=shuffle|agg|join}, spark\_task\_duration\_seconds (p50/p95).
- **Shuffle/IO:** spark\_shuffle\_read\_bytes, spark\_shuffle\_write\_bytes, rapids\_spill\_bytes{to=host|disk}, nvme\_{read,write}\_mb\_s.
- **GPU:** gpu\_utilization, gpu\_memory\_bytes, gpu\_mem\_peak\_bytes.
- **AQE:** spark\_sql\_adaptive\_skewed\_partitions, spark\_sql\_exchange\_reuse (эффективность адаптива).
**Алерты (примеры):**
- rapids\_explain\_not\_on\_gpu\_count > 0 — есть CPU‑операторы: переписать UDF/выражения, проверить типы.
- rapids\_spill\_bytes{to=disk}\_p95 ↑ — не хватает HBM/host‑пула: увеличить RMM/pinned, уменьшить batch, оптимизировать join.
- spark\_stage\_duration\_p95{type=shuffle} ↑ — UCX/сеть/NVMe узкие; увеличить параллелизм/ресурсы, перенести джобы в Interruptible.
- gpu\_mem\_peak/HBM > 0.9 — снизить concurrentGpuTasks, batch, увеличить HBM‑профиль.
Детали по мониторингу: <https://cloudcompute.ru/solutions/monitoring-logging/> • <https://cloudcompute.ru/solutions/llm-inference/observability/>
## **Экономика и формулы**
Обозначения: S\_data — объём данных (GiB), N\_gpu — число GPU‑executors, BW\_eff — эффективная скорость GPU‑ETL (GiB/с/узел), c\_gpu — цена/час, U — целевая загрузка.
- **Время джобы:
T\_job ≈ S\_data / (N\_gpu × BW\_eff).
- **Стоимость:
Cost ≈ c\_gpu × T\_job = c\_gpu × S\_data / (N\_gpu × BW\_eff).
- **Сравнение CPU vs GPU (оценка):
Пусть BW\_cpu — GiB/с на CPU‑узел, BW\_gpu — на GPU‑узел.
Выигрыш по стоимости при одинаковом S\_data:
Cost\_ratio ≈ (c\_gpu / BW\_gpu) / (c\_cpu / BW\_cpu).
Обычно BW\_gpu >> BW\_cpu, и при сопоставимых тарифах выигрывает GPU‑кластер.
Планирование: <https://cloudcompute.ru/solutions/cost-planner/> • компромисс throughput↔latency: <https://cloudcompute.ru/solutions/throughput-vs-latency/>
**Безопасность/политики**
- **PII/комплаенс:** маскирование и минимизация колонок ещё в ETL; разделение «сырых/очищенных» бакетов.
- **Шифрование:** транспорт межузлов (UCX/TCP) и хранилище (NVMe/object).
- **Ретеншн:** TTL на spill/временные артефакты, аудит доступа.
- **Изоляция:** пулы On‑Demand/Interruptible, квоты на executors, ограничение дорогих SQL‑паттернов.
См. также: <https://cloudcompute.ru/solutions/security/> • <https://cloudcompute.ru/solutions/storage-data/>
**Траблшутинг**
<table><tbody><tr><td>**Симптом**
</td><td>**Возможная причина**
</td><td>**Решение**
</td></tr><tr><td>В плане много **NOT\_ON\_GPU**
</td><td>UDF/неподдерж. выражения/типы
</td><td>Переписать на встроенные функции, привести типы, отключить проблемные шаги
</td></tr><tr><td>Низкая утилизация GPU
</td><td>Малые batch/узкие партиции
</td><td>Увеличить maxPartitionBytes, concurrentGpuTasks, батч‑размеры
</td></tr><tr><td>Высокий spill на диск
</td><td>Нехватка HBM/host‑пула
</td><td>rmm.allocFraction↑, pinnedPool↑, уменьшить конкуренцию на GPU
</td></tr><tr><td>Долгие shuffle‑стадии
</td><td>Нет UCX/узкая сеть/NVMe
</td><td>Включить UCX, проверить NVMe/сеть, уменьшить shuffle.partitions после AQE
</td></tr><tr><td>Перекос ключей (skew)
</td><td>Немного «тяжёлых» ключей
</td><td>AQE включить, salt‑техники, предварит. агрегация
</td></tr><tr><td>OOM на драйвере/экзекьюторах
</td><td>Узкая memoryOverhead/широкие строки
</td><td>Увеличить overhead, привести типы к компактным (int/decimal), меньше колоночных взрывов
</td></tr><tr><td>Резкое падение скорости на write
</td><td>Много мелких файлов
</td><td>repartition перед write, настроить row\_group\_size, группировать по партициям
</td></tr><tr><td>Латентность растёт волнами
</td><td>AQE/обновление плана/reuse
</td><td>Поиграть advisoryPartitionSize, кэшировать стабильные источники
</td></tr></tbody></table>
Больше паттернов: <https://cloudcompute.ru/solutions/interruptible-patterns/> • тюнинг: <https://cloudcompute.ru/solutions/performance-tuning/>
**Как запустить в cloudcompute.ru**
1. Откройте **Шаблоны запусков**: <https://cloudcompute.ru/solutions/templates/> — выберите **Spark RAPIDS (K8s)** или **Spark RAPIDS (Standalone)**.
2. Выберите профиль GPU: **24/48/80 ГБ** — по ширине строк/джойнам/окнам и целевому времени окна батча.
3. Подключите диски: /nvme/spill и /nvme/datasets; объектное хранилище — по вашей схеме.
4. Заполните spark-defaults.conf и spark-submit как в примерах (SQLPlugin, UCX‑shuffle, RMM/pinned‑pool, ресурсы GPU).
5. Для продакшна: дашборды (GPU/Spill/Shuffle/EXPLAIN), алерты на p95/спилл/NOT\_ON\_GPU, пулы On‑Demand/Interruptible, канареечные апдейты.
**Чек‑лист перед продом**
- spark.plugins=SQLPlugin, rapids.sql.enabled=true, UCX‑shuffle включён.
- EXPLAIN=ALL чистый (нет критичных NOT\_ON\_GPU), UDF переписаны.
- RMM‑пул 70–80% HBM, pinnedPool.size задан; spill на NVMe протестирован.
- AQE включён, перекос ключей обработан (salt/agg).
- p95 job/stage/shuffle в целевых пределах, GPU util стабильна.
- Партиционирование и размер файлов при write откалиброваны.
- Политики PII/ретеншн/шифрование внедрены.
- Канареечный релиз/откат и шаблоны деплоя готовы.
**Навигация**
- Хаб «Решения»: <https://cloudcompute.ru/solutions/>
- RAPIDS (cuDF/cuML): <https://cloudcompute.ru/solutions/rapids/>
- Рекомендательные системы: <https://cloudcompute.ru/solutions/recsys/>
- Хранилища и данные: <https://cloudcompute.ru/solutions/storage-data/>
- Производительность и тюнинг: <https://cloudcompute.ru/solutions/performance-tuning/>
- Throughput vs Latency: <https://cloudcompute.ru/solutions/throughput-vs-latency/>
- Multi‑GPU/распределёнка: <https://cloudcompute.ru/solutions/multi-gpu/>
- Планирование стоимости: <https://cloudcompute.ru/solutions/cost-planner/>
- Мониторинг и логи: <https://cloudcompute.ru/solutions/monitoring-logging/>
- Наблюдаемость пайплайнов: <https://cloudcompute.ru/solutions/llm-inference/observability/>
- Interruptible‑паттерны: <https://cloudcompute.ru/solutions/interruptible-patterns/>
- Triton Inference Server: <https://cloudcompute.ru/solutions/triton-inference-server/>
- CI/CD контейнеров: <https://cloudcompute.ru/solutions/containers-ci-cd/>
Готовы запустить?
Запустить GPU-сервер