1. 前言

理论上来说,RocketMQ只要有CommitLog文件就可以正常运行了,那为何还要维护ConsumeQueue文件呢?

ConsumeQueue是消费队列,引入它的目的是为了提高消费者的消费速度。毕竟RocketMQ是基于Topic主题订阅模式的,消费者往往只关心自己订阅的消息,如果每次消费都从CommitLog文件中检索数据,无疑性能是非常差的。有了ConsumeQueue,消费者就可以根据消息在CommitLog文件中的偏移量快速定位到消息进行消费了。

之前的文章已经说过,Broker会将客户端发送的消息写入CommitLog文件,持久化存储。但是整个流程并没有涉及到ConsumeQueue文件的操作,那么ConsumeQueue文件是如何被构建的呢?

2. ReputMessageService

ReputMessageService是「消息重放服务」,请允许我这么命名。Broker在启动的时候,会开启一个线程每毫秒执行一次doReput()方法。

它的目的就是对写入CommitLog文件里的消息进行「重放」,它有一个属性reputFromOffset,记录的是消息重放的偏移量,MessageStore启动的时候会对其进行赋值。

它的工作原理是,根据重放偏移量reputFromOffset去读取CommitLog里的待重放的消息,并构建DispatchRequest对象,然后将DispatchRequest对象分发出去,交给各个CommitLogDispatcher处理。

MessageStore维护了CommitLogDispatcher对象集合,目前只有三个处理器:

  1. CommitLogDispatcherBuildConsumeQueue:构建ConsumeQueue索引。
  2. CommitLogDispatcherBuildIndex:构建Index索引。
  3. CommitLogDispatcherCalcBitMap:构建布隆过滤器,加速SQL92过滤效率。

本篇文章主要分析CommitLogDispatcherBuildConsumeQueue,看看RocketMQ是如何构建ConsumeQueue的。

3. 源码分析

笔者画了一下ConsumeQueue构建过程的时序图,整个构建过程并不算复杂。

1.doReput()方法1毫秒执行一次,它的方法体是一个for循环,只要reputFromOffset没有到达CommitLog文件的最大偏移量,就会一直继续重放消息。

private boolean isCommitLogAvailable() {return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
}

它首先会根据reputFromOffset去CommitLog文件中截取一段ByteBuffer,这个缓冲区里就是待重放的消息数据。

public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {// CommitLog单个文件的大小int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();// 根据索引构建进度找到等待构建的文件,文件名就是起始Offset,遍历文件即可找到MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);if (mappedFile != null) {// 计算Offset在当前文件的读指针位置int pos = (int) (offset % mappedFileSize);/*** 基于MappedFile的MappedByteBuffer派生出一个ByteBuffer对象* 共享同一块内存,但是拥有自己的指针*/SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);return result;}return null;
}

SelectMappedBufferResult类属性如下:

// 起始偏移量
private final long startOffset;
// 缓冲区
private final ByteBuffer byteBuffer;
// 长度
private int size;
// 关联的MappedFile对象
private MappedFile mappedFile;

2.有了SelectMappedBufferResult,就可以读取消息数据了。由于消息重放并不需要知道消息主体内容,因此不会读取消息Body,只是读取相关属性,并构建DispatchRequest对象。读取的属性如下:

// 消息所属Topic
private final String topic;
// 消息所属队列ID
private final int queueId;
// 消息在CommitLog文件中的偏移量
private final long commitLogOffset;
// 消息大小
private int msgSize;
// 消息Tag哈希码
private final long tagsCode;
// 消息存盘时间
private final long storeTimestamp;
// 逻辑消费队列位点
private final long consumeQueueOffset;
private final String keys;
private final boolean success;
// 消息唯一键
private final String uniqKey;
// 消息系统标记
private final int sysFlag;
// 事务消息偏移量
private final long preparedTransactionOffset;
// 属性
private final Map<String, String> propertiesMap;

3.有了DispatchRequest对象,接下来就是调用doDispatch方法将请求分发出去了。此时CommitLogDispatcherBuildConsumeQueue将被触发,它会将请求转交给DefaultMessageStore执行。

