在网络程序中我们通常要处理三种事件,网络I/O事件、信号以及定时事件,我们可以使用I/O复用系统调用(select、poll、epoll)将这三类事件进行统一处理。我们通常使用定时器来检测一个客户端的活动状态,服务器程序通常管理着众多定时事件,因此有效地组织这些定时事件,使之能在预期的时间点被触发且不影响服务器的主要逻辑,对于服务器的性能有着至关重要的影响。为此我们需要将每个定时事件分别封装为定时器,并使用某种容器类数据结构,比如:链表、排序链表、最小堆、红黑树以及时间轮等,将所有定时器串联起来,以实现对定时事件的统一管理。此处所说的定时器,确切的说应该是定时容器,定时器容器是容器类数据结构;定时器则是容器内容纳的一个个对象,它是对定时事件的封装,定时容器是用来管理定时器的。

在本文中将主要介绍使用红黑树来实现的定时容器。

1、时间轮简介

一种简单的时间轮如下图所示:

轮中实线指针指向轮中的一个槽(slot)。它以恒定的速度顺时针转动,每转动一步就指向下一个槽(虚线指针指向的槽),每次转动一次称为一个滴答(tick)。一个滴答的时间称为时间轮的槽间隔si(slot interval),它实际上就是定时器的心搏时间。该时间轮共有N个槽,因此它转一圈所需的时间就是N*si。每个槽中保存了一个定时器链表,时间轮的结构与哈希链表的结构是比较相似的。槽中的每条链表上的定时器具有相同的特征:它们的定时相差N*si的整数倍。时间轮正是利用这个关系将定时器三列到不同的链表中的。

假如现在指针指向槽cs,我们要添加一个定时时间为ti的定时器,则该定时器将被插入槽ts对应链表中的:
ts=(cs+(ti/si))%Nts = (cs + (ti / si)) \% N ts=(cs+(ti/si))%N
比如每个槽间隔si为100ms,N为600,转动一圈经过的时间为600*100ms,也就是60s。假设现在指针指向槽cs=10;我们添加一个定时时间60s也就是60000ms的定时器,
ts=(10+(60000/100))%600=10ts = (10 + (60000 / 100)) \% 600 = 10 ts=(10+(60000/100))%600=10
也就是转一圈重新回到cs=10时将会触发,如果此时添加的定时时间为120s,也就是转两圈重新回到该位置时触发。因此需要一个变量来保存一个定时器需要转多少圈后才触发。

我们可以与I/O复用系统调用(select、poll、epoll)和时间轮一起来实现定时容器,将时间轮的的槽间隔作为I/O复用系统调用的超时值,当系统调用返回时,就检查当前指向的槽中的定时器,遍历槽中的定时器,如果圈数为0,则说明该定时器到期,执行相应的回调函数。如果圈数大于0,将其减一。遍历完成之后,将指针指向下一个槽并继续上述操作。

时间轮使用哈希表的思想,将定时器散列到不同的链表上,这样每条链表上的定时器就相对比较少。但是对时间轮而言,要提高精度,就要使si足够小。要提高执行效率,则要求N足够大。

2、代码实现如下:

该定时容器的思路是:将槽间隔作为I/O复用系统调用(select、poll、epoll)的超时值,当系统调用返回后就调用tick函数检查当前指针指向的槽的链表中的定时器,如果定时器中保存的圈数变量等于0,说明定时器到期,执行其中的回调函数,并删除该定时器。如果圈数大于0,说明该定时器还未到期,将圈数减一。遍历完成后,将指针指向下一个槽位。继续上述操作。

时间轮定时容器的几个接口介绍:

​ 1) tick :在tick函数中循环查找定时器,如果定时器圈数为0,则定时器到期,执行其回调函数,然后删除该定时器。如果定时器圈数大于0,则将该变量减1。

​ 2)addTimer::向容器中添加一个定时器,并返回定时器的指针。

​ 3)delTimer::根据传入的定时器指针删除容器中的一个定时器,并且销毁资源。

​ 4)resetTimer: 重置一个定时器。

