UDT Latest Source Code Analysis -- UDT Socket Functions

  • Creating and using a UDT socket
    • Main flows
      • C/S mode
      • Rendezvous mode
      • UDT epoll
    • UDT socket creation
    • UDT socket setsockopt/getsockopt
    • UDT socket bind
    • UDT::listen
    • UDT connect
    • UDT accept
  • Summary
    • C/S mode -- four-way handshake
    • Rendezvous mode -- three-way handshake

Creating and using a UDT socket

Main flows

C/S mode

  • Client: UDT::socket -> UDT::setsockopt -> UDT::connect -> UDT::send -> UDT::close
  • Server: UDT::socket -> UDT::setsockopt -> UDT::bind -> UDT::listen -> UDT::accept -> UDT::recv -> UDT::close

Rendezvous mode

  • Set the UDT_RENDEZVOUS option: UDT::setsockopt(usock, 0, UDT_RENDEZVOUS, &rendezvous, sizeof(bool))

UDT epoll

The detailed source analysis of epoll is left to the next article.

  • UDT::epoll_create -> UDT::epoll_add_usock/epoll_add_ssock -> UDT::epoll_wait/epoll_wait2 -> UDT::epoll_release

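UDT socket creation

  • UDT::socket -> CUDT::socket -> CUDTUnited::newSocket

Creating a UDT socket takes the following steps:

  1. Start the GC thread if it is not running yet;
  2. Create a new CUDTSocket and initialize its internal CUDT instance m_pUDT, its address information m_pSelfAddr, and its identifier m_SocketID; set the status m_Status to INIT and m_ListenSocket to 0;
  3. Register the socket's information with m_pUDT: m_SocketID, m_iSockType, m_iIPversion, and m_pCache;
  4. Add the UDT socket to the global m_Sockets map;
  5. Return the identifier m_SocketID.

As before, the analysis starts from the public interface.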

UDTSOCKET CUDT::socket(int af, int type, int)
{
   // start the GC thread first if it is not running yet
   if (!s_UDTUnited.m_bGCStatus)
      s_UDTUnited.startup();

   return s_UDTUnited.newSocket(af, type);   // create a UDT socket
}

Next, newSocket creates the UDT socket.

UDTSOCKET CUDTUnited::newSocket(int af, int type)
{
   CUDTSocket* ns = NULL;

   try
   {
      ns = new CUDTSocket;     // create the UDT socket
      ns->m_pUDT = new CUDT;   // the CUDT instance is created together with the socket
      if (AF_INET == af)
      {
         ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in);
         ((sockaddr_in*)(ns->m_pSelfAddr))->sin_port = 0;
      }
      else
      {
         ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in6);   // IPv6 is supported
         ((sockaddr_in6*)(ns->m_pSelfAddr))->sin6_port = 0;
      }
   }
   catch (...) { ... }

   CGuard::enterCS(m_IDLock);
   ns->m_SocketID = -- m_SocketID;   // decrement from the random initial value to get the UDT socket ID
   CGuard::leaveCS(m_IDLock);

   ns->m_Status = INIT;              // set the status to INIT
   ns->m_ListenSocket = 0;
   ns->m_pUDT->m_SocketID = ns->m_SocketID;   // register the new socket ID with the CUDT instance
   ns->m_pUDT->m_iSockType = (SOCK_STREAM == type) ? UDT_STREAM : UDT_DGRAM;
   ns->m_pUDT->m_iIPversion = ns->m_iIPversion = af;
   ns->m_pUDT->m_pCache = m_pCache;

   // protect the m_Sockets structure.
   CGuard::enterCS(m_ControlLock);
   try
   {
      m_Sockets[ns->m_SocketID] = ns;   // add the UDT socket to the global m_Sockets map
   }
   catch (...)
   {
      // failure and rollback
      ...
   }
   CGuard::leaveCS(m_ControlLock);

   return ns->m_SocketID;
}

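m_SocketID is initialized to a random number in the CUDTUnited constructor. The constructor runs when the global s_UDTUnited object is constructed, so the value is already in place by the time startup is called.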

CUDTUnited::CUDTUnited():
m_SocketID(0),
...
{
   // Socket ID MUST start from a random value
   srand((unsigned int)CTimer::getTime());
   m_SocketID = 1 + (int)((1 << 30) * (double(rand()) / RAND_MAX));
}
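
The ID scheme above (random start, then decrement under a lock) can be sketched in isolation. This is a minimal illustration and not UDT code: SocketIdAllocator and its members are hypothetical names, and std::random_device replaces the srand/rand pair used in the constructor.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <random>

// Hypothetical stand-in for the ID bookkeeping in CUDTUnited.
class SocketIdAllocator
{
public:
   // Start from a caller-supplied value so the behaviour is reproducible.
   explicit SocketIdAllocator(int32_t start) : m_next(start) {}

   // Draw a random start in [1, 2^30 + 1], the range the constructor's formula can yield.
   static int32_t randomStart()
   {
      std::random_device rd;
      std::uniform_int_distribution<int32_t> dist(1, (1 << 30) + 1);
      return dist(rd);
   }

   // Each new socket takes the next lower ID; the lock plays the role of m_IDLock.
   int32_t allocate()
   {
      std::lock_guard<std::mutex> lock(m_lock);
      return --m_next;
   }

private:
   std::mutex m_lock;
   int32_t m_next;
};
```

Because IDs only ever decrease from the starting value, two sockets created by the same process never share an ID, and a later socket always has a smaller ID than an earlier one.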

UDT socket setsockopt/getsockopt

These functions set and read UDT socket options.

  • CUDT::setsockopt -> CUDT::setOpt
  • CUDT::getsockopt -> CUDT::getOpt

The configurable options include a number of UDT-specific ones, defined as follows:

enum UDTOpt
{
   UDT_MSS,             // the Maximum Transfer Unit
   UDT_SNDSYN,          // if sending is blocking
   UDT_RCVSYN,          // if receiving is blocking
   UDT_CC,              // custom congestion control algorithm
   UDT_FC,              // Flight flag size (window size)
   UDT_SNDBUF,          // maximum buffer in sending queue
   UDT_RCVBUF,          // UDT receiving buffer size
   UDT_LINGER,          // waiting for unsent data when closing
   UDP_SNDBUF,          // UDP sending buffer size
   UDP_RCVBUF,          // UDP receiving buffer size
   UDT_MAXMSG,          // maximum datagram message size
   UDT_MSGTTL,          // time-to-live of a datagram message
   UDT_RENDEZVOUS,      // rendezvous connection mode
   UDT_SNDTIMEO,        // send() timeout
   UDT_RCVTIMEO,        // recv() timeout
   UDT_REUSEADDR,       // reuse an existing port or create a new one
   UDT_MAXBW,           // maximum bandwidth (bytes per second) that the connection can use
   UDT_STATE,           // current socket state, see UDTSTATUS, read only
   UDT_EVENT,           // current available events associated with the socket
   UDT_SNDDATA,         // size of data in the sending buffer
   UDT_RCVDATA          // size of data available for recv
};

UDT socket bind

  • CUDT::bind -> CUDTUnited::bind
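  • Overloads
    • int CUDTUnited::bind(UDTSOCKET u, UDPSOCKET udpsock)
    • int CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, int namelen)

Binding touches several modules. In short, it registers the newly created UDT socket with a multiplexer, creating the multiplexer if none exists. Each multiplexer serves exactly one port and owns one channel, which creates the UDP socket and binds the port. Finally the UDT socket status moves from INIT to OPENED.

In other words, UDT bind associates a UDT socket with a CMultiplexer. The channel is what actually operates the UDP socket, and two worker threads, one for sending and one for receiving, move the data. The send and receive queues belong to the multiplexer, but because the UDT socket records the multiplexer ID and the queue pointers, it reaches the channel directly when sending and does not have to look the multiplexer up again.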

int CUDTUnited::bind(const UDTSOCKET u, ...)
{
   CUDTSocket* s = locate(u);
   CGuard cg(s->m_ControlLock);

   // cannot bind a socket more than once
   if (INIT != s->m_Status)
      throw CUDTException(5, 0, 0);

   s->m_pUDT->open();       // initialize a batch of parameters in m_pUDT
   updateMux(s, name);      // update the multiplexer
   s->m_Status = OPENED;    // move the UDT socket to OPENED

   // copy address information of local node
   s->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr);

   return 0;
}

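Several UDT sockets may share one port, so binding a port really means registering with that port's unique multiplexer. updateMux comes in two forms: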

  • void CUDTUnited::updateMux(CUDTSocket* s, const CUDTSocket* ls)
  • void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)
void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)
{
   CGuard cg(m_ControlLock);

   if ((s->m_pUDT->m_bReuseAddr) && (NULL != addr))
   {
      int port = (AF_INET == s->m_pUDT->m_iIPversion) ? ntohs(((sockaddr_in*)addr)->sin_port) : ntohs(((sockaddr_in6*)addr)->sin6_port);

      // find a reusable address
      for (map<int, CMultiplexer>::iterator i = m_mMultiplexer.begin(); i != m_mMultiplexer.end(); ++ i)
      {
         if ((i->second.m_iIPversion == s->m_pUDT->m_iIPversion) && (i->second.m_iMSS == s->m_pUDT->m_iMSS) && i->second.m_bReusable)
         {
            if (i->second.m_iPort == port)   // found the multiplexer for this port
            {
               // reuse the existing multiplexer
               ++ i->second.m_iRefCount;                         // bump the reference count
               s->m_pUDT->m_pSndQueue = i->second.m_pSndQueue;   // send queue
               s->m_pUDT->m_pRcvQueue = i->second.m_pRcvQueue;   // receive queue
               s->m_iMuxID = i->second.m_iID;                    // tell the UDT socket the multiplexer ID
               return;
            }
         }
      }
   }

   // a new multiplexer is needed
   CMultiplexer m;
   m.m_iMSS = s->m_pUDT->m_iMSS;
   m.m_iIPversion = s->m_pUDT->m_iIPversion;
   m.m_iRefCount = 1;
   m.m_bReusable = s->m_pUDT->m_bReuseAddr;
   m.m_iID = s->m_SocketID;

   // create the transport channel; set the IP version and the send/receive buffer sizes (default 65536)
   m.m_pChannel = new CChannel(s->m_pUDT->m_iIPversion);
   m.m_pChannel->setSndBufSize(s->m_pUDT->m_iUDPSndBufSize);
   m.m_pChannel->setRcvBufSize(s->m_pUDT->m_iUDPRcvBufSize);

   try
   {
      // create and bind the UDP socket that actually carries the traffic; it is stored in the channel
      if (NULL != udpsock)
         m.m_pChannel->open(*udpsock);
      else
         m.m_pChannel->open(addr);
   }
   catch (CUDTException& e)
   {
      m.m_pChannel->close();
      delete m.m_pChannel;
      throw e;
   }

   // fill in the remaining multiplexer fields
   sockaddr* sa = (AF_INET == s->m_pUDT->m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;
   m.m_pChannel->getSockAddr(sa);
   m.m_iPort = (AF_INET == s->m_pUDT->m_iIPversion) ? ntohs(((sockaddr_in*)sa)->sin_port) : ntohs(((sockaddr_in6*)sa)->sin6_port);
   if (AF_INET == s->m_pUDT->m_iIPversion) delete (sockaddr_in*)sa; else delete (sockaddr_in6*)sa;

   m.m_pTimer = new CTimer;

   m.m_pSndQueue = new CSndQueue;   // create the send queue
   m.m_pSndQueue->init(m.m_pChannel, m.m_pTimer);
   m.m_pRcvQueue = new CRcvQueue;   // create the receive queue
   m.m_pRcvQueue->init(32, s->m_pUDT->m_iPayloadSize, m.m_iIPversion, 1024, m.m_pChannel, m.m_pTimer);

   m_mMultiplexer[m.m_iID] = m;

   s->m_pUDT->m_pSndQueue = m.m_pSndQueue;
   s->m_pUDT->m_pRcvQueue = m.m_pRcvQueue;
   s->m_iMuxID = m.m_iID;
}
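
The reuse-or-create decision in updateMux can be looked at on its own. The sketch below is a simplified model, not UDT code: Mux and MuxTable are hypothetical types, the channel and queues are left out, and the port is passed in directly instead of being read back from the channel.

```cpp
#include <cassert>
#include <map>

// Hypothetical, trimmed-down multiplexer record (queues and channel omitted).
struct Mux
{
   int id;
   int port;
   int ipVersion;
   int mss;
   bool reusable;
   int refCount;
};

class MuxTable
{
public:
   // Returns the ID of the multiplexer the socket ends up attached to.
   int attach(int socketId, int port, int ipVersion, int mss, bool reuseAddr)
   {
      if (reuseAddr)
      {
         for (auto& entry : m_muxes)
         {
            Mux& m = entry.second;
            if (m.reusable && m.ipVersion == ipVersion && m.mss == mss && m.port == port)
            {
               ++m.refCount;   // share the existing multiplexer
               return m.id;
            }
         }
      }

      // No match: create a multiplexer keyed by the creating socket's ID.
      Mux m{socketId, port, ipVersion, mss, reuseAddr, 1};
      m_muxes[m.id] = m;
      return m.id;
   }

   int refCount(int muxId) const { return m_muxes.at(muxId).refCount; }

private:
   std::map<int, Mux> m_muxes;
};
```

Two sockets that attach to the same port with matching IP version and MSS end up with the same multiplexer ID and a reference count of 2; a socket on a different port gets a multiplexer of its own.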

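Once the channel exists, the send and receive queues are created and their init functions are called. Besides initializing a few parameters, init mainly creates the worker thread.

The send queue serves as the example here; the receive queue and the details of how both are used are covered in later articles: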

void CSndQueue::init(CChannel* c, CTimer* t)
{
   m_pChannel = c;
   m_pTimer = t;
   m_pSndUList = new CSndUList;
   m_pSndUList->m_pWindowLock = &m_WindowLock;
   m_pSndUList->m_pWindowCond = &m_WindowCond;
   m_pSndUList->m_pTimer = m_pTimer;

   if (0 != pthread_create(&m_WorkerThread, NULL, CSndQueue::worker, this))
   {
      m_WorkerThread = 0;
      throw CUDTException(3, 1);
   }
}

The send queue's worker thread looks like this:

void* CSndQueue::worker(void* param)
{
   CSndQueue* self = (CSndQueue*)param;

   while (!self->m_bClosing)   // keep looping for as long as the queue is alive
   {
      uint64_t ts = self->m_pSndUList->getNextProcTime();

      if (ts > 0)
      {
         // wait until next processing time of the first socket on the list
         uint64_t currtime;
         CTimer::rdtsc(currtime);
         if (currtime < ts)    // not time yet, keep waiting
            self->m_pTimer->sleepto(ts);

         // it is time to send the next pkt
         sockaddr* addr;
         CPacket pkt;
         if (self->m_pSndUList->pop(addr, pkt) < 0)
            continue;

         self->m_pChannel->sendto(addr, pkt);
      }
      else
      {
         // wait here if there is no sockets with data to be sent
         pthread_mutex_lock(&self->m_WindowLock);
         if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0))
            pthread_cond_wait(&self->m_WindowCond, &self->m_WindowLock);   // condition wait
         pthread_mutex_unlock(&self->m_WindowLock);
      }
   }

   return NULL;
}

UDT::listen

  • CUDT::listen -> CUDTUnited::listen

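UDT listen starts listening on the port once the UDT socket is in the OPENED state; as the UDT bind section shows, the socket has been bound successfully by then. Only one listening socket is allowed per port. listen is for C/S mode and is not supported in rendezvous mode. Calling it on a socket that is already listening does nothing.

After a successful UDT listen, m_bListening becomes true and the socket status m_Status becomes LISTENING. Two sets are created in the UDT socket: m_pQueuedSockets holds connections that have been received but not yet accepted, and m_pAcceptSockets holds connections that have been accepted. Sets are used because their elements are unique.

In effect, UDT listen installs the listener on the multiplexer's receive queue m_pRcvQueue. The queue's worker thread then reacts to each incoming packet according to its type and sends the appropriate response.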

int CUDTUnited::listen(const UDTSOCKET u, int backlog)
{
   CUDTSocket* s = locate(u);
   CGuard cg(s->m_ControlLock);

   // do nothing if the socket is already listening
   if (LISTENING == s->m_Status)
      return 0;

   // a socket can listen only if is in OPENED status
   if (OPENED != s->m_Status)
      throw CUDTException(5, 5, 0);

   // listen is not supported in rendezvous connection setup
   if (s->m_pUDT->m_bRendezvous)
      throw CUDTException(5, 7, 0);

   if (backlog <= 0)
      throw CUDTException(5, 3, 0);

   s->m_uiBackLog = backlog;

   try
   {
      // create the set of received-but-not-yet-accepted sockets and the set of accepted sockets;
      // a set keeps every socket unique
      s->m_pQueuedSockets = new set<UDTSOCKET>;
      s->m_pAcceptSockets = new set<UDTSOCKET>;
   }
   catch (...) { ... }

   s->m_pUDT->listen();
   s->m_Status = LISTENING;

   return 0;
}
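
The order of the precondition checks in CUDTUnited::listen matters: an already listening socket returns early before any error is raised. A minimal sketch of that ordering follows, using hypothetical names (Status, Sock, listenOn) and standard exceptions in place of the CUDTException codes.

```cpp
#include <cassert>
#include <stdexcept>

enum class Status { INIT, OPENED, LISTENING, CONNECTING, CONNECTED };

// Hypothetical socket record holding only the fields the checks need.
struct Sock
{
   Status status = Status::INIT;
   bool rendezvous = false;
   int backlog = 0;
};

// Mirrors the order of checks in CUDTUnited::listen.
void listenOn(Sock& s, int backlog)
{
   if (s.status == Status::LISTENING)
      return;                                   // already listening: no-op
   if (s.status != Status::OPENED)
      throw std::logic_error("socket is not in OPENED status");
   if (s.rendezvous)
      throw std::logic_error("listen is not supported in rendezvous mode");
   if (backlog <= 0)
      throw std::invalid_argument("backlog must be positive");

   s.backlog = backlog;
   s.status = Status::LISTENING;
}
```

A second call on a listening socket leaves the backlog untouched, which matches the early return in the original function.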

The receive queue stores the UDT instance as its listener, which in effect installs the listener on the multiplexer.

void CUDT::listen()
{
   CGuard cg(m_ConnectionLock);

   if (!m_bOpened)
      throw CUDTException(5, 0, 0);

   if (m_bConnecting || m_bConnected)
      throw CUDTException(5, 2, 0);

   // listen can be called more than once
   if (m_bListening)
      return;

   // if there is already another socket listening on the same port
   if (m_pRcvQueue->setListener(this) < 0)   // sets m_pListener = this in CRcvQueue
      throw CUDTException(5, 11, 0);

   m_bListening = true;   // update the listening flag
}

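UDT listen installs a listener on the multiplexer so that incoming data is dispatched to the matching UDT instance, and it updates the state of the UDT instance accordingly.

After setListener, m_pListener is non-null. The worker loop in CRcvQueue calls recvfrom, receives the connection request, and checks m_Packet.m_iID to decide whether to hand the packet to the listener or to call connect.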

void* CRcvQueue::worker(void* param)
{
   CRcvQueue* self = (CRcvQueue*)param;
   sockaddr* addr = ...
   CUDT* u = NULL;
   int32_t id;

   while (!self->m_bClosing)
   {
      unit->m_Packet.setLength(self->m_iPayloadSize);

      // reading next incoming packet, recvfrom returns -1 if nothing has been received
      if (self->m_pChannel->recvfrom(addr, unit->m_Packet) < 0)
         goto TIMER_CHECK;

      id = unit->m_Packet.m_iID;

      // ID 0 is for connection request, which should be passed to the listening socket or rendezvous sockets
      if (0 == id)
      {
         if (NULL != self->m_pListener)
            self->m_pListener->listen(addr, unit->m_Packet);
         else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id)))
         {
            // asynchronous connect: call connect here
            // otherwise wait for the UDT socket to retrieve this packet
            if (!u->m_bSynRecving)
               u->connect(unit->m_Packet);
            else
               self->storePkt(id, unit->m_Packet.clone());
         }
      }

      // (excerpt: the rest of the loop, including the TIMER_CHECK label, is not shown)
   }
}

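For an incoming connection request, if m_pListener is non-null, the private listen method of CUDT is called. It parses the handshake packet and generates a cookie string. If the packet is a regular connection request, the reply is sent through the send queue's sendto. If it is a response message and the cookie verifies, a new connection is established.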

int CUDT::listen(sockaddr* addr, CPacket& packet)
{
   CHandShake hs;
   hs.deserialize(packet.m_pcData, packet.getLength());

   // SYN cookie
   char clienthost[NI_MAXHOST];
   char clientport[NI_MAXSERV];
   getnameinfo(addr, (AF_INET == m_iVersion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6), clienthost, sizeof(clienthost), clientport, sizeof(clientport), NI_NUMERICHOST|NI_NUMERICSERV);
   int64_t timestamp = (CTimer::getTime() - m_StartTime) / 60000000;  // secret changes every one minute
   stringstream cookiestr;
   cookiestr << clienthost << ":" << clientport << ":" << timestamp;
   unsigned char cookie[16];
   CMD5::compute(cookiestr.str().c_str(), cookie);

   // connection request type: 1: regular connection request, 0: rendezvous connection request, -1/-2: response
   if (1 == hs.m_iReqType)
   {
      hs.m_iCookie = *(int*)cookie;
      packet.m_iID = hs.m_iID;
      int size = packet.getLength();
      hs.serialize(packet.m_pcData, size);
      m_pSndQueue->sendto(addr, packet);
      return 0;
   }
   else
   {
      if (hs.m_iCookie != *(int*)cookie)
      {
         timestamp --;
         cookiestr << clienthost << ":" << clientport << ":" << timestamp;
         CMD5::compute(cookiestr.str().c_str(), cookie);

         if (hs.m_iCookie != *(int*)cookie)
            return -1;
      }
   }

   int32_t id = hs.m_iID;

   // When a peer side connects in...
   if ((1 == packet.getFlag()) && (0 == packet.getType()))
   {
      // a control packet of type Connection Handshake
      if ((hs.m_iVersion != m_iVersion) || (hs.m_iType != m_iSockType))
      {
         // mismatch, reject the request
         hs.m_iReqType = 1002;
         int size = CHandShake::m_iContentSize;
         hs.serialize(packet.m_pcData, size);
         packet.m_iID = id;
         m_pSndQueue->sendto(addr, packet);
      }
      else
      {
         int result = s_UDTUnited.newConnection(m_SocketID, addr, &hs);
         if (result == -1)
            hs.m_iReqType = 1002;

         // send back a response if connection failed or connection already existed
         // new connection response should be sent in connect()
         if (result != 1)
         {
            int size = CHandShake::m_iContentSize;
            hs.serialize(packet.m_pcData, size);
            packet.m_iID = id;
            m_pSndQueue->sendto(addr, packet);
         }
         else
         {
            // a new connection has been created, enable epoll for write
            s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);
         }
      }
   }

   return hs.m_iReqType;
}
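
The cookie check in CUDT::listen accepts a cookie derived from the current minute or from the previous one, so a client that answers just after the minute rolls over is still accepted. The sketch below illustrates that idea only. It is an assumption-laden simplification: makeCookie and verifyCookie are hypothetical helpers, and std::hash stands in for the MD5 digest used by UDT (std::hash is not a cryptographic hash).

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>

// Builds a cookie from the peer address and a one-minute time bucket.
// std::hash replaces MD5 here purely for illustration; it is NOT secure.
uint32_t makeCookie(const std::string& host, const std::string& port, int64_t minuteBucket)
{
   const std::string text = host + ":" + port + ":" + std::to_string(minuteBucket);
   return static_cast<uint32_t>(std::hash<std::string>{}(text));
}

// Accepts a cookie issued in the current or the previous minute.
// Times are in microseconds, as with CTimer::getTime().
bool verifyCookie(uint32_t cookie, const std::string& host, const std::string& port,
                  int64_t nowMicros, int64_t startMicros)
{
   const int64_t bucket = (nowMicros - startMicros) / 60000000;
   return cookie == makeCookie(host, port, bucket)
       || cookie == makeCookie(host, port, bucket - 1);
}
```

Since the cookie depends only on the peer address and the time bucket, the server needs no per-client state until the second handshake packet arrives with a valid cookie.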

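In the code above, newConnection sets up a new connection. In essence it records the peer in m_PeerRec, the map in which UDT keeps its peer connection records, and adds the newly created UDT socket to m_Sockets.

It first checks whether the connection already exists. If it does, the information of the existing UDT socket is returned. If the existing socket is in the BROKEN state, its status is changed to CLOSED and some cleanup is done; the rest of the cleanup happens in the UDT close path.

For a genuinely new connection, a new UDT socket is created and some of its fields are initialized, including m_pSelfAddr, m_SocketID, m_ListenSocket, m_PeerID, and m_iISN. The new socket is then bound to the same address as the listening socket and its status is set to CONNECTED. Finally m_PeerRec and m_Sockets are updated, the socket is inserted into m_pQueuedSockets, the local node information is refreshed, and the epoll event and the timer are triggered. The connection then waits to be picked up by accept.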

int CUDTUnited::newConnection(const UDTSOCKET listen, const sockaddr* peer, CHandShake* hs)
{
   CUDTSocket* ns = NULL;
   CUDTSocket* ls = locate(listen);   // look up the local UDT socket in m_Sockets

   if (NULL == ls)
      return -1;

   // if this connection has already been processed
   if (NULL != (ns = locate(peer, hs->m_iID, hs->m_iISN)))   // look up in m_PeerRec and m_Sockets
   {
      if (ns->m_pUDT->m_bBroken)
      {
         // last connection from the "peer" address has been broken
         ...
      }
      else
      {
         // connection already exist, this is a repeated connection request
         // respond with existing HS information
         ...
         return 0;

         //except for this situation a new connection should be started
      }
   }

   // exceeding backlog, refuse the connection request
   if (ls->m_pQueuedSockets->size() >= ls->m_uiBackLog)
      return -1;

   try
   {
      ns = new CUDTSocket;
      ns->m_pUDT = new CUDT(*(ls->m_pUDT));
      ...
      ns->m_pSelfAddr = ...
      ...
   }
   catch (...) { ... }

   CGuard::enterCS(m_IDLock);
   ns->m_SocketID = -- m_SocketID;
   CGuard::leaveCS(m_IDLock);

   ns->m_ListenSocket = listen;
   ns->m_iIPversion = ls->m_iIPversion;
   ns->m_pUDT->m_SocketID = ns->m_SocketID;
   ns->m_PeerID = hs->m_iID;
   ns->m_iISN = hs->m_iISN;

   int error = 0;

   try
   {
      // bind to the same addr of listening socket
      ns->m_pUDT->open();
      updateMux(ns, ls);
      ns->m_pUDT->connect(peer, hs);
   }
   catch (...)
   {
      error = 1;
      goto ERR_ROLLBACK;
   }

   ns->m_Status = CONNECTED;

   // copy address information of local node
   ns->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(ns->m_pSelfAddr);
   CIPAddress::pton(ns->m_pSelfAddr, ns->m_pUDT->m_piSelfIP, ns->m_iIPversion);

   // protect the m_Sockets structure.
   CGuard::enterCS(m_ControlLock);
   try
   {
      m_Sockets[ns->m_SocketID] = ns;
      m_PeerRec[(ns->m_PeerID << 30) + ns->m_iISN].insert(ns->m_SocketID);
   }
   catch (...)
   {
      error = 2;
   }
   CGuard::leaveCS(m_ControlLock);

   CGuard::enterCS(ls->m_AcceptLock);
   try
   {
      ls->m_pQueuedSockets->insert(ns->m_SocketID);
   }
   catch (...)
   {
      error = 3;
   }
   CGuard::leaveCS(ls->m_AcceptLock);

   // acknowledge users waiting for new connections on the listening socket
   m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, true);

   CTimer::triggerEvent();

ERR_ROLLBACK:
   if (error > 0)
   {
      ns->m_pUDT->close();
      ns->m_Status = CLOSED;
      ns->m_TimeStamp = CTimer::getTime();

      return -1;
   }

   // wake up a waiting accept() call
   #ifndef WIN32
      pthread_mutex_lock(&(ls->m_AcceptLock));
      pthread_cond_signal(&(ls->m_AcceptCond));
      pthread_mutex_unlock(&(ls->m_AcceptLock));
   #else
      SetEvent(ls->m_AcceptCond);
   #endif

   return 1;
}

UDT connect

  • CUDT::connect( api.cpp, CUDT::public method) -> CUDTUnited::connect -> CUDT::connect( core.cpp, CUDT::private method)

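To connect, a UDT socket must be in the INIT or OPENED state. INIT means the socket was just created: the parameters in m_pUDT are initialized, the socket is registered with a multiplexer, and the status becomes OPENED. (In rendezvous mode a socket still in INIT is rejected, so it has to be bound first.) A socket already in OPENED has presumably been bound. In both cases the status then moves to CONNECTING and m_pUDT->connect is called. Finally the peer address is recorded in the socket's m_pPeerAddr.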

int CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, int namelen)
{
   CUDTSocket* s = locate(u);
   CGuard cg(s->m_ControlLock);

   // a socket can "connect" only if it is in INIT or OPENED status
   if (INIT == s->m_Status)
   {
      if (!s->m_pUDT->m_bRendezvous)
      {
         s->m_pUDT->open();
         updateMux(s);
         s->m_Status = OPENED;
      }
      else
         throw CUDTException(5, 8, 0);
   }
   else if (OPENED != s->m_Status)
      throw CUDTException(5, 2, 0);

   // connect_complete() may be called before connect() returns.
   // So we need to update the status before connect() is called,
   // otherwise the status may be overwritten with wrong value (CONNECTED vs. CONNECTING).
   s->m_Status = CONNECTING;
   try
   {
      s->m_pUDT->connect(name);
   }
   catch (CUDTException e)
   {
      s->m_Status = OPENED;
      throw e;
   }

   // record peer address
   delete s->m_pPeerAddr;
   if (AF_INET == s->m_iIPversion)
   {
      s->m_pPeerAddr = (sockaddr*)(new sockaddr_in);
      memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in));
   }
   else
   {
      s->m_pPeerAddr = (sockaddr*)(new sockaddr_in6);
      memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in6));
   }

   return 0;
}

CUDT has three private connect overloads:

// Functionality:
//    Connect to a UDT entity listening at address "peer".
// Parameters:
//    0) [in] peer: The address of the listening UDT entity.
// Returned value:
//    None.
void CUDT::connect(const sockaddr* serv_addr)

// Functionality:
//    Process the response handshake packet.
// Parameters:
//    0) [in] pkt: handshake packet.
// Returned value:
//    Return 0 if connected, positive value if connection is in progress, otherwise error code.
int CUDT::connect(const CPacket& response) throw ()

// Functionality:
//    Connect to a UDT entity listening at address "peer", which has sent "hs" request.
// Parameters:
//    0) [in] peer: The address of the listening UDT entity.
//    1) [in/out] hs: The handshake information sent by the peer side (in), negotiated value (out).
// Returned value:
//    None.
void CUDT::connect(const sockaddr* peer, CHandShake* hs)

The connect called from CUDTUnited::connect takes a sockaddr, i.e. the first form. This is also the function reached from the public UDT::connect interface:

void CUDT::connect(const sockaddr* serv_addr)
{
   CGuard cg(m_ConnectionLock);

   if (!m_bOpened)                      // the UDT socket must be open
      throw CUDTException(5, 0, 0);

   if (m_bListening)                    // cannot listen and connect at the same time
      throw CUDTException(5, 2, 0);

   if (m_bConnecting || m_bConnected)   // connect must not have been called before
      throw CUDTException(5, 2, 0);

   m_bConnecting = true;                // mark as connecting to block repeated connect calls

   // record peer/server address
   delete m_pPeerAddr;
   m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*)new sockaddr_in : (sockaddr*)new sockaddr_in6;
   memcpy(m_pPeerAddr, serv_addr, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6));

   // register this socket in the rendezvous queue
   // RendezevousQueue is used to temporarily store incoming handshake, non-rendezvous connections also require this function
   uint64_t ttl = 3000000;              // this is also why closing has to wait an extra 3 s
   if (m_bRendezvous)
      ttl *= 10;
   ttl += CTimer::getTime();
   // The UDT socket is put on the m_lRendezvousID list for temporary storage. This happens
   // whether or not rendezvous mode is on, so the name can be misleading.
   m_pRcvQueue->registerConnector(m_SocketID, this, m_iIPversion, serv_addr, ttl);

   // This is my current configurations
   m_ConnReq.m_iVersion = m_iVersion;
   m_ConnReq.m_iType = m_iSockType;
   m_ConnReq.m_iMSS = m_iMSS;
   m_ConnReq.m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize)? m_iRcvBufSize : m_iFlightFlagSize;
   m_ConnReq.m_iReqType = (!m_bRendezvous) ? 1 : 0;
   m_ConnReq.m_iID = m_SocketID;
   CIPAddress::ntop(serv_addr, m_ConnReq.m_piPeerIP, m_iIPversion);

   // Random Initial Sequence Number
   srand((unsigned int)CTimer::getTime());
   m_iISN = m_ConnReq.m_iISN = (int32_t)(CSeqNo::m_iMaxSeqNo * (double(rand()) / RAND_MAX));

   m_iLastDecSeq = m_iISN - 1;
   m_iSndLastAck = m_iISN;
   m_iSndLastDataAck = m_iISN;
   m_iSndCurrSeqNo = m_iISN - 1;
   m_iSndLastAck2 = m_iISN;
   m_ullSndLastAck2Time = CTimer::getTime();

   // Inform the server my configurations.
   CPacket request;
   char* reqdata = new char [m_iPayloadSize];
   request.pack(0, NULL, reqdata, m_iPayloadSize);   // build the packet
   // ID = 0, connection request
   request.m_iID = 0;

   int hs_size = m_iPayloadSize;
   m_ConnReq.serialize(reqdata, hs_size);            // write the request
   request.setLength(hs_size);
   m_pSndQueue->sendto(serv_addr, request);          // send the request
   m_llLastReqTime = CTimer::getTime();              // record the send time

   // asynchronous connect, return immediately
   if (!m_bSynRecving)
   {
      delete [] reqdata;
      return;
   }

   // Wait for the negotiated configurations from the peer side.
   CPacket response;
   char* resdata = new char [m_iPayloadSize];
   response.pack(0, NULL, resdata, m_iPayloadSize);

   CUDTException e(0, 0);

   // wait for the reply until ttl expires (3 s, or 30 s in rendezvous mode);
   // the request is resent while no response has arrived
   while (!m_bClosing)
   {
      // avoid sending too many requests, at most 1 request per 250ms
      if (CTimer::getTime() - m_llLastReqTime > 250000)
      {
         m_ConnReq.serialize(reqdata, hs_size);
         request.setLength(hs_size);
         if (m_bRendezvous)
            request.m_iID = m_ConnRes.m_iID;
         m_pSndQueue->sendto(serv_addr, request);
         m_llLastReqTime = CTimer::getTime();
      }

      response.setLength(m_iPayloadSize);
      if (m_pRcvQueue->recvfrom(m_SocketID, response) > 0)
      {
         if (connect(response) <= 0)
            break;

         // new request/response should be sent out immediately on receving a response
         m_llLastReqTime = 0;
      }

      if (CTimer::getTime() > ttl)
      {
         // timeout
         e = CUDTException(1, 1, 0);
         break;
      }
   }

   delete [] reqdata;
   delete [] resdata;

   if (e.getErrorCode() == 0)
   {
      if (m_bClosing)                                                 // if the socket is closed before connection...
         e = CUDTException(1);
      else if (1002 == m_ConnRes.m_iReqType)                          // connection request rejected
         e = CUDTException(1, 2, 0);
      else if ((!m_bRendezvous) && (m_iISN != m_ConnRes.m_iISN))      // security check
         e = CUDTException(1, 4, 0);
   }

   if (e.getErrorCode() != 0)
      throw e;
}

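After the connection request is sent, a synchronous (blocking) connect waits for the reply. The timeout is 3 s (30 s in rendezvous mode), and the request is resent at intervals of at least 250 ms until a response arrives. Each response is passed to connect(response), the second connect form.

Connection setup can be understood through TCP's notion of half-connections. This second connect is the second half of the connection and the final negotiation step. The socket is removed from m_lRendezvousID (non-rendezvous connections are registered there too), all connection parameters are reconfigured from the negotiated values, and the data structures the UDT socket needs are created: send and receive buffers, loss lists, windows, and so on. These internal structures serve UDT's core transport algorithms, such as congestion avoidance and retransmission, so the congestion control parameters are initialized here as well. Finally the socket is marked as connected, and connect_complete and update_events notify the management module and epoll of the new state.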
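
The interplay of the 250 ms resend gap and the overall timeout can be checked with a small simulation. This is only a sketch with a fake clock: countRequests is a hypothetical helper, and pollStep is an assumed value for how long each receive attempt blocks before the loop comes around again (the real receive call may block for a different amount of time).

```cpp
#include <cassert>
#include <cstdint>

// Simulates the blocking connect loop with a fake clock (microseconds).
// Returns how many handshake requests are sent if no response ever arrives.
int countRequests(bool rendezvous, int64_t pollStep)
{
   const int64_t resendGap = 250000;   // at most one request per 250 ms
   int64_t ttl = 3000000;              // 3 s
   if (rendezvous)
      ttl *= 10;                       // 30 s in rendezvous mode

   int64_t now = 0;
   int64_t lastReq = now;
   int sent = 1;                       // the initial request

   while (true)
   {
      if (now - lastReq > resendGap)
      {
         ++sent;
         lastReq = now;
      }

      now += pollStep;                 // the receive attempt returns without data

      if (now > ttl)
         break;                        // timeout
   }

   return sent;
}
```

With an assumed poll step of 10 ms, the sketch sends 12 requests inside the 3 s window. The count is driven by the strict "greater than 250 ms" comparison, which makes the effective resend period one poll step longer than 250 ms.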

int CUDT::connect(const CPacket& response) throw ()
{
   // this is the 2nd half of a connection request. If the connection is setup successfully this returns 0.
   // returning -1 means there is an error.
   // returning 1 or 2 means the connection is in process and needs more handshake

   if (!m_bConnecting)
      return -1;

   if (m_bRendezvous && ((0 == response.getFlag()) || (1 == response.getType())) && (0 != m_ConnRes.m_iType))
   {
      //a data packet or a keep-alive packet comes, which means the peer side is already connected
      // in this situation, the previously recorded response will be used
      goto POST_CONNECT;
   }

   if ((1 != response.getFlag()) || (0 != response.getType()))
      return -1;

   m_ConnRes.deserialize(response.m_pcData, response.getLength());

   if (m_bRendezvous)
   {
      // regular connect should NOT communicate with rendezvous connect
      // rendezvous connect require 3-way handshake
      if (1 == m_ConnRes.m_iReqType)
         return -1;

      if ((0 == m_ConnReq.m_iReqType) || (0 == m_ConnRes.m_iReqType))
      {
         m_ConnReq.m_iReqType = -1;
         // the request time must be updated so that the next handshake can be sent out immediately.
         m_llLastReqTime = 0;
         return 1;
      }
   }
   else
   {
      // set cookie
      if (1 == m_ConnRes.m_iReqType)
      {
         m_ConnReq.m_iReqType = -1;
         m_ConnReq.m_iCookie = m_ConnRes.m_iCookie;
         m_llLastReqTime = 0;
         return 1;
      }
   }

POST_CONNECT:
   // Remove from rendezvous queue
   m_pRcvQueue->removeConnector(m_SocketID);

   // Re-configure according to the negotiated values.
   m_iMSS = m_ConnRes.m_iMSS;
   m_iFlowWindowSize = m_ConnRes.m_iFlightFlagSize;
   m_iPktSize = m_iMSS - 28;
   m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;
   m_iPeerISN = m_ConnRes.m_iISN;
   m_iRcvLastAck = m_ConnRes.m_iISN;
   m_iRcvLastAckAck = m_ConnRes.m_iISN;
   m_iRcvCurrSeqNo = m_ConnRes.m_iISN - 1;
   m_PeerID = m_ConnRes.m_iID;
   memcpy(m_piSelfIP, m_ConnRes.m_piPeerIP, 16);

   // Prepare all data structures
   try
   {
      m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize);
      m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize);
      // after introducing lite ACK, the sndlosslist may not be cleared in time, so it requires twice space.
      m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2);
      m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize);
      m_pACKWindow = new CACKWindow(1024);
      m_pRcvTimeWindow = new CPktTimeWindow(16, 64);
      m_pSndTimeWindow = new CPktTimeWindow();
   }
   catch (...)
   {
      throw CUDTException(3, 2, 0);
   }

   CInfoBlock ib;
   ib.m_iIPversion = m_iIPversion;
   CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);
   if (m_pCache->lookup(&ib) >= 0)
   {
      m_iRTT = ib.m_iRTT;
      m_iBandwidth = ib.m_iBandwidth;
   }

   m_pCC = m_pCCFactory->create();
   m_pCC->m_UDT = m_SocketID;
   m_pCC->setMSS(m_iMSS);
   m_pCC->setMaxCWndSize(m_iFlowWindowSize);
   m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo);
   m_pCC->setRcvRate(m_iDeliveryRate);
   m_pCC->setRTT(m_iRTT);
   m_pCC->setBandwidth(m_iBandwidth);
   m_pCC->init();

   m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
   m_dCongestionWindow = m_pCC->m_dCWndSize;

   // And, I am connected too.
   m_bConnecting = false;
   m_bConnected = true;

   // register this socket for receiving data packets
   m_pRNode->m_bOnList = true;
   m_pRcvQueue->setNewEntry(this);

   // acknowledge the management module.
   s_UDTUnited.connect_complete(m_SocketID);   // copies the local address from m_pSndQueue->m_pChannel and sets the status to CONNECTED

   // acknowledge any waiting epolls to write
   s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);

   return 0;
}

UDT accept

  • UDT::accept -> CUDT::accept -> CUDTUnited::accept

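Like accept on a TCP socket, UDT accept is called on a bound, listening socket and waits for incoming connections, so the UDT listener must be in the LISTENING state. It is available only in non-rendezvous mode.

The core is a while loop that waits for a connection to accept. When one arrives, its entry is removed from m_pQueuedSockets and inserted into m_pAcceptSockets, update_events refreshes the epoll state, and the peer address is stored for the caller.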

UDTSOCKET CUDTUnited::accept(const UDTSOCKET listen, sockaddr* addr, int* addrlen)
{
   CUDTSocket* ls = locate(listen);

   // the "listen" socket must be in LISTENING status
   if (LISTENING != ls->m_Status)
      throw CUDTException(5, 6, 0);

   // no "accept" in rendezvous connection setup
   if (ls->m_pUDT->m_bRendezvous)
      throw CUDTException(5, 7, 0);

   UDTSOCKET u = CUDT::INVALID_SOCK;
   bool accepted = false;

   // !!only one connection can be set up each time!!
   #ifndef WIN32
      while (!accepted)   // loop until a connection arrives; exit once accepted is true
      {
         pthread_mutex_lock(&(ls->m_AcceptLock));

         if ((LISTENING != ls->m_Status) || ls->m_pUDT->m_bBroken)
         {
            // This socket has been closed.
            accepted = true;
         }
         else if (ls->m_pQueuedSockets->size() > 0)
         {
            // move the socket from m_pQueuedSockets to m_pAcceptSockets
            u = *(ls->m_pQueuedSockets->begin());
            ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);
            ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());
            accepted = true;
         }
         else if (!ls->m_pUDT->m_bSynRecving)
         {
            accepted = true;
         }

         if (!accepted && (LISTENING == ls->m_Status))
            pthread_cond_wait(&(ls->m_AcceptCond), &(ls->m_AcceptLock));

         if (ls->m_pQueuedSockets->empty())
            m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false);

         pthread_mutex_unlock(&(ls->m_AcceptLock));
      }
   #else
      while (!accepted)
      {
         WaitForSingleObject(ls->m_AcceptLock, INFINITE);

         if (ls->m_pQueuedSockets->size() > 0)
         {
            u = *(ls->m_pQueuedSockets->begin());
            ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);
            ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());
            accepted = true;
         }
         else if (!ls->m_pUDT->m_bSynRecving)
            accepted = true;

         ReleaseMutex(ls->m_AcceptLock);

         if  (!accepted & (LISTENING == ls->m_Status))
            WaitForSingleObject(ls->m_AcceptCond, INFINITE);

         if ((LISTENING != ls->m_Status) || ls->m_pUDT->m_bBroken)
         {
            // Send signal to other threads that are waiting to accept.
            SetEvent(ls->m_AcceptCond);
            accepted = true;
         }

         if (ls->m_pQueuedSockets->empty())
            m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false);
      }
   #endif

   if (u == CUDT::INVALID_SOCK)
   {
      // non-blocking receiving, no connection available
      if (!ls->m_pUDT->m_bSynRecving)
         throw CUDTException(6, 2, 0);

      // listening socket is closed
      throw CUDTException(5, 6, 0);
   }

   // store the peer address
   if ((addr != NULL) && (addrlen != NULL))
   {
      if (AF_INET == locate(u)->m_iIPversion)
         *addrlen = sizeof(sockaddr_in);
      else
         *addrlen = sizeof(sockaddr_in6);

      // copy address information of peer node
      memcpy(addr, locate(u)->m_pPeerAddr, *addrlen);
   }

   return u;
}
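
Stripped of locking and waiting, the heart of accept is moving one ID from the queued set to the accepted set. The following sketch shows just that step under stated assumptions: Listener, acceptOne, and INVALID_SOCK are hypothetical names, and only the non-blocking path is modelled.

```cpp
#include <cassert>
#include <set>

// Hypothetical listener holding only the two ID sets described above.
struct Listener
{
   std::set<int> queued;     // received but not yet accepted
   std::set<int> accepted;   // already handed to the application
};

const int INVALID_SOCK = -1;

// Non-blocking variant: returns INVALID_SOCK when nothing is queued.
int acceptOne(Listener& ls)
{
   if (ls.queued.empty())
      return INVALID_SOCK;

   const int u = *ls.queued.begin();
   ls.accepted.insert(u);
   ls.queued.erase(ls.queued.begin());
   return u;
}
```

Because std::set iterates in ascending order, begin() yields the smallest queued ID and not necessarily the connection that has waited longest.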

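Summary

UDT supports two connection modes: C/S mode and rendezvous mode. A UDT client sends a handshake message (a control packet of type 0) to the server or peer. For the fields the message carries, see the article UDT 最新协议分析 (UDT latest protocol analysis).

C/S mode -- four-way handshake

A UDT socket acting as a server creates a UDT entity that listens on the bound port as the listener. When a new connection request arrives, a new UDT socket is created and initialized, and its information is recorded in the listener. This is how a connection is established, and it closely resembles TCP connection setup.

  1. To connect to a UDT server, the UDT client sends a handshake packet every 250 ms for up to 3 s, until it receives a handshake packet back from the server or the connection times out.
  2. When the UDT server first receives a handshake request from the client, it derives a cookie from the client's address and a secret key (in the code above, a timestamp that changes every minute) and sends the cookie to the client.
  3. On receiving that reply, the UDT client must send the cookie back to the server.
  4. When the UDT server receives a handshake packet carrying the correct cookie, it negotiates the packet size and maximum window size and sends the result to the client.
  • The UDT server extracts the packet size and maximum window size from the handshake packet, compares them with its own values, and keeps the smaller value of each.
  • The UDT server sends the negotiated packet size and maximum window size to the client, together with its version and initial sequence number. To guard against packet loss, it keeps responding if it later receives further handshake messages from the same peer.
  • The UDT server is then ready to send and receive data.
  • Once the UDT client receives the server's handshake packet, it starts sending and receiving data and no longer responds to further handshake messages.

Rendezvous mode -- three-way handshake

In rendezvous mode both ends are clients and both must call UDT::connect at the same time. This mode is mainly used for NAT traversal.

  1. Both UDT clients send a connection request (a handshake packet) to each other simultaneously. The initial request type is 0.
  2. On receiving the peer's connection request, a UDT client checks its request type:
  • If the request type is 0, the response is sent with type -1.
  • If the request type is -1, the response is sent with type -2.
  • If the request type is -2, no response is sent.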
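
The four steps above can be walked through with a toy model. This is a sketch under stated assumptions and not the UDT implementation: Handshake, Server, and echoCookie are hypothetical, the cookie is a fixed number instead of an address-derived digest, and packet transport is replaced by direct function calls.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

struct Handshake
{
   int reqType;      // 1: regular request, -1: response
   int32_t cookie;
   int mss;
   int window;
};

struct Server
{
   int32_t secretCookie;   // stands in for the address-derived cookie
   int mss;
   int window;
   bool connected;

   // Returns true and fills `out` when a reply should be sent.
   bool onHandshake(const Handshake& in, Handshake& out)
   {
      out = in;
      if (in.reqType == 1)
      {
         out.cookie = secretCookie;                        // step 2: hand out the cookie
         return true;
      }
      if (in.cookie != secretCookie)
         return false;                                     // wrong cookie: no connection
      mss = out.mss = std::min(mss, in.mss);               // step 4: keep the smaller values
      window = out.window = std::min(window, in.window);
      connected = true;
      return true;
   }
};

// Client side of step 3: echo the cookie in a response-type handshake.
Handshake echoCookie(const Handshake& fromServer)
{
   Handshake next = fromServer;
   next.reqType = -1;
   return next;
}
```

A run of the model goes: the client sends reqType 1, the server answers with the cookie, the client echoes it with reqType -1, and the server replies with the negotiated values and marks the connection as established. The server keeps no per-client state before the cookie comes back, which is the point of the extra round trip.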
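
The request-type rule for rendezvous mode is small enough to write as a single function. rendezvousReply is a hypothetical helper that encodes only the three cases listed above.

```cpp
#include <cassert>

// Maps the request type seen in the peer's handshake to the type used in the reply.
// Returns false when no reply is sent.
bool rendezvousReply(int peerReqType, int& replyType)
{
   switch (peerReqType)
   {
   case 0:
      replyType = -1;
      return true;
   case -1:
      replyType = -2;
      return true;
   default:
      return false;   // -2: nothing is sent back
   }
}
```

Following one side of the exchange, the types seen on the wire go 0, -1, -2, which accounts for the three handshake steps in the heading.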
