Author | KIDGINBROOK
Updated by | Pan Lichen (潘丽晨)

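NCCL is NVIDIA's open-source GPU communication library. It supports both collective communication and point-to-point communication.

Here is a demo from the official documentation: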

#include <stdio.h>
#include "cuda_runtime.h"
#include "nccl.h"
#include "mpi.h"
#include <unistd.h>
#include <stdint.h>
#include <stdlib.h>  // malloc, exit, EXIT_FAILURE

#define MPICHECK(cmd) do {                          \
  int e = cmd;                                      \
  if( e != MPI_SUCCESS ) {                          \
    printf("Failed: MPI error %s:%d '%d'\n",        \
        __FILE__,__LINE__, e);                      \
    exit(EXIT_FAILURE);                             \
  }                                                 \
} while(0)

#define CUDACHECK(cmd) do {                         \
  cudaError_t e = cmd;                              \
  if( e != cudaSuccess ) {                          \
    printf("Failed: Cuda error %s:%d '%s'\n",       \
        __FILE__,__LINE__,cudaGetErrorString(e));   \
    exit(EXIT_FAILURE);                             \
  }                                                 \
} while(0)

#define NCCLCHECK(cmd) do {                         \
  ncclResult_t r = cmd;                             \
  if (r!= ncclSuccess) {                            \
    printf("Failed, NCCL error %s:%d '%s'\n",       \
        __FILE__,__LINE__,ncclGetErrorString(r));   \
    exit(EXIT_FAILURE);                             \
  }                                                 \
} while(0)

static uint64_t getHostHash(const char* string) {
  // Based on DJB2a, result = result * 33 ^ char
  uint64_t result = 5381;
  for (int c = 0; string[c] != '\0'; c++){
    result = ((result << 5) + result) ^ string[c];
  }
  return result;
}

static void getHostName(char* hostname, int maxlen) {
  gethostname(hostname, maxlen);
  for (int i=0; i< maxlen; i++) {
    if (hostname[i] == '.') {
      hostname[i] = '\0';
      return;
    }
  }
}

int main(int argc, char* argv[])
{
  int size = 32*1024*1024;

  int myRank, nRanks, localRank = 0;

  //initializing MPI
  MPICHECK(MPI_Init(&argc, &argv));
  MPICHECK(MPI_Comm_rank(MPI_COMM_WORLD, &myRank));
  MPICHECK(MPI_Comm_size(MPI_COMM_WORLD, &nRanks));

  //calculating localRank which is used in selecting a GPU
  uint64_t hostHashs[nRanks];
  char hostname[1024];
  getHostName(hostname, 1024);
  hostHashs[myRank] = getHostHash(hostname);
  MPICHECK(MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, hostHashs, sizeof(uint64_t), MPI_BYTE, MPI_COMM_WORLD));
  for (int p=0; p<nRanks; p++) {
    if (p == myRank) break;
    if (hostHashs[p] == hostHashs[myRank]) localRank++;
  }

  //each process is using two GPUs
  int nDev = 2;

  float** sendbuff = (float**)malloc(nDev * sizeof(float*));
  float** recvbuff = (float**)malloc(nDev * sizeof(float*));
  cudaStream_t* s = (cudaStream_t*)malloc(sizeof(cudaStream_t)*nDev);

  //picking GPUs based on localRank
  for (int i = 0; i < nDev; ++i) {
    CUDACHECK(cudaSetDevice(localRank*nDev + i));
    CUDACHECK(cudaMalloc(sendbuff + i, size * sizeof(float)));
    CUDACHECK(cudaMalloc(recvbuff + i, size * sizeof(float)));
    CUDACHECK(cudaMemset(sendbuff[i], 1, size * sizeof(float)));
    CUDACHECK(cudaMemset(recvbuff[i], 0, size * sizeof(float)));
    CUDACHECK(cudaStreamCreate(s+i));
  }

  ncclUniqueId id;
  ncclComm_t comms[nDev];

  //generating NCCL unique ID at one process and broadcasting it to all
  if (myRank == 0) ncclGetUniqueId(&id);
  MPICHECK(MPI_Bcast((void *)&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD));

  //initializing NCCL, group API is required around ncclCommInitRank as it is
  //called across multiple GPUs in each thread/process
  NCCLCHECK(ncclGroupStart());
  for (int i=0; i<nDev; i++) {
    CUDACHECK(cudaSetDevice(localRank*nDev + i));
    NCCLCHECK(ncclCommInitRank(comms+i, nRanks*nDev, id, myRank*nDev + i));
  }
  NCCLCHECK(ncclGroupEnd());

  //calling NCCL communication API. Group API is required when using
  //multiple devices per thread/process
  NCCLCHECK(ncclGroupStart());
  for (int i=0; i<nDev; i++)
    NCCLCHECK(ncclAllReduce((const void*)sendbuff[i], (void*)recvbuff[i], size, ncclFloat, ncclSum,
          comms[i], s[i]));
  NCCLCHECK(ncclGroupEnd());

  //synchronizing on CUDA stream to complete NCCL communication
  for (int i=0; i<nDev; i++)
    CUDACHECK(cudaStreamSynchronize(s[i]));

  //freeing device memory
  for (int i=0; i<nDev; i++) {
    CUDACHECK(cudaFree(sendbuff[i]));
    CUDACHECK(cudaFree(recvbuff[i]));
  }

  //finalizing NCCL
  for (int i=0; i<nDev; i++) {
    ncclCommDestroy(comms[i]);
  }

  //finalizing MPI
  MPICHECK(MPI_Finalize());

  printf("[MPI Rank %d] Success \n", myRank);
  return 0;
}
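The demo picks GPUs from localRank, which it derives by hashing host names and counting how many lower-numbered ranks share the same hash. The fragment below is a minimal sketch of just that step with MPI removed. The names host_hash and local_rank are hypothetical, and the hashes array stands in for what MPI_Allgather would deliver.

```c
#include <assert.h>
#include <stdint.h>

/* Same DJB2a-style hash as getHostHash in the demo. */
uint64_t host_hash(const char *s) {
    uint64_t result = 5381;
    for (int c = 0; s[c] != '\0'; c++) {
        result = ((result << 5) + result) ^ s[c];
    }
    return result;
}

/* Hypothetical helper: the local rank is the number of ranks with a
 * smaller global rank that reported the same host hash. */
int local_rank(const uint64_t *hashes, int my_rank) {
    assert(my_rank >= 0);
    int local = 0;
    for (int p = 0; p < my_rank; p++) {
        if (hashes[p] == hashes[my_rank]) local++;
    }
    return local;
}
```

With four ranks spread over two hosts in the order A, B, A, B, ranks 0 and 1 get local rank 0 and ranks 2 and 3 get local rank 1, so each process on a host selects a different pair of GPUs.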

In the demo, rank 0 calls ncclGetUniqueId to obtain the Id and then broadcasts it to the other ranks through MPI. Let's look at how the UniqueId is generated.

ncclResult_t ncclGetUniqueId(ncclUniqueId* out) {
  NCCLCHECK(ncclInit());
  NCCLCHECK(PtrCheck(out, "GetUniqueId", "out"));
  return bootstrapGetUniqueId(out);
}

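Start with ncclInit.

It first calls initEnv, which sets up environment variables.

It then calls initNet to initialize the networks NCCL needs. There are two: the bootstrap network and the data communication network. The bootstrap network is used mainly during initialization to exchange small pieces of information, such as each machine's IP and port. Because the data volume is tiny and the exchange mostly happens once, at initialization, bootstrap uses TCP. The communication network carries the actual data, so it prefers RDMA (and GDR when GDR is supported).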

ncclResult_t initNet() {
  // Always initialize bootstrap network
  NCCLCHECK(bootstrapNetInit());

  NCCLCHECK(initNetPlugin(&ncclNet, &ncclCollNet));
  if (ncclNet != NULL) return ncclSuccess;
  if (initNet(&ncclNetIb) == ncclSuccess) {
    ncclNet = &ncclNetIb;
  } else {
    NCCLCHECK(initNet(&ncclNetSocket));
    ncclNet = &ncclNetSocket;
  }
  return ncclSuccess;
}

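bootstrapNetInit initializes the bootstrap network. It mainly uses findInterfaces to walk all NICs on the machine, selects which ones to use by matching against prefixList, and saves the usable ones: ifa_name goes into the global bootstrapNetIfNames and the IP address into the global bootstrapNetIfAddrs. By default, every NIC except docker and lo can be used.

For example, the test machine has three NICs, xgbe0, xgbe1 and xgbe2, so these three interface names and their IP addresses are saved. NCCL also provides the environment variable NCCL_SOCKET_IFNAME to specify which NIC to use; for example, export NCCL_SOCKET_IFNAME=xgbe0 selects xgbe0. This is implemented through the prefixList matching.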

static int findInterfaces(const char* prefixList, char* names, union socketAddress *addrs, int sock_family, int maxIfNameSize, int maxIfs) {
  struct netIf userIfs[MAX_IFS];
  bool searchNot = prefixList && prefixList[0] == '^';
  if (searchNot) prefixList++;
  bool searchExact = prefixList && prefixList[0] == '=';
  if (searchExact) prefixList++;
  int nUserIfs = parseStringList(prefixList, userIfs, MAX_IFS);

  int found = 0;
  struct ifaddrs *interfaces, *interface;
  getifaddrs(&interfaces);
  for (interface = interfaces; interface && found < maxIfs; interface = interface->ifa_next) {
    if (interface->ifa_addr == NULL) continue;

    int family = interface->ifa_addr->sa_family;
    if (family != AF_INET && family != AF_INET6)
      continue;

    if (sock_family != -1 && family != sock_family)
      continue;

    if (family == AF_INET6) {
      struct sockaddr_in6* sa = (struct sockaddr_in6*)(interface->ifa_addr);
      if (IN6_IS_ADDR_LOOPBACK(&sa->sin6_addr)) continue;
    }

    if (!(matchIfList(interface->ifa_name, -1, userIfs, nUserIfs, searchExact) ^ searchNot)) {
      continue;
    }

    bool duplicate = false;
    for (int i = 0; i < found; i++) {
      if (strcmp(interface->ifa_name, names+i*maxIfNameSize) == 0) { duplicate = true; break; }
    }

    if (!duplicate) {
      strncpy(names+found*maxIfNameSize, interface->ifa_name, maxIfNameSize);
      int salen = (family == AF_INET) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6);
      memcpy(addrs+found, interface->ifa_addr, salen);
      found++;
    }
  }

  freeifaddrs(interfaces);
  return found;
}
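The selection rule that findInterfaces applies to prefixList can be illustrated on its own: a leading ^ inverts the match, a leading = demands an exact name, and otherwise each comma-separated entry is treated as a prefix. The fragment below is a simplified sketch of that rule, not NCCL's matchIfList. The function name if_name_selected is hypothetical, and treating an empty list as "accept everything" is an assumption of this sketch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical helper: returns true when interface `name` is selected by
 * `spec`, e.g. "xgbe" (prefix), "=xgbe0" (exact) or "^docker,lo" (exclude). */
bool if_name_selected(const char *spec, const char *name) {
    assert(name != NULL);
    if (spec == NULL || spec[0] == '\0') return true;  /* assumption: no filter */

    bool negate = (spec[0] == '^');
    if (negate) spec++;
    bool exact = (spec[0] == '=');
    if (exact) spec++;

    bool matched = false;
    const char *p = spec;
    while (*p != '\0' && !matched) {
        size_t len = strcspn(p, ",");          /* length of the current entry */
        if (len > 0) {
            bool prefix_ok = (strncmp(name, p, len) == 0);
            matched = exact ? (prefix_ok && strlen(name) == len) : prefix_ok;
        }
        p += len;
        if (*p == ',') p++;                    /* skip the separator */
    }
    /* Same idea as `match ^ searchNot` in findInterfaces. */
    return matched != negate;
}
```

For the three NICs in the example, the spec "xgbe0" selects only xgbe0, while "^docker,lo" selects all three and rejects docker0 and lo.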

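Next, the communication network is initialized.

The ncclNet_t struct is a set of function pointers for operations such as init, send and receive. Each transport, such as socket or IB, provides its own ncclNet_t implementation (ncclNetSocket and ncclNetIb). Initializing the communication network means checking in turn which transport is available and assigning it to the global ncclNet.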
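The pattern described here, a table of function pointers per transport plus a "first one that initializes wins" loop, can be sketched in a few lines. Everything in the fragment below is a hypothetical stand-in: net_backend_t is a drastically reduced ncclNet_t with only a name and an init hook, and ib_init is hard-wired to fail so that the fallback path is visible.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, reduced stand-in for ncclNet_t. */
typedef struct {
    const char *name;
    int (*init)(void);   /* returns 0 on success */
} net_backend_t;

/* Stub init hooks: pretend the machine has no IB device. */
int ib_init(void)     { return -1; }
int socket_init(void) { return 0; }

const net_backend_t backend_ib     = { "IB",     ib_init };
const net_backend_t backend_socket = { "Socket", socket_init };

/* Try the candidates in priority order and return the first usable one,
 * or NULL if none of them initializes. */
const net_backend_t *select_backend(const net_backend_t *const *candidates, size_t n) {
    assert(candidates != NULL);
    for (size_t i = 0; i < n; i++) {
        if (candidates[i]->init() == 0) return candidates[i];
    }
    return NULL;
}
```

Passing the candidates in the order IB, Socket yields the socket backend here, which mirrors how initNet falls back to ncclNetSocket when ncclNetIb cannot be initialized.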

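First, initNetPlugin checks whether libnccl-net.so exists. The test environment does not have this .so, so it returns immediately.

Then NCCL tries the IB network:

It first calls the init function of ncclNetIb, which is ncclIbInit.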

ncclResult_t ncclIbInit(ncclDebugLogger_t logFunction) {
  static int shownIbHcaEnv = 0;
  if(wrap_ibv_symbols() != ncclSuccess) { return ncclInternalError; }
  if (ncclParamIbDisable()) return ncclInternalError;

  if (ncclNIbDevs == -1) {
    pthread_mutex_lock(&ncclIbLock);
    wrap_ibv_fork_init();
    if (ncclNIbDevs == -1) {
      ncclNIbDevs = 0;
      if (findInterfaces(ncclIbIfName, &ncclIbIfAddr, MAX_IF_NAME_SIZE, 1) != 1) {
        WARN("NET/IB : No IP interface found.");
        return ncclInternalError;
      }

      // Detect IB cards
      int nIbDevs;
      struct ibv_device** devices;

      // Check if user defined which IB device:port to use
      char* userIbEnv = getenv("NCCL_IB_HCA");
      if (userIbEnv != NULL && shownIbHcaEnv++ == 0) INFO(NCCL_NET|NCCL_ENV, "NCCL_IB_HCA set to %s", userIbEnv);
      struct netIf userIfs[MAX_IB_DEVS];
      bool searchNot = userIbEnv && userIbEnv[0] == '^';
      if (searchNot) userIbEnv++;
      bool searchExact = userIbEnv && userIbEnv[0] == '=';
      if (searchExact) userIbEnv++;
      int nUserIfs = parseStringList(userIbEnv, userIfs, MAX_IB_DEVS);

      if (ncclSuccess != wrap_ibv_get_device_list(&devices, &nIbDevs)) return ncclInternalError;

      for (int d=0; d<nIbDevs && ncclNIbDevs<MAX_IB_DEVS; d++) {
        struct ibv_context * context;
        if (ncclSuccess != wrap_ibv_open_device(&context, devices[d]) || context == NULL) {
          WARN("NET/IB : Unable to open device %s", devices[d]->name);
          continue;
        }
        int nPorts = 0;
        struct ibv_device_attr devAttr;
        memset(&devAttr, 0, sizeof(devAttr));
        if (ncclSuccess != wrap_ibv_query_device(context, &devAttr)) {
          WARN("NET/IB : Unable to query device %s", devices[d]->name);
          if (ncclSuccess != wrap_ibv_close_device(context)) { return ncclInternalError; }
          continue;
        }
        for (int port = 1; port <= devAttr.phys_port_cnt; port++) {
          struct ibv_port_attr portAttr;
          if (ncclSuccess != wrap_ibv_query_port(context, port, &portAttr)) {
            WARN("NET/IB : Unable to query port %d", port);
            continue;
          }
          if (portAttr.state != IBV_PORT_ACTIVE) continue;
          if (portAttr.link_layer != IBV_LINK_LAYER_INFINIBAND
              && portAttr.link_layer != IBV_LINK_LAYER_ETHERNET) continue;

          // check against user specified HCAs/ports
          if (! (matchIfList(devices[d]->name, port, userIfs, nUserIfs, searchExact) ^ searchNot)) {
            continue;
          }
          TRACE(NCCL_INIT|NCCL_NET,"NET/IB: [%d] %s:%d/%s ", d, devices[d]->name, port,
              portAttr.link_layer == IBV_LINK_LAYER_INFINIBAND ? "IB" : "RoCE");
          ncclIbDevs[ncclNIbDevs].device = d;
          ncclIbDevs[ncclNIbDevs].guid = devAttr.sys_image_guid;
          ncclIbDevs[ncclNIbDevs].port = port;
          ncclIbDevs[ncclNIbDevs].link = portAttr.link_layer;
          ncclIbDevs[ncclNIbDevs].speed = ncclIbSpeed(portAttr.active_speed) * ncclIbWidth(portAttr.active_width);
          ncclIbDevs[ncclNIbDevs].context = context;
          strncpy(ncclIbDevs[ncclNIbDevs].devName, devices[d]->name, MAXNAMESIZE);
          NCCLCHECK(ncclIbGetPciPath(ncclIbDevs[ncclNIbDevs].devName, &ncclIbDevs[ncclNIbDevs].pciPath, &ncclIbDevs[ncclNIbDevs].realPort));
          ncclIbDevs[ncclNIbDevs].maxQp = devAttr.max_qp;
          ncclNIbDevs++;
          nPorts++;
          pthread_create(&ncclIbAsyncThread, NULL, ncclIbAsyncThreadMain, context);
        }
        if (nPorts == 0 && ncclSuccess != wrap_ibv_close_device(context)) { return ncclInternalError; }
      }
      if (nIbDevs && (ncclSuccess != wrap_ibv_free_device_list(devices))) { return ncclInternalError; };
    }
    if (ncclNIbDevs == 0) {
      INFO(NCCL_INIT|NCCL_NET, "NET/IB : No device found.");
    } else {
      char line[1024];
      line[0] = '\0';
      for (int d=0; d<ncclNIbDevs; d++) {
        snprintf(line+strlen(line), 1023-strlen(line), " [%d]%s:%d/%s", d, ncclIbDevs[d].devName,
            ncclIbDevs[d].port, ncclIbDevs[d].link == IBV_LINK_LAYER_INFINIBAND ? "IB" : "RoCE");
      }
      line[1023] = '\0';
      char addrline[1024];
      INFO(NCCL_INIT|NCCL_NET, "NET/IB : Using%s ; OOB %s:%s", line, ncclIbIfName, socketToString(&ncclIbIfAddr.sa, addrline));
    }
    pthread_mutex_unlock(&ncclIbLock);
  }
  return ncclSuccess;
}

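First, the call to wrap_ibv_symbols at the top of the function loads the dynamic library libibverbs.so and resolves the functions it exports.

Then wrap_ibv_fork_init is called so that a fork does not corrupt reads and writes on the RDMA NIC.

As discussed later, the IB network also uses a socket for out-of-band transfers, so findInterfaces is used here as well to obtain one usable NIC, which is saved in ncclIbIfAddr.

ibv_get_device_list fetches all RDMA devices into devices. The code iterates over each device, and because an HCA may have several physical ports, it iterates over every physical port of each device and queries its attributes.

The relevant information is then saved in the global ncclIbDevs: which port of which device, whether the link is IB or RoCE, the device's PCI path, maxQp, the device name, and so on. Note that, similar to NCCL_SOCKET_IFNAME for the bootstrap network, there is an environment variable called NCCL_IB_HCA that specifies which IB HCA to use.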
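The "device:port" comment in ncclIbInit indicates that an NCCL_IB_HCA entry names a device and, optionally, one of its ports. The fragment below is a minimal sketch of splitting one such entry. It is not NCCL's parseStringList: hca_entry_t and parse_hca_entry are hypothetical names, and using -1 to mean "any port" is an assumption that follows the -1 findInterfaces passes to matchIfList.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical record for one NCCL_IB_HCA entry. */
typedef struct {
    char name[64];
    int  port;      /* -1 when the entry does not name a port */
} hca_entry_t;

/* Splits a single "device[:port]" token such as "mlx5_0:1".
 * Returns 0 on success, -1 if the device name is empty or too long. */
int parse_hca_entry(const char *token, hca_entry_t *out) {
    assert(token != NULL && out != NULL);
    const char *colon = strchr(token, ':');
    size_t len = colon ? (size_t)(colon - token) : strlen(token);
    if (len == 0 || len >= sizeof(out->name)) return -1;

    memcpy(out->name, token, len);
    out->name[len] = '\0';
    out->port = colon ? atoi(colon + 1) : -1;
    return 0;
}
```

An entry such as mlx5_0:1 would then restrict NCCL to port 1 of mlx5_0, while a bare mlx5_1 would accept any active port of that device.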

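At this point initialization is complete. In one sentence: NCCL has discovered all usable IB NICs and ordinary Ethernet NICs on the current machine and saved them.

Then the UniqueId is generated. bootstrapGetUniqueId relies on bootstrapCreateRoot to do this: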

ncclResult_t bootstrapCreateRoot(ncclUniqueId* id, bool idFromEnv) {
  ncclNetHandle_t* netHandle = (ncclNetHandle_t*) id;
  void* listenComm;
  NCCLCHECK(bootstrapNetListen(idFromEnv ? dontCareIf : 0, netHandle, &listenComm));
  pthread_t thread;
  pthread_create(&thread, NULL, bootstrapRoot, listenComm);
  return ncclSuccess;
}

Like ncclUniqueId, ncclNetHandle_t is just a character array, which is why the id can be cast to a handle. bootstrapNetListen is called next.

static ncclResult_t bootstrapNetListen(int dev, ncclNetHandle_t* netHandle, void** listenComm) {
  union socketAddress* connectAddr = (union socketAddress*) netHandle;
  static_assert(sizeof(union socketAddress) < NCCL_NET_HANDLE_MAXSIZE, "union socketAddress size is too large");
  // if dev >= 0, listen based on dev
  if (dev >= 0) {
    NCCLCHECK(bootstrapNetGetSocketAddr(dev, connectAddr));
  } else if (dev == findSubnetIf) {
    ...
  } // Otherwise, handle stores a local address
  struct bootstrapNetComm* comm;
  NCCLCHECK(bootstrapNetNewComm(&comm));
  NCCLCHECK(createListenSocket(&comm->fd, connectAddr));
  *listenComm = comm;
  return ncclSuccess;
}

Let's go through these three functions in turn. First, bootstrapNetGetSocketAddr obtains a usable IP address.

static ncclResult_t bootstrapNetGetSocketAddr(int dev, union socketAddress* addr) {
  if (dev >= bootstrapNetIfs) return ncclInternalError;
  memcpy(addr, bootstrapNetIfAddrs+dev, sizeof(*addr));
  return ncclSuccess;
}

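Here dev is 0. bootstrapNetIfs is the number of usable NICs found when the bootstrap network was initialized, so this call returns the IP address of the first usable NIC (index 0).

Then bootstrapNetNewComm creates a bootstrapNetComm. A bootstrapNetComm is really just an fd, and bootstrapNetNewComm simply allocates a new one.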

struct bootstrapNetComm {
  int fd;
};

Finally, createListenSocket starts the socket server.

static ncclResult_t createListenSocket(int *fd, union socketAddress *localAddr) {
  /* IPv4/IPv6 support */
  int family = localAddr->sa.sa_family;
  int salen = (family == AF_INET) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6);

  /* Create socket and bind it to a port */
  int sockfd = socket(family, SOCK_STREAM, 0);
  if (sockfd == -1) {
    WARN("Net : Socket creation failed : %s", strerror(errno));
    return ncclSystemError;
  }

  if (socketToPort(&localAddr->sa)) {
    // Port is forced by env. Make sure we get the port.
    int opt = 1;
#if defined(SO_REUSEPORT)
    SYSCHECK(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)), "setsockopt");
#else
    SYSCHECK(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)), "setsockopt");
#endif
  }

  // localAddr port should be 0 (Any port)
  SYSCHECK(bind(sockfd, &localAddr->sa, salen), "bind");

  /* Get the assigned Port */
  socklen_t size = salen;
  SYSCHECK(getsockname(sockfd, &localAddr->sa, &size), "getsockname");

#ifdef ENABLE_TRACE
  char line[1024];
  TRACE(NCCL_INIT|NCCL_NET,"Listening on socket %s", socketToString(&localAddr->sa, line));
#endif

  /* Put the socket in listen mode
   * NB: The backlog will be silently truncated to the value in /proc/sys/net/core/somaxconn
   */
  SYSCHECK(listen(sockfd, 16384), "listen");
  *fd = sockfd;
  return ncclSuccess;
}

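This creates the listening fd. The IP is given by localAddr and the port is initially 0, so bind picks an arbitrary free port. getsockname(sockfd, &localAddr->sa, &size) then writes the IP and the assigned port back into localAddr. Because localAddr points at the storage of the UniqueId, this address is the UniqueId.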
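The "bind to port 0, then ask the kernel which port it chose" step can be tried in isolation. The fragment below is a minimal sketch using plain POSIX sockets. The function name listen_on_ephemeral_port is hypothetical, and it binds to the IPv4 loopback address with a small backlog purely to keep the example self-contained, whereas NCCL uses the address of the selected NIC.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper: opens a TCP listen socket on 127.0.0.1 with port 0
 * and writes the address the kernel actually assigned back into *addr.
 * Returns the listening fd, or -1 on failure. */
int listen_on_ephemeral_port(struct sockaddr_in *addr) {
    assert(addr != NULL);
    memset(addr, 0, sizeof(*addr));
    addr->sin_family = AF_INET;
    addr->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr->sin_port = htons(0);                 /* 0 = let the kernel choose */

    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) return -1;

    socklen_t len = sizeof(*addr);
    if (bind(fd, (struct sockaddr *)addr, len) != 0 ||
        getsockname(fd, (struct sockaddr *)addr, &len) != 0 ||  /* read back the port */
        listen(fd, 16) != 0) {
        close(fd);
        return -1;
    }
    return fd;
}
```

After a successful call, ntohs(addr.sin_port) holds the port chosen by the kernel, which is the value a peer would need in order to connect.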

At this point the UniqueId has been generated: it is simply the IP and port of the current machine.
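To make the "an opaque id is really an address" idea concrete, the fragment below sketches storing a socket address in a fixed-size byte array and reading it back. The types and function names are hypothetical. The 128-byte size is an assumption chosen to match NCCL_UNIQUE_ID_BYTES in nccl.h, and the sketch handles only IPv4, whereas NCCL uses a union that also covers IPv6.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <string.h>

#define OPAQUE_ID_BYTES 128   /* assumption: same size as NCCL_UNIQUE_ID_BYTES */

/* Hypothetical stand-in for ncclUniqueId: an opaque byte array. */
typedef struct { char internal[OPAQUE_ID_BYTES]; } opaque_id_t;

/* The address must fit inside the opaque id, as NCCL also checks statically. */
_Static_assert(sizeof(struct sockaddr_in) <= OPAQUE_ID_BYTES, "address too large");

/* Store the listen address at the start of the id and zero the rest. */
void id_from_addr(opaque_id_t *id, const struct sockaddr_in *addr) {
    assert(id != NULL && addr != NULL);
    memset(id, 0, sizeof(*id));
    memcpy(id->internal, addr, sizeof(*addr));
}

/* Recover the address on a rank that received the id, e.g. via MPI_Bcast. */
void addr_from_id(const opaque_id_t *id, struct sockaddr_in *addr) {
    assert(id != NULL && addr != NULL);
    memcpy(addr, id->internal, sizeof(*addr));
}
```

A receiving rank only needs the bytes of the id to learn where rank 0 is listening, which is why broadcasting the id as raw bytes in the demo is sufficient.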

(This article is published by OneFlow with authorization. Original: https://blog.csdn.net/KIDGIN7439/article/details/126712106?spm=1001.2014.3001.5502)
