同步刷盘

在RocketMQ中有同步刷盘和异步刷盘两种方式

2种刷盘方式适用的场景如下

刷盘方式 适用场景
同步刷盘 数据可靠性高,适用于金融等对数据可靠性要求高的场景,性能比异步刷盘要低
异步刷盘 性能和吞吐量高 , Broker端异常关闭时,有少量消息丢失

根据前面的章节我们知道RocketMQ会通过SendMessageProcessor来处理刷盘的消息,当消息存储到内存中后,就开始刷盘

异步刷盘的方式有两种,第一种Mmap+PageCache(默认的异步刷盘方式),上面说到的同步刷盘也是这种机制,代码实现如下

@Test
public void writeCaseOne() throws Exception {File file = new File("/Users/peng/software/rocketmq/test/case1.txt");FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel();MappedByteBuffer byteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 2048);byteBuffer.put("hello mmap\n".getBytes());// 将 pagecache 中的内容强制刷到磁盘byteBuffer.force();
}

第二种是DirectByteBuffer+PageCache,也就是直接写堆外内存

@Test
public void writeCaseTwo() throws Exception {File file = new File("/Users/peng/software/rocketmq/test/case2.txt");FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel();ByteBuffer byteBuffer = ByteBuffer.allocate(20);byteBuffer.put("hello mmap\n".getBytes());byteBuffer.flip();while (byteBuffer.hasRemaining()) {fileChannel.write(byteBuffer);}// 将 pagecache 中的内容强制刷到磁盘fileChannel.force(false);
}


从CommitLog#submitFlushRequest方法可以看到刷盘的逻辑

当broker端配置的是同步刷盘,但是发送过来的消息不需要等待消息刷盘完成,就会退化成异步刷盘,我们先看同步刷盘,在RocketMQ中,并不是往内存中放一条消息,就刷盘一次,这样效率太低。RocketMQ会每隔10ms统一执行刷盘请求来提高效率

  1. 首先把刷盘的请求封装成GroupCommitRequest,然后放到GroupCommitService的阻塞队列中
  2. GroupCommitService每隔10ms将目前阻塞队列中的刷盘请求统一执行,然后唤醒阻塞等待的线程
