ConsumeQueue构建过程分析
1. 前言
理论上来说,RocketMQ只要有CommitLog文件就可以正常运行了,那为何还要维护ConsumeQueue文件呢?
ConsumeQueue是消费队列,引入它的目的是为了提高消费者的消费速度。毕竟RocketMQ是基于Topic主题订阅模式的,消费者往往只关心自己订阅的消息,如果每次消费都从CommitLog文件中检索数据,无疑性能是非常差的。有了ConsumeQueue,消费者就可以根据消息在CommitLog文件中的偏移量快速定位到消息进行消费了。
之前的文章已经说过,Broker会将客户端发送的消息写入CommitLog文件,持久化存储。但是整个流程并没有涉及到ConsumeQueue文件的操作,那么ConsumeQueue文件是如何被构建的呢?
2. ReputMessageService
ReputMessageService是「消息重放服务」,请允许我这么命名。Broker在启动的时候,会开启一个线程每毫秒执行一次doReput()
方法。
它的目的就是对写入CommitLog文件里的消息进行「重放」,它有一个属性reputFromOffset
,记录的是消息重放的偏移量,MessageStore启动的时候会对其进行赋值。
它的工作原理是,根据重放偏移量reputFromOffset
去读取CommitLog里的待重放的消息,并构建DispatchRequest对象,然后将DispatchRequest对象分发出去,交给各个CommitLogDispatcher处理。
MessageStore维护了CommitLogDispatcher对象集合,目前只有三个处理器:
- CommitLogDispatcherBuildConsumeQueue:构建ConsumeQueue索引。
- CommitLogDispatcherBuildIndex:构建Index索引。
- CommitLogDispatcherCalcBitMap:构建布隆过滤器,加速SQL92过滤效率。
本篇文章主要分析CommitLogDispatcherBuildConsumeQueue,看看RocketMQ是如何构建ConsumeQueue的。
3. 源码分析
笔者画了一下ConsumeQueue构建过程的时序图,整个构建过程并不算复杂。
1.doReput()
方法1毫秒执行一次,它的方法体是一个for循环,只要reputFromOffset没有到达CommitLog文件的最大偏移量,就会一直继续重放消息。
private boolean isCommitLogAvailable() {return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
}
它首先会根据reputFromOffset去CommitLog文件中截取一段ByteBuffer,这个缓冲区里就是待重放的消息数据。
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {// CommitLog单个文件的大小int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();// 根据索引构建进度找到等待构建的文件,文件名就是起始Offset,遍历文件即可找到MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);if (mappedFile != null) {// 计算Offset在当前文件的读指针位置int pos = (int) (offset % mappedFileSize);/*** 基于MappedFile的MappedByteBuffer派生出一个ByteBuffer对象* 共享同一块内存,但是拥有自己的指针*/SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);return result;}return null;
}
SelectMappedBufferResult类属性如下:
// 起始偏移量
private final long startOffset;
// 缓冲区
private final ByteBuffer byteBuffer;
// 长度
private int size;
// 关联的MappedFile对象
private MappedFile mappedFile;
2.有了SelectMappedBufferResult,就可以读取消息数据了。由于消息重放并不需要知道消息主体内容,因此不会读取消息Body,只是读取相关属性,并构建DispatchRequest对象。读取的属性如下:
// 消息所属Topic
private final String topic;
// 消息所属队列ID
private final int queueId;
// 消息在CommitLog文件中的偏移量
private final long commitLogOffset;
// 消息大小
private int msgSize;
// 消息Tag哈希码
private final long tagsCode;
// 消息存盘时间
private final long storeTimestamp;
// 逻辑消费队列位点
private final long consumeQueueOffset;
private final String keys;
private final boolean success;
// 消息唯一键
private final String uniqKey;
// 消息系统标记
private final int sysFlag;
// 事务消息偏移量
private final long preparedTransactionOffset;
// 属性
private final Map<String, String> propertiesMap;
3.有了DispatchRequest对象,接下来就是调用doDispatch
方法将请求分发出去了。此时CommitLogDispatcherBuildConsumeQueue将被触发,它会将请求转交给DefaultMessageStore执行。
DefaultMessageStore.this.putMessagePositionInfo(request);
4.MessageStore先根据消息Topic和QueueID定位到ConsumeQueue文件,然后将索引追加到文件中。
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {// 根据Topic和QueueID定位到ConsumeQueue文件ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());// 追加索引到文件cq.putMessagePositionInfoWrapper(dispatchRequest);
}
写索引之前,会先确保消息仓库是可写状态:
boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
然后,初始化一个ByteBuffer,容量为20字节,依次往里面写入:消息Offset、size、tagsCode。
// 每个索引的长度是20字节,byteBufferIndex是循环使用的
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
/**
* 索引结构:Offset+size+tagsCode
* 8字节 4字节 8字节
*/
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
根据消费队列位点和单个索引的长度计算索引应该写入的文件位置,因为是顺序写的嘛,所以获取最新的ConsumeQueue文件,如果文件写满会创建新的继续写。
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
写之前,校验预期的偏移量和逻辑偏移量是否相等,正常情况下两者应该相等,如果不等说明数据构建错乱了,需要重新构建了。
if (cqOffset != 0) {// 偏移量:当前文件的写指针位置+文件起始偏移量(文件名)long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();// 正常情况下,expectLogicOffset和currentLogicOffset应该相等if (expectLogicOffset < currentLogicOffset) {log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);return true;}if (expectLogicOffset != currentLogicOffset) {LOG_ERROR.warn("[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",expectLogicOffset,currentLogicOffset,this.topic,this.queueId,expectLogicOffset - currentLogicOffset);}
}
检验通过后,就可以正常写了。先更新当前ConsumeQueue记录消息的最大偏移量maxPhysicOffset,再将20个字节的索引数据写入到文件。
// 更新当前ConsumerQueue记录的消息在CommitLog中的最大偏移量
this.maxPhysicOffset = offset + size;
// 将20字节的索引数据写入文件
return mappedFile.appendMessage(this.byteBufferIndex.array());
至此,就完成了CommitLog中的消息到ConsumeQueue文件里的索引同步。
ConsumeQueue索引条目结构:
长度 | 说明 |
---|---|
8 | 消息在CommitLog文件中的偏移量 |
4 | 消息长度 |
8 | 消息Tag哈希码,根据Tag过滤消息 |
4. 总结
ConsumeQueue是RocketMQ用来加速消费者消费效率的索引文件,它是一个逻辑消费队列,并不保存消息本身,只是一个消息索引。索引长度为20个字节,记录了消息在CommitLog文件里的偏移量,消息长度,和消息Tag的哈希值。Consumer消费消息时可以根据Tag哈希值快速过滤消息,然后根据偏移量快速定位到消息,再根据消息长度读取出一条完整的消息。
Broker将消息写入CommitLog后并不会马上写ConsumeQueue,而是由一个异步线程ReputMessageService将消息进行重放,重放的过程中由CommitLogDispatcherBuildConsumeQueue将消息构建到ConsumeQueue文件,构建的频率为1毫秒一次,几乎是近实时的,不用担心消费会延迟。
ConsumeQueue构建过程分析相关推荐
- 深入Vue - 源码目录及构建过程分析
摘要: Vue源码阅读第一步. 原文:深入vue - 源码目录及构建过程分析 公众号:前端小苑 Fundebug经授权转载,版权归原作者所有. 本文主要梳理一下vue代码的目录,以及vue代码构建流程 ...
- Kubernetes构建过程分析
构建方式 Kubernetes的构建方式可以分为3种,分别是本地环境构建.容器环境构建.Bazel环境构建. Kubernetes构建方式: 本地环境构建 make make all 容器环境构建 m ...
- MPB:南京湖泊所王建军组-群落构建过程的定量指标——扩散-生态位连续体指数...
为进一步提高<微生物组实验手册>稿件质量,本项目新增大众评审环节.文章在通过同行评审后,采用公众号推送方式分享全文,任何人均可在线提交修改意见.公众号格式显示略有问题,建议电脑端点击文末阅 ...
- 优酷 Android 构建速度优化实践
作者:苏彦郊(木磊) Android 项目一般使用 gradle 作为构建打包工具,gradle 简洁.动态的功能特性为人津津乐道,同样,构建执行速度缓慢的缺陷也一直为人诟病. 近年来,随着优酷功能特 ...
- vue源码解析(3)—— Vue.js 源码构建
Vue.js 源码构建 Vue.js 源码是基于 Rollup 构建的,它的构建相关配置都在 scripts 目录下. 构建脚本 通常一个基于 NPM 托管的项目都会有一个 package.json ...
- Android系统开发和性能优化——查漏补缺【建议收藏】
做了这么久性能相关的工作,也接触了不少模块,说实话要做好性能这一块,真心不容易.为什么这么说? 是因为需要接触的知识实在是太多了, Android 是一个整体,牵一发而动全身,不是说只懂一个模块就可以 ...
- as安装过程中gradle_重新认识AndroidStudio和Gradle,这些都是我们应该知道的
前言 主要从AndroidStudio的环境安装升级GradleEclipse转AS,多渠道配置Maven私服Action,Option快捷键等几个方面出发讲一些操作技巧以及我对AndroidStud ...
- MyBatis 源码分析 - 映射文件解析过程
1.简介 在上一篇文章中,我详细分析了 MyBatis 配置文件的解析过程.由于上一篇文章的篇幅比较大,加之映射文件解析过程也比较复杂的原因.所以我将映射文件解析过程的分析内容从上一篇文章中抽取出来, ...
- APK反编译得工具总结(转载)
Android反编译技术总结 转自UncleChen's Blog,作者:UncleChen 一.Apk反编译工具及其使用方法 1.原理 学习反编译之前,建议先学习一下Apk打包的过程,明白打包完成后 ...
最新文章
- 如果有电脑——计算机达人成长之路(36)
- Matlab与线性代数 -- 逆矩阵
- 蚂蚁森林合种计划(2020.11.14,7天有效,每周6更新)
- 无人驾驶还有多久才能全面推开?
- c语言水印添加,[求助]C语言 bmp文件加上水印
- SQL SERVER 数据库清空语句 忽略外键 触发器 等
- JavaScript入门(part1)--初识JavaScript
- 制作点击文字变颜色_腾讯的动态时间轴PPT火了!制作简单又有逼格,都学起来啊...
- 4.9_bridge_结构型模式:桥模式
- 微软高层人士变动!张祺晋升为微软公司全球资深副总裁
- POJ 3111 K Best 贪心 二分
- JavaScript学习_第2章_JS语法规则
- MCU——SRAM和Flash
- 关于18183-王者荣耀专区网站的TDK简要分析(更多内容请访问http://www.eduaskx6.com/)...
- DIY强大的虚拟化环境-组装于测试部分-2.L5420主机
- hp刀片服务器性能分析,IBM刀片服务器与高性能计算-20210723075634.ppt-原创力文档...
- ZZULIOJ1051-1055Python解法
- NRF24L01模块使用(老干妈笔记)
- webp格式以及工具介绍
- UML图:用例图详细介绍
热门文章
- 用python写跑酷游戏脚本,用Python写一个天天酷跑
- 【华为OD机试真题 python】星际篮球争霸赛【2022 Q4 | 100分】
- 【疯壳·嵌入式平板开发教程5】手把手教你做平板电脑-触摸屏驱动实验教程
- Kubernetes 学习总结(28)—— Kubernetes 常见问题总结
- vue3学习之旅--邂逅vue3-了解认识Vue3(二)
- 香甜的黄油 图论—最短路径
- 【华为_WLAN】AP4030DN 由 FIT AP 更新至 FAT AP(Uboot方式)
- SQL左连接副表取最新的一条记录的多中方式
- 计算机音乐最早出现,电脑一直无缘无故出现音乐是为什么?
- Baumer工业相机堡盟相机如何通过BGAPI SDK联合OpenCVSharp进行图像简单拼接并显示固定数量保存和持续保存(C#)