Граф‑аналитика на GPU: PageRank и сообщества
Задача страницы. Дать инженерный каркас для запуска граф‑алгоритмов на облачных GPU (cuGraph/RAPIDS): PageRank, поиск сообществ (Louvain/Leiden), компоненты связности, треугольники, centrality‑метрики, а также много‑GPU/многоузловые пайплайны с Dask‑CUDA. Разберём архитектуры, конфиги, метрики, экономику (edges_per_s, Cost_per_1B_edges), устойчивость к прерываниям и требования к I/O.
TL;DR
- Формат данных и I/O критичны. Читайте рёбра из Parquet/CSV в cuDF, делайте renumbering до int32 и строите CSR/CSC на NVMe‑префетченном датасете. См. https://cloudcompute.ru/solutions/storage-data/
- **Масштабирование. — Один узел: 1 процесс = 1 GPU, NVLink/NVSwitch, RMM‑пул. — Много узлов: Dask‑CUDA + UCX/UCX‑Py (RDMA при наличии), shuffle по GPU. См. https://cloudcompute.ru/solutions/multi-gpu/
- Interruptible‑паттерн. Долгие расчёты режем на итерации/снапшоты ≤ 120 с с чекпоинтом статуса алгоритма + детерминированным разбиением. См. https://cloudcompute.ru/solutions/interruptible-patterns/
- Профили GPU. 24 ГБ (Compact) — средние графы/один GPU; 48 ГБ (Balanced) — основной профиль; 80 ГБ (HQ) — крупные графы/много‑GPU; для экстремальной плотности — MIG. См. https://cloudcompute.ru/solutions/mig/
- Наблюдаемость. edges_per_s, iter_time_seconds{pagerank,louvain}, shuffle_bytes, rmm_pool_free_bytes, gpu_utilization, p50/p95. См. https://cloudcompute.ru/solutions/monitoring-logging/
- Экономика. Cost_per_1B_edges = (c_gpu / U) * (1e9 / edges_per_s) / 3600 * N_gpu * r + Storage_cost. См. https://cloudcompute.ru/solutions/cost-planner/
Сценарии (когда это нужно)
- PageRank/Personalized PR (PPR). Релевантность/ранжирование, телеком/маркетплейсы/поисковые задачи.
- Сообщества и иерархии. Louvain/Leiden, modularity‑оптимизация, обнаружение кластеров в соц‑/трафик‑/био‑графах.
- Компоненты связности/кол-во треугольников/топ‑k degree. Быстрые топологические сводки.
- SSSP/BFS/Jaccard. Локальная связность/сходство, рекомендации.
- Потоки/микро‑батчи. Периодические перерасчёты PageRank/сообществ на обновлениях графа.
- Гибриды с ETL/ML. Подготовка графа RAPIDS‑ETL → аналитика → экспорт фич для GNN/ML. См. https://cloudcompute.ru/solutions/rapids/ и https://cloudcompute.ru/solutions/graph-ml/
Архитектуры/пайплайны A) Один узел, multi‑GPU (NVLink) — быстрый PageRank/Louvain
[Warm Storage: Parquet/CSV] -> [Prefetch -> NVMe] -> [cuDF ingest] -> [renumber int32]
| |
v v
[CSR/CSC] -------> [cuGraph PR/Louvain]
| |
+-----> [Metrics/logs] -> [Publish -> Warm/Cold]
Ключи: RMM‑пул памяти, NVMe‑кэш для ingest, фиксированные чанки итераций для безопасных рестартов.
B) Много узлов (Dask‑CUDA + UCX)
[Edge Parquet partitioned] -> [Dask-cudf read_parquet] -> [distributed renumber] -> [mgGraph build]
| | | |
+--> [UCX/Shuffle] <-------------+----------------------------+--> [mgPageRank/mgLouvain] -> [Reduce/Export]
Ключи: UCX/UCX‑Py, RDMA при наличии; баланс партиций; контроль shuffle_bytes и task_inflight.
C) Потоковые обновления (микро‑батчи)
[Kafka/S3 micro-batches] -> [Append edges] -> [Incremental recompute (≤120 s)] -> [Checkpoints] -> [Serve metrics]
Ключи: инкрементальные окна, снапшоты графа, SLA на p95 iter_time_seconds.
Профили GPU/VRAM/скоростей
| **Профиль** | **VRAM** | **Объёмы графов/задачи** | **Режим** | **Отн. throughput (≈)** | **Примечания** |
| **24 ГБ (Compact)** | 24 ГБ | До ~сотен млн рёбер (сжатые ID), single‑GPU | On‑Demand / Interruptible | 1.0 | Плотность ↑ при **MIG** и batch‑перерасчётах |
| **48 ГБ (Balanced)** | 48 ГБ | Сотни млн–1‑2 млрд рёбер (multi‑GPU) | Оба | 1.5–1.8 | Базовый профиль кластеров PR/Louvain |
| **80 ГБ (HQ)** | 80 ГБ | 1–3+ млрд рёбер (multi‑GPU/multi‑node) | Оба | 2.2–2.6 | Запас под shuffle/ренумерацию/CSR |
Индексы относительны; фактическая скорость зависит от формата, степени, параметров алгоритма и сети.
Конфиги и скелеты
1) Один GPU: PageRank и Louvain (Python)
import cudf, cugraph, rmm
rmm.reinitialize(pool_allocator=True, initial_pool_size=8<<30) # ~8 ГБ пул
# Чтение рёбер
df = cudf.read_parquet("/nvme/edges.parquet")[["src","dst"]] # int64/str допустимо
# Ренумерация (сжатие ID -> int32)
G = cugraph.Graph(directed=True)
G.from_cudf_edgelist(df, source="src", destination="dst", renumber=True)
# PageRank
pr = cugraph.pagerank(G, alpha=0.85, max_iter=100, tol=1e-6)
# Сообщества (Louvain) — на неориентированном графе
UG = G.to_undirected()
parts, mod = cugraph.louvain(UG) # parts: vertex->community
# Топ-10 вершин по PR
top10 = pr.sort_values("pagerank", ascending=False).head(10)
top10.to_pandas().to_csv("/nvme/out/top10.csv", index=False)
2) Multi‑GPU/Multi‑node: Dask‑CUDA + cuGraph‑dask
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import dask_cudf as dcudf
import cugraph.dask as dcg
import rmm, os
# UCX/коммуникации (для многоузлового — UCX + RDMA)
os.environ.update({
"DASK_RMM__POOL_SIZE": "8GB",
"UCX_TLS": "rc,ud,sm,self,cuda_copy,cuda_ipc",
})
rmm.reinitialize(pool_allocator=True, initial_pool_size=8<<30)
cluster = LocalCUDACluster(protocol="ucx", enable_tcp_over_ucx=True) # внутри узла
client = Client(cluster)
# Распределённое чтение (разделённый Parquet)
ddf = dcudf.read_parquet("s3://warm/graph/edges/") # src, dst[, w]
G = dcg.Graph(directed=True)
G.from_dask_cudf_edgelist(ddf, source="src", destination="dst", renumber=True)
# mgPageRank
pr_ddf = dcg.pagerank(G, alpha=0.85, max_iter=50, tol=1e-6)
# mgLouvain
parts_ddf, mod = dcg.louvain(G.to_undirected())
# Сохранение
pr_ddf.compute().to_parquet("/nvme/out/pr/")
parts_ddf.compute().to_parquet("/nvme/out/communities/")
3) RMM/ENV, Docker Compose (один узел с несколькими GPU)
services:
rapids-worker:
image: registry.local/rapids:latest
deploy:
resources:
reservations:
devices: [{ capabilities: ["gpu"] }]
environment:
- DASK_RMM__POOL_SIZE=8GB
- UCX_TLS=rc,ud,sm,self,cuda_copy,cuda_ipc
- RMM_POOL_ALLOCATOR=1
volumes:
- /nvme:/nvme
command: >
bash -lc "python /opt/graph/run_mg_pagerank.py --input /nvme/edges/"
4) Slurm: кластер Dask‑CUDA (много узлов)
#!/bin/bash
#SBATCH -J graph_mg
#SBATCH -N 2
#SBATCH --gpus-per-node=4
#SBATCH -t 02:00:00
#SBATCH --exclusive
module load cuda
export UCX_TLS=rc,ud,sm,self,cuda_copy,cuda_ipc
export DASK_RMM__POOL_SIZE=8GB
# На узлах поднимаем dask-cuda-workers с protocol=ucx; scheduler — на head-узле
srun --mpi=pmix_v3 dask-cuda-worker --protocol ucx --rmm-pool-size 8GB --enable-tcp-over-ucx
5) YAML задания (универсально)
job:
name: "graph_analytics_pr_louvain_v1"
mode: "interruptible" # on-demand | interruptible
gpu_profile: "48GB-Balanced"
nodes: 2
gpus_per_node: 4
network: "rdma"
io:
prefetch: ["s3://warm/graph/edges/"]
scratch: "/nvme"
checkpoint: "/nvme/ckpt"
interval_seconds: 120
run:
entrypoint: "python"
args: ["/opt/graph/run_mg.py","--alpha","0.85","--tol","1e-6","--algo","pagerank,louvain"]
hooks:
on_start: ["python /scripts/prefetch.py"]
on_checkpoint: ["python /scripts/register_ckpt.py"]
on_failure: ["python /scripts/requeue.py --backoff"]
``` **Оценка VRAM и размеров**
Пусть V — число вершин, E — число рёбер. При renumbering к int32 и не взвешенных рёбрах:
VRAM\_edges ≈ E \* (id\_src\_4B + id\_dst\_4B) = E \* 8
VRAM\_offsets ≈ (V+1) \* ptr\_bytes
(ptr\_bytes=8 для CSR/CSC)
VRAM\_maps ≈ V \* (id\_orig\_bytes + id\_new\_bytes) (для обратного маппинга)
Overhead ≈ 10–30% (временные буферы/перестройка)
VRAM\_total ≈ (VRAM\_edges + VRAM\_offsets + VRAM\_maps) \* (1 + overhead)
Для взвешенных рёбер добавьте w\_bytes (обычно 4B).
## **Наблюдаемость/метрики/алерты**
Опорные хабы:[ https://cloudcompute.ru/solutions/monitoring-logging/](https://cloudcompute.ru/solutions/monitoring-logging/?utm_source=chatgpt.com) и[ https://cloudcompute.ru/solutions/llm-inference/observability/](https://cloudcompute.ru/solutions/llm-inference/observability/?utm_source=chatgpt.com)
**Метрики:**
- Производительность: edges\_per\_s, iter\_time\_seconds{pagerank,louvain}, levels\_bfs, triangles\_per\_s.
- Качество/конвергенция: pr\_residual, modularity, components\_count.
- GPU: gpu\_utilization, gpu\_memory\_used\_bytes, rmm\_pool\_free\_bytes, sm\_occupancy, gpu\_power\_watts.
- Dask/Shuffle/Сеть: shuffle\_bytes, ucx\_active\_connections, task\_inflight, task\_retries\_total.
- I/O: io\_read\_mb\_s, io\_write\_mb\_s, nvme\_usage\_bytes, cache\_hit\_ratio{tier="nvme"}.
**Алерты:**
- p95 iter\_time\_seconds вырос > X% к базовому профилю.
- gpu\_memory\_used\_bytes → потолок VRAM / rmm\_pool\_free\_bytes → 0 → OOM‑риск.
- shuffle\_bytes высокий при низком edges\_per\_s → дисбаланс партиций/узкая сеть.
- task\_retries\_total растёт → проблемы с устойчивостью/прерывания.
- io\_read\_mb\_s стабильно низкий при пустой кэше → префетч на NVMe не настроен.
## **Экономика и формулы**
См.[ https://cloudcompute.ru/solutions/cost-planner/](https://cloudcompute.ru/solutions/cost-planner/?utm_source=chatgpt.com) и <https://cloudcompute.ru/solutions/throughput-vs-latency/>
**Стоимость миллиард рёбер:**
Cost\_per\_1B\_edges = ((c\_gpu \* N\_gpu \* t\_1B) / 3600) / U \* r + Storage\_cost
t\_1B = 1e9 / edges\_per\_s
**Время задачи:**
T\_total ≈ (E\_total / (edges\_per\_s \* N\_gpu\_eff))
**Эффективность масштаба:**
Speedup(N) = T1 / TN
Scale\_eff(N) = Speedup(N) / N
Подбирайте N\_gpu, где Scale\_eff приемлема, а Cost\_per\_1B\_edges — минимальна.
## **Безопасность/политики**
См.[ https://cloudcompute.ru/solutions/security/](https://cloudcompute.ru/solutions/security/?utm_source=chatgpt.com)
- **Приватные графы/идентификаторы.** Разделение проектов, шифрование «на покое», маскирование PII на ingest.
- **Секреты.** Токены бакетов/каталогов — только из секрет‑хранилища; не писать в логи.
- **Изоляция сети.** Для кластера Dask — отдельные сети/ACL; RDMA‑пулы — отдельно.
- **Ретеншн.** Временные CSR/перекодировки — автоочистка; публикуем только агрегаты/модулярность/PR‑скор.
- **Аудит.** Кто запускал, какие версии контейнеров/пакетов и параметры использовал.
## **Практики производительности**
- **Renumbering и int32.** Сжимайте ID, держите CSR/CSC в GPU, избегайте строковых ID в горячем цикле.
- **RMM‑пул.** Инициализируйте rmm.reinitialize(pool\_allocator=True) с достаточным размером; избегайте дробления.
- **Баланс партиций.** Равномерные партиции по рёбрам; учитывайте «хабы» (высокая степень) — возможен таргетированный сплит.
- **Shuffle/UCX.** Включайте UCX и RDMA, следите за shuffle\_bytes; сжимайте при передаче, где возможно.
- **NVMe‑prefetch.** Сначала — локально на NVMe; затем ingest в cuDF; используйте бинарные форматы (Parquet).
- **Interruptible.** Снапшоты итераций PageRank/сообществ, периодическая запись статуса/кандидатов.
- **Формат вычислений.** FP32 достаточно для многих метрик; редукции — с компенсацией/FP64 при необходимости. См.[ https://cloudcompute.ru/solutions/performance-tuning/](https://cloudcompute.ru/solutions/performance-tuning/?utm_source=chatgpt.com)
**Траблшутинг (симптом → причина → решение)**
<table><tbody><tr><td>**Симптом**
</td><td>**Причина**
</td><td>**Решение**
</td></tr><tr><td>OOM / RMM out of memory
</td><td>Нет renumbering / фрагментация пула
</td><td>Включить renumbering; увеличить RMM\_POOL\_SIZE; перейти на 48–80 ГБ
</td></tr><tr><td>Низкий edges\_per\_s при полной загрузке
</td><td>Дисбаланс партиций / «хабы»
</td><td>Перепартиционирование по рёбрам; таргетированный split high‑degree
</td></tr><tr><td>Высокий shuffle\_bytes и низкий throughput
</td><td>Слабая сеть/нет RDMA
</td><td>Включить UCX+RDMA; увеличить NVLink‑локальность; уменьшить межузловые передачи
</td></tr><tr><td>Падения задач Dask
</td><td>Слишком крупные партиции/таймауты
</td><td>Уменьшить размер партиций; увеличитьtimeouts; ограничить tasks\_inflight
</td></tr><tr><td>Конвергенция PR медленная
</td><td>Порог tol мал/граф тяжёлый
</td><td>Увеличить tol; warm‑start из предыдущей оценки; ограничить max\_iter
</td></tr><tr><td>Louvain «прыгает» между разбиениями
</td><td>Нестабильная модульность/инициализация
</td><td>Фиксировать seed; запускать несколько прогонов; брать лучшее разбиение
</td></tr><tr><td>Высокая задержка ingest
</td><td>Чтение по сети/мелкие файлы
</td><td>Консолидировать в Parquet; префетч на NVMe; увеличить размер фрагментов
</td></tr></tbody></table>
## **Как запустить в cloudcompute.ru**
1. **Шаблоны:**[ https://cloudcompute.ru/solutions/templates/](https://cloudcompute.ru/solutions/templates/?utm_source=chatgpt.com)
- «RAPIDS/cuGraph (single/multi‑GPU)» — быстрый старт для PageRank/Louvain.
- «Dask‑CUDA Cluster» — многоузловой кластер с UCX/RDMA.
2. **Профиль GPU:** 24/48/80 ГБ по таблице выше; крупные графы и много‑GPU — 48–80 ГБ; высокая плотность мелких задач — 24 ГБ + **MIG**.
3. **Режим:
- SLA‑критично/интерактив — **On‑Demand**.
- Пакетные перерасчёты — **Interruptible** (снапшоты итераций ≤ 120 с). См. <https://cloudcompute.ru/solutions/interruptible-patterns/>
4. **Данные/I/O:** храните сырые рёбра в Parquet с партиционированием; ingest на **NVMe**; результаты/агрегаты — «тёплое/холодное». См.[ https://cloudcompute.ru/solutions/storage-data/](https://cloudcompute.ru/solutions/storage-data/?utm_source=chatgpt.com)
5. **Оркестрация/CI:** контейнеры с зафиксированными версиями, YAML‑спеки, smoke‑граф для регрессий. См. <https://cloudcompute.ru/solutions/containers-ci-cd/>
6. **Наблюдаемость:** дашборды edges\_per\_s, p95 iter\_time\_seconds, GPU/I‑O/Shuffle; алерты по росту p95 и падению throughput. См.[ https://cloudcompute.ru/solutions/monitoring-logging/](https://cloudcompute.ru/solutions/monitoring-logging/?utm_source=chatgpt.com)
**Чек‑лист перед продом**
- Данные в Parquet/CSV с партиционированием; ingest идёт через NVMe‑кэш.
- Renumbering включён; ID приведены к int32 где возможно.
- RMM‑пул настроен (размер/фрагментация); метрики rmm\_pool\_free\_bytes на дашборде.
- Dask/UCX и, при наличии, RDMA активны; партиции по рёбрам сбалансированы.
- Interruptible‑снапшоты итераций ≤ 120 с; сценарий рестарта проверен.
- Метрики/алерты заведены: edges\_per\_s, p95 iter\_time\_seconds, shuffle\_bytes, gpu\_utilization, io\_\*.
- Экономика посчитана: Cost\_per\_1B\_edges для профилей 24/48/80 ГБ и выбранного масштаба.
- Политики безопасности/ретенции применены; публикуются только агрегаты/результаты.
**Навигация**
- Хаб HPC: <https://cloudcompute.ru/solutions/hpc/>
- Родственные задачи:
- RAPIDS/ETL: <https://cloudcompute.ru/solutions/rapids/>
- Spark RAPIDS: <https://cloudcompute.ru/solutions/spark-rapids/>
- Граф‑ML (GNN): <https://cloudcompute.ru/solutions/graph-ml/>
- Линал (cuBLAS/cuSPARSE): <https://cloudcompute.ru/solutions/hpc/linear-algebra/>
- Сквозные:
- Multi‑GPU/топологии:[ https://cloudcompute.ru/solutions/multi-gpu/](https://cloudcompute.ru/solutions/multi-gpu/?utm_source=chatgpt.com) • MIG: <https://cloudcompute.ru/solutions/mig/>
- Данные/хранилище:[ https://cloudcompute.ru/solutions/storage-data/](https://cloudcompute.ru/solutions/storage-data/?utm_source=chatgpt.com)
- Контейнеры/CI/CD: <https://cloudcompute.ru/solutions/containers-ci-cd/>
- Мониторинг/логи:[ https://cloudcompute.ru/solutions/monitoring-logging/](https://cloudcompute.ru/solutions/monitoring-logging/?utm_source=chatgpt.com)
- Наблюдаемость:[ https://cloudcompute.ru/solutions/llm-inference/observability/](https://cloudcompute.ru/solutions/llm-inference/observability/?utm_source=chatgpt.com)
- Оптимизация вычислений:[ https://cloudcompute.ru/solutions/performance-tuning/](https://cloudcompute.ru/solutions/performance-tuning/?utm_source=chatgpt.com) • <https://cloudcompute.ru/solutions/throughput-vs-latency/>
- Interruptible‑паттерны: <https://cloudcompute.ru/solutions/interruptible-patterns/>
Готовы запустить?
Запустить GPU-сервер