muduo网络库浅谈(一)

  • 序言
  • 第一章 muduo的关键结构
    • class EventLoop
    • class Channel
    • class Poller
  • 番外
    • 定时任务
      • class Timestamp
      • class Timer
      • class TimerQueue
      • class EventLoop调整
    • 在线程中创建并执行EventLoop

序言

C++的学习过程充满着迷茫,C++primer,侯捷老师的STL源码剖析,再到boost库,C++多线程库,纷繁复杂的数据结构,以及为了效率无所不用其极的函数重载,shared_ptr的线程安全性,多线程访问时数据竞争,C++对于C的进步在于安全的内存管理,以及大量库支持,避免了我们重复造轮子的过程。最近看了陈硕老师的muduo网络库,该网络库是一个C++网络服务中的经典,muduo库的意义在于:“奋木铎以警众,使明听也”,为了不至于看久了忘记,就撰写这篇博文,本博客将会从程硕老师的出版图书《Linux多线程服务端编程 使用muduo C++网络库》的第八章开始讲解(某东上打折很给力,买来看看还是很值得的!希望大家支持正版)。

在正式介绍之前不妨先思考一下网络库的设计需求,设立几个小目标。
1.一个单线程的事件处理机制。
2.一个基于网络通信的事件处理(举例来说,对通信时间进行处理以及处理后发送的处理机制)。
3.多线程下实现上述1,2要求。

接下来将分为三个章节来讲述要求1,2,3,的源码解读。

第一章 muduo的关键结构

class EventLoop

muduo网络库将网络服务通信的各个流程中的环节封装成不同的class,高度模块化的设计使得以后拓展接口提供了极大地便利,class EventLoop是网络库的重要组成部分,初始EventLoop什么也不做,代码如下:

#ifndef MUDUO_NET_EVENTLOOP_H
#define MUDUO_NET_EVENTLOOP_H#include "thread/Thread.h"namespace muduo
{class EventLoop : boost::noncopyable
{public:EventLoop();~EventLoop();void loop();void assertInLoopThread(){if (!isInLoopThread()){abortNotInLoopThread();}}bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }private:void abortNotInLoopThread();bool looping_; const pid_t threadId_;
};}#endif

可以看到EventLoop定义有构造,析构,loop,isInLoopThread,abortNotInLoopThread等函数,成员函数的定义如下:

#include "EventLoop.h"#include "logging/Logging.h"#include <assert.h>
#include <poll.h>using namespace muduo;__thread EventLoop* t_loopInThisThread = 0;//thread_local变量EventLoop::EventLoop(): looping_(false),threadId_(CurrentThread::tid())
{LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;if (t_loopInThisThread){LOG_FATAL << "Another EventLoop " << t_loopInThisThread<< " exists in this thread " << threadId_;}else{t_loopInThisThread = this;}
}EventLoop::~EventLoop()
{assert(!looping_);t_loopInThisThread = NULL;
}void EventLoop::loop()
{assert(!looping_);assertInLoopThread();looping_ = true;::poll(NULL, 0, 5*1000);LOG_TRACE << "EventLoop " << this << " stop looping";looping_ = false;
}void EventLoop::abortNotInLoopThread()
{LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this<< " was created in threadId_ = " << threadId_<< ", current thread id = " <<  CurrentThread::tid();
}

其中t_loopInThisThread属于thread变量,存储当前线程的EventLoop指针,初始为0,在构造函数中,对事件循环的looping_标志位,以及线程号进行初始化(一个EventLoop对应一个线程,避免多线程下的数据竞争以及访问逻辑混乱的情况,所以使用大量assertInLoopThread以及boost:noncopyable以保证EventLoop在本线程中运行),LOG_TRACE 等属于日志,便于意外情况下对服务器运行情况的分析,以后不再赘述。

析构函数太过简单不需要解释;loop()函数用于正式启用事件循环,本例中loop()并不完全,poll函数属于IO复用,之后会讲解。abortNotInLoopThread()将线程运行错误输入日志。以上便是一个EventLoop的基本结构,但并不涉及具体的事件处理机制。

class Channel

在介绍class Channel之前,不得不说说Linux下IO复用——poll函数。

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

pollfd为poll函数需要监听的事件,nfds为需要监听事件的数量,pollfd

struct pollfd{int fd;            //文件描述符short events;    //需要监听的事件short revents; //实际发生的事件
};

Linux一大特色便是万物皆文件,int fd指定了一个文件描述符,poll函数会根据events(用户设置)来监听fd,并根据发生的事件重写revents,并返回具体事件发生的数量。

因此对于上述调用poll环节可以分为class Channelclass Poller两部分!以下为class Channel的源码:

#ifndef MUDUO_NET_CHANNEL_H
#define MUDUO_NET_CHANNEL_H#include <boost/function.hpp>
#include <boost/noncopyable.hpp>namespace muduo
{class EventLoop;
class Channel : boost::noncopyable
{public:typedef boost::function<void()> EventCallback;//定义回调函数Channel(EventLoop* loop, int fd);void handleEvent();//事件具体处理void setReadCallback(const EventCallback& cb)//读回调函数设置{ readCallback_ = cb; }void setWriteCallback(const EventCallback& cb)//写回调函数设置{ writeCallback_ = cb; }void setErrorCallback(const EventCallback& cb)//错误回调函数设置{ errorCallback_ = cb; }int fd() const { return fd_; }int events() const { return events_; }void set_revents(int revt) { revents_ = revt; }bool isNoneEvent() const { return events_ == kNoneEvent; }void enableReading() { events_ |= kReadEvent; update(); }//该函数负责向poller的channal列表进行注册// void enableWriting() { events_ |= kWriteEvent; update(); }// void disableWriting() { events_ &= ~kWriteEvent; update(); }// void disableAll() { events_ = kNoneEvent; update(); }// for Pollerint index() { return index_; }void set_index(int idx) { index_ = idx; }EventLoop* ownerLoop() { return loop_; }private:void update();static const int kNoneEvent;//static const int kReadEvent;//static const int kWriteEvent;//EventLoop* loop_;//所属的EventLoopconst int  fd_;int        events_;int        revents_;int        index_; // used by Poller.EventCallback readCallback_;//函数指针EventCallback writeCallback_;//函数指针EventCallback errorCallback_;//函数指针
};

成员函数的实现:

#include "Channel.h"
#include "EventLoop.h"
#include "logging/Logging.h"#include <sstream>#include <poll.h>using namespace muduo;const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = POLLIN | POLLPRI;
const int Channel::kWriteEvent = POLLOUT;Channel::Channel(EventLoop* loop, int fdArg): loop_(loop),fd_(fdArg),events_(0),revents_(0),index_(-1)
{}void Channel::update()
{loop_->updateChannel(this);
}void Channel::handleEvent()//根据poll返回的事件revents_标识符调用相应的事件回调函数
{if (revents_ & POLLNVAL) {LOG_WARN << "Channel::handle_event() POLLNVAL";}if (revents_ & (POLLERR | POLLNVAL)) {if (errorCallback_) errorCallback_();}if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)) {if (readCallback_) readCallback_();}if (revents_ & POLLOUT) {if (writeCallback_) writeCallback_();}
}

可以看出,class Channel的功能主要在于创建事件pollfd以及相应的回调函数(一个需要监听的事件pollfd对应一个Channel,一个Channel也唯一属于一个EventLoop,相同的事件不需要在不同的EventLoop中重复的监听,逻辑上是没必要的,因此一个事件的Channel具有唯一性,boost::noncopyable),唯一没有介绍的是 enableReading()以及int index,enableReading()代表一个Channel实例已经可读,并调用自身的私有函数update(),继而调用EventLoop的updateChannel()成员函数向EventLoop注册自己,表示自己需要加入到poll的监听队列(此处的队列是一种说法,并非数据结构)中,index用于表明在监听队列中的编号

那么问题来了,具体内部注册具有是怎么实现的呢?不妨将这个问题设为问题1

根据上述问题,EventLoop则需要改变一下,增加新的成员函数,新的代码如下:

#ifndef MUDUO_NET_EVENTLOOP_H
#define MUDUO_NET_EVENTLOOP_H#include "thread/Thread.h"#include <boost/scoped_ptr.hpp>
#include <vector>namespace muduo
{class Channel;
class Poller;class EventLoop : boost::noncopyable
{public:EventLoop();// force out-line dtor, for scoped_ptr members.~EventLoop();////// Loops forever.////// Must be called in the same thread as creation of the object.///void loop();void quit();//新增// internal use onlyvoid updateChannel(Channel* channel);//新增// void removeChannel(Channel* channel);void assertInLoopThread(){if (!isInLoopThread()){abortNotInLoopThread();}}bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }private:void abortNotInLoopThread();typedef std::vector<Channel*> ChannelList;//定义保存数据的结构bool looping_; //循环标志位,涉及loop()中的逻辑,使得loop()无法重复运行。bool quit_; //停止标志位const pid_t threadId_;boost::scoped_ptr<Poller> poller_;//拥有一个class Poller的实例ChannelList activeChannels_;//活动事件的列表
};}#endif

成员函数如下:

#include "EventLoop.h"#include "Channel.h"
#include "Poller.h"#include "logging/Logging.h"#include <assert.h>using namespace muduo;__thread EventLoop* t_loopInThisThread = 0;
const int kPollTimeMs = 10000;EventLoop::EventLoop(): looping_(false),quit_(false),threadId_(CurrentThread::tid()),poller_(new Poller(this))
{LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;if (t_loopInThisThread){LOG_FATAL << "Another EventLoop " << t_loopInThisThread<< " exists in this thread " << threadId_;}else{t_loopInThisThread = this;}
}EventLoop::~EventLoop()
{assert(!looping_);t_loopInThisThread = NULL;
}void EventLoop::loop()//loop()函数正式增加了poll监听事件的结构
{assert(!looping_);assertInLoopThread();looping_ = true;quit_ = false;while (!quit_){activeChannels_.clear();//上一次的活跃Channel删除;poller_->poll(kPollTimeMs, &activeChannels_);//调用Class Poller的成员函数,返回活跃的Channel数组for (ChannelList::iterator it = activeChannels_.begin();it != activeChannels_.end(); ++it){(*it)->handleEvent();//调用Channel对应的回调函数}}LOG_TRACE << "EventLoop " << this << " stop looping";looping_ = false;
}void EventLoop::quit()
{quit_ = true;// wakeup();
}void EventLoop::updateChannel(Channel* channel)
{assert(channel->ownerLoop() == this);assertInLoopThread();poller_->updateChannel(channel);//注册一个Channel的工作转移到了class Poller
}void EventLoop::abortNotInLoopThread()
{LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this<< " was created in threadId_ = " << threadId_<< ", current thread id = " <<  CurrentThread::tid();
}

以上的代码并没有增加太多的成员函数,但也没有解释问题1——Channel是怎么注册的(EventLoop把问题推给了Class Poller),同时在loop()函数中调用了Class Poller的成员函数,私有成员中增加boost::scoped_ptr poller_(这很好理解,Poller内部封装由poll的功能,EventLoop独自拥有一个Class Poller的实例就可以实现监听所有的Channel),至此,所有的问题都推给了Class Poller,代码如下:

class Poller

#ifndef MUDUO_NET_POLLER_H
#define MUDUO_NET_POLLER_H#include <map>
#include <vector>#include "datetime/Timestamp.h"
#include "EventLoop.h"struct pollfd;namespace muduo
{class Channel;
class Poller : boost::noncopyable
{public:typedef std::vector<Channel*> ChannelList;Poller(EventLoop* loop);~Poller();Timestamp poll(int timeoutMs, ChannelList* activeChannels);//poll成员函数,与上文的poll函数不一样。void updateChannel(Channel* channel);//向Class Poller注册Channel;void assertInLoopThread() { ownerLoop_->assertInLoopThread(); }//emmm,不必多说private:void fillActiveChannels(int numEvents,ChannelList* activeChannels) const;//填充活跃的Channeltypedef std::vector<struct pollfd> PollFdList;//定义了存储pollfd的结构,注意为vector。typedef std::map<int, Channel*> ChannelMap;//Channel的存储结构,Channel并不直接存储pollfd,而是存储struct pollfd中对应的成员,map<int, Channel*>中int为pollfd->fd,也就是时class Channel的成员fd_。EventLoop* ownerLoop_;//PollFdList pollfds_;//ChannelMap channels_;//
};}
#endif

上述中pollfds_为vector,用于高效地随机访问(Channel中保存了在vector中的下标index),channels_使用map来管理,实现高效地查找,删除和插入Channel(尽管当前没有实现删除)。

再看看成员函数的实现:

#include "Poller.h"#include "Channel.h"
#include "logging/Logging.h"#include <assert.h>
#include <poll.h>using namespace muduo;Poller::Poller(EventLoop* loop): ownerLoop_(loop)//保存有所属的EventLoop指针
{}Poller::~Poller()//析构函数什么也不做,不要惊讶,其成员都是stl中的容器,容器内保存的是一个类的实例则自动调用其析构,具体可参考《stl源码剖析》的2.2.3小节;
//如果为指针,需要手动管理析构,但Channel的创建和析构都不属于class Poller的工作范畴,具体可参考下文class TimerQueue的析构。
{}Timestamp Poller::poll(int timeoutMs, ChannelList* activeChannels)
{int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);//监听pollfds_的文件描述符,并返回发生事件Timestamp now(Timestamp::now());//时间戳,之后会介绍if (numEvents > 0)//有事件发生{LOG_TRACE << numEvents << " events happended";fillActiveChannels(numEvents, activeChannels);//调用该函数,将活跃的Channel填充至activeChannels中} else if (numEvents == 0) {LOG_TRACE << " nothing happended";} else {LOG_SYSERR << "Poller::poll()";}return now;//返回当前时间戳
}void Poller::fillActiveChannels(int numEvents,ChannelList* activeChannels) const//填充活跃的pollfd对应的Channel添加至activeChannels中
{for (PollFdList::const_iterator pfd = pollfds_.begin();pfd != pollfds_.end() && numEvents > 0; ++pfd)//历遍寻找活跃的pollfd{if (pfd->revents > 0){--numEvents;//numEvents活跃事件数量全部找到则可提前结束历遍。ChannelMap::const_iterator ch = channels_.find(pfd->fd);//assert(ch != channels_.end());Channel* channel = ch->second;assert(channel->fd() == pfd->fd);channel->set_revents(pfd->revents);//填充fd的revents// pfd->revents = 0;activeChannels->push_back(channel);//填充活跃的Channel至activeChannels数组中。}}
}void Poller::updateChannel(Channel* channel)//添加或删除Channel
{assertInLoopThread();LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();if (channel->index() < 0)//Channel的index默认为-1,代表未使用过{// a new one, add to pollfds_assert(channels_.find(channel->fd()) == channels_.end());struct pollfd pfd;//定义结构体pollfd,将Channel的fd,events赋予pfd中pfd.fd = channel->fd();pfd.events = static_cast<short>(channel->events());pfd.revents = 0;pollfds_.push_back(pfd);//pfd将入需要监听的pollfds_数组中。int idx = static_cast<int>(pollfds_.size())-1;//根据在pollfds_数组的位置,设置Channel的index,此时index由-1变为pollfds_.size()-1,代表已经添加过。channel->set_index(idx);channels_[pfd.fd] = channel;//将入map中,可以认为是map<fd,Channel*>} else {// 该Channel已经被添加过一次,则实现对应的Channel更新assert(channels_.find(channel->fd()) != channels_.end());assert(channels_[channel->fd()] == channel);int idx = channel->index();//得到Channel的index,即在pollfds_中的位置assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));struct pollfd& pfd = pollfds_[idx];assert(pfd.fd == channel->fd() || pfd.fd == -1);pfd.events = static_cast<short>(channel->events());//更新eventspfd.revents = 0;if (channel->isNoneEvent()) {// ignore this pollfdpfd.fd = -1;}}
}

class Poller的主要作用是维护EventLoop所拥有的的Channel,其成员函数poll用于事件的监听,并返回活跃事件,以及updateChannel()函数向Poller中添加Channel,不过当前的class Poller不具有移除Channel的作用,只能不断的向pollfds_和channels_添加。

以下为三个class的关键函数loop()和enableReading()的调用循序:

番外

为了给EventLoop根据一定周期执行某个函数,设计出响应的定时器功能,定时器由class TimerId,Timer,TimerQueue,三个来实现的,接下来将一一介绍。

定时任务

首先需要明确两点:1、定时器的作用,无论EventLoop由多少个定时任务,仅需要一个class就可以集中的管理;2、poll是阻塞的,一旦监听的文件描述符没有发生事件,则会阻塞,那么定时任务无法得到及时的执行,所以需要将计时任务作为一个监听事件(也就是Channel)来实现定时的唤醒,计算有哪些计时器到期,执行对应的函数。这样我们就明确了class TimerQueue的功能了!
但在介绍class TimerQueue之前,需要先介绍几个class以便更好地理解定时器的实现:

class Timestamp

class Timestamp的主要作用是保存时间戳(int64_t microSecondsSinceEpoch_;),以及重载了一些运算符,提供了一些如toString(),now()等接口,具体的成员函数的实现不讲了,简单讲一下头文件中成员的作用。

#ifndef MUDUO_BASE_TIMESTAMP_H
#define MUDUO_BASE_TIMESTAMP_H#include "copyable.h"#include <stdint.h>
#include <string>namespace muduo
{class Timestamp : public muduo::copyable
{public:Timestamp();//默认构造函数,时间戳设为0explicit Timestamp(int64_t microSecondsSinceEpoch);//初始化时间戳为microSecondsSinceEpochvoid swap(Timestamp& that)//交换时间戳{std::swap(microSecondsSinceEpoch_, that.microSecondsSinceEpoch_);}std::string toString() const;//时间戳转为strstd::string toFormattedString() const;//同上类似bool valid() const { return microSecondsSinceEpoch_ > 0; }//时间戳是否可用int64_t microSecondsSinceEpoch() const { return microSecondsSinceEpoch_; }//返回时间戳static Timestamp now();//静态函数,返回当前时间的时间戳static Timestamp invalid();//返回一个为0的时间戳static const int kMicroSecondsPerSecond = 1000 * 1000;private:int64_t microSecondsSinceEpoch_;//时间戳以微妙来存储
};inline bool operator<(Timestamp lhs, Timestamp rhs)//重载运算符
{return lhs.microSecondsSinceEpoch() < rhs.microSecondsSinceEpoch();
}inline bool operator==(Timestamp lhs, Timestamp rhs)
{return lhs.microSecondsSinceEpoch() == rhs.microSecondsSinceEpoch();
}inline double timeDifference(Timestamp high, Timestamp low)//时间差
{int64_t diff = high.microSecondsSinceEpoch() - low.microSecondsSinceEpoch();return static_cast<double>(diff) / Timestamp::kMicroSecondsPerSecond;
}
inline Timestamp addTime(Timestamp timestamp, double seconds)//为给定的时间戳增加seconds秒,并返回增加后的时间戳
{int64_t delta = static_cast<int64_t>(seconds * Timestamp::kMicroSecondsPerSecond);return Timestamp(timestamp.microSecondsSinceEpoch() + delta);
}
}
#endif

class Timer

上小节class Timestamp构成class Timer的成员,class Timer顾名思义:定时器!首先来明确定时器的作用,EventLoop可以拥有数个定时任务,那么单个定时器不仅需要保存时间戳,定时时间,是否循环,以及相应的时间回调函数。一下为class Timer的头文件:


