Решения
Spark RAPIDS: ускорение Spark‑пайплайнов на GPU
Задача страницы. Практический гид по запуску Spark RAPIDS для ускорения Spark SQL/DataFrame/ETL и части ML‑операций на GPU. Разберём архитектуру плагина (Catalyst→GPU), рекомендуемые конфиги (SQLPlugin, UCX‑shuffle, RMM‑пулы), пайплайны single‑/multi‑GPU, sizing по профилям 24/48/80 ГБ, метрики/алерты, экономику и траблшутинг. Предполагается NVMe‑кэш, режимы On‑Demand (интерактив/низкая латентность) и Interruptible (ночные батчи), контейнеризация и мониторинг.
TL;DR
- Что ускоряем. DataFrame/Spark SQL‑операторы (scan, filter, project, join, agg, window, sort, repartition) и часть ML‑класса — переводятся в GPU‑колоннарные эквиваленты. RDD‑API и произвольные Python/Scala UDF — не ускоряются (часто фоллбэк на CPU).
- Включить GPU в Spark:
spark.plugins = com.nvidia.spark.SQLPlugin
spark.rapids.sql.enabled = true
spark.executor.resource.gpu.amount = 1
spark.task.resource.gpu.amount ∈ [0.0625..0.5] (параллелизм на GPU) - Сетевой/Shuffle: RapidsShuffleManager + UCX для межузловых шифлов, NVMe под spill.
- Память: RMM‑пул 70–80% HBM, pinned‑pool для fast host↔device, крупные batch‑размеры.
- Наблюдаемость: EXPLAIN=ALL для контроля фоллбэков, gpuOpTime/OpTime, p95 job/stage/task, GPU util/HBM, spill.
- Экономика: T_job ≈ S_data / (N_gpu × BW_eff). Cost ≈ c_gpu × T_job. Часто 3–10× ускорение ETL при меньшем числе узлов. См. https://cloudcompute.ru/solutions/cost-planner/ и https://cloudcompute.ru/solutions/throughput-vs-latency/
- Если нужна «чистая» RAPIDS без Spark: см. соседнюю страницу https://cloudcompute.ru/solutions/rapids/
Сценарии (когда это нужно)
- Ежедневные батчи ETL: парсинг логов/Parquet/ORC → джойны (каталог/события) → агрегаты/окна → выгрузка фичей для рексиса/ML.
- Интерактивная аналитика: ad‑hoc SQL на GiB–TiB таблицах, быстрые выборки/группировки.
- Подготовка фичей для рекомендаций: категоризация/хэширование/агрегации, негатив‑семплинг (in‑batch), экспорт для индексов. См. https://cloudcompute.ru/solutions/recsys/
- Гибрид Spark↔GPU‑ML: ETL в Spark RAPIDS → обучение/инференс в Triton/cuDF/cuML. См. https://cloudcompute.ru/solutions/triton-inference-server/ и https://cloudcompute.ru/solutions/rapids/
Архитектуры и пайплайны

