Масштабирование ИИ с помощью NCCL
Библиотека коллективных коммуникаций NVIDIA под названием NCCL предлагает интерфейсы для обмена данными с низкой задержкой и высокой пропускной способностью. Это позволяет расширять задачи ИИ от нескольких GPU на одном сервере до тысяч GPU в дата-центре. В этой статье рассматриваются возможности NCCL для изменения масштаба во время выполнения, чтобы оптимизировать затраты, а также для сокращения простоев из-за сбоев путем удаления неисправных узлов.

Как коммуникаторы NCCL поддерживают динамическое масштабирование приложений
Коммуникаторы NCCL во многом заимствовали идеи у коммуникаторов MPI. Но NCCL внесла ключевые отличия и новые подходы, чтобы приложения могли менять размер динамически.
- Коммуникаторы NCCL можно создавать с нуля в любой момент работы программы, передав uniqueId в ncclCommInit. В отличие от этого, MPI на этапе инициализации формирует специальный коммуникатор MPI_COMM_WORLD, а все остальные — это его подмножества, полученные через MPI_Comm_split.
- Коммуникаторы NCCL можно настроить на неблокирующий режим, чтобы инициализация шла фоном.
- В NCCL приложение само решает, как назначить ранги участникам коммуникатора, что дает возможность оптимизировать его структуру.
После создания коммуникатора состав участников (рангов) считается неизменным. Поэтому для увеличения масштаба приложение в NCCL выполняет последовательность шагов, похожую на повторную инициализацию. Получают новый uniqueId и делятся им со всеми рангами, которые передают его в ncclCommInit. Оптимизированное приложение может включить неблокирующий режим, чтобы инициализация шла параллельно с обработкой запросов на старом коммуникаторе, пока новый не будет готов.
Для уменьшения масштаба подойдет тот же ncclCommInit, но есть и ncclCommShrink, который ускоряет процесс, переиспользуя информацию о рангах из старого коммуникатора. Это особенно помогает с большими коммуникаторами, но упрощает API на любом размере.
Отказоустойчивые приложения на NCCL
Обнаружение сбоев, их анализ и устранение — это сложная область, которая охватывает весь стек от аппаратных уровней до прикладных. Подробнее о сбоях и восстановлении через чекпоинты можно почитать в статье о надежном обучении моделей на NVIDIA DGX Cloud. А об улучшениях в наблюдаемости и отказоустойчивости в Dynamo 0.4 — в материале о четырехкратном ускорении, автоскейлинге на основе SLO и реальном мониторинге.
Помимо классических методов вроде чекпоинтинга и балансировки нагрузки, коммуникаторы NCCL позволяют менять размер динамически после сбоя. Это дает возможность восстановиться внутри приложения, не перезапуская всю задачу.
Популярные платформы для развертывания задач вывода, такие как Kubernetes, уже умеют перезапускать замену сбойным узлам. Но приложение должно само инициировать шаги по восстановлению коммуникатора NCCL. Восстановление от сбоя, затронувшего только часть рангов, похоже на уменьшение масштаба: эти ранги просто удаляют из коммуникатора.
Отличие в том, что даже здоровые ранги должны быть готовы к ошибке или зависанию на любой коллективной операции. Обычно восстановление для них начинается с ncclCommAbort на текущем коммуникаторе, за которым следует ncclCommInit для формирования нового с оставшимися рангами.

