http://blog.csdn.net/zhuxiaoping54532/article/details/56494313

处理大并发之一

对epoll的理解,epoll客户端服务端代码

序言:

该博客是一系列的博客,首先从最基础的epoll说起,然后研究libevent源码及使用方法,最后研究nginx和node.js,关于select,poll这里不做说明,只说明其相对于epoll的不足,其实select和poll我也没用过,因为我选择了epoll。

说起epoll,做过大并发的估计都不陌生,之前做了个STB的工具,用的就是epoll处理并发,测试1.5W的并发(非简单的发消息,包括多个服务端和客户端的处理)是通过的,epoll的功能强大,让我有一定的体会,在nginx和node.js中利用的就是libevent,而libevent使用的就是epoll,如果系统不支持epoll,会选择最佳的方式(select,poll或kqueue中的一种)。

基础资料:

epoll名词解释:

看下百科名片是怎么解释epoll的吧,(http://baike.baidu.com/view/1385104.htm)epoll是Linux内核为处理大批量句柄而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著减少程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。

关于具体的说明可以参考网上资料,我这里罗列下epoll相对于其他多路复用机制(select,poll)的优点吧:

epoll优点:

1. 支持一个进程打开大数目的socket描述符。

2. IO效率不随FD数目增加而线性下降,传统的select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。

3. 使用mmap加速内核与用户空间的消息传递。无论是select,poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必要的内存拷贝就很重要,在这点上,epoll是通过内核于用户空间mmap同一块内存实现的。

select和poll的缺点:

1. 每次调用时要重复地从用户态读入参数。

2. 每次调用时要重复地扫描文件描述符。

3. 每次在调用开始时,要把当前进程放入各个文件描述符的等待队列。在调用结束后,又把进程从各个等待队列中删除。

分析libevent的时候,发现一个博客介绍epoll的不错,网址是:http://blog.csdn.net/sparkliang/article/details/4770655

epoll用的的数据结构和函数

数据结构

typedef union epoll_data { 
                void *ptr; 
                int fd; 
                __uint32_t u32; 
                __uint64_t u64; 
        } epoll_data_t;

struct epoll_event { 
                __uint32_t events;      /* epoll events */ 
                epoll_data_t data;      /* user data variable */ 
        };

结构体epoll_event 被用于注册所感兴趣的事件和回传所发生待处理的事件. 
其中epoll_data 联合体用来保存触发事件的某个文件描述符相关的数据. 
例如一个client连接到服务器,服务器通过调用accept函数可以得到于这个client对应的socket文件描述符,可以把这文件描述符赋给epoll_data的fd字段以便后面的读写操作在这个文件描述符上进行。epoll_event 结构体的events字段是表示感兴趣的事件和被触发的事件可能的取值为: 
EPOLLIN :表示对应的文件描述符可以读; 
EPOLLOUT:表示对应的文件描述符可以写; 
EPOLLPRI:表示对应的文件描述符有紧急的数据可读 
EPOLLERR:表示对应的文件描述符发生错误; 
EPOLLHUP:表示对应的文件描述符被挂断; 
EPOLLET:表示对应的文件描述符有事件发生;

ET和LT模式

LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。

ET (edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once),不过在TCP协议中,ET模式的加速效用仍需要更多的benchmark确认。

ET和LT的区别在于LT事件不会丢弃,而是只要读buffer里面有数据可以让用户读,则不断的通知你。而ET则只在事件发生之时通知。可以简单理解为LT是水平触发,而ET则为边缘触发。
ET模式仅当状态发生变化的时候才获得通知,这里所谓的状态的变化并不包括缓冲区中还有未处理的数据,也就是说,如果要采用ET模式,需要一直read/write直到出错为止,很多人反映为什么采用ET模式只接收了一部分数据就再也得不到通知了,大多因为这样;而LT模式是只要有数据没有处理就会一直通知下去的.

函数

[cpp] view plain copy
  1. 1. int epoll_create(int size);
  2. 2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  3. 3. int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);

详解:

1. int epoll_create(int size);

创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。第一个参数是epoll_create()的返回值,第二个参数表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;
第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事

3. int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);

等待事件的产生,参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

资料总结这么多,其实都是一些理论,关键还在于代码,下面附上我使用epoll开发的服务端和客户端,该代码我会上传到我的资源里,可以免费下载。链接:

如果代码有bug,可以告诉我。

