目录

1、muduo的IO模型

2、为什么 non-blocking 网络编程中应用层 buffer 是必须的?

2.1 TcpConnection 必须要有 output buffer

2.2 TcpConnection 必须要有 input buffer

3、Muduo Buffer 的数据结构

3.1 muduo buffer类设计

3.2 核心函数分析

4、参考


1、muduo的IO模型

Unix/Linux 上常见的五种 IO 模型:阻塞(blocking)、非阻塞(non-blocking)、IO 复用(IO multiplexing)、信号驱动(signal-driven)、异步(asynchronous)。这些都是单线程下的 IO 模型。

如果采用 one loop per thread 的模型,多线程服务端编程的问题就简化为如何设计一个高效且易于使用的 event loop,然后每个线程 run 一个 event loop 就行了(当然、同步和互斥是不可或缺的)。在“高效”这方面已经有了很多成熟的范例(libev、libevent、memcached、varnish、lighttpd、nginx)。

event loop 是 non-blocking 网络编程的核心,在现实生活中,non-blocking 几乎总是和 IO-multiplexing 一起使用,原因有两点:

  • 没有人真的会用轮询 (busy-pooling) 来检查某个 non-blocking IO 操作是否完成,这样太浪费 CPU cycles。
  • IO-multiplex 一般不能和 blocking IO 用在一起,因为 blocking IO 中 read()/write()/accept()/connect() 都有可能阻塞当前线程,这样线程就没办法处理其他 socket 上的 IO 事件了。见 UNPv1 第 16.6 节“nonblocking accept”的例子。

2、为什么 non-blocking 网络编程中应用层 buffer 是必须的?

Non-blocking IO 的核心思想是避免阻塞在 read() 或 write() 或其他 IO 系统调用上,这样可以最大限度地复用 thread-of-control,让一个线程能服务于多个 socket 连接。IO 线程只能阻塞在 IO-multiplexing 函数上,如 select()/poll()/epoll_wait()。这样一来,应用层的缓冲是必须的,每个 TCP socket 都要有 stateful 的 input buffer 和 output buffer。

2.1 TcpConnection 必须要有 output buffer

考虑一个常见场景:程序想通过 TCP 连接发送 100k 字节的数据,但是在 write() 调用中,操作系统只接受了 80k 字节,你肯定不想在原地等待,因为不知道会等多久(取决于对方什么时候接受数据,然后滑动 TCP 窗口)。程序应该尽快交出控制权,返回 event loop。在这种情况下,剩余的 20k 字节数据怎么办?

对于应用程序而言,它只管生成数据,它不应该关心到底数据是一次性发送还是分成几次发送,这些应该由网络库来操心,程序只要调用 TcpConnection::send() 就行了,网络库会负责到底。网络库应该接管这剩余的 20k 字节数据,把它保存在该 TCP connection 的 output buffer 里,然后注册 POLLOUT 事件,一旦 socket 变得可写就立刻发送数据。当然,这第二次 write() 也不一定能完全写入 20k 字节,如果还有剩余,网络库应该继续关注 POLLOUT 事件;如果写完了 20k 字节,网络库应该停止关注 POLLOUT,以免造成 busy loop。(Muduo EventLoop 采用的是 epoll level trigger,这么做的具体原因我以后再说。)

如果程序又写入了 50k 字节,而这时候 output buffer 里还有待发送的 20k 数据,那么网络库不应该直接调用 write(),而应该把这 50k 数据 append 在那 20k 数据之后,等 socket 变得可写的时候再一并写入。

如果 output buffer 里还有待发送的数据,而程序又想关闭连接(对程序而言,调用 TcpConnection::send() 之后他就认为数据迟早会发出去),那么这时候网络库不能立刻关闭连接,而要等数据发送完毕,使用shutdown关闭写作操,发送FIN消息,对方read到0,执行关闭连接,本分read到0,同样关闭连接(读方向)(详情见:为什么 muduo 的 shutdown() 没有直接关闭 TCP 连接?) 。

综上,要让程序在 write 操作上不阻塞,网络库必须要给每个 tcp connection 配置 output buffer。