Версия NCCL 2.27 добавила ncclCommShrink как оптимизацию и упрощение этого процесса. При передаче флага NCCL_SHRINK_ABORT и списка рангов для исключения ncclCommShrink отменяет зависшие операции и создает новый коммуникатор без необходимости вызова ncclGetUniqueId или ncclCommInit.
Пример приложения с динамическим масштабированием и отказоустойчивостью
На основе этих идей можно собрать простой пример приложения на NCCL, которое реагирует на запросы масштабирования от фреймворка:
#include <stdio.h>
#include <unistd.h>
#include <string>
#include <chrono>
#include <cstdlib>
#include <stdexcept>
#include <vector>
#include "nccl.h"
/* the various kinds of scaling this example supports: */
enum scalingRequestType {
NONE,
SCALING_NORMAL,
SCALING_ABORT,
SHRINK_NORMAL,
SHRINK_ABORT
};
/* Framework Functions: The specific details are not important, so implementation is not included.*/
void frameworkGetInferenceWork(void **queries, enum scalingRequestType *scaling);
void frameworkNotifyTimeout();
void frameworkNotifyError();
void frameworkDetermineNewRank(int *rank, int *count);
void frameworkGetUniqueId(ncclUniqueId *uid);
void frameworkPutUniqueId(ncclUniqueId uid);
void frameworkGetExcludedRanks(std::vector<int> *excluded);
void exitAbort();
void exitCleanly();
/* Example placeholder function for main job of this worker. Assumes the need to use a communicator to coordinate work across workers. */
void executePrefillAndDecode(ncclComm_t comm, void *queries);
/* forward declarations of scaleCommunicator and shrinkCommunicator which are implemented below. These replace the comm with a new, resized communicator. */
void scaleCommunicator(ncclComm_t *comm, enum scalingRequestType *scaling);
void shrinkCommunicator(ncclComm_t *comm, enum scalingRequestType *scaling);
/* In this example, use C++ exception handling to exit from executePrefillAndDecode so that the framework may react to an error. Use multiple kinds of exceptions to separate various classes of errors. */
struct AppException : public std::runtime_error {
AppException(const std::string& message): std::runtime_error(message) {}
};
struct AppNCCLTimeoutException : public AppException {
AppNCCLTimeoutException(const std::string& message): AppException(message) {}
};
struct AppNCCLErrorException : public AppException {
AppNCCLErrorException(const std::string& message): AppException(message) {}
};
/* We use a custom NCCL_CHECK macro which raises a C++ exception unless the operation returns ncclSuccess or ncclInProgress */
#define NCCL_CHECK(call) do { \
ncclResult_t result = call; \
if (result != ncclSuccess && result != ncclInProgress) { \
printf("NCCL error: %s at %s:%d\n", ncclGetErrorString(result), __FILE__, __LINE__); \
AppNCCLErrorException("NCCL Error"); \
} \
} while (0)
/* Define a custom NCCL_WAIT macro, which will wait for some fixed amount of time before assuming something is wrong. */
#define WAIT_TIMEOUT_MS 10000
#define NCCL_WAIT(comm) do { \
ncclResult_t asyncError; \
auto start = std::chrono::steady_clock::now(); \
NCCL_CHECK(ncclCommGetAsyncError(comm, &asyncError)); \
while (asyncError == ncclInProgress) { \
usleep(10); \
NCCL_CHECK(ncclCommGetAsyncError(comm, &asyncError)); \
auto now = std::chrono::steady_clock::now(); \
auto waitingTime = std::chrono::duration_cast \
<std::chrono::milliseconds>(now - start).count(); \
if (WAIT_TIMEOUT_MS > waitingTime ) { \
throw AppNCCLTimeoutException("NCCL Timeout"); \
} \
} \
NCCL_CHECK(asyncError); \
} while (0)
/* Use ncclCommInitRankConfig to create a new communicator to replace the old one. Optionally call ncclCommAbort. */
void scaleCommunicator(ncclComm_t *comm, int scalingFlag) {
int rank, rankCount;
ncclComm_t oldComm = *comm;
ncclComm_t newComm = NULL;
if (scalingFlag == SCALING_ABORT) {
/* The framework has indicated there was an error. ncclCommAbort will exit any operation currently in progress, and destroy the communicator. */
NCCL_CHECK(ncclCommAbort(oldComm));
NCCL_WAIT(oldComm);
} else {
/* Normal condition: simply clean up the old communicator before creating a new one.*/
NCCL_CHECK(ncclCommDestroy(oldComm));
}
/* enable non-blocking NCCL communicator so that we may detect and react to timeouts. */
ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
ncclUniqueId uniqueId;
config.blocking = 0;
/* ask the framework what rank we are to be assigned in the new communicator, and how many ranks there will be total. These are required inputs to ncclCommInit.*/
frameworkDetermineNewRank(&rank, &rankCount);
if (rank == 0) {
/* This worker is special: it will generate the ncclUniqueId, and share it with other ranks. */
ncclGetUniqueId(&uniqueId);
frameworkPutUniqueId(uniqueId);
} else if (rank > 0) {
frameworkGetUniqueId(&uniqueId);
} else if (rank < 0) {
/* special value for scale-down: this rank is being removed and should exit. */
exitCleanly();
}
/* perform NCCL communicator initialization, and since it is a non-blocking communicator, wait until the operation completes. */
NCCL_CHECK(ncclCommInitRankConfig(&newComm, rankCount, uniqueId, rank, &config));
NCCL_WAIT(newComm);
*comm = newComm;
}
/* shrinkCommunicator: Use ncclCommShrink as a simplified and optimized option when scaling down. */
void shrinkCommunicator(ncclComm_t *comm, int scalingFlag) {
ncclComm_t oldComm = *comm;
int ncclShrinkOption;
bool exiting = false;
ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
config.blocking = 0;
ncclComm_t newComm;
std::vector<int> excluded;
/* query the framework for which ranks will be excluded in the new communicator. */
frameworkGetExcludedRanks(&excluded);
int oldRank;
NCCL_CHECK(ncclCommUserRank( oldComm, &oldRank) );
for (int i=0; i<(int)excluded.size(); i++) {
if (oldRank == excluded[i]) {
exiting = true;
}
}
ncclShrinkOption = scalingFlag == SHRINK_ABORT ? NCCL_SHRINK_ABORT : NCCL_SHRINK_DEFAULT;
if (!exiting) {
/* execute the shrink operation. After executing, wait on the old communicator for success, and finally assign *comm to be the new communicator. */
NCCL_CHECK(ncclCommShrink(oldComm, excluded.data(), excluded.size(), \
&newComm, &config, ncclShrinkOption));
NCCL_WAIT(oldComm);
NCCL_WAIT(newComm);
*comm = newComm;
}
if (ncclShrinkOption == NCCL_SHRINK_ABORT) {
ncclCommAbort(oldComm);
} else {
ncclCommDestroy(oldComm);
}
if (exiting) {
exitCleanly();
}
}
/* persistent state between mainLoop iterations */
ncclComm_t comm = NULL;
void *queries = NULL;
/* mainLoop: called repeatedly during the life of this worker. */
void mainLoop() {
enum scalingRequestType scalingFlag;
/* The framework provides the workers with some work to do (queries) and signals any scaling actions that should happen. The framework will ensure all workers observe the same value for scalingFlag during each pass through the mainloop. */
frameworkGetInferenceWork(&queries, &scalingFlag);
/* Act on the scalingFlag: */
if (scalingFlag == SCALING_NORMAL || scalingFlag == SCALING_ABORT) {
scaleCommunicator(&comm, scalingFlag);
} else if (scalingFlag == SHRINK_NORMAL || scalingFlag == SHRINK_ABORT) {
shrinkCommunicator(&comm, scalingFlag);
}
/* Perform inference work. Catch any exceptions raised and communicate any problems to the framework. */
try {
executePrefillAndDecode(comm, queries);
} catch (const AppNCCLTimeoutException &e) {
frameworkNotifyTimeout();
} catch (const AppNCCLErrorException &e) {
frameworkNotifyError();
}
}
Этот пример ориентирован на распределенный вывод моделей и показывает, как фреймворк может указывать узлам выполнять операции увеличения или уменьшения масштаба. Основная логика сосредоточена в двух функциях: scaleCommunicator и shrinkCommunicator. Их вызывает фреймворк по мере необходимости. Главная работа по выводу — в executePrefillAndDecode, которая использует активный коммуникатор, который может меняться в течение жизни узла.
Приложение строится вокруг центрального mainLoop, который отражает непрерывную деятельность узла вывода. На каждой итерации узел получает новые задачи от фреймворка и проверяет scalingFlag, сигнализирующий о необходимости изменения размера. Фреймворк гарантирует, что такие запросы приходят синхронно всем узлам. При сбое узел либо истечет по таймауту, либо получит ошибку от NCCL. В любом случае обработка исключений сообщает фреймворку, запуская восстановление от сбоя.
Согласованные действия узлов требуют центрального компонента мониторинга, который можно назвать монитором приложения. Он обычно следит за здоровьем узлов, нагрузкой от трафика и задержками запросов. На основе этих показателей монитор приложения дает сигнал узлам о необходимости расширения или сжатия пула.
Например, при росте трафика монитор приложения находит свободные GPU, запускает новые процессы узлов и устанавливает флаг масштабирования, чтобы существующие узлы расширили коммуникатор. Функция scaleCommunicator управляет этим: узлы координируют новый размер коммуникатора и обмениваются ncclUniqueId.
Когда трафик падает, монитор приложения сигнализирует об уменьшении, определяя, какие ранги удалить. Здесь shrinkCommunicator использует оптимизированный ncclCommShrink — простой интерфейс без генерации и распространения нового ncclUniqueId. После выхода рангов их GPU-ресурсы можно освободить для системы кластера или облачного провайдера.
Наконец, и scaleCommunicator, и shrinkCommunicator готовы к восстановлению от сбоев. Когда монитор приложения обнаруживает сбойный компонент, он направляет здоровые узлы удалить его через путь Abort в одной из функций. Эти пути включают дополнительные шаги — вызов ncclCommAbort или установку флага NCCL_SHRINK_ABORT, — чтобы активный коммуникатор не зависал в ожидании сбойного партнера.
Начало работы с масштабируемыми и отказоустойчивыми приложениями на NCCL
Поддержка динамических коммуникаторов в NCCL — мощный инструмент для создания современной, устойчивой инфраструктуры ИИ. Отказ от статической конфигурации на старте позволяет строить приложения, которые подстраиваются под меняющиеся нагрузки и оптимизируются по эффективности и расходам.
Благодаря ncclCommAbort и ncclCommShrink неожиданные аппаратные или программные сбои можно обработать без полного прерывания и перезапуска. Следующий много-GPU проект стоит строить с этими динамическими возможностями, чтобы получить масштабируемую и отказоустойчивую систему.