今天来学习一下muduo源码中client和server间的大致通信流程,以echo服务为例,先看一下echo对面的main函数代码。

#include "examples/simple/echo/echo.h"#include "muduo/base/Logging.h"
#include "muduo/net/EventLoop.h"#include <unistd.h>// using namespace muduo;
// using namespace muduo::net;int main()
{LOG_INFO << "pid = " << getpid();       //getpid()获取进程号muduo::net::EventLoop loop;muduo::net::InetAddress listenAddr(2007);  //绑定IP地址和端口EchoServer server(&loop, listenAddr);server.start();loop.loop();
}

先实例化一个loop,这会调用loop的构造函数,看一下loop的构造函数。

EventLoop::EventLoop(): looping_(false),                                     //判断是否在loopquit_(false),                                        //判断是否退出的标志eventHandling_(false),                               //处理handevent的标志callingPendingFunctors_(false),                      //判断当前是不是在执行方法队列iteration_(0),threadId_(CurrentThread::tid()),                     //当前线程IDpoller_(Poller::newDefaultPoller(this)),             //创建一个 poll 或 epoll 对象timerQueue_(new TimerQueue(this)),                   //创建一个计时器wakeupFd_(createEventfd()),                          //发送唤醒loop消息的描述符,随便写点消息即可唤醒wakeupChannel_(new Channel(this, wakeupFd_)),        //wakeupChannel_用来自己给自己通知的一个通道,该通道会纳入到poller来管理currentActiveChannel_(NULL)                          //当前活跃的channel链表指针
{LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;if (t_loopInThisThread)                                //判断是否是本线程的loop,是一个loop类型的指针{LOG_FATAL << "Another EventLoop " << t_loopInThisThread<< " exists in this thread " << threadId_;        //用LOG_FATAL终止abort它}else{t_loopInThisThread = this; //this赋给线程局部数据指针}//设定wakeupChannel的回调函数,即EventLoop自己的的handleRead函数wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));          //channel->handleEventWithGuard会调用到handleRead// we are always reading the wakeupfdwakeupChannel_->enableReading();    //注册wakeupFd_到poller
}

loop就相当于一个reactor,loop构造函数new了epoll,创建了唤醒描述符,并注册到epoll。接着对EchoServer进行了构造,调用了start函数。

EchoServer::EchoServer(muduo::net::EventLoop* loop,const muduo::net::InetAddress& listenAddr): server_(loop, listenAddr, "EchoServer")
{server_.setConnectionCallback(std::bind(&EchoServer::onConnection, this, _1));         //连接到达server_.setMessageCallback(std::bind(&EchoServer::onMessage, this, _1, _2, _3));      //消息到达
}void EchoServer::start()
{server_.start();
}

EchoServer构造函数中设置了连接达到和消息达到的回调函数,并构建了TcpServer。再看TcpServer的构造函数。

TcpServer::TcpServer(EventLoop* loop,const InetAddress& listenAddr,const string& nameArg,Option option): loop_(CHECK_NOTNULL(loop)),ipPort_(listenAddr.toIpPort()),name_(nameArg),acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),threadPool_(new EventLoopThreadPool(loop, name_)),connectionCallback_(defaultConnectionCallback),messageCallback_(defaultMessageCallback),nextConnId_(1)
{ /* * 设置回调函数,当有客户端请求时,Acceptor接收客户端请求,然后调用这里设置的回调函数* 回调函数用于创建TcpConnection连接*/acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, _1, _2));
}

TcpServer构造函数new了一个acceptor_,设置了当有新连接达到时该调用的回调函数。看一下acceptor的构造函数。

Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport): loop_(loop),acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),//初始化创建sockt fdacceptChannel_(loop, acceptSocket_.fd()),//初始化channellistenning_(false),idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
{assert(idleFd_ >= 0);acceptSocket_.setReuseAddr(true);acceptSocket_.setReusePort(reuseport);acceptSocket_.bindAddress(listenAddr);acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));// //有可读事件,当fd可读时调用回调函数hanleRead
}

Acceptor构造函数中设置了当有可读事件是该调用的构造函数,然后start()函数和loop()函数开始调用。

