作者|KIDGINBROOK
更新|潘丽晨

上次介绍到rank0的机器生成了ncclUniqueId,并完成了机器的bootstrap网络和通信网络的初始化,这节接着看下所有节点间bootstrap的连接是如何建立的。

rank0节点执行ncclGetUniqueId生成ncclUniqueId,通过mpi将Id广播到所有节点,然后所有节点都会执行ncclCommInitRank,这里其他节点也会进行初始化bootstrap网络和通信网络的操作,然后会执行到ncclCommInitRankSync。

ncclResult_t ncclCommInitRankSync(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank, int cudaDev) {ncclResult_t res;CUDACHECK(cudaSetDevice(cudaDev));NCCLCHECKGOTO(commAlloc(newcomm, nranks, myrank), res, cleanup);NCCLCHECKGOTO(initTransportsRank(*newcomm, &commId), res, cleanup);NCCLCHECKGOTO(devCommSetup(*newcomm), res, cleanup);INFO(NCCL_INIT,"comm %p rank %d nranks %d cudaDev %d busId %x - Init COMPLETE", *newcomm, myrank, nranks, (*newcomm)->cudaDev, (*newcomm)->busId);return ncclSuccess;
cleanup:if ((*newcomm) && (*newcomm)->bootstrap) bootstrapAbort((*newcomm)->bootstrap);*newcomm = NULL;return res;
}

ncclComm_t是指向ncclComm的指针,ncclComm是一个大杂烩,包含了通信用到的所有上下文信息,里面的字段等用到的时候再介绍,然后通过commAlloc分配newcom,并且完成初始化,比如当前是哪个卡,对应的pcie busid是什么,然后执行initTransportsRank。