2.2 TcpConnection 必须要有 input buffer

TCP 是一个无边界的字节流协议,接收方必须要处理“收到的数据尚不构成一条完整的消息”和“一次收到两条消息的数据”等等情况。一个常见的场景是,发送方 send 了两条 10k 字节的消息(共 20k),接收方收到数据的情况可能是:

  • 一次性收到 20k 数据
  • 分两次收到,第一次 5k,第二次 15k
  • 分两次收到,第一次 15k,第二次 5k
  • 分两次收到,第一次 10k,第二次 10k
  • 分三次收到,第一次 6k,第二次 8k,第三次 6k
  • 其他任何可能

网络库在处理“socket 可读”事件的时候,必须一次性把 socket 里的数据读完(从操作系统 buffer 搬到应用层 buffer),否则会反复触发 POLLIN 事件,造成 busy-loop。(Again, Muduo EventLoop 采用的是 epoll level trigger,这么做的具体原因我以后再说。)

那么网络库必然要应对“数据不完整”的情况,收到的数据先放到 input buffer 里,等构成一条完整的消息再通知程序的业务逻辑。这通常是 codec 的职责,见陈硕《Muduo 网络编程示例之二:Boost.Asio 的聊天服务器》一文中的“TCP 分包”的论述与代码。

所以,在 tcp 网络编程中,网络库必须要给每个 tcp connection 配置 input buffer。

所有 muduo 中的 IO 都是带缓冲的 IO (buffered IO),你不会自己去 read() 或 write() 某个 socket,只会操作 TcpConnection 的 input buffer 和 output buffer。更确切的说,是在 onMessage() 回调里读取 input buffer;调用 TcpConnection::send() 来间接操作 output buffer,一般不会直接操作 output buffer。

muduo 的 onMessage() 的原型如下,它既可以是 free function,也可以是 member function,反正 muduo TcpConnection 只认 boost::function<>。

void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp receiveTime);

对于网络程序来说,一个简单的验收测试是:输入数据每次收到一个字节(200 字节的输入数据会分 200 次收到,每次间隔 10 ms),程序的功能不受影响。对于 Muduo 程序,通常可以用 codec 来分离“消息接收”与“消息处理”,见陈硕《在 muduo 中实现 protobuf 编解码器与消息分发器》一文中对“编解码器 codec”的介绍。

如果某个网络库只提供相当于 char buf[8192] 的缓冲,或者根本不提供缓冲区,而仅仅通知程序“某 socket 可读/某 socket 可写”,要程序自己操心 IO buffering,这样的网络库用起来就很不方便了。(我有所指,你懂得。)

3、Muduo Buffer 的数据结构

Buffer 的内部是一个 vector of char,它是一块连续的内存。此外,Buffer 有两个 data members,指向该 vector 中的元素。这两个 indices 的类型是 int,不是 char*,目的是应对迭代器失效。muduo Buffer 的设计参考了 Netty 的 ChannelBuffer 和 libevent 1.4.x 的 evbuffer。不过,其 prependable 可算是一点“微创新”。

Muduo Buffer 的数据结构如下:

两个 indices 把 vector 的内容分为三块:prependable、readable、writable,各块的大小是(公式一):

  • prependable = readIndex
  • readable = writeIndex - readIndex
  • writable = size() - writeIndex

(prependable 的作用留到后面讨论。)

readIndex 和 writeIndex 满足以下不变式(invariant):

  • 0 ≤ readIndex ≤ writeIndex ≤ data.size()

Muduo Buffer 里有两个常数 kCheapPrepend 和 kInitialSize,定义了 prependable 的初始大小和 writable 的初始大小。(readable 的初始大小为 0。)在初始化之后,Buffer 的数据结构如下:括号里的数字是该变量或常量的值。

3.1 muduo buffer类设计

首先来看一下Buffer的部分类成员和构造函数:


