网络编程:C10K问题而引出的reactor模型
C10问题
随着互联网的蓬勃发展,一个非常重要的问题摆在计算机工业界面前。这个问题就是如何使用最低的成本满足高性能和高并发的需求。这个问题在过去可能不是一个严重的问题,但是在2000年前后,互联网用户的人数井喷,如果说之前单机服务的用户数量还保持在一个比较低的水平,比如说只有上百个用户,那么在互联网逐渐普及的情况下,服务于成千上万的用户就将是非常普遍的情形,在这种情形下,如何还按照之前单机的玩法,成本就将超过人们想象,只有超级有钱的大玩家才可以继续下去。
于是,C10K问题应运而生。C10问题是这样的:如何在一台物理机上同时服务10000个用户?这里的C表示并发,10K等于10000。得益于操作系统、编程语言的发展,在现在的条件下,普通用户使用java netty、libevent等框架或者库就可以轻松写出支持并发超过1000的服务器程序,甚至与经过优化之后可以达到十万、乃至百万的并发,但在二十年前,图谱C10K问题可费了不少的心思,是一个了不起的突破。
操作系统层面
C10问题本质上是一个操作系统问题,要在一台主机上同时支持1万个连接,意味着什么呢?需要考虑哪些方面
文件句柄
首先,我们知道每个客户连接都代表一个文件描述符,一旦文件描述符不够用了,新的连接就会被放弃,产生如下错误:
Socket/File:Can't open so many files
在linux下,单个进程打开的文件句柄数是有限制的,没有经过修改的值一般是1024
$ulimit -n
1024
这意味着最多可以服务的连接数上限只能是 1024。不过,我们可以对这个值进行修改,比如用 root 权限修改 /etc/sysctl.conf 文件,使得系统可用支持 10000 个描述符上限。
fs.file-max = 10000
net.ipv4.ip_conntrack_max = 10000
net.ipv4.netfilter.ip_conntrack_max = 10000
系统内存
每个TCP连接占用的资源可不止一个连接套接字这么简单,每个TCP连接都需要占用一定的发送缓冲区和接收缓冲区。
而每个系统上的缓冲区是一定的,如下显示了在 Linux 4.4.0 下发送缓冲区和接收缓冲区的值。
$cat /proc/sys/net/ipv4/tcp_wmem
4096 16384 4194304
$ cat /proc/sys/net/ipv4/tcp_rmem
4096 87380 6291456
这三个值分别表示了最小分配值、默认分配值和最大分配值。按照默认分配值计算,一万个连接需要消耗的内存为:
发送缓冲区: 16384*10000/8 = 20M bytes
接收缓冲区: 87380*10000/8 = 110M bytes
当然,我们的应用程序本身也需要一定的缓冲区来进行数据的收发,为了方便,我们假设每个连接需要 128K 的缓冲区,那么 1 万个链接就需要大约 1.2G 的应用层缓冲。
这样,我们可以得出大致的结论,支持1万个并发连接,内存并不是一个巨大的瓶颈。
网络带宽
假设1万个连接,每个连接每秒传输大约1KB的数据,那么带宽需要100001KB/s8=80Mbps。这在今天的动辄万兆网卡的时代简直小菜一碟。
C10问题怎么解决
通过上面我们对操作系统层面的分析,可以得出一个结论,在系统资源层面,C10K问题是可以解决的
但是,能解决并不意味着可以很好的解决。我们知道,在网络编程中,涉及到频繁的用户态-内核态数据拷贝,设计不够好的程序可能在低并发的情况下工作良好,一旦到了高并发情形,其性能可能出现指数级别的损失。
举一个例子,如果没有考虑好C10K问题,一个基于select的经典程序可能在一台服务器上可以很好的处理1000的并发用户,但是在性能2倍的服务器上,却往往并不能很好的处理2000的并发用户。
要想解决C10K问题,就需要从两个层面上来考虑:
- 第一个层面,应用程序如何和操作系统配合,感知IO事件的发生,并调度处理在上万个套接字上的IO操作?
- 第二个层面,应用程序如何分配进程、线程资源来服务上万个连接?
这两个层面的组合就形成了解决C10K问题的几种解决方案。如下
阻塞IO + 进程
这种方式最为简单直接:
- 每个连接通过fork派生一个子进程进行处理
- 因为一个独立的子进程负责处理了该连接所有的IO,所以即使是阻塞IO,多个连接之间也不会相互影响。
这种方法虽然简单,但是效率不高,扩展性差,资源占用率高。
下面的伪代码描述了使用阻塞 I/O,为每个连接 fork 一个进程的做法:
do{accept connectionsfork for conneced connection fdprocess_run(fd)
}
前置知识
父进程想要派生出一个子进程,只需要调用fork就可以:
pid_t fork(void)
返回:在子进程中为 0,在父进程中为子进程 ID,若出错则为 -1
如果你是第一次使用这个函数,你会觉得难以理解的地方在于,虽然我们的程序调用 fork 一次,它却在父、子进程里各返回一次。在调用该函数的进程(即为父进程)中返回的是新派生的进程 ID 号,在子进程中返回的值为 0。想要知道当前执行的进程到底是父进程,还是子进程,只能通过返回值来进行判断。
fork函数实现的时候,实际上会把当前父进程的所有相关值都克隆一份,包括地址空间、打开的文件描述符、程序计数器(程序从哪里开始执行)等,就连执行代码中也会拷贝一份。于是,就可以这么编写:
if(fork() == 0){do_child_process(); // 子进程执行代码
}else{do_parent_process(); // 父进程执行代码
}
当一个子进程退出时,系统内核还保留了该进程的若干信息,比如退出状态。这样的进程如何不回收,就会变成僵尸进程。在linux下,这样的“僵尸”进程会被挂到进程号为1的init进程上。所以,由父进程派生出来的子进程,也必须由父进程负责回收,否则子进程就会变成僵尸进程。僵尸进程会占用不必要的内存空间,如果数量多到了一定的数量级,就会耗尽我们的系统资源。
有两种方式可以在子进程退出后回收资源,分别是调用wait和waitpid函数。
pid_t wait(int *statloc);
pid_t waitpid(pid_t pid, int *statloc, int options);
函数 wait 和 waitpid 都可以返回两个值,一个是函数返回值,表示已终止子进程的进程 ID 号,另一个则是通过 statloc 指针返回子进程终止的实际状态。这个状态可能的值为正常终止、被信号杀死、作业控制停止等。
如果没有已终止的子进程,而是有一个或者多个子进程在正常运行,那么wait将阻塞,直到第一个子进程终止。
waitpid 可以认为是 wait 函数的升级版,它的参数更多,提供的控制权也更多。pid 参数允许我们指定任意想等待终止的进程 ID,值 -1 表示等待第一个终止的子进程。options 参数给了我们更多的控制选项。
一个wait不足够阻止僵尸进程,如果n个子进程同时停止,那么会同时发出n个SIGCHILD信号给父进程,但是信号处理函数执行一次,因为信号一般是不排队的,多个SIGCHILD只会发送一次给父进程。所以需要用循环waitpid处理,获取所有终止子进程状态。
处理子进程退出的方式一般是注册一个信号处理函数,捕捉信号 SIGCHILD 信号,然后再在信号处理函数里调用 waitpid 函数来完成子进程资源的回收。SIGCHLD 是子进程退出或者中断时由内核向父进程发出的信号,默认这个信号是忽略的。所以,如果想在子进程退出时能回收它,需要像下面一样,注册一个 SIGCHOLD 函数。
signal(SIGCHLD, sigchld_handler);
图示模型
为了说明使用阻塞IO和进程模式,我们假设有两个客户端,服务器初始监听在套接字listened_fd上。当第一个客户端发起连接请求,连接建立后产生出连接套接字,此时,父进程派生出一个子进程:
- 在子进程中,使用连接套接字与客户端通信,因此子进程不需要关心监听套机字,只需要关心连接套接字
- 父进程中则相反,将客户服务交给子进程来处理,因此父进程不需要关心连接套接字,只需要关心监听套接字
下图描述了从连接请求到连接建立,父进程派生子进程为客户服务。
假设父进程之后又接收了新的连接请求,从 accept 调用返回新的已连接套接字,父进程又派生出另一个子进程,这个子进程用第二个已连接套接字为客户端服务。
现在,服务器端的父进程继续监听在套接字上,等待新的客户连接到来;两个子进程分别使用两个不同的连接套接字为两个客户服务。
服务端代码:每个连接一个协程处理
#define MAX_LINE 4096char rot13_char(char c) {if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))return c + 13;else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))return c - 13;elsereturn c;
}void child_run(int fd) {char outbuf[MAX_LINE + 1];size_t outbuf_used = 0;ssize_t result;while (1) {char ch;result = recv(fd, &ch, 1, 0);if (result == 0) {break;} else if (result == -1) {perror("read");break;}if (outbuf_used < sizeof(outbuf)) {outbuf[outbuf_used++] = rot13_char(ch);}if (ch == '\n') {send(fd, outbuf, outbuf_used, 0);outbuf_used = 0;continue;}}
}void sigchld_handler(int sig) {while (waitpid(-1, 0, WNOHANG) > 0); // 在一个循环体内调用了 waitpid 函数,以便回收所有已终止的子进程。这里选项 WNOHANG 用来告诉内核,即使还有未终止的子进程也不要阻塞在 waitpid 上。注意这里不可以使用 wait,因为 wait 函数在有未终止子进程的情况下,没有办法不阻塞。return;
}int main(int c, char **v) {int listener_fd = tcp_server_listen(SERV_PORT);signal(SIGCHLD, sigchld_handler); // 注册了一个信号处理函数,用来回收子进程资源while (1) {struct sockaddr_storage ss;socklen_t slen = sizeof(ss);int fd = accept(listener_fd, (struct sockaddr *) &ss, &slen);if (fd < 0) {error(1, errno, "accept failed");exit(1);}if (fork() == 0) {close(listener_fd); // 子进程不需要关心监听套接字,故而在这里关闭掉监听套接字 listen_fdchild_run(fd); // 使用已连接套接字 fd 来进行数据读写exit(0); // 处理完之后退出子进程} else {close(fd); // 父进程不需要关心连接套接字}}return 0;
}
实验
我们启动该服务器,监听在对应的端口 43211 上。
./fork01
再启动两个 telnet 客户端,连接到 43211 端口,每次通过标准输入和服务器端传输一些数据,我们看到,服务器和客户端的交互正常。
$telnet 127.0.0.1 43211
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
afasfa
nsnfsn
]
telnet> quit
Connection closed.
$telnet 127.0.0.1 43211
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
agasgasg
ntnftnft
]
telnet> quit
Connection closed.
客户端退出,服务器端也在正常工作,此时如果再通过 telnet 建立新的连接,客户端和服务器端的数据传输也会正常进行。
至此,我们构建了一个完整的服务器端程序,可以并发处理多个不同的客户连接,互不干扰。
小结
使用阻塞IO和进程模型,为每一个连接创建一个独立的子进程来进行服务,是一个非常简单有效的实现方式,这种方式可能很难足以满足高性能程序的需求,但好处是实现简单。在实现这样的程序时,我们需要注意两点:
- 要注意对套接字的关闭梳理;
- 要注意对子进程进行回收,避免产生不必要的僵尸进程
- 避免僵尸进程,两种方法:1 忽略SIGCHLD, 2 调用wait或waitpid
- 应该说 忽略SIGCHLD,最省事吧
阻塞IO + 进程
进程模型占用的资源太大,幸运的是,还有一种轻量级的资源模型,这就是线程。
通过为每个连接调用pthread_create创建一个单独的线程,也可以达到上面使用进程的效果
do{accept connectionspthread_create for conneced connection fdthread_run(fd)
}while(true)
因为线程的创建是比较消耗资源的,而且不是每个连接在每个时刻都需要服务,因此,我们可以预先通过创建一个线程池,并在多个连接中复用线程池来获得某种效率上的提升
create thread pool
do{accept connectionsget connection fdpush_queue(fd)
}while(true)
服务端代码:每个连接一个线程处理
void thread_run(void *arg) {pthread_detach(pthread_self()); // 将子线程转变为分离的,也就意味着子线程独自负责线程资源回收。int fd = (int) arg;loop_echo(fd);
}int main(int c, char **v) {int listener_fd = tcp_server_listen(SERV_PORT);pthread_t tid;while (1) {struct sockaddr_storage ss;socklen_t slen = sizeof(ss);int fd = accept(listener_fd, (struct sockaddr *) &ss, &slen); //阻塞调用在 accept 上,一旦有新连接建立,阻塞调用返回,调用 pthread_create 创建一个子线程来处理这个连接。if (fd < 0) {error(1, errno, "accept failed");} else {pthread_create(&tid, NULL, &thread_run, (void *) fd);}}return 0;
}
关于第二行pthread_detach(pthread_self())
的说明:
- 一个线程的重要属性是可结合的,或者是分离的。一个可结合的线程是能够被其他线程杀死和回收资源的;而一个分离的线程不能被其他线程杀死或者回收资源
- 在高并发例子中,每个连接都由一个线程单独处理,在这种情况下,服务器程序并不需要对每个子线程进行终止,这样的话,每个子进程就可以在入口函数开始的地方,把自己设置为分离的,这样就能够在它终止之后自动回收相关的线程资源了
- 也就是说调度
pthread_detach(pthread_self())
之后,子线程独自负责线程资源回收
char rot13_char(char c) {if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))return c + 13;else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))return c - 13;elsereturn c;
}void loop_echo(int fd) {char outbuf[MAX_LINE + 1];size_t outbuf_used = 0;ssize_t result;while (1) {char ch;result = recv(fd, &ch, 1, 0);// 断开连接或者出错if (result == 0) {break;} else if (result == -1) {error(1, errno, "read error");break;}if (outbuf_used < sizeof(outbuf)) {outbuf[outbuf_used++] = rot13_char(ch);}if (ch == '\n') {send(fd, outbuf, outbuf_used, 0);outbuf_used = 0;continue;}}
}
改进:构建线程池处理多个连接
上面的服务器程序虽然可以正常工作,不过它有一个缺点,那就是如何并发连接过多,就会引起线程的频繁创建和销毁。虽然线程切换的上下文开销不大,但是线程创建和销毁的开销确是不小的。
能不能对这个程序进行一些优化呢?
我们可以使用预创建线程池的方式来进行优化。在服务端启动时,可以先按照固定大小预创建出多个线程,当有新连接建立时,往连接字队列里放置这个新连接描述字,线程池里的线程负责从连接字队列中取出连接描述符进行处理。
这个程序的关键是连接字队列的设计,因为这里既有往这个队列里放置描述符的操作,也有从这个队列里取出描述符的操作。
对此,需要引入两个重要的概念,一个是锁mutex,一个是条件变量condition
// 定义一个队列
typedef struct {int number; // 队列里的描述字最大个数int *fd; // 这是一个数组指针int front; // 当前队列的头位置int rear; // 当前队列的尾位置pthread_mutex_t mutex; // 锁pthread_cond_t cond; // 条件变量
} block_queue;// 初始化队列
void block_queue_init(block_queue *blockQueue, int number) {blockQueue->number = number;blockQueue->fd = calloc(number, sizeof(int));blockQueue->front = blockQueue->rear = 0;pthread_mutex_init(&blockQueue->mutex, NULL);pthread_cond_init(&blockQueue->cond, NULL);
}// 往队列里放置一个描述字 fd
void block_queue_push(block_queue *blockQueue, int fd) {// 一定要先加锁,因为有多个线程需要读写队列pthread_mutex_lock(&blockQueue->mutex);// 将描述字放到队列尾的位置blockQueue->fd[blockQueue->rear] = fd;// 如果已经到最后,重置尾的位置if (++blockQueue->rear == blockQueue->number) {blockQueue->rear = 0;}printf("push fd %d", fd);// 通知其他等待读的线程,有新的连接字等待处理pthread_cond_signal(&blockQueue->cond);// 解锁pthread_mutex_unlock(&blockQueue->mutex);
}// 从队列里读出描述字进行处理
int block_queue_pop(block_queue *blockQueue) {// 加锁pthread_mutex_lock(&blockQueue->mutex);// 判断队列里没有新的连接字可以处理,就一直条件等待,直到有新的连接字入队列while (blockQueue->front == blockQueue->rear)pthread_cond_wait(&blockQueue->cond, &blockQueue->mutex);// 取出队列头的连接字int fd = blockQueue->fd[blockQueue->front];// 如果已经到最后,重置头的位置if (++blockQueue->front == blockQueue->number) {blockQueue->front = 0;}printf("pop fd %d", fd);// 解锁pthread_mutex_unlock(&blockQueue->mutex);// 返回连接字return fd;
}
服务端的程序如下:
void thread_run(void *arg) {pthread_t tid = pthread_self();pthread_detach(tid);block_queue *blockQueue = (block_queue *) arg;while (1) {int fd = block_queue_pop(blockQueue);printf("get fd in thread, fd==%d, tid == %d", fd, tid);loop_echo(fd);}
}int main(int c, char **v) {int listener_fd = tcp_server_listen(SERV_PORT);block_queue blockQueue;block_queue_init(&blockQueue, BLOCK_QUEUE_SIZE);thread_array = calloc(THREAD_NUMBER, sizeof(Thread));int i;for (i = 0; i < THREAD_NUMBER; i++) {pthread_create(&(thread_array[i].thread_tid), NULL, &thread_run, (void *) &blockQueue);}while (1) {struct sockaddr_storage ss;socklen_t slen = sizeof(ss);int fd = accept(listener_fd, (struct sockaddr *) &ss, &slen);if (fd < 0) {error(1, errno, "accept failed");} else {block_queue_push(&blockQueue, fd);}}return 0;
}
和前面的程序相比,线程的创建和销毁的开销大大降低,但是因为线程池大小固定,又因为使用了阻塞套接字,肯定会出现有连接得不到及时服务的场景。这个问题的解决就需要用到【多路IO复用+线程】,仅仅使用阻塞IO模型和线程是没有办法达到极致的高并发处理能力。
问题:队列里没有可用的位置了,想想看,如何对这种情况进行优化?
回答:没位置可用可以选择丢弃,取出来直接关闭,等待对方重连,或者先判断队列是否有位置,没位置的话直接就不取出套接字,让它留在内核队列中,让内核处理。
非阻塞IO + readiness notification + 单线程
应用程序其实可以通过采用轮询的方式来对保存的套接字集合就那些挨个询问,从而找出需要进行IO处理的套接字。伪代码如下:
for fd in fdset{if(is_readable(fd) == true){handle_read(fd)}else if(is_writeable(fd)==true){handle_write(fd)}
}
但是这个方法有一个问题,如何这个fdset有一万个之多,每次循环都会消耗大量的CPU时间,而且极有可能在一个循环之内,没有任何一个套接字准备好可读,或者可写。
既然这样,CPU的消耗太大,那么干脆让操作系统来高速我们哪个套接字可读,哪个套接字可写。在这个结果发生之前,我们把CPU的控制权交出去,让操作系统来把CPU时间调度给那些需要的进程,这就是select、poll这样的IO分发技术。伪代码如下:
do {poller.dispatch()for fd in registered_fdset{if(is_readable(fd) == true){handle_read(fd)}else if(is_writeable(fd)==true){handle_write(fd)}
}while(ture)
但是,这样的方法需要每次每次dispatch之后,对所有注册的套接字进行逐个排查,效率并不是最高的。如果dispatch调用返回之后只提供有IO事件或者IO变化的套接字,这样排查的效率不会高很多了吗?这就是epoll设计。其伪代码如下:
do {poller.dispatch()for fd_event in active_event_set{if(is_readable_event(fd_event) == true){handle_read(fd_event)}else if(is_writeable_event(fd_event)==true){handle_write(fd_event)}
}while(ture)
Linux 是互联网的基石,epoll 也就成为了解决 C10K 问题的钥匙。FreeBSD 上的 kqueue,Windows 上的 IOCP,Solaris 上的 /dev/poll,这些不同的操作系统提供的功能都是为了解决C10K问题的
基于事件的程序设计
通过使用poll、epoll等IO分发技术,可以设计出基于套接字的事件驱动模型,从而满足高性能、高并发的需求。
基于epoll/poll/select的IO事件分发器可以叫做reactor,也可以叫做事件驱动,或者事件轮询。
事件驱动模型,也叫做反应堆模型(reactor),或者是event loop模型。这个模型的核心有两点:
- 第一,它存在一个无限循环的事件分发线程,或者叫做reactor线程、event loop线程。这个事件分发线程的背后,就是poll、epoll等IO分发技术的使用
- 第二,所有的IO操作都可以抽象成事件,每个事件都必须有回调函数来处理。accept上有连接建立成功、已连接套接字上发送缓冲区空出可以写、通信管道pipe上有数据可以读,这些都是一个个事件,通过事件分发,这些事件都可以一一被检测,并调用对应的回调函数加以处理
几种IO模型和线程模型设计
任何一个网络程序,所做的事情可以总结成下面几种:
- read:从套接字收取数据
- decode:对收到的数据进行解析
- compute:根据解析后的内容,进行计算和处理
- encode:将处理后的结果,按照约定的格式进行编码
- send:最后,通过套接字把结果发送出去
fork
我们可以通过fork来创建子线程,为每个到达的客户端连接服务,如下图。可想而知的是,随着客户数的变多,fork的子进程也越来越多,即使客户和服务器之间的交互比较少,这样的子进程也不能被销毁,一直需要存在。
thread
改进方法是使用pthread_create创建子线程,因为线程是比进程更轻量级的执行单位,所以它的效率比fork的方式,有一定的提高。但是,每次创建一个线程的开销仍然是不小的,因此,引入了线程池的概念,预先创建出一个线程池,在每次新连接到达时,从线程池peek出一个线程为之服务,很好的解决了线程创建的开销。但是,这个模式还是没有解决空闲连接占用资源的问题,如果一个连接在一定时间内容没有数据交互,这个连接还是要占用一定的线程资源,直到这个连接消亡为止。
single reactor thread
事件驱动模式是解决高性能、高并发比较好的一种方式,为什么呢?
因为这种模式是符合大规模生产的需求的。我们的生活中遍历都是类似的模式。比如你去咖啡店喝咖啡,你点了一杯咖啡在一旁喝着,服务员也不会管你,等你有续杯需求的时候,再去和服务员提(触发事件),服务员满足了你的需求,你就继续可以喝着咖啡玩手机。整个柜台的服务方式就是一个事件驱动方式。
如下,一个reactor线程上同时负责分发acceptor的事件、已连接套接字的IO事件
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>#include <fcntl.h>
#include <unistd.h>
#include <errno.h>#define BUFFER_LENGTH 4096
#define MAX_EPOLL_EVENTS 1024
#define SERVER_PORT 8888typedef int NCALLBACK(int ,int, void*);struct ntyevent {/** epoll要管理的句柄:* (1)nty_event_set:设置其值为要管理的句柄(listenfd/clientfd)* (2)nty_event_add:使用epoll_ctl(添加或者修改)管理【ntyevent->fd】句柄的(ntyevent->events)[EPOLLIN或者EPOLLOUT]事件* (3)nty_event_del:使用epoll_ctl将【ntyevent->fd】从监听树中删除* (4)recv_cb: * 使用:如果接收不到数据或者接收出错就close(ev->fd); * 怎么得到:这个ev是根据参数reactor和fd计算出出来的: struct ntyevent *ev = reactor->events+fd;* (5)send_cb:* 使用:如果数据无法发送或者发送出错就close(ev->fd); * 怎么得到:这个ev是根据参数reactor和fd计算出出来的: struct ntyevent *ev = reactor->events+fd;* (6)ntyreactor_run:* 使用:在每次epoll_wait之前都要关闭很久没有发送数据的ev->fd(每次检查100个)* 怎么得到:for循环reactor->events[checkpos]数组,如果当前ntyevent的status为1而且根据last_active计算出的时间大于60,* 就关闭并将reactor->events[checkpos]从监听树中删除* 使用:每次epoll_wait返回后都会得到已经准备好的事件,从就绪数组中得到 struct ntyevent *ev,并根据是否发生了EPOLLIN* 或者EPOLLOUT调用回调函数,在调用回调函数时将ev->fd传给回调函数,以便回调函数知道是哪个fd发生了事件* 怎么得到:在while循环之前,会申请一个struct epoll_event events[MAX_EPOLL_EVENTS+1];然后每次epoll_wait返回后都会* 填充这个数组,然后循环nready就可以得到 struct ntyevent *ev = (struct ntyevent*)events[i].data.ptr; 而* events[i].data.ptr是在nty_event_add时设置的ep_ev.data.ptr = ev;* 也就是说:struct ntyevent *ev会在nty_event_set填充值,在nty_event_add就添加到ep_ev.data.ptr,在epoll_wait* 返回之后就可以通过events[i].data.ptr获取到就绪的ntyevent* 从这里可以推测出,每次epoll_wait都会从events数组的最开始填充就绪*/int fd; /** 有了int (*callback)(..., ..., void *arg);为什么还要有arg* 这里的调度顺序为: * nty_event_set(ev, fd, xxx, reactor); --> nty_event_set中设置:ev->arg = arg;* ntyreactor_run:epoll_wait返回之后,并且发生了感兴趣的事件,就 ev->callback(ev->fd, events[i].events, ev->arg);* ev->callback中: struct ntyreactor *reactor = (struct ntyreactor*)arg; * 因为在epoll_xxx中,只有ntyevent *ev会通过ep_ev.data.ptr传来传去,而ev中的callback只会记录当前设置的回调函数的地址,* 不会记录其回调函数的参数值arg,所以需要一个ev中需要一个指针用来记录要传递给回调函数的参数*/ /** epoll是管理EPOLLIN事件还是EPOLLOUT事件:* (1) nty_event_set:设置ev->events = 0;也就是什么也不监听* (2)nty_event_add:设置ep_ev.events = ev->events = events;(记录要监听EPOLLIN事件还是EPOLLOUT事件)* 为什么要记录呢?这样才能在epoll_wait返回之后,根据其记录是否监听了这个事件(是否发生了事件&&是否对这个事件感兴趣)* 来调用回调函数* if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) {* ev->callback(ev->fd, events[i].events, ev->arg); //events[i].events是epoll_wait填充的* }* (3)ntyreactor_run:epoll_wait返回之后,根据其记录是否监听了这个事件(是否发生了事件&&是否对这个事件感兴趣)来调用回调函数*/int events; /** 调用回调函数时,需要传递给回调函数的参数:* (1) nty_event_set时设置:ev->arg = arg;* ntyreactor_addlistener中调度nty_event_set设置arg为reactor* (2) ntyreactor_run:epoll_wait返回之后,并且发生了感兴趣的事件,就 ev->callback(ev->fd, events[i].events, ev->arg);*/void *arg;/** 事件发生时,要调用的回调函数* (1) nty_event_set时设置: ev->callback = callback;* recv_cb时调用nty_event_set,其arg为reactor,也就是回调函数时可以得到reactor* (2) ntyreactor_run:epoll_wait返回之后,并且发生了感兴趣的事件,就 ev->callback(ev->fd, events[i].events, ev->arg);*/int (*callback)(int fd, int events, void *arg);/** 当前ntyevent是否已经被监听了* (1) nty_event_add:* 如果status为1,表示当前已经被监听了,那么就是要修改(epoll_ctl(..,EPOLL_CTL_MOD))这个fd要监听的事件(为参数event)* #如果status不为1,表示当前fd尚未被监听,那么【设置status为1】且使用epoll_ctl(..,EPOLL_CTL_ADD)设置fd要监听event事件* (2)nty_event_del:* 如果status不为1,表示当前fd尚未被监听,那么直接返回* 如果status为1,那么【重置status为0】,并且使用epoll_ctl(..,EPOLL_CTL_DEL)移除监听的fd* (3)accept_cb:* ??????* (4)ntyreactor_run:* 在epoll_wait之前,要关闭很久没有数据到来的连接。具体做法是检查reactor->events[checkpos].last_active,但是我们只检测* 已经监听的ntyevent,也就是reactor->events[checkpos].status为1才被检查*/int status;/** 缓冲区:* (1) recv_cb:有可读事件发生时,使用recv(fd, ev->buffer, BUFFER_LENGTH, 0);将数据读取到 ev->buffer中* (2) send_cb:当fd可以写时,使用send(fd, ev->buffer, ev->length, 0);将ev->buffer上的ev->length发送出去*/char buffer[BUFFER_LENGTH];/** 已用缓冲区长度* (1) recv_cb:有可读事件发生时,要记录本地回调读取到了多少数据,也就是ev->length = len* (2) send_cb:当fd可以写时,使用send(fd, ev->buffer, ev->length, 0);将ev->buffer上的ev->length发送出去*/int length;/** (1) nty_event_set:记录当前时间ev->last_active = time(NULL);* nty_event_set将在recv_cb成功读到数据之后nty_event_set(ev, fd, send_cb, reactor);激活可写事件并加入监听集* nty_event_set将在send_cb成功读到数据之后nty_event_set(ev, fd, recv_cb, reactor);激活可读事件并加入监听集* nty_event_set将在accept_cb中accept之后并且检测到没有达到最大连接数时nty_event_set(&reactor->events[clientfd], clientfd, recv_cb, reactor);* 设置回调函数recv_cb并更新last_active,将clientfd的EPOLLIN事件加入监听集中* nty_event_set将在ntyreactor_addlistener中设置listenfd的接收事件nty_event_set(&reactor->events[sockfd], sockfd, acceptor, reactor);* 顺序: listenfd --->ntyreactor_addlistener[acceptor]--> accept_cb[nty_event_set(...clientfd, recv_cb...)]-->recv_cb(读取成功后nty_event_set* (...clientfd, send_cb....)等待发送)--->send_cb[发送成功之后nty_event_set(...clientfd,recv_cb, ...)]-->recv_cb......* (2)ntyreactor_run:在epoll_wait之前,要关闭很久没有数据到来的连接。具体做法是检查reactor->events[checkpos].last_active*/long last_active;
};struct ntyreactor {/*** epoll事件的句柄*/int epfd;/** (1) recv_cb:* a. 通过参数reactor和fd计算出发生可读事件的组件struct ntyevent *ev = reactor->events+fd;* b. 如果没有读取到数据,可以通过ev-reactor->events获取ev在reactor->events中的位置?* (2) send_cb:* a. 通过参数reactor和fd计算出发生可读事件的组件struct ntyevent *ev = reactor->events+fd;* (3) accept_cb:* a. 在accept之后将clientfd加入监听树之前,需要确保已经连接数<最大限制数,方法是遍历ntyreactor->events* 数组[3, MAX_EPOLL_EVENTS),检测第一个状态不为0的索引* 如果i=MAX_EPOLL_EVENTS,说明已经达到最大连接数量,那么就直接跳出循环,不再将clientfd加入监听树* 如果i!=MAX_EPOLL_EVENTS,那么就将reactor->events数组的第clientfd个盒子作为client的包裹(events)* 以及常规操作设置recv_fd之类的 nty_event_set(&reactor->events[clientfd], clientfd, recv_* cb, reactor);并将当前clientfd加入监听树nty_event_add(reactor->epfd, EPOLLIN, &reactor* ->events[clientfd]);* 从上面可以推测出:每次操作系统分配fd时都会从3开始分配,知道找出第一个未使用的fd* (4) ntyreactor_init:* a. 初始化反应堆的时候会申请MAX_EPOLL_EVENTS个盒子:reactor->events = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * * sizeof(struct ntyevent));* (5) ntyreactor_destory:* a. 销毁ntyreactor_init时申请到的所有快递盒子* (6) ntyreactor_addlistener:* a. 处理监听句柄时需要设置listenfd的快递盒子并将查看快递盒子中的东西以加入监听树* 处理listenfd的快递盒子为reactor->events[listenfd],要处理的是listenfd,当事件发生是要回调的是acceptor:* nty_event_set(&reactor->events[listenfd], listenfd, acceptor, reactor);* b. nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[sockfd]);* ep_ev.data.ptr = reactor->events[sockfd],在epoll_wait的事件发生,就能通过就绪队列得到是哪个盒子发送了事件* ep_ev.events = ev->events = events; 设置对listenfd的输入事件感兴趣* epoll_ctl(epfd, op, ev->fd, &ep_ev) epfd对盒子里面的fd句柄感兴趣* c. ntyreactor_run:* 在epoll_wait之前要关闭很久没有使用的句柄,每次检测100个:* 根据reactor->events[checkpos].status判断当前是否被监听* 根据reactor->events[checkpos].last_active得到上一次使用的时间,从而判断使用时间* 如果需要关闭,根据reactor->events[checkpos].fd判断要删除哪一个fd* 将&reactor->events[checkpos]从监听树上取下:nty_event_del(reactor->epfd, &reactor->events[checkpos]);* 在epoll之后,会得到一个就绪队列events,遍历这个就绪队列:* 得到哪个盒子上发生了事件:struct ntyevent *ev = (struct ntyevent*)events[i].data.ptr;* 判断这个盒子上是否发生了可读事件:(events[i].events & EPOLLIN) * 判断这个盒子是否可以输出:(events[i].events & EPOLLOUT)* */struct ntyevent *events;
};int recv_cb(int fd, int events, void *arg);
int send_cb(int fd, int events, void *arg);// 设置快递盒子里的值
/*
* called:
* a. recv_cb:当成功读取到数据之后要就可以对当前fd写了:
* 将快递盒子从epfd中取出: nty_event_del(reactor->epfd, ev);
* 重新设置快递盒子的内容:nty_event_set(ev, fd, send_cb, reactor);
* 将快递盒子以及感兴趣事件EPOLLOUT重新交给epfd管理: nty_event_add(reactor->epfd, EPOLLOUT, ev);
* b. send_cb: 当发送完数据之后就等待fd发来数量了:
* 将快递盒子从epfd中取出:nty_event_del(reactor->epfd, ev);
* 重新设置快递盒子的内容:nty_event_set(ev, fd, recv_cb, reactor);
* 将快递盒子以及感兴趣事件EPOLLIN交给epfd管理:nty_event_add(reactor->epfd, EPOLLIN, ev);
* b. accept_cb:当accept获取到clientfd之后需要将clientfd管理起来
* clientfd肯定是第一次的,可以对应的盒子被epfd管理,也就不需要移除了
* 设置快递盒子的内容: nty_event_set(&reactor->events[clientfd], clientfd, recv_cb, reactor);
* 将快递盒子以及感兴趣事件EPOLLIN交给epfd管理:nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[clientfd]);
* c. ntyreactor_addlistener:将listenfd加入epfd管理
* listenfd肯定是第一次的,可以对应的盒子被epfd管理,也就不需要移除了
* 设置快递盒子的内容:nty_event_set(&reactor->events[sockfd], sockfd, acceptor, reactor);
* 将快递盒子以及感兴趣事件EPOLLIN交给epfd管理: nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[sockfd]);
* 总结:nty_event_set的功能是:设置感兴趣的fd,当感兴趣事件发生后的回调函数以及回调函数的参数,设置什么时候对这些事件感兴趣的,清空
* 之前感兴趣的事件(以便将快递盒子交给epfd时当面告知其感兴趣的事件)
* 它的调用时机:监听之后要设置【监听事件来了的回调函数】,监听事件来了之后管理得到的clientfd【等待clientfd的发送了数据到来之后的
* 接收回调函数】,接收事件来了接收完数据之后设置(等待)【对clientfd可写】,可写--->可读--->
*/
void nty_event_set(struct ntyevent *ev, int fd, NCALLBACK callback, void *arg) {ev->fd = fd; //这个快递盒子是句柄fd的ev->callback = callback; //这个快递盒子当事件发生时的回调函数ev->events = 0; //清空所有感兴趣事件ev->arg = arg; //ev->last_active = time(NULL);return ;}/*
* 将快递盒子ev和这个ev感兴趣的事件events使用epfd管理起来
* 设置快递盒子中【当前感兴趣的事件】
* 申请一个epoll_event以设置【感兴趣的事件】以及【事件发生时需要知道的快递盒子的地址】
* 根据ev->status判断当前是要添加还是要修改这个盒子感兴趣的事件
* 使用epoll_ctll根据盒子中的ev->fd设置/修改对这个fd哪些事件感兴趣
*/
int nty_event_add(int epfd, int events, struct ntyevent *ev) {struct epoll_event ep_ev = {0, {0}};ep_ev.data.ptr = ev;ep_ev.events = ev->events = events;int op;if (ev->status == 1) {op = EPOLL_CTL_MOD;} else {op = EPOLL_CTL_ADD;ev->status = 1;}if (epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0) {printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);return -1;}return 0;
}/**
* 将快递盒子从epfd中移除:
* 使用epoll_ctl的EPOLL_CTL_DEL选项将ev->fd从监听树中移除
*/
int nty_event_del(int epfd, struct ntyevent *ev) {struct epoll_event ep_ev = {0, {0}};if (ev->status != 1) {return -1;}ep_ev.data.ptr = ev; //这里是不是可以设置为NULL呀ev->status = 0;epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);return 0;
}/*
* 当fd上有可读事件发生时的回调函数。arg参数都是ntyreactor
* 计算fd对应的快递盒子:struct ntyevent *ev = reactor->events+fd;
* 先从fd中读取数据到快递盒子的缓冲区中:recv(fd, ev->buffer, BUFFER_LENGTH, 0);
* 如果读到了数据:
* 设置读到了多少数据:ev->length = len;ev->buffer[len] = '\0';
* 将快递盒子从epfd中移除:nty_event_del(reactor->epfd, ev);
* 现在要将回应返回给客户端了,设置数据可发送时的回调函数:nty_event_set(ev, fd, send_cb, reactor);
* 将快递盒子和可写事件加入监控树:nty_event_add(reactor->epfd, EPOLLOUT, ev);(然后epoll_wait就等数据可发送通知我们)
* 如果读不到数据(这种情况一般是对端关闭了):
* 将快递盒子从epfd中移除:nty_event_del(reactor->epfd, ev);
* 关闭clientfd:close(ev->fd);
* 如果读数据出错(这种情况是有错误发生):
* 将快递盒子从epfd中移除:nty_event_del(reactor->epfd, ev);
* 关闭clientfd:close(ev->fd);
* 返回读到的数据长度
*/
int recv_cb(int fd, int events, void *arg) {struct ntyreactor *reactor = (struct ntyreactor*)arg;struct ntyevent *ev = reactor->events+fd;int len = recv(fd, ev->buffer, BUFFER_LENGTH, 0);nty_event_del(reactor->epfd, ev);if (len > 0) {ev->length = len;ev->buffer[len] = '\0';printf("C[%d]:%s\n", fd, ev->buffer);nty_event_set(ev, fd, send_cb, reactor);nty_event_add(reactor->epfd, EPOLLOUT, ev);} else if (len == 0) {close(ev->fd);printf("[fd=%d] pos[%ld], closed\n", fd, ev-reactor->events);} else {close(ev->fd);printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));}return len;
}/**
* 待clientfd可读时的回调函数:
* 计算fd对应的快递盒子:struct ntyevent *ev = reactor->events+fd;
* 将盒子ev->buffer中ev->length长度的数据发送出去:int len = send(fd, ev->buffer, ev->length, 0);
* 如果发送成功(len > 0):
* 将快递盒子从监听树中取出:nty_event_del(reactor->epfd, ev);
* 等待clientfd请求时,要回调什么数据:nty_event_set(ev, fd, recv_cb, reactor);
* 将盒子交给epfd处理并告知epfd对fd的EPOLLIN感兴趣:nty_event_add(reactor->epfd, EPOLLIN, ev);
* 如果发送失败(len <= 0):
* 关闭clientfd:close(ev->fd);
* 将快递盒子从监听树中移除:nty_event_del(reactor->epfd, ev);
*/
int send_cb(int fd, int events, void *arg) {struct ntyreactor *reactor = (struct ntyreactor*)arg;struct ntyevent *ev = reactor->events+fd;int len = send(fd, ev->buffer, ev->length, 0);if (len > 0) {printf("send[fd=%d], [%d]%s\n", fd, len, ev->buffer);nty_event_del(reactor->epfd, ev);nty_event_set(ev, fd, recv_cb, reactor);nty_event_add(reactor->epfd, EPOLLIN, ev);} else {close(ev->fd);nty_event_del(reactor->epfd, ev);printf("send[fd=%d] error %s\n", fd, strerror(errno));}return len;
}/** 有连接到来时的回调函数:* 取出clientfd* 将clientfd加入epfd管理
*/
int accept_cb(int fd, int events, void *arg) {struct ntyreactor *reactor = (struct ntyreactor*)arg;if (reactor == NULL) return -1;struct sockaddr_in client_addr;socklen_t len = sizeof(client_addr);int clientfd;//有连接到来了就先通过accept获取到对应的clientfd:clientfd = accept(fd, (struct sockaddr*)&client_addr, &len))if ((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1) {if (errno != EAGAIN && errno != EINTR) {}printf("accept: %s\n", strerror(errno));return -1;}int i = 0;do {for (i = 3;i < MAX_EPOLL_EVENTS;i ++) {if (reactor->events[i].status == 0) { //只要有一个快递柜没有被使用,那么就说明还有空闲地方,因此就可以break了break;}}if (i == MAX_EPOLL_EVENTS) { //这说明所有的快递柜都已经被使用完了,直接break掉,可以printf但是并没有处理这个clientfd// 一直不去处理这个clientfd有什么后果吗?就是对端发送接收数据都不管,相当于没有连接,因为已经从准备好队列中取出了//如果不去管的话过一段时间OS就会关闭这个clientfdprintf("%s: max connect limit[%d]\n", __func__, MAX_EPOLL_EVENTS);break;}/** 还没有达到最大连接数量,可以将之加入epfd管理中:* 设置这个clientfd为非阻塞* 等待clientfd发送数据:* 先设置当发送数据到来了时的回调函数以及快递盒子:nty_event_set(&reactor->events[clientfd], clientfd, recv_cb, reactor);* 通知epfd来管理快递柜以及告知其感兴趣事件:nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[clientfd]);*/int flag = 0;if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {printf("%s: fcntl nonblocking failed, %d\n", __func__, MAX_EPOLL_EVENTS);break;}nty_event_set(&reactor->events[clientfd], clientfd, recv_cb, reactor);nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[clientfd]);} while (0);printf("new connect [%s:%d][time:%ld], pos[%d]\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), reactor->events[i].last_active, i);return 0;}int init_sock(short port) {int fd = socket(AF_INET, SOCK_STREAM, 0);fcntl(fd, F_SETFL, O_NONBLOCK);struct sockaddr_in server_addr;memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = htonl(INADDR_ANY);server_addr.sin_port = htons(port);bind(fd, (struct sockaddr*)&server_addr, sizeof(server_addr));if (listen(fd, 20) < 0) {printf("listen failed : %s\n", strerror(errno));}return fd;
}/**
* 初始化反应堆:
* 招聘快递员:reactor->epfd = epoll_create(1);
* 创建这个快递员管理的所有快递柜:reactor->events = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
*/
int ntyreactor_init(struct ntyreactor *reactor) {if (reactor == NULL) return -1;memset(reactor, 0, sizeof(struct ntyreactor));reactor->epfd = epoll_create(1);if (reactor->epfd <= 0) {printf("create epfd in %s err %s\n", __func__, strerror(errno));return -2;}reactor->events = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));if (reactor->events == NULL) {printf("create epfd in %s err %s\n", __func__, strerror(errno));close(reactor->epfd);return -3;}
}/*
* 销毁反应堆:
* 解聘快递员:close(reactor->epfd);
* 回收快递柜:free(reactor->events);
*/
int ntyreactor_destory(struct ntyreactor *reactor) {close(reactor->epfd);free(reactor->events);}/*
* 将listenfd交给快递员管理:
* 设置连接到来时快递员应该怎么处理(也就是回调函数):nty_event_set(&reactor->events[sockfd], sockfd, acceptor, reactor);
* 快递员将[编号listenfd的快递柜的EPOLLIN事件]加入关注列表:nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[sockfd]);
*/
int ntyreactor_addlistener(struct ntyreactor *reactor, int sockfd, NCALLBACK *acceptor) {if (reactor == NULL) return -1;if (reactor->events == NULL) return -1;nty_event_set(&reactor->events[sockfd], sockfd, acceptor, reactor);nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[sockfd]);return 0;
}/*
* 初始化一个列表,用来装载就绪的盒子
* 持续关注中(while)
* 在关注之前先将很久没有访问的柜子清空(每次清理100个柜子,循环清理):
* 如果这个柜子之前没有加入关注列表就不用清理
* 否则,计算出最近一次使用时间距离现在的间隔:long duration = now - reactor->events[checkpos].last_active;
* 如果间隔超过1分钟了,就:
* 关闭专线(从这个快递盒子中取出对应的专线fd):close(reactor->events[checkpos].fd);
* 将这个快递盒子从关注列表中清空:nty_event_del(reactor->epfd, &reactor->events[checkpos]);
* 等待事件发生,当事件发生后,会填充events列表,最多填充MAX_EPOLL_EVENTS个
* 遍历就绪数组
* 获取就绪数组[i]对应的快递盒子
* 这个就绪数组[i]是否发生了EPOLLIN事件 && 快递盒子中是否设置了关注这个
* 如果答案全部是是,那么调度回调函数。这里回调函数我们主要关注哪个fd上发送了什么事件events[i].events,以及回调函数的参数
*/
int ntyreactor_run(struct ntyreactor *reactor) {if (reactor == NULL) return -1;if (reactor->epfd < 0) return -1;if (reactor->events == NULL) return -1;struct epoll_event events[MAX_EPOLL_EVENTS+1];int checkpos = 0, i;while (1) {long now = time(NULL);for (i = 0;i < 100;i ++, checkpos ++) {if (checkpos == MAX_EPOLL_EVENTS) {checkpos = 0;}if (reactor->events[checkpos].status != 1) {continue;}long duration = now - reactor->events[checkpos].last_active;if (duration >= 60) {close(reactor->events[checkpos].fd);printf("[fd=%d] timeout\n", reactor->events[checkpos].fd);nty_event_del(reactor->epfd, &reactor->events[checkpos]);}}int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);if (nready < 0) {printf("epoll_wait error, exit\n");continue;}for (i = 0;i < nready;i ++) {struct ntyevent *ev = (struct ntyevent*)events[i].data.ptr;if ((events[i].events & EPOLLIN ) && (ev->events & EPOLLIN)) {ev->callback(ev->fd, events[i].events, ev->arg); }if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) {ev->callback(ev->fd, events[i].events, ev->arg);}}}
}int main(int argc, char *argv[]) {unsigned short port = SERVER_PORT;if (argc == 2) {port = atoi(argv[1]);}int sockfd = init_sock(port);struct ntyreactor *reactor = (struct ntyreactor*)malloc(sizeof(struct ntyreactor));ntyreactor_init(reactor);ntyreactor_addlistener(reactor, sockfd, accept_cb);ntyreactor_run(reactor);ntyreactor_destory(reactor);close(sockfd);return 0;
}
- 第二版:百万并发实现
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>#define BUFFER_LENGTH 4096
#define MAX_EPOLL_EVENTS 1024
#define SERVER_PORT 8888
#define PORT_COUNT 100typedef int NCALLBACK(int ,int, void*);struct ntyevent {int fd;int events;void *arg;int (*callback)(int fd, int events, void *arg);int status;char buffer[BUFFER_LENGTH];int length;long last_active;
};struct eventblock {struct eventblock *next;struct ntyevent *events;};struct ntyreactor {int epfd;int blkcnt;struct eventblock *evblk; //fd --> 100w
};int recv_cb(int fd, int events, void *arg);
int send_cb(int fd, int events, void *arg);
struct ntyevent *ntyreactor_idx(struct ntyreactor *reactor, int sockfd);void nty_event_set(struct ntyevent *ev, int fd, NCALLBACK callback, void *arg) {ev->fd = fd;ev->callback = callback;ev->events = 0;ev->arg = arg;ev->last_active = time(NULL);return ;}int nty_event_add(int epfd, int events, struct ntyevent *ev) {struct epoll_event ep_ev = {0, {0}};ep_ev.data.ptr = ev;ep_ev.events = ev->events = events;int op;if (ev->status == 1) {op = EPOLL_CTL_MOD;} else {op = EPOLL_CTL_ADD;ev->status = 1;}if (epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0) {printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);return -1;}return 0;
}int nty_event_del(int epfd, struct ntyevent *ev) {struct epoll_event ep_ev = {0, {0}};if (ev->status != 1) {return -1;}ev->status = 0;epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, NULL);return 0;
}int recv_cb(int fd, int events, void *arg) {struct ntyreactor *reactor = (struct ntyreactor*)arg;struct ntyevent *ev = ntyreactor_idx(reactor, fd);int len = recv(fd, ev->buffer, BUFFER_LENGTH , 0); // nty_event_del(reactor->epfd, ev);if (len > 0) {ev->length = len;ev->buffer[len] = '\0';printf("C[%d]:%s\n", fd, ev->buffer);nty_event_set(ev, fd, send_cb, reactor);nty_event_add(reactor->epfd, EPOLLOUT, ev);} else if (len == 0) {close(ev->fd);//printf("[fd=%d] pos[%ld], closed\n", fd, ev-reactor->events);} else {close(ev->fd);printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));}return len;
}// 可以向fd发送数据了
/** 先获取反应堆* 查询fd对应的快递柜地址* 将ev->buffer中ev->length长度的数据发送给fd* 如果发送成功了,就重新等待clientfd请求:* 先* 重新设置正确的反应(事件发生时,会快递盒子中拿出这个反应)* 告知reactor->epfd关注快递盒子ev的EPOLLIN事件* 如果发送失败* 关闭专线:close(ev->fd);* 将快递盒子拿出了:nty_event_del(reactor->epfd, ev);
*/
int send_cb(int fd, int events, void *arg) {struct ntyreactor *reactor = (struct ntyreactor*)arg;struct ntyevent *ev = ntyreactor_idx(reactor, fd);int len = send(fd, ev->buffer, ev->length, 0);if (len > 0) {printf("send[fd=%d], [%d]%s\n", fd, len, ev->buffer);nty_event_del(reactor->epfd, ev);nty_event_set(ev, fd, recv_cb, reactor);nty_event_add(reactor->epfd, EPOLLIN, ev);} else {close(ev->fd);nty_event_del(reactor->epfd, ev);printf("send[fd=%d] error %s\n", fd, strerror(errno));}return len;
}// 有连接来了
int accept_cb(int fd, int events, void *arg) {struct ntyreactor *reactor = (struct ntyreactor*)arg;if (reactor == NULL) return -1;struct sockaddr_in client_addr;socklen_t len = sizeof(client_addr);int clientfd;//获取连接句柄,并设置其为非阻塞if ((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1) {if (errno != EAGAIN && errno != EINTR) {}printf("accept: %s\n", strerror(errno));return -1;}int flag = 0;if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {printf("%s: fcntl nonblocking failed, %d\n", __func__, MAX_EPOLL_EVENTS);return -1;}// 获取clientfd对应的快递柜struct ntyevent *event = ntyreactor_idx(reactor, clientfd);// 将clientfd以及对应快递柜加入关注列表,告知其EPOLLIN事件来了就做出recv_cb的反应nty_event_set(event, clientfd, recv_cb, reactor);nty_event_add(reactor->epfd, EPOLLIN, event);printf("new connect [%s:%d], pos[%d]\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), clientfd);return 0;}int init_sock(short port) {int fd = socket(AF_INET, SOCK_STREAM, 0);fcntl(fd, F_SETFL, O_NONBLOCK);struct sockaddr_in server_addr;memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = htonl(INADDR_ANY);server_addr.sin_port = htons(port);bind(fd, (struct sockaddr*)&server_addr, sizeof(server_addr));if (listen(fd, 20) < 0) {printf("listen failed : %s\n", strerror(errno));}return fd;
}int ntyreactor_alloc(struct ntyreactor *reactor) {if (reactor == NULL) return -1;if (reactor->evblk == NULL) return -1;//找到快递公司创建的最后一个机房的地址,下一个机房的地址将会挨着它建造(实际上没有挨着,只是走逻辑)struct eventblock *blk = reactor->evblk; // 快递公司记录的第一个机房的地址while (blk->next != NULL) {blk = blk->next;}//申请创建一排快递柜用来存放某个小区的地址struct ntyevent *evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));if (evs == NULL) {printf("ntyreactor_alloc ntyevents failed\n");return -2;}memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));//申请创建一个机房,这个机房里存放着快递柜struct eventblock *block = (struct eventblock *)malloc(sizeof(struct eventblock));if (block == NULL) {printf("ntyreactor_alloc eventblock failed\n");return -2;}memset(block, 0, sizeof(struct eventblock));block->events = evs; //存放快递柜block->next = NULL; //这是最后一个机房// 这个机房归属于这个快递公司了blk->next = block;reactor->blkcnt ++; //快递公司喜提不动产return 0;
}/*
* 去快递公司查询fd对应的快递柜的地址
*/
struct ntyevent *ntyreactor_idx(struct ntyreactor *reactor, int sockfd) {// 根据fd计算出对应的机房(一个机房对应一个小区)int blkidx = sockfd / MAX_EPOLL_EVENTS;/* 如果这个小区对应的机房还没有创建,就创建:* [3]/1024=0 0 >= 0 : 申请机房1, reactor->blkcnt = 1 * [4]/1024=0 0 >= 1, 不需要申请* ...* [1024]/1024=1 1>=1, 申请机房2, reactor->blkcnt=2*/while (blkidx >= reactor->blkcnt) { //感觉这里if更加好,为什么要用wherentyreactor_alloc(reactor);} int i = 0;struct eventblock *blk = reactor->evblk; //从快递公司找到第一个机房的地址while(i ++ < blkidx && blk != NULL) { //找到第blkidx个机房对应的地址blk = blk->next; }return &blk->events[sockfd % MAX_EPOLL_EVENTS]; //去对应机房中找到sockfd对应的快递柜
}/*
* 初始化反应堆:
* 招聘快递员
* 申请足以存放小区A所有快递柜的空间: struct ntyevent *evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
* 快递员管理的第一间机房: struct eventblock *block = (struct eventblock *)malloc(sizeof(struct eventblock));
* 这个房子里放的是小区A的快递柜:block->events = evs;
* 现在已经空间已经足够了,不需要给这个快递员在分配一个机房了:block->next = NULL;
* 快递员记录自己管理的第一个机房的地址: reactor->evblk = block; (下一个机房的地址只能顺着机房找机房)
* 记录下自己管理了多少间机房:reactor->blkcnt = 1;
* 总的来说:就是申请一排快递柜用来放小区A的快递,申请一个机房用来放某个小区的快递,快递员记录自己管理的机房
*/
int ntyreactor_init(struct ntyreactor *reactor) {if (reactor == NULL) return -1;memset(reactor, 0, sizeof(struct ntyreactor));//招聘快递员reactor->epfd = epoll_create(1);if (reactor->epfd <= 0) {printf("create epfd in %s err %s\n", __func__, strerror(errno));return -2;}// 申请足以存放小区A所有快递柜的空间struct ntyevent *evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));if (evs == NULL) {printf("ntyreactor_alloc ntyevents failed\n");return -2;}memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));//快递公司创建的的第一间机房struct eventblock *block = (struct eventblock *)malloc(sizeof(struct eventblock));if (block == NULL) {printf("ntyreactor_alloc eventblock failed\n");return -2;}memset(block, 0, sizeof(struct eventblock));block->events = evs;block->next = NULL;reactor->evblk = block;reactor->blkcnt = 1;return 0;
}/*
* 销毁反应对:
* 解雇快递员: close(reactor->epfd);
* 顺着反应堆记录的第一个机房的地址,一一回收快递柜和机房
*/
int ntyreactor_destory(struct ntyreactor *reactor) {close(reactor->epfd);struct eventblock *blk = reactor->evblk;struct eventblock *blk_next = NULL;while (blk != NULL) {blk_next = blk->next;free(blk->events);free(blk);blk = blk_next;}return 0;
}/*
* 将listenfd加入监听树并记录
*/
int ntyreactor_addlistener(struct ntyreactor *reactor, int sockfd, NCALLBACK *acceptor) {if (reactor == NULL) return -1;if (reactor->evblk == NULL) return -1;//reactor->evblk->events[sockfd];struct ntyevent *event = ntyreactor_idx(reactor, sockfd);nty_event_set(event, sockfd, acceptor, reactor);nty_event_add(reactor->epfd, EPOLLIN, event);return 0;
}int ntyreactor_run(struct ntyreactor *reactor) {if (reactor == NULL) return -1;if (reactor->epfd < 0) return -1;if (reactor->evblk == NULL) return -1;struct epoll_event events[MAX_EPOLL_EVENTS+1];int checkpos = 0, i;while (1) {int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);if (nready < 0) {printf("epoll_wait error, exit\n");continue;}for (i = 0;i < nready;i ++) {struct ntyevent *ev = (struct ntyevent*)events[i].data.ptr;if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) {ev->callback(ev->fd, events[i].events, ev->arg);}if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) {ev->callback(ev->fd, events[i].events, ev->arg);}}}
}// 3, 6w, 1, 100 ==
// <remoteip, remoteport, localip, localport>
int main(int argc, char *argv[]) {unsigned short port = SERVER_PORT; // listen 8888if (argc == 2) {port = atoi(argv[1]);}struct ntyreactor *reactor = (struct ntyreactor*)malloc(sizeof(struct ntyreactor));ntyreactor_init(reactor);int i = 0;int sockfds[PORT_COUNT] = {0};for (i = 0;i < PORT_COUNT;i ++) {sockfds[i] = init_sock(port+i);ntyreactor_addlistener(reactor, sockfds[i], accept_cb);}ntyreactor_run(reactor);ntyreactor_destory(reactor);for (i = 0;i < PORT_COUNT;i ++) {close(sockfds[i]);}free(reactor);return 0;
}
single reactor thread + worker threads
但是上面的设计有一个问题,和IO事件相比,应用程序的业务逻辑处理是比较耗时的,比如XML文件的解析、数据库记录的查找、文件资料的读取和传输、计算型工作的处理等,这些工作相对而言比较独立,它们会拖慢整个反应堆模型的执行效率。
所以,将这些decode、compute、encode型工作放置到另外的线程池中,和反应堆线程解耦,是一个比较明智的选择。如下图,反应堆线程只负责处理IO相关的工作,业务逻辑相关的工作都被裁剪成一个一个的小任务,放到线程池中由空闲的线程来执行。当结果完成后,再交给反应堆线程,由反应堆线程通过套接字将结果发送出去。
示例程序
#include <lib/acceptor.h>
#include "lib/common.h"
#include "lib/event_loop.h"
#include "lib/tcp_server.h"char rot13_char(char c) {if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))return c + 13;else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))return c - 13;elsereturn c;
}// 连接建立之后的 callback
int onConnectionCompleted(struct tcp_connection *tcpConnection) {printf("connection completed\n");return 0;
}// 数据读到 buffer 之后的 callback
int onMessage(struct buffer *input, struct tcp_connection *tcpConnection) {printf("get message from tcp connection %s\n", tcpConnection->name);printf("%s", input->data);struct buffer *output = buffer_new();int size = buffer_readable_size(input);for (int i = 0; i < size; i++) {buffer_append_char(output, rot13_char(buffer_read_char(input)));}tcp_connection_send_buffer(tcpConnection, output);return 0;
}// 数据通过 buffer 写完之后的 callback
int onWriteCompleted(struct tcp_connection *tcpConnection) {printf("write completed\n");return 0;
}// 连接关闭之后的 callback
int onConnectionClosed(struct tcp_connection *tcpConnection) {printf("connection closed\n");return 0;
}int main(int c, char **v) {// 主线程 event_loopstruct event_loop *eventLoop = event_loop_init(); //创建一个event_loop,也就是reactor对象,这个event_loop和线程相关联,每个event_loop在线程里执行的是一个无限循环,以便完成事件的分发// 初始化 acceptor,用来监听某个端口struct acceptor *acceptor = acceptor_init(SERV_PORT);// 初始 tcp_server,可以指定线程数目,如果线程是 0,就只有一个线程,既负责 acceptor,也负责 I/Ostruct TCPserver *tcpServer = tcp_server_init(eventLoop, acceptor, onConnectionCompleted, onMessage, onWriteCompleted, onConnectionClosed, 0); // 这里比较重要的是传入了几个回调函数,分别对应了连接建立完成、数据读取完成、数据发送完成、连接关闭完成几种操作,通过回调函数,让业务程序可以聚集在业务层开发tcp_server_start(tcpServer); //开启监听// main thread for acceptorevent_loop_run(eventLoop); // 运行event_loop无限循环,等待acceptor上有连接建立、新连接上有数据可读
}
这里自始至终都只有一个main thread在工作,这个reactor反应堆同时分发acceptor上的连接建立时间和已经连接的IO事件
非阻塞IO + readiness notification + 多线程
前面的做法是所有的IO事件都在一个线程里分发,前面的做法是所有的IO事件都在一个线程里分发,reactor反应堆同时分发acceptor上的连接建立时间和已经连接的IO事件
但是这种模式,在发起连接请求的客户端非常多的情况下,有一个地方是有问题的,那就是单reactor线程既要分发连接建立,又要分发已建立连接的IO,优点忙不过来,在实际中的表现就是客户端连接成功率偏低。
再者,新的硬件技术在不断发展,多核多路CPU已经得到了极大的应用,单reactor反应堆看着大把的 CPU 资源却不用,有点可惜。
将 acceptor 上的连接建立事件和已建立连接的 I/O 事件分离(如果我们把线程引入进来,可以利用现代CPU多核的能力,让每个核都可以作为一个IO分发器进行IO事件的分发),形成所谓的主从reactor模式。
主从reactor模式
下图描述了主从reactor模式是如何工作的。
主-从reactor模式的核心思想就是,主反应堆只负责分发accept连接建立,已连接套接字上的IO事件交给sub-reactor负责分发。其中sub-reactor的数量,可以根据CPU的核数来灵活设置。
比如一个四核CPU,我们可以设置sub-reactor为4.相当于有4个身手不凡的反应堆线程同时在工作,这大大降低了IO分发处理的效率。而且,同一个套接字事件分发只会出现在一个反应堆线程中,这会大大减少并发处理的锁开销。
从上图可以看出,我们的反应堆线程一直在感知连接建立的事件,如果有连接成功建立,主反应堆线程通过accept分发获得已连接套接字,接下来会按照一定的算法选取一个从反应堆线程,并把已连接套接字加入到选择好的从反应堆线程中。
主反应堆线程的唯一工作,就是调用accept获取已连接套接字,以及将已连接套接字加入到从反应堆线程中。不过,这里还有一个小问题,主反应堆线程和从反应堆线程,是两个不同的线程,如何把已连接套接字加入到另外一个线程中呢?而且,此时从反应堆线程或者处于事件分发的无限循环中,这种情况下应该怎么办呢?
主 - 从 reactor+worker threads 模式
如果说主-从reactor模式解决了IO分发的高效率问题,那么work threads就解决了业务逻辑和IO分发之间的耦合问题。把这两个策略组织在一起,就是实战中普遍采用的模式。
上图显示了主 - 从反应堆下加上 worker 线程池的处理模式。主 - 从反应堆跟上面介绍的做法是一样的。和上面不一样的是,这里将decode、compute、encode等CPU密集型的工作从IO线程中拿走,这些工作交给worker线程池来处理,而且这些工作拆分成了一个个子任务进行。encode之后完成的结果再由sub-reactor的IO线程发送出去。
示例程序
#include <lib/acceptor.h>
#include "lib/common.h"
#include "lib/event_loop.h"
#include "lib/tcp_server.h"char rot13_char(char c) {if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))return c + 13;else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))return c - 13;elsereturn c;
}// 连接建立之后的 callback
int onConnectionCompleted(struct tcp_connection *tcpConnection) {printf("connection completed\n");return 0;
}// 数据读到 buffer 之后的 callback
int onMessage(struct buffer *input, struct tcp_connection *tcpConnection) {printf("get message from tcp connection %s\n", tcpConnection->name);printf("%s", input->data);struct buffer *output = buffer_new();int size = buffer_readable_size(input);for (int i = 0; i < size; i++) {buffer_append_char(output, rot13_char(buffer_read_char(input)));}tcp_connection_send_buffer(tcpConnection, output);return 0;
}// 数据通过 buffer 写完之后的 callback
int onWriteCompleted(struct tcp_connection *tcpConnection) {printf("write completed\n");return 0;
}// 连接关闭之后的 callback
int onConnectionClosed(struct tcp_connection *tcpConnection) {printf("connection closed\n");return 0;
}int main(int c, char **v) {// 主线程 event_loopstruct event_loop *eventLoop = event_loop_init();// 初始化 acceptorstruct acceptor *acceptor = acceptor_init(SERV_PORT);// 初始 tcp_server,可以指定线程数目,这里线程是 4,说明是一个 acceptor 线程,4 个 I/O 线程,没一个 I/O 线程//tcp_server 自己带一个 event_loopstruct TCPserver *tcpServer = tcp_server_init(eventLoop, acceptor, onConnectionCompleted, onMessage,onWriteCompleted, onConnectionClosed, 4);tcp_server_start(tcpServer);// main thread for acceptorevent_loop_run(eventLoop);
}
这个示例几乎和上个示例一样,唯一的不同是在创建TCPServer时,线程的数量设置不再是0,而是4。这里线程是4,说明是一个主acceptor线程,4个从reactor线程,每一个线程都跟一个event_loop意义绑定。
你可能会问,这么简单就完成了主、从线程的配置?
答案是YES。这其实是设计框架要考虑的地方,一个框架不仅要考虑性能、扩展性,也需要考虑可用性。可用性部分就是程序开发者如何使用框架。如果我是一个开发者,我肯定关心框架的使用方式是不是足够方便,配置是不是足够灵活等。
像这里,可以根据需求灵活地配置主、从反应堆线程,就是一个易用性的体现。当然,因为时间有限,我没有考虑 woker 线程的部分,这部分其实应该是应用程序自己来设计考虑。网络编程框架通过回调函数暴露了交互的接口,这里应用程序开发者完全可以在 onMessage 方法里面获取一个子线程来处理 encode、compute 和 encode 的工作,像下面的示范代码一样。
// 数据读到 buffer 之后的 callback
int onMessage(struct buffer *input, struct tcp_connection *tcpConnection) {printf("get message from tcp connection %s\n", tcpConnection->name);printf("%s", input->data);// 取出一个线程来负责 decode、compute 和 encodestruct buffer *output = thread_handle(input);// 处理完之后再通过 reactor I/O 线程发送数据tcp_connection_send_buffer(tcpConnection, output);return
epoll改进
主线程的 epoll_wait 只处理 acceptor 套接字的事件,表示的是连接的建立;反应堆子线程的 epoll_wait 主要处理的是已连接套接字的读写事件。
epoll的性能凭什么就要比poll或者select好呢?这要从两个角度来说明:
- 第一个角度是事件集合。在每次使用epoll或select之前,都需要准备一个感兴趣的事件集合的注册。而epoll则不是这样,epoll维护了一个全局的事件集合,通过epoll句柄,可以操作这个事件集合,增加、删除或修改这个事件集合里的某个元素。要知道在绝大多数情况下,事件集合的变化没有那么的大,这样操作系统就不需要每次重新扫描事件集合,构建内核空间数据结构。
- 第二个角度是就绪列表。每次在使用poll或者select之后,应用程序都需要扫描整个感兴趣的事件集合,从中找出真正活动的事件,这个列表如果增长到10K以上,每次扫描的时间损耗也是惊人的。事实上,很多情况下扫描完一圈,可能发现只有几个真正活动的事件。而epoll则不是这样,epoll返回的时候直接就是活动的事件列表,应用程序减少了大量的扫描时间。
此外,epoll还提供了更高级的能力----边缘触发。举个例子:
- 如果某个套接字有100个字节可读,边缘触发和条件触发都会产生read ready notification事件,如果应用程序只读取了50个字节,边缘触发就会陷入等待;而条件触发则会因为还有50个字节没有读取完,不断的产生read ready notification事件。
- 在边缘触发下,如果某个套接字缓冲区可写,会无限次返回write ready notification事件,在这种情况下,如果应用程序没有准备好,不需要发送数据,一定要解除套接字上的ready notification,否则CPU就直接跪了。
从上面可以看出,边缘触发只会产生一次活动事件,性能和效率更高。不过,程序处理起来要更为小心。
异步IO + 多线程
异步非阻塞IO模型是一种更为高效的方式 ,当调用结束之后,请求立即返回,由操作系统后台完成赌赢的操作,当最终操作完成,就会产生这个信号,或者执行一个回调函数来完成IO处理。
这就涉及到了 Linux 下的 aio 机制
ps:Linux 的 AIO 机制可能后面逐渐不用了,可以关注 5.1 的 io_uring 机制,大杀器
引言
在上面几部分中,我们谈到了阻塞IO、非阻塞IO以及像select、poll、epoll等IO多路复用技术,并在此基础上结合线程技术,实现了以事件分发为核心的reactor反应堆模式。你或许还听说过一个叫做proactor的网络事件驱动模式,这个 Proactor 模式和 reactor 模式到底有什么区别和联系呢?
阻塞 / 非阻塞 VS 同步 / 异步
阻塞IO:阻塞IO发起read请求,线程会被挂起,一直等到内核数据准备好,并把数据从内核区域拷贝到应用程序的缓冲区中,当拷贝过程完成,read请求调用才返回。接下来,应用程序就可以对缓冲区的数据进行数据解析
非阻塞IO:非阻塞IO的read请求在数据未准备好的情况下立即返回,应用程序可以不断轮询内核,直到数据准备好,内核将数据拷贝到应用程序缓冲,并完成这次read调用。注意,这里最后一次read调用,获取数据的过程,是一个同步的过程。这里的同步指的是内核区域的数据拷贝到缓冲区这个过程。
每次让应用程序去轮询内核的IO是否准备好,是一个不经济的做法,因为在轮询的过程中应用进程啥也不能干。于是,像select、poll这样的IO多路复用技术就隆重登场了。通过IO事件分发,当内核数据准备好时,在通知应用程序进行操作。这个做法大大改善了应用进程对CPU的利用率,在没有被通知的情况下,应用程序可以使用CPU做其他的事情。
注意,这里的read调用,获取数据的过程,也是一个同步的过程
第一种阻塞IO情况下,应用程序会被挂起,直到获取数据,第二种非阻塞IO和第三种基于非阻塞IO的多路复用技术,获取数据的操作不会被阻塞。
无论第一种阻塞IO,还是第二种非阻塞IO,第三种基于非阻塞IO的多路复用技术都是同步调用技术。为什么这么说呢?因为同步调用,异步调用的说法,是对于获取数据的过程而言的,前面几种最后获取数据的read操作调用,都是同步的,在read调用时,内核将数据从内核空间拷贝到应用程序空间,这个过程是在read函数中同步进行的,如果内核实现的拷贝效率很差,read调用就会在这个同步过程中消耗比较长的时间。
而真正的异步调用则不用担心这个问题,当我们发起aio_read之后,就立即返回,内核自动将数据从内核空间拷贝到应用程序空间,这个拷贝过程是异步的,内核自动完成的,和前面的同步不一样,应用程序并不需要主动发起拷贝动作。
举个生活中的例子:
第一种阻塞 I/O 就是你去了书店,告诉老板你想要某本书,然后你就一直在那里等着,直到书店老板翻箱倒柜找到你想要的书。
第二种非阻塞 I/O 类似于你去了书店,问老板有没有一本书,老板告诉你没有,你就离开了。一周以后,你又来这个书店,再问这个老板,老板一查,有了,于是你买了这本书。
第三种基于非阻塞的 I/O 多路复用,你来到书店告诉老板:“老板,到货给我打电话吧,我再来付钱取书。”
第四种异步 I/O 就是你连去书店取书的过程也想省了,你留下地址,付了书费,让老板到货时寄给你,你直接在家里拿到就可以看了。
这里放置了一张表格,总结了以上几种 I/O 模型。
proactor 模式是以异步IO为基础的
linux下socket套接字的异步支持
aio系列函数是由POSIX定义的异步操作接口,可惜的是,linux下的aio操作,不是真正的操作系统级别支持的,它只是由GNU libc库函数在用户空间借由pthread方式实现的,而且仅仅针对磁盘类IO,套接字IO不支持。
也有很多 Linux 的开发者尝试在操作系统内核中直接支持 aio,例如一个叫做 Ben LaHaise 的人,就将 aio 实现成功 merge 到 2.5.32 中,这部分能力是作为 patch 存在的,但是,它依旧不支持套接字。
Solaris 倒是有真正的系统系别的 aio,不过还不是很确定它在套接字上的性能表现,特别是和磁盘 I/O 相比效果如何。
综合以上结论就是,linux下对异步操作的支持非常有限,这也是为什么使用epoll等多路复用技术加上非阻塞IO来解决linux下高并发高性能网络IO问题的根本原因。
Windows 下的 IOCP 和 proactor 模式
和 Linux 不同,Windows 下实现了一套完整的支持套接字的异步编程接口,这套接口一般被叫做 IOCompletetionPort(IOCP)。
这样,就产生了基于 IOCP 的所谓 proactor 模式。
和reactor模式一样,proactor模式也存在一个无限循环运行的event loop线程,但是不同于reactor模式,这个线程并不负责处理IO调用,它只是负责在对应的read、write操作完成的情况下,分发完成事件到不同的处理函数
这里举一个 HTTP 服务请求的例子来说明:
- 客户端发起一个GET请求
- 这个GET请求对应的字节流被内核读取完成,内核将这个完成事件放置到一个队列中
- event loop线程,也就是poractor从这个队列里获取事件,根据事件类型,分发到不同的处理函数上,比如一个http handle的onMessage解析函数
- HTTP request解析函数完成报文解析
- 业务逻辑处理,比如读取数据库的记录
- 业务逻辑处理完成,开始encode,完成之后,发起一个异步写操作
- 这个异步写操作被内核执行,完成之后这个异步写操作被放置在内核的队列中
- proactor线程获取这个完成事件,分发到HTTP handler 的 onWriteCompled 方法执行。
从这个例子可以看出,由于系统内核提供了真正的“异步”操作,proactor不会再向reactor一样,每次感知事件后再调用read、write分发完成数据的读写,它只负责感知事件完成,并由对应的handle发起异步读写请求,IO读写操作本身是由系统内核完成的。因此这里需要传入数据缓冲区的地址等信息,这样,系统内核才能帮助我们自动完成数据的读写工作。
无论是reactor模式,还是proactor模式,都是一种基于事件分发的网络编程模式。reactor模式是基于待完成的IO事件,proactor模式则是基于已完成的IO事件,两者的本质,都是借由事件分发的思想,设计出可兼容、可扩展、接口友好的一套程序框架。
小结
和同步IO相比,异步IO的读写操作有内核自动完成,不过,在linux下目前仅支持简单的基于本地文件的aio异步操作,这也使得我们在编写高性能网络程序时,首选reactor模式,借由epoll这样的IO分发技术完成开发;而windows下的IOCP则是一种异步IO的技术,并由此产生了与reactor齐名的proactor模式,借助这种模式,可以完成 Windows 下高性能网络程序设计。
reactor和proactor的区别前者是同步 有消息到达时调用应用程序的回调,应用程序自己调用read 同步取得数据,而后者是内核异步数据读取完成之后才调用应用程序的回调。Linux下标榜的proactor其实都是伪的。
总结
支持单机1万并发的问题叫做C10K问题,为了解决C10K问题,需要重点考虑两个方面的问题:
- 如何和操作系统配合,感知IO事件的发送
- 如何分配和使用进程、线程资源来服务上万个连接
基于这些组成,产生了一些通用的组合方法,在Linux下,解决高性能问题的利器就是非阻塞IO加上epoll机制,再利用多线程能力,
网络编程:C10K问题而引出的reactor模型相关推荐
- 90分钟详解网络编程相关的细节处理丨 reactor丨网络io丨epoll丨C/C++丨Linux服务器开发丨后端开发丨Linux后台开发
90分钟搞懂网络编程相关细节处理 1. 网络编程四要素 2. io多路复用 3. reactor三种基础封装方式 视频讲解如下,点击观看: 90分钟详解网络编程相关的细节处理丨 reactor丨网络i ...
- 【Linux网络编程】循环服务器之UDP循环模型
00. 目录 文章目录 00. 目录 01. 概述 02. UDP循环服务器的实现方法 03. UDP循环服务器模型 04. UDP循环服务器实现 05. 附录 01. 概述 服务器设计技术有很多,按 ...
- Linux非阻塞IO(二)网络编程中非阻塞IO与IO复用模型结合
上文描述了最简易的非阻塞IO,采用的是轮询的方式,这节我们使用IO复用模型. 阻塞IO 过去我们使用IO复用与阻塞IO结合的时候,IO复用模型起到的作用是并发监听多个fd. 以简单的回射服务器为例,我 ...
- Linux 网络编程——并发服务器的三种实现模型
服务器设计技术有很多,按使用的协议来分有 TCP 服务器和 UDP 服务器,按处理方式来分有循环服务器和并发服务器. 循环服务器与并发服务器模型 在网络程序里面,一般来说都是许多客户对应一个服务器(多 ...
- 0729------Linux网络编程----------使用 select 、poll 和 epoll 模型 编写客户端程序
1.select 模型 1.1 select 函数原型如下,其中 nfds 表示的描述符的最大值加1(因为这里是左闭右开区间),中间三个参数分别表示要监听的不同类型描述符的集合,timeout用来表示 ...
- Linux后端服务器网络编程之线程模型丨reactor模型详解
前言 上一篇文章<后端服务器网络编程之 IO 模型>中讲到服务器端高性能网络编程的核心在于架构,而架构的核心在于进程/线程模型的选择.本文将主要介绍传统的和目前流行的进程/线程模型,在 ...
- 【Linux】一步一步学Linux网络编程教程汇总(更新中......)
00. 目录 文章目录 00. 目录 01. 基础理论知识 02. 初级编程 03. 高级编程 04. LibEvent库 05. 06. 07. 01. 基础理论知识 [Linux网络编程]网络协议 ...
- (Java)socket网络编程及处理socket粘包拆包问题
目录 1.socket简介 2.TCP/IP协议 3.tcp三次握手 4.socket的一些接口函数原理 5.java socket 长连接粘包拆包问题 6.socket模拟服务端客户端发消息 7.U ...
- 网络编程——The C10K Problem(C10K = connection 10 kilo 问题)。k 表示 kilo,即 1000
The C10K problem翻译 (C10K = connection 10 kilo 问题).k 表示 kilo,即 1000 比如:kilometer(千米), kilogram(千克). 如 ...
最新文章
- DLL load failed: 页面文件太小,无法完成操作
- 湖南省计算机二级程序题库,湖南省计算机二级单选题题库(直接打印)
- oxyen eclipse 启动 报错 se启动提示javaw.exe in your current PATH、No java virtual machine
- 如何评估深度学习模型效果?阿里工程师这么做 1
- Linux-Windows-Mac-RabbitMQ安装教程
- java jobkey_Java JobBuilder.newJob方法代码示例
- 如何将网页实现变灰效果?
- java 常用的五大包
- QT学习五之界面切换
- 自己小米4c 高通9008模式刷机 低版本 亲测有效
- 企业oa系统是什么,有什么好用的办公软件推荐?
- Odoo12功能增强模块
- 2021-7-20 Cityscape 数据集从19分类到4分类BiSeNetv1-v2训练验证和测试一条龙
- 临危受命 青力支持:“信豫链”及时上线 保障河南中小企业带“资”复工
- 向量的加减(运算符重载)
- Win11电脑蓝屏怎么办?Win11电脑蓝屏的修复方法
- WIFI6比WIFI5好在哪里呢?
- win10 idea配置git命令简写缩写
- 【华为上机试题C++】老师想知道从某某同学当中,分数最高的是多少,现在请你编程模拟老师的询问。当然,老师有时候需要更新某位同学的成绩.
- irq : nobody cared (try booting with the “irqpoll“ option) 问题说明