文章目录

  • Rocketmq 刷盘机制
    • 三个文件
      • indexFile
      • consumeQueue
      • commitlog
    • 异步刷盘
    • consumerqueue和indexfile文件是什么时候更新的
    • 同步刷盘

Rocketmq 刷盘机制

笔者这里分析的是4.8版本,这里发现以前的handleflushdisk 以及handleha这两个方法在4.8中已经不会使用到了,所以这里特地对源码进行深入剖析了,读者可以直接跳到异步刷盘流程阅读。

三个文件

在rocketmq里面存在这样三个文件

  • indexfile
  • consumequeue
  • commitlog

其中indexfile和consumequeue可以理解为索引文件,
commitlog才是真正的存放消息的文件

因为rocketmq要保证性能,发送消息落盘的速度,所以,在落盘上选择了顺序写,这样消息在文件上的顺序就是乱序的,所以就需要维护一个indexfile索引文件,以及消费队列的一个索引

这里先介绍三个文件的api,然后接着分析刷盘流程:所以读者可以先看下面的刷盘流程,然后在回头看三个文件的实现原理,文章篇幅较大

indexFile

indexfile文件存储在store目录下的index文件里面,里面存放的是消息的hashcode和index内容,

我们来看看indexfile里面都存储的什么东西:

文件由一个文件头组成:长40字节
500w个hashslot,每个4字节
2000w个index条目,每个20字节

所以这里我们可以估算每个indexfile的大小为:40+500w4+2000w20个字节,大约400M左右

indexfileheader由如下组成:

那消息的索引是怎么put到这个文件里面的呢?以及是如何获取的呢?

  • 这边我们启动一下环境,然后发送一下消息,就可以进入到该方法

来到putkey方法,this.indexHeader.getIndexCount()是indeshead里面最后4个字节(上图),指index的数量,如果数量小于200w说明没有超过最大的。

然后计算出hash,
slotPos是要存放的hash槽的位置,
absSlotPos是要存放hash槽的具体偏移量(绝对位置)

接着获取到对应absSlotPos绝对位置的int值,
timeDiff是当前时间与BeginTimestamp的时间差,所以这里4个字节肯定够了


计算出index的绝对位置,也就是获取index最大,然后顺序往后写

然后就开始往mappedByteBuffer里面写数据了,主要用到了putInt,putLong方法,写完之后,更新indexHead的信息,

可以看到this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());这段是将index的位置存在了hash槽里面,也就是说,查找一个索引的话是先找到hash槽里面的index位置,然后计算出index的具体偏移量来查找的

如果没有hash冲突,将hashslot++
将indexcount++

这边笔者提出一个问题?为什么要存timeDIff时间?存时间为什么不存绝对时间?


indexfile怎么解决hash冲突?

如下两段代码:

如果拿到的slotvalue不是0的话,说明存在hash冲突,
然后接下来的this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue)会将该值存入到index里面,也就是index的最后四个字节,也就是说,如果我们拿到index的值的话,发现index的最后四位不是0,那么就继续取出来,也就是说,这里index中相同hash是相连的,所以每个index就有存储hash的必要了,如果hash冲突了,根据最后四位拿出来,还是需要比对一下hash的。

接着我们看,是如何从indexfile中查找到消息的offset的?

这边启动执行org.apache.rocketmq.test.client.producer.querymsg.QueryMsgByKeyIT#testQueryMsg这个测试方法,就可以进入debug

来到org.apache.rocketmq.store.index.IndexFile#selectPhyOffset方法:

先将mappedfile锁住,然后根据key计算hash,
拿到slotpos,
计算出slot绝对位置offset

这边贴出整个方法,以及执行到最后每个变量的值:在右边一目了然

L214:将nextIndexToRead赋值,nextIndexToRead是下一次读取的index的位置
L215:校验
L219:计算出index的绝对位置absIndexPos
L223-L227:拿到index中的值
L239:比较hash值,然后放入phyOffsets中
L242:判断了如果prevIndexRead值合法且不是当前位置,就说明发生了hash冲突,将nextIndexToRead赋值,接着继续循环

所以这个方法执行完成之后phyOffsets会存放所以hash值和预期相同的一些消息地址

所以indexfile是如何解决hash冲突的?这个问题也就迎刃而解