#ifndef MUDUO_NET_TIMER_H
#define MUDUO_NET_TIMER_H#include <boost/noncopyable.hpp>#include "datetime/Timestamp.h"
#include "Callbacks.h"namespace muduo
{class Timer : boost::noncopyable//不可拷贝
{public:Timer(const TimerCallback& cb, Timestamp when, double interval)//构造函数,分别设定回调函数,时间戳,循环间隔,是否重复: callback_(cb),expiration_(when),interval_(interval),repeat_(interval > 0.0){ }void run() const//运行回调函数{callback_();}Timestamp expiration() const  { return expiration_; }//返回时间戳bool repeat() const { return repeat_; }//返回是否重复void restart(Timestamp now);//重启
{if (repeat_){expiration_ = addTime(now, interval_);//默认=调用}else{expiration_ = Timestamp::invalid();//默认=调用}
}private:const TimerCallback callback_;Timestamp expiration_;const double interval_;const bool repeat_;
};}
#endif

class TimerQueue

接下来就是class TimerQueue的正体了,class TimerQueue为定时器队列,先来明确class TimerQueue的实现任务:1、提供对外接口——增加定时任务;2、创建一个timerfd(LInux新增了timerfd作为定时任务,用法和上文的pollfd一样,当超时后发生可读事件,所以同样需要Channel,暂且把这个Channel叫他timerqueue Channel,timerqueue Channel对应一个回调函数),并使用poll来监听,对该文件描述符可读事件进行超时通知,从poll返回后调用timerqueue Channel的回调函数,该回调函数会检查class TimerQueue所有定时器是否到期,调用其到期的Timer的回调函数。上代码直接看:

#ifndef MUDUO_NET_TIMERQUEUE_H
#define MUDUO_NET_TIMERQUEUE_H#include <set>
#include <vector>#include <boost/noncopyable.hpp>#include "datetime/Timestamp.h"
#include "thread/Mutex.h"
#include "Callbacks.h"
#include "Channel.h"namespace muduo
{class EventLoop;
class Timer;
class TimerId;
class TimerQueue : boost::noncopyable//一个EventLoop对应一个定时器队列
{public:TimerQueue(EventLoop* loop);~TimerQueue();TimerId addTimer(const TimerCallback& cb,Timestamp when,double interval);//对外接口,增加定时器,以及设定相关的Timer的回调函数和定时间隔。private:typedef std::pair<Timestamp, Timer*> Entry;typedef std::set<Entry> TimerList;//set内置红黑树,且自动排序,方便找到已经到期的Timervoid handleRead();//TimerQueue的回调函数,poll返回后运行,检查其到期的定时器,并运行Timer的回调函数std::vector<Entry> getExpired(Timestamp now);//以数组形式返回到期的Timer,void reset(const std::vector<Entry>& expired, Timestamp now);//重新设置定时器Timer的时间戳,bool insert(Timer* timer);//插入定时器TimerEventLoop* loop_;//所属的EventLoopconst int timerfd_;//timefd的文件描述符Channel timerfdChannel_;//timefd对应的Channel实例TimerList timers_;//保存定时器Timer
};}
#endif  // MUDUO_NET_TIMERQUEUE_H

成员函数定义如下:

#define __STDC_LIMIT_MACROS
#include "TimerQueue.h"#include "logging/Logging.h"
#include "EventLoop.h"
#include "Timer.h"
#include "TimerId.h"#include <boost/bind.hpp>#include <sys/timerfd.h>namespace muduo
{namespace detail
{int createTimerfd()//非成员函数,创建timerfd文件描述符用于poll监听,注意没有设置超时时间
{int timerfd = ::timerfd_create(CLOCK_MONOTONIC,TFD_NONBLOCK | TFD_CLOEXEC);if (timerfd < 0){LOG_SYSFATAL << "Failed in timerfd_create";}return timerfd;//返回timefd的文件描述符
}struct timespec howMuchTimeFromNow(Timestamp when)//void resetTimerfd(...)需要的函数,用于计算timerfd的超时时间
{int64_t microseconds = when.microSecondsSinceEpoch()- Timestamp::now().microSecondsSinceEpoch();if (microseconds < 100){microseconds = 100;}struct timespec ts;ts.tv_sec = static_cast<time_t>(microseconds / Timestamp::kMicroSecondsPerSecond);ts.tv_nsec = static_cast<long>((microseconds % Timestamp::kMicroSecondsPerSecond) * 1000);return ts;
}void readTimerfd(int timerfd, Timestamp now)//
{uint64_t howmany;ssize_t n = ::read(timerfd, &howmany, sizeof howmany);LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();if (n != sizeof howmany){LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";}
}void resetTimerfd(int timerfd, Timestamp expiration)//设置timerfd的超时时间,每次poll返回timerfd后都需要重新设置,以保持及时唤醒定时器!
{// wake up loop by timerfd_settime()struct itimerspec newValue;struct itimerspec oldValue;bzero(&newValue, sizeof newValue);bzero(&oldValue, sizeof oldValue);newValue.it_value = howMuchTimeFromNow(expiration);int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);if (ret){LOG_SYSERR << "timerfd_settime()";}
}}
}using namespace muduo;
using namespace muduo::detail;TimerQueue::TimerQueue(EventLoop* loop): loop_(loop),//设置所属的EventLooptimerfd_(createTimerfd()),//创建一个,也是唯一一个timerfd,定时器文件描述符,用于poll的IO复用timerfdChannel_(loop, timerfd_),//创建一个Channel,Channel是联系class Poller唯一途径。timers_()//定时器class timer的队列
{timerfdChannel_.setReadCallback(boost::bind(&TimerQueue::handleRead, this));//设置Channel的可读事件的回调函数timerfdChannel_.enableReading();//向EventLoop的成员Poller进行注册timerfd
}TimerQueue::~TimerQueue()
{::close(timerfd_);//关闭tiemrfd,该资源由class TimerQueue创建,声明周期与其一致for (TimerList::iterator it = timers_.begin();it != timers_.end(); ++it){delete it->second;//class Timer的实例由class TimerQueue创建,需要回收其资源}
}TimerId TimerQueue::addTimer(const TimerCallback& cb,Timestamp when,double interval)//添加class timer的实例
{Timer* timer = new Timer(cb, when, interval);//需要负责其析构loop_->assertInLoopThread();bool earliestChanged = insert(timer);//插入set<entry>中,并返回一个bool值,表示是否需要重新设置timerfd的超时时间if (earliestChanged){resetTimerfd(timerfd_, timer->expiration());}return TimerId(timer);
}void TimerQueue::handleRead()//handleRead()为向Channel注册的可读事件的回调函数,主要功能是找到到期的Timer,并执行响应的Timer中的回调函数
{loop_->assertInLoopThread();Timestamp now(Timestamp::now());readTimerfd(timerfd_, now);std::vector<Entry> expired = getExpired(now);//getExpired()返回到期的Timer的vector// safe to callback outside critical sectionfor (std::vector<Entry>::iterator it = expired.begin();it != expired.end(); ++it){it->second->run();//执行Timer的回调函数}reset(expired, now);//重新将Timer加入到set<Entry>中,set会自动对定时器的时间卓先后顺序排序
}std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)//返回到期的Timer的vector
{std::vector<Entry> expired;Entry sentry = std::make_pair(now, reinterpret_cast<Timer*>(UINTPTR_MAX));//设置到期的最大值TimerList::iterator it = timers_.lower_bound(sentry);//找到lower_boundassert(it == timers_.end() || now < it->first);std::copy(timers_.begin(), it, back_inserter(expired));//将到期Timer拷贝至vector<Entry> expired中timers_.erase(timers_.begin(), it);删除set<Entry>中到期Timerreturn expired;//返回到期的Timer
}void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)//将expired中的Timer根据循环间隔重新加入到set<Entry>的Timer队列
{Timestamp nextExpire;for (std::vector<Entry>::const_iterator it = expired.begin();it != expired.end(); ++it){if (it->second->repeat()){it->second->restart(now);insert(it->second);}else{// FIXME move to a free listdelete it->second;}}if (!timers_.empty()){nextExpire = timers_.begin()->second->expiration();}if (nextExpire.valid()){resetTimerfd(timerfd_, nextExpire);}
}bool TimerQueue::insert(Timer* timer)//插入一个timer
{bool earliestChanged = false;Timestamp when = timer->expiration();//返回时间戳TimerList::iterator it = timers_.begin();if (it == timers_.end() || when < it->first)//set<Entry>timers_的第一Entry为最小的时间戳{earliestChanged = true;//需要重设timerfd的超时时间的bool标志}std::pair<TimerList::iterator, bool> result =timers_.insert(std::make_pair(when, timer));//将Timer插入到set<Entry>timers_中assert(result.second);return earliestChanged;//返回需要重设timerfd的超时时间的bool标志
}

洋洋洒洒标注了那么多,估计会看晕,但要点只有3个:

1、从构造函数上看,只有创建timerfd,创建Channel,注册回调函数和Channel,之后的事情就是其他class的问题了。

2、而另一个重要的功能就是添加定时器——TimerQueue::addTimer(const TimerCallback& cb,…),该函数添加一个定时器需要维护timerfd的超时时间(超时时间需要根据timers中最小的时间戳来决定),以及一个定时器列表,该列表根据时间戳从小到大排列,这也是为什么该列表使用set来实现的且set<pair<timestamp,timer*>>中pair<timestamp,timer*>不会出现重复。

3、timerfd超时后调用的回调函数handleRead(),该函数会从set<pair<timestamp,timer*>>取出到期的timer执行相应的回调函数timecallback(),执行完毕后计算timer下次到期的时间戳,再次加入到set<pair<timestamp,timer*>>中。

实际流程图如下:
构造函数注册

poll对timerfd超时后的调用顺序

addtimer函数的调用顺序就不写了,基本也就那么个顺序。
TimerQueue对外接口的函数只有一个,addtimer(),只要在EventLoop中加入TimerQueue,定义相关调用addtimer()成员函数就可实现定时任务。
EventLoop新增:

TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
{return timerQueue_->addTimer(cb, time, 0.0);
}TimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
{Timestamp time(addTime(Timestamp::now(), delay));return runAt(time, cb);
}TimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
{Timestamp time(addTime(Timestamp::now(), interval));return timerQueue_->addTimer(cb, time, interval);
}private:boost::scoped_ptr<TimerQueue> timerQueue_;

至此定时器暂且还算是大功告成, 需要注意的是,为了不造成数据竞争等严重问题,addtimer()加入了loop_->assertInLoopThread(),因为如果有两个线程A,B,A线程调用addtimer(),B线程为EventLoop的所属线程,并且B线程正在调用了reset()或getExpired(Timestamp now),以上三个函数都会对set容器进行操作,线程安全不法保证! 但加入了断言,使得A线程无法随心所欲的使用addtimer等函数,解决方法是EventLoop加入一个函数队列,如果不在函数执行不在B线程中则加入B线程中EventLoop的函数队列,B线程在poll阻塞结束后执行完时间,再执行函数队列中的函数,当然得考虑B线程一直阻塞的情况,那么这个函数队列永远得不到执行,继而需要增加一个Channel,用于将poll从阻塞中及时唤醒。

那么新的EventLoop如下:

class EventLoop调整

class EventLoop : boost::noncopyable
{public:typedef boost::function<void()> Functor;EventLoop();~EventLoop();void loop();void quit();Timestamp pollReturnTime() const { return pollReturnTime_; }void runInLoop(const Functor& cb);//可以暴露给其他线程的成员,线程安全性得以保证。void queueInLoop(const Functor& cb);//将函数加入函数队列TimerId runAt(const Timestamp& time, const TimerCallback& cb);TimerId runAfter(double delay, const TimerCallback& cb);TimerId runEvery(double interval, const TimerCallback& cb);void wakeup();//使线程从poll阻塞中返回void updateChannel(Channel* channel);void assertInLoopThread(){if (!isInLoopThread()){abortNotInLoopThread();}}bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }private:void abortNotInLoopThread();void handleRead();  // 事件回调void doPendingFunctors();//执行函数队列typedef std::vector<Channel*> ChannelList;bool looping_; /* atomic */bool quit_; /* atomic */bool callingPendingFunctors_; //正在执行函数队列的标识符,具有深意,能够及时的wakeup()const pid_t threadId_;Timestamp pollReturnTime_;boost::scoped_ptr<Poller> poller_;boost::scoped_ptr<TimerQueue> timerQueue_;int wakeupFd_;//文件描述符,属于EventLoop,对wakeupFd_写入后,poll会从阻塞中返回,及时的唤醒boost::scoped_ptr<Channel> wakeupChannel_;ChannelList activeChannels_;MutexLock mutex_;//锁std::vector<Functor> pendingFunctors_; // 函数队列
};
}#endif

成员函数如下:

#include "EventLoop.h"#include "Channel.h"
#include "Poller.h"
#include "TimerQueue.h"#include "logging/Logging.h"#include <boost/bind.hpp>#include <assert.h>
#include <sys/eventfd.h>using namespace muduo;__thread EventLoop* t_loopInThisThread = 0;
const int kPollTimeMs = 10000;static int createEventfd()//创建一个文件描述符,用于wakeupFd_
{int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);if (evtfd < 0){LOG_SYSERR << "Failed in eventfd";abort();}return evtfd;
}EventLoop::EventLoop(): looping_(false),quit_(false),callingPendingFunctors_(false),threadId_(CurrentThread::tid()),poller_(new Poller(this)),timerQueue_(new TimerQueue(this)),wakeupFd_(createEventfd()),//创建一个文件文件描述符wakeupChannel_(new Channel(this, wakeupFd_))//创建相应的 wakeupFd_的Channel
{LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;if (t_loopInThisThread){LOG_FATAL << "Another EventLoop " << t_loopInThisThread<< " exists in this thread " << threadId_;}else{t_loopInThisThread = this;}wakeupChannel_->setReadCallback(boost::bind(&EventLoop::handleRead, this));//绑定回调函数// we are always reading the wakeupfdwakeupChannel_->enableReading();//向poller注册Channel
}EventLoop::~EventLoop()
{assert(!looping_);::close(wakeupFd_);t_loopInThisThread = NULL;
}void EventLoop::loop()
{assert(!looping_);assertInLoopThread();looping_ = true;quit_ = false;while (!quit_){activeChannels_.clear();pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);for (ChannelList::iterator it = activeChannels_.begin();it != activeChannels_.end(); ++it){(*it)->handleEvent();}doPendingFunctors();//新增,执行任务队列}LOG_TRACE << "EventLoop " << this << " stop looping";looping_ = false;
}void EventLoop::quit()
{quit_ = true;if (!isInLoopThread()){wakeup();//新增,从poll阻塞唤醒当前线程,并结束loop循环}
}void EventLoop::runInLoop(const Functor& cb)//暴露给外不线程的函数
{if (isInLoopThread()){cb();//在所属线程中直接执行}else{queueInLoop(cb);//加入所属的EventLoop的函数队列,并唤醒EventLoop的线程}
}void EventLoop::queueInLoop(const Functor& cb)//加入所属的EventLoop的函数队列,并唤醒EventLoop的线程
{{MutexLockGuard lock(mutex_);//上锁pendingFunctors_.push_back(cb);}if (!isInLoopThread() || callingPendingFunctors_)//唤醒线程的条件中有callingPendingFunctors_,因为如果执行函数队列PendingFunctors_时有可能调用queueinloop(),!isInLoopThread()部分返回false,,无法执行wakeup(),无法被及时的唤醒{wakeup();}
}TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
{return timerQueue_->addTimer(cb, time, 0.0);
}TimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
{Timestamp time(addTime(Timestamp::now(), delay));return runAt(time, cb);
}TimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
{Timestamp time(addTime(Timestamp::now(), interval));return timerQueue_->addTimer(cb, time, interval);
}void EventLoop::updateChannel(Channel* channel)
{assert(channel->ownerLoop() == this);assertInLoopThread();poller_->updateChannel(channel);
}void EventLoop::abortNotInLoopThread()
{LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this<< " was created in threadId_ = " << threadId_<< ", current thread id = " <<  CurrentThread::tid();
}void EventLoop::wakeup()//唤醒,对wakeupFd_文件描述符写入,poll从监听阻塞中返回事件的发生
{uint64_t one = 1;ssize_t n = ::write(wakeupFd_, &one, sizeof one);if (n != sizeof one){LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";}
}void EventLoop::handleRead()//回调函数
{uint64_t one = 1;ssize_t n = ::read(wakeupFd_, &one, sizeof one);if (n != sizeof one){LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";}
}void EventLoop::doPendingFunctors()//执行函数队列,
{std::vector<Functor> functors;callingPendingFunctors_ = true;{MutexLockGuard lock(mutex_);functors.swap(pendingFunctors_);//交换函数队列,减少锁的持有时间,并且避免了函数队列中函数有可能执行queueinloop(),造成无限制的死循环的情况。}for (size_t i = 0; i < functors.size(); ++i){functors[i]();}callingPendingFunctors_ = false;
}

有了runInLoop()函数就可以对EventLoop的公有函数进行调整,实现公有函数在不同线程调用的安全性。

代码如下,因为很简单就不注释了:

TimerId TimerQueue::addTimer(const TimerCallback& cb,Timestamp when,double interval)
{Timer* timer = new Timer(cb, when, interval);loop_->runInLoop(boost::bind(&TimerQueue::addTimerInLoop, this, timer));return TimerId(timer);
}void TimerQueue::addTimerInLoop(Timer* timer)
{loop_->assertInLoopThread();bool earliestChanged = insert(timer);if (earliestChanged){resetTimerfd(timerfd_, timer->expiration());}
}

在线程中创建并执行EventLoop

class EventLoopThread
上文EventLoop已经实现了大部分的功能,但无法在线程中使用,具体以代码展示来说明:

#include "EventLoop.h"
#include <stdio.h>muduo::EventLoop* g_loop;
int g_flag = 0;void run4()
{printf("run4(): pid = %d, flag = %d\n", getpid(), g_flag);g_loop->quit();
}void run3()
{printf("run3(): pid = %d, flag = %d\n", getpid(), g_flag);g_loop->runAfter(3, run4);g_flag = 3;
}void run2()
{printf("run2(): pid = %d, flag = %d\n", getpid(), g_flag);g_loop->queueInLoop(run3);
}void run1()
{g_flag = 1;printf("run1(): pid = %d, flag = %d\n", getpid(), g_flag);g_loop->runInLoop(run2);g_flag = 2;
}int main()
{printf("main(): pid = %d, flag = %d\n", getpid(), g_flag);muduo::EventLoop loop;g_loop = &loop;loop.runAfter(2, run1);loop.loop();printf("main(): pid = %d, flag = %d\n", getpid(), g_flag);
}

在该程序中loop只能在该进程中执行,无法直接放入一个新线程中执行,或者说需要每次写一个函数来执行EventLoop的创建和运行,再将这个函数放入新线程中执行,这样不如直接提供一个线程运行的class,用于在新线程中创建EventLoop的实例,并执行loop(),返回EventLoop的指针,代码如下:


#ifndef MUDUO_NET_EVENTLOOPTHREAD_H
#define MUDUO_NET_EVENTLOOPTHREAD_H#include "thread/Condition.h"
#include "thread/Mutex.h"
#include "thread/Thread.h"#include <boost/noncopyable.hpp>namespace muduo
{class EventLoop;class EventLoopThread : boost::noncopyable
{public:EventLoopThread();~EventLoopThread();EventLoop* startLoop();private:void threadFunc();EventLoop* loop_;bool exiting_;Thread thread_;MutexLock mutex_;Condition cond_;
};}#endif

成员函数如下:

EventLoopThread::EventLoopThread(): loop_(NULL),exiting_(false),thread_(boost::bind(&EventLoopThread::threadFunc, this)),//新线程执行函数的绑定mutex_(),cond_(mutex_)
{}EventLoopThread::~EventLoopThread()
{exiting_ = true;loop_->quit();thread_.join();
}EventLoop* EventLoopThread::startLoop()//启动线程
{assert(!thread_.started());thread_.start();{MutexLockGuard lock(mutex_);while (loop_ == NULL){cond_.wait();}}return loop_;
}void EventLoopThread::threadFunc()//创建EventLoop,并运行loop
{EventLoop loop;{MutexLockGuard lock(mutex_);loop_ = &loop;cond_.notify();}loop.loop();//assert(exiting_);
}

至此,muduo网络库的基本框架已经介绍完毕,第二章网络库的实现同样是基于第一章的结构之上。。。

未完待续

muduo网络库浅谈(一)相关推荐

  1. muduo网络库源码阅读Step by Step

    Posted on: Nov 26 2015 Categories: muduo C++ Tags: muduo 一般写服务端程序都需要有一个称手的网络库来帮我们处理琐碎的网络通信细节,比如连接的建立 ...

  2. muduo网络库学习(八)事件驱动循环线程池EventLoopThreadPool

    muduo是支持多线程的网络库,在muduo网络库学习(七)用于创建服务器的类TcpServer中也提及了TcpServer中有一个事件驱动循环线程池,线程池中存在大量线程,每个线程运行一个Event ...

  3. muduo网络库学习(七)用于创建服务器的类TcpServer

    目前为止,涉及到的绝大多数操作都没有提及线程,EventLoop,Poller,Channel,Acceptor,TcpConnection,这些对象的执行都是在单独线程完成,并没有设计多线程的创建销 ...

  4. muduo网络库学习(四)事件驱动循环EventLoop

    muduo的设计采用高并发服务器框架中的one loop per thread模式,即一个线程一个事件循环. 这里的loop,其实就是muduo中的EventLoop,所以到目前为止,不管是Polle ...

  5. 基于C++11的muduo网络库

    文章目录 写在前面 项目编译问题 库安装的问题 项目测试代码 关于压力测试 项目概述 muduo网络库的reactor模型 muduo的设计 muduo各个类 辅助类 NonCopyable Time ...

  6. muduo网络库源码复现笔记(十七):什么都不做的EventLoop

    Muduo网络库简介 muduo 是一个基于 Reactor 模式的现代 C++ 网络库,作者陈硕.它采用非阻塞 IO 模型,基于事件驱动和回调,原生支持多核多线程,适合编写 Linux 服务端多线程 ...

  7. muduo网络库的封装

    一.基础socket编程 网络编程的底层离不开socket,其处理流程表示如下: int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockadd ...

  8. muduo网络库使用心得

    上个月看了朋友推荐的mudo网络库,下完代码得知是国内同行的开源作品,甚是敬佩.下了mudo使用手冊和035版的代码看了下结构,感觉是一个比較成熟并且方便使用的网络库.本人手头也有自己的网络库,尽管不 ...

  9. Muduo网络库核心梳理

    Muduo网络库 Muduo网络库本身并不复杂,是一个新手入门C++面向对象网络编程的经典实战项目.但是,新手在刚刚上手读代码的时候,非常容易陷入代码的汪洋大海,迷失方向.本文旨在简要梳理Muduo网 ...

最新文章

  1. 学习RadonDB源码(三)
  2. ubuntu下装Source Insight
  3. python【力扣LeetCode算法题库】289- 生命游戏
  4. eclipse配置jdk问题
  5. leetcode 224. Basic Calculator | 224. 基本计算器(中缀表达式求值)
  6. IOS开发基础之团购案例17-xib和UITableView两种方式实现
  7. 中秋节公司发了这个(结尾分享红包)
  8. C++中的位域(bit-filed):一种节省空间的成员
  9. 邮箱不可用 550 User has no permission
  10. eds800变频器故障代码_干货|三菱变频器故障剖析,及严重故障和轻微故障判断技巧!...
  11. 灰色模型 java代码_灰色模型的简单Java实现
  12. 计算机病毒级防范措施总结,计算机病毒及防范措施
  13. 能上QQ,不能打开网页
  14. 神经网络中的filter(滤波器)和kernel(内核)的概念
  15. 6个Web前端值得收藏很实用的菜单模板(上)
  16. pygame设计舒尔特方格游戏python舒尔特方格小程序
  17. Android 使用adb查看和修改电池信息
  18. excel 根据某单元格的值设置整行颜色(条件格式)
  19. OC Extension Font(字体宏定义)
  20. 从耦合微带线到近、远端串扰

热门文章

  1. 以太坊生成合约地址以及存在的账户碰撞
  2. 网页是由html和什么组成,【填空题】网站中,各个网页是由________联系起来的。...
  3. 05【画图】——EXCEL画统计图
  4. 超全机械设备 机器3dm犀牛资源素材网站整理
  5. 优品购第一天demo创建测试
  6. python 函数作用于矩阵_图解NumPy:常用函数的内在机制
  7. 机器学习——2(大数据金融风控的预测案例学习总结)
  8. 光伏“锁定效应”是技术进步最大障碍
  9. 某服务器软件系统对可用性,某服务器软件系统对可用性(Availability)和性能(Performance)要求较高,()设计策略能提高该系统...
  10. c语言非静态成员引用方式,C++中静态成员函数访问非静态成员的实例