epoll 是为处理大量句柄而改进的poll,在UDT中也有支持。UDT使用了内核提供的epoll,主要是epoll_create,epoll_wait,epoll_ctl,UDT定义了CEPollDesc这个结构来管理epoll的描述符和套接字。
struct CEPollDesc
{
int m_iID;                                // epoll ID
std::set<UDTSOCKET> m_sUDTSocksOut;       // set of UDT sockets waiting for write events
std::set<UDTSOCKET> m_sUDTSocksIn;        // set of UDT sockets waiting for read events
std::set<UDTSOCKET> m_sUDTSocksEx;        // set of UDT sockets waiting for exceptions
int m_iLocalID;                           // local system epoll ID
std::set<SYSSOCKET> m_sLocals;            // set of local (non-UDT) descriptors
std::set<UDTSOCKET> m_sUDTWrites;         // UDT sockets ready for write
std::set<UDTSOCKET> m_sUDTReads;          // UDT sockets ready for read
std::set<UDTSOCKET> m_sUDTExcepts;        // UDT sockets with exceptions (connection broken, etc.)
};
特别要提醒的是,当对端socket连接中断的时候,也是在m_sUDTReads里的
UDT还实现了一个类来进行各项操作,实现的有
create():创建一个epoll,调用了epoll_create
add_usock():添加一个UDT套接字到epoll
add_ssock():添加一个系统套接字到epoll,调用了epoll_ctl
remove_usock():从epoll中移除一个UDT套接字
remove_ssock():从epoll中移除一个系统套接字,调用了epoll_ctl
wait():等待epoll事件或者超时,调用了epoll_wait
release():关闭并释放一个epoll
UDT里对epoll的调用是四段式的,比如add_usock这里,调用顺序是epoll_add_usock()->CUDT::epoll_add_usock()->s_UDTUnited.epoll_add_usock()->CEPoll::add_usock
int epoll_add_usock(int eid, UDTSOCKET u, const int* events)
{
return CUDT::epoll_add_usock(eid, u, events);
}
int CUDT::epoll_add_usock(const int eid, const UDTSOCKET u, const int* events)
{
try
{
return s_UDTUnited.epoll_add_usock(eid, u, events);
}
catch (CUDTException e)
{
s_UDTUnited.setError(new CUDTException(e));
return ERROR;
}
catch (...)
{
s_UDTUnited.setError(new CUDTException(-1, 0, 0));
return ERROR;
}
}
int CUDTUnited::epoll_add_usock(const int eid, const UDTSOCKET u, const int* events)
{
CUDTSocket* s = locate(u);
int ret = -1;
if (NULL != s)
{
ret = m_EPoll.add_usock(eid, u, events);
s->m_pUDT->addEPoll(eid);
}
else
{
throw CUDTException(5, 4);
}
return ret;
}
int CEPoll::add_usock(const int eid, const UDTSOCKET& u, const int* events)
{
CGuard pg(m_EPollLock);
map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
if (p == m_mPolls.end())
throw CUDTException(5, 13);
if (!events || (*events & UDT_EPOLL_IN))       //UDT_EPOLL_IN 和UDT_EPOLL_OUT、UDT_EPOLL_ERROR分别是0x1, 0x4, 0x8,可以进行&运算
p->second.m_sUDTSocksIn.insert(u);
if (!events || (*events & UDT_EPOLL_OUT))
p->second.m_sUDTSocksOut.insert(u);
return 0;
}
UDT命名空间提供给应用程序调用接口,可成为API层,API层调用CUDT API,这一层主要做错误处理,捕捉下面两层抛出的错误保存起来交给应用程序使用,CUDT API调用CUDTUnited的实现,如果是UDT SOCKET的socket(),bind(),listen()等,到这层也就结束了,不过epoll要多个第四层,再调用CEPoll的实现。现在来看看CUDTUnited和CEPoll的实现。
CUDTSocket* s = locate(u);
调用CUDTUnited::locate(),根据SocketID,也就是UDT Socket handle在CUDTUnited的std::map<UDTSOCKET, CUDTSocket*> m_Sockets中找到对应的CUDTSocket结构,如果找不到,抛出错误,找到了就调用CEPoll的add_usock实现,根据传的event参数,将socket导入相应的队列。之后调用s->m_pUDT->addEPoll(eid)
void CUDT::addEPoll(const int eid)
{
CGuard::enterCS(s_UDTUnited.m_EPoll.m_EPollLock);                      //这种通过类来实现加锁解锁的,我第一次见,还挺方便。
m_sPollID.insert(eid);
CGuard::leaveCS(s_UDTUnited.m_EPoll.m_EPollLock);
if (!m_bConnected || m_bBroken || m_bClosing)
return;
if (((UDT_STREAM == m_iSockType) && (m_pRcvBuffer->getRcvDataSize() > 0)) ||
((UDT_DGRAM == m_iSockType) && (m_pRcvBuffer->getRcvMsgNum() > 0)))
{
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, true);
}
if (m_iSndBufSize > m_pSndBuffer->getCurrBufSize())
{
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);
}
}
这里已经开始更新m_sUDTWrites,m_sUDTReads,通过update_events(),如前所述,update_events()也是CEPoll的成员函数。
int CEPoll::update_events(const UDTSOCKET& uid, std::set<int>& eids, int events, bool enable)
{
CGuard pg(m_EPollLock);
map<int, CEPollDesc>::iterator p;
vector<int> lost;
for (set<int>::iterator i = eids.begin(); i != eids.end(); ++ i)
{
p = m_mPolls.find(*i);
if (p == m_mPolls.end())
{
lost.push_back(*i);
}
else
{
if ((events & UDT_EPOLL_IN) != 0)
update_epoll_sets(uid, p->second.m_sUDTSocksIn, p->second.m_sUDTReads, enable);   //UDT_EPOLL_IN 和UDT_EPOLL_OUT、UDT_EPOLL_ERROR分别是0x1, 0x4, 0x8,可以进行&运算
if ((events & UDT_EPOLL_OUT) != 0)
update_epoll_sets(uid, p->second.m_sUDTSocksOut, p->second.m_sUDTWrites, enable);
if ((events & UDT_EPOLL_ERR) != 0)
update_epoll_sets(uid, p->second.m_sUDTSocksEx, p->second.m_sUDTExcepts, enable);
}
}
for (vector<int>::iterator i = lost.begin(); i != lost.end(); ++ i)
eids.erase(*i);
return 0;
}
void update_epoll_sets(const UDTSOCKET& uid, const set<UDTSOCKET>& watch, set<UDTSOCKET>& result, bool enable)
{
if (enable && (watch.find(uid) != watch.end()))
{
result.insert(uid);
}
else if (!enable)
{
result.erase(uid);
}
}
最后说下wait函数的实现,一样是四段式,就跳过前面三段,直接看第四段
int CEPoll::wait(const int eid, set<UDTSOCKET>* readfds, set<UDTSOCKET>* writefds, int64_t msTimeOut, set<SYSSOCKET>* lrfds, set<SYSSOCKET>* lwfds)
{
// if all fields is NULL and waiting time is infinite, then this would be a deadlock   都空的的话,会死锁,抛出异常
if (!readfds && !writefds && !lrfds && lwfds && (msTimeOut < 0))
throw CUDTException(5, 3, 0);
// Clear these sets in case the app forget to do it.  清空
if (readfds) readfds->clear();
if (writefds) writefds->clear();
if (lrfds) lrfds->clear();
if (lwfds) lwfds->clear();
int total = 0;
int64_t entertime = CTimer::getTime();
while (true)
{
CGuard::enterCS(m_EPollLock);
map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
if (p == m_mPolls.end())
{
CGuard::leaveCS(m_EPollLock);
throw CUDTException(5, 13);
}
if (p->second.m_sUDTSocksIn.empty() && p->second.m_sUDTSocksOut.empty() && p->second.m_sLocals.empty() && (msTimeOut < 0))
{
// no socket is being monitored, this may be a deadlock  都空的的话,会死锁,抛出异常
CGuard::leaveCS(m_EPollLock);
throw CUDTException(5, 3);
}
// Sockets with exceptions are returned to both read and write sets.          把m_sUDTReads和m_sUDTExcepts都读到readfds里
if ((NULL != readfds) && (!p->second.m_sUDTReads.empty() || !p->second.m_sUDTExcepts.empty()))
{
*readfds = p->second.m_sUDTReads;
for (set<UDTSOCKET>::const_iterator i = p->second.m_sUDTExcepts.begin(); i != p->second.m_sUDTExcepts.end(); ++ i)
readfds->insert(*i);
total += p->second.m_sUDTReads.size() + p->second.m_sUDTExcepts.size();
}
if ((NULL != writefds) && (!p->second.m_sUDTWrites.empty() || !p->second.m_sUDTExcepts.empty()))          //把m_sUDTWrites和m_sUDTExcepts都读到writefds里
{
*writefds = p->second.m_sUDTWrites;
for (set<UDTSOCKET>::const_iterator i = p->second.m_sUDTExcepts.begin(); i != p->second.m_sUDTExcepts.end(); ++ i)
writefds->insert(*i);
total += p->second.m_sUDTWrites.size() + p->second.m_sUDTExcepts.size();
}
if (lrfds || lwfds)     //读系统套接字
{
#ifdef LINUX
const int max_events = p->second.m_sLocals.size();
epoll_event ev[max_events];
int nfds = ::epoll_wait(p->second.m_iLocalID, ev, max_events, 0);
for (int i = 0; i < nfds; ++ i)
{
if ((NULL != lrfds) && (ev[i].events & EPOLLIN))
{
lrfds->insert(ev[i].data.fd);
++ total;
}
if ((NULL != lwfds) && (ev[i].events & EPOLLOUT))
{
lwfds->insert(ev[i].data.fd);
++ total;
}
}
#else
//currently "select" is used for all non-Linux platforms.
//faster approaches can be applied for specific systems in the future.
//"select" has a limitation on the number of sockets
fd_set readfds;
fd_set writefds;
FD_ZERO(&readfds);
FD_ZERO(&writefds);
for (set<SYSSOCKET>::const_iterator i = p->second.m_sLocals.begin(); i != p->second.m_sLocals.end(); ++ i)
{
if (lrfds)
FD_SET(*i, &readfds);
if (lwfds)
FD_SET(*i, &writefds);
}
timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 0;
if (::select(0, &readfds, &writefds, NULL, &tv) > 0)
{
for (set<SYSSOCKET>::const_iterator i = p->second.m_sLocals.begin(); i != p->second.m_sLocals.end(); ++ i)
{
if (lrfds && FD_ISSET(*i, &readfds))
{
lrfds->insert(*i);
++ total;
}
if (lwfds && FD_ISSET(*i, &writefds))
{
lwfds->insert(*i);
++ total;
}
}
}
#endif
}
CGuard::leaveCS(m_EPollLock);
if (total > 0)
return total;
if ((msTimeOut >= 0) && (int64_t(CTimer::getTime() - entertime) >= msTimeOut * 1000LL))
throw CUDTException(6, 3, 0);
CTimer::waitForEvent();
}
return 0;
}