以上是indexfile的常用连个api

consumeQueue

consumequeue存放在store文件里面,里面的consumequeue文件里面按照topic排放,然后每个topic默认4个队列,里面存放的consumequeue文件

consumequeue文件存储的单元如下,固定20个字节,单个consumequeue文件默认包含30w个条目,所以单个文件大小大概6M左右,

这边我们发送消息进入到org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfoWrapper 的debug:

进行了一些简单的判断之后来到:org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo

将消息的信息put到byteBufferIndex里面,

先是8个字节的offset,然后4个字节的size,最后8个字节tag,当然这里的最后8个字节也有可能存放延迟消息的执行时间戳这里不展开

而执行到最后就是将byteBufferIndex追加到该队列里面,也就是追加到consumequeue里面

appendMessage实现逻辑:

commitlog

commitlog文件存储再store目录下的commitlog目录下:
每个默认1G大小,

如下是commitlog的中每个msg的形状,这是计算msg的长度代码里面的一段可以瞥见,可以很明白的看到了一个msg的结构是什么样的想要具体点可以看看丁威老师的博客里面的这里我把图片抄过来:

对比上面和下面的图片就很清晰的看到每个消息长什么样:

接下来我们分析一下org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner)这个核心方法,

同样我们是发送一个消息来进入到debug:

先是获取到wroteOffset也就是文件初始位置加上bytebuffer写入位置,这里初始位置是第一个文件所以是0,
然后判断系统ip是不是ipv6地址,决定长度
然后调用createMessageId生成msgid,这里不展开,有兴趣的读者可以点进方法看下,

接着计算一些数据的长度:
将topic转化为byte数组,供存储
计算msg长度
校验msg的长度是否比最大长度长,以及最后commitlog最后需要8个字节的留白

接着将各个数据都放到bytebuffer里面,可以看到msg的格式和上面图片是一样的,

然后,就直接new了一个result里面放的是PUT_OK返回出去了,这里可以看到这里把msg放到bytebuffer里面就直接返回了,也就是说,只是放到内存映射里面,然后下面刷写到磁盘上,就又broker自己去操作了,那么同步刷盘是怎么实现呢?

  • 同步刷盘,在调用完这个方法之后会同步调用一下force方法

异步刷盘

Rocketmq的存储是与读写是机遇NIIO的内存映射机制的,就如上面分析的一样,数据是先都put到一个bytebuffer里面,然后根据配置的刷盘策略在不同时间进行刷盘的,

运行一下namesrv和broker,然后发送消息:进入到debug

消息发送之后在之前分析的一个顺藤摸瓜RocketMQ之消息发送debug解析,里面提到了往remoting模块发送了一个sendmsg的一个code的request,而这个code对应的processer在broker里面是SendMessageProcessor这个类,我们跟进这个类:

因为这边默认的是一步刷盘,所以这里走的是asyncProcessRequest这个方法,

跟进去判断了一下是否是批量发送消息,然后调用了asyncSendMessage方法:

继续跟进去发现这边校验了一下code,然后将body序列化了,拿到queueid,设置了一些topic,queue信息到msginner里面,这个msginner对象就是消息对象,最终是要store里面存储的。

记下来设置了一些时间信息,以及主机信息,判断了是否是事务消息,然后调用了this.brokerController.getMessageStore().asyncPutMessage(msgInner),也就是拿到MessageStore对象然后用该对象里面的commitlog对象来进行存储落盘

继续跟进,发现msg的一些信息都已经设置好了,接着调用commitlog进行落盘,而


来到org.apache.rocketmq.store.CommitLog#asyncPutMessage这个方法:

设置了存储时间和crc,对事务进行了判断

接着拿到锁,有设置了存储时间,最后调用了mappedFile.appendMessage方法,

一直来到这个方法org.apache.rocketmq.store.MappedFile#appendMessagesInner

在该方法里面校验了写位置是否大于最大长度

然后调用了cb.doAppend,而这个方法我们在上面说过的org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner)这个方法,然后进行返回,

到这里我们还没有发现其他两个文件是什么时候更新的呢?comsumequeue和indexfile

接下来就将两个异步任务给返回出去:

一个刷盘任务,一个ha任务

