Решения
Распределённый I/O для обучения LLM: веб‑шарды и префетч
Цель страницы. Обеспечить стабильную подачу данных на GPU при обучении LLM на одном или нескольких узлах: веб‑шарды (tar), потоковая загрузка, локальный NVMe‑кэш, префетч и правильное распределение шардов по ранкам/воркерам.
TL;DR
- Храните датасет в крупных tar‑шардах (0.5–2 ГБ) и стримьте их: это уменьшает накладные расходы FS и ускоряет чтение.
- Делите шарды детерминированно по ранкам/воркерам и включайте prefetch + pinned memory.
- Грейдите производительность через локальный NVMe‑кэш, последовательный I/O, минимум мелких файлов.
Отдавайте приоритет WebDataset/MDS; JSONL/Parquet оставляйте для прототипирования/стриминга.
Где «теряется» скорость: типичные узкие места
- Миллионы мелких файлов → высокие накладные на метаданные и open/seek.
- Все ранки читают одни и те же шарды → hot‑spot по сети/сторандже.
- Недостаточный префетч и num_workers → GPU простаивают.
- Отсутствие локального кэша под популярные шарды → дублирующие скачивания.
Форматы и подходы
Web‑шарды (tar; «WebDataset‑подход»)
- Каждый архив — набор сэмплов (напр., *.json, *.npy, *.txt), читается последовательным I/O.
- Плюсы: простая шардировка, хорошо работает по HTTP/объектным хранилищам, легко кешируется на NVMe.
MDS/Streaming‑форматы
- Оптимизированы под высокую пропускную способность и «рандомный» доступ к сэмплам; удобны при долгих тренировках с возобновлением.
JSONL/Parquet + стриминг
- Удобно для быстрых прототипов, но к прод‑обучению лучше переходить на тар‑шарды/MDS.
Связанные разделы: /solutions/llm-training/datasets/, /solutions/storage-data/.

Шардирование корпуса: практические правила

- Размер шара: 0.5–2 ГБ (чем медленнее сеть/диск — тем крупнее шарды).
- Именование: data-{000000..000999}.tar — легко делить по диапазонам.
- Манифест: CSV/JSON с полями url,size,nsamples,checksum; он нужен для детерминированного распределения.
- Решардинг по эпохам: либо перетасовывайте список шардов, либо используйте режим resampled.
Распределение шардов по процессам и воркерам
Ниже — два рабочих паттерна.
Вариант A: «Режем список шардов по ранкам» (универсально)
import os, math, random
world_size = int(os.environ.get("WORLD_SIZE", 1))
rank = int(os.environ.get("RANK", 0))
# Пример: список URL-ов/путей к tar-шардам
urls = [f"https://storage/ds-{i:06d}.tar" for i in range(1000)]
# Детеминированная разбивка списка по ранкам
urls_rank = urls[rank::world_size]
# Для эпохи: перетасуйте КОПИЮ списка на seed(epoch)
def epoch_urls(epoch):
rng = random.Random(epoch)
u = urls_rank.copy()
rng.shuffle(u)
return u
Вариант B: «Дай‑плейн» с авто‑сплитом по нодам/воркерам
import webdataset as wds
dataset = (
wds.WebDataset("https://storage/ds-{000000..000999}.tar", resampled=True)
.split_by_node() # распределяет шарды по нодам/ранкам
.split_by_worker() # делит работу между DataLoader-воркерами
.shuffle(1000) # лёгкая перетасовка сэмплов
.decode() # нужные декодеры: .to_tuple("json","npy"), .decode("pil"), ...
)
Оба подхода устраняют «гонку за одними и теми же файлами» и равномерно грузят сторадж.
Префетч и DataLoader‑настройки
PyTorch DataLoader (рекомендовано как минимум):
loader = torch.utils.data.DataLoader(
dataset, batch_size=batch_size,
num_workers=8, prefetch_factor=4, persistent_workers=True,
pin_memory=True
)
Рекомендации:
- Начните с num_workers = 4–8 на GPU; повышайте до упора по CPU/диску.
- prefetch_factor = 2–8 (для длинных шагов — выше).
- pin_memory=True и pinned host buffers ускоряют H2D‑копирование.
- В Docker‑окружениях для устойчивости используйте multiprocessing_context=»forkserver» или spawn (если есть нестандартные расширения).