public void run() {CommitLog.log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {// 有数据过来会结束等待的this.waitForRunning(10);this.doCommit();} catch (Exception e) {CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);}}// 省略部分逻辑
}

不断执行doCommit方法进行刷盘,当刷盘完成时,会唤醒等待刷盘的线程

这里有个需要注意的细节点,我我们放请求的时候是放到requestsWrite中,但是读的时候却是在requestsRead中,那么requestsRead中能读取到值吗?

// GroupCommitService
// 读请求列表
private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
// 读请求列表
private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();

我们来看ServiceThread类的waitForRunning方法

其实当每次等待结束后都会调用onWaitEnd方法,而GroupCommitService重写了这个方法,在这个方法内部调用swapRequests方法

private void swapRequests() {lock.lock();try {LinkedList<GroupCommitRequest> tmp = this.requestsWrite;this.requestsWrite = this.requestsRead;this.requestsRead = tmp;} finally {lock.unlock();}
}

swapRequests方法会将requestsWrite和requestsRead中的内容进行交换。

首先通过上次刷盘位置定位到MappedFile,然后开始刷盘


可以看到有两种刷盘的方式,调用FileChannel#force(异步刷盘并且开启transientStorePool)或者MappedByteBuffer#force(同步刷盘或者异步刷盘但是不开启transientStorePool)

当刷盘的时候,需要累积到一定页数才开始刷,同步刷盘是0页,异步输盘是4页。至此同步输盘的逻辑就梳理完了。

其实异步输盘不开启transientStorePool时,执行的逻辑和这个差不多,只是累计的页数不相同而已

异步刷盘

不开启TransientStorePool

当不开启TransientStorePoo时,会先唤醒FlushRealTimeService线程,然后开始开始刷盘

先算出输盘的页数,默认4页,如果10s没有刷盘了,则将页数设为0,然后执行MappedFileQueue#flush方法,这个方法在同步刷盘已经分析过了,不再分析。

开启TransientStorePool

当开启TransientStorePool是会先唤醒CommitRealTimeService,将ByteBuffer中的内容刷入FileChannel,接着唤醒FlushRealTimeService线程,将FileChannel中的数据刷入磁盘

先算出commit的页数,默认4页,如果200ms没有commit了,则将页数设为0(在后续执行流程可以看到commit也对页数有要求),然后执行MappedFileQueue#commit方法,将将ByteBuffer中的内容刷入FileChannel

MappedFile#commit0

至于这两种刷盘方式的好处,我个人理解也不是很深刻,因此转一下社区胡宗棠老师对这个问题的解读

一般有两种,有两种方式进行读写

  1. 第一种,Mmap+PageCache的方式,读写消息都走的是pageCache,这样子读写都在pagecache里面不可避免会有锁的问题,在并发的读写操作情况下,会出现缺页中断降低,内存加锁,污染页的回写。
  2. 第二种,DirectByteBuffer(堆外内存)+PageCache的两层架构方式,这样子可以实现读写消息分离,写入消息时候写到的是DirectByteBuffer——堆外内存中,读消息走的是PageCache(对于,DirectByteBuffer是两步刷盘,一步是刷到PageCache,还有一步是刷到磁盘文件中),带来的好处就是,避免了内存操作的很多容易堵的地方,降低了时延,比如说缺页中断降低,内存加锁,污染页的回写。

参考博客

[1]

RocketMQ源码解析:同步刷盘和异步刷盘的实现相关推荐

  1. 6、RocketMQ 源码解析之 Broker 启动(上)

    上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...

  2. RocketMQ源码解析之消息消费者(consume Message)

    原创不易,转载请注明出处 文章目录 前言 1.消息流程介绍 2.源码解析 2.1 并发消费 2.2 顺序消费 前言 我们在<RocketMQ源码解析之消息消费者(pullMessage)> ...

  3. RocketMQ源码解析之broker文件清理

    原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...

  4. RocketMQ源码解析:Filtersrv

    ???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...

  5. rocketmq源码解析之name启动(一)

    2019独角兽企业重金招聘Python工程师标准>>> 说在前面 主要解析namrsrv启动部分,namesrv配置加载.netty server创建.注册出处理器. 正文 源码解析 ...

  6. 消息中间件RocketMQ源码解析-- --调试环境搭建

    1. 依赖工具 JDK :1.8+ Maven IntelliJ IDEA 2. 源码拉取 从官方仓库 [https://github.com/apache/rocketmq) Fork 出属于自己的 ...

  7. RocketMQ源码解析-Broker的HA实现

    以master异步复制为例子. 在rocketmq的slave broker机子当中,会在DefaultMessageStore的启动当中启动自己的HaService来进行自己的ha服务. publi ...

  8. RocketMQ源码解析-Broker部分之Broker启动过程

    目录 broker启动流程 broker启动可配置参数 启动入口`BrokerStartup` 1.创建brokerController 2.`BrokerController`构造函数 3.Brok ...

  9. RocketMQ源码解析-PullConsumer取消息(1)

    PullConsumer取消息需要自己手动调用Consumer的pull方法主动拉取消息.需要的参数有具体的消息队列(调用消费者的fetchSubscibeMessageQueue()可以得到相应to ...

最新文章

  1. Java中ArrayList remove会遇到的坑
  2. ashx session 使用注意要点。
  3. JZOJ 4058. 【JSOI2015】子集选取
  4. 普通调幅(AM)与抑制载波双边带调幅(DSB)matlab编程实现
  5. 2007cad多个文件窗口上部排列_【中考信息技术总复习讲义】模块三 操作系统与文件管理...
  6. Java System类loadLibrary()方法与示例
  7. 【将图像字符画】【第二玩】图像字符化
  8. java学完jdk后学什么_学完了javase之后要学什么?
  9. ASP.NET中Session简单原理图
  10. Ribbon负载均衡源码解读
  11. Leetcode之跳跃游戏Ⅱ
  12. 利用 Python分析北京雾霾天,发现这么秘密
  13. 单测量矢量多目标精确DOA估计的高效稀疏表示算法
  14. Sybase迁移Oracle字符集问题,Sybase数据库迁移数据到Oracle(未改进)
  15. kdiff3的主窗口说明 Base Local Remote 分别代表什么分支
  16. windows系统PrintScreen键截屏
  17. 单模光电转换器怎么接_光纤收发器及其连接方式图解!
  18. 【读书联动】认知觉醒:开启自我改变的原动力
  19. 修改MySQL的字符集为utf8mb4
  20. 设计分享 | 基于51单片机实现红外控制系统控制电机调速

热门文章

  1. 冯·诺依曼架构哈佛架构(嵌入式学习)
  2. layui中form.val组件
  3. 单页应用的优缺点,单页应用首屏加载优化、小程序首次启动速度优化
  4. java线程暂停和继续_线程暂停和恢复
  5. uboot编译全过程
  6. android 圆形碰撞
  7. iPhone为何能长期统治日本手机市场
  8. 开发过程中自己遇到的问题总结
  9. (原创)一个JavaScript Function Outliner插件 第四版本 支持内嵌javascript,且可以对javascript进行压缩
  10. php source insight语言文件,玩转SourceInsight语言定义——让你的sourceinsight