程序demo

首先对服务端和客户端做下说明:

我想实现的是客户端和服务端并发的程序,客户端通过配置并发数,说明有多少个用户去连接服务端。

客户端会发送消息:"Client: i send message Hello Server!”,其中i表示哪一个客户端;收到消息:"Recv Server Msg Content:%s\n"。

例如:

发送:Client: 1 send message "Hello Server!"

接收:Recv Derver Msg Content:Hello, client fd: 6

服务端收到后给客户端回复消息:"Hello, client fd: i",其中i表示服务端接收的fd,用户区别是哪一个客户端。接收客户端消息:"Terminal Received Msg Content:%s\n"

例如:

发送:Hello, client fd: 6

接收:Terminal Received Msg Content:Client: 1 send message "Hello Server!"

备注:这里在接收到消息后,直接打印出消息,如果需要对消息进行处理(如果消息处理比较占用时间,不能立即返回,可以将该消息放入一个队列中,然后开启一个线程从队列中取消息进行处理,这样的话不会因为消息处理而阻塞epoll)。libenent好像对这种有2中处理方式,一个就是回调,要求回调函数,不占用太多的时间,基本能立即返回,另一种好像也是一个队列实现的,这个还需要研究。

服务端代码说明:

服务端在绑定监听后,开启了一个线程,用于负责接收客户端连接,加入到epoll中,这样只要accept到客户端的连接,就将其add EPOLLIN到epoll中,然后进入循环调用epoll_wait,监听到读事件,接收数据,并将事件修改为EPOLLOUT;反之监听到写事件,发送数据,并将事件修改为EPOLLIN。

[cpp] view plain copy
  1. cepollserver.h
  2. #ifndef  C_EPOLL_SERVER_H
  3. #define  C_EPOLL_SERVER_H
  4. #include <sys/epoll.h>
  5. #include <sys/socket.h>
  6. #include <netinet/in.h>
  7. #include <fcntl.h>
  8. #include <arpa/inet.h>
  9. #include <stdio.h>
  10. #include <stdlib.h>
  11. #include <iostream>
  12. #include <pthread.h>
  13. #define _MAX_SOCKFD_COUNT 65535
  14. class CEpollServer
  15. {
  16. public:
  17. CEpollServer();
  18. ~CEpollServer();
  19. bool InitServer(const char* chIp, int iPort);
  20. void Listen();
  21. static void ListenThread( void* lpVoid );
  22. void Run();
  23. private:
  24. int        m_iEpollFd;
  25. int        m_isock;
  26. pthread_t       m_ListenThreadId;// 监听线程句柄
  27. };
  28. #endif

cepollserver.cpp

