Решения

Распределённый I/O для обучения LLM: веб‑шарды и префетч

Цель страницы. Обеспечить стабильную подачу данных на GPU при обучении LLM на одном или нескольких узлах: веб‑шарды (tar), потоковая загрузка, локальный NVMe‑кэш, префетч и правильное распределение шардов по ранкам/воркерам.

TL;DR

  • Храните датасет в крупных tar‑шардах (0.5–2 ГБ) и стримьте их: это уменьшает накладные расходы FS и ускоряет чтение.
  • Делите шарды детерминированно по ранкам/воркерам и включайте prefetch + pinned memory.
  • Грейдите производительность через локальный NVMe‑кэш, последовательный I/O, минимум мелких файлов.

Отдавайте приоритет WebDataset/MDS; JSONL/Parquet оставляйте для прототипирования/стриминга.

Где «теряется» скорость: типичные узкие места

  • Миллионы мелких файлов → высокие накладные на метаданные и open/seek.
  • Все ранки читают одни и те же шарды → hot‑spot по сети/сторандже.
  • Недостаточный префетч и num_workersGPU простаивают.
  • Отсутствие локального кэша под популярные шарды → дублирующие скачивания.

Форматы и подходы

Web‑шарды (tar; «WebDataset‑подход»)

  • Каждый архив — набор сэмплов (напр., *.json, *.npy, *.txt), читается последовательным I/O.
  • Плюсы: простая шардировка, хорошо работает по HTTP/объектным хранилищам, легко кешируется на NVMe.

MDS/Streaming‑форматы

  • Оптимизированы под высокую пропускную способность и «рандомный» доступ к сэмплам; удобны при долгих тренировках с возобновлением.

JSONL/Parquet + стриминг

  • Удобно для быстрых прототипов, но к прод‑обучению лучше переходить на тар‑шарды/MDS.

Связанные разделы: /solutions/llm-training/datasets/, /solutions/storage-data/.

Шардирование корпуса: практические правила

  • Размер шара: 0.5–2 ГБ (чем медленнее сеть/диск — тем крупнее шарды).
  • Именование: data-{000000..000999}.tar — легко делить по диапазонам.
  • Манифест: CSV/JSON с полями url,size,nsamples,checksum; он нужен для детерминированного распределения.
  • Решардинг по эпохам: либо перетасовывайте список шардов, либо используйте режим resampled.

Распределение шардов по процессам и воркерам

Ниже — два рабочих паттерна.

Вариант A: «Режем список шардов по ранкам» (универсально)

				
					import os, math, random

world_size = int(os.environ.get("WORLD_SIZE", 1))
rank       = int(os.environ.get("RANK", 0))

# Пример: список URL-ов/путей к tar-шардам
urls = [f"https://storage/ds-{i:06d}.tar" for i in range(1000)]

# Детеминированная разбивка списка по ранкам
urls_rank = urls[rank::world_size]

# Для эпохи: перетасуйте КОПИЮ списка на seed(epoch)
def epoch_urls(epoch):
    rng = random.Random(epoch)
    u = urls_rank.copy()
    rng.shuffle(u)
    return u

				
			

Вариант B: «Дай‑плейн» с авто‑сплитом по нодам/воркерам

				
					import webdataset as wds

dataset = (
    wds.WebDataset("https://storage/ds-{000000..000999}.tar", resampled=True)
      .split_by_node()    # распределяет шарды по нодам/ранкам
      .split_by_worker()  # делит работу между DataLoader-воркерами
      .shuffle(1000)      # лёгкая перетасовка сэмплов
      .decode()           # нужные декодеры: .to_tuple("json","npy"), .decode("pil"), ...
)

				
			

Оба подхода устраняют «гонку за одними и теми же файлами» и равномерно грузят сторадж.

Префетч и DataLoader‑настройки

PyTorch DataLoader (рекомендовано как минимум):

				
					loader = torch.utils.data.DataLoader(
    dataset, batch_size=batch_size,
    num_workers=8, prefetch_factor=4, persistent_workers=True,
    pin_memory=True
)

				
			

Рекомендации:

  • Начните с num_workers = 4–8 на GPU; повышайте до упора по CPU/диску.
  • prefetch_factor = 2–8 (для длинных шагов — выше).
  • pin_memory=True и pinned host buffers ускоряют H2D‑копирование.
  • В Docker‑окружениях для устойчивости используйте multiprocessing_context=»forkserver» или spawn (если есть нестандартные расширения).

 

Локальный NVMe‑кэш (warm‑cache)

Идея: стримить новый шард напрямую в локальный файл, затем отдавать из него — и на следующей эпохе читать с NVMe.

				
					import os, requests

CACHE_DIR = "/nvme/webcache"
os.makedirs(CACHE_DIR, exist_ok=True)

