Broker接收得到的来自provider的消息在sendMessageProcessor的sendMessage()方法当中处理。

在sendMessage()方法当中会直接将所接收得到的消息封装为MessageExtBrokerInner。

MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner,MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(),msgInner.getTags()));msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(sysFlag);
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());

然后将封装得到的消息交由消息存储层,默认defaultMessageStore,调用putMessage()方法。

在DefaultMessageStrore里将会调用commitLog的putMessage()方法来存储消息。

在broker中,存储消息的文件在MapedFileQueue当中将不连续的物理文件作为连续的逻辑文件进行处理。也就是说,在broker中,由MapedFile来管理物理文件的映射。所有的消息队列共用这里的物理文件进行消息的物理存储,用过记录各个消息的偏移量offset来在这里的物理文件上得到所要取得到的消息具体数据。

作为逻辑上连续的文件队列,通过规定每个文件统一的大小可以通过offset精准的取得消息所在的物理文件上的位置。

可以通过下面这个方法,可以清楚地看到是如何在逻辑的连续文件队列中通过offset定位到文件的。

public MapedFile findMapedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {try {this.readWriteLock.readLock().lock();MapedFile mapedFile = this.getFirstMapedFile();if (mapedFile != null) {int index =(int) ((offset / this.mapedFileSize) - (mapedFile.getFileFromOffset() / this.mapedFileSize));if (index < 0 || index >= this.mapedFiles.size()) {logError.warn("findMapedFileByOffset offset not matched, request Offset: {}, index: {}, mapedFileSize: {}, mapedFiles count: {}, StackTrace: {}",//offset,//index,//this.mapedFileSize,//this.mapedFiles.size(),//UtilAll.currentStackTrace());}try {return this.mapedFiles.get(index);}catch (Exception e) {if (returnFirstOnNotFound) {return mapedFile;}}}}catch (Exception e) {log.error("findMapedFileByOffset Exception", e);}finally {this.readWriteLock.readLock().unlock();}return null;
}

在这个方法可以清楚的看到恰恰是通过offset与规定的文件大小相除得到具体的文件在队列中的位置。

我们可以在DefaultMessageCallBack中看到,在MapedFileQueue当中会调用他的doAppend()回调方法来具体看到消息的存储过程。

public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,final int maxBlank, final Object msg) {// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>MessageExtBrokerInner msgInner = (MessageExtBrokerInner) msg;// PHY OFFSETlong wroteOffset = fileFromOffset + byteBuffer.position();String msgId =MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(),wroteOffset);// Record ConsumeQueue informationString key = msgInner.getTopic() + "-" + msgInner.getQueueId();Long queueOffset = CommitLog.this.topicQueueTable.get(key);if (null == queueOffset) {queueOffset = 0L;CommitLog.this.topicQueueTable.put(key, queueOffset);}// Transaction messages that require special handlingfinal int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());switch (tranType) {// Prepared and Rollback message is not consumed, will not enter the// consumer queuecase MessageSysFlag.TransactionPreparedType:case MessageSysFlag.TransactionRollbackType:queueOffset = 0L;break;case MessageSysFlag.TransactionNotType:case MessageSysFlag.TransactionCommitType:default:break;}/*** Serialize message*/final byte[] propertiesData =msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes();final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;final byte[] topicData = msgInner.getTopic().getBytes();final int topicLength = topicData == null ? 0 : topicData.length;final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;final int msgLen = 4 // 1 TOTALSIZE+ 4 // 2 MAGICCODE+ 4 // 3 BODYCRC+ 4 // 4 QUEUEID+ 4 // 5 FLAG+ 8 // 6 QUEUEOFFSET+ 8 // 7 PHYSICALOFFSET+ 4 // 8 SYSFLAG+ 8 // 9 BORNTIMESTAMP+ 8 // 10 BORNHOST+ 8 // 11 STORETIMESTAMP+ 8 // 12 STOREHOSTADDRESS+ 4 // 13 RECONSUMETIMES+ 8 // 14 Prepared Transaction Offset+ 4 + bodyLength // 14 BODY+ 1 + topicLength // 15 TOPIC+ 2 + propertiesLength // 16 propertiesLength+ 0;// Exceeds the maximum messageif (msgLen > this.maxMessageSize) {CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: "+ bodyLength + ", maxMessageSize: " + this.maxMessageSize);return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);}// Determines whether there is sufficient free spaceif ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {this.resetMsgStoreItemMemory(maxBlank);// 1 TOTALSIZEthis.msgStoreItemMemory.putInt(maxBlank);// 2 MAGICCODEthis.msgStoreItemMemory.putInt(CommitLog.BlankMagicCode);// 3 The remaining space may be any value//// Here the length of the specially set maxBlankbyteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId,msgInner.getStoreTimestamp(), queueOffset);}// Initialization of storage spacethis.resetMsgStoreItemMemory(msgLen);// 1 TOTALSIZEthis.msgStoreItemMemory.putInt(msgLen);// 2 MAGICCODEthis.msgStoreItemMemory.putInt(CommitLog.MessageMagicCode);// 3 BODYCRCthis.msgStoreItemMemory.putInt(msgInner.getBodyCRC());// 4 QUEUEIDthis.msgStoreItemMemory.putInt(msgInner.getQueueId());// 5 FLAGthis.msgStoreItemMemory.putInt(msgInner.getFlag());// 6 QUEUEOFFSETthis.msgStoreItemMemory.putLong(queueOffset);// 7 PHYSICALOFFSETthis.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());// 8 SYSFLAGthis.msgStoreItemMemory.putInt(msgInner.getSysFlag());// 9 BORNTIMESTAMPthis.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());// 10 BORNHOSTthis.msgStoreItemMemory.put(msgInner.getBornHostBytes());// 11 STORETIMESTAMPthis.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());// 12 STOREHOSTADDRESSthis.msgStoreItemMemory.put(msgInner.getStoreHostBytes());// 13 RECONSUMETIMESthis.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());// 14 Prepared Transaction Offsetthis.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());// 15 BODYthis.msgStoreItemMemory.putInt(bodyLength);if (bodyLength > 0)this.msgStoreItemMemory.put(msgInner.getBody());// 16 TOPICthis.msgStoreItemMemory.put((byte) topicLength);this.msgStoreItemMemory.put(topicData);// 17 PROPERTIESthis.msgStoreItemMemory.putShort((short) propertiesLength);if (propertiesLength > 0)this.msgStoreItemMemory.put(propertiesData);// Write messages to the queue bufferbyteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);AppendMessageResult result =new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,msgInner.getStoreTimestamp(), queueOffset);switch (tranType) {case MessageSysFlag.TransactionPreparedType:case MessageSysFlag.TransactionRollbackType:break;case MessageSysFlag.TransactionNotType:case MessageSysFlag.TransactionCommitType:// The next update ConsumeQueue informationCommitLog.this.topicQueueTable.put(key, ++queueOffset);break;default:break;}return result;
}

在这个方法中,我们可以具体看到消息在broker当中的消息物理存储结构。在这里可以清楚的看到,消息将在这里按照存储规则写入byteBuffer。并通过byteBuffer的put()方法写入物理文件当中。

此时,消息的物理存储已经结束,但是,仍没有将消息的具体数据写入consumerQueue当中以便消费者消费。

DispatchRequest dispatchRequest = new DispatchRequest(//topic,// 1queueId,// 2result.getWroteOffset(),// 3result.getWroteBytes(),// 4tagsCode,// 5msg.getStoreTimestamp(),// 6result.getLogicsOffset(),// 7msg.getKeys(),// 8/*** Transaction*/msg.getSysFlag(),// 9msg.getPreparedTransactionOffset());// 10this.defaultMessageStore.putDispatchRequest(dispatchRequest);

在将消息存入物理文件之后,需要构造dispatchRequest,以便将消息的位置数据分发给相应的topic下面的ConsumerQueue以便消费者取得。

DefaultMessageStore将会将该条消息的位置信息放入DispatchMessageService的缓冲队列中,而DispatchMessageService将会周期性的从缓冲队列当中去封装有消息位置信息的dispatchRequest进行处理。

通过topic以及消息的queueId可以精确的得到消息所对应的ConsumQueue。

ConsumQueue的存储单元固定二十字节。

前八个字节存储消息在commitLog当中的偏移量offset位置。

中四个字节存储消息的大小。

后八个字节记录消息的tagsCode。

然后写在ConsumQueue自己的逻辑文件队列MapedFileQueue当中。

在生产者取消息的时候只需要根据offset取得consumQueue的相应存储单元,就可以在commitLog上定位得到所要取得的具体消息数据。

在消息的位置信息放入ConsumQueue后将会将消息加入indexService的阻塞队列,等待indexService定期构建索引。

在索引文件的key当中,通过 topic + “#” + key 进行构造key。 作为构造成员的key则是由topic和queueId组成。

private String buildKey(final String topic, final String key) {return topic + "#" + key;
}

而value则是在commitLog上的物理文件所在的位置,也是八字节来保存。

保存过程中,先通过keyHash与索引文件的slot数量取余,计算得到相对slot的存储位置。在用取余的结果根据文件头偏移量与每个slot的偏移量取得具体的的slot偏移量。

int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * HASH_SLOT_SIZE;

在计算得到slot的偏移量之后,根据索引文件的头大小以及所有slot的偏移量之和,加上之前所有的消息索引具体偏移量之和,得到消息索引的具体物理存放位置。

int absIndexPos =IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * HASH_SLOT_SIZE+ this.indexHeader.getIndexCount() * INDEX_SIZE;

索引共占20字节,四字节keyHash,八字节commitLog的物理偏移量,四字节与索引文件建立时间的时间差,四字节的slot位置。

写入相应的索引文件,索引宣告建立完毕。

之后如果该broekr是同步master,将会在所有的broker将commitLog消息复制完毕到相应的offset之前,都会阻塞。

以上是Broker的消息存储。

RocketMQ源码解析-Broker的消息存储相关推荐

  1. RocketMQ源码解析-Broker的HA实现

    以master异步复制为例子. 在rocketmq的slave broker机子当中,会在DefaultMessageStore的启动当中启动自己的HaService来进行自己的ha服务. publi ...

  2. RocketMQ源码解析-Broker部分之Broker启动过程

    目录 broker启动流程 broker启动可配置参数 启动入口`BrokerStartup` 1.创建brokerController 2.`BrokerController`构造函数 3.Brok ...

  3. 6、RocketMQ 源码解析之 Broker 启动(上)

    上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...

  4. RocketMQ源码解析之broker文件清理

    原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...

  5. RocketMQ源码(十)—Broker 消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析

    深入的介绍了broker的消息刷盘服务源码解析,以及高性能的刷盘机制. 学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个,同步或者是异步: 1. 同步刷盘:如上图所示,只有消息真正 ...

  6. RocketMQ源码(12)—Broker 消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析【一万字】

    基于RocketMQ release-4.9.3,深入的介绍了Broker 的消息刷盘源码解析,以及高性能的刷盘机制. 学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个,同步或者异 ...

  7. RocketMQ源码(十七)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码

    转载来源: RocketMQ源码(19)-Broker处理DefaultMQPushConsumer发起的拉取消息请求源码[一万字]_刘Java的博客-CSDN博客 此前我们学习了RocketMQ源码 ...

  8. RocketMQ源码(八)Broker asyncSendMessage处理消息以及自动创建Topic

    此前已经梳理了RocketMQ的broker接收Producer消息的入口源码RocketMQ(七)broker接收消息入口源码_代码---小白的博客-CSDN博客 在文章的最后我们到了SendMes ...

  9. RocketMQ源码(4)—Broker启动加载消息文件以及恢复数据源码【一万字】

    详细介绍了Broker启动加载消息文件以及恢复数据源码. 此前我们学习了Broker的启动源码:RocketMQ源码(3)-Broker启动流程源码解析[一万字],Broker的启动过程中,在Defa ...

最新文章

  1. 使用VLC搭建RTSP服务器
  2. springboot之@ConfigurationProperties加载配置文件
  3. 05,pytorch_手写数字案例
  4. 2019/Province_C_C++_A/B/数列求值
  5. jeecg-framework-3.3.2-RELEASE 最新版本发布
  6. ExtJS4.0的数据集 .
  7. UI设计干货模板|首页设计技巧
  8. 爬虫原理与数据抓取----- Requests模块
  9. fiddler弱网测试_用fiddler实现弱网测试
  10. 鑫光芒引流客源篇微商加人的24种方法
  11. 进程同步与信号量机制的应用
  12. quora ios_企业家的Quora指南
  13. 【目标跟踪 MOT】JDE - Towards Real-Time Multi-Object Tracking
  14. i5 12490f和i5 12400f的区别
  15. 【Tableau Desktop 企业日常技巧9.0】打开第二个 Tableau 桌面实例时出现错误“连接错误:Tableau 无法连接到数据源“
  16. AI崛起,阿里的科技孵化力
  17. VB问题——ByRef参数类型不符
  18. 可编程中控 c 语言,LG-PGMIII可编程中控
  19. 计算机f8键的功能,f8键有什么作用(图文)
  20. 常用的git命令(实用)

热门文章

  1. BigInt:JavaScript 中的任意精度整数
  2. Intellij IDEA 自定义方法注释/方法模板
  3. 使用了未经检查或不安全的操作_违规操作就是对家庭的不负责!电气安全员提醒你的安全常识...
  4. HDU(1175),连连看,BFS
  5. 结对编程:黄金分割游戏
  6. array数组的若干操作
  7. shell-script(command groups)
  8. C++复习总结(涵盖所有C++基本考点)!
  9. AB Test 是什么
  10. ElasticSearch权威指南学习(结构化查询)