[cpp] view plain copy
  1. #include "cepollserver.h"
  2. using namespace std;
  3. CEpollServer::CEpollServer()
  4. {
  5. }
  6. CEpollServer::~CEpollServer()
  7. {
  8. close(m_isock);
  9. }
  10. bool CEpollServer::InitServer(const char* pIp, int iPort)
  11. {
  12. m_iEpollFd = epoll_create(_MAX_SOCKFD_COUNT);
  13. //设置非阻塞模式
  14. int opts = O_NONBLOCK;
  15. if(fcntl(m_iEpollFd,F_SETFL,opts)<0)
  16. {
  17. printf("设置非阻塞模式失败!\n");
  18. return false;
  19. }
  20. m_isock = socket(AF_INET,SOCK_STREAM,0);
  21. if ( 0 > m_isock )
  22. {
  23. printf("socket error!\n");
  24. return false;
  25.   }
  26.   
  27.   sockaddr_in listen_addr;
  28.       listen_addr.sin_family=AF_INET;
  29.       listen_addr.sin_port=htons ( iPort );
  30.       listen_addr.sin_addr.s_addr=htonl(INADDR_ANY);
  31.       listen_addr.sin_addr.s_addr=inet_addr(pIp);
  32.   
  33.       int ireuseadd_on = 1;//支持端口复用
  34.       setsockopt(m_isock, SOL_SOCKET, SO_REUSEADDR, &ireuseadd_on, sizeof(ireuseadd_on) );
  35.   
  36.       if ( bind ( m_isock, ( sockaddr * ) &listen_addr,sizeof ( listen_addr ) ) !=0 )
  37.       {
  38.           printf("bind error\n");
  39.           return false;
  40.       }
  41.   
  42.       if ( listen ( m_isock, 20) <0 )
  43.       {
  44.           printf("listen error!\n");
  45.           return false;
  46.       }
  47.       else
  48.       {
  49.           printf("服务端监听中...\n");
  50.       }
  51.   
  52.       // 监听线程,此线程负责接收客户端连接,加入到epoll中
  53.       if ( pthread_create( &m_ListenThreadId, 0, ( void * ( * ) ( void * ) ) ListenThread, this ) != 0 )
  54.       {
  55.           printf("Server 监听线程创建失败!!!");
  56.           return false;
  57.       }
  58.   }
  59.   // 监听线程
  60.   void CEpollServer::ListenThread( void* lpVoid )
  61.   {
  62.       CEpollServer *pTerminalServer = (CEpollServer*)lpVoid;
  63.       sockaddr_in remote_addr;
  64.       int len = sizeof (remote_addr);
  65.       while ( true )
  66.       {
  67.           int client_socket = accept (pTerminalServer->m_isock, ( sockaddr * ) &remote_addr,(socklen_t*)&len );
  68.           if ( client_socket < 0 )
  69.           {
  70.               printf("Server Accept失败!, client_socket: %d\n", client_socket);
  71.               continue;
  72.           }
  73.           else
  74.           {
  75.               struct epoll_event    ev;
  76.               ev.events = EPOLLIN | EPOLLERR | EPOLLHUP;
  77.               ev.data.fd = client_socket;     //记录socket句柄
  78.               epoll_ctl(pTerminalServer->m_iEpollFd, EPOLL_CTL_ADD, client_socket, &ev);
  79.           }
  80.       }
  81.   }
  82.   
  83.   void CEpollServer::Run()
  84.   {
  85.       while ( true )
  86.       {
  87.           struct epoll_event    events[_MAX_SOCKFD_COUNT];
  88.           int nfds = epoll_wait( m_iEpollFd, events,  _MAX_SOCKFD_COUNT, -1 );
  89.           for (int i = 0; i < nfds; i++)
  90.           {
  91.               int client_socket = events[i].data.fd;
  92.               char buffer[1024];//每次收发的字节数小于1024字节
  93.               memset(buffer, 0, 1024);
  94.               if (events[i].events & EPOLLIN)//监听到读事件,接收数据
  95.               {
  96.                   int rev_size = recv(events[i].data.fd,buffer, 1024,0);
  97.                   if( rev_size <= 0 )
  98.                   {
  99.                       cout << "recv error: recv size: " << rev_size << endl;
  100.                       struct epoll_event event_del;
  101.                       event_del.data.fd = events[i].data.fd;
  102.                       event_del.events = 0;
  103.                       epoll_ctl(m_iEpollFd, EPOLL_CTL_DEL, event_del.data.fd, &event_del);
  104.                   }
  105.                   else
  106.                   {
  107.                       printf("Terminal Received Msg Content:%s\n",buffer);
  108.                       struct epoll_event    ev;
  109.                       ev.events = EPOLLOUT | EPOLLERR | EPOLLHUP;
  110.                       ev.data.fd = client_socket;     //记录socket句柄
  111.                       epoll_ctl(m_iEpollFd, EPOLL_CTL_MOD, client_socket, &ev);
  112.                   }
  113.               }
  114.   else if(events[i].events & EPOLLOUT)//监听到写事件,发送数据
  115.               {
  116.                   char sendbuff[1024];
  117.                   sprintf(sendbuff, "Hello, client fd: %d\n", client_socket);
  118.                   int sendsize = send(client_socket, sendbuff, strlen(sendbuff)+1, MSG_NOSIGNAL);
  119.                   if(sendsize <= 0)
  120.                   {
  121.                       struct epoll_event event_del;
  122.                       event_del.data.fd = events[i].data.fd;
  123.                       event_del.events = 0;
  124.                       epoll_ctl(m_iEpollFd, EPOLL_CTL_DEL, event_del.data.fd, &event_del);
  125.                   }
  126.                   else
  127.                   {
  128.                       printf("Server reply msg ok! buffer: %s\n", sendbuff);
  129.                       struct epoll_event    ev;
  130.                       ev.events = EPOLLIN | EPOLLERR | EPOLLHUP;
  131.                       ev.data.fd = client_socket;     //记录socket句柄
  132.                       epoll_ctl(m_iEpollFd, EPOLL_CTL_MOD, client_socket, &ev);
  133.                   }
  134.               }
  135.               else
  136.               {
  137.                   cout << "EPOLL ERROR\n" <<endl;
  138.                   epoll_ctl(m_iEpollFd, EPOLL_CTL_DEL, events[i].data.fd, &events[i]);
  139.               }
  140.           }
  141.       }
  142.   }