class Buffer : public muduo::copyable
{
public:explicit Buffer(size_t initialSize = kInitialSize): buffer_(kCheapPrepend + initialSize),   //总大小为1032readerIndex_(kCheapPrepend),   //初始指向8writerIndex_(kCheapPrepend)    //初始指向8{assert(readableBytes() == 0);assert(writableBytes() == initialSize);assert(prependableBytes() == kCheapPrepend);}public:static const size_t kCheapPrepend = 8;    //默认预留8个字节static const size_t kInitialSize = 1024;   //初始大小
private:std::vector<char> buffer_;    //vector用于替代固定数组size_t readerIndex_;            //读位置size_t writerIndex_;             //写位置//const char Buffer::kCRLF[] = "\r\n";static const char kCRLF[];      //'\r\n',使用柔性数组
}

下面是Buffer获取各个长度的方法:

//可读大小size_t readableBytes() const{ return writerIndex_ - readerIndex_; }//可写大小size_t writableBytes() const{ return buffer_.size() - writerIndex_; }//预留大小size_t prependableBytes() const{ return readerIndex_; }//读的下标const char* peek() const{ return begin() + readerIndex_; }

3.2 核心函数分析

核心函数一:readFd()函数:


//结合栈上空间,避免内存使用过大,提高内存使用率
//如果有10K个连接,每个连接就分配64K缓冲区的话,将占用640M内存
//而大多数时候,这些缓冲区的使用率很低
ssize_t Buffer::readFd(int fd, int* savedErrno)
{// saved an ioctl()/FIONREAD call to tell how much to read//节省一次ioctl系统调用(获取当前有多少可读数据)//为什么这么说?因为我们准备了足够大的extrabuf,那么我们就不需要使用ioctl取查看fd有多少可读字节数了char extrabuf[65536];//使用iovec分配两个连续的缓冲区struct iovec vec[2];const size_t writable = writableBytes();//第一块缓冲区,指向可写空间vec[0].iov_base = begin()+writerIndex_;vec[0].iov_len = writable;//第二块缓冲区,指向栈上空间vec[1].iov_base = extrabuf;vec[1].iov_len = sizeof extrabuf;// when there is enough space in this buffer, don't read into extrabuf.// when extrabuf is used, we read 128k-1 bytes at most.const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;  //writeable一般小于65536const ssize_t n = sockets::readv(fd, vec, iovcnt);  //iovcnt=2if (n < 0){*savedErrno = errno;}else if (implicit_cast<size_t>(n) <= writable)   //第一块缓冲区足够容纳{writerIndex_ += n;   //直接加n}else   //当前缓冲区,不够容纳,因而数据被接受到了第二块缓冲区extrabuf,将其append至buffer{writerIndex_ = buffer_.size();   //先更显当前writerIndexappend(extrabuf, n - writable);   //然后追加剩余的再进入buffer当中}return n;
}

由于该操作在网络程序中执行的很频繁,为了避免每次使用系用调用ioctl查看可读字节数,所以muduo在这里使用了一个65536个字节足够大的缓冲区。但是这个缓冲区怎么用呢?要知道怎么用我们先来看一下iovec结构体:

#include <sys/uio.h>  struct iovec {  ptr_t iov_base; /* Starting address */  size_t iov_len; /* Length in bytes */
};
    struct iovec定义了一个向量元素。通常,这个结构用作一个多元素的数组。对于每一个传输的元素,指针成员iov_base指向一个缓冲区,这个缓冲区是存放的是readv所接收的数据或是writev将要发送的数据。成员iov_len在各种情况下分别确定了接收的最大长度以及实际写入的长度。
#include <sys/uio.h>
ssize_t readv(int fd, const struct iovec *iov, int iovcnt); //将fd缓冲区数据一次性读进多个iov
ssize_t writev(int fd, const struct iovec *iov, int iovcnt);//将多个iov中的数据一次性读进iov
  • 参数1:文件描述符
  • 参数2:struct iovec结构体指针,通常表示多个缓冲区
  • 参数3:缓冲区的个数

有了这两个函数,当想要集中写出某张链表时,只需让iov数组的各个元素包含链表中各个表项的地址和其长度,然后将iov和它的元素个数作为参数传递给writev(),这些数据便可一次写出。

