问题

有一个疑问,当client给broker发送消息的时候,怎么知道在commitlog的第几个字节开始写呢?

文件格式概述

commitlog消息单元存储结构

commitlog中存储的是客户端发送的所有数据

ConsumeQueue消息单元存储结构

ConsumeQueue存的是主题的逻辑信息,如下图所示,代表一条记录。其中记录的信息存储在commitLog中,位置是CommitLog Offset。

流程图

源码跟踪(broker启动流程里)

入口方法

DefaultMessageStore###load

public boolean load() {boolean result = true;try {//省略// 装载Commit Logresult = result && this.commitLog.load();if (result) {//省略//确定Commit Log文件下一个写的位置this.recover(lastExitOK);}} catch (Exception e) {}return result;}

装载commitlog:把commitlog中下的文件都映射成MappedFile,方便读写

CommitLog###load

public boolean load() {//跟进去,调用mappedFileQueue.load方法boolean result = this.mappedFileQueue.load();log.info("load commit log " + (result ? "OK" : "Failed"));return result;}

MappedFileQueue###load方法:在该方法中把commitlog下的文件映射成MappedFile

public boolean load() {//window上默认的目录:C:\Users\25682\store\commitlogFile dir = new File(this.storePath);//上面目录下子文件File[] files = dir.listFiles();if (files != null) {// ascending orderArrays.sort(files);for (File file : files) {if (file.length() != this.mappedFileSize) {log.warn(file + "\t" + file.length()+ " length not matched message store config value, please check it manually");return false;}try {//把每一个文件映射成MappedFile对象,方便读取MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);//此时wrotePosition设置的为mappedFileSize,不准确mappedFile.setWrotePosition(this.mappedFileSize);mappedFile.setFlushedPosition(this.mappedFileSize);mappedFile.setCommittedPosition(this.mappedFileSize);this.mappedFiles.add(mappedFile);log.info("load " + file.getPath() + " OK");} catch (IOException e) {log.error("load file " + file + " error", e);return false;}}}return true;}

此时CommitLog下的MappedFile的wrotePosition设置为mappedFileSize,但是最后这个MappedFile的wrotePosition还不对,因此下面需要修改

确定Commitlog要写的位置

DefaultMessageStore###recover

private void recover(final boolean lastExitOK) {//从ConsumeQueue中获取最大的物理偏移量long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();if (lastExitOK) {this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);} else {//this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);}this.recoverTopicQueueTable();}

DefaultMessageStore###recoverConsumeQueue:获取每一个主题里每一个队列里的最大commitlog偏移量

private long recoverConsumeQueue() {long maxPhysicOffset = -1;for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {for (ConsumeQueue logic : maps.values()) {logic.recover();if (logic.getMaxPhysicOffset() > maxPhysicOffset) {maxPhysicOffset = logic.getMaxPhysicOffset();}}}return maxPhysicOffset;}

CommitLog###recoverAbnormally

public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {// recover by the minimum time stampboolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();if (!mappedFiles.isEmpty()) {int index = mappedFiles.size() - 1;//获取最后一个CommitLog的MapperFileMappedFile mappedFile = null;for (; index >= 0; index--) {mappedFile = mappedFiles.get(index);if (this.isMappedFileMatchedRecover(mappedFile)) {log.info("recover from this mapped file " + mappedFile.getFileName());break;}}if (index < 0) {index = 0;mappedFile = mappedFiles.get(index);}ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();long processOffset = mappedFile.getFileFromOffset();long mappedFileOffset = 0;while (true) {//不断从MapperFile中根据CommitLog的数据单元格式读取数据,当读取到数据为0时,跳出循环,说明该位置为下个需要写的位置DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);int size = dispatchRequest.getMsgSize();if (dispatchRequest.isSuccess()) {// Normal dataif (size > 0) {mappedFileOffset += size;if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {this.defaultMessageStore.doDispatch(dispatchRequest);}} else {this.defaultMessageStore.doDispatch(dispatchRequest);}}else if (size == 0) {index++;if (index >= mappedFiles.size()) {log.info("recover physics file over, last mapped file " + mappedFile.getFileName());break;} else {mappedFile = mappedFiles.get(index);byteBuffer = mappedFile.sliceByteBuffer();processOffset = mappedFile.getFileFromOffset();mappedFileOffset = 0;log.info("recover next physics file, " + mappedFile.getFileName());}}} else {log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());break;}}processOffset += mappedFileOffset;this.mappedFileQueue.setFlushedWhere(processOffset);this.mappedFileQueue.setCommittedWhere(processOffset);//该位置为真正要插入的位置,所以修正上面的设置的错误的wrotePositionthis.mappedFileQueue.truncateDirtyFiles(processOffset);// Clear ConsumeQueue redundant dataif (maxPhyOffsetOfConsumeQueue >= processOffset) {log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);}}// Commitlog case files are deletedelse {log.warn("The commitlog files are deleted, and delete the consume queue files");this.mappedFileQueue.setFlushedWhere(0);this.mappedFileQueue.setCommittedWhere(0);this.defaultMessageStore.destroyLogics();}}

结论

CommitLog一开始是把wrotePosition设置为CommitLog文件的大小,这样只有最后一个CommitLog的wrotePosition的数据是不正确的,所以后面在确定最后一个CommitLog的wrotePosition的时候是通过读取CommitLog文件里的数据来确定wrotePosition位置的,因为CommitLog里前四个字节代表这条消息的大小,这样我读取前四个字节以后就可以读取这一条数据,然后以此类推,当读取消息的大小为0时,代表此处没有消息,则确定wrotePosition的位置。

参考

CommitLog格式
https://blog.csdn.net/meilong_whpu/article/details/76919267
ConsumeQueue格式
https://www.cnblogs.com/gaojy/p/15087869.html

RocketMQ给broker发送消息确定Commitlog的写入的位置相关推荐

  1. RocketMQ的broker处理消息commit时,加锁应该使用自旋锁还是重入锁

    讨论的主题 以下内容基于rocketmq4.7.1版本,并且集群模型采用的是DLedger 2主4从,主从节点在不同的机架上,所以关于其中提到的压测数据请勿直接套用自己的环境,仅供参考. rocket ...

  2. RocketMQ一行代码造成消息发送失败

    这是我的第 198 期分享 作者 | 丁威 来源 | 中间件兴趣圈(ID:dingwpmz_zjj) 分享 | Java中文社群(ID:javacn666) 1.问题现象 首先接到项目反馈使用 Roc ...

  3. rocketmq怎么保证消息一致性_从入门到入土(三)RocketMQ 怎么保证的消息不丢失?...

    精彩推荐 一百期Java面试题汇总SpringBoot内容聚合IntelliJ IDEA内容聚合Mybatis内容聚合 接上一篇:RocketMQ入门到入土(二)事务消息&顺序消息 面试官常常 ...

  4. rocketmq 重复消费_消息队列 RocketMQ

    引言 本文整理了RocketMQ的相关知识,方便以后查阅. 功能介绍 简单来说,消息队列就是基础数据结构课程里"先进先出"的一种数据结构,但是如果要消除单点故障,保证消息传输的可靠 ...

  5. 从入门到入土(三)RocketMQ 怎么保证的消息不丢失?

    精彩推荐 一百期Java面试题汇总 SpringBoot内容聚合 IntelliJ IDEA内容聚合 Mybatis内容聚合 接上一篇:RocketMQ入门到入土(二)事务消息&顺序消息 面试 ...

  6. 如何使用Kafka可靠地发送消息-《Kafka权威指南(第二版)》阅读笔记

    可靠性是系统而不是某个独立组件的一个属性,所以,在讨论Kafka的可靠性保证时,需要从系统的整体出发.说到可靠性,那些与Kafka集成的系统与Kafka本身一样重要.正因为可靠性是系统层面的概念,所以 ...

  7. 微信网页版 发送消息

    使用微信wxpy 模块进行 消息发送 代码需要使用 python 3.6 版本 高版本python 不支持 wxpy 模块 使用微信号需要可以正常登录微信网页版 将需要发送消息 以一定格式写入文件中 ...

  8. RocketMQ源码解析:Producer发送消息+Broker消息存储

    文章目录 1. Producer 发送消息 2. Broker 接收消息 1. Producer 发送消息 先上一段简单的生产者代码 public static void main(String[] ...

  9. 【Java从0到架构师】RocketMQ 使用 - 发送消息、接收消息、特殊消息

    RocketMQ 消息中间件 入门案例 NameServer 地址 发送消息 同步发送消息 异步发送消息 一次性发送消息 生产者组.消息封装 接收消息 消费方式:推式消费.拉式消费 消息方式:集群模式 ...

最新文章

  1. Nature封面论文创意被剽窃?UC圣迭戈付向东实名举报中科院研究员抄袭
  2. Nginx前端设置反向代理,后端Apache如何获取访客的真实IP,结合PHP。
  3. 剑指offer——变态跳台阶
  4. C++的类型萃取技术
  5. shell字符串操作
  6. python openstack rabbitmq_OpenStack--Rabbitmq组件消息队列
  7. 秀操作:函数宏的三种封装方式
  8. 数据结构 | 实现串(定长顺序存储表示法)
  9. C 和 C++字符串详解
  10. ***常用的***手段
  11. python机械臂怎么控制_Python编程语言趣味学,EV3工业机械臂上线
  12. 数据结构与算法之递推算法 C++与PHP实现
  13. socket通信过程
  14. 三重积分(Triple Integral)
  15. We Dont Kown ....
  16. 提交代码到git仓库
  17. 成都职业技术学院计算机网络分院,成都职业技术学院2021年宿舍条件
  18. 全球与中国投影幕布市场深度研究分析报告
  19. 逻辑训练-爱因斯坦的推理题
  20. Python网络爬虫与信息提取笔记08-实例2:淘宝商品比价定向爬虫

热门文章

  1. 大盘盘口分析和分时解读的奥妙
  2. JSON Schema 简介
  3. 超外差接收机原理图讲解(二)--老版合成器
  4. Unity中通过mask组件裁剪出圆形图片,制作出圆形头像
  5. 零基础程序员想要学好.Net,跟着这7个步骤学习就可以了
  6. 高等数学第一章学习笔记
  7. 爱斯维尔-模式识别 Pattern Recognition论文投稿周期
  8. python编写高质量代码_用 Python 编写干净、可测试、高质量的代码
  9. 设置字体样式:字号大小,字体种类,字体粗细
  10. 4K画质修复神器,模糊照片秒变高清