​ 5)getMinExpire:获取槽间隔;

代码如下:

timer_common.hpp

#ifndef _LIB_SRC_TIMER_COMMON_H
#define _LIB_SRC_TIMER_COMMON_H#include <stdio.h>
#include <sys/time.h>// 获取时间戳 单位:毫秒
time_t getMSec()
{struct timeval tv;gettimeofday(&tv, NULL);return tv.tv_sec * 1000 + tv.tv_usec / 1000;
}// 定时器数据结构的定义
template <typename _User_Data>
class Timer
{public:Timer() : _user_data(nullptr), _cb_func(nullptr) {};Timer(int msec) : _user_data(nullptr), _cb_func(nullptr){this->_expire = getMSec() + msec;}~Timer(){}void setTimeout(time_t timeout){this->_expire = getMSec() + timeout;}time_t getExpire(){return _expire;}void setUserData(_User_Data *userData){this->_user_data = userData;}void handleTimeOut(){if(_cb_func){_cb_func(_user_data);}}using TimeOutCbFunc = void (*)(_User_Data *);void setCallBack(TimeOutCbFunc callBack){this->_cb_func = callBack;}private:time_t _expire;                    // 定时器生效的绝对时间            _User_Data *_user_data;            // 用户数据TimeOutCbFunc _cb_func;           // 超时时的回调函数
};template <typename _UData>
class ITimerContainer
{public:ITimerContainer() = default;virtual ~ITimerContainer() = default;public:virtual void tick() = 0;               virtual Timer<_UData> *addTimer(time_t timeout) = 0;virtual void delTimer(Timer<_UData> *timer) = 0;virtual void resetTimer(Timer<_UData> *timer, time_t timeout) = 0;virtual int getMinExpire() = 0;
};#endif

time_wheel_timer.hpp

