Решения

Spark RAPIDS: ускорение Spark‑пайплайнов на GPU

Задача страницы. Практический гид по запуску Spark RAPIDS для ускорения Spark SQL/DataFrame/ETL и части ML‑операций на GPU. Разберём архитектуру плагина (Catalyst→GPU), рекомендуемые конфиги (SQLPlugin, UCX‑shuffle, RMM‑пулы), пайплайны single‑/multi‑GPU, sizing по профилям 24/48/80 ГБ, метрики/алерты, экономику и траблшутинг. Предполагается NVMe‑кэш, режимы On‑Demand (интерактив/низкая латентность) и Interruptible (ночные батчи), контейнеризация и мониторинг.

TL;DR

  • Что ускоряем. DataFrame/Spark SQL‑операторы (scan, filter, project, join, agg, window, sort, repartition) и часть ML‑класса — переводятся в GPU‑колоннарные эквиваленты. RDD‑API и произвольные Python/Scala UDF — не ускоряются (часто фоллбэк на CPU).
  • Включить GPU в Spark:
    spark.plugins = com.nvidia.spark.SQLPlugin
    spark.rapids.sql.enabled = true
    spark.executor.resource.gpu.amount = 1
    spark.task.resource.gpu.amount ∈ [0.0625..0.5] (параллелизм на GPU)
  • Сетевой/Shuffle: RapidsShuffleManager + UCX для межузловых шифлов, NVMe под spill.
  • Память: RMM‑пул 70–80% HBM, pinned‑pool для fast host↔device, крупные batch‑размеры.
  • Наблюдаемость: EXPLAIN=ALL для контроля фоллбэков, gpuOpTime/OpTime, p95 job/stage/task, GPU util/HBM, spill.
  • Экономика: T_job ≈ S_data / (N_gpu × BW_eff). Cost ≈ c_gpu × T_job. Часто 3–10× ускорение ETL при меньшем числе узлов. См. https://cloudcompute.ru/solutions/cost-planner/ и https://cloudcompute.ru/solutions/throughput-vs-latency/
  • Если нужна «чистая» RAPIDS без Spark: см. соседнюю страницу https://cloudcompute.ru/solutions/rapids/

Сценарии (когда это нужно)

  • Ежедневные батчи ETL: парсинг логов/Parquet/ORC → джойны (каталог/события) → агрегаты/окна → выгрузка фичей для рексиса/ML.
  • Интерактивная аналитика: ad‑hoc SQL на GiB–TiB таблицах, быстрые выборки/группировки.
  • Подготовка фичей для рекомендаций: категоризация/хэширование/агрегации, негатив‑семплинг (in‑batch), экспорт для индексов. См. https://cloudcompute.ru/solutions/recsys/
  • Гибрид Spark↔GPU‑ML: ETL в Spark RAPIDS → обучение/инференс в Triton/cuDF/cuML. См. https://cloudcompute.ru/solutions/triton-inference-server/ и https://cloudcompute.ru/solutions/rapids/

Архитектуры и пайплайны

Как работает плагин (упрощённо)

				
					Spark SQL/DataFrame
   └─> Catalyst Optimizer
        └─> RAPIDS Plugin (rule-based)
             ├─ GPU-колоннарные операторы (scan/join/agg/...)
             └─ [Fallback] CPU-операторы (если не поддерживается)
                  └─ Columnar <-> Row конверсия (минимизировать!)


				
			

1) GPU‑ETL с UCX‑shuffle (батчи)

				
					Parquet/ORC on NVMe/Object
  └─> Spark SQL (GPU)
       ├─ Read -> Filter/Project
       ├─ Repartition/Shuffle (UCX, NVMe spill)
       ├─ Join/Aggregations/Windows
       └─ Write Parquet (partitioned)

				
			

2) Гибрид с downstream ML

				
					Spark RAPIDS ETL (GPU) ──> Features Parquet ──> cuDF/cuML/Triton (GPU) ──> Serving


				
			

