c10k 的IO密集解决方案

  • 0. C10k的一些概念
  • 1. epoll很忙
  • 2.对上篇博文提到问题的改进。
  • 3.特别注意的问题——生产者消费者问题
  • 4.改进方法的实现(代码)

0. C10k的一些概念

http://www.kegel.com/c10k.html#frameworks
译文地址:
1.https://blog.csdn.net/heiyeshuwu/article/details/5642592
2.https://www.cnblogs.com/zxh1210603696/p/3399354.htm

以上文章已经非常详细的解释了C10k的问题。

C10k就是通过服务器程序的编写,让服务器合理的使用硬件资源,让服务器具备同时处理10000个客户端的访问和通讯。

以下的讲解都是建立在阅读过上面文章之上。
本博文解决C10k的方法是 reactor + pthread pool,也就是epoll + 线程池。
可以简单的说,就是创建线程帮助epoll处理读写,让epoll尽可能的只处理IO事件。
C10k要解决的问题是IO密集型的客户端访问 和 计算密集型的同时通信问题。目前我写的代码只通过了IO密集型的测试,由于计算密集型对硬件控制要得当所以暂时先缓一缓,之后会有更新。

1. epoll很忙

这里以回射服务器为例(之后给出的代码也是回射服务器)。

Epoll安排好接待客户的listenfd(socket返回值),然后不断的询问listenfd是否有客户connect发送syn。
很久以后…
· 客户端connect:客户发送syn,发起三次握手的第一步。
· 与此同时,Epoll发现自己管理的listenfd收到了syn,就告诉服务器,于是服务器运行accpet,服务器等待的accept收到syn后,通知客户端,完成三次握手第二步。
· 客户端执行完connect后,完成三次握手建立三次链接。

客户到来(链接成功)后,服务器又把客户扔给epoll管理。
Epoll接管了客户后,不断的询问客户,是不是有数据送过来或者送出去,如果有数据送来epoll,通知服务器,并告知客户的一些必要信息。如果有数据要送出去,epoll也要通知服务器。

所以epoll很忙,为了让他更高效的工作,就不要打断他。在不打断他的情况下,epoll可以很好的处理IO密集。

如何不打断他?就是服务器在读写数据的时候不要在epoll的线程里进行。因为读写对阻塞这个线程。

2.对上篇博文提到问题的改进。

上篇写的是 epoll+线程池,最大的问题就是结构混乱和线程混乱。
所以我对数据处理的过程进行了重构:

简化后的程序流程图为:

运行时时序图为:

3.特别注意的问题——生产者消费者问题

应为这里设计了多个共享链表,如evens,读工作链表,写工作链表,如果对线程控制不好,很容易出现段错误和资源竞争错误。

  • Mian线程,生产者,对events和readList的生产
  • Read线程,消费者,生产者,对对events和readList的消费,以及对writeList的生产
  • Write线程,消费者,对writeList的消费。
    要合理的使用锁和 信号/等待 机制,确保线程的安全。

4.改进方法的实现(代码)

主函数:

#include "myepollpthreadpoolserver.h"typedef std::vector<struct epoll_event> EpollList;
EpollList events(16);
int epollfd;extern pthread_mutex_t lock;
extern pthread_cond_t has_write;
extern pthread_cond_t has_read;
int main()
{int listenfd;listenfd = Socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); // fei zu se IO fu yongstruct sockaddr_in serveraddr;bzero(&serveraddr, sizeof(serveraddr));serveraddr.sin_family = AF_INET;serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);serveraddr.sin_port = htons(8000);int opt = 1;setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));Bind(listenfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));Listen(listenfd, 20);// clinet init datestruct sockaddr_in clientaddr;socklen_t clientlen;int connfd;//epoll//typedef std::vector<struct epoll_event> EpollList;     //int epollfd;epollfd = epoll_create1(EPOLL_CLOEXEC);//Creates a handle to epoll, the size of which tells the kernel how many listeners there are./*ET*/struct epoll_event epfd;epfd.data.fd = listenfd;epfd.events = EPOLLIN | EPOLLET ;epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &epfd);//EpollList events(16);//You can listen for 16 at firstint nready;make_read_worker(1);make_write_worker(3);while(1){  nready = epoll_wait(epollfd, &*events.begin(), static_cast<int>(events.size()), -1);if (nready == -1){if(errno == EINTR) continue;perror("epoll_wait");}if(nready == 0) continue;if ((size_t)nready == events.size()){events.resize(events.size() * 2);}for(int  i=0; i < nready; ++i){if (events[i].data.fd == listenfd){clientlen = sizeof(clientaddr);connfd = Accept4(listenfd, (struct sockaddr*)&clientaddr, &clientlen, SOCK_NONBLOCK | SOCK_CLOEXEC);// fei zu se IO fu yongstd::cout << connfd << "is come!" << std::endl;/*ET*/epfd.data.fd = connfd;epfd.events = EPOLLIN | EPOLLET;epoll_ctl(epollfd, EPOLL_CTL_ADD, connfd, &epfd);}else if (events[i].events & EPOLLIN){set_readList(i); }/*ET*///if (events[i].events & EPOLLOUT)//{//std::cout << "write dadt" << std::endl;//   pthread_cond_signal(&has_write);//}}}destroy_pthread();return 0;
}

线程:

#ifndef PTHREAD_POOL_H
#define PTHREAD_POOL_H#include <iostream>
#include <stdlib.h>
#include <vector>
#include <pthread.h>
#include <string.h>
#include <sys/epoll.h>
#include <string>//using namespace std;
typedef struct WorkNode{int connfd;char wBuf[100];
}WorkNode;void set_readList(int);
int get_readList();void set_workList(int, char *);
WorkNode get_workList();int make_read_worker(int pthreadNum);
void *read_worker(void *arg);
int make_write_worker(int pthreadNum);
void *write_worker(void *arg);int destroy_pthread();int size_buf(char *buf);
/**********************************************/
//int make_pthread(int pthreadNum);/**/
//void *pthread_work(void *arg);/**/
//int destroy_pthread();/**/
//bool have_work();/**///int set_work(int i, int connfd);
//ConnFdNode get_work();//void debug_pool();#endif //PTHREAD_POOL_H
#include "pthread_pool.h"extern  std::vector<struct epoll_event> events;
extern int epollfd;typedef std::vector<struct WorkNode> WorkList;
WorkList waitlist;//work listtypedef std::vector<int> ReadList;
ReadList readlist; //read list,[events id]pthread_cond_t has_read = PTHREAD_COND_INITIALIZER;              //条件变量初始化
pthread_cond_t has_write = PTHREAD_COND_INITIALIZER;              //条件变量初始化
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;                   //互斥锁初始化
pthread_t nrTid[4];                                                //初始化,线程描述符,MAX为最大线程创建数
pthread_t nwTid[4];
int prhread_read_num = 0;
int prhread_write_num = 0;void set_readList(int i)
{pthread_mutex_lock(&lock);readlist.push_back(i);pthread_cond_signal(&has_read);pthread_mutex_unlock(&lock);
}
int get_readList()
{int eventId;eventId = readlist.front();readlist.erase(readlist.begin());return eventId;
}void set_workList(int connfd, char* wBuf)
{WorkNode writeWork;writeWork.connfd = connfd;strcpy(writeWork.wBuf,wBuf);// pthread_mutex_lock(&lock);waitlist.push_back(writeWork);//pthread_mutex_unlock(&lock);
}WorkNode get_workList()
{WorkNode writeWork;writeWork.connfd = waitlist.front().connfd;strcpy(writeWork.wBuf ,waitlist.front().wBuf);waitlist.erase(waitlist.begin());return writeWork;
}int make_read_worker(int pthreadNum)
{int err[pthreadNum], error;prhread_read_num = pthreadNum;for(int i=0; i<pthreadNum; i++){err[i] = pthread_create(&nrTid[i], NULL, read_worker, NULL);if(err[i] != 0){std::cout << "make pthread error :" << err[i] << std::endl;exit(1);}error += err[i];std::cout << "\033[32mNO.\033[0m"<< i+1 << "\033[32m, pthread creation successful!\033[0m" << std::endl;     }return error;
}
void *read_worker(void *arg)
{pthread_t tid;tid = pthread_self(); //get pthread idint eventsID;             //iint connfd, n, nread;struct epoll_event epfd;char buf[100];std::cout << "????  I am READ worker ????" << tid << std::endl;while(1){n = 0;pthread_mutex_lock(&lock);pthread_cond_wait(&has_read, &lock);eventsID = get_readList();connfd = events[eventsID].data.fd;while((nread = read(connfd, buf, 100)) > 0) //read to over{n += nread;}if (n > 0){    std::cout << tid <<"::"<< connfd <<" Date: ["<< buf <<"]" << "events "<< eventsID << std::endl;   epfd.data.fd = connfd;epfd.events = events[eventsID].events | EPOLLOUT;if(epoll_ctl(epollfd, EPOLL_CTL_MOD, connfd, &epfd) == -1){std::cout << "epoll_ctl return -1"<< std::endl;exit(1);}set_workList(connfd, buf);pthread_cond_signal(&has_write);}else if (nread == 0){std::cout << connfd << "is go" << std::endl;close(connfd);epfd = events[eventsID];epoll_ctl(epollfd, EPOLL_CTL_DEL, connfd, &epfd);}pthread_mutex_unlock(&lock);}
}
int make_write_worker(int pthreadNum)
{int err[pthreadNum], error;prhread_write_num = pthreadNum;for(int i=0; i<pthreadNum; i++){err[i] = pthread_create(&nwTid[i], NULL, write_worker, NULL);if(err[i] != 0){std::cout << "make pthread error :" << err[i] << std::endl;exit(1);}error += err[i];std::cout << "\033[32mNO.\033[0m"<< i+1 << "\033[32m, pthread creation successful!\033[0m" << std::endl;     }return error;
}
void *write_worker(void *arg)
{pthread_t tid;tid = pthread_self(); //get pthread idWorkNode wJob;int connfd, n, nwrite;char buf[100];std::cout << "????  I am WRITE worker ????" << tid<< std::endl;while(1){    pthread_mutex_lock(&lock);pthread_cond_wait(&has_write, &lock);wJob = get_workList();connfd = wJob.connfd;strcpy(buf, wJob.wBuf);n = size_buf(buf);while(n > 0)//write ot over{nwrite = write(connfd, buf, n);n -= nwrite;}pthread_mutex_unlock(&lock);}
}int destroy_pthread()
{for(int i=0; i< prhread_read_num; ++i){pthread_join(nrTid[i], NULL);}for(int i=0; i< prhread_write_num; ++i){pthread_join(nwTid[i], NULL);}return 0;
}int size_buf(char *buf)
{int i;for ( i = 0; buf[i] != '\0'; i++);return i+1;
}