UDT中的epoll相关推荐

  1. Learning by doing 系列文章(之一)如何在 Python 中使用 epoll ?

    epoll 简介 参见本博前一文<epoll使用详解> Epoll Within Python Python 在 2.6 版中引入了用于处理Linux epoll系统调用的API,本文简单 ...

  2. epoll监听文件_介绍一下 Android Handler 中的 epoll 机制?

    介绍一下 Android Handler 中的 epoll 机制? 目录: IO 多路复用 select.poll.epoll 对比 epoll API epoll 使用示例 Handler 中的 e ...

  3. linux中的epoll机制

    在linux的网络编程中,很长的时间都在使用select来做事件触发.在linux新的内核中,有了一种替换它的机制,就是epoll. 相比于select,epoll最大的好处在于它不会随着监听fd数目 ...

  4. UDT中select异常

    转载:http://blog.csdn.net/seebit/article/details/6050428 select, selectEx的参数采用了stl的容器,在DLL方式下,会发生异常. s ...

  5. Python中的select、epoll详解

    Python中的select.epoll详解 文章目录 Python中的select.epoll详解 一.select 1.相关概念 2.select的特性 1.那么单进程是如何实现多并发的呢??? ...

  6. html select选择事件_一道搜狗面试题:IO多路复用中select、poll、epoll之间的区别...

    (1)select==>时间复杂度O(n) 它仅仅知道了,有I/O事件发生了,却并不知道是哪那几个流(可能有一个,多个,甚至全部),我们只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对 ...

  7. IO多路复用中select、poll、epoll之间的区别

    本文来说下IO多路复用中select.poll.epoll之间的区别 文章目录 什么是IO多路复用 为什么有IO多路复用机制 同步阻塞(BIO) 同步非阻塞(NIO) IO多路复用(现在的做法) 3种 ...

  8. 应用服务器中对JDK的epoll空转bug的处理

    前面讲到了epoll的一些机制,与select和poll等传统古老的IO多路复用机制的一些区别,这些区别实质可以总结为一句话, 就是epoll将重要的基于事件的fd集合放在了内核中来完成,因为内核是高 ...

  9. UDT协议实现分析——连接的建立

    UDT Server在执行UDT::listen()之后,就可以接受其它节点的连接请求了.这里我们研究一下UDT连接建立的过程. 连接的发起 来看连接的发起方.如前面我们看到的那样,UDT Clien ...

