UDT 最新源码分析 -- 开始与终止

  • UDT 开始与终止
    • 开始流程
    • 终止流程

UDT 开始与终止

开始流程

UDT:: startup -> CUDT::startup -> CUDTUnited::startup

初始化UDT库,多次调用时,调用计数增加,但实际上仅仅初始化一次。对于windows下,还需要初始化网络库WSAStartup。建立 garbageCollect 线程,用于清理失效socket,并删除分发器。

int CUDTUnited::startup()
{CGuard gcinit(m_InitLock);   //获取初始锁if (m_iInstanceCount++ > 0)  //实例被初始化的计数器,仅初始化一次return 0;if (m_bGCStatus) //garbageCollect状态,保证线程不会被建立多个return true;m_bClosing = false; //CUDTUnited状态, 此状态下 garbageCollect内 while循环不退出pthread_mutex_init(&m_GCStopLock, NULL);pthread_cond_init(&m_GCStopCond, NULL);pthread_create(&m_GCThread, NULL, garbageCollect, this); //garbageCollect 线程创建m_bGCStatus = true;return 0;
}

再来看 garbageCollect:

void* CUDTUnited::garbageCollect(void* p)
{CUDTUnited* self = (CUDTUnited*)p; //获取CUDTUnited实例,类型转化CGuard gcguard(self->m_GCStopLock);//资源清理的锁while (!self->m_bClosing) //初始 m_bClosing = false,无限循环{//当UDT协议判断某一个UDT SOCKET的状态不正确时,会将其状态设置为BROKEN,并在这个函数中进行处理self->checkBrokenSockets();//睡眠等待,等待下一次可以清理出现BROKEN状态的UDT SOCKET。睡眠时间 timeout: 1spthread_cond_timedwait(&self->m_GCStopCond, &self->m_GCStopLock, &timeout);}//remove all sockets and multiplexers. m_bClosing = true, 清理目前依旧残余的资源 CGuard::enterCS(self->m_ControlLock);//遍历 m_Sockets 中的 UDT socket,设置为关闭,存入 m_ClosedSockets map,下一步处理连接记录。//查找 Listener, 将 该socket 在Listener中的 m_pQueuedSockets 与 m_pAcceptSockets中记录删除for (map<UDTSOCKET, CUDTSocket*>::iterator i = self->m_Sockets.begin(); i != self->m_Sockets.end(); ++ i){i->second->m_pUDT->m_bBroken = true; //将CUDT* 的状态设置为 BROKEN,后续进行处理i->second->m_pUDT->close(); //关闭i->second->m_Status = CLOSED;//设置状态为关闭i->second->m_TimeStamp = CTimer::getTime(); //调整最后一次操作 UDT socket 的时间self->m_ClosedSockets[i->first] = i->second;//将当前描述连接的CUDT*保存至 m_ClosedSockets// remove from listener's queuemap<UDTSOCKET, CUDTSocket*>::iterator ls = self->m_Sockets.find(i->second->m_ListenSocket);if (ls == self->m_Sockets.end()) //如果没有找到Listener, m_ClosedSockets中继续查找{ls = self->m_ClosedSockets.find(i->second->m_ListenSocket);if (ls == self->m_ClosedSockets.end())   //如果没有找到,就不再处理continue;}CGuard::enterCS(ls->second->m_AcceptLock);//获取Listener中的锁ls->second->m_pQueuedSockets->erase(i->second->m_SocketID);//清理接收连接但还未接受的排队 UDT socketls->second->m_pAcceptSockets->erase(i->second->m_SocketID);//清理已完成连接的队列中UDT socketCGuard::leaveCS(ls->second->m_AcceptLock);}self->m_Sockets.clear();//最后删除m_Sockets中的所有sockets//最后再次遍历待关闭的socket队列for (map<UDTSOCKET, CUDTSocket*>::iterator j = self->m_ClosedSockets.begin(); j != self->m_ClosedSockets.end(); ++ j){j->second->m_TimeStamp = 0;//设置状态为 0}CGuard::leaveCS(self->m_ControlLock);while (true){//之前只是将即将清理的UDT socket 状态设置为BROKEN,此时对BROKEN状态的socket进行清理self->checkBrokenSockets();CGuard::enterCS(self->m_ControlLock);bool empty = self->m_ClosedSockets.empty(); //判断待关闭socket是否已经全部清理CGuard::leaveCS(self->m_ControlLock);if (empty)//如果为empty,就可以直接退出break;CTimer::sleep();//不行的话,再歇一会,再次进行处理}return NULL;
}