main.cpp

[cpp] view plain copy
  1. #include <iostream>
  2. #include "cepollserver.h"
  3. using namespace std;
  4. int main()
  5. {
  6. CEpollServer  theApp;
  7. theApp.InitServer("127.0.0.1", 8000);
  8. theApp.Run();
  9. return 0;
  10. }

客户端代码:

说明:测试是两个并发进行测试,每一个客户端都是一个长连接。代码中在连接服务器(ConnectToServer)时将用户ID和socketid关联起来。用户ID和socketid是一一对应的关系。

cepollclient.h

[cpp] view plain copy
  1. #ifndef _DEFINE_EPOLLCLIENT_H_
  2. #define _DEFINE_EPOLLCLIENT_H_
  3. #define _MAX_SOCKFD_COUNT 65535
  4. #include<iostream>
  5. #include <sys/epoll.h>
  6. #include <sys/socket.h>
  7. #include <netinet/in.h>
  8. #include <fcntl.h>
  9. #include <arpa/inet.h>
  10. #include <errno.h>
  11. #include <sys/ioctl.h>
  12. #include <sys/time.h>
  13. #include <string>
  14. using namespace std;
  15. /**
  16. * @brief 用户状态
  17. */
  18. typedef enum _EPOLL_USER_STATUS_EM
  19. {
  20. FREE = 0,
  21. CONNECT_OK = 1,//连接成功
  22. SEND_OK = 2,//发送成功
  23. RECV_OK = 3,//接收成功
  24. }EPOLL_USER_STATUS_EM;
  25. /*@brief
  26. *@CEpollClient class 用户状态结构体
  27. */
  28. struct UserStatus
  29. {
  30. EPOLL_USER_STATUS_EM iUserStatus;//用户状态
  31. int iSockFd;//用户状态关联的socketfd
  32. char cSendbuff[1024];//发送的数据内容
  33. int iBuffLen;//发送数据内容的长度
  34. unsigned int uEpollEvents;//Epoll events
  35. };
  36. class CEpollClient
  37. {
  38. public:
  39. /**
  40. * @brief
  41. * 函数名:CEpollClient
  42. * 描述:构造函数
  43. * @param [in] iUserCount
  44. * @param [in] pIP IP地址
  45. * @param [in] iPort 端口号
  46. * @return 无返回
  47. */
  48. CEpollClient(int iUserCount, const char* pIP, int iPort);
  49. /**
  50. * @brief
  51. * 函数名:CEpollClient
  52. * 描述:析构函数
  53. * @return 无返回
  54. */
  55. ~CEpollClient();
  56. /**
  57. * @brief
  58. * 函数名:RunFun
  59. * 描述:对外提供的接口,运行epoll类
  60. * @return 无返回值
  61. */
  62. int RunFun();
  63. private:
  64. /**
  65. * @brief
  66. * 函数名:ConnectToServer
  67. * 描述:连接到服务器
  68. * @param [in] iUserId 用户ID
  69. * @param [in] pServerIp 连接的服务器IP
  70. * @param [in] uServerPort 连接的服务器端口号
  71. * @return 成功返回socketfd,失败返回的socketfd为-1
  72. */
  73. int ConnectToServer(int iUserId,const char *pServerIp,unsigned short uServerPort);
  74. /**
  75. * @brief
  76. * 函数名:SendToServerData
  77. * 描述:给服务器发送用户(iUserId)的数据
  78. * @param [in] iUserId 用户ID
  79. * @return 成功返回发送数据长度
  80. */
  81. int SendToServerData(int iUserId);
  82. /**
  83. * @brief
  84. * 函数名:RecvFromServer
  85. * 描述:接收用户回复消息
  86. * @param [in] iUserId 用户ID
  87. * @param [in] pRecvBuff 接收的数据内容
  88. * @param [in] iBuffLen 接收的数据长度
  89. * @return 成功返回接收的数据长度,失败返回长度为-1
  90. */
  91. int RecvFromServer(int iUserid,char *pRecvBuff,int iBuffLen);
  92. /**
  93. * @brief
  94. * 函数名:CloseUser
  95. * 描述:关闭用户
  96. * @param [in] iUserId 用户ID
  97. * @return 成功返回true
  98. */
  99. bool CloseUser(int iUserId);
  100. /**
  101. * @brief
  102. * 函数名:DelEpoll
  103. * 描述:删除epoll事件
  104. * @param [in] iSockFd socket FD
  105. * @return 成功返回true
  106. */
  107. bool DelEpoll(int iSockFd);
  108. private:
  109. int    m_iUserCount;//用户数量;
  110. struct UserStatus *m_pAllUserStatus;//用户状态数组
  111. int    m_iEpollFd;//需要创建epollfd
  112. int    m_iSockFd_UserId[_MAX_SOCKFD_COUNT];//将用户ID和socketid关联起来
  113. int    m_iPort;//端口号
  114. char   m_ip[100];//IP地址
  115. };
  116. #endif