因为是异步刷盘,所以这里的submitFlushRequest方法里面走下面这个分支:

直接唤醒刷盘线程,然后返回一个completed的future,并且里面的状态是put_ok,然后就回到我们一开始的发送成功的页面了。

consumerqueue和indexfile文件是什么时候更新的

在我们的broker启动的时候在这个方法里面org.apache.rocketmq.broker.BrokerController#start

跟进到方法里面,启动了this.reputMessageService.start();这个线程,

org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#run我们直接来到这个线程的run方法:

一个死循环在执行doreput方法:

在方法里面:执行了DefaultMessageStore.this.doDispatch(dispatchRequest)这个方法 ,继续更近

是从一个集合里面拿到了CommitLogDispatcher的实现类,然后挨个执行一下:

然后我们发现:
他的实现类就包含一个构建consumerqueue的和一个构建index的,那我们继续发送消息进入到debug:

果然这里面存放的三个dispatcher就是其中两个就是我们需要处理的文件的实现类:

我们来到consumerqueue实现类,发现这里执行了DefaultMessageStore.this.putMessagePositionInfo(request),而再跟进方法里面发现就是调用了putMessagePositionInfoWrapper来将consumerqueue文件写入到磁盘

同样我们debug到另一个index的实现类

跟进方法:

执行了putkey方法

而这个方法正是调用的是idexfile中的putkey方法:这个我们上面已经分析过:

所以在broker启动的时候就是会执行一个死循环的线程来将consumerqueue和indexfile进行刷盘。

同步刷盘

同步刷盘本质上是对异步的一种等待:

上面异步刷盘提到了submitFlushRequest这个方法,
可以看到在这个方法里面构建了一个groupcommitrequeest,提交给GroupCommitService执行:

然后在GroupCommitService里面是有个轮训看判断该request中的offset是否比commitlog的最大值小,如果小就将这个任务完成。

这里的思想和consumer的消费逻辑类似,都是发送一个request给service去轮询看是否完成。

那么这里并没有调用get进行阻塞,所以这里也就直接将future返回出去了,会不会有问题呢?

这里其实还是没有问题的,这里的同步刷盘,确实是将异步任务返回给了sendmessageprocesser,但是我们在回到org.apache.rocketmq.remoting.netty.NettyRemotingAbstract也就是调用的源头,

在第225行里面,这里是调用的sendmessageprocesser中的异步处理,我们明明配置的是同步刷盘,这里还是调用的异步处理,不会出问题吗?

原因是这里将callback这个回调,也加入到了这个异步回调里面,而最终返回给客户端的是在这个callback里面执行的,所以只要保证callback这个逻辑在整个 CompletableFuture之后就行了,

而asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor())这段逻辑正是将callback也加入到整个异步处理的最后:

所以即使这里的调用看上去是异步的,但是只要保证整个顺序就可以了,

而同步异步具体体现在commitlog里面的submitFlushRequest(result, msg)这个方法里面。

  1. 顺藤摸瓜RocketMQ之整体架构以及执行流程
  2. 顺藤摸瓜RocketMQ之注册中心debug解析
  3. 顺藤摸瓜RocketMQ之消息发送debug解析
  4. 顺藤摸瓜RocketMQ之消息消费debug解析
  5. 顺藤摸瓜RocketMQ之刷盘机制debug解析
  6. 顺藤摸瓜RocketMQ之主从同步(HA)解析