最新文章

  1. Microbiome:Kraken2进行16S物种注释又快又准
  2. JavaScript的写类方式(4)——转
  3. TCP协议经典数据--TCP/IP高效编程 : 改善网络程序的44个技巧
  4. 方程式漏洞之复现window2008/win7 远程命令执行漏洞
  5. 计算机绘图中特征建模的概念,2016年电大 -机械cadcam计算机辅助设计制造习题集.doc...
  6. linux控制协程参数,Linux高性能网络:协程系列05-协程实现之原语操作
  7. 2016年2月23日----Javascript运算符
  8. 让你的 Linux 远离黑客(二):另外三个建议
  9. Java eclipse汉字乱码解决
  10. 如何实现SSID白名单管控
  11. CSV 文件中的字段中的开头和结尾上,可能会存在空格或制表符,但是该如何处理呢?
  12. 台式电脑怎么组装步骤_详细教您台式电脑如何快速组装
  13. littleVGL开发:littleVGL的介绍
  14. 1068 万绿丛中一点红 (C++)
  15. 浅入浅出redis----II
  16. 由Nginx源码写双向循环链表
  17. windows更改密码脚本_如何更改您的Windows密码
  18. 小学教师计算机培训记录内容,小学教师个人的培训工作总结
  19. 第二章.Java程序设计基础
  20. 卷积后的尺寸大小问题

热门文章

  1. HTML5 Notification
  2. android中shape的属性
  3. IIS出现 分析器错误消息: 在应用程序级别之外使用注册为 allowDefinition='MachineToApplication' 的节是错误的...
  4. [转]深入理解Java 8 Lambda(语言篇——lambda,方法引用,目标类型和默认方法)...
  5. Install Java on Ubuntu server
  6. PyTorch 入坑五 autograd与逻辑回归
  7. Halcon/MFC混合编程入门
  8. python自动测试m_python自动化测试实例解析
  9. php 循环获取分类,PHP 循环删除无限分类子节点
  10. vue项目没有router文件夹_Vue路由(vue-router)配置实战——动态路由,重定向,工程非根目录...