redis源码分析(2)——事件循环
(1)初始化
typedef struct aeEventLoop {int maxfd; /* highest file descriptor currently registered */int setsize; /* max number of file descriptors tracked */long long timeEventNextId;// <MM>// 存放的是上次触发定时器事件的时间// </MM>time_t lastTime; /* Used to detect system clock skew */aeFileEvent *events; /* Registered events */aeFiredEvent *fired; /* Fired events */// <MM>// 所有定时器事件组织成链表// </MM>aeTimeEvent *timeEventHead;// <MM>// 是否停止eventLoop// </MM>int stop;void *apidata; /* This is used for polling API specific data */// <MM>// 事件循环每一次迭代都会调用beforesleep// </MM>aeBeforeSleepProc *beforesleep;
} aeEventLoop;
aeEventLoop *aeCreateEventLoop(int setsize) {aeEventLoop *eventLoop;int i;// <MM>// setsize指定事件循环监听的fd的数目// 由于内核保证新创建的fd是最小的正整数,所以直接创建setsize大小// 的数组,存放对应的event// </MM>if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;eventLoop->setsize = setsize;eventLoop->lastTime = time(NULL);eventLoop->timeEventHead = NULL;eventLoop->timeEventNextId = 0;eventLoop->stop = 0;eventLoop->maxfd = -1;eventLoop->beforesleep = NULL;if (aeApiCreate(eventLoop) == -1) goto err;/* Events with mask == AE_NONE are not set. So let's initialize the* vector with it. */for (i = 0; i < setsize; i++)eventLoop->events[i].mask = AE_NONE;return eventLoop;err:if (eventLoop) {zfree(eventLoop->events);zfree(eventLoop->fired);zfree(eventLoop);}return NULL;
}
以epoll为例,aeApiCreate主要是创建epoll的fd,以及要监听的epoll_event,这些数据定义在:
typedef struct aeApiState {int epfd;struct epoll_event *events;
} aeApiState;
这里,监听到的事件组织方式与event_loop中监听事件一样,同样是setsize大小的数据,以fd为下标。
static int aeApiCreate(aeEventLoop *eventLoop) {aeApiState *state = zmalloc(sizeof(aeApiState));if (!state) return -1;state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);if (!state->events) {zfree(state);return -1;}state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */if (state->epfd == -1) {zfree(state->events);zfree(state);return -1;}eventLoop->apidata = state;return 0;
}
(2)添加、删除事件
redis支持两类事件,网络io事件和定时器事件。定时器事件的添加、删除相对简单些,主要是维护定时器事件列表。首先看一下表示定时器事件的结构:
/* Time event structure */
typedef struct aeTimeEvent {long long id; /* time event identifier. */long when_sec; /* seconds */long when_ms; /* milliseconds */aeTimeProc *timeProc;aeEventFinalizerProc *finalizerProc;void *clientData;struct aeTimeEvent *next;
} aeTimeEvent;
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,aeFileProc *proc, void *clientData)
{if (fd >= eventLoop->setsize) {errno = ERANGE;return AE_ERR;}aeFileEvent *fe = &eventLoop->events[fd];if (aeApiAddEvent(eventLoop, fd, mask) == -1)return AE_ERR;fe->mask |= mask;if (mask & AE_READABLE) fe->rfileProc = proc;if (mask & AE_WRITABLE) fe->wfileProc = proc;fe->clientData = clientData;if (fd > eventLoop->maxfd)eventLoop->maxfd = fd;return AE_OK;
}
/* File event structure */
typedef struct aeFileEvent {int mask; /* one of AE_(READABLE|WRITABLE) */aeFileProc *rfileProc;aeFileProc *wfileProc;void *clientData;
} aeFileEvent;
下面看一下epoll添加事件的实现,主要是调用epoll_ctl。
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {aeApiState *state = eventLoop->apidata;struct epoll_event ee;/* If the fd was already monitored for some event, we need a MOD* operation. Otherwise we need an ADD operation. */int op = eventLoop->events[fd].mask == AE_NONE ?EPOLL_CTL_ADD : EPOLL_CTL_MOD;ee.events = 0;mask |= eventLoop->events[fd].mask; /* Merge old events */if (mask & AE_READABLE) ee.events |= EPOLLIN;if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;ee.data.u64 = 0; /* avoid valgrind warning */ee.data.fd = fd;if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;return 0;
}
struct epll_event用于指定要监听的事件,以及该文件描述符绑定的data,在事件触发时可以返回。这里将data直接存为fd,通过这个数据,便可以找到对应的事件,然后调用其处理函数。
(3)等待事件触发
通过调用aeMain函数进入事件循环:
void aeMain(aeEventLoop *eventLoop) {eventLoop->stop = 0;while (!eventLoop->stop) {if (eventLoop->beforesleep != NULL)eventLoop->beforesleep(eventLoop);aeProcessEvents(eventLoop, AE_ALL_EVENTS);}
}
int processed = 0, numevents;/* Nothing to do? return ASAP */if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no* file events to process as long as we want to process time* events, in order to sleep until the next time event is ready* to fire. */// <MM>// 在两种情况下进入poll,阻塞等待事件发生:// 1)在有需要监听的描述符时(maxfd != -1)// 2)需要处理定时器事件,并且DONT_WAIT开关关闭的情况下// </MM>if (eventLoop->maxfd != -1 ||((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {int j;aeTimeEvent *shortest = NULL;struct timeval tv, *tvp;// <MM>// 根据最快发生的定时器事件的发生时间,确定此次poll阻塞的时间// </MM>if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))// <MM>// 线性查找最快发生的定时器事件// </MM>shortest = aeSearchNearestTimer(eventLoop);if (shortest) {// <MM>// 如果有定时器事件,则根据它触发的时间,计算sleep的时间(ms单位)// </MM>long now_sec, now_ms;/* Calculate the time missing for the nearest* timer to fire. */aeGetTime(&now_sec, &now_ms);tvp = &tv;tvp->tv_sec = shortest->when_sec - now_sec;if (shortest->when_ms < now_ms) {tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;tvp->tv_sec --;} else {tvp->tv_usec = (shortest->when_ms - now_ms)*1000;}if (tvp->tv_sec < 0) tvp->tv_sec = 0;if (tvp->tv_usec < 0) tvp->tv_usec = 0;} else {// <MM>// 如果没有定时器事件,则根据情况是立即返回,或者永远阻塞// </MM>/* If we have to check for events but need to return* ASAP because of AE_DONT_WAIT we need to set the timeout* to zero */if (flags & AE_DONT_WAIT) {tv.tv_sec = tv.tv_usec = 0;tvp = &tv;} else {/* Otherwise we can block */tvp = NULL; /* wait forever */}}
接着,调用aeApiPoll函数,传入前面计算的sleep时间,等待io事件放生。在函数返回后,触发的事件已经填充到eventLoop的fired数组中。epoll的实现如下,就是调用epoll_wait,函数返回后,会将触发的事件存放到state->events数组中的前numevents个元素。接下来,填充fired数组,设置每个触发事件的fd,以及事件类型。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {aeApiState *state = eventLoop->apidata;int retval, numevents = 0;// <MM>// 调用epoll_wait,state->events存放返回的发生事件的fd// </MM>retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);if (retval > 0) {int j;numevents = retval;// <MM>// 有事件发生,将发生的事件存放于fired数组// </MM>for (j = 0; j < numevents; j++) {int mask = 0;struct epoll_event *e = state->events+j;if (e->events & EPOLLIN) mask |= AE_READABLE;if (e->events & EPOLLOUT) mask |= AE_WRITABLE;if (e->events & EPOLLERR) mask |= AE_WRITABLE;if (e->events & EPOLLHUP) mask |= AE_WRITABLE;eventLoop->fired[j].fd = e->data.fd;eventLoop->fired[j].mask = mask;}}return numevents;
}
在事件返回后,需要处理事件。遍历fired数组,取得fd对应的事件,并根据触发的事件类型,回调其处理函数。
for (j = 0; j < numevents; j++) {// <MM>// poll返回后,会将所有触发的时间存放于fired数组// </MM>aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];int mask = eventLoop->fired[j].mask;int fd = eventLoop->fired[j].fd;int rfired = 0;/* note the fe->mask & mask & ... code: maybe an already processed* event removed an element that fired and we still didn't* processed, so we check if the event is still valid. */// <MM>// 回调发生事件的fd,注册的事件处理函数// </MM>if (fe->mask & mask & AE_READABLE) {rfired = 1;fe->rfileProc(eventLoop,fd,fe->clientData,mask);}if (fe->mask & mask & AE_WRITABLE) {if (!rfired || fe->wfileProc != fe->rfileProc)fe->wfileProc(eventLoop,fd,fe->clientData,mask);}processed++;}
/* If the system clock is moved to the future, and then set back to the* right value, time events may be delayed in a random way. Often this* means that scheduled operations will not be performed soon enough.** Here we try to detect system clock skews, and force all the time* events to be processed ASAP when this happens: the idea is that* processing events earlier is less dangerous than delaying them* indefinitely, and practice suggests it is. */if (now < eventLoop->lastTime) {te = eventLoop->timeEventHead;while(te) {te->when_sec = 0;te = te->next;}}eventLoop->lastTime = now;
接下来遍历所有定时器事件,查找触发的事件,然后回调处理函数。定时器事件处理函数的返回值,决定这个事件是一次性的,还是周期性的。如果返回AE_NOMORE,则是一次性事件,在调用完后会删除该事件。否则的话,返回值指定的是下一次触发的时间。
te = eventLoop->timeEventHead;maxId = eventLoop->timeEventNextId-1;while(te) {long now_sec, now_ms;long long id;if (te->id > maxId) {te = te->next;continue;}aeGetTime(&now_sec, &now_ms);if (now_sec > te->when_sec ||(now_sec == te->when_sec && now_ms >= te->when_ms)){// <MM>// 定时器事件的触发时间已过,则回调注册的事件处理函数// </MM>int retval;id = te->id;retval = te->timeProc(eventLoop, id, te->clientData);processed++;/* After an event is processed our time event list may* no longer be the same, so we restart from head.* Still we make sure to don't process events registered* by event handlers itself in order to don't loop forever.* To do so we saved the max ID we want to handle.** FUTURE OPTIMIZATIONS:* Note that this is NOT great algorithmically. Redis uses* a single time event so it's not a problem but the right* way to do this is to add the new elements on head, and* to flag deleted elements in a special way for later* deletion (putting references to the nodes to delete into* another linked list). */// <MM>// 根据定时器事件处理函数的返回值,决定是否将该定时器删除。// 如果retval不等于-1(AE_NOMORE),则更改定时器的触发时间为// now + retval(ms)// </MM>if (retval != AE_NOMORE) {aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);} else {// <MM>// 如果返回AE_NOMORE,则删除该定时器// </MM>aeDeleteTimeEvent(eventLoop, id);}te = eventLoop->timeEventHead;} else {te = te->next;}}
在回调处理函数时,有可能会添加新的定时器事件,如果不断加入,存在无限循环的风险,所以需要避免这种情况,每次循环不处理新添加的事件,这是通过下面的代码实现的。
if (te->id > maxId) {te = te->next;continue;}
redis源码分析(2)——事件循环相关推荐
- Redis源码分析:基础概念介绍与启动概述
Redis源码分析 基于Redis-5.0.4版本,进行基础的源码分析,主要就是分析一些平常使用过程中的内容.仅作为相关内容的学习记录,有关Redis源码学习阅读比较广泛的便是<Redis设计与 ...
- Redis源码分析(一)redis.c //redis-server.c
Redis源码分析(一)redis.c //redis-server.c 入口函数 int main() 4450 int main(int argc, char **argv) {4451 init ...
- redis源码分析 -- cs结构之服务器
服务器与客户端是如何交互的 redis客户端向服务器发送命令请求,服务器接收到客户端发送的命令请求之后,读取解析命令,并执行命令,同时将命令执行结果返回给客户端. 客户端与服务器交互的代码流程如下图所 ...
- 【Android 事件分发】ItemTouchHelper 源码分析 ( OnItemTouchListener 事件监听器源码分析 二 )
Android 事件分发 系列文章目录 [Android 事件分发]事件分发源码分析 ( 驱动层通过中断传递事件 | WindowManagerService 向 View 层传递事件 ) [Andr ...
- 【Android 事件分发】ItemTouchHelper 源码分析 ( OnItemTouchListener 事件监听器源码分析 )
Android 事件分发 系列文章目录 [Android 事件分发]事件分发源码分析 ( 驱动层通过中断传递事件 | WindowManagerService 向 View 层传递事件 ) [Andr ...
- 10年大厂程序员是如何高效学习使用redis的丨redis源码分析丨redis存储原理
10年大厂程序员是怎么学习使用redis的 1. redis存储原理分析 2. redis源码学习分享 3. redis跳表和B+树详细对比分析 视频讲解如下,点击观看: 10年大厂程序员是如何高效学 ...
- 【Redis源码分析】Redis命令处理生命周期
运营研发团队 李乐 前言 本文主要讲解服务器处理客户端命令请求的整个流程,包括服务器启动监听,接收命令请求并解析,执行命令请求,返回命令回复等,这也是本文的主题"命令处理的生命周期" ...
- Redis源码分析 —— 发布与订阅
前言 通过阅读Redis源码,配合GDB和抓包等调试手段,分析Redis发布订阅的实现原理,思考相关问题. 源码版本:Redis 6.0.10 思考问题 发布订阅基本概念介绍 订阅频道 -- SUBS ...
- [Vue源码分析]自定义事件原理及事件总线的实现
最近小组有个关于vue源码分析的分享会,提前准备一下- 前言: 我们都知道Vue中父组件可以通过 props 向下传数据给子组件:子组件可以通过向$emit触发一个事件,在父组件中执行回调函数,从而实 ...
- Redis源码分析(一)--Redis结构解析
从今天起,本人将会展开对Redis源码的学习,Redis的代码规模比较小,非常适合学习,是一份非常不错的学习资料,数了一下大概100个文件左右的样子,用的是C语言写的.希望最终能把他啃完吧,C语言好久 ...
最新文章
- 揭开雷达的面纱(科普)探测能力
- PAT甲级1039 Course List for Student :[C++题解]排序、哈希表
- KINDLE TOUCH修复板砖过程
- 《Go语言圣经》学习笔记 第十一章 测试
- source code of MES Data
- 深度学习Caffe 入门理解使用教程
- Atitit json数据操作法 目录 1. 2. 常用json函数类型四大类型	 crud判断	1 1.1. 2.1. 创建json	2	1 1.2. 2.2. 选择与读取	3读取数据读取key
- java值传递人体自_请大家帮帮我这个初学者
- 论文解读-通过建模时空动态生成活动轨迹
- 移动商务,还是短信商务?
- rpm -e卸载mysql_rpm
- Python程序设计入门32道基础编程题目与参考代码
- Pycharm 2018安装步骤
- 平面设计学习之四(PS-计算磨皮法)
- Lync客户端证书安装
- Ceph分布式存储知识总结
- ORB-SLAM2多线程用法总结
- 1553B数据总线用终端电连接器-DK-6211
- 那些困扰你多年的项目管理问题,终于有解决方案了!
- Django MVT模型详解--高级