顺藤摸瓜RocketMQ之刷盘机制debug解析相关推荐

  1. RocketMQ的刷盘机制

    RocketMQ需要将消息存储到磁盘上,这样才能保证断电后消息不会丢失.同时这样才可以让存储的消息量可以超出内存的限制.RocketMQ为了提高性能,会尽量保证磁盘的顺序写.消息在写入磁盘时,有两种写 ...

  2. RocketMQ消息存储之刷盘机制(原理篇)

    一.前言 RocketMQ的刷盘机制是一种确保消息可靠性的机制,简单来说就是Broker收到消息后,将消息存储到磁盘上.这样可以解决几个问题: 存储空间问题.内存空间有限,存入磁盘可以维护更多消息. ...

  3. RocketMQ5.0.0消息存储<四>_刷盘机制

    目录 一.刷盘概览 二.Broker刷盘机制 1. 同步刷盘 2. 异步刷盘 1):未开启堆外内存池 2):开启堆外内存池 三.参考资料 一.刷盘概览 RocketMQ存储与读写是基于JDK NIO的 ...

  4. RocketMQ刷盘机制

    概览 RocketMQ的存储读写是基于JDK NIO的内存映射机制的,消息存储时首先将消息追加到内存中.在根据不同的刷盘策略在不同的时间进行刷盘.如果是同步刷盘,消息追加到内存后,将同步调用Mappe ...

  5. 深入源码聊聊RocketMQ刷盘机制

    大家好,我是Leo. 今天聊一下RocketMQ的三种刷盘机制. 同步刷盘 异步刷盘(RocketMQ默认) 异步刷盘+缓冲区 出自微信公众号[欢少的成长之路] 本章概括 同步刷盘 整个同步刷盘策略由 ...

  6. RocketMQ消息刷盘

    RocketMQ消息刷盘 流程图 1.源码分析 RocketMQ 存储与读写是基于 JDK NIO 的内存映射机制( MappedByteBuffer )的,消息存储时首先将消息追加到内存,再根据配 ...

  7. mysql刷盘机制详解

    目录 刷盘机制总览 log buffer(innodb的,由存储引擎分配) binlog cache(由server分配) buffer pool 自适应刷脏页Adaptive Flushing 刷盘 ...

  8. MySQL数据和日志的刷盘机制以及双一配置

    详细介绍了MySQL数据和日志的刷盘机制以及双一配置,双一配置可以保证Mysql日志数据不丢失. 文章目录 1 内存数据的刷盘机制 2 MySQL数据的刷盘 2.1 刷盘数据来源 2.2 脏页以及刷盘 ...

  9. 【RocketMQ】消息的刷盘机制

    刷盘策略 CommitLog的asyncPutMessage方法中可以看到在写入消息之后,调用了submitFlushRequest方法执行刷盘策略: public class CommitLog { ...

最新文章

  1. Redis的7000字笔记总结,超详细!
  2. Netty傻瓜教程(五):不能不谈Redis
  3. python 搜索pdf文件中的文字_使用python查找搜索字符串在pdf文档中位于哪一页上...
  4. 在CentOS7.2上部署基于PostgreSQL10的citus分布式数据库
  5. Android Studio——字体大小的修改
  6. React开发(274):ant design 时间显示秒
  7. 【转载】wpf数据绑定binding与INotifyPropertyChanged
  8. 滚动截屏软件_华为指关节截屏不如三指截屏好用?一步到位,实践出真知
  9. kafak manager + zookeeper + kafka 消费队列快速清除
  10. sql多表查询的总结
  11. hibernate注解方式实现一对多映射
  12. ajax escape用法,ie11下ajax用escape发送中文参数失败
  13. 用servlet编写下载程序
  14. cd linux安装驱动程序,给CDlinux增添网卡驱动失败的原因和对策
  15. 明星热图|刘诗诗、倪妮出席品牌活动;王一博、任嘉伦、龚俊等签约新品牌成为代言人...
  16. java论文管理系统_Java人事管理系统(论文+源码)
  17. 10分钟西门子SMART200PLC轻松实现连接自建MQTT云平台操作教程
  18. mysql之外键以及小练习
  19. 电子科大计算机学院邱航教授,电子科大教计算机研室介绍
  20. linux学习宝典,Linux-学习方法附命令宝典

热门文章

  1. Retrieval task calculate Recall@k
  2. Android Jetpack总览
  3. opengl 画椭圆_卧蚕原来这么重要,没有也要画出来!
  4. 【HTML CSS】笔记2日 CSS基础
  5. android继电器指令,手机 蓝牙 控制 继电器 无线门锁 物联网 安卓源码 安卓开发 Android WIFI控制 详细注释测试稳定无bug...
  6. PowerDesigner实现版本控制,多人协作
  7. iPhone-NSAssert使用
  8. 亚马逊扫号侵权,卖家要如何自保?如何申诉?
  9. php自动生成新闻页,PHP静态新闻列表自动生成代码
  10. HTC VIVE Focus Wave VR SDK 新手入门开发配置全指南 Development Tutorial