Локальный NVMe‑кэш (warm‑cache)
Идея: стримить новый шард напрямую в локальный файл, затем отдавать из него — и на следующей эпохе читать с NVMe.
import os, requests
CACHE_DIR = "/nvme/webcache"
os.makedirs(CACHE_DIR, exist_ok=True)
def localize(url):
name = url.rsplit("/", 1)[1]
path = os.path.join(CACHE_DIR, name)
if not os.path.exists(path):
with requests.get(url, stream=True, timeout=60) as r, open(path, "wb") as f:
for chunk in r.iter_content(chunk_size=8<<20): # 8 МБ блоки
if chunk: f.write(chunk)
return path
# Перед созданием датасета замените URL на локальные пути:
urls_rank_local = [localize(u) for u in urls_rank]
Плюсы: меньше сетевой нагрузки, стабильные времена эпох.
Важно: чистите кэш по TTL/лимиту диска, держите контрольные суммы.
Потоковая подача сэмплов из тар‑шардов
Пример конвейера: из шара → парсим сэмпл → батчим.
import webdataset as wds
dataset = (
wds.WebDataset(urls_rank_local, resampled=True)
.shuffle(1000)
.to_tuple("json", "tokens.npy") # пример: пары (метаданные, токены)
)
# Батчинг переменной длины: упаковывать или паддить в коллаторе
def collate(batch):
metas, token_arrays = zip(*batch)
# Реализуйте padding/packing под ваш seq_len
...
loader = wds.WebLoader(dataset, batch_size=batch_size, num_workers=8, prefetch_factor=4,
pin_memory=True, persistent_workers=True, collate_fn=collate)
Справедливость и повторяемость эпох
- Детерминированно перемешивайте список шардов по seed(epoch).
- Храните манифест и состояние самплера (последний шард/смещение).
- Для resampled‑режима фиксируйте набор генераторов/сидов, чтобы рестарты давали сопоставимые метрики.
Расчёт требуемой пропускной способности
Оцените, сколько байт в секунду нужно подать на все GPU.
- Пусть L — длина контекста (токенов), μ — микро‑батч на GPU, acc — grad accumulation, G — world size.
- Токенов за шаг ≈ L × μ × acc × G.
- Для int32‑токенов и масок берите ~5–8 байт на токен (включая служебные массивы/паддинг).
- При 200 к ток/с поток данных может достигать 1–1.6 ГБ/с на кластер — без учёта сжатия.
Вывод: последовательный I/O + локальный кэш критичны.
Валидация и мониторинг I/O
- Логируйте долю простоев GPU и время загрузки батча; целевой простой — <5–10%.
- Стройте дашборд: «tokens/sec», «it/sec», «avg batch load time», «NVMe read MB/s», «network MB/s».
- Включайте алерты при падении it/sec и росте batch load time.
Связанные разделы: /solutions/monitoring-logging/, /solutions/performance-tuning/.
Типовые конфигурации
A) 1×GPU (LoRA‑SFT, локальный NVMe)
- JSONL/Parquet стриминг или тар‑шарды.
- num_workers=4–8, prefetch_factor=4, pin_memory=True.
- Локальный кэш обязателен при сетевых датасетах.
B) 1 узел, 8×GPU (FSDP/ZeRO)
- Тар‑шарды + split_by_node().split_by_worker().
- NVMe‑кэш; мониторинг сети/диска.
- Перемешивание шардов по эпохам.
C) 2–8 узлов (multi‑node)
- Манифест и детерминированная разбивка диапазонов по node_rank.
- Кэш на каждом узле; избегайте кросс‑узловых чтений.
- Проверка «справедливой» доли шардов на узел/эпоху.
Траблшутинг
- GPU «голодают». Увеличьте num_workers/prefetch_factor, включите pinned memory, переключитесь на тар‑шарды/последовательный I/O, добавьте NVMe‑кэш.
- «Too many open files». Увеличьте ulimit -n, укрупните шарды, уменьшите параллелизм открытия.
- Случайные фризы. Проверьте таймауты сети, увеличьте размер читаемых блоков (≥4–8 МБ), исключите проблемные шарды (повреждённые .tar).
- Дублирующее чтение. Проверьте, что деление по ранкам/воркерам работает (используйте split_by_node/worker или ручную разбивку списка).
- Случайный порядок при рестарте. Фиксируйте seed(epoch) и сохраняйте состояние самплера.
Как это запускать в cloudcompute.ru
- В разделе /solutions/templates/ используйте шаблоны:
— Data Prep (Web‑shards) — сборка шардов, манифест, проверка checksum.
— Torch DDP I/O — готовый конвейер: split_by_node/worker, NVMe‑кэш, префетч, pinned memory, метрики загрузки. - Хранение и перенос больших корпусов: см. /solutions/storage-data/.
- Резервирование и устойчивость к прерываниям: /solutions/llm-training/checkpointing/.
- Предобучение и масштабирование: /solutions/llm-training/pretraining/, /solutions/llm-training/fsdp-deepspeed/.
Чек‑лист перед стартом
- Датасет в тар‑шардах (0.5–2 ГБ) + манифест.
- Детеминированное деление шардов по ранкам/воркерам.
- NVMe‑кэш на узлах; есть политика ротации/TTL.
- prefetch + pinned memory включены; num_workers подобран экспериментально.
- Логируются «tokens/sec», «it/sec», «batch load time», нагрузка диска/сети.
- План на рестарт (состояние самплера/эпохи), проверка «холодного» запуска.