cepollclient.cpp

[cpp] view plain copy
  1. #include "cepollclient.h"
  2. CEpollClient::CEpollClient(int iUserCount, const char* pIP, int iPort)
  3. {
  4. strcpy(m_ip, pIP);
  5. m_iPort = iPort;
  6. m_iUserCount = iUserCount;
  7. m_iEpollFd = epoll_create(_MAX_SOCKFD_COUNT);
  8. m_pAllUserStatus = (struct UserStatus*)malloc(iUserCount*sizeof(struct UserStatus));
  9. for(int iuserid=0; iuserid<iUserCount ; iuserid++)
  10. {
  11. m_pAllUserStatus[iuserid].iUserStatus = FREE;
  12. sprintf(m_pAllUserStatus[iuserid].cSendbuff, "Client: %d send message \"Hello Server!\"\r\n", iuserid);
  13. m_pAllUserStatus[iuserid].iBuffLen = strlen(m_pAllUserStatus[iuserid].cSendbuff) + 1;
  14. m_pAllUserStatus[iuserid].iSockFd = -1;
  15. }
  16. memset(m_iSockFd_UserId, 0xFF, sizeof(m_iSockFd_UserId));
  17. }
  18. CEpollClient::~CEpollClient()
  19. {
  20. free(m_pAllUserStatus);
  21. }
  22. int CEpollClient::ConnectToServer(int iUserId,const char *pServerIp,unsigned short uServerPort)
  23. {
  24. if( (m_pAllUserStatus[iUserId].iSockFd = socket(AF_INET,SOCK_STREAM,0) ) < 0 )
  25. {
  26. cout <<"[CEpollClient error]: init socket fail, reason is:"<<strerror(errno)<<",errno is:"<<errno<<endl;
  27. m_pAllUserStatus[iUserId].iSockFd = -1;
  28. return  m_pAllUserStatus[iUserId].iSockFd;
  29. }
  30. struct sockaddr_in addr;
  31. bzero(&addr, sizeof(addr));
  32. addr.sin_family = AF_INET;
  33. addr.sin_port = htons(uServerPort);
  34. addr.sin_addr.s_addr = inet_addr(pServerIp);
  35. int ireuseadd_on = 1;//支持端口复用
  36. setsockopt(m_pAllUserStatus[iUserId].iSockFd, SOL_SOCKET, SO_REUSEADDR, &ireuseadd_on, sizeof(ireuseadd_on));
  37. unsigned long ul = 1;
  38. ioctl(m_pAllUserStatus[iUserId].iSockFd, FIONBIO, &ul); //设置为非阻塞模式
  39. connect(m_pAllUserStatus[iUserId].iSockFd, (const sockaddr*)&addr, sizeof(addr));
  40. m_pAllUserStatus[iUserId].iUserStatus = CONNECT_OK;
  41. m_pAllUserStatus[iUserId].iSockFd = m_pAllUserStatus[iUserId].iSockFd;
  42. return m_pAllUserStatus[iUserId].iSockFd;
  43. }
  44. int CEpollClient::SendToServerData(int iUserId)
  45. {
  46. sleep(1);//此处控制发送频率,避免狂打日志,正常使用中需要去掉
  47. int isendsize = -1;
  48. if( CONNECT_OK == m_pAllUserStatus[iUserId].iUserStatus || RECV_OK == m_pAllUserStatus[iUserId].iUserStatus)
  49. {
  50. isendsize = send(m_pAllUserStatus[iUserId].iSockFd, m_pAllUserStatus[iUserId].cSendbuff, m_pAllUserStatus[iUserId
  51. ].iBuffLen, MSG_NOSIGNAL);
  52. if(isendsize < 0)
  53. {
  54. cout <<"[CEpollClient error]: SendToServerData, send fail, reason is:"<<strerror(errno)<<",errno is:"<<errno<
  55. <endl;
  56. }
  57. else
  58. {
  59. printf("[CEpollClient info]: iUserId: %d Send Msg Content:%s\n", iUserId, m_pAllUserStatus[iUserId].cSendbuff
  60. );
  61. m_pAllUserStatus[iUserId].iUserStatus = SEND_OK;
  62. }
  63. }
  64. return isendsize;
  65. }
  66. int CEpollClient::RecvFromServer(int iUserId,char *pRecvBuff,int iBuffLen)
  67. {
  68. int irecvsize = -1;
  69. if(SEND_OK == m_pAllUserStatus[iUserId].iUserStatus)
  70. {
  71. irecvsize = recv(m_pAllUserStatus[iUserId].iSockFd, pRecvBuff, iBuffLen, 0);
  72. if(0 > irecvsize)
  73. {
  74. cout <<"[CEpollClient error]: iUserId: " << iUserId << "RecvFromServer, recv fail, reason is:"<<strerror(errn
  75. o)<<",errno is:"<<errno<<endl;
  76. }
  77. else if(0 == irecvsize)
  78. {
  79. cout <<"[warning:] iUserId: "<< iUserId << "RecvFromServer, STB收到数据为0,表示对方断开连接,irecvsize:"<<ire
  80. cvsize<<",iSockFd:"<< m_pAllUserStatus[iUserId].iSockFd << endl;
  81. }
  82. else
  83. {
  84. printf("Recv Server Msg Content:%s\n", pRecvBuff);
  85. m_pAllUserStatus[iUserId].iUserStatus = RECV_OK;
  86. }
  87. }
  88. return irecvsize;
  89. }
  90. bool CEpollClient::CloseUser(int iUserId)
  91. {
  92. close(m_pAllUserStatus[iUserId].iSockFd);
  93. m_pAllUserStatus[iUserId].iUserStatus = FREE;
  94. m_pAllUserStatus[iUserId].iSockFd = -1;
  95. return true;
  96. }
  97. int CEpollClient::RunFun()
  98. {
  99. int isocketfd = -1;
  100. for(int iuserid=0; iuserid<m_iUserCount; iuserid++)
  101. {
  102. struct epoll_event event;
  103. isocketfd = ConnectToServer(iuserid, m_ip, m_iPort);
  104. if(isocketfd < 0)
  105. cout <<"[CEpollClient error]: RunFun, connect fail" <<endl;
  106. m_iSockFd_UserId[isocketfd] = iuserid;//将用户ID和socketid关联起来
  107. event.data.fd = isocketfd;
  108. event.events = EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP;
  109. m_pAllUserStatus[iuserid].uEpollEvents = event.events;
  110. epoll_ctl(m_iEpollFd, EPOLL_CTL_ADD, event.data.fd, &event);
  111.   }
  112.   while(1)
  113.       {
  114.           struct epoll_event events[_MAX_SOCKFD_COUNT];
  115.           char buffer[1024];
  116.           memset(buffer,0,1024);
  117.           int nfds = epoll_wait(m_iEpollFd, events, _MAX_SOCKFD_COUNT, 100 );//等待epoll事件的产生
  118.           for (int ifd=0; ifd<nfds; ifd++)//处理所发生的所有事件
  119.           {
  120.               struct epoll_event event_nfds;
  121.               int iclientsockfd = events[ifd].data.fd;
  122.               cout << "events[ifd].data.fd: " << events[ifd].data.fd << endl;
  123.               int iuserid = m_iSockFd_UserId[iclientsockfd];//根据socketfd得到用户ID
  124.               if( events[ifd].events & EPOLLOUT )
  125.               {
  126.                   int iret = SendToServerData(iuserid);
  127.                   if( 0 < iret )
  128.                   {
  129.                       event_nfds.events = EPOLLIN|EPOLLERR|EPOLLHUP;
  130.                       event_nfds.data.fd = iclientsockfd;
  131.                       epoll_ctl(m_iEpollFd, EPOLL_CTL_MOD, event_nfds.data.fd, &event_nfds);
  132.                   }
  133.                   else
  134.                   {
  135.                       cout <<"[CEpollClient error:] EpollWait, SendToServerData fail, send iret:"<<iret<<",iuserid:"<<iuser
  136.   id<<",fd:"<<events[ifd].data.fd<<endl;
  137.                       DelEpoll(events[ifd].data.fd);
  138.                       CloseUser(iuserid);
  139.                   }
  140.               }
  141.   else if( events[ifd].events & EPOLLIN )//监听到读事件,接收数据
  142.               {
  143.                   int ilen = RecvFromServer(iuserid, buffer, 1024);
  144.                   if(0 > ilen)
  145.                   {
  146.                       cout <<"[CEpollClient error]: RunFun, recv fail, reason is:"<<strerror(errno)<<",errno is:"<<errno<<e
  147.   ndl;
  148.                       DelEpoll(events[ifd].data.fd);
  149.                       CloseUser(iuserid);
  150.                   }
  151.                   else if(0 == ilen)
  152.                   {
  153.                       cout <<"[CEpollClient warning:] server disconnect,ilen:"<<ilen<<",iuserid:"<<iuserid<<",fd:"<<events[
  154.   ifd].data.fd<<endl;
  155.                       DelEpoll(events[ifd].data.fd);
  156.                       CloseUser(iuserid);
  157.                   }
  158.                   else
  159.                   {
  160.                       m_iSockFd_UserId[iclientsockfd] = iuserid;//将socketfd和用户ID关联起来
  161.                       event_nfds.data.fd = iclientsockfd;
  162.                       event_nfds.events = EPOLLOUT|EPOLLERR|EPOLLHUP;
  163.                       epoll_ctl(m_iEpollFd, EPOLL_CTL_MOD, event_nfds.data.fd, &event_nfds);
  164.                   }
  165.               }
  166.               else
  167.               {
  168.                   cout <<"[CEpollClient error:] other epoll error"<<endl;
  169.                   DelEpoll(events[ifd].data.fd);
  170.                   CloseUser(iuserid);
  171.               }
  172.           }
  173.   }
  174.   }
  175.   
  176.   bool CEpollClient::DelEpoll(int iSockFd)
  177.   {
  178.       bool bret = false;
  179.       struct epoll_event event_del;
  180.       if(0 < iSockFd)
  181.       {
  182.           event_del.data.fd = iSockFd;
  183.           event_del.events = 0;
  184.           if( 0 == epoll_ctl(m_iEpollFd, EPOLL_CTL_DEL, event_del.data.fd, &event_del) )
  185.           {
  186.               bret = true;
  187.           }
  188.           else
  189.           {
  190.               cout <<"[SimulateStb error:] DelEpoll,epoll_ctl error,iSockFd:"<<iSockFd<<endl;
  191.           }
  192.           m_iSockFd_UserId[iSockFd] = -1;
  193.       }
  194.       else
  195.       {
  196.           bret = true;
  197.   
  198.       }
  199.       return bret;
  200.   }