以上的代码不是成品,只是我实现了我改进想法,博文依然在跟新中。

EPOLL使用的简单总结4——epoll+线程池解决c10k问题相关推荐

  1. 简单实现Linux下线程池

    最近在Linux下使用mysql时有时会报查询异常,看网上解决方案是多次并发使用,通过gdb调试也找到问题,主要是上次查询结果集未释放,最终导致如此. 大佬说根本解决方案还是线程池,就去看了线程池的一 ...

  2. 从3个简单的问题了解线程池的使用

    目录 1.为什么最大线程数没满,但是runnable无法立即执行或者无法执行? 2.为什么禁止使用 Executors创建线程池? 3.不能使用Executors创建线程池,那该怎么使用线程池? 前言 ...

  3. java阻塞线程池_线程池解决阻塞方法

    一.序言 当我们需要使用线程的时候,我们可以新建一个线程,然后显式调用线程的start()方法,这样实现起来非常简便,但在某些场景下存在缺陷:如果需要同时执行多个任务(即并发的线程数量很多),频繁地创 ...

  4. SpringBoot——@Scheduled的自定义周期性线程池解决任务延时执行问题

    关注微信公众号:CodingTechWork,一起学习进步. 问题   在使用Spring中的@Scheduled注解设置定时任务时,遇到这样2个问题: 定时任务未按时执行,现象是延后了一段时间才执行 ...

  5. 网络io,磁盘io,线程池解决思想

    网络Io,发送一次网络请求一般1毫秒到几百毫秒. 磁盘io,一般5毫秒到几百毫秒.两种io每次时间都不固定,即使是取相同的数据.网络Io可能有网络波动导致. io密集型.cpu不断的发起请求,每次都需 ...

  6. java基础:简单实现线程池

    先上原理图:为了更好的在手机上显示,我重新把图画了一遍 上代码之前,要先补充一下线程池构造的核心几个点 线程池里的核心线程数与最大线程数 线程池里真正工作的线程worker 线程池里用来存取任务的队列 ...

  7. 分页缓冲池如何关闭_线程池没你想的那么简单

    前言 原以为线程池还挺简单的(平时常用,也分析过原理),这次是想自己动手写一个线程池来更加深入的了解它:但在动手写的过程中落地到细节时发现并没想的那么容易.结合源码对比后确实不得不佩服 Doug Le ...

  8. 一个简单的linux线程池

    线程池:简单地说,线程池 就是预先创建好一批线程,方便.快速地处理收到的业务.比起传统的到来一个任务,即时创建一个线程来处理,节省了线程的创建和回收的开销,响应更快,效率更高. 在linux中,使用的 ...

  9. 线程池,远没你想象的那么简单

    本文转载自微信公众号:crossoverJie 前言 原以为线程池还挺简单的(平时常用,也分析过原理),这次是想自己动手写一个线程池来更加深入的了解它:但在动手写的过程中落地到细节时发现并没想的那么容 ...

  10. mongodb线程池_常用高并发网络线程模型设计及MongoDB线程模型优化实践

    服务端通常需要支持高并发业务访问,如何设计优秀的服务端网络IO工作线程/进程模型对业务的高并发访问需求起着至关重要的核心作用. 本文总结了了不同场景下的多种网络IO线程/进程模型,并给出了各种模型的优 ...