3) Интерактив на кластере

				
					Spark Thrift/Notebook ─> SQL (GPU plan) ─> Low-latency scans/joins ─> Caching ─> BI/Ad-hoc

				
			

Профили GPU и ориентиры пропускной способности

Инженерные ориентиры для ETL‑пайплайна (filter→join→groupby на Parquet, NVMe, UCX‑shuffle). Фактические цифры зависят от схемы/кардинальностей/компрессии/сети.

Профиль GPU

Память

Типичный кластер

Эффективная скорость ETL*

Комментарии

24 ГБ (Compact)

24 ГБ

2–4 executors×1 GPU

0.5–1.2 GiB/s

Малая/средняя ширина строк, ограниченные шифлы.

48 ГБ (Balanced)

48 ГБ

4–8 executors×1 GPU

1–3 GiB/s

Баланс join‑нагрузок и оконных ф‑ий.

80 ГБ (HQ)

80 ГБ

8–16 executors×1 GPU

2–5 GiB/s

Тяжёлые джойны/окна, большие партиции.

* На узел — при хорошей NVMe/сети и UCX‑shuffle; масштабируйте линейно до сетевых/IO‑потолков.
Тюнинг и распределёнка: https://cloudcompute.ru/solutions/performance-tuning/, https://cloudcompute.ru/solutions/multi-gpu/

Конфиги (YAML/submit) и скелеты кода

spark-defaults.conf (минимум для GPU)

				
					spark.plugins                               com.nvidia.spark.SQLPlugin
spark.rapids.sql.enabled                    true
spark.rapids.sql.concurrentGpuTasks         2
spark.rapids.memory.gpu.allocFraction       0.8
spark.rapids.memory.pinnedPool.size         2G
spark.rapids.sql.explain                    ALL          # видеть NOT_ON_GPU

spark.sql.adaptive.enabled                  true         # AQE
spark.sql.files.maxPartitionBytes           512m
spark.sql.shuffle.partitions                200

spark.shuffle.manager                       org.apache.spark.shuffle.rapids.RapidsShuffleManager
spark.rapids.shuffle.mode                   UCX
spark.rapids.shuffle.ucx.bounceBuffers.size 64m

spark.executor.resource.gpu.amount          1
spark.task.resource.gpu.amount              0.125        # 8 задач на GPU (подберите)
spark.executor.memory                       12g
spark.executor.memoryOverhead               6g


				
			

spark-submit (Kubernetes; 1 GPU на executor)

				
					spark-submit \
  --master k8s://https://$K8S_API \
  --deploy-mode cluster \
  --name rapids-etl \
  --conf spark.kubernetes.container.image=cloudcompute/spark-rapids:latest \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.executor.instances=8 \
  --conf spark.plugins=com.nvidia.spark.SQLPlugin \
  --conf spark.rapids.sql.enabled=true \
  --conf spark.shuffle.manager=org.apache.spark.shuffle.rapids.RapidsShuffleManager \
  --conf spark.rapids.shuffle.mode=UCX \
  --conf spark.executor.resource.gpu.amount=1 \
  --conf spark.task.resource.gpu.amount=0.125 \
  --conf spark.rapids.memory.gpu.allocFraction=0.8 \
  --conf spark.rapids.memory.pinnedPool.size=2G \
  --conf spark.locality.wait=0s \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.files.maxPartitionBytes=512m \
  --conf spark.sql.shuffle.partitions=200 \
  local:///opt/app/jobs/etl.py


				
			

Пример PySpark (GPU‑friendly SQL/DataFrame)

				
					from pyspark.sql import SparkSession, functions as F

spark = (SparkSession.builder
         .appName("rapids-etl")
         .getOrCreate())

# Важно: избегаем произвольных UDF (фоллбэк на CPU), используем встроенные функции
events = (spark.read.parquet("s3a://bucket/events/")
          .select("user_id","item_id","ts","price","category")
          .where(F.col("ts") >= "2025-01-01")
          .fillna({"price": 0.0}))