main.cpp

[cpp] view plain copy
  1. #include "cepollclient.h"
  2. int main(int argc, char *argv[])
  3. {
  4. CEpollClient *pCEpollClient = new CEpollClient(2, "127.0.0.1", 8000);
  5. if(NULL == pCEpollClient)
  6. {
  7. cout<<"[epollclient error]:main init"<<"Init CEpollClient fail"<<endl;
  8. }
  9. pCEpollClient->RunFun();
  10. if(NULL != pCEpollClient)
  11. {
  12. delete pCEpollClient;
  13. pCEpollClient = NULL;
  14. }
  15. return 0;
  16. }

运行结果:

客户端:

服务端:

如是转载,请指明原出处:http://blog.csdn.net/feitianxuxue,谢谢合作!

处理大并发之一 对epoll的理解,epoll客户端服务端代码相关推荐

  1. 处理大并发之二 对epoll的理解,epoll客户端服务端代码

    http://blog.csdn.net/wzjking0929/article/details/51838370 序言: 该博客是一系列的博客,首先从最基础的epoll说起,然后研究libevent ...

  2. 处理大并发之一 对异步非阻塞的理解

    处理大并发之一 对异步非阻塞的理解 在研究nginx和node.js的时候常会遇到异步.非阻塞等,之前自己也经常使用epoll,对其同步与阻塞,异步与非阻塞有了一定的认识,现对参考资料总结下. 首先讨 ...

  3. 处理大并发之四 libevent demo详细分析(对比epoll)

    处理大并发之四 libevent demo详细分析(对比epoll) libevent默认情况下是单线程,每个线程有且仅有一个event_base,对应一个struct event_base结构体,以 ...

  4. 处理大并发之四 libevent demo详细分析(对比epoll)

    处理大并发之四 libevent demo详细分析(对比epoll) libevent默认情况下是单线程,每个线程有且仅有一个event_base,对应一个struct event_base结构体,以 ...

  5. 关于nginx/lighttpd epoll高并发以及apache为何不采用epoll的的疑惑 不指定

    关于nginx/lighttpd epoll高并发以及apache为何不采用epoll的的疑惑 - 向东博客 专注WEB应用 构架之美 --- 构架之美,在于尽态极妍 | 应用之美,在于药到病除 - ...

  6. 如何实现高容量大并发数据库服务 | 数据库分布式架构设计

    袋鼠学院和优云.阿里云联合举办的沙龙结束之后,总是有小伙伴们来问PPT内容,想要进一步了解Topic内容.(哦,对了对了,竟然还有小伙伴专门冲着袋鼠云去听沙龙,感动cry~~) 千呼万唤,忙成狗的袋鼠 ...

  7. 大并发服务器架构 大型网站架构演变

    服务器的三条要求: 高性能:对于大量请求,及时快速的响应 高可用:7*24 不间断,出现故障自动转移,这叫fail over(故障转移) 伸缩性:使用跨机器的通信(TCP) 另外任何网络系统结构都可以 ...

  8. 打造基于大并发通信技术及大数据技术的O2O系统

    2019独角兽企业重金招聘Python工程师标准>>> 本文来自于个推CTO叶新江在2015Qcon的分享整理. 截止2015年6月,个推SDK累计接入总用户数达50亿 (其中海外近 ...

  9. Java多线程系列(七):并发容器的原理,7大并发容器详解、及使用场景

    之前谈过高并发编程系列: 高并发编程系列:4种常用Java线程锁的特点,性能比较.使用场景 高并发编程系列:CountDownLatch.Semaphore等4大并发工具类详解 高并发编程系列:4大J ...

最新文章

  1. Co-occurrence网络图在R中的实现
  2. mysql 开启远程访问
  3. 小程序对象不去重合并
  4. After paper reading.......
  5. 欢迎来到OpenGL的世界
  6. 数据库面试题【十、【非关系型数据库】和【关系型数据库】的【区别】与【优势比较】】
  7. 具有GlassFish和一致性的高性能JPA –第2部分
  8. java 二叉树运用场景_java二叉树有什么作用?有哪些实际应用?
  9. python如何合并txt文件_Python实现将目录中TXT合并成一个大TXT文件的方法
  10. uni-app引入阿里Icon 图标方式(CustomIcon 扩展自定义图标库)
  11. 20200404 时间飞逝 青春不在
  12. http://blog.csdn.net/LANGXINLEN/article/details/50421988
  13. 【苹果群发推iMessage苹果推】位置推在(delegate) 收到connectionDidFinishLoading
  14. 物联网入门教程【上】
  15. 朗强:紧跟时代步伐!HDMI分布式矩阵可以通过手机来控制!
  16. Three.js - 着色器材质(二十七)
  17. 推荐给大家一个下载软件的好网站—MSDN I Tell you
  18. 【Code】代码答案错误怎么办?三种方法教你如何查错
  19. 搜狗输入法如何开启中文简体与繁体切换快捷键
  20. 【汇正财经】1.9日盘面回顾和行情解析

热门文章

  1. 《深入理解Java虚拟机》读书笔记3--垃圾回收算法
  2. C++服务器设计(七):聊天系统服务端实现
  3. So easy Webservice 1.Socket建设web服务
  4. Bundle Identifier
  5. UIControl事件
  6. 公司培训文档-JavaScript[对象.属性]集锦
  7. 教师计算机网络培训工作总结,教师培训工作的自我总结
  8. java dubbo 方案,Missing artifact com.alibaba:dubbo:jar:2.8.4 dubbo解决方案
  9. 使用inetaddress测试目标可达性_PDPS软件机器人虚拟仿真:Smart Place功能介绍与使用方法...
  10. android 日期国际化,Flutter 日期时间选择类控件及国际化