static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* commId) {// We use 3 AllGathers// 1. { peerInfo, comm }// 2. ConnectTransport[nranks], ConnectValue[nranks]// 3. { nThreads, nrings, compCap, prev[MAXCHANNELS], next[MAXCHANNELS] }int rank = comm->rank;int nranks = comm->nRanks;uint64_t commHash = getHash(commId->internal, NCCL_UNIQUE_ID_BYTES);TRACE(NCCL_INIT, "comm %p, commHash %lx, rank %d nranks %d - BEGIN", comm, commHash, rank, nranks);NCCLCHECK(bootstrapInit(commId, rank, nranks, &comm->bootstrap));// AllGather1 - beginstruct {struct ncclPeerInfo peerInfo;struct ncclComm* comm;} *allGather1Data;NCCLCHECK(ncclCalloc(&allGather1Data, nranks));allGather1Data[rank].comm = comm;struct ncclPeerInfo* myInfo = &allGather1Data[rank].peerInfo;NCCLCHECK(fillInfo(comm, myInfo, commHash));NCCLCHECK(bootstrapAllGather(comm->bootstrap, allGather1Data, sizeof(*allGather1Data)));NCCLCHECK(ncclCalloc(&comm->peerInfo, nranks+1)); // Extra rank to represent CollNet rootfor (int i = 0; i < nranks; i++) {memcpy(comm->peerInfo+i, &allGather1Data[i].peerInfo, sizeof(struct ncclPeerInfo));if ((i != rank) && (comm->peerInfo[i].hostHash == myInfo->hostHash) && (comm->peerInfo[i].busId == myInfo->busId)) {WARN("Duplicate GPU detected : rank %d and rank %d both on CUDA device %x", rank, i, myInfo->busId);return ncclInvalidUsage;}}

看下bootstrapInit:

ncclResult_t bootstrapInit(ncclUniqueId * id, int rank, int nranks, void** commState) {ncclNetHandle_t* netHandle = (ncclNetHandle_t*) id;bool idFromEnv = getenv("NCCL_COMM_ID") != NULL;struct extState* state;NCCLCHECK(ncclCalloc(&state, 1));state->rank = rank;state->nranks = nranks;*commState = state;TRACE(NCCL_INIT, "rank %d nranks %d", rank, nranks);struct extInfo info = { 0 };info.rank = rank;info.nranks = nranks;void *tmpSendComm, *tmpRecvComm;// Pass the remote address to listen via infoif (idFromEnv) {memcpy(&info.extHandleListen, netHandle, sizeof(ncclNetHandle_t));memcpy(&info.extHandleListenRoot, netHandle, sizeof(ncclNetHandle_t));}// listen will return the local address via info (specify interface type 'findSubnetIf')state->dev = idFromEnv ? findSubnetIf : 0;void* extBstrapListenCommRoot;NCCLCHECK(bootstrapNetListen(state->dev, &info.extHandleListen, &state->extBstrapListenComm));NCCLCHECK(bootstrapNetListen(state->dev, &info.extHandleListenRoot, &extBstrapListenCommRoot));// stagger connection times to avoid an overload of the root at very high rank countsif (nranks > 128) {long msec = rank;struct timespec tv;tv.tv_sec = msec / 1000;tv.tv_nsec = 1000000 * (msec % 1000);TRACE(NCCL_INIT, "rank %d delaying connection to root by %ld msec", rank, msec);(void) nanosleep(&tv, NULL);}// send info on my listening socket to rootNCCLCHECK(bootstrapNetConnect(state->dev, netHandle, &tmpSendComm));NCCLCHECK(bootstrapNetSend(tmpSendComm, &info, sizeof(info)));NCCLCHECK(bootstrapNetCloseSend(tmpSendComm));// get info on my "next" rank in the bootstrap ring from root
}

首先看下commState,即ncclComm的bootstrap,类型为extState。

struct extState {void* extBstrapListenComm;void* extBstrapRingRecvComm;void* extBstrapRingSendComm;ncclNetHandle_t* peerBstrapHandles;struct unexConn* unexpectedConnections;int rank;int nranks;int dev;
};

其中extBstrapRingSendComm是当前节点连接next的socket连接,extBstrapRingRecvComm是当前节点和prev节点的socket连接,extBstrapListenComm是当前节点的监听socket,peerBstrapHandles是所有rank的ip port(对应extBstrapListenComm),dev默认为0,表示用第几个ip地址。

然后通过bootstrapNetListen创建extHandleListen和extHandleListenRoot两个bootstrap comm,如前文所述,bootstrap comm其实就是保存了fd,这里创建两个comm的原因是extHandleListen是rank之间实际使用的bootstrap连接,extHandleListenRoot是rank0节点和其他所有rank进行通信使用的连接。

static ncclResult_t bootstrapNetListen(int dev, ncclNetHandle_t* netHandle, void** listenComm)

bootstrapNetListen函数上节有介绍过,会获取到第dev个当前机器的ip,然后listen获取监听fd,将ip port写到nethandle,获取到的bootstrap comm写到listencomm。

然后将rank,nrank,extHandleListen和extHandleListenRoot写到extInfo里。

struct extInfo {int rank;int nranks;ncclNetHandle_t extHandleListenRoot;ncclNetHandle_t extHandleListen;
};

netHandle为ncclUniqueId,即rank0的ip port,然后通过bootstrapNetConnect创建bootstrap send comm,类比bootstrapNetListen,bootstrapNetConnect就是建立到netHandle的socket连接,将socket写到sendComm里,这里dev并没有用到。

static ncclResult_t bootstrapNetConnect(int dev, ncclNetHandle_t* netHandle, void** sendComm)

然后通过bootstrapNetSend将extInfo发送出去,即发给rank0:

static ncclResult_t bootstrapNetSend(void* sendComm, void* data, int size) {struct bootstrapNetComm* comm = (struct bootstrapNetComm*)sendComm;NCCLCHECK(socketSend(comm->fd, &size, sizeof(int)));NCCLCHECK(socketSend(comm->fd, data, size));return ncclSuccess;
}

其中socketSend就是执行send接口发送数据。

然后通过bootstrapNetCloseSend关闭fd。

rank0收到数据后会做什么工作呢,回顾一下,rank0的节执行ncclGetUniqueId生成ncclUniqueId,其中在执行bootstrapCreateRoot的最后会启动一个线程执行bootstrapRoot。

static void *bootstrapRoot(void* listenComm) {struct extInfo info;ncclNetHandle_t *rankHandles = NULL;ncclNetHandle_t *rankHandlesRoot = NULL; // for initial rank <-> root information exchangencclNetHandle_t zero = { 0 }; // for sanity checkingvoid* tmpComm;ncclResult_t res;setFilesLimit();TRACE(NCCL_INIT, "BEGIN");/* Receive addresses from all ranks */int nranks = 0, c = 0;do {NCCLCHECKGOTO(bootstrapNetAccept(listenComm, &tmpComm), res, out);NCCLCHECKGOTO(bootstrapNetRecv(tmpComm, &info, sizeof(info)), res, out);NCCLCHECKGOTO(bootstrapNetCloseRecv(tmpComm), res, out);if (c == 0) {nranks = info.nranks;NCCLCHECKGOTO(ncclCalloc(&rankHandles, nranks), res, out);NCCLCHECKGOTO(ncclCalloc(&rankHandlesRoot, nranks), res, out);}if (nranks != info.nranks) {WARN("Bootstrap Root : mismatch in rank count from procs %d : %d", nranks, info.nranks);goto out;}if (memcmp(&zero, &rankHandlesRoot[info.rank], sizeof(ncclNetHandle_t)) != 0) {WARN("Bootstrap Root : rank %d of %d ranks has already checked in", info.rank, nranks);goto out;}// Save the connection handle for that rankmemcpy(rankHandlesRoot+info.rank, info.extHandleListenRoot, sizeof(ncclNetHandle_t));memcpy(rankHandles+info.rank, info.extHandleListen, sizeof(ncclNetHandle_t));++c;TRACE(NCCL_INIT, "Received connect from rank %d total %d/%d",  info.rank, c, nranks);} while (c < nranks);TRACE(NCCL_INIT, "COLLECTED ALL %d HANDLES", nranks);// Send the connect handle for the next rank in the AllGather ringfor (int r=0; r<nranks; ++r) {int next = (r+1) % nranks;void *tmpSendComm;NCCLCHECKGOTO(bootstrapNetConnect(0, rankHandlesRoot+r, &tmpSendComm), res, out);NCCLCHECKGOTO(bootstrapNetSend(tmpSendComm, rankHandles+next, sizeof(ncclNetHandle_t)), res, out);NCCLCHECKGOTO(bootstrapNetCloseSend(tmpSendComm), res, out);}TRACE(NCCL_INIT, "SENT OUT ALL %d HANDLES", nranks);out:bootstrapNetCloseListen(listenComm);if (rankHandles) free(rankHandles);if (rankHandlesRoot) free(rankHandlesRoot);TRACE(NCCL_INIT, "DONE");return NULL;
}

listenComm是上一个博文中rank0创建的监听fd,bootstrapNetAccept是从listenComm中获取一个新连接,使用新连接的fd创建recvcomm。

static ncclResult_t bootstrapNetAccept(void* listenComm, void** recvComm)

然后通过bootstrapNetRecv读取tmpComm的数据,即其他rank发送来的extInfo,然后保存其他rank的extHandleListen和extHandleListenRoot,这个时候rank0就获取到其他所有rank的ip和port了。

获取完所有rank的info之后开始建环,将节点(r+1) % nranks的extHandleListen发送给节点r,就是说将节点r的next节点的nethandle发送给节点r。这里可以看出,每个节点创建了两个listen comm,其中rank0使用extHandleListenRoot进行通信,其他节点之间通过extHandleListen进行通信。

然后再回去接着看bootstrapInit。

ncclResult_t bootstrapInit(ncclUniqueId * id, int rank, int nranks, void** commState) {// get info on my "next" rank in the bootstrap ring from rootncclNetHandle_t extHandleNext;NCCLCHECK(bootstrapNetAccept(extBstrapListenCommRoot, &tmpRecvComm));NCCLCHECK(bootstrapNetRecv(tmpRecvComm, &extHandleNext, sizeof(extHandleNext)));NCCLCHECK(bootstrapNetCloseRecv(tmpRecvComm));NCCLCHECK(bootstrapNetCloseListen(extBstrapListenCommRoot));NCCLCHECK(bootstrapNetConnect(state->dev, &extHandleNext, &state->extBstrapRingSendComm));// Accept the connect request from the previous rank in the AllGather ringNCCLCHECK(bootstrapNetAccept(state->extBstrapListenComm, &state->extBstrapRingRecvComm));// AllGather all listen handlersNCCLCHECK(ncclCalloc(&state->peerBstrapHandles, nranks));memcpy(state->peerBstrapHandles+rank, info.extHandleListen, sizeof(ncclNetHandle_t));NCCLCHECK(bootstrapAllGather(state, state->peerBstrapHandles, sizeof(ncclNetHandle_t)));TRACE(NCCL_INIT, "rank %d nranks %d - DONE", rank, nranks);return ncclSuccess;
}

接着所有rank都会在extHandleListenRoot上接收新连接创建tmpRecvComm,然后接收到当前rank的next的ip,port;然后连接next创建bscomm到state->extBstrapRingSendComm,接收prev的连接创建bscomm到state->extBstrapRingRecvComm,到现在bootstrap网络连接就完全建立起来了,如下图:

最后gather所有rank的ip port,首先将自己的nethandle放到peerBstrapHandles的对应位置,如下所示。

然后执行bootstrapAllGather:

ncclResult_t bootstrapAllGather(void* commState, void* allData, int size) {struct extState* state = (struct extState*)commState;char* data = (char*)allData;int rank = state->rank;int nranks = state->nranks;TRACE(NCCL_INIT, "rank %d nranks %d size %d", rank, nranks, size);/* Simple ring based AllGather* At each step i receive data from (rank-i-1) from left* and send previous step's data from (rank-i) to right*/for (int i=0; i<nranks-1; i++) {size_t rslice = (rank - i - 1 + nranks) % nranks;size_t sslice = (rank - i + nranks) % nranks;// Send slice to the rightNCCLCHECK(bootstrapNetSend(state->extBstrapRingSendComm, data+sslice*size, size));// Recv slice from the leftNCCLCHECK(bootstrapNetRecv(state->extBstrapRingRecvComm, data+rslice*size, size));}TRACE(NCCL_INIT, "rank %d nranks %d size %d - DONE", rank, nranks, size);return ncclSuccess;
}

每一次将自己的data发送给对应的rank,然后接收其他rank发送过来的data,如下图。

第一步:

第二步:

到这里每个rank就都有了全局所有rank的ip port。

最后总结一下,本节主要创建了bootstrap环形网络连接,并保存到ncclComm里。

其他人都在看

  • “ChatGPT们”的淘金时代

  • 大型语言模型的推理演算

  • GPT-4创造者:第二次改变AI浪潮的方向

  • ChatGPT作者Schulman:我们成功的秘密武器

  • 比快更快,开源Stable Diffusion刷新作图速度

  • OneEmbedding:单卡训练TB级推荐模型不是梦

  • GLM训练加速:性能最高提升3倍,显存节省1/3

欢迎Star、试用OneFlow: github.com/Oneflow-Inc/oneflow/http://github.com/Oneflow-Inc/oneflow/

NCCL源码解析②:Bootstrap网络连接的建立相关推荐

  1. 通过ConnectInterceptor源码掌握OKHttp3网络连接原理 呕心沥血第十弹【十】

    ConnectInterceptor 系列 前言 连接拦截器 Http协议发展 OKHttp创新 源码分析 ConnectionPool StreamAllocation RealConnection ...

  2. Volley 源码解析之网络请求

    Volley源码分析三部曲 Volley 源码解析之网络请求 Volley 源码解析之图片请求 Volley 源码解析之缓存机制 Volley 是 Google 推出的一款网络通信框架,非常适合数据量 ...

  3. NVIDIA NCCL 源码学习(二)- bootstrap网络连接的建立

    上次介绍到rank0的机器生成了ncclUniqueId,并完成了机器的bootstrap网络和通信网络的初始化,这节接着看下所有节点间bootstrap的连接是如何建立的 rank0节点执行nccl ...

  4. NCCL源码解析①:初始化及ncclUniqueId的产生

    作者|KIDGINBROOK 更新|潘丽晨 NCCL是英伟达开源的GPU通信库,支持集合通信和点对点通信. 看下官方给的一个demo: #include <stdio.h> #includ ...

  5. OKHttp源码解析 (复用连接池)

    原文地址:https://www.jianshu.com/p/6166d28983a2 复用连接池 相关的主要三个类 RealConnection ConnectionPool StreamAlloc ...

  6. Dubbo源码解析之Zookeeper连接

    2019独角兽企业重金招聘Python工程师标准>>> 注:Dubbo的版本是2.5.7. 图1 RegistryProtocol的export时序图 注册中心有Zookeeper. ...

  7. okhttp的应用详解与源码解析--android网络请求框架发展史

    乘5G之势,借物联网之风,Android未来亦可期,Android优势在于开放,手机.平板.车载设备.智能家居等都是Android的舞台,Google不倒,Android不灭,本专栏的同步视频教程已经 ...

  8. libjingle源码解析(4)-【PseudoTcp】建立UDP之上的TCP(2):对交互数据流的处理

    对交互数据流的处理 TCP包含两类数据流,交互数据流和成块数据流.交互数据流的特点是每个报文数据字节数比较小,大部分是10字节一下,而成块数据流的特点是大部分报文是满长度的,一般能达到MSS. 本文先 ...

  9. Redis源码解析(15) 哨兵机制[2] 信息同步与TILT模式

    Redis源码解析(1) 动态字符串与链表 Redis源码解析(2) 字典与迭代器 Redis源码解析(3) 跳跃表 Redis源码解析(4) 整数集合 Redis源码解析(5) 压缩列表 Redis ...

最新文章

  1. 在VS2010下使用 UNICODE 和 ANSI 的混合编程
  2. GraphPad Prism 统计教程:简单线性回归原理
  3. ES6(三)——Set、WeakSet、Map、WeakMap
  4. 大力出奇迹,揭秘昇腾CANN的AI超能力
  5. [收藏转载]明星软件工程师的十种特质
  6. 性能测试的基本流程【最新】
  7. BZOJ1934[SHOI2007] Vote 善意的投票
  8. 四、day02切换滑块
  9. 微信文件夹储存在什么位置?如何修改保存路径
  10. html中span标签的详细介绍
  11. 浩辰3D软件入门攻略:什么是有限元分析?
  12. Git No newline at end of file
  13. linux系统上安装python工具的步骤
  14. Layui写后台登录页面 蓝奏云 下载
  15. 根轨迹和系统参数的确定
  16. Flutter实现天气查询App
  17. 爬取腾讯新闻中省份疫情数据到Mysql数据库
  18. oracle date类型字段,Oracle Date类型
  19. 源代码探案系列之 .NET Core 限流中间件 AspNetCoreRateLimit
  20. 月黑风高夜!把室友的STM32换成了GD32

热门文章

  1. 物联网解决方案,一个基于WiFi,一个基于ZigBee,两者的优势和劣势有哪些?
  2. 获取上一个交易日—python
  3. 中国科技大学科学岛计算机系,中国科学技术大学2020年优秀大学生科学岛推免夏令营通知...
  4. 生成带有logo的二维码
  5. 河北大学计算机科学与技术考研,计算机专业考研经验贴(重)
  6. flowable申请页面
  7. AKM项目轶事之Flyback遭遇航班取消
  8. Kubernetes CrashLookBackOff的问题让我生无可恋
  9. android 仿相册,Android 仿新浪相册选择器 PhotoSelector
  10. python爬虫教程书籍-Python网络爬虫实例教程(视频讲解版)