上面代码中出现了两次 checkBrokenSockets,该方法用于清理处于BROKEN状态的 UDT socket。

  • 检查所有的 UDT socket

    • 如果不是处于broken状态,查找下一个,如果处于broken状态:

      • 如果 socket 处于 LISTENING 状态,须再等待3s,以防止有客户端正在连接
      • 如果 recvbuffer 存在数据,且 brokencount 计数器仍大于0,继续等待更长的时间
      • 否则:
        • 设置socket为CLOSED状态,更新 m_TimeStamp 为当前时间, 开启移除定时器。
        • 将当前socket加入待关闭 tbc vector, socket加入临时存储closed socket的 m_ClosedSockets map.
        • 查找 Listener, 从m_pQueuedSockets 和 m_pAcceptSockets 移除接收到的socket 连接 socket。
  • 对于移入临时存储待关闭的 m_ClosedSockets map,检查每个socket,
    • 如果 m_ullLingerExpiration > 0,表示在发送缓冲中存在数据时,GC设置了延迟关闭的时间

      • 如果发送缓冲不存在,或为空,或设置关闭时间已经超时:

        • 将m_ullLingerExpiration设置为0;设置UDT socket为关闭状态,在下一次GC中将被回收。更新关闭时间为当前时间。
    • 否则:
      • 如果标记关闭时间已经超过1s,且socket对应接收队列信息节点已经被删除,或者节点不存在list中
      • 将UDT socket加入tbr 移除队列
  • 遍历tbc,从 m_Sockets中删除。遍历tbr, 将这些超时的 socket 移除。
void CUDTUnited::checkBrokenSockets()
{CGuard cg(m_ControlLock);//获取GC锁// set of sockets To Be Closed and To Be Removedvector<UDTSOCKET> tbc; //收集处于Closed状态的 UDT SOCKETvector<UDTSOCKET> tbr; //收集处于Removed状态的 UDT SOCKETfor (map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++ i){// check broken connectionif (i->second->m_pUDT->m_bBroken) //如果处于BROKEN状态{if (i->second->m_Status == LISTENING) //如果是LISTENING UDT SOCKET{// for a listening socket, it should wait an extra 3 seconds in case a client is connectingif (CTimer::getTime() - i->second->m_TimeStamp < 3000000)continue;}else if ((i->second->m_pUDT->m_pRcvBuffer != NULL) && (i->second->m_pUDT->m_pRcvBuffer->getRcvDataSize() > 0) && (i->second->m_pUDT->m_iBrokenCounter -- > 0)){// if there is still data in the receiver buffer, wait longercontinue;//如果缓冲区中依旧有数据,应该等待更长的时间}//close broken connections and start removal timeri->second->m_Status = CLOSED;  //将状态设置为CLOSEDi->second->m_TimeStamp = CTimer::getTime(); //设置UDT SOCKET的关闭时间tbc.push_back(i->first);//将这个UDT SOCKET添加进Closed Array,稍后处理m_ClosedSockets[i->first] = i->second;//将这个UDT SOCKET添加进CLOSED UDT SOCKET MAP// remove from listener's queuemap<UDTSOCKET, CUDTSocket*>::iterator ls = m_Sockets.find(i->second->m_ListenSocket);if (ls == m_Sockets.end()){ls = m_ClosedSockets.find(i->second->m_ListenSocket);if (ls == m_ClosedSockets.end())continue;}CGuard::enterCS(ls->second->m_AcceptLock);ls->second->m_pQueuedSockets->erase(i->second->m_SocketID);ls->second->m_pAcceptSockets->erase(i->second->m_SocketID);CGuard::leaveCS(ls->second->m_AcceptLock);}}for (map<UDTSOCKET, CUDTSocket*>::iterator j = m_ClosedSockets.begin(); j != m_ClosedSockets.end(); ++ j){if (j->second->m_pUDT->m_ullLingerExpiration > 0) //如果还没有到等待关闭时间{// asynchronous close: if ((NULL == j->second->m_pUDT->m_pSndBuffer) || (0 == j->second->m_pUDT->m_pSndBuffer->getCurrBufSize()) || (j->second->m_pUDT->m_ullLingerExpiration <= CTimer::getTime())){ //如果发送缓冲区为空,接收缓冲区为空或者等待关闭时间小于0,调整状态为CLOSED直接关闭j->second->m_pUDT->m_ullLingerExpiration = 0;j->second->m_pUDT->m_bClosing = true; //更新Closed状态,由下一次启动的GC线程回收j->second->m_TimeStamp = CTimer::getTime();//更新关闭的时间}}// timeout 1 second to destroy a socket AND it has been removed from RcvUListif ((CTimer::getTime() - j->second->m_TimeStamp > 1000000) && ((NULL == j->second->m_pUDT->m_pRNode) || !j->second->m_pUDT->m_pRNode->m_bOnList)){tbr.push_back(j->first);}}// move closed sockets to the ClosedSockets structurefor (vector<UDTSOCKET>::iterator k = tbc.begin(); k != tbc.end(); ++ k)m_Sockets.erase(*k); // remove those timeout socketsfor (vector<UDTSOCKET>::iterator l = tbr.begin(); l != tbr.end(); ++ l)removeSocket(*l);
}