DefaultMessageStore.this.putMessagePositionInfo(request);

4.MessageStore先根据消息Topic和QueueID定位到ConsumeQueue文件,然后将索引追加到文件中。

public void putMessagePositionInfo(DispatchRequest dispatchRequest) {// 根据Topic和QueueID定位到ConsumeQueue文件ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());// 追加索引到文件cq.putMessagePositionInfoWrapper(dispatchRequest);
}

写索引之前,会先确保消息仓库是可写状态:

boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();

然后,初始化一个ByteBuffer,容量为20字节,依次往里面写入:消息Offset、size、tagsCode。

// 每个索引的长度是20字节,byteBufferIndex是循环使用的
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
/**
* 索引结构:Offset+size+tagsCode
* 8字节 4字节 8字节
*/
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);

根据消费队列位点和单个索引的长度计算索引应该写入的文件位置,因为是顺序写的嘛,所以获取最新的ConsumeQueue文件,如果文件写满会创建新的继续写。

final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);

写之前,校验预期的偏移量和逻辑偏移量是否相等,正常情况下两者应该相等,如果不等说明数据构建错乱了,需要重新构建了。

if (cqOffset != 0) {// 偏移量:当前文件的写指针位置+文件起始偏移量(文件名)long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();// 正常情况下,expectLogicOffset和currentLogicOffset应该相等if (expectLogicOffset < currentLogicOffset) {log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);return true;}if (expectLogicOffset != currentLogicOffset) {LOG_ERROR.warn("[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",expectLogicOffset,currentLogicOffset,this.topic,this.queueId,expectLogicOffset - currentLogicOffset);}
}

检验通过后,就可以正常写了。先更新当前ConsumeQueue记录消息的最大偏移量maxPhysicOffset,再将20个字节的索引数据写入到文件。

// 更新当前ConsumerQueue记录的消息在CommitLog中的最大偏移量
this.maxPhysicOffset = offset + size;
// 将20字节的索引数据写入文件
return mappedFile.appendMessage(this.byteBufferIndex.array());

至此,就完成了CommitLog中的消息到ConsumeQueue文件里的索引同步。

ConsumeQueue索引条目结构:

长度 说明
8 消息在CommitLog文件中的偏移量
4 消息长度
8 消息Tag哈希码,根据Tag过滤消息

4. 总结

ConsumeQueue是RocketMQ用来加速消费者消费效率的索引文件,它是一个逻辑消费队列,并不保存消息本身,只是一个消息索引。索引长度为20个字节,记录了消息在CommitLog文件里的偏移量,消息长度,和消息Tag的哈希值。Consumer消费消息时可以根据Tag哈希值快速过滤消息,然后根据偏移量快速定位到消息,再根据消息长度读取出一条完整的消息。

Broker将消息写入CommitLog后并不会马上写ConsumeQueue,而是由一个异步线程ReputMessageService将消息进行重放,重放的过程中由CommitLogDispatcherBuildConsumeQueue将消息构建到ConsumeQueue文件,构建的频率为1毫秒一次,几乎是近实时的,不用担心消费会延迟。