void TcpServer::start()
{if (started_.getAndSet(1) == 0){threadPool_->start(threadInitCallback_);//启动线程池,threadInitCallback_创建好所有线程后调用的回调函数assert(!acceptor_->listenning());loop_->runInLoop(       //直接调用linsten函数std::bind(&Acceptor::listen, get_pointer(acceptor_)));}
}
void EventLoop::loop()
{assert(!looping_);assertInLoopThread(); //事件循环必须在IO线程中,即创建该evenloop的线程looping_ = true;quit_ = false;  // FIXME: what if someone calls quit() before loop() ?LOG_TRACE << "EventLoop " << this << " start looping";while (!quit_){activeChannels_.clear();                            //activeChannels_是一个vector等待io复用函数返回pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); //调用poll返回活动的事件,有可能是唤醒返回的++iteration_;if (Logger::logLevel() <= Logger::TRACE){printActiveChannels();}// TODO sort channel by priority  按优先级排序//处理IO事件eventHandling_ = true;for (Channel* channel : activeChannels_)            //遍历通道来进行处理{currentActiveChannel_ = channel;currentActiveChannel_->handleEvent(pollReturnTime_);  //pollReturnTime_是poll返回的时刻}currentActiveChannel_ = NULL;                       //处理完了赋空eventHandling_ = false;//执行方法队列中的方法[方法队列functors,我们可以跨线程的往里面添加新的方法,这些方法会在处理完io事件后执行]doPendingFunctors();                                //这个设计也能够进行计算任务}
void Acceptor::listen()
{loop_->assertInLoopThread();  //保证是在IO线程listenning_ = true;acceptSocket_.listen();acceptChannel_.enableReading(); 注册可读事件
}

start函数中在loop里面调用了Acceptor的listen函数注册可读事件(将可读事件加入epoll),接着loop函数在while循环里面调用poller_->poll()。接着看一下epoll的poll函数。

Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{LOG_TRACE << "fd total count " << channels_.size();int numEvents = ::epoll_wait(epollfd_,&*events_.begin(),  //events_已初始化static_cast<int>(events_.size()),    //监控套接字的数目timeoutMs);int savedErrno = errno;Timestamp now(Timestamp::now());if (numEvents > 0){LOG_TRACE << numEvents << " events happened";fillActiveChannels(numEvents, activeChannels);if (implicit_cast<size_t>(numEvents) == events_.size()) //如果返回的事件数目等于当前事件数组大小,就分配2倍空间{events_.resize(events_.size()*2);}}else if (numEvents == 0){LOG_TRACE << "nothing happened";}else{// error happens, log uncommon onesif (savedErrno != EINTR){errno = savedErrno;LOG_SYSERR << "EPollPoller::poll()";}}return now;
}

代码中有调用epoll_wait函数。

当有新连接到达时,epoll返回,currentActiveChannel_->handleEvent(上述loop.loop函数中)被调用,看一下对应代码。

//处理所有发生的事件,如果活着,底层调用handleEventWithGuard
void Channel::handleEvent(Timestamp receiveTime) 事件到来调用handleEvent处理
{std::shared_ptr<void> guard;       //守护if (tied_){guard = tie_.lock();if (guard){handleEventWithGuard(receiveTime);}}else{handleEventWithGuard(receiveTime);}
}//处理所有发生的事件
//EPOLLIN :表示对应的文件描述符可以读;
//EPOLLOUT:表示对应的文件描述符可以写;
//EPOLLPRI:表示对应的文件描述符有紧急的数据可读
//EPOLLERR:表示对应的文件描述符发生错误;
//EPOLLHUP:表示对应的文件描述符被挂断;
//EPOLLET:表示对应的文件描述符有事件发生;
void Channel::handleEventWithGuard(Timestamp receiveTime)
{                   eventHandling_ = true;         LOG_TRACE << reventsToString();if ((revents_ & POLLHUP) && !(revents_ & POLLIN))  //判断返回事件类型{if (logHup_){LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLHUP";}if (closeCallback_) closeCallback_();}if (revents_ & POLLNVAL)                 //不合法文件描述符{LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLNVAL";}if (revents_ & (POLLERR | POLLNVAL)){if (errorCallback_) errorCallback_();}if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))   //POLLRDHUP是对端关闭连接事件,如shutdown等{if (readCallback_) readCallback_(receiveTime);}if (revents_ & POLLOUT){if (writeCallback_) writeCallback_();}eventHandling_ = false;
}

是可读事件,readCallback_被调用,[acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this))]有设置,则Acceptor::handleRead()函数被调用。

void Acceptor::handleRead()      //新连接到达由acccptor处理
{loop_->assertInLoopThread();InetAddress peerAddr;//FIXME loop until no moreint connfd = acceptSocket_.accept(&peerAddr);//这里时真正接收连接if (connfd >= 0){// string hostport = peerAddr.toIpPort();// LOG_TRACE << "Accepts of " << hostport;if (newConnectionCallback_){newConnectionCallback_(connfd, peerAddr);//将新连接信息传送到回调函数中}else//没有回调函数则关闭client对应的fd{sockets::close(connfd);}}else{LOG_SYSERR << "in Acceptor::handleRead";// Read the section named "The special problem of// accept()ing when you can't" in libev's doc.// By Marc Lehmann, author of libev.if (errno == EMFILE){::close(idleFd_);idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);::close(idleFd_);idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);}}
}

newConnectionCallback_被调用,[acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, _1, _2));],则调用tcpServer中的newConnection函数。