catalog = spark.read.parquet("s3a://bucket/catalog/").select("item_id","brand","category")

# Джойн по ключу + агрегации — остаются на GPU
df = (events.join(catalog.hint("broadcast"), "item_id", "left")
            .withColumn("week", F.date_trunc("week", F.col("ts")))
            .groupBy("user_id","week","category")
            .agg(F.sum("price").alias("spend_sum"),
                 F.count("*").alias("views"))
            .withColumn("arpu", F.col("spend_sum") / F.greatest(F.col("views"), F.lit(1))))

# Партиционированная запись
(df.repartition(256, "week")
   .write.mode("overwrite")
   .partitionBy("week").parquet("s3a://bucket/features/"))

# Контроль плана: GPU/CPU
print(df._jdf.queryExecution().toString())   # кратко


				
			

Spark‑Operator (опционально): SparkApplication (K8s)

				
					apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata: { name: rapids-etl }
spec:
  type: Scala
  mode: cluster
  image: cloudcompute/spark-rapids:latest
  imagePullPolicy: IfNotPresent
  mainClass: com.company.jobs.Etl
  mainApplicationFile: local:///opt/app/jobs/etl.jar
  sparkVersion: "3.5.1"
  restartPolicy: { type: OnFailure }
  driver:
    cores: 1
    memory: 4g
    serviceAccount: spark
    env:
      - name: SPARK_CONF_DIR
        value: /opt/spark/conf
  executor:
    instances: 8
    cores: 4
    memory: 12g
    memoryOverhead: 6g
    gpu:
      name: nvidia.com/gpu
      quantity: 1
  deps: {}


				
			

Наблюдаемость/метрики/алерты

Spark + RAPIDS (Prometheus/лог‑метрики):

  • План/ускорение: rapids_explain_not_on_gpu_count, rapids_plan_gpu_operators_total, rapids_gpuOpTime_ms, rapids_opTime_ms — доля GPU‑времени.
  • Latency/Perf: spark_job_duration_seconds, spark_stage_duration_seconds{type=shuffle|agg|join}, spark_task_duration_seconds (p50/p95).
  • Shuffle/IO: spark_shuffle_read_bytes, spark_shuffle_write_bytes, rapids_spill_bytes{to=host|disk}, nvme_{read,write}_mb_s.
  • GPU: gpu_utilization, gpu_memory_bytes, gpu_mem_peak_bytes.
  • AQE: spark_sql_adaptive_skewed_partitions, spark_sql_exchange_reuse (эффективность адаптива).

Алерты (примеры):

  • rapids_explain_not_on_gpu_count > 0 — есть CPU‑операторы: переписать UDF/выражения, проверить типы.
  • rapids_spill_bytes{to=disk}_p95 ↑ — не хватает HBM/host‑пула: увеличить RMM/pinned, уменьшить batch, оптимизировать join.
  • spark_stage_duration_p95{type=shuffle} ↑ — UCX/сеть/NVMe узкие; увеличить параллелизм/ресурсы, перенести джобы в Interruptible.
  • gpu_mem_peak/HBM > 0.9 — снизить concurrentGpuTasks, batch, увеличить HBM‑профиль.

Детали по мониторингу: https://cloudcompute.ru/solutions/monitoring-logging/https://cloudcompute.ru/solutions/llm-inference/observability/

Экономика и формулы

Обозначения: S_data — объём данных (GiB), N_gpu — число GPU‑executors, BW_eff — эффективная скорость GPU‑ETL (GiB/с/узел), c_gpu — цена/час, U — целевая загрузка.

  • Время джобы:
    T_job ≈ S_data / (N_gpu × BW_eff).
  • Стоимость:
    Cost ≈ c_gpu × T_job = c_gpu × S_data / (N_gpu × BW_eff).
  • Сравнение CPU vs GPU (оценка):
    Пусть BW_cpu — GiB/с на CPU‑узел, BW_gpu — на GPU‑узел.
    Выигрыш по стоимости при одинаковом S_data:
    Cost_ratio ≈ (c_gpu / BW_gpu) / (c_cpu / BW_cpu).
    Обычно BW_gpu >> BW_cpu, и при сопоставимых тарифах выигрывает GPU‑кластер.