#ifndef _LIB_SRC_TIME_WHEEL_TIMER_H_
#define _LIB_SRC_TIME_WHEEL_TIMER_H_#include "timer_common.hpp"
#include <array>
#include <list>
#include <iostream>/** @Author: MGH* @Date: 2021-09-29 12:57:45* @Last Modified by: Author* @Last Modified time: 2021-09-29 12:57:45* @Description: Time Wheel Timer
*/template <typename _UData>
class TimerNode
{public:TimerNode() = default;~TimerNode() = default;public:void setTimeSlot(int slot){this->_time_slot = slot;}int getTimeSlot(){return this->_time_slot;}void setRotation(int rotation){this->_rotation = rotation;}int getRotation(){return this->_rotation;}public:Timer<_UData> timer;private:int _time_slot;            // 记录定时器在时间轮中的槽位int _rotation;              // 记录定时器在时间轮转多少圈后生效
};template <typename _UData>
class TWTimerContainer : public ITimerContainer<_UData>
{public:TWTimerContainer();~TWTimerContainer() override;public:void tick() override;               Timer<_UData> *addTimer(time_t timeout) override;void delTimer(Timer<_UData> *timer) override;void resetTimer(Timer<_UData> *timer, time_t timeout) override;int getMinExpire() override;private:TimerNode<_UData> * del_timer(Timer<_UData> *timer);void add_timer(TimerNode<_UData> *timer_node, time_t timeout);private:// 时间轮上槽的数目为600,转一圈的时间是60sstatic const int _SLOTS_NUM = 600;// 时间轮using TimerList = std::list< TimerNode<_UData> *>; std::array< TimerList *, _SLOTS_NUM> _slots;// 时间轮的槽间隔,100ms static const int _SI = 100;                     // 时间轮当前的槽int _cur_slot;};template <typename _UData>
TWTimerContainer<_UData>::TWTimerContainer() : _cur_slot(1)
{_slots.fill(nullptr);
}template <typename _UData>
TWTimerContainer<_UData>::~TWTimerContainer()
{TimerList *temp = nullptr;for(int i = 0; i < _slots.size(); i++){temp = _slots[i];if(temp != nullptr){for(auto itr = temp->begin(); itr != temp->end(); ++itr ){delete *itr;}}delete temp;}}template <typename _UData>
void TWTimerContainer<_UData>::tick()
{// 取出当前指针指向的slot中保存的链表auto slot_list = _slots[_cur_slot];TimerNode<_UData> *node = nullptr;if(slot_list){for(auto itr = slot_list->begin(); itr != slot_list->end(); ++itr){// 如果定时器的rotation大于0,则它在这一轮不起作用if((*itr)->getRotation() > 0){   (*itr)->setRotation((*itr)->getRotation() - 1);continue;}// 否则说明定时器到期, 执行回调函数(*itr)->timer.handleTimeOut();auto temp_itr = itr++;node = *temp_itr;// 删除定时器delete node;slot_list->erase(temp_itr);}}_cur_slot = (_cur_slot + 1) % _SLOTS_NUM;
}template <typename _UData>
TimerNode<_UData> *TWTimerContainer<_UData>::del_timer(Timer<_UData> *timer)
{// 由于Timer在TimerNode中第一个位置,可以直接强转TimerNode<_UData> *timer_node = reinterpret_cast<  TimerNode<_UData>* >(timer);if(timer_node == nullptr){return nullptr;}// 获取定时器在时间轮中的哪个槽中int ts = timer_node->getTimeSlot();auto slot_list = _slots[ts];if(slot_list == nullptr){return nullptr;}slot_list->remove(timer_node);return timer_node;
}template <typename _UData>
void TWTimerContainer<_UData>::add_timer(TimerNode<_UData> *timer_node, time_t timeout)
{/*  根据待插入定时器的超时值计算出它经过多少个时间滴答后被触发。如果传入的时间值小于时间轮的槽间隔,则向上折合*/int ticks = 0;// 计算出定时时间需要走过的槽数if(timeout < _SI){ticks = 1;}   else{ticks = timeout / _SI;}int rotation = ticks / _SLOTS_NUM;        // 圈数int ts = (_cur_slot + (ticks % _SLOTS_NUM)) % _SLOTS_NUM;      // 计算待插入的定时器应该被插入哪个槽中                            timer_node->setRotation(rotation);timer_node->setTimeSlot(ts);if(_slots[ts] == nullptr){_slots[ts] = new TimerList;}_slots[ts]->push_back(timer_node);std::cout << "add timer, rotation:" << rotation << " ts:" << ts << std::endl;
}// 添加一个定时器,并返回Timer类型指针
template <typename _UData>
Timer<_UData> *TWTimerContainer<_UData>::addTimer(time_t timeout)
{                               TimerNode<_UData> *timer_node = new TimerNode<_UData>;if(timer_node){add_timer(timer_node, timeout);return &timer_node->timer;}return nullptr;
}// 删除一个定时器
template <typename _UData>
void TWTimerContainer<_UData>::delTimer(Timer<_UData> *timer)
{TimerNode<_UData> *timer_node = del_timer(timer);if(timer_node){delete timer_node;}}template <typename _UData>
void TWTimerContainer<_UData>::resetTimer(Timer<_UData> *timer, time_t timeout)
{TimerNode<_UData> *timer_node = del_timer(timer);if(!timer_node){return ;}add_timer(timer_node, timeout);
}template <typename _UData>
int TWTimerContainer<_UData>::getMinExpire()
{return _SI;
}#endif

下面代码是使用epoll实现的一个回射服务器,在服务端将检测非活跃连接,每个客户端都有一个定时器,超时时间为15s,当客户端与服务器在15s内没有数据交互,服务端就会踢掉相应的客户端。客户端发送数据后,服务端将会重置其定时器。

test_timewheel_timer.cpp

#include <iostream>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <string.h>
#include <errno.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <signal.h>
#include "time_wheel_timer.hpp"using std::cout;
using std::endl;#define PORT 6666
#define MAX_EVENTS 1024
#define MAX_BUF_SIZE 1024struct Event;using readHandle = void(*)(Event *, ITimerContainer<Event> *);
using writeHandle = void(*)(Event *, ITimerContainer<Event> *);// 自定义结构体,用来保存一个连接的相关数据
struct Event
{int fd;char ip[64];uint16_t port;epoll_event event; void *timer;char buf[MAX_BUF_SIZE];int buf_size;readHandle read_cb;writeHandle write_cb;
};int epfd;
int pipefd[2];// 超时处理的回调函数
void timeout_handle(Event *cli)
{if(cli == nullptr){return ;}cout << "Connection time out, fd:" << cli->fd << " ip:[" << cli->ip << ":" << cli->port << "]" << endl;epoll_ctl(epfd, EPOLL_CTL_DEL, cli->fd, &cli->event);close(cli->fd);delete cli;
}void err_exit(const char *reason)
{cout << reason << ":" << strerror(errno) << endl;exit(1);
}// 设置非阻塞
int setNonblcoking(int fd)
{int old_option = fcntl(fd, F_GETFL);int new_option = old_option | O_NONBLOCK;fcntl(fd, F_SETFL, new_option);return old_option;
}// 设置端口复用
void setReusedAddr(int fd)
{int reuse = 1;setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
}// 初始化server socket
int socket_init(unsigned short port, bool reuseAddr)
{int fd = socket(AF_INET, SOCK_STREAM, 0);if(fd < 0){err_exit("socket error");}if(reuseAddr){setReusedAddr(fd);}struct sockaddr_in addr;bzero(&addr, 0);addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = htonl(INADDR_ANY);int ret = bind(fd, (struct sockaddr *)&addr, sizeof(addr));if(ret < 0){err_exit("bind error");}setNonblcoking(fd);ret = listen(fd, 128);if(ret < 0){err_exit("listen error");}return fd;
}void readData(Event *ev, ITimerContainer<Event> *htc)
{ev->buf_size = read(ev->fd, ev->buf, MAX_BUF_SIZE - 1);if(ev->buf_size == 0){close(ev->fd);htc->delTimer((Timer<Event> *)ev->timer);epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ev->event);cout << "Remote Connection has been closed, fd:" << ev->fd << " ip:[" << ev->ip << ":" << ev->port << "]" << endl;delete ev;return;}ev->event.events = EPOLLOUT;epoll_ctl(epfd, EPOLL_CTL_MOD, ev->fd, &ev->event);
}void writeData(Event *ev, ITimerContainer<Event> *htc)
{write(ev->fd, ev->buf, ev->buf_size);ev->event.events = EPOLLIN;epoll_ctl(epfd, EPOLL_CTL_MOD, ev->fd, &ev->event);// 重新设置定时器htc->resetTimer((Timer<Event> *)ev->timer, 15000);
}// 接收连接回调函数
void acceptConn(Event *ev, ITimerContainer<Event> *htc)
{Event *cli = new Event;struct sockaddr_in cli_addr;socklen_t sock_len = sizeof(cli_addr);int cfd = accept(ev->fd, (struct sockaddr *)&cli_addr, &sock_len);if(cfd < 0){cout << "accept error, reason:" << strerror(errno) << endl;return;} setNonblcoking(cfd);cli->fd = cfd;cli->port = ntohs(cli_addr.sin_port);inet_ntop(AF_INET, &cli_addr.sin_addr, cli->ip, sock_len);cli->read_cb = readData;cli->write_cb = writeData;auto timer = htc->addTimer(15000);      //设置客户端超时值15秒timer->setUserData(cli);timer->setCallBack(timeout_handle);cli->timer = (void *)timer;cli->event.events = EPOLLIN;cli->event.data.ptr = (void *) cli;epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &cli->event);cout << "New Connection, ip:[" << cli->ip << ":" << cli->port << "]" << endl;
}void sig_handler(int signum)
{char sig = (char) signum;write(pipefd[1], &sig, 1);
}int add_sig(int signum)
{struct sigaction sa;memset(&sa, 0, sizeof(sa));sa.sa_handler = sig_handler;sa.sa_flags |= SA_RESTART;sigfillset(&sa.sa_mask);return sigaction(signum, &sa, nullptr);}int main(int argc, char *argv[])
{// 信号处理int ret = add_sig(SIGINT);if(ret < 0){err_exit("add sig error");}ret = socketpair(AF_UNIX, SOCK_STREAM, 0, pipefd);if(ret < 0){err_exit("socketpair error");}int fd = socket_init(PORT, true);Event server;Event sig_ev;server.fd = fd;sig_ev.fd = pipefd[0];epfd = epoll_create(MAX_EVENTS);if(epfd < 0){err_exit("epoll create error");}sig_ev.event.events = EPOLLIN;sig_ev.event.data.ptr = (void *) &sig_ev;;server.event.events = EPOLLIN;server.event.data.ptr = (void *)&server;epoll_ctl(epfd, EPOLL_CTL_ADD, pipefd[0], &sig_ev.event);epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &server.event);cout << "------ Create TimerContainer ------" << endl;ITimerContainer<Event> *htc = new TWTimerContainer<Event>;cout << "------ Create TimerContainer over ------" << endl;struct epoll_event events[MAX_EVENTS];int nready = 0;int timeout = 10000;      //设置超时值为10秒char buf[1024] = {0};bool running = true;while(running){// 将定时容器中定时时间最短的时长作为epoll_wait的最大等待时间auto min_expire = htc->getMinExpire();timeout = (min_expire == -1) ? 10000 : min_expire;nready = epoll_wait(epfd, events, MAX_EVENTS, timeout);if(nready < 0){cout << "epoll wait error, reason:" << strerror(errno) << endl;} else if(nready > 0){// 接收新的连接for(int i = 0; i < nready; i++){Event *ev =  (Event *) events[i].data.ptr;// 接受新的连接if(ev->fd == pipefd[0]){   int n = read(pipefd[0], buf, sizeof(buf));if(n < 0){cout << "deal read signal error:" << strerror(errno) << endl;continue; }else if(n > 0){for(int i = 0; i < n; i++){switch (buf[i]){case SIGINT:running = false;break;}}}}else if(ev->fd == fd ){acceptConn(ev, htc);}else if(ev->event.events & EPOLLIN){ev->read_cb(ev, htc);}else if(ev->event.events & EPOLLOUT){ev->write_cb(ev, htc);}}}else{htc->tick();}}close(fd); close(pipefd[0]); close(pipefd[1]); delete htc;return 0;
}