void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{loop_->assertInLoopThread();EventLoop* ioLoop = threadPool_->getNextLoop();//从事件驱动线程池中取出一个线程给TcpConnection /* 为TcpConnection生成独一无二的名字 */char buf[64];snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);++nextConnId_;string connName = name_ + buf;LOG_INFO << "TcpServer::newConnection [" << name_<< "] - new connection [" << connName<< "] from " << peerAddr.toIpPort();/* * 根据sockfd获取tcp连接在本地的<地址,端口>* getsockname(int fd, struct sockaddr*, int *size);*/InetAddress localAddr(sockets::getLocalAddr(sockfd));// FIXME poll with zero timeout to double confirm the new connection// FIXME use make_shared if necessary/* 创建一个新的TcpConnection代表一个Tcp连接 */TcpConnectionPtr conn(new TcpConnection(ioLoop,connName,sockfd,localAddr,peerAddr));/* 添加到所有tcp 连接的map中,键是tcp连接独特的名字(服务器名+客户端<地址,端口>) */connections_[connName] = conn;/* 为tcp连接设置回调函数(由用户提供) */conn->setConnectionCallback(connectionCallback_);conn->setMessageCallback(messageCallback_);conn->setWriteCompleteCallback(writeCompleteCallback_);/* * 关闭回调函数,由TcpServer设置,作用是将这个关闭的TcpConnection从map中删除* 当poll返回后,发现被激活的原因是EPOLLHUP,此时需要关闭tcp连接* 调用Channel的CloseCallback,进而调用TcpConnection的handleClose,进而调用removeConnection*/conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe/* * 连接建立后,调用TcpConnection连接建立成功的函数* 1.新建的TcpConnection所在事件循环是在事件循环线程池中的某个线程* 2.所以TcpConnection也就属于它所在的事件驱动循环所在的那个线程* 3.调用TcpConnection的函数时也就应该在自己所在线程调用* 4.所以需要调用runInLoop在自己的那个事件驱动循环所在线程调用这个函数* 5.当前线程是TcpServer的主线程,不是TcpConnection的线程,如果在这个线程直接调用会阻塞监听客户端请求* 6.其实这里不是因为线程不安全,即使在这个线程调用也不会出现线程不安全,因为TcpConnection本就是由这个线程创建的*/ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

connectionCallback_和messageCallback_就是echo.cc中设置的,connectEstablished会调用了用户设置的connectionCallback.

新连接到达之后就new了一个TcpConnection,看一下TcpConnection的构造函数。

TcpConnection::TcpConnection(EventLoop* loop,const string& nameArg,int sockfd,const InetAddress& localAddr,const InetAddress& peerAddr): loop_(CHECK_NOTNULL(loop)),name_(nameArg),state_(kConnecting),                  //输入输出缓冲区reading_(true),socket_(new Socket(sockfd)),            //RAII管理已连接套接字channel_(new Channel(loop, sockfd)),  //使用Channel管理套接字上的读写localAddr_(localAddr),peerAddr_(peerAddr),highWaterMark_(64*1024*1024)
{//设置事件分发器的各事件回调  (将TcpConnection类的四个事件处理函数设置为事件分发器对应的回调函数)channel_->setReadCallback(std::bind(&TcpConnection::handleRead, this, _1));channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite, this));channel_->setCloseCallback(std::bind(&TcpConnection::handleClose, this));channel_->setErrorCallback(std::bind(&TcpConnection::handleError, this));LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this<< " fd=" << sockfd;socket_->setKeepAlive(true);
}

设置了套接字可读,可读对应的回调函数。当client发送数据过来时,TcpConnection::handleRead被调用。

void TcpConnection::handleRead(Timestamp receiveTime)
{loop_->assertInLoopThread();int savedErrno = 0;ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);if (n > 0){//调用回调函数,使用shared_from_this()得到自身的shared_ptr,延长了该对象的生命期,保证了它的生命期长过messageCallback_函数,messageCallback_能安全的使用它messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);}else if (n == 0){handleClose();}else{errno = savedErrno;LOG_SYSERR << "TcpConnection::handleRead";handleError();}
}

最终调用到了用户设置的messageCallback_,到此流程大致分析完毕。

tcpserver:    setNewConnectionCallback   //新连接到达(新连接到达由acccptor处理)acceptor:setReadCallback            //有可读事件linsten:enableReading   //注册可读事件echo:setConnectionCallback   //连接到达setMessageCallback         //数据到达TcpConnection:setReadCallback        //可读事件setWriteCallback       //可写事件

上面是各个类设置的对应的回调函数,当新连接达到时,acceptor处理,然后到tcpserver,然后到TcpConnection,在后面就是TcpConnection和echo进行交互。

muduo源码client/server通信流程相关推荐

  1. openxr runtime Monado 源码解析 源码分析:CreateInstance流程(设备系统和合成器系统)Compositor comp_main client compositor

    monado系列文章索引汇总: openxr runtime Monado 源码解析 源码分析:源码编译 准备工作说明 hello_xr解读 openxr runtime Monado 源码解析 源码 ...

  2. 深入浅出 Redis client/server交互流程

    2019独角兽企业重金招聘Python工程师标准>>> 最近笔者阅读并研究redis源码,在redis客户端与服务器端交互这个内容点上,需要参考网上一些文章,但是遗憾的是发现大部分文 ...

  3. bluetoothd源码剖析(一)启动流程

    蓝牙系列: bluez调试笔记_weixin_41069709的博客-CSDN博客_bluezbluez移植https://blog.csdn.net/weixin_41069709/article/ ...

  4. 【Android 10 源码】MediaRecorder 录像流程:MediaRecorder 初始化

    MediaRecorder 用于录制音频和视频,录制控制基于一个简单的状态机.下面是典型的使用 camera2 API 录制,需要使用到的 MediaRecorder API. MediaRecord ...

  5. 【Android 10 源码】MediaRecorder 录像流程:MediaRecorder 配置

    MediaRecorder 录像配置主要涉及输出文件路径.音频来源.视频来源.输出格式.音频编码格式.视频编码格式.比特率.帧率和视频尺寸等. 我们假设视频输入源来自 Camera,Camera2 A ...

  6. 【Muduo源码剖析笔记】 网络库之Acceptor、Connector

    [Muduo源码剖析笔记] 网络库之Acceptor.Connector Acceptor typedef std::function<void (int sockfd, const InetA ...

  7. Retrofit2源码解析——网络调用流程(下)

    Retrofit2源码解析系列 Retrofit2源码解析(一) Retrofit2源码解析--网络调用流程(上) 本文基于Retrofit2的2.4.0版本 implementation 'com. ...

  8. android 输入法如何启动流程_android输入法02:openwnn源码解析01—输入流程

    android 输入法 02:openwnn 源码解析 01-输入流程 之后要开始 android 日文输入法的测试,因此现在开始研究 android 输入法.之前两 篇文章已经对 android 自 ...

  9. 【Flink】 Flink 源码之 SQL 执行流程

    1.概述 转载:Flink 源码之 SQL 执行流程 2.前言 本篇为大家带来Flink执行SQL流程的分析.它的执行步骤概括起来包含: 解析.使用Calcite的解析器,解析SQL为语法树(SqlN ...

最新文章

  1. 三十二、图的创建深度优先遍历(DFS)广度优先遍历(BFS)
  2. china-pub计算机图书最新一周排行榜
  3. uvm_dpi——DPI在UVM中的实现(一)
  4. redis lua 抽奖 PHP,通过redis+lua实现加减库存
  5. jquery实现页面提示,数据正在加载中。(
  6. python实例 89,90
  7. 从gb2py.idx中获取一个汉字的拼音首字母
  8. Qt下的模态和非模态对话框
  9. LeetCode 338. 比特位计数(动态规划)
  10. VB 6.0使用api
  11. Android 音视频深入 十九 使用ijkplayer做个视频播放器(附源码下载)
  12. 正则去除汉字和只取数字
  13. CSS 动画指南: 原理和实战 (一)
  14. Android开发学习之路-带文字的图片分享
  15. Cadence Orcad Capture定时保存功能介绍图文视频教程
  16. Android重新分区parted,分区工具parted的详解及常用分区使用方法【转】
  17. 宝德Pr2500y服务器装系统,GP2500-LG41-24V
  18. Excel中的美元符号$
  19. 拉里佩奇:专注未来(ted)
  20. 日语学习(简单语法-2)

热门文章

  1. flutter_web 实战之文章列表与详情
  2. Linux服务源码安装后开机自启动04-php-fpm
  3. PIE_SDK.NET功能表
  4. python+appium+PyCharm==自动化测试APP环境
  5. es6-let 和 const
  6. Find Minimum in Rotated Sorted Array II
  7. jQuery父级以及同级元素查找
  8. Delphi 调用外部程序并等待其运行结束
  9. 用外观判断论文好坏?这位顶会领域主席的论文被自己的AI审稿系统拒绝了
  10. 想要自学深度学习?不用GPU,浏览器就够了