ConsumeQueue构建过程分析相关推荐

  1. 深入Vue - 源码目录及构建过程分析

    摘要: Vue源码阅读第一步. 原文:深入vue - 源码目录及构建过程分析 公众号:前端小苑 Fundebug经授权转载,版权归原作者所有. 本文主要梳理一下vue代码的目录,以及vue代码构建流程 ...

  2. Kubernetes构建过程分析

    构建方式 Kubernetes的构建方式可以分为3种,分别是本地环境构建.容器环境构建.Bazel环境构建. Kubernetes构建方式: 本地环境构建 make make all 容器环境构建 m ...

  3. MPB:南京​湖泊所王建军组-​群落构建过程的定量指标——扩散-生态位连续体指数...

    为进一步提高<微生物组实验手册>稿件质量,本项目新增大众评审环节.文章在通过同行评审后,采用公众号推送方式分享全文,任何人均可在线提交修改意见.公众号格式显示略有问题,建议电脑端点击文末阅 ...

  4. 优酷 Android 构建速度优化实践

    作者:苏彦郊(木磊) Android 项目一般使用 gradle 作为构建打包工具,gradle 简洁.动态的功能特性为人津津乐道,同样,构建执行速度缓慢的缺陷也一直为人诟病. 近年来,随着优酷功能特 ...

  5. vue源码解析(3)—— Vue.js 源码构建

    Vue.js 源码构建 Vue.js 源码是基于 Rollup 构建的,它的构建相关配置都在 scripts 目录下. 构建脚本 通常一个基于 NPM 托管的项目都会有一个 package.json ...

  6. Android系统开发和性能优化——查漏补缺【建议收藏】

    做了这么久性能相关的工作,也接触了不少模块,说实话要做好性能这一块,真心不容易.为什么这么说? 是因为需要接触的知识实在是太多了, Android 是一个整体,牵一发而动全身,不是说只懂一个模块就可以 ...

  7. as安装过程中gradle_重新认识AndroidStudio和Gradle,这些都是我们应该知道的

    前言 主要从AndroidStudio的环境安装升级GradleEclipse转AS,多渠道配置Maven私服Action,Option快捷键等几个方面出发讲一些操作技巧以及我对AndroidStud ...

  8. MyBatis 源码分析 - 映射文件解析过程

    1.简介 在上一篇文章中,我详细分析了 MyBatis 配置文件的解析过程.由于上一篇文章的篇幅比较大,加之映射文件解析过程也比较复杂的原因.所以我将映射文件解析过程的分析内容从上一篇文章中抽取出来, ...

  9. APK反编译得工具总结(转载)

    Android反编译技术总结 转自UncleChen's Blog,作者:UncleChen 一.Apk反编译工具及其使用方法 1.原理 学习反编译之前,建议先学习一下Apk打包的过程,明白打包完成后 ...

最新文章

  1. 如果有电脑——计算机达人成长之路(36)
  2. Matlab与线性代数 -- 逆矩阵
  3. 蚂蚁森林合种计划(2020.11.14,7天有效,每周6更新)
  4. 无人驾驶还有多久才能全面推开?
  5. c语言水印添加,[求助]C语言 bmp文件加上水印
  6. SQL SERVER 数据库清空语句 忽略外键 触发器 等
  7. JavaScript入门(part1)--初识JavaScript
  8. 制作点击文字变颜色_腾讯的动态时间轴PPT火了!制作简单又有逼格,都学起来啊...
  9. 4.9_bridge_结构型模式:桥模式
  10. 微软高层人士变动!张祺晋升为微软公司全球资深副总裁
  11. POJ 3111 K Best 贪心 二分
  12. JavaScript学习_第2章_JS语法规则
  13. MCU——SRAM和Flash
  14. 关于18183-王者荣耀专区网站的TDK简要分析(更多内容请访问http://www.eduaskx6.com/)...
  15. DIY强大的虚拟化环境-组装于测试部分-2.L5420主机
  16. hp刀片服务器性能分析,IBM刀片服务器与高性能计算-20210723075634.ppt-原创力文档...
  17. ZZULIOJ1051-1055Python解法
  18. NRF24L01模块使用(老干妈笔记)
  19. webp格式以及工具介绍
  20. UML图:用例图详细介绍

热门文章

  1. 用python写跑酷游戏脚本,用Python写一个天天酷跑
  2. 【华为OD机试真题 python】星际篮球争霸赛【2022 Q4 | 100分】
  3. 【疯壳·嵌入式平板开发教程5】手把手教你做平板电脑-触摸屏驱动实验教程
  4. Kubernetes 学习总结(28)—— Kubernetes 常见问题总结
  5. vue3学习之旅--邂逅vue3-了解认识Vue3(二)
  6. 香甜的黄油 图论—最短路径
  7. 【华为_WLAN】AP4030DN 由 FIT AP 更新至 FAT AP(Uboot方式)
  8. SQL左连接副表取最新的一条记录的多中方式
  9. 计算机音乐最早出现,电脑一直无缘无故出现音乐是为什么?
  10. Baumer工业相机堡盟相机如何通过BGAPI SDK联合OpenCVSharp进行图像简单拼接并显示固定数量保存和持续保存(C#)