本文实现的是一种简单的时间轮,因为它只有一个轮子。而复杂的时间轮可能有多个轮子,不同的轮子拥有不同的粒度。相邻的两个轮子,精度高的转一圈,精度低的仅往前移动一槽。就像钟表一样,当秒针转一圈,分针才移动一下,同样,分针转一圈,时针才移动一下。

时间轮的思想应用范围非常广泛,各种操作系统的定时任务调度以及基于java的通信框架Netty中也有时间轮的实现,几乎所有的时间任务调度系统采用的都是时间轮的思想。

.
.
其它相关博客:
最小堆实现的定时器
红黑树实现的定时器

.
.
本人能力有限,代码中难免存在一些Bug,还请见谅。如果有好的建议,敬请提出。

参考资料:《Linux高性能服务器编程》

高性能定时器3——时间轮相关推荐

  1. 【高性能定时器】 时间轮

    时间轮 简述 顾名思义,时间轮就像一个轮子,在转动的时候外界会指向轮子不同的区域,该区域就可以被使用.因此只要将不同时间的定时器按照一定的方法散列到时间轮的不同槽(即时间轮划分的区域)之中,就可以实现 ...

  2. Linux网络编程 | 高性能定时器 :时间轮、时间堆

    文章目录 时间轮 时间堆 在上一篇博客中我实现了一个基于排序链表的定时器容器,但是其存在一个缺点--随着定时器越来越多,添加定时器的效率也会越来越低. 而下面的两个高效定时器--时间轮.时间堆,会完美 ...

  3. java实现时间轮定时器_c++ 时间轮定时器实现

    前言 之所以写这篇文章,是在一篇博客中看到了时间轮定时器这个东西,感觉很是惊艳,https://www.cnblogs.com/zhongwencool/p/timing_wheel.html.在以前 ...

  4. linux现代时间轮算法,linux2.6定时器的时间轮算法分析

    1.Overview 常用的定时器实现算法有两种:红黑树和时间轮(timing wheel). 在Linux2.6的代码中,kernel/timer.c文件实现了一个通用定时器机制,使用的是时间轮算法 ...

  5. Linux服务器编程--升序链表定时器和时间轮定时器的比较

    1 两种机制的比较 2 代码实现 两种机制的原理不赘述了,代码中有详细注释. 2.1 升序链表法 完整代码参见:https://github.com/GaoZiqiang/Cplus_daily_tr ...

  6. 【高性能定时器】时间堆(最小堆)

    最小堆及其应用:时间堆 最小堆及其应用:时间堆 一. 堆 1. 概念 2. 最小堆的实现 3. 性质 4. 代码 二.时间堆 1. 概念简述 2. 实现细节 3. 代码 一. 堆 1. 概念 堆是一种 ...

  7. 高性能定时器-------时间轮

    基于排序链表的定时器(https://blog.csdn.net/destory27/article/details/81748580)存在一个问题:添加定时器的效率偏低. 如图所示时间轮内,指针指向 ...

  8. 时间轮和时间堆管理定时器

    高性能定时器 时间轮 由于排序链表定时器容器有这样一个问题:添加定时器的效率偏低.而即将介绍的时间轮则解决了这个问题.一种简单的时间轮如下所示. 如图所示的时间轮内,指针指向轮子上的一个slot(槽) ...

  9. 游戏后台之高效定时器-时间轮

    高性能定时器 定时器的结构有多钟比如链表式,最小堆,时间轮的 在不同应用场景下使用哪种需要考虑效率和复杂度 这次我么那先先讲讲时间轮定时器,在linux内核里这种结构的定时器大量使用. 1.升序链表定 ...

  10. 时间轮实现定时器(哈西表思想)

    时间轮使用了哈西表的思想,将定时器散列到不同的链表上,这样每条链表上的定时器的数目将随插入定时器的数量增多明显减少,相比较顺序链表只有一条链表来装入所有定时器而言,哈西表的每条链表上的定时器的数量将很 ...

最新文章

  1. Navicat for Oracle Cannot load OCI DLL
  2. 计算机应用网站设计,《计算机应用基础》课程网站的设计与实现
  3. java $p_javap -c命令详解
  4. Taro+react开发(23)--componentWillReceiveProps
  5. zookeeper使用和原理探究
  6. Python Logging Loggers
  7. 如何在 Deno 中构建一个 URL 短链生成器
  8. python缩放图片,复制即用
  9. Java 正则表达式匹配规则
  10. 计算机硬件单片机,计算机硬件单片机总结报告
  11. 腾讯云服务器登录宝塔面板
  12. python画太阳花代码
  13. 药片计数器电路设计与实验
  14. wpf,silverlight,wp7,winform等学习资料整合(一)
  15. [corefx注释说]-System.Collections.Generic.StackT
  16. RGB颜色空间转LAB
  17. 数组-leetcode#15-找出三个数之和等于0的所有不重复序列
  18. QT多线程同步之QWaitcondition
  19. bert的兄弟姐妹梳理——Roberta、DeBerta、Albert、Ambert、Wobert等
  20. ybt1003:对齐输出

热门文章

  1. 华为手机升级回退_华为手机版本回退 - 卡饭网
  2. 自动化运维平台-OpManager
  3. Devexpress ASP.NET最新版开发.NET环境配置Visual Studo和SQL Server对应版本
  4. java转换字符串编码格式_java转换字符串编码格式的方法
  5. ict中的it和ct_ICT.Social – IT专业人员的社交网络
  6. java执行bat代码
  7. Worktile、Teambition与Tower项目管理软件对比
  8. 【语音处理】时域信号分析基本工具,什么是窗函数
  9. HC-05与JDY-09蓝牙模块对比与使用
  10. 阿里云iot平台实现MQTT通信(mqtt.fx接入iot平台及测试)