Как работает плагин (упрощённо)
Spark SQL/DataFrame
└─> Catalyst Optimizer
└─> RAPIDS Plugin (rule-based)
├─ GPU-колоннарные операторы (scan/join/agg/...)
└─ [Fallback] CPU-операторы (если не поддерживается)
└─ Columnar <-> Row конверсия (минимизировать!)
1) GPU‑ETL с UCX‑shuffle (батчи)
Parquet/ORC on NVMe/Object
└─> Spark SQL (GPU)
├─ Read -> Filter/Project
├─ Repartition/Shuffle (UCX, NVMe spill)
├─ Join/Aggregations/Windows
└─ Write Parquet (partitioned)
2) Гибрид с downstream ML
Spark RAPIDS ETL (GPU) ──> Features Parquet ──> cuDF/cuML/Triton (GPU) ──> Serving
3) Интерактив на кластере
Spark Thrift/Notebook ─> SQL (GPU plan) ─> Low-latency scans/joins ─> Caching ─> BI/Ad-hoc
Профили GPU и ориентиры пропускной способности
Инженерные ориентиры для ETL‑пайплайна (filter→join→groupby на Parquet, NVMe, UCX‑shuffle). Фактические цифры зависят от схемы/кардинальностей/компрессии/сети.
Профиль GPU | Память | Типичный кластер | Эффективная скорость ETL* | Комментарии |
24 ГБ (Compact) | 24 ГБ | 2–4 executors×1 GPU | 0.5–1.2 GiB/s | Малая/средняя ширина строк, ограниченные шифлы. |
48 ГБ (Balanced) | 48 ГБ | 4–8 executors×1 GPU | 1–3 GiB/s | Баланс join‑нагрузок и оконных ф‑ий. |
80 ГБ (HQ) | 80 ГБ | 8–16 executors×1 GPU | 2–5 GiB/s | Тяжёлые джойны/окна, большие партиции. |
* На узел — при хорошей NVMe/сети и UCX‑shuffle; масштабируйте линейно до сетевых/IO‑потолков.
Тюнинг и распределёнка: https://cloudcompute.ru/solutions/performance-tuning/, https://cloudcompute.ru/solutions/multi-gpu/
Конфиги (YAML/submit) и скелеты кода
spark-defaults.conf (минимум для GPU)
spark.plugins com.nvidia.spark.SQLPlugin
spark.rapids.sql.enabled true
spark.rapids.sql.concurrentGpuTasks 2
spark.rapids.memory.gpu.allocFraction 0.8
spark.rapids.memory.pinnedPool.size 2G
spark.rapids.sql.explain ALL # видеть NOT_ON_GPU
spark.sql.adaptive.enabled true # AQE
spark.sql.files.maxPartitionBytes 512m
spark.sql.shuffle.partitions 200
spark.shuffle.manager org.apache.spark.shuffle.rapids.RapidsShuffleManager
spark.rapids.shuffle.mode UCX
spark.rapids.shuffle.ucx.bounceBuffers.size 64m
spark.executor.resource.gpu.amount 1
spark.task.resource.gpu.amount 0.125 # 8 задач на GPU (подберите)
spark.executor.memory 12g
spark.executor.memoryOverhead 6g
spark-submit (Kubernetes; 1 GPU на executor)
spark-submit \
--master k8s://https://$K8S_API \
--deploy-mode cluster \
--name rapids-etl \
--conf spark.kubernetes.container.image=cloudcompute/spark-rapids:latest \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.executor.instances=8 \
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
--conf spark.rapids.sql.enabled=true \
--conf spark.shuffle.manager=org.apache.spark.shuffle.rapids.RapidsShuffleManager \
--conf spark.rapids.shuffle.mode=UCX \
--conf spark.executor.resource.gpu.amount=1 \
--conf spark.task.resource.gpu.amount=0.125 \
--conf spark.rapids.memory.gpu.allocFraction=0.8 \
--conf spark.rapids.memory.pinnedPool.size=2G \
--conf spark.locality.wait=0s \
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.files.maxPartitionBytes=512m \
--conf spark.sql.shuffle.partitions=200 \
local:///opt/app/jobs/etl.py
Пример PySpark (GPU‑friendly SQL/DataFrame)
from pyspark.sql import SparkSession, functions as F
spark = (SparkSession.builder
.appName("rapids-etl")
.getOrCreate())
# Важно: избегаем произвольных UDF (фоллбэк на CPU), используем встроенные функции
events = (spark.read.parquet("s3a://bucket/events/")
.select("user_id","item_id","ts","price","category")
.where(F.col("ts") >= "2025-01-01")
.fillna({"price": 0.0}))
catalog = spark.read.parquet("s3a://bucket/catalog/").select("item_id","brand","category")
# Джойн по ключу + агрегации — остаются на GPU
df = (events.join(catalog.hint("broadcast"), "item_id", "left")
.withColumn("week", F.date_trunc("week", F.col("ts")))
.groupBy("user_id","week","category")
.agg(F.sum("price").alias("spend_sum"),
F.count("*").alias("views"))
.withColumn("arpu", F.col("spend_sum") / F.greatest(F.col("views"), F.lit(1))))
# Партиционированная запись
(df.repartition(256, "week")
.write.mode("overwrite")
.partitionBy("week").parquet("s3a://bucket/features/"))
# Контроль плана: GPU/CPU
print(df._jdf.queryExecution().toString()) # кратко
Spark‑Operator (опционально): SparkApplication (K8s)
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata: { name: rapids-etl }
spec:
type: Scala
mode: cluster
image: cloudcompute/spark-rapids:latest
imagePullPolicy: IfNotPresent
mainClass: com.company.jobs.Etl
mainApplicationFile: local:///opt/app/jobs/etl.jar
sparkVersion: "3.5.1"
restartPolicy: { type: OnFailure }
driver:
cores: 1
memory: 4g
serviceAccount: spark
env:
- name: SPARK_CONF_DIR
value: /opt/spark/conf
executor:
instances: 8
cores: 4
memory: 12g
memoryOverhead: 6g
gpu:
name: nvidia.com/gpu
quantity: 1
deps: {}