如果 m_ClosedSockets 中UDT socket 已经超过超过1s, 就会放入 tbr。遍历 tbr, 移除所有socket,更新相关资源。这里用到了removeSocket 方法。

  • 如果UDT socket 是一个 Listener,将所有收到的但未接受的socket 关闭,设置为 BROKEN 状态,等待移除。
  • 删除 m_PeerRec 中记录的连接。
  • 删除 UDT socket, 检查对应的复用器,计数器减一,如果已经为0, 则关闭channel,清理复用器内的资源。
void CUDTUnited::removeSocket(const UDTSOCKET u)
{map<UDTSOCKET, CUDTSocket*>::iterator i = m_ClosedSockets.find(u);// invalid socket IDif (i == m_ClosedSockets.end())return;// 由于是多个UDT实例共享一个资源复用器,销毁时要减少引用计数// decrease multiplexer reference count, and remove it if necessaryconst int mid = i->second->m_iMuxID;if (NULL != i->second->m_pQueuedSockets){CGuard::enterCS(i->second->m_AcceptLock);// if it is a listener, close all un-accepted sockets in its queue and remove them laterfor (set<UDTSOCKET>::iterator q = i->second->m_pQueuedSockets->begin(); q != i->second->m_pQueuedSockets->end(); ++ q){m_Sockets[*q]->m_pUDT->m_bBroken = true;       //1. 将CUDT*的状态设置为BROKENm_Sockets[*q]->m_pUDT->close();                //2. 调用CUDT中的close()m_Sockets[*q]->m_TimeStamp = CTimer::getTime();//3. 更新UDT SOCKET的关闭时间m_Sockets[*q]->m_Status = CLOSED;              //4. 将UDT SOCKET设置为Closedm_ClosedSockets[*q] = m_Sockets[*q];          //5. 在Closed Array中添加当前UDT SOCKET,在GC线程中进行处理m_Sockets.erase(*q);                         //6. 从全局的MAP中删除}CGuard::leaveCS(i->second->m_AcceptLock);}// remove from peer recmap<int64_t, set<UDTSOCKET> >::iterator j = m_PeerRec.find((i->second->m_PeerID << 30) + i->second->m_iISN);if (j != m_PeerRec.end()){j->second.erase(u);if (j->second.empty())m_PeerRec.erase(j);}// delete this onei->second->m_pUDT->close();delete i->second;m_ClosedSockets.erase(i);map<int, CMultiplexer>::iterator m;m = m_mMultiplexer.find(mid);if (m == m_mMultiplexer.end())//如果这个资源复用器不存在,直接返回{//something is wrong!!!return;}m->second.m_iRefCount --;//否则的话,减少这个资源复用器的引用计数if (0 == m->second.m_iRefCount){m->second.m_pChannel->close(); //与UDP SOCKET关联的Channel直接关闭delete m->second.m_pSndQueue;  //清理资源复用器的资源delete m->second.m_pRcvQueue;delete m->second.m_pTimer;delete m->second.m_pChannel;m_mMultiplexer.erase(m);}
}

终止流程

  • UDT::cleanup -> CUDT::cleanup -> CUDTUnited::cleanup

终止流程与开始流程相反,在计数器清零以后才会真正退出,每次调用,计数减一。修改 m_bClosing 与 m_bGCStatus 状态,终止垃圾回收线程。

int CUDTUnited::cleanup()
{CGuard gcinit(m_InitLock);if (--m_iInstanceCount > 0) //计数器减一return 0;if (!m_bGCStatus) return 0;m_bClosing = true;pthread_cond_signal(&m_GCStopCond);pthread_join(m_GCThread, NULL);pthread_mutex_destroy(&m_GCStopLock);pthread_cond_destroy(&m_GCStopCond);m_bGCStatus = false;return 0;
}

对UDT库的初始化与终止流程分析结束,在例程中,基本上都是显示调用这两个接口,但是在 appclient.cpp 中,通过构造函数与析构函数隐式完成。两种方式均可,可根据需要自选。

struct UDTUpDown{UDTUpDown(){// use this function to initialize the UDT libraryUDT::startup();}~UDTUpDown(){// use this function to release the UDT libraryUDT::cleanup();}
};

UDT 最新源码分析(二) -- 开始与终止相关推荐