def localize(url):
    name = url.rsplit("/", 1)[1]
    path = os.path.join(CACHE_DIR, name)
    if not os.path.exists(path):
        with requests.get(url, stream=True, timeout=60) as r, open(path, "wb") as f:
            for chunk in r.iter_content(chunk_size=8<<20):  # 8 МБ блоки
                if chunk: f.write(chunk)
    return path

# Перед созданием датасета замените URL на локальные пути:
urls_rank_local = [localize(u) for u in urls_rank]

				
			

Плюсы: меньше сетевой нагрузки, стабильные времена эпох.
Важно: чистите кэш по TTL/лимиту диска, держите контрольные суммы.

Потоковая подача сэмплов из тар‑шардов

Пример конвейера: из шара → парсим сэмпл → батчим.

				
					import webdataset as wds

dataset = (
    wds.WebDataset(urls_rank_local, resampled=True)
      .shuffle(1000)
      .to_tuple("json", "tokens.npy")  # пример: пары (метаданные, токены)
)

# Батчинг переменной длины: упаковывать или паддить в коллаторе
def collate(batch):
    metas, token_arrays = zip(*batch)
    # Реализуйте padding/packing под ваш seq_len
    ...

loader = wds.WebLoader(dataset, batch_size=batch_size, num_workers=8, prefetch_factor=4,
                       pin_memory=True, persistent_workers=True, collate_fn=collate)

				
			

Справедливость и повторяемость эпох

  • Детерминированно перемешивайте список шардов по seed(epoch).
  • Храните манифест и состояние самплера (последний шард/смещение).
  • Для resampled‑режима фиксируйте набор генераторов/сидов, чтобы рестарты давали сопоставимые метрики.

Расчёт требуемой пропускной способности

Оцените, сколько байт в секунду нужно подать на все GPU.

  • Пусть L — длина контекста (токенов), μ — микро‑батч на GPU, acc — grad accumulation, G — world size.
  • Токенов за шагL × μ × acc × G.
  • Для int32‑токенов и масок берите ~5–8 байт на токен (включая служебные массивы/паддинг).
  • При 200 к ток/с поток данных может достигать 1–1.6 ГБ/с на кластер — без учёта сжатия.

Вывод: последовательный I/O + локальный кэш критичны.

Валидация и мониторинг I/O

  • Логируйте долю простоев GPU и время загрузки батча; целевой простой — <5–10%.
  • Стройте дашборд: «tokens/sec», «it/sec», «avg batch load time», «NVMe read MB/s», «network MB/s».
  • Включайте алерты при падении it/sec и росте batch load time.

Связанные разделы: /solutions/monitoring-logging/, /solutions/performance-tuning/.

Типовые конфигурации

A) 1×GPU (LoRA‑SFT, локальный NVMe)

  • JSONL/Parquet стриминг или тар‑шарды.
  • num_workers=4–8, prefetch_factor=4, pin_memory=True.
  • Локальный кэш обязателен при сетевых датасетах.

B) 1 узел, 8×GPU (FSDP/ZeRO)

  • Тар‑шарды + split_by_node().split_by_worker().
  • NVMe‑кэш; мониторинг сети/диска.
  • Перемешивание шардов по эпохам.

C) 2–8 узлов (multi‑node)

  • Манифест и детерминированная разбивка диапазонов по node_rank.
  • Кэш на каждом узле; избегайте кросс‑узловых чтений.
  • Проверка «справедливой» доли шардов на узел/эпоху.

Траблшутинг

  • GPU «голодают». Увеличьте num_workers/prefetch_factor, включите pinned memory, переключитесь на тар‑шарды/последовательный I/O, добавьте NVMe‑кэш.
  • «Too many open files». Увеличьте ulimit -n, укрупните шарды, уменьшите параллелизм открытия.
  • Случайные фризы. Проверьте таймауты сети, увеличьте размер читаемых блоков (≥4–8 МБ), исключите проблемные шарды (повреждённые .tar).
  • Дублирующее чтение. Проверьте, что деление по ранкам/воркерам работает (используйте split_by_node/worker или ручную разбивку списка).
  • Случайный порядок при рестарте. Фиксируйте seed(epoch) и сохраняйте состояние самплера.

Как это запускать в cloudcompute.ru

Чек‑лист перед стартом

  • Датасет в тар‑шардах (0.5–2 ГБ) + манифест.
  • Детеминированное деление шардов по ранкам/воркерам.
  • NVMe‑кэш на узлах; есть политика ротации/TTL.
  • prefetch + pinned memory включены; num_workers подобран экспериментально.
  • Логируются «tokens/sec», «it/sec», «batch load time», нагрузка диска/сети.
  • План на рестарт (состояние самплера/эпохи), проверка «холодного» запуска.