上述函数的优势:从socket读到缓冲区的方法是使用readv先读至Buffer_,Buffer_不够会读至栈上65536个字节大小的空间,然后以append的方式追加入Buffer_。即考虑了避免系统调用带来开销,又不影响数据的接收,很"优雅”的一种解决方案。

核心函数二:append()函数:

把数据追加到缓冲区的append()函数是这样实现的:

void append(const char* /*restrict*/ data, size_t len){ensureWritableBytes(len);   //确保缓冲区可写空间大于等于len,如果不足,需要扩充std::copy(data, data+len, beginWrite());   //追加数据hasWritten(len);  //内部仅仅是写入后调整writeindex}

它先调用ensureWritableBytes(len)确定空间,空间不够该函数内部会扩充:

void ensureWritableBytes(size_t len){if (writableBytes() < len)  //如果可写数据小于len{ makeSpace(len);   //增加空间}assert(writableBytes() >= len);}//扩容函数
void makeSpace(size_t len)  //vector增加空间{if (writableBytes() + prependableBytes() < len + kCheapPrepend)  //确保空间是真的不够,而不是挪动就可以腾出空间{// FIXME: move readable databuffer_.resize(writerIndex_+len);}else{//内部腾挪就足够append,那么就内部腾挪一下。// move readable data to the front, make space inside bufferassert(kCheapPrepend < readerIndex_);size_t readable = readableBytes();std::copy(begin()+readerIndex_,    //原来的可读部分全部copy到Prepend位置,相当于向前挪动,为writeable留出空间begin()+writerIndex_,begin()+kCheapPrepend);readerIndex_ = kCheapPrepend;   //更新下标writerIndex_ = readerIndex_ + readable;assert(readable == readableBytes());}}

append()函数就是这样,实际上resize()会触发vector的内存分配机制,STL的内存分配机制看过源码的人都知道,每次翻倍,足够使用且节省开销。

核心函数三:shrink(size_t reserve)

这个函数就是Buffer的内部挪动函数。我们知道如果缓冲区readIndex一直后移,writeIndex也会后移,writeable区域变小,但实际上这时readIndex前面已被读走数据的区域实际上已经空闲了。这时如果来大量数据,我们尽量避免申请内存的消耗。通过内部挪动,将Buffer中readable数据挪到初始位置,就可以腾出空间了。

//收缩空间,保留reserver个字节,可能多次读写后buffer太大了,可以收缩void shrink(size_t reserve){// FIXME: use vector::shrink_to_fit() in C++ 11 if possible.//为什么要使用Buffer类型的other来收缩空间呢?如果不用这种方式,我们可选的有使用resize(),把我们在resize()z和Buffer other;  //生成临时对像,保存readable内容,然后和自身交换,该临时对象再析构掉//ensureWritableBytes()函数有两个功能,一个是空间不够resize空间,一个是空间足够内部腾挪,这里明显用的是后者。other.ensureWritableBytes(readableBytes()+reserve);  //确保有足够的空间,内部此时已经腾挪other.append(toStringPiece());   //把当前数据先追加到other里面,然后再交换。swap(other);   //然后再交换}

它调用了swap实际上是这样的:

void swap(Buffer& rhs){buffer_.swap(rhs.buffer_);std::swap(readerIndex_, rhs.readerIndex_);std::swap(writerIndex_, rhs.writerIndex_);}

4、参考

1、 Muduo 设计与实现之一:Buffer 类的设计

2、muduo应用层缓冲区buffer设计

【muduo源码分析】Buffer类的设计相关推荐

  1. muduo源码分析——TcpServer和Acceptor

    这篇文章用于分析muduo的TcpServer类和Acceptor类,原本打算将TcpConnection也放到这里一起聊的,但是那个太多啦,一篇文章太长会让人读的很不舒服把. 当然我用的代码是其他大 ...

  2. spring Quartz 源码分析--触发器类CronTriggerBean源码剖析

    前面我们讲到了Quartz框架在项目中的实现,在Quartz中的重要API有两个重要的触发器类:CronTrigger 和SimpleTrigger 在Quartz框架中这两个触发器都继承了一个抽象基 ...

  3. muduo源码分析之回调模块

    这次我们主要来说说muduo库中大量使用的回调机制.muduo主要使用的是利用Callback的方式来实现回调,首先我们在自己的EchoServer构造函数中有这样几行代码 EchoServer(Ev ...

  4. muduo源码分析之Buffer

    这一次我们来分析下muduo中Buffer的作用,我们知道,当我们客户端向服务器发送数据时候,服务器就会读取我们发送的数据,然后进行一系列处理,然后再发送到其他地方,在这里我们想象一下最简单的Echo ...

  5. muduo源码分析2——Singleton分析

    1. 一般singleton写法 单例模式即要求只有在第一次调用的时候创建该对象,主要分为以下两条路(返回指针还是引用),返回引用可以防止使用中delete instance导致对象被提前销毁: pr ...

  6. muduo源码分析之TcpServer模块

    这次我们开始muduo源代码的实际编写,首先我们知道muduo是LT模式,Reactor模式,下图为Reactor模式的流程图[来源1] 然后我们来看下muduo的整体架构[来源1] 首先muduo有 ...

  7. 【muduo源码分析】TcpServer服务架构

    1.muduo整体类图 2.服务器TcpServer (1)TcpServer由用户直接使用,生命周期由用户控制,用户设置好相应的回调MessageCallback.ConnectionCallbac ...

  8. 【muduo源码分析 】 MutexLock和MutexLockGuard封装

    MutexLock 封装临界区(critical section),这是一个简单的资源类,用RAII 手法[CCS,条款13] 封装互斥器的创建与销毁.临界区在Windows 上是struct CRI ...

  9. Hadoop3.2.1 【 HDFS 】源码分析 :FSDirectory类解析

    Table of Contents 一.前言. 二.构造方法 三.常量 四.方法 一.前言. Namenode最重要的两个功能之一就是维护整个文件系统的目录树(即命名空间namesystem) . H ...

最新文章

  1. graphql_GraphQL的稳步上升
  2. 乐观锁和悲观锁的区别(最全面的分析)
  3. CListCtrl 使用技巧
  4. Android IOC模块,利用了Java反射和Java注解
  5. 如何告别半途而废——韦东山嵌入式Linux视频学习笔记00
  6. Magicodes.IE Excel合并行数据导入教程
  7. php保存gbk字符串,php判断字符串gbk/utf8编码和转换
  8. 孪生神经网络_基于局部和全局孪生网络的鲁棒的人脸跟踪
  9. 嵌入式操作系统内核原理和开发(等值block内存池设计)
  10. TextView的跑马灯效果(AS开发实战第二章学习笔记)
  11. python 技能清单_Python清单
  12. 一套完整自定义工作流的实现
  13. WS2811B驱动使用及使用说明应用
  14. SpringBoot 检索篇 - 整合 Elasticsearch7.6.2
  15. SLF4J: Failed toString() invocation on an object of type [com.zhao.guang.xiao.top.po.BlogBean$Hibern
  16. 云锁linux宝塔安装,【最新版】宝塔面板下为Nginx自编译云锁Web防护模块教程
  17. 梧桐树定制福满满养老年金,给你养老生活源源不断的现金流!
  18. 古墓丽影10linux,《古墓丽影:崛起》Linux版上架Steam
  19. 使用Squid架设代理服务器实现局域网共享上网
  20. Facebook、微软、谷歌三大研究巨头齐聚首,共同探讨人工智能发展现状和趋势

热门文章

  1. C#笔记04 数组和循环
  2. python print sep,Python3.x语句print(1,2,3,sep=’:’)的输出结果为()。
  3. Python List sort方法无效
  4. 关于 动态分流系统 ABTestingGateway 的想法
  5. 当PowerDesigner的工具栏不见时候该怎么调出来
  6. 三维点云学习(4)6-ransac 地面分割
  7. 是否可以在网络共享磁盘上创建数据库?
  8. SDL1.3(C语言)程序移植LINUX。。。
  9. Spring通过注解的形式 将bean以及相应的属性值 放入ioc容器
  10. iconv命令的使用,解决libxml2中解释中文失败的问题