  1. UDT 最新源码分析(五) -- 网络数据收发

    UDT 最新源码分析 -- 网络数据收发 从接口实现看 UDT 网络收发 UDT 发送 send / sendmsg / sendfile UDT 接收 recv /recvmsg /recvfile ...

  2. UDT 最新源码分析(三) -- UDT Socket 相关函数

    UDT 最新源码分析 -- UDT Socket 相关函数 UDT socket 建立与使用 主要流程 C/S 模式 Rendezvous 模式 UDT epoll UDT socket 创建 UDT ...

  3. ENS最新合约源码分析二

    ENS(以太坊域名服务)智能合约源码分析二 0.简介 ​ 本次分享直接使用线上实际注册流程来分析最新注册以太坊域名的相关代码.本次主要分析最新的关于普通域名注册合约和普通域名迁移合约,短域名竞拍合约不 ...

  4. Android Q 10.1 KeyMaster源码分析(二) - 各家方案的实现

    写在之前 这两篇文章是我2021年3月初看KeyMaster的笔记,本来打算等分析完KeyMaster和KeyStore以后再一起做成一系列贴出来,后来KeyStore的分析中断了,这一系列的文章就变 ...

  5. 【Android 事件分发】ItemTouchHelper 源码分析 ( OnItemTouchListener 事件监听器源码分析 二 )

    Android 事件分发 系列文章目录 [Android 事件分发]事件分发源码分析 ( 驱动层通过中断传递事件 | WindowManagerService 向 View 层传递事件 ) [Andr ...

  6. SpringBoot源码分析(二)之自动装配demo

    SpringBoot源码分析(二)之自动装配demo 文章目录 SpringBoot源码分析(二)之自动装配demo 前言 一.创建RedissonTemplate的Maven服务 二.创建测试服务 ...

  7. gSOAP 源码分析(二)

    gSOAP 源码分析(二) 2012-5-24 flyfish 一 gSOAP XML介绍 Xml的全称是EXtensible Markup Language.可扩展标记语言.仅仅是一个纯文本.适合用 ...

  8. 【投屏】Scrcpy源码分析二(Client篇-连接阶段)

    Scrcpy源码分析系列 [投屏]Scrcpy源码分析一(编译篇) [投屏]Scrcpy源码分析二(Client篇-连接阶段) [投屏]Scrcpy源码分析三(Client篇-投屏阶段) [投屏]Sc ...

  9. Nouveau源码分析(二):Nouveau结构体的基本框架

    Nouveau源码分析(二) 在讨论Nouveau对Nvidia设备的初始化前,我准备先说一下Nouveau结构体的基本框架 Nouveau的很多结构体都可以看作是C++中的类,之间有很多相似的东西, ...

最新文章

  1. python 列表表达式 if_python中if else如何判断表达式成立?
  2. 前端学习记录 CSS
  3. C++ Primer 5th笔记(chap 18 大型程序工具) 多重继承与虚继承
  4. LeetCode——16. 3Sum Closest
  5. Altium designer中提示some net were not able to be matched问题解决办法
  6. JTable 失去焦点时取消编辑状态
  7. 面试:Java线程有哪几种状态,它们之间是如何切换的
  8. ELMo代码详解(一):数据准备
  9. Vsftpd 虚拟用户配置参考---终极版
  10. java常用序列化与反序列化方法
  11. 完美解决VB中int to short溢出问题
  12. 拉格朗日乘数法,一种计算条件极值的方式
  13. 学妹要的20道Redis面试题,在这也分享一下(转载自程序羊羊哥)
  14. Flutter从相册选择图片和相机拍照(image_picker)
  15. 元组创建、删除、最大值、最小值、求长度、计数
  16. 计算机分辨率无法调整,教你电脑分辨率调不过来怎么办
  17. 2020书单、影单、电视剧
  18. 虚拟机中新增磁盘空间并开机自动挂载
  19. html 实时统计字数,记一次前端 input、textarea输入框实时 统计字数(真实字数)...
  20. 阿里云团队漏洞托管、渗透测试、攻防演练

热门文章

  1. 傅里叶变换短时傅里叶变换小波变换
  2. 致谢词大全字C语言,致谢词范文
  3. c++ union学习
  4. 艺赛旗RPA 网页处理系列(三):网页检查 / 审查小技巧
  5. C#Aspose操作Word Excel简版(后会研究补充更多功能)
  6. 定义复数java_java定义复数的方法
  7. 无线个人通信(WPAN)-蓝牙
  8. 最适合freshman的Java习题集(三)
  9. git 默认的名字和账号
  10. 计算机网络协议——墙都不服,就服你系列