Планирование: https://cloudcompute.ru/solutions/cost-planner/ • компромисс throughput↔latency: https://cloudcompute.ru/solutions/throughput-vs-latency/

Безопасность/политики

  • PII/комплаенс: маскирование и минимизация колонок ещё в ETL; разделение «сырых/очищенных» бакетов.
  • Шифрование: транспорт межузлов (UCX/TCP) и хранилище (NVMe/object).
  • Ретеншн: TTL на spill/временные артефакты, аудит доступа.
  • Изоляция: пулы On‑Demand/Interruptible, квоты на executors, ограничение дорогих SQL‑паттернов.

См. также: https://cloudcompute.ru/solutions/security/https://cloudcompute.ru/solutions/storage-data/

Траблшутинг

Симптом

Возможная причина

Решение

В плане много NOT_ON_GPU

UDF/неподдерж. выражения/типы

Переписать на встроенные функции, привести типы, отключить проблемные шаги

Низкая утилизация GPU

Малые batch/узкие партиции

Увеличить maxPartitionBytes, concurrentGpuTasks, батч‑размеры

Высокий spill на диск

Нехватка HBM/host‑пула

rmm.allocFraction↑, pinnedPool↑, уменьшить конкуренцию на GPU

Долгие shuffle‑стадии

Нет UCX/узкая сеть/NVMe

Включить UCX, проверить NVMe/сеть, уменьшить shuffle.partitions после AQE

Перекос ключей (skew)

Немного «тяжёлых» ключей

AQE включить, salt‑техники, предварит. агрегация

OOM на драйвере/экзекьюторах

Узкая memoryOverhead/широкие строки

Увеличить overhead, привести типы к компактным (int/decimal), меньше колоночных взрывов

Резкое падение скорости на write

Много мелких файлов

repartition перед write, настроить row_group_size, группировать по партициям

Латентность растёт волнами

AQE/обновление плана/reuse

Поиграть advisoryPartitionSize, кэшировать стабильные источники

Больше паттернов: https://cloudcompute.ru/solutions/interruptible-patterns/ • тюнинг: https://cloudcompute.ru/solutions/performance-tuning/

Как запустить в cloudcompute.ru

  1. Откройте Шаблоны запусков: https://cloudcompute.ru/solutions/templates/ — выберите Spark RAPIDS (K8s) или Spark RAPIDS (Standalone).
  2. Выберите профиль GPU: 24/48/80 ГБ — по ширине строк/джойнам/окнам и целевому времени окна батча.
  3. Подключите диски: /nvme/spill и /nvme/datasets; объектное хранилище — по вашей схеме.
  4. Заполните spark-defaults.conf и spark-submit как в примерах (SQLPlugin, UCX‑shuffle, RMM/pinned‑pool, ресурсы GPU).
  5. Для продакшна: дашборды (GPU/Spill/Shuffle/EXPLAIN), алерты на p95/спилл/NOT_ON_GPU, пулы On‑Demand/Interruptible, канареечные апдейты.

Чек‑лист перед продом

  • spark.plugins=SQLPlugin, rapids.sql.enabled=true, UCX‑shuffle включён.
  • EXPLAIN=ALL чистый (нет критичных NOT_ON_GPU), UDF переписаны.
  • RMM‑пул 70–80% HBM, pinnedPool.size задан; spill на NVMe протестирован.
  • AQE включён, перекос ключей обработан (salt/agg).
  • p95 job/stage/shuffle в целевых пределах, GPU util стабильна.
  • Партиционирование и размер файлов при write откалиброваны.
  • Политики PII/ретеншн/шифрование внедрены.
  • Канареечный релиз/откат и шаблоны деплоя готовы.

Навигация