Libevent教程001: 简介与配置
本文内容大致翻译自 libevent-book, 但不是照本翻译. 成文时, libevent最新的稳定版为 2.1.8 stable. 即本文如无特殊说明, 所有描述均以 2.1.8 stable 版本为准.
本文为系列文章的第一篇, 对应libevent-book的 chapter 0 + chapter 1 + R0 + R1
0. 前提条件
这个文档是对libevent的介绍与指导, 阅读文档需要你具有以下的能力:
- 你精通C语言
- 你至少了解Unix网络编程.
- 你会安装libevent
- 你大致知道libevent是干什么用的.
1. 基本概念: 阻塞/非阻塞/同步/异步/回调机制的讨论
这里首先要解释四个名词: 阻塞, 非阻塞, 同步, 异步. 它们都是修饰"接口"的形容词, 或者说的土一点, 它们都是修饰"函数"的形容词.
同步, 还是异步, 是从"消息通信"的视角去描述这个接口的行为. 而所谓的消息通信, 你可以简单的把"函数"想象成一个淘宝客服, 把"调用方"想象成你自己. 调用函数的过程其实就是三步:
- "你"询问"淘宝客服"一个问题. 比如, "在吗?". 在这个场景中, 你就是"调用方", "淘宝客服"是函数, 而那句"在吗?", 则是函数参数, 你把函数参数传递给函数.
- "淘宝客服"进行后台处理. 这时淘宝客服接收到了你的询问消息, 如果他没有在忙, 那么他可以立即回复你. 如果他现在正在忙, 比如正在吃饭, 比如正在和老婆吵架, 比如淘宝客服需要先看一下你之前的行为记录, 然后再决定如何回复你.(比如他看到你正在浏览一双袜子,觉得你在潜在的买家, 他决定回复你. 比如他看到你三天前下单买了一双袜子, 但袜子还没有发货, 他觉得你有退货的风险, 从而决定不理你, 假装不在.) 这个客服思考决断的过程, 就是函数内部进行处理运算的过程. 当然这个例子很简单, 有些牵强.
- 最终, 淘宝客服回复了你, "在的, 亲". 这里, 回复这个动作, 就是函数返回, 而"在的, 亲"这句话, 就是函数的返回值.
你从这个角度去看, 函数调用, 就是消息通信的过程, 你发送消息给函数, 函数经过一番运算思考, 把结果再回发给你.
所谓的同步, 异步, 指的是:
- 这个淘宝客服很老实, 对于每个顾客发来的问题, 他都需要经过一番思考, 再进行答复. 这个函数很老实, 对于每个函数调用, 都很老实的根据传入参数进行计算, 再返回结果. 也是是说, 在淘宝客服思考结束之前, 这个客服不会向你发送答复, 你也收不到答复. 也就是说, 在函数运算结束之前, 函数不会返回, 你也得不到返回值. 那么, 这个客服是同步的, 这个函数调用的过程是同步调用, 这个函数是同步的.
- 假如这个淘宝客服很不老实, 他装了一个自动答复小程序. 对于每个询问的顾客, 都先自动回复一句"亲, 现在很忙哟, 客服MM可能过一会才能给你答复". 也就是说, 顾客在发出询问之后, 立即就能得到一个答复. 也就是说, 调用方在调用一个函数之后, 这个函数就立即返回了. 而真正的结果, 可能在过五分钟之后才会给你. 即是五分钟之后客服对你说"在的呢, 亲". 这样的函数, 就叫异步函数.
异步客服需要解决一个问题: 当真正的运算结果得出之后, 被调用的客服如何通知作为调用方的你, 取走答案. 在淘宝客户端上, 是通过手机的震动消息提醒, 是通过聊天框的红点.
所以, 关于同步, 和异步, 这里做一个稍微正式一点的总结:
- 同步的过程: 调用方传参->函数运算->函数返回运算结果.
- 异步的过程: 调用方传参->函数说我知道了, 然后过了五分钟, 函数说我算出来了, 结果在这里, 你来取.
这里我们着眼于消息的传递, 通讯方式, 也就是站在函数的角度去看, 结果是如何传递给调用方的. 同步接口, 运算结果在"函数调用"这个场景下就返回给了调用方. 异步接口: 运算结果在"函数调用"这个场景之后的某个不定的时刻, 通过某种通知方式, 传递给调用方.
整个过程中我们忽略了一件事: 就是, 在函数执行运算的过程中, 调用方在干什么. 也是是, 在淘宝客服内心思考如何回复你的时候, 你在干什么.
这就引出了阻塞与非阻塞:
- 阻塞: 在函数执行运算的过程中, 当前线程什么也做不了. 在等待客服回复的过程中, 你什么也不做, 就在那干等着, 直到他回复了你.
- 非阻塞: 在函数执行去处的过程中, 当前线程可以去做其它事情. 在等待客服回复的过程中, 你上了个厕所, 还顺便洗了个澡.
换句话说:
- 同步与异步, 描述的是 被调用的函数, 如何将结果返回给调用者
- 阻塞与非阻塞, 描述的是 调用方, 在得到结果之前能不能脱身
这是两个维度上的逻辑概念, 这两个维度互相有一定的干涉, 并不是完全正交的两个维度, 这样, 既然是两个维度, 那么就有四种组合.
- 同步, 且阻塞: 调用方发起调用直至得到结果之前, 都不能干其它事情. 被调函数接收到参数直到运算结束之前, 都不会返回.
- 同步, 非阻塞: 调用方发起调用直至得到结果之前这段时间, 可以做其它事情. 但被调函数接收到参数直到运算结束之前, 都不会返回. 很显然这个逻辑概念说得通, 但其实是反常理的. 因为: 如果调用方在发起调用之后, 得到结果(函数返回)之前, 要去做其它事情, 那么就有一个隐含的前提条件: 调用方必须知道本次调用的耗时, 且被调方(函数)严格遵守这个时间约定. 一毫秒不多, 一毫秒不少. 这在代码的世界里是很难达到的.
- 异步, 且阻塞: 调用方发起调用直至得到结果之前, 都不能干其它事情. 被调用函数接收到参数之后立即返回, 但在随后的某个时间点才把运算结果传递给调用方. 之后调用方继续活动. 这个逻辑概念依然说得通, 但是很别扭. 这就相当于, 在你问淘宝客服问题的时候, 淘宝客服的自动回复机器人已经给你说了"客服很忙哟, 可能过一会才能答复你", 但你就是啥也不干, 非得等到客服答复你之后, 才去上厕所. 这种情景在代码世界里可能发生, 但似乎很智障.
异步, 非阻塞: 调用方发起调用直至得到结果之前这段时间, 可以做其它事情. 被调函数接收到参数后立即返回, 但在之后的某一个时间点才把运算结果传递给调用方. 这说起来很绕口, 举个栗子, 还是客服:
- 你拿出手机, 向客服发送消息, "在吗?". 然后把手机放桌子上, 转向上厕所去了.
- 客服收到你的消息, 机器人回复你"不好意思, 客服现在很忙, 但我们会尽快答复你的, 亲!".
- 你上厕所回来了, 看手机没消息, 又去吃饭了.
- 客服开始处理你的消息, 终于开始给你真正的回复"亲, 2333号客服为您服务, 你有什么要了解的吗?".
- 你吃饭的过程中, 手机震动, 你点开淘宝, 发现有了回复. 整个流程结束.
可以看到
- 阻塞方式下, 调用方总是能第一时间拿到调用结果. 因为在阻塞期间, 调用方啥也不干, 就等着函数返回结果. 非阻塞方式下, 调用方一般都是在函数返回了结果之后才去查看运算结果.
- 异步方式下, 被调用方可以推迟处理任务. 客服收到你的消息后可以先把饭吃完, 函数收到你的调用后并不一定立即就开始运算.
- 同步且阻塞, 双方都是杠精, 都是老实人. 理解起来比较自然.
- 异步非阻塞, 调用方不在乎什么时候能得到运算结果. 被调用方不在乎调用方着急不着急, 双方都是佛系青年. 理解起来也比较自然.
还有一个点要给大家介绍到, 就是回调函数. 在上面讲过, 异步调用, 需要函数以某种机制, 在运算结果得出之后, 将运算结果传递给调用方. 但回调函数又绕了一个弯.
假设没有回调函数机制, 异步流程就是:
- 顾客询问客服, "你们家有没有红色36D的胸罩啊? 我想给我老婆买一件, 我老婆的胸是36D的". 然后去上厕所去了
- 自动机器人向顾客回复"很忙哟, 请耐心等待"
- 客服开始处理顾客的询问. 去库房查货.
- 库房有货, 客服要想办法将这个信息送到顾客手中. 他通过淘宝客户端发表了答复, 淘宝客户端导致手机震动, 这个震动信号通知了顾客.
- 顾客在厕所正拉屎, 看到手机上的消息提醒, 思考了一分钟, 顾客下单购买了这个胸罩.
这个流程里顾客做了两件事:
- 询问客服"有没有36D的红色胸罩". 这是调用函数的行为
- 在得到肯定的答复之后, 下单购买了这个胸罩. 这是得到函数返回的运算结果, 并根据运算结果进一步执行程序流程.(调用了另外一个函数: 购买)
而淘宝客服只做了一件事:
- 查询库房里是否有货
而有了回调机制后, 异步流程就是这样的:
- 顾客询问客服, "你们家有没有红色36D的胸罩?". 然后顾客把手机交给秘书, 叮嘱道:"你盯着这个客服, 如果她说有, 你就下单买了, 地址写我家, 如果没有, 你就啥也不做". 然后顾客坐上了出差的飞机
- 自动机器人向顾客回复"很忙哟, 请耐心等待"
- 客服开始处理顾客的询问. 去库房查货.
- 库房有货, 客服要想办法将这个信息送到顾客手中. 他通过淘宝客户端发表了答复, 淘宝客户端导致手机震动, 这个震动信号通知了秘书.
- 秘书根据老板的指示, 下单购买了这个胸罩.
这个流程里, 顾客做了两件事:
- 询问客服"有没有36D的胸罩". 这是调用函数行为.
- 向秘书叮嘱. 这是向消息监控方注册回调函数的行为. 消息监控方负责接收函数的返回结果. 回调函数则是: "如果有, 就买给老板夫人, 如果没有, 就什么也不做"
淘宝客服只做了一件事:
- 查询库房里是否有货
而消息监控方, 也就是秘书, 做了一件事:
- 根据客服的答复选择不同的行为. 即在函数调用结果得出之后, 调用回调函数.
这就是回调函数的一个生动的例子, 回调函数机制中有了一个调用结果监控方, 就是秘书, 这个角色承担着非常重要的职责: 即是在函数返回结果之后, 调用对应的回调函数. 回调机制一般都实现在异步调用框架之中, 对于写代码的人来说是透明的, 它简化了调用方的职责与智力负担, 一定程度上抽象了代码逻辑, 简化了编程模型(注意: 是一定程度上!). 有了回调机制:
- 调用方不必再去关心函数返回结果以及返回时机. 不必通过轮询或其它方式去检查异步函数是否返回了结果.
- 调用方在调用时就向调用结果监控方注册合适的回调, 在调用函数那一刻, 将后续业务逻辑写在回调函数中, 只负责调用就行了. 代码越写越像状态机.
不过正所谓回调一时爽, 调试火葬厂. 写过JavaScript的同学对这一点一定是深有体会. 当程序不能正确运行的时候, 调试很蛋疼. 异步框架本身由于函数返回时机不确定, 调试就比较蛋疼, 再加上回调机制, 那真是火葬厂了. 特别是回调嵌套回调, 里面套个七八层的时候, 那真是把图灵从坟里挖出来也没用的绝望场景.
2. 异步IO与多路复用技术
我们先来看一段经典的同步且阻塞的HTTP客户端程序:
#include <netinet/in.h> // for socketaddr_in
#include <sys/socket.h> // for socket functions
#include <netdb.h> // for gethostbyname
#include <sys/errno.h> // for errno#include <unistd.h>
#include <string.h>
#include <stdio.h>int main(int argc, char ** argv)
{const char query[] = "GET / HTTP/1.0\r\n""Host: www.baidu.com\r\n""\r\n";const char hostname[] = "www.baidu.com";struct sockaddr_in sin;struct hostent * h;const char * cp;int fd;ssize_t n_written, remaining;char buf[4096];/** Look up the IP address for the hostname.* Watch out; this isn't threadsafe on most platforms.*/h = gethostbyname(hostname);if(!h){fprintf(stderr, "E: gethostbyname(%s) failed. ErrMsg: %s\n", hostname, hstrerror(h_errno));return -__LINE__;}if(h->h_addrtype != AF_INET){fprintf(stderr, "E: gethostbyname(%s) returned an non AF_INET address.\n", hostname);return -__LINE__;}/** Allocate a new socket*/fd = socket(AF_INET, SOCK_STREAM, 0);if(fd < 0){fprintf(stderr, "E: socket failed: %s\n", strerror(errno));return -__LINE__;}/** Connect to the remote host*/sin.sin_family = AF_INET;sin.sin_port = htons(80);sin.sin_addr = *((struct in_addr *)(h->h_addr));if(connect(fd, (struct sockaddr *)(&sin), sizeof(sin)) != 0){fprintf(stderr, "E: connect to %s failed: %s\n", hostname, strerror(errno));close(fd);return -__LINE__;}/** Write the query* XXX Can send succeed partially?*/cp = query;remaining = strlen(query);while(remaining){n_written = send(fd, cp, remaining, 0);if(n_written < 0){fprintf(stderr, "E: send failed: %s\n", strerror(errno));close(fd);return -__LINE__;}remaining -= n_written;cp += n_written;}/** Get an answer back*/while(1){ssize_t result = recv(fd, buf, sizeof(buf), 0);if(result == 0){break;}else if(result < 0){fprintf(stderr, "recv failed: %s\n", strerror(errno));close(fd);return -__LINE__;}fwrite(buf, 1, result, stdout);}close(fd);return 0;
}
在上面的示例代码里, 大部分有关网络与IO的函数调用, 都是阻塞式的. 比如gethostbyname
, 在DNS解析成功域名之前是不返回的(或者解析失败了会返回失败), connect
函数, 在与对端主机成功建立TCP连接之前是不返回的(或者连接失败), 再比如recv
与send
函数, 在成功操作, 或明确失败之前, 也是不返回的.
阻塞式IO确实比较土, 上面的程序编译运行的时候, 如果你网络状况不好, 可能会卡一两秒才会读到百度的首页, 这卡就是因为阻塞IO的缘故. 当然, 虽然比较土, 但像这样的场合, 使用阻塞IO是没什么问题的. 但假如你想写一个程序同时读取两个网站的首页的话, 就比较麻烦了: 因为你不知道哪个网站会先响应你的请求.. 你可以写一些, 比如像下面这样的, 很土的代码:
char buf[4096];
int i, n;
while(i_still_want_to_read())
{for(i = 0; i < n_sockets; ++i){n = recv(fd[i], buf, sizeof(buf), 0);if(n == 0){handle_close(fd[i]);}else if(n < 0){handle_error(fd[i], errno);}else{handle_input(fd[i], buf, n);}}
}
如果你的fd[]
数组里有两个网站的连接, fd[0]
接着百度, fd[1]
接着hao123, 假如hao123正常响应了, 可以从fd[1]
里读出数据了, 但百度的服务器被李老板炸了, 响应不了了, 这时, 上面的代码就会卡在i==0
时循环里的n = recv(fd[0], buf, sizeof(buf), 0)
这条语句中, 直到李老板把服务器修好. 这就很蛋疼.
当然, 你可以用多线程解决这个问题, 多数情况下, 你有一个问题, 你尝试使用多线程解决, 然后你多个了有问题.
上面是一个冷笑话, 多线程或多进程是一个解决方案, 通常情况下, 最简单的套路是使用一个线程或进程去建立TCP连接, 然后连接建立成功后, 为每个连接创建独立的线程或进程来进行IO读写. 这样即使一个网站抽风了, 也只阻塞属于它自己的那个读写线程或进程, 不会影响到其它网站的响应.
下面是另外一个例子程序, 这是一个服务端程序, 监听40173端口上的TCP连接请求, 然后把客户端发送的数据按ROT13法再回写给客户端, 一次处理一行数据. 这个程序使用Unix上的fork()
函数为每个客户端的连接创建一个独立的处理进程.
#include <netinet/in.h> // for sockaddr_in
#include <sys/socket.h> // for socket functions
#include <sys/errno.h> // for errno#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>#define MAX_LINE 16384char rot13_char(char c)
{if((c >= 'a' && c <= 'm') ||(c >= 'A' && c <= 'M')){return c+13;}else if((c >= 'n' && c <= 'z') ||(c >= 'N' && c <= 'Z')){return c-13;}else{return c;}
}void child(int fd)
{char outbuf[MAX_LINE + 1]; // extra byte for '\0'size_t outbuf_used = 0;ssize_t result;while(1){char ch;result = recv(fd, &ch, 1, 0);if(result == 0){break;}else if(result == -1){perror("read");break;}if(outbuf_used < sizeof(outbuf)){outbuf[outbuf_used++] = rot13_char(ch);}if(ch == '\n'){send(fd, outbuf, outbuf_used, 0);outbuf_used = 0;continue;}}
}void run(void)
{int listener;struct sockaddr_in sin;sin.sin_family = AF_INET;sin.sin_addr.s_addr = 0;sin.sin_port = htons(40713);listener = socket(AF_INET, SOCK_STREAM, 0);int one = 1;setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));if(bind(listener, (struct sockaddr *)(&sin), sizeof(sin)) < 0){perror("bind");return;}if(listen(listener, 16) < 0){perror("listen");return;}while(1){struct sockaddr_storage ss;socklen_t slen = sizeof(ss);int fd = accept(listener, (struct sockaddr *)(&ss), &slen);if(fd < 0){perror("accept");}else{if(fork() == 0){child(fd);exit(0);}}}
}int main(int argc, char ** argv)
{run();return 0;
}
你可以使用下面的命令行, 通过netcat
工具向本机的40713发送数据, 来试验一下上面的服务端代码:
printf "abcdefghijklmnopqrstuvwxyz\n" | nc -4 -w1 localhost 40713
多进程或多线程确实是一个还算是比较优雅的, 应对并发连接的解决方案. 这种解决方案的缺陷是: 进程或线程的创建是有开销的, 在某些平台上, 这个开销还是比较大的. 这里优化的方案是使用线程, 并使用线程池策略. 如果你的机器需要处理上千上万的并发连接, 这就意味着你需要创建成千上万个线程, 想象一下, 服务器一般也就十几个核心, 64个不得了了, 如果有五千并发连接, 5000个线程排除轮64个核心的大米, 线程调度肯定是个大开销.
这个时候我们就需要了解一下非阻塞了, 通过下面的Unix调用, 可以将一个文件描述符设置为"非阻塞"的. 明确一下: "非阻塞"描述的是IO函数的行为, 将一个文件描述符设置为"非阻塞"的, 其实是指, 在这个文件描述符上执行IO操作, 函数的行为会变成非阻塞的.
fcntl(fd, F_SETFL, O_NONBLOCK);
当这个文件描述符是socket的文件描述符时, 我们一般也会直接称, "把一个socket设置为非阻塞". 将一个socket设置为非阻塞之后, 在对应的文件描述符上, 无论是执行网络编程相关的函数, 还是执行IO相关的函数, 函数行为都会变成非阻塞的, 即函数在调用之后就立即返回: 要么立即返回成功, 要把立即告诉调用者: "暂时不可用, 请稍后再试"
有了非阻塞这种手段, 我们就可以改写我们的访问网页程序了: 我们这时可以正确的处理同时下载两个网站的数据的需求了. 代码片断如下:
int i, n;
char buf[1024];for(i = 0; i < n_sockets; ++i)
{fcntl(fd[i], F_SETFL, O_NONBLOCK);
}while(i_still_want_to_read)
{for(int i = 0; i < n_sockets; ++i){n = recv(fd[i], buf, sizeof(buf), 0);if(n == 0){handle_close(fd[i]); // peer was closed}else if(n < 0){if(errno == EAGAIN){// do nothing, the kernel didn't have any data for us to read// retry}else{handle_error(fd[i], errno);}}else{handle_input(fd[i], buf, n); // read success}}
}
这样写确实解决了问题, 但是, 在对端网站还没有成功响应的那几百毫秒里, 这段代码将会疯狂的死循环, 会把你的一个核心占满. 这是一个很蛋疼的解决方案, 原因是: 对于真正的数据何时到达, 我们无法确定, 只能开个死循环轮询.
旧式的改进方案是使用一个叫select()
的系统调用函数. select()
函数内部维护了三个集合:
- 有数据可供读取的文件描述符
- 可以进行写入操作的文件描述符
- 出现异常的文件描述符
select()
函数在这三个集合有至少一个集合不为空的时候返回. 如果三个集合都为空, 那么select()
函数将阻塞.
下面是使用select()
改进后的代码片断:
fd_set readset;
int i, n;
char buf[1024];while(i_still_want_to_read)
{int maxfd = -1;FD_ZERO(&readset);// add all of the interesting fds to readsetfor(i = 0; i < n_sockets; ++i){if(fd[i] > maxfd){maxfd = fd[i];}FD_SET(fd[i], &readset):}select(maxfd+1, &readset, NULL, NULL, NULL);for(int i = 0; i < n_sockets; ++i){if(FDD_ISSET(fd[i], &readset)){n = recv(fd[i], &readset);if(n == 0){handle_close(fd[i]);}else if(n < 0){if(errno == EAGAIN){// the kernel didn't have any data for us to read}else{handle_error(fd[i], errno);}}else{handle_input(fd[i], buf, n);}}}
}
使用select()
改进了程序, 但select()
蛋疼的地方在于: 它只告诉你, 三集合中有数据了, 但是: 哪个fd可读, 哪个fd可写, 哪个fd有异常, 这些具体的信息, 它还是没告诉你. 如果你的fd数量不多, OK, 上面的代码没什么问题, 但如果你持有着上千个并发连接, 那每次select()
返回时, 你都需要把所有fd都轮一遍.
下面是使用select()
调用对rot13服务端示例代码的重构
#include <netinet/in.h> // for sockaddr_in
#include <sys/socket.h> // for socket functions
#include <sys/errno.h> // for errno
#include <fcntl.h> // for fcntl
#include <sys/select.h> // for select#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>#define MAX_LINE 16384char rot13_char(char c)
{if((c >= 'a' && c <= 'm') ||(c >= 'A' && c <= 'M')){return c+13;}else if((c >= 'n' && c <= 'z') ||(c >= 'N' && c <= 'Z')){return c-13;}else{return c;}
}struct fd_state{char buffer[MAX_LINE];size_t buffer_used;int writing;size_t n_written;size_t write_upto;
};struct fd_state * alloc_fd_state(void)
{struct fd_state * state = malloc(sizeof(struct fd_state));if(!state){return NULL;}state->buffer_used = state->n_written = state->writing = state->write_upto = 0;return state;
}void free_fd_state(struct fd_state * state)
{free(state);
}void make_nonblocking(int fd)
{fcntl(fd, F_SETFL, O_NONBLOCK);
}int do_read(int fd, struct fd_state * state)
{char buf[1024];int i;ssize_t result;while(1){result = recv(fd, buf, sizeof(buf), 0);if(result <= 0){break;}for(int i = 0; i < result; ++i){if(state->buffer_used < sizeof(state->buffer)){state->buffer[state->buffer_used++] = rot13_char(buf[i]);}if(buf[i] == '\n'){state->writing = 1;state->write_upto = state->buffer_used;}}}if(result == 0){return 1;}else if(result < 0){if(errno == EAGAIN){return 0;}return -1;}return 0;
}int do_write(int fd, struct fd_state * state)
{while(state->n_written < state->write_upto){ssize_t result = send(fd, state->buffer + state->n_written, state->write_upto - state->n_written, 0);if(result < 0){if(errno == EAGAIN){return 0;}return -1;}assert(result != 0);state->n_written += result;}if(state->n_written == state->buffer_used){state->n_written = state->write_upto = state->buffer_used = 0;}state->writing = 0;return 0;
}void run(void)
{int listener;struct fd_state * state[FD_SETSIZE];struct sockaddr_in sin;int i, maxfd;fd_set readset, writeset, exset;sin.sin_family = AF_INET;sin.sin_addr.s_addr = 0;sin.sin_port = htons(40713);for(i = 0; i < FD_SETSIZE; ++i){state[i] = NULL;}listener = socket(AF_INET, SOCK_STREAM, 0);make_nonblocking(listener);int one = 1;setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));if(bind(listener, (struct sockaddr *)(&sin), sizeof(sin)) < 0){perror("bind");return;}if(listen(listener, 16) < 0){perror("listen");return;}FD_ZERO(&readset);FD_ZERO(&writeset);FD_ZERO(&exset);while(1){maxfd = listener;FD_ZERO(&readset);FD_ZERO(&writeset);FD_ZERO(&exset);FD_SET(listener, &readset);for(i = 0; i < FD_SETSIZE; ++i){if(state[i]){if(i > maxfd){maxfd = i;}FD_SET(i, &readset);if(state[i]->writing){FD_SET(i, &writeset);}}}if(select(maxfd + 1, &readset, &writeset, &exset, NULL) < 0){perror("select");return;}if(FD_ISSET(listener, &readset)){struct sockaddr_storage ss;socklen_t slen = sizeof(ss);int fd = accept(listener, (struct sockaddr *)(&ss), &slen);if(fd < 0){perror("accept");}else if(fd > FD_SETSIZE){close(fd);}else{make_nonblocking(fd);state[fd] = alloc_fd_state();assert(state[fd]);}}for(i = 0; i < maxfd + 1; ++i){int r = 0;if(i == listener){continue;}if(FD_ISSET(i, &readset)){r = do_read(i, state[i]);}if(r == 0 && FD_ISSET(i, &writeset)){r = do_write(i, state[i]);}if(r){free_fd_state(state[i]);state[i] = NULL;close(i);}}}
}int main(int argc, char ** argv)
{setvbuf(stdout, NULL, _IONBF, 0);run();return 0;
}
但这样还不够好: FD_SETSIZE
是一个很大的值, 至少不小于1024. 当要监听的fd的值比较大的时候, 就很恶心, 遍历会遍历很多次. 对于非阻塞IO接口来讲, select
是一个很粗糙的解决方案, 这个系统调用提供的功能比较薄弱, 只能说是够用, 但接口确实太屎了, 不好用, 性能也堪优.
不同的操作系统平台上提供了很多select
的替代品, 它们都用于配套非阻塞IO接口来使单线程程序也有一定的并发能力. 这些替代品有poll()
, epoll()
, kqueue()
, evports
和/dev/poll
. 并且这些替代品的性能都比select()
要好的多. 但比较蛋疼的是, 上面提到的所有接口, 几乎都不是跨平台的. epoll()
是Linux独有的, kqueue()
是BSD系列(包括OS X)独有的. evports
和/dev/poll
是Solaris独有的. 是的, select()
属于POSIX标准的一部分, 但就是性能捉急. 也就是说, 如果你写的程序想跨平台, 高性能, 你就得自己写一层抽象, 把不同平台对于IO多路复用的底层统一起来: 这也就是Libevent干的事情.
libevent的低级API为IO多路复用提供了统一的接口, 其底层实现在不同的操作系统平台上都是最高效的实现.
下面, 我们将使用libevent对上面的程序进行重构. 注意: fd_sets
不见了, 取而代之的是一个叫event_base
的结构体.
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For fcntl */
#include <fcntl.h>#include <event2/event.h>#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>#define MAX_LINE 16384void do_read(evutil_socket_t fd, short events, void *arg);
void do_write(evutil_socket_t fd, short events, void *arg);char
rot13_char(char c)
{/* We don't want to use isalpha here; setting the locale would change* which characters are considered alphabetical. */if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))return c + 13;else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))return c - 13;elsereturn c;
}struct fd_state {char buffer[MAX_LINE];size_t buffer_used;size_t n_written;size_t write_upto;struct event *read_event;struct event *write_event;
};struct fd_state *
alloc_fd_state(struct event_base *base, evutil_socket_t fd)
{struct fd_state *state = malloc(sizeof(struct fd_state));if (!state)return NULL;state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state);if (!state->read_event) {free(state);return NULL;}state->write_event =event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state);if (!state->write_event) {event_free(state->read_event);free(state);return NULL;}state->buffer_used = state->n_written = state->write_upto = 0;assert(state->write_event);return state;
}void
free_fd_state(struct fd_state *state)
{event_free(state->read_event);event_free(state->write_event);free(state);
}void
do_read(evutil_socket_t fd, short events, void *arg)
{struct fd_state *state = arg;char buf[1024];int i;ssize_t result;while (1) {assert(state->write_event);result = recv(fd, buf, sizeof(buf), 0);if (result <= 0)break;for (i=0; i < result; ++i) {if (state->buffer_used < sizeof(state->buffer))state->buffer[state->buffer_used++] = rot13_char(buf[i]);if (buf[i] == '\n') {assert(state->write_event);event_add(state->write_event, NULL);state->write_upto = state->buffer_used;}}}if (result == 0) {free_fd_state(state);} else if (result < 0) {if (errno == EAGAIN) // XXXX use evutil macroreturn;perror("recv");free_fd_state(state);}
}void
do_write(evutil_socket_t fd, short events, void *arg)
{struct fd_state *state = arg;while (state->n_written < state->write_upto) {ssize_t result = send(fd, state->buffer + state->n_written,state->write_upto - state->n_written, 0);if (result < 0) {if (errno == EAGAIN) // XXX use evutil macroreturn;free_fd_state(state);return;}assert(result != 0);state->n_written += result;}if (state->n_written == state->buffer_used)state->n_written = state->write_upto = state->buffer_used = 1;event_del(state->write_event);
}void
do_accept(evutil_socket_t listener, short event, void *arg)
{struct event_base *base = arg;struct sockaddr_storage ss;socklen_t slen = sizeof(ss);int fd = accept(listener, (struct sockaddr*)&ss, &slen);if (fd < 0) { // XXXX eagain??perror("accept");} else if (fd > FD_SETSIZE) {close(fd); // XXX replace all closes with EVUTIL_CLOSESOCKET */} else {struct fd_state *state;evutil_make_socket_nonblocking(fd);state = alloc_fd_state(base, fd);assert(state); /*XXX err*/assert(state->write_event);event_add(state->read_event, NULL);}
}void
run(void)
{evutil_socket_t listener;struct sockaddr_in sin;struct event_base *base;struct event *listener_event;base = event_base_new();if (!base)return; /*XXXerr*/sin.sin_family = AF_INET;sin.sin_addr.s_addr = 0;sin.sin_port = htons(40713);listener = socket(AF_INET, SOCK_STREAM, 0);evutil_make_socket_nonblocking(listener);#ifndef WIN32{int one = 1;setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));}
#endifif (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {perror("bind");return;}if (listen(listener, 16)<0) {perror("listen");return;}listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);/*XXX check it */event_add(listener_event, NULL);event_base_dispatch(base);
}int
main(int c, char **v)
{setvbuf(stdout, NULL, _IONBF, 0);run();return 0;
}
总之:
- 注意链接的时候加上 -levent
- 代码量没有减少, 逻辑也没有简化. libevent只是给你提供了一个通用的多路IO接口. 或者叫事件监听接口.
evutil_socket_t
类型的使用, 与evutil_make_socket_nonblocking()
函数的使用, 均是为也跨平台兼容性. 使用这些类型名与工具函数, 使得在windows平台上代码也能跑起来.
现在, 你看, 异步IO+事件处理(或者叫多路IO复用), 是单线程单进程程序取得并发能力的最佳途径, 而libevent则是把多平台的IO多路复用库给你抽象统一成一层接口了. 这样代写的代码不需要改动, 就可以运行在多个平台上.
这样就有了三个问题:
- 如果我的代码需要跨平台, 或者只需要跨部分平台(比如我只考虑Linux和BSD用户, 完全不考虑Windows平台), 我为什么不自己把多路IO库做个简单的封装, 为什么要使用libevent呢? 典型的就是Redis, 用了很薄的一层封装, 下面统一了
epoll
,kqueue
,evport
,select
等. 为什么, 我需要使用libevent呢? - 如果将libevent作为一个黑盒去用, 不可避免的问题就是: 它的性能怎么样? 它封装了多个多路IO库, 在封装上是否有性能损失?
- 现在是个轮子都说自己解决了跨平台问题, 那么libevent在windows上表现怎么样? 它能兼容IOCP式多路IO库吗? 毕竟IOCP的设计思路和
epoll``select``evport``kqueue
等都不一样.
答案在这里:
- 你没有任何理由非得使用libevent, redis就是一个很好的例子. libevent有不少功能, 但如果你只是跨小部分平台, 并且只关注在多路IO复用上, 那么真的没什么必要非得用libevent. 你完全可以像redis那样, 用几百行简单的把多路IO库自己封装一下.
- 基本上这么讲吧: 你使用系统原生异步IO多路复用接口的性能是多少, 使用libevent就是多少. 说实施libevent里没太多的抽象, 接口也没有多么好用, 封闭很薄, 和你使用原生接口基本一样.
- libevent从版本2开始就能搞定windows了. 上面我们使用的是libevent很底层的接口, 其设计思路是遵循*nix上的事件处理模型的, 典型的就是
select
与epoll
: 当网络可读写时, 通知应用程序去读去写. 而windows上IOCP的设计思路是: 当网络可读可写时不通知应用程序, 而是先完成读与写, 再通知应用程序, 应用程序直接拿到的就是数据. 当在libevent 2提供的bufferevents
系列接口中, 它将*nix平台下的设计, 改巴改巴改成了IOCP式的. 使用这个系列的接口不可避免的, 对*nix平台有性能损失(这和asio封装网络库是一样的做法), 但实话讲, IOCP式的设计确实对程序员更友好, 代码可读性高了不少.
总的来说, 你应该在如下的场合使用libevent
- 代码需要跨多个平台, 甚至是windows
- 想在*nix平台上使用IOCP式的事件接口编程
- 你不想自己封装多个平台上的多路IO接口, 并且自认为, 就算自己做, 做的也肯定没有libevent好. libevent是一个久经考验的很基础的库. 身经百战.
下面是使用bufferevents
系列接口, 以IOCP式风格对之前例子代码的重构, 体验一下更人性的事件处理方式:
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For fcntl */
#include <fcntl.h>#include <event2/event.h>
#include <event2/buffer.h>
#include <event2/bufferevent.h>#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>#define MAX_LINE 16384void do_read(evutil_socket_t fd, short events, void *arg);
void do_write(evutil_socket_t fd, short events, void *arg);char
rot13_char(char c)
{/* We don't want to use isalpha here; setting the locale would change* which characters are considered alphabetical. */if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))return c + 13;else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))return c - 13;elsereturn c;
}void
readcb(struct bufferevent *bev, void *ctx)
{struct evbuffer *input, *output;char *line;size_t n;int i;input = bufferevent_get_input(bev);output = bufferevent_get_output(bev);while ((line = evbuffer_readln(input, &n, EVBUFFER_EOL_LF))) {for (i = 0; i < n; ++i)line[i] = rot13_char(line[i]);evbuffer_add(output, line, n);evbuffer_add(output, "\n", 1);free(line);}if (evbuffer_get_length(input) >= MAX_LINE) {/* Too long; just process what there is and go on so that the buffer* doesn't grow infinitely long. */char buf[1024];while (evbuffer_get_length(input)) {int n = evbuffer_remove(input, buf, sizeof(buf));for (i = 0; i < n; ++i)buf[i] = rot13_char(buf[i]);evbuffer_add(output, buf, n);}evbuffer_add(output, "\n", 1);}
}void
errorcb(struct bufferevent *bev, short error, void *ctx)
{if (error & BEV_EVENT_EOF) {/* connection has been closed, do any clean up here *//* ... */} else if (error & BEV_EVENT_ERROR) {/* check errno to see what error occurred *//* ... */} else if (error & BEV_EVENT_TIMEOUT) {/* must be a timeout event handle, handle it *//* ... */}bufferevent_free(bev);
}void
do_accept(evutil_socket_t listener, short event, void *arg)
{struct event_base *base = arg;struct sockaddr_storage ss;socklen_t slen = sizeof(ss);int fd = accept(listener, (struct sockaddr*)&ss, &slen);if (fd < 0) {perror("accept");} else if (fd > FD_SETSIZE) {close(fd);} else {struct bufferevent *bev;evutil_make_socket_nonblocking(fd);bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE);bufferevent_enable(bev, EV_READ|EV_WRITE);}
}void
run(void)
{evutil_socket_t listener;struct sockaddr_in sin;struct event_base *base;struct event *listener_event;base = event_base_new();if (!base)return; /*XXXerr*/sin.sin_family = AF_INET;sin.sin_addr.s_addr = 0;sin.sin_port = htons(40713);listener = socket(AF_INET, SOCK_STREAM, 0);evutil_make_socket_nonblocking(listener);#ifndef WIN32{int one = 1;setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));}
#endifif (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {perror("bind");return;}if (listen(listener, 16)<0) {perror("listen");return;}listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);/*XXX check it */event_add(listener_event, NULL);event_base_dispatch(base);
}int
main(int c, char ** argv)
{setvbuf(stdout, NULL, _IONBF, 0);run();return 0;
}
说实话也没人性到哪里去, 底层库就是这样, libevent还是太基础了. 算不上十分友好的轮子.
3. Libevent 简介
现在我们要正式介绍Libevent
3.1 libevent的卖点
- 代码跨平台.
- 性能高. libevent在非阻塞IO+多路复用的底层实现上, 选取的是特定平台上最快的接口. 比如Linux上用
epoll
, BSD上用kqueue
- 高并发可扩展. libevent就是为了那种, 需要维持成千上万的活动socket连接的应用程序使用的.
- 接口友好. 虽然并没有友好多少, 但至少比原生的
epoll
要好一点.
3.2 libevent下的各个子模块
evutil
通用类型定义, 跨平台相关的通用定义, 以及一些通用小函数event and event_base
核心模块. 事件库. *nix风格的事件模型: 在socket可读可写时通知应用程序.bufferevent
对核心事件库的再一层封装, IOCP式的事件模型: 在数据已读已写后通知应用程序evbuffer
这是bufferevent
模块内部使用的缓冲区实现.evhttp
简单的HTTP C/S实现evdns
简单的 DNS C/S实现evrpc
简单的 RPC实现
总的来说, 作为使用者, 需要关心的是:
evutil
是需要关心的- 对于主在*nix平台上写后台服务端程序的人: 只需要关心
event and event_base
核心库的用法即可. - 对于跨平台, 特别是包含win平台的开发人员: 需要关注
bufferevent
和evbuffer
, 对于核心库event and event_base
, 可以不关心 evhttp
,evdns
,evrpc
, 如无需要, 可以不用关心
3.3 libevent下的二进制库
以下是在链接你的代码的时候, 你需要了解的二进制库.
libevent_core
包含event and event_base
,evutil
,evbuffer
,bufferevent
中的所有函数libevent_extra
包含协议相关的函数. 包括 HTTP/DNS/RPC 等. 如果你用不到evhttp/evdns/evrpc
里的函数, 那么这个库不用链接.libevent
满清遗老, 包含了上面两个库里的所有函数. 官方不建议在使用libevent 2.0以上的版本时链接这个库. 这是个懒人库.libevent_pthreads
如果你编写多线程应用程序. 那么这个库里包含了基于POSIX线程库的相关函数实现. 如果你没有用到libevent中有关的多线程函数, 那么这个库不用链接. 以前这些函数是划分在libevent_core
中的, 后来被单独割出来了.注意: 这个库不是全平台的.libevent_openssl
这个库里的与OpenSSL相关的函数实现. 如果你没有用到libevent中有关OpenSSL的函数, 那么这个库不用链接. 以前这些函数也算在libevent_core
中, 最后也割出来了. 注意: 这个库也不是全平台的
3.4 libevent中的头文件
libevent中的头文件分为三类, 所有头文件都位于event2
目录下. 也就是说在代码中你应当这样写:
#include <event2/xxxx>
#include <event2/xxxx>
#include <event2/xxxx>
#include <event2/xxxx>
具体有哪些头文件在后续章节会详细介绍, 目前只介绍这个分类:
- API 头文件. 这些头文件定义了libevent对外的接口. 这些头文件没有特定前缀.
- 兼容性 头文件. 这些头文件是为了向前兼容老版本libevent存在的, 它们里面定义了老版本的一些废弃接口. 除非你是在做老代码迁移工作, 否则不建议使用这些头文件.
- 类型定义 头文件. 定义了libevent库中相关的类型. 这些头文件有共同后缀
_struct.h
3.5 如何将老版本的代码迁移到libevent 2上
官方建议大家使用版本2, 但有时候这个世界就是不那么让人舒服, 如果你需要和版本1的历史代码打交道, 你可以参照下面的对照表: 老头文件与新头文件的对照表
旧头文件 | 新头文件 |
---|---|
event.h | event2/event*.h, event2/buffer*.h, event2/bufferevent*.h, event2/tag*.h |
evdns.h | event2/dns*.h |
evhttp.h | event2/http*.h |
evrpc.h | event2/rpc*.h |
evutil.h | event2/util*.h |
在当前的2.0版本中, 老的旧头文件其实是不需要替换的, 这些旧头文件依然存在. 但还是建议将他们替换成新头文件, 因为说不定50年后libevent升级到3.0版本, 这些旧头文件就被扔了.
另外还有一些点需要你注意:
- 在1.4版本之前, 只有一个二进制库文件.
libevent
, 里面是libevent的所有实现. 如今这些实现被分割到了libevent_core
和libevent_extra
两个库中. - 在2.0之前, libevent不支持锁. 也就是说, 2.0之前如果要写出线程安全的代码, 你只能避免在线程间共享数据实例. 没有其它办法.
3.6 满清遗老
官方对待老版本是这样建议的:
- 1.4.7之前的版本被正式废弃了
- 1.3之前的版本有一堆bug, 用的时候看脸吧.
- 推荐使用2.0后的版本
我对老版本的态度是这样的: 能干活就好. 没有特殊原因, 我是不会做代码迁移的. 并且考虑到应用场景, 有时候用老版本也挺好的.
1.4.x版本的libevent被大量项目使用, 其实挺稳定的, 官方不建议使用, 只是官方不再在1.4版本上再加特性修bug了. 1.4版本最后的一个小版本号就停止在7上不动了. 而对于1.3版本, 确实不应该再碰了.
4. 使用libevent的正确姿势
libevent有几项全局设定, 如果你需要改动这几项设定, 那么确保在代码初始化的时候设定好值, 一旦你的代码流程开始了, 调用了第一个libevent中的任何函数, 后续强烈建议不要再更改设定值, 否则会引起不可预知的后果.
4.1 libevent中的日志
libevent默认情况下将把错误与警告日志写进stderr
, 并且如果你需要一些libevent内部的调试日志的话, 也可以通过更改设定来让其输出调试日志, 以在程序崩溃时提供更多的参考信息. 这些行为都可以通过自行实现日志函数进行更改. 下面是libevent相关的日志接口.
// 以下是日志级别
#define EVENT_LOG_DEBUG 0
#define EVENT_LOG_MSG 1
#define EVENT_LOG_WARN 2
#define EVENT_LOG_ERR 3// 以下是已经被废弃的日志级别定义
/* Deprecated; see note at the end of this section */
#define _EVENT_LOG_DEBUG EVENT_LOG_DEBUG
#define _EVENT_LOG_MSG EVENT_LOG_MSG
#define _EVENT_LOG_WARN EVENT_LOG_WARN
#define _EVENT_LOG_ERR EVENT_LOG_ERR// 这个是一个函数指针类型别名, 指向日志输出函数
// 日志输出函数应当是一个双参, 无返回值的函数, 第一个参数severity为日志级别, 第二个参数为日志字符串
typedef void (*event_log_cb)(int severity, const char *msg);// 这是用户自定义设置日志处理函数的接口. 如果调用该函数时入参设置为NULL
// 则将采用默认行为
void event_set_log_callback(event_log_cb cb);
比如下面我需要改写libevent记录日志的方式:
#include <event2/event.h>
#include <stdio.h>// 丢弃日志
static void discard_cb(int severity, const char * msg)
{// 这个日志函数内部什么也不做
}// 将日志记录至文件
static FILE * logfile = NULL;
static void write_to_file_cb(int severity, const char * msg)
{const char * s;if(!logfile){return;}switch(severity){case EVENT_LOG_DEBUG: s = "[DBG]"; break;case EVENT_LOG_MSG: s = "[MSG]"; break;case EVENT_LOG_WARN: s = "[WRN]"; break;case EVENT_LOG_ERR: s = "[ERR]"; break;default: s = "[???]"; break;}fprintf(logfile, "[%s][%s][%s] %s\n", __FILE__, __func__, s, msg);
}void suppress_logging(void)
{event_set_log_callback(discard_cb);
}void set_logfile(FILE * f)
{logfile = f;event_set_log_callback(write_to_file_cb);
}
注意: 在自定义的日志输出函数中, 不要调用其它libevent中的函数! 比如, 如果你要把日志远程输出至网络socket上去, 你还使用了bufferevent来输出你的日志, 在目前的libevent版本中, 这会导致一些奇怪的bug. libevent官方也承认这是一个设计上没有考虑到的点, 这可能在后续版本中被移除, 但截止目前的2.1.8 stable版本, 这个问题都还没有解决. 不要作死.
默认情况下的日志级别是EVENT_LOG_MSG
, 也就是说EVENT_LOG_DEBUG
级别的日志不会调用至日志输出函数. 要让libevent输出调试级别的日志, 请使用下面的接口:
#define EVENT_DBG_NONE 0
#define EVENT_DBG_ALL 0xffffffffu// 如果传入 EVENT_DBG_NONE, 将保持默认状态: 不输出调试日志
// 如果传入 EVENT_DEG_ALL, 将开启调试日志的输出
void event_enable_debug_logging(ev_uint32_t which);
调试日志很详尽, 通常情况下对于libevent的使用者而言是没有输出的必要的. 因为要用到调试级别日志的场合, 是你百般无奈, 开始怀疑libevent本身有bug的时候. 虽然从宏的命名上, 仿佛还存在着 EVENT_DGB_SOMETHING
这样, 可以单独控制某个模块的调试日志输出的参数, 但实际上并没有: 调试日志要么全开, 要么全关. 没有中间地带. 官方宣称可能在后续的版本中细化调试日志的控制.
而如果你要控制其它日志级别的输出与否, 请自行实现日志输出函数. 比如忽略掉EVENT_LOG_MSG
级别的日志之类的. 上面的接口只是控制"如果产生了调试日志, libevent调用或不调用日志输出函数"而已.
上面有关日志的接口均定义在<event2/event.h>
中.
- 日志输出函数相关接口早在版本1.0时就有了.
event_enable_debug_logging()
接口在2.1.1版本之后才有- 日志级别宏名, 在2.0.19之前, 是以下划线开头的, 即
_DEBUG_LOG_XXX
, 但现在已经废弃掉了这种定义, 在新版本中请使用不带下划线开头的版本.
4.2 正确处理致命错误
当libevent检测到有致命的内部错误发生时(比如踩内存了之类的不可恢复的错误), 默认行为是调用exit()
或abort()
. 出现这种情况99.99的原因是使用者自身的代码出现了严重的bug, 另外0.01%的原因是libevent自身有bug.
如果你希望在进程退出之前做点额外的事情, 写几行带fxxk的日志之类的, libevent提供了相关的入口, 这可以改写libevent对待致命错误的默认行为.
typedef void (*event_fatal_cb)(int err);
void event_set_fatal_callback(event_fatal_cb cb);
注意, 不要试图强行恢复这种致命错误, 也就是说, 虽然libevent给你提供了这么个接口, 但不要在注册的函数中试图让进程继续执行. 因为这个时候libevent内部已经有坑了, 如果继续强行恢复, 结果是不可预知的. 换个说法: 这个函数应该提供的是临终遗言, 而不应该试图救死扶伤.
这个函数也定义在 <event2/event.h>
中, 在2.0.3版本之后可用.
4.3 内存管理
默认情况下, libevent使用的是标准C库中的内存管理函数, 即malloc()
, realloc()
, free()
等. libevent允许你使用其它的内存管理库, 比如tcmalloc
或jemalloc
. 相关接口如下:
void event_set_mem_functions(void *(*malloc_fn)(size_t sz),void *(*realloc_fn)(void *ptr, size_t sz),void (*free_fn)(void *ptr));
接口的第一个参数是内存分配函数指针, 第二个参数是内存重分配函数指针, 第三个参数是内存释放函数指针.
下面是一个使用的例子:
#include <event2/event.h>
#include <sys/types.h>
#include <stdlib.h>union alignment
{size_t sz;void * ptr;double dbl;
};#define ALIGNMENT sizeof(union alignment)#define OUTPTR(ptr) (((char *)ptr) + ALIGNMENT)
#define INPTR(ptr) (((char *)ptr) - ALIGNMENT)static size_t total_allocated = 0;static void * my_malloc(size_t sz)
{void * chunk = malloc(sz + ALIGNMENT);if(!chunk) return chunk;total_allocated += sz;*(size_t *)chunk = sz;return OUTPTR(chunk);
}static void * my_realloc(void * ptr, size_t sz)
{size_t old_size = 0;if(ptr){ptr = INPTR(ptr);old_size = *(size_t*)ptr;}ptr = realloc(ptr, sz + ALIGNMENT);if(!ptr){return NULL;}*(size_t *)ptr = sz;total_allocated = total_allocated - old_size + sz;return OUTPTR(ptr);
}static void my_free(void * ptr)
{ptr = INPTR(ptr);total_allocated -= *(size_t *)ptr;free(ptr);
}void start_counting_bytes(void)
{event_set_mem_functions(my_malloc, my_realloc, my_free);
}
上面这个例子中, 提供了一种记录全局内存使用量的简单方案, 非线程安全.
对于自定义内存管理接口, 需要注意的有:
- 再次重申, 这是一个全局设定, 一旦设定, 后续所有的libevent函数内部的内存操作都会受影响. 并且不要在代码流程中途更改设定.
- 自定义的内存管理函数, 在分配内存时, 返回的指针后必须确保在至少
sz
个字节可用. - 自定义的内存重分配函数, 必须正确处理
realloc(NULL, sz)
这种情况: 即, 使之行为等同于malloc(sz)
. 也必须正确处理realloc(ptr, 0)
这种情况: 即, 使之行为与free(ptr)
相同且返回NULL. - 自定义的内存释放函数, 必须正确处理
free(NULL)
: 什么也不做. - 自定义的内在分配函数, 必须正确处理
malloc(0)
: 返回NULL. - 如果你在多线程环境中使用libevent, 请务必确保内存分配函数是线程安全的.
- 如果你要释放一个由libevent创建来的内存区域, 请确认你使用的
free()
版本与libevent内部使用的内存管理函数是一致的. 也就是说: 如果要操作libevent相关的内存区域, 请确保相关的内存处理函数和libevent内部使用的内在管理函数是一致的. 或者简单一点: 如果你决定使用某个内存管理库, 那么在整个项目范围内都使用它, 这样最简单, 不容易出乱子. 否则应该尽力避免在外部操作libevent创建的内存区域.
event_set_mem_functions()
接口也定义在<event2/event.h>
中, 在2.0.2版本后可用.
需要注意的是: libevent在编译安装的时候, 可以关闭event_set_mem_functions()
这个特性. 如果关闭了这个特性, 而在项目中又使用了这个特性, 那么在项目编译时, 编译将报错. 如果要检测当前引入的libevent库是否启用了这个功能, 可以通过检测宏EVENT_SET_MEM_FUNCTIONS_IMPLEMENTED
宏是否被定义来判断.
4.4 线程与锁
多线程程序设计里的数据访问是个大难题. 目前的版本里, libevent支持了多线程编程, 但这个支持法呢, 怎么讲呢, 使用者还是需要知道不少细节才能正确的写出多线程应用. libevent中的数据结构分为三类:
- 有一些数据结构就是非线程安全的. 这是历史遗留问题, libevent在大版本号更新为2后才支持多线程, 这些数据结构是从版本1一路继承下来的, 不要在多线程中共享这些实例. 没办法.
- 有一些数据结构的实例可以用锁保护起来, 以在多线程环境中共享. 如果你需要在多个线程中访问某个实例, 那么你需要给libevent说明这个情况, 然后libevent会为这个实例加上适当的锁保护, 以确保你在多线程访问它时是安全的. 加锁不需要你去加, 你需要做的只是告诉libevent一声, 如何具体操作后面再讲.
- 有些数据结构, 天生就是带锁的. 如果你带
libevent_pthreads
库链接你的程序, 那么这些结构的实例在多线程环境中一定的安全的. 你想让它不安全都没办法.
虽然libevent为你写了一些加锁解锁的无聊代码, 你不必要手动为每个对象加锁了, 但libevent还是需要你指定加锁的函数. 就像你可以为libevent指定其它的内存管理库一样. 注意这也是一个全局设定, 请遵循我们一再强调的使用规则: 进程初始化时就定好, 后续不许再更改.
如果你使用的是POSIX线程库, 或者标准的windows原生线程库, 那么简单了一些. 设置加解锁函数只需要一行函数调用, 接口如下:
#ifdef WIN32
int evthread_use_windows_threads(void);
#define EVTHREAD_USE_WINDOWS_THREADS_IMPLEMENTED
#endif#ifdef _EVENT_HAVE_PTHREADS
int evthread_use_pthreads();
#define EVTHREAD_USE_PTHREADS_IMPLEMENTED
#endif
这两个函数在成功时都返回0, 失败时返回-1.
这只是加解锁. 但如果你想要自定义的是整个线程库, 那么你就需要手动指定如下的函数与结构定义
- 锁的定义
- 加锁函数
- 解锁函数
- 锁分配函数
- 锁释放函数
- 条件变量定义
- 条件变量创建函数
- 条件变量释放函数
- 条件变量等待函数
- 通知/广播条件变量的函数
- 线程定义
- 线程ID检测函数
这里需要注意的是: libevent并不会为你写哪怕一行的多线程代码, libevent内部也不会去创建线程. 你要使用多线程, OK, 你用哪种线程库都行, 没问题. 但你需要将配套的锁/条件变量/线程检测函数以及相关定义告诉libevent, 这样libevent才会知道如何在多线程环境中保护自己的实例, 以供你在多线程环境中安全的访问.
- 如果你使用的是POSIX线程或者windows原生线程库, 就方便了一点, 调一行函数的事情.
- 如果你在使用POSIX纯种或windows原生线程库时, 你不想使用POSIX配套的锁, 那OK, 你在调用完
evthread_use_xxx_threads()
之后, 把你自己的锁函数或者条件变量函数提供给libevent就好了. 注意这种情况下, 在你的程序的其它地方也需要使用你指定的锁或条件变量. - 而如果你使用的是其它线程库, 也OK, 只不过麻烦一点, 要提供锁的相关信息, 要提供条件变量的相关信息, 也要提供线程ID检测函数
下面是相关的接口
// 锁模式是 lock 与 unlock 函数的参数, 它指定了加锁解锁时的一些额外信息
// 如果调用 lock 或 unlock 时的锁都不满足下面的三种模式, 参数传0即可
#define EVTHREAD_WRITE 0x04 // 锁模式: 仅对读写锁使用: 获取或释放写锁
#define EVTHREAD_READ 0x08 // 锁模式: 仅对读写锁使用: 获取或释放读锁
#define EVTHREAD_TRY 0x10 // 锁模式: 仅在加锁时使用: 仅在可以立即加锁的时候才去加锁. // 若当前不可加锁, 则lock函数立即返回失败, 而不是阻塞// 锁类型是 alloc 与 free 函数的参数, 它指定了创建与销毁的锁的类型
// 锁类型可以是 EVTHREAD_LOCKTYPE_XXX 之一或者为0
// 所有支持的锁类型均需要被登记在 supported_locktypes 中, 如果支持多种锁, 则多个宏之间用 | 连结构成该字段的值// 当锁类型为0时, 指的是普通的, 非递归锁
#define EVTHREAD_LOCKTYPE_RECURSIVE 1 // 锁类型: 递归锁, 你必须提供一种递归锁给libevent使用
#define EVTHREAD_LOCKTYPE_READWRITE 2 // 锁类型: 读写锁, 在2.0.4版本之前, libevent内部没有使用到读写锁#define EVTHREAD_LOCK_API_VERSION 1// 将你要用的有关锁的所有信息放在这个结构里
struct evthread_lock_callbacks {int lock_api_version; // 必须与宏 EVTHREAD_LOCK_API_VERSION的值一致unsigned supported_locktypes; // 必须是宏 EVTHREAD_LOCKTYPE_XXX 的或组合, 或为0void *(*alloc)(unsigned locktype); // 锁分配, 需要指定锁类型void (*free)(void *lock, unsigned locktype); // 锁销毁, 需要指定锁类型int (*lock)(unsigned mode, void *lock); // 加锁, 需要指定锁模式int (*unlock)(unsigned mode, void *lock); // 解锁, 需要指定锁模式
};// 调用该函数以设置相关锁函数
int evthread_set_lock_callbacks(const struct evthread_lock_callbacks *);// 调该函数以设置线程ID检测函数
void evthread_set_id_callback(unsigned long (*id_fn)(void));// 将你要用的有关条件变量的所有信息都放在这个结构里
struct evthread_condition_callbacks {int condition_api_version;void *(*alloc_condition)(unsigned condtype);void (*free_condition)(void *cond);int (*signal_condition)(void *cond, int broadcast);int (*wait_condition)(void *cond, void *lock,const struct timeval *timeout);
};// 通过该函数以设置相关的条件变量函数
int evthread_set_condition_callbacks(const struct evthread_condition_callbacks *);
要探究具体如何使用这些函数, 请看libevent源代码中的evthread_pthread.c
与evthread_win32.c
文件.
对于大多数普通用户来说, 只需要调用一下evthread_use_windows_threads()
或evthread_use_pthreads()
就行了.
上面这些函数均定义在 <event2/thread.h>
中. 在2.0.4版本后这些函数才可用. 2.0.1至2.0.3版本中使用了一些旧接口, event_use_pthreads()
等. 有关条件变量的相关接口直至2.0.7版本才可用, 引入条件变量是为了解决之前libevent出现的死锁问题.
libevent本身可以被编译成不支持锁的二进制库, 用这种二进制库链接你的多线程代码, bomshakalaka, 跑不起来. 这算是个无用知识点.
另外额外注意: 多线程程序, 并且还使用了POSIX线程库和配套的锁, 那么你需要链接libevent_pthreads
. windows平台则不用.
4.5 小知识点: 有关锁的调试
libevent有一个额外的特性叫"锁调试", 开启这种特性后, libevent将把它内部有关锁的所有调用都再包装一层, 以检测/获取在锁调用过程中出现的错误, 比如:
- 解了一个没有持有的锁
- 对一个非递归锁进行了二次加锁
如果出现了上述错误, 则libevent会导致进程退出, 并附送一个断言错误
要开启这个特性, 调用下面的接口:
void evthread_enable_lock_debugging(void);
#define evthread_enable_lock_debuging() evthread_enable_lock_debugging()
注意, 这也是一个全局设置项, 请遵循: 一次设置, 初始化时就设置, 永不改动的规则.
这个特性在2.0.4版本中开始支持, 当时接口函数名拼写错误了, 少写了一个g: evthread_enable_lock_debuging()
. 后来在2.1.2版本中把这个错误的拼写修正过来了. 但还是兼容了之前的错误拼写.
这个特性吧, 很明显是libevent内部开发时使用的. 现在开放出来估计是考虑到, 如果你的代码中出现了一个bug是由libevent内部加解锁失误导致的, 那么用个特性可以定位到libevent内部. 否则你很难把锅甩给libevent. 当然这种情况很少见.
4.6 小知识点: 排除不正确的使用姿势
libevent是一个比较薄的库, 薄的好处是性能很好, 坏处是没有在接口上对使用者做过多的约束. 这就导致一些二把刀使用者经常会错误的使用libevent. 常见的智障行为有:
- 向相关接口传递了一个未初始化的事件结构实例
- 试图第二次初始化一个正在被使用的事件结构实例
这种错误其实挺难发现的, 为了解决这个痛点, libevent额外开发了一个新特性: 在发生上述情况的时候, libevent给你报错.
但这是一个会额外消耗资源的特性, libevent内部其实是追踪了每个事件结构的初始化与销毁, 所以仅在开发测试的时候打开它, 发现问题, 解决问题. 在实际部署的时候, 不要使用这个特性. 开启这个特性的接口如下:
void event_enable_debug_mode(void);
再不厌其烦的讲一遍: 全局设定, 初始化时设定, 一次设定, 永不更改.
这个特性开启后, 也有一个比较蛋疼的事情: 就是如果你的代码里大量使用了event_assign()
来创建事件结构, 可能你的程序在这个特性下会OOM挂掉..原因是: libevent可以通过对event_new()
和event_free()
的追踪来检测事件结构实例是否未被初始化, 或者被多次初始化, 或者被非法使用. 但是对于event_assign()
拷贝来的事件结构, 这追踪就无能为力了, 并且蛋疼的是event_assign()
还是浅拷贝. 这样, 如果你的代码里大量的使用了event_assign()
, 这就会导致内置的的追踪功能一旦追上车就下不来了, 完事车太多就OOM挂掉了.
为了避免在这个特性下由于追踪event_assign()
创建的事件实例(或许这里叫实例已经不合适了, 应该叫句柄)而导致程序OOM, 可以调用下面的函数以解除对这种事件实例的追踪, 以避免OOM
void event_debug_unassign(struct event * ev);
这样, 调试模式下, 相关的追踪检测就会放弃追踪由event_assign
创建的事件. 所以你看, 这个特性也不是万能的, 有缺陷, 凑合用吧. 在不开启调试模式下, 调用event_debug_unassign()
函数没有任何影响
下面是一个例子:
#include <event2/event.h>
#include <event2/event_struct.h>#include <stdlib.h>void cb(evutil_socket_t fd, short what, void *ptr)
{struct event *ev = ptr;if (ev) // 通过判断入参是否为NULL, 来确认入参携带的事件实例是event_new来的还是event_assign来的event_debug_unassign(ev); // 如果是event_assign来的, 那么就放弃对它的追踪
}/** 下面是一个简单的循环, 等待fd1与fd2同时可读*/
void mainloop(evutil_socket_t fd1, evutil_socket_t fd2, int debug_mode)
{struct event_base *base;struct event event_on_stack, *event_on_heap; // 一个是栈上的事件实例, 一个是堆上的事件实例if (debug_mode)event_enable_debug_mode(); // 开启调试模式base = event_base_new();event_on_heap = event_new(base, fd1, EV_READ, cb, NULL); // 通过event_new来创建堆上的实例, 并把事件回调的入参设置为NULLevent_assign(&event_on_stack, base, fd2, EV_READ, cb, &event_on_stack); // 通过event_assign来初始化栈上的实例, 并把事件回调的入参设置为事件实例自身的指针event_add(event_on_heap, NULL);event_add(&event_on_stack, NULL);event_base_dispatch(base);event_free(event_on_heap);event_base_free(base);
}
这个例子也写的比较蛋疼, 凑合看吧.
另外, 调试模式下的详情调试信息, 只能通过在编译时额外定义宏USE_DEBUG
来附加. 即在编译时加上-DUSE_DEBUG
来开启. 加上这个编译时的宏定义后, libevent就会输出一大坨有关其内部流程的详情日志, 包括但不限于
- 事件的增加
- 事件的删除
- 与具体平台相关的事件通知信息
这些详情不能通过调用API的方式开启或关闭. 而开启调试模式的API, 在2.0.4版本后才可用.
4.7 检测当前项目中引用的libevent的版本
接口很简单, 如下:
#define LIBEVENT_VERSION_NUMBER 0x02000300
#define LIBEVENT_VERSION "2.0.3-alpha"
const char *event_get_version(void); // 获取字符串形式的版本信息
ev_uint32_t event_get_version_number(void); // 获取值形式的版本信息
值形式的版本信息由一个uint32_t
类型存储, 从高位到低位, 每8位代表一个版本号. 比如 0x02000300
代表的版本号就是02.00.03.00
. 三级版本号后可能还有一个小版本号, 比如就存在过一个2.0.1.18
的版本
下面是一个在编译期检查libevent版本的写法, 若版本小于2.0.1, 则编译不通过. 需要注意的是, 编译期检查的是宏里的值, 如果你的项目构建比较混乱, 很可能出现头文件的版本, 和最终链接的二进制库的版本不一致的情况. 所以编译期检查也不一定靠谱
#include <event2/event.h>#if !defined(LIBEVENT_VERSION_NUMBER) || LIBEVENT_VERSION_NUMBER < 0x02000100
#error "This version of Libevent is not supported; Get 2.0.1-alpha or later."
#endifint
make_sandwich(void)
{/* Let's suppose that Libevent 6.0.5 introduces a make-me-asandwich function. */
#if LIBEVENT_VERSION_NUMBER >= 0x06000500evutil_make_me_a_sandwich();return 0;
#elsereturn -1;
#endif
}
下面是一个在运行时检查libdvent版本的写法. 检查运行期的版本是通过函数调用检查的, 这就保证了返回的版本号一定是链接进的库的版本号. 这个比较靠谱. 另外需要注意的是, 数值形式的版本号在libevent2.0.1之后才提供. 所以只能比较蠢的用比较字符串的方式去判断版本号
#include <event2/event.h>
#include <string.h>int
check_for_old_version(void)
{const char *v = event_get_version();/* This is a dumb way to do it, but it is the only thing that worksbefore Libevent 2.0. */if (!strncmp(v, "0.", 2) ||!strncmp(v, "1.1", 3) ||!strncmp(v, "1.2", 3) ||!strncmp(v, "1.3", 3)) {printf("Your version of Libevent is very old. If you run into bugs,"" consider upgrading.\n");return -1;} else {printf("Running with Libevent version %s\n", v);return 0;}
}int
check_version_match(void)
{ev_uint32_t v_compile, v_run;v_compile = LIBEVENT_VERSION_NUMBER;v_run = event_get_version_number();if ((v_compile & 0xffff0000) != (v_run & 0xffff0000)) {printf("Running with a Libevent version (%s) very different from the ""one we were built with (%s).\n", event_get_version(),LIBEVENT_VERSION);return -1;}return 0;
}
接口和宏的定义位于 <event2/event.h>
中, 字符串形式的版本号在1.0版本就提供了, 数值形式的版本号直至2.0.1才提供
4.8 一键释放所有全局实例
就算你手动释放了所有在程序代码初始化时创建的libevent对象, 在程序退出之前, 也依然有一些内置的, 对使用者不可见的libevent内部实例以及一些全局配置实例存在着, 并且存在在堆区. 一般情况下不用管它们: 程序都退出了, 释放不释放有什么区别呢? 反正操作系统会帮你清除的. 但有时你想引入一些第三方的分析工具, 比如检测内存泄漏的工具时, 就会导致这些工具误报内存泄漏.
你可以简单的调一下下面这个函数, 完成一键完全清除:
void libevent_global_shutdown(void);
注意哦: 这个函数不会帮你释放你自己调用libevent接口创建出来的对象哦! 还没那么智能哦!
另外, 很显然的一点是, 当调用了这个函数之后, 再去调用其它libevent接口, 可能会出现异常哦! 所以没事不要调用它, 如果你调用它, 那么一定是自杀前的最后一秒.
函数定义在<event2/event.h>
中, 2.1.1版本后可用
转载于:https://www.cnblogs.com/neooelric/p/9057381.html
Libevent教程001: 简介与配置相关推荐
- Python Flask Web教程001:Flask 简介
Flask Web教程001:Flask 简介 0. 前言 1. flask简介 2. flask的优势 3. 总结 0. 前言 本系列教程从Flask框架的基础知识开始,逐渐深入到使用flask进行 ...
- Nacos教程_1 简介和安装
教程原稿–https://gitee.com/fakerlove/joker-nacos 文章目录 nacos 教程 1. 简介 1.1 介绍 1.2 安装 下载 linux版本 windows版本 ...
- Git教程_1 简介
https://gitee.com/fakerlove/git 文章目录 git 教程 1. 简介 1.1 介绍 1.2 环境准备 1.3 安装 1. 设置 用户名和密码: 2. 然后看本地目录是否有 ...
- docker教程_1 简介和安装
https://gitee.com/fakerlove/docker 文章目录 Docker 教程 1. 简介 1.1 概念 1.2 优点 1.3 安装 环境准备 安装 Docker 教程 1. 简介 ...
- MongoDB中不溜教程(1)简介与命令
MongoDB教程之简介与命令 MongoDB简介 MongoDB下载安装 名词解释 常用命令 数据库命令 创建数据库 查看当前数据库 显示数据库列表 删除数据库 集合命令 创建集合 显示集合列表 删 ...
- html学习之路教程001
001学习目录 标题标签 换行标签和段落标签 文本格式化标签 spn和div标签 图像标签 相对路径和绝对路径 超链接标签 锚点定位 教程001总结案例 标题标签 <h1>标题标签< ...
- SpringBoot简明教程之项目属性配置(二):@ConfigurationProperties与@Value简单比较
前文回顾 @ConfigurationProperties与@Value简单比较 如何通过@Value来实现注入值 是否支持松散绑定(Relaxed Binding) 是否支持SpEL语法 是否支持J ...
- Windows server 2012 搭建×××图文教程(二)配置路由和远程访问服务
Windows server 2012 搭建×××图文教程(一)安装×××相关服务 Windows server 2012 搭建×××图文教程(二)配置路由和远程访问服务 Windows server ...
- Xamarin.Forms教程下载安装JDK配置环境变量
Xamarin.Forms教程下载安装JDK配置环境变量 Xamarin.Form环境配置下载安装JDK JDK是编程Java程序必须的软件.也许有人会问我们用的C#为什么还有Java呢?这是因为我们 ...
最新文章
- FGMap学习之--三维地图
- 有了这个工具,不执行代码就可以找PyTorch模型错误
- Veeam Backup Replication v7 安装配置手册
- [转载] FatFs模块功能配置选项
- 【数据结构与算法】之深入解析运用链表结构计算“两数相加”的算法实现
- 区块链系列教程之:比特币的世界
- C++(STL):25 ---序列式容器stack源码剖析
- LinkedList 真的是查找慢增删快?刷新你的认知!
- EMC测试项目——辐射骚扰
- html设置桌面背景win7,win7电脑桌面背景怎么设置_win7电脑桌面壁纸怎么设置-win7之家...
- 深海迷航创造模式中如何起飞火箭
- android账号密码长度限制,限制输入密码长度
- Nginx反向代理处理跨域问题
- Java,第一次作业——六边形面积
- 【小学】再做一年级算术题
- Crash 工具使用
- java 模板函数_重温Java中的模板方法设计模式
- 如何在浏览器查看登录密码
- 【洛谷P6199 [EER1]河童重工】【点分治+虚树】
- 合同审查自动化-企业合同处理新模式