RocketMQ源码解析-Broker的消息存储
Broker接收得到的来自provider的消息在sendMessageProcessor的sendMessage()方法当中处理。
在sendMessage()方法当中会直接将所接收得到的消息封装为MessageExtBrokerInner。
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner,MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(),msgInner.getTags()));msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(sysFlag);
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
然后将封装得到的消息交由消息存储层,默认defaultMessageStore,调用putMessage()方法。
在DefaultMessageStrore里将会调用commitLog的putMessage()方法来存储消息。
在broker中,存储消息的文件在MapedFileQueue当中将不连续的物理文件作为连续的逻辑文件进行处理。也就是说,在broker中,由MapedFile来管理物理文件的映射。所有的消息队列共用这里的物理文件进行消息的物理存储,用过记录各个消息的偏移量offset来在这里的物理文件上得到所要取得到的消息具体数据。
作为逻辑上连续的文件队列,通过规定每个文件统一的大小可以通过offset精准的取得消息所在的物理文件上的位置。
可以通过下面这个方法,可以清楚地看到是如何在逻辑的连续文件队列中通过offset定位到文件的。
public MapedFile findMapedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {try {this.readWriteLock.readLock().lock();MapedFile mapedFile = this.getFirstMapedFile();if (mapedFile != null) {int index =(int) ((offset / this.mapedFileSize) - (mapedFile.getFileFromOffset() / this.mapedFileSize));if (index < 0 || index >= this.mapedFiles.size()) {logError.warn("findMapedFileByOffset offset not matched, request Offset: {}, index: {}, mapedFileSize: {}, mapedFiles count: {}, StackTrace: {}",//offset,//index,//this.mapedFileSize,//this.mapedFiles.size(),//UtilAll.currentStackTrace());}try {return this.mapedFiles.get(index);}catch (Exception e) {if (returnFirstOnNotFound) {return mapedFile;}}}}catch (Exception e) {log.error("findMapedFileByOffset Exception", e);}finally {this.readWriteLock.readLock().unlock();}return null;
}
在这个方法可以清楚的看到恰恰是通过offset与规定的文件大小相除得到具体的文件在队列中的位置。
我们可以在DefaultMessageCallBack中看到,在MapedFileQueue当中会调用他的doAppend()回调方法来具体看到消息的存储过程。
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,final int maxBlank, final Object msg) {// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>MessageExtBrokerInner msgInner = (MessageExtBrokerInner) msg;// PHY OFFSETlong wroteOffset = fileFromOffset + byteBuffer.position();String msgId =MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(),wroteOffset);// Record ConsumeQueue informationString key = msgInner.getTopic() + "-" + msgInner.getQueueId();Long queueOffset = CommitLog.this.topicQueueTable.get(key);if (null == queueOffset) {queueOffset = 0L;CommitLog.this.topicQueueTable.put(key, queueOffset);}// Transaction messages that require special handlingfinal int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());switch (tranType) {// Prepared and Rollback message is not consumed, will not enter the// consumer queuecase MessageSysFlag.TransactionPreparedType:case MessageSysFlag.TransactionRollbackType:queueOffset = 0L;break;case MessageSysFlag.TransactionNotType:case MessageSysFlag.TransactionCommitType:default:break;}/*** Serialize message*/final byte[] propertiesData =msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes();final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;final byte[] topicData = msgInner.getTopic().getBytes();final int topicLength = topicData == null ? 0 : topicData.length;final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;final int msgLen = 4 // 1 TOTALSIZE+ 4 // 2 MAGICCODE+ 4 // 3 BODYCRC+ 4 // 4 QUEUEID+ 4 // 5 FLAG+ 8 // 6 QUEUEOFFSET+ 8 // 7 PHYSICALOFFSET+ 4 // 8 SYSFLAG+ 8 // 9 BORNTIMESTAMP+ 8 // 10 BORNHOST+ 8 // 11 STORETIMESTAMP+ 8 // 12 STOREHOSTADDRESS+ 4 // 13 RECONSUMETIMES+ 8 // 14 Prepared Transaction Offset+ 4 + bodyLength // 14 BODY+ 1 + topicLength // 15 TOPIC+ 2 + propertiesLength // 16 propertiesLength+ 0;// Exceeds the maximum messageif (msgLen > this.maxMessageSize) {CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: "+ bodyLength + ", maxMessageSize: " + this.maxMessageSize);return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);}// Determines whether there is sufficient free spaceif ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {this.resetMsgStoreItemMemory(maxBlank);// 1 TOTALSIZEthis.msgStoreItemMemory.putInt(maxBlank);// 2 MAGICCODEthis.msgStoreItemMemory.putInt(CommitLog.BlankMagicCode);// 3 The remaining space may be any value//// Here the length of the specially set maxBlankbyteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId,msgInner.getStoreTimestamp(), queueOffset);}// Initialization of storage spacethis.resetMsgStoreItemMemory(msgLen);// 1 TOTALSIZEthis.msgStoreItemMemory.putInt(msgLen);// 2 MAGICCODEthis.msgStoreItemMemory.putInt(CommitLog.MessageMagicCode);// 3 BODYCRCthis.msgStoreItemMemory.putInt(msgInner.getBodyCRC());// 4 QUEUEIDthis.msgStoreItemMemory.putInt(msgInner.getQueueId());// 5 FLAGthis.msgStoreItemMemory.putInt(msgInner.getFlag());// 6 QUEUEOFFSETthis.msgStoreItemMemory.putLong(queueOffset);// 7 PHYSICALOFFSETthis.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());// 8 SYSFLAGthis.msgStoreItemMemory.putInt(msgInner.getSysFlag());// 9 BORNTIMESTAMPthis.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());// 10 BORNHOSTthis.msgStoreItemMemory.put(msgInner.getBornHostBytes());// 11 STORETIMESTAMPthis.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());// 12 STOREHOSTADDRESSthis.msgStoreItemMemory.put(msgInner.getStoreHostBytes());// 13 RECONSUMETIMESthis.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());// 14 Prepared Transaction Offsetthis.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());// 15 BODYthis.msgStoreItemMemory.putInt(bodyLength);if (bodyLength > 0)this.msgStoreItemMemory.put(msgInner.getBody());// 16 TOPICthis.msgStoreItemMemory.put((byte) topicLength);this.msgStoreItemMemory.put(topicData);// 17 PROPERTIESthis.msgStoreItemMemory.putShort((short) propertiesLength);if (propertiesLength > 0)this.msgStoreItemMemory.put(propertiesData);// Write messages to the queue bufferbyteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);AppendMessageResult result =new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,msgInner.getStoreTimestamp(), queueOffset);switch (tranType) {case MessageSysFlag.TransactionPreparedType:case MessageSysFlag.TransactionRollbackType:break;case MessageSysFlag.TransactionNotType:case MessageSysFlag.TransactionCommitType:// The next update ConsumeQueue informationCommitLog.this.topicQueueTable.put(key, ++queueOffset);break;default:break;}return result;
}
在这个方法中,我们可以具体看到消息在broker当中的消息物理存储结构。在这里可以清楚的看到,消息将在这里按照存储规则写入byteBuffer。并通过byteBuffer的put()方法写入物理文件当中。
此时,消息的物理存储已经结束,但是,仍没有将消息的具体数据写入consumerQueue当中以便消费者消费。
DispatchRequest dispatchRequest = new DispatchRequest(//topic,// 1queueId,// 2result.getWroteOffset(),// 3result.getWroteBytes(),// 4tagsCode,// 5msg.getStoreTimestamp(),// 6result.getLogicsOffset(),// 7msg.getKeys(),// 8/*** Transaction*/msg.getSysFlag(),// 9msg.getPreparedTransactionOffset());// 10this.defaultMessageStore.putDispatchRequest(dispatchRequest);
在将消息存入物理文件之后,需要构造dispatchRequest,以便将消息的位置数据分发给相应的topic下面的ConsumerQueue以便消费者取得。
DefaultMessageStore将会将该条消息的位置信息放入DispatchMessageService的缓冲队列中,而DispatchMessageService将会周期性的从缓冲队列当中去封装有消息位置信息的dispatchRequest进行处理。
通过topic以及消息的queueId可以精确的得到消息所对应的ConsumQueue。
ConsumQueue的存储单元固定二十字节。
前八个字节存储消息在commitLog当中的偏移量offset位置。
中四个字节存储消息的大小。
后八个字节记录消息的tagsCode。
然后写在ConsumQueue自己的逻辑文件队列MapedFileQueue当中。
在生产者取消息的时候只需要根据offset取得consumQueue的相应存储单元,就可以在commitLog上定位得到所要取得的具体消息数据。
在消息的位置信息放入ConsumQueue后将会将消息加入indexService的阻塞队列,等待indexService定期构建索引。
在索引文件的key当中,通过 topic + “#” + key 进行构造key。 作为构造成员的key则是由topic和queueId组成。
private String buildKey(final String topic, final String key) {return topic + "#" + key;
}
而value则是在commitLog上的物理文件所在的位置,也是八字节来保存。
保存过程中,先通过keyHash与索引文件的slot数量取余,计算得到相对slot的存储位置。在用取余的结果根据文件头偏移量与每个slot的偏移量取得具体的的slot偏移量。
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * HASH_SLOT_SIZE;
在计算得到slot的偏移量之后,根据索引文件的头大小以及所有slot的偏移量之和,加上之前所有的消息索引具体偏移量之和,得到消息索引的具体物理存放位置。
int absIndexPos =IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * HASH_SLOT_SIZE+ this.indexHeader.getIndexCount() * INDEX_SIZE;
索引共占20字节,四字节keyHash,八字节commitLog的物理偏移量,四字节与索引文件建立时间的时间差,四字节的slot位置。
写入相应的索引文件,索引宣告建立完毕。
之后如果该broekr是同步master,将会在所有的broker将commitLog消息复制完毕到相应的offset之前,都会阻塞。
以上是Broker的消息存储。
RocketMQ源码解析-Broker的消息存储相关推荐
- RocketMQ源码解析-Broker的HA实现
以master异步复制为例子. 在rocketmq的slave broker机子当中,会在DefaultMessageStore的启动当中启动自己的HaService来进行自己的ha服务. publi ...
- RocketMQ源码解析-Broker部分之Broker启动过程
目录 broker启动流程 broker启动可配置参数 启动入口`BrokerStartup` 1.创建brokerController 2.`BrokerController`构造函数 3.Brok ...
- 6、RocketMQ 源码解析之 Broker 启动(上)
上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...
- RocketMQ源码解析之broker文件清理
原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...
- RocketMQ源码(十)—Broker 消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析
深入的介绍了broker的消息刷盘服务源码解析,以及高性能的刷盘机制. 学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个,同步或者是异步: 1. 同步刷盘:如上图所示,只有消息真正 ...
- RocketMQ源码(12)—Broker 消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析【一万字】
基于RocketMQ release-4.9.3,深入的介绍了Broker 的消息刷盘源码解析,以及高性能的刷盘机制. 学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个,同步或者异 ...
- RocketMQ源码(十七)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码
转载来源: RocketMQ源码(19)-Broker处理DefaultMQPushConsumer发起的拉取消息请求源码[一万字]_刘Java的博客-CSDN博客 此前我们学习了RocketMQ源码 ...
- RocketMQ源码(八)Broker asyncSendMessage处理消息以及自动创建Topic
此前已经梳理了RocketMQ的broker接收Producer消息的入口源码RocketMQ(七)broker接收消息入口源码_代码---小白的博客-CSDN博客 在文章的最后我们到了SendMes ...
- RocketMQ源码(4)—Broker启动加载消息文件以及恢复数据源码【一万字】
详细介绍了Broker启动加载消息文件以及恢复数据源码. 此前我们学习了Broker的启动源码:RocketMQ源码(3)-Broker启动流程源码解析[一万字],Broker的启动过程中,在Defa ...
最新文章
- 使用VLC搭建RTSP服务器
- springboot之@ConfigurationProperties加载配置文件
- 05,pytorch_手写数字案例
- 2019/Province_C_C++_A/B/数列求值
- jeecg-framework-3.3.2-RELEASE 最新版本发布
- ExtJS4.0的数据集 .
- UI设计干货模板|首页设计技巧
- 爬虫原理与数据抓取----- Requests模块
- fiddler弱网测试_用fiddler实现弱网测试
- 鑫光芒引流客源篇微商加人的24种方法
- 进程同步与信号量机制的应用
- quora ios_企业家的Quora指南
- 【目标跟踪 MOT】JDE - Towards Real-Time Multi-Object Tracking
- i5 12490f和i5 12400f的区别
- 【Tableau Desktop 企业日常技巧9.0】打开第二个 Tableau 桌面实例时出现错误“连接错误:Tableau 无法连接到数据源“
- AI崛起,阿里的科技孵化力
- VB问题——ByRef参数类型不符
- 可编程中控 c 语言,LG-PGMIII可编程中控
- 计算机f8键的功能,f8键有什么作用(图文)
- 常用的git命令(实用)