最新文章

  1. php和python哪个工资高-学python和php哪个前景好
  2. 在VS2003中以ClassLibrary工程的方式管理Web工程.
  3. html5简介的文本框,HTML5实战与剖析之表单——文本框脚本
  4. javascript中的字符串编码、字符串方法详解
  5. Linux 6.4 partprobe出现warning问题
  6. mysql可视化创建外键说明_关于使用可视化图形工具navicat for mysql来创建外键的步骤...
  7. BZOJ 2560(子集DP+容斥原理)
  8. 期末Linux课程设计(5)—.netrc.handin部分
  9. python烧录单片机_mac实现烧写51单片机
  10. 〖Python 数据库开发实战 - MySQL篇㉕〗- 数据更新操作 - UPDATE 语句
  11. 方差标准差,均方误差均方根误差,平均绝对误差
  12. 每天一个小技巧(新建桌面)
  13. 容斥原理和概率与数学期望
  14. java.lang.ClassCastException: de.odysseus.el.ExpressionFactoryImpl cannot be cast to javax.el.Expres
  15. YOLOv3源码解析2-数据预处理Dataset()
  16. 华为语音解锁设置_华为手机该怎么实现语音翻译?其实超级简单,这里教你
  17. [信息论与编码]离散信道及信道容量(三)
  18. 设计美好的服务器(6)--SEDA架构笔记
  19. tcl基本语法:中括号[ ]、大括号{ }、双引号“ ”
  20. 03基础自绘-13滑动选择-tumbler

热门文章

  1. 2012中国云实践之企业总评榜
  2. 狮子鱼小程序独立版安装配置教程
  3. 反恐精英online单机版有各种武器
  4. 汇编大作业(课程设计):简易英英词典
  5. linux 鼠标光标由箭头变成十字形恢复方法
  6. 红帽子 linux 声卡驱动,RedHat Linux系统下安装ALSA驱动的方法
  7. 翻译www.djangobook.com之第一章:Django介绍
  8. 计算机网络专业运动会入场式,高校运动会方阵入场式花样百出
  9. xmpp即时通讯协议的特性---长处和缺点!
  10. PB:玉米气生根分泌物支持的高效生物固氮