Наблюдаемость/метрики/алерты
Spark + RAPIDS (Prometheus/лог‑метрики):
- План/ускорение: rapids_explain_not_on_gpu_count, rapids_plan_gpu_operators_total, rapids_gpuOpTime_ms, rapids_opTime_ms — доля GPU‑времени.
- Latency/Perf: spark_job_duration_seconds, spark_stage_duration_seconds{type=shuffle|agg|join}, spark_task_duration_seconds (p50/p95).
- Shuffle/IO: spark_shuffle_read_bytes, spark_shuffle_write_bytes, rapids_spill_bytes{to=host|disk}, nvme_{read,write}_mb_s.
- GPU: gpu_utilization, gpu_memory_bytes, gpu_mem_peak_bytes.
- AQE: spark_sql_adaptive_skewed_partitions, spark_sql_exchange_reuse (эффективность адаптива).
Алерты (примеры):
- rapids_explain_not_on_gpu_count > 0 — есть CPU‑операторы: переписать UDF/выражения, проверить типы.
- rapids_spill_bytes{to=disk}_p95 ↑ — не хватает HBM/host‑пула: увеличить RMM/pinned, уменьшить batch, оптимизировать join.
- spark_stage_duration_p95{type=shuffle} ↑ — UCX/сеть/NVMe узкие; увеличить параллелизм/ресурсы, перенести джобы в Interruptible.
- gpu_mem_peak/HBM > 0.9 — снизить concurrentGpuTasks, batch, увеличить HBM‑профиль.
Детали по мониторингу: https://cloudcompute.ru/solutions/monitoring-logging/ • https://cloudcompute.ru/solutions/llm-inference/observability/
Экономика и формулы
Обозначения: S_data — объём данных (GiB), N_gpu — число GPU‑executors, BW_eff — эффективная скорость GPU‑ETL (GiB/с/узел), c_gpu — цена/час, U — целевая загрузка.
- Время джобы:
T_job ≈ S_data / (N_gpu × BW_eff). - Стоимость:
Cost ≈ c_gpu × T_job = c_gpu × S_data / (N_gpu × BW_eff). - Сравнение CPU vs GPU (оценка):
Пусть BW_cpu — GiB/с на CPU‑узел, BW_gpu — на GPU‑узел.
Выигрыш по стоимости при одинаковом S_data:
Cost_ratio ≈ (c_gpu / BW_gpu) / (c_cpu / BW_cpu).
Обычно BW_gpu >> BW_cpu, и при сопоставимых тарифах выигрывает GPU‑кластер.
Планирование: https://cloudcompute.ru/solutions/cost-planner/ • компромисс throughput↔latency: https://cloudcompute.ru/solutions/throughput-vs-latency/
Безопасность/политики
- PII/комплаенс: маскирование и минимизация колонок ещё в ETL; разделение «сырых/очищенных» бакетов.
- Шифрование: транспорт межузлов (UCX/TCP) и хранилище (NVMe/object).
- Ретеншн: TTL на spill/временные артефакты, аудит доступа.
- Изоляция: пулы On‑Demand/Interruptible, квоты на executors, ограничение дорогих SQL‑паттернов.
См. также: https://cloudcompute.ru/solutions/security/ • https://cloudcompute.ru/solutions/storage-data/
Траблшутинг
Симптом | Возможная причина | Решение |
В плане много NOT_ON_GPU | UDF/неподдерж. выражения/типы | Переписать на встроенные функции, привести типы, отключить проблемные шаги |
Низкая утилизация GPU | Малые batch/узкие партиции | Увеличить maxPartitionBytes, concurrentGpuTasks, батч‑размеры |
Высокий spill на диск | Нехватка HBM/host‑пула | rmm.allocFraction↑, pinnedPool↑, уменьшить конкуренцию на GPU |
Долгие shuffle‑стадии | Нет UCX/узкая сеть/NVMe | Включить UCX, проверить NVMe/сеть, уменьшить shuffle.partitions после AQE |
Перекос ключей (skew) | Немного «тяжёлых» ключей | AQE включить, salt‑техники, предварит. агрегация |
OOM на драйвере/экзекьюторах | Узкая memoryOverhead/широкие строки | Увеличить overhead, привести типы к компактным (int/decimal), меньше колоночных взрывов |
Резкое падение скорости на write | Много мелких файлов | repartition перед write, настроить row_group_size, группировать по партициям |
Латентность растёт волнами | AQE/обновление плана/reuse | Поиграть advisoryPartitionSize, кэшировать стабильные источники |
Больше паттернов: https://cloudcompute.ru/solutions/interruptible-patterns/ • тюнинг: https://cloudcompute.ru/solutions/performance-tuning/
Как запустить в cloudcompute.ru
- Откройте Шаблоны запусков: https://cloudcompute.ru/solutions/templates/ — выберите Spark RAPIDS (K8s) или Spark RAPIDS (Standalone).
- Выберите профиль GPU: 24/48/80 ГБ — по ширине строк/джойнам/окнам и целевому времени окна батча.
- Подключите диски: /nvme/spill и /nvme/datasets; объектное хранилище — по вашей схеме.
- Заполните spark-defaults.conf и spark-submit как в примерах (SQLPlugin, UCX‑shuffle, RMM/pinned‑pool, ресурсы GPU).
- Для продакшна: дашборды (GPU/Spill/Shuffle/EXPLAIN), алерты на p95/спилл/NOT_ON_GPU, пулы On‑Demand/Interruptible, канареечные апдейты.
Чек‑лист перед продом
- spark.plugins=SQLPlugin, rapids.sql.enabled=true, UCX‑shuffle включён.
- EXPLAIN=ALL чистый (нет критичных NOT_ON_GPU), UDF переписаны.
- RMM‑пул 70–80% HBM, pinnedPool.size задан; spill на NVMe протестирован.
- AQE включён, перекос ключей обработан (salt/agg).
- p95 job/stage/shuffle в целевых пределах, GPU util стабильна.
- Партиционирование и размер файлов при write откалиброваны.
- Политики PII/ретеншн/шифрование внедрены.
- Канареечный релиз/откат и шаблоны деплоя готовы.
Навигация
- Хаб «Решения»: https://cloudcompute.ru/solutions/
- RAPIDS (cuDF/cuML): https://cloudcompute.ru/solutions/rapids/
- Рекомендательные системы: https://cloudcompute.ru/solutions/recsys/
- Хранилища и данные: https://cloudcompute.ru/solutions/storage-data/
- Производительность и тюнинг: https://cloudcompute.ru/solutions/performance-tuning/
- Throughput vs Latency: https://cloudcompute.ru/solutions/throughput-vs-latency/
- Multi‑GPU/распределёнка: https://cloudcompute.ru/solutions/multi-gpu/
- Планирование стоимости: https://cloudcompute.ru/solutions/cost-planner/
- Мониторинг и логи: https://cloudcompute.ru/solutions/monitoring-logging/
- Наблюдаемость пайплайнов: https://cloudcompute.ru/solutions/llm-inference/observability/
- Interruptible‑паттерны: https://cloudcompute.ru/solutions/interruptible-patterns/
- Triton Inference Server: https://cloudcompute.ru/solutions/triton-inference-server/
- CI/CD контейнеров: https://cloudcompute.ru/solutions/containers-ci-cd/