RocketMq 消息偏移量 Offset
消息偏移量 Offset
queue0 offset 0 0-20 offset 1 20-40
纠错:每条消息的tag对应的HashCode.
queue1 offset 0 0-20 offset 1 20-40
queue2 offset 0 0-20 offset 1 20-40
queue3 offset 0 0-20 offset 1 20-40
SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2BFB0000, offsetMsgId=C0A81F9800002A9F0000000000000000(每条消息偏移量,以十六进制表示消息长度,转换成十进制0), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0(队列偏移量,当前队列增加1条就自增1)] SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DE20001, offsetMsgId=C0A81F9800002A9F00000000000000CB(十六进制表示消息长度,转换成十进制203), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0] SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DEC0002, offsetMsgId=C0A81F9800002A9F0000000000000196(十六进制表示消息长度,转换成十进制406), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=0] SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DEC0003, offsetMsgId=C0A81F9800002A9F0000000000000261(十六进制表示消息长度,转换成十进制609), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=0] SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DF60004, offsetMsgId=C0A81F9800002A9F000000000000032C, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=1] SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DF60005, offsetMsgId=C0A81F9800002A9F00000000000003F7, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=1] SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DF60006, offsetMsgId=C0A81F9800002A9F00000000000004C2, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=1] SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2E000007, offsetMsgId=C0A81F9800002A9F000000000000058D, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=1]
概念
message queue 是无限长的数组,一条消息进来下标就会涨1,下标就是 offset,消息在某个 MessageQueue 里的位置,通过 offset 的值可以定位到这条消息,或者指示 Consumer 从这条消息开始向后处理。
message queue 中的 maxOffset 表示消息的最大 offset,maxOffset 并不是最新的那条消息的 offset,而是最新消息的 offset+1,minOffset 则是现存在的最小 offset。
fileReserveTime=48 默认消息存储48小时后,消费会被物理地从磁盘删除,message queue 的 minOffset 也就对应增长。所以比 minOffset 还要小的那些消息已经不在 broker上了,就无法被消费
类型(父类是OffsetStore):
本地文件类型
DefaultMQPushConsumer 的 BROADCASTING 广播模式,各个 Consumer 没有互相干扰,使用 LocalFileOffsetStore,把 Offset 存储在本地
Broker 代存储类型
DefaultMQPushConsumer 的 CLUSTERING 集群模式,由 Broker 端存储和控制 Offset 的值,使用 RemoteBrokerOffsetStore
作用
主要是记录消息的偏移量,有多个消费者进行消费
集群模式下采用 RemoteBrokerOffsetStore,broker 控制 offset 的值
广播模式下采用 LocalFileOffsetStore,消费端存储
建议采用 pushConsumer,RocketMQ 自动维护 OffsetStore,如果用另外一种 pullConsumer 需要自己进行维护 OffsetStore
消息存储 CommitLog
消息存储是由 ConsumeQueue 和 CommitLog 配合完成
ConsumeQueue 是逻辑队列,CommitLog 是真正存储消息文件的,ConsumeQueue 存储的是指向物理存储的地址。Topic 下的每个 message queue 都有对应的 ConsumeQueue 文件,内容也会被持久化到磁盘。默认地址:store/consumequeue/{topicName}/{queueid}/fileName
CommitLog:存储消息真正内容的文件。
生成规则:
每个文件的默认1G =1024 * 1024 * 1024,commitlog 的文件名 fileName,名字长度为20位,左边补零,剩余为起始偏移量;比如 00000000000000000000 代表了第一个文件,起始偏移量为0,文件大小为1G=1 073 741 824 Byte;当这个文件满了,第二个文件名字为00000000001073741824,起始偏移量为1073741824,消息存储的时候会顺序写入文件,当文件满了则写入下一个文件。
判断消息存储在哪个 CommitLog 上
例如 1073742827 为物理偏移量,则其对应的相对偏移量为 1003 = 1073742827 - 1073741824,并且该偏移量位于第二个 CommitLog。
Broker 里面一个 Topic 里面有多个 MesssageQueue,每个 MessageQueue 对应一个 ConsumeQueue,ConsumeQueue 里面记录的是消息在 CommitLog 里面的物理存储地址。
IndexFile 消息索引文件
ConsumerQueue是通过偏移量offset去CommitLog文件中查找消息,但实际工作应用中,我们想查找某条具体的消息,并不知道offset值,那该怎么办呢?那IndexFile作用就来了。 IndexFile是消息索引文件,如果一个生产者发送的消息包含key值的话,会使用IndexFile存储消息索引,主要用于使用key来查询消息。文件的内容结构如图
在Broker端,通过Key来计算Hash槽的位置,从而找到Index索引数据。从Index索引中拿到消息的物理偏移量,最后根据这个物理偏移量,直接到CommitLog文件中去找就可以了。另外说明下,通过IndexFile来查找消息的方法不影响RocketMQ的正常生产-消费流程,它只是查询定位消息的方法而已。
#世界读书日
京东【自营】百万图书钜惠
每满100减50 满减叠券享300减200
活动地址:https://u.jd.com/Q2S68Cz
更多文章请关注公众号"七星课堂"
RocketMq 消息偏移量 Offset相关推荐
- RocketMQ消息的存储结构
RocketMQ就是采用文件系统的方式来存储消息,消息的存储是由ConsumeQueue和CommitLog配合完成的.CommitLog是消息真正的物理存储文件.ConsumeQueue是消息的逻辑 ...
- 从源码分析RocketMQ系列-RocketMQ消息持久化源码详解
导语 在上篇分析中,提到了一个概念处理器,并且在进入到最终NettyIO的时候看到了一个Pair的对象,这个对象存储了两个对象,一个是执行器,一个是处理器,在进入Runable对象的时候看到封装到 ...
- [RocketMQ]消息中间件—RocketMQ消息消费(一)
2019独角兽企业重金招聘Python工程师标准>>> 文章摘要:在发送消息给RocketMQ后,消费者需要消费.消息的消费比发送要复杂一些,那么RocketMQ是如何来做的呢? 在 ...
- rocketmq 消息指定_rocketmq-常见问题总结(消息的顺序、重复、消费模式)
参考: http://www.cnblogs.com/wxd0108/p/6038543.html https://www.cnblogs.com/520playboy/p/6750023.html ...
- RocketMQ消息消费源码分析(二消息的消费)
首先回到DefaultMQPushConsumerImpl start方法 public synchronized void start() throws MQClientException {sw ...
- RocketMQ(七)RocketMQ消息生产及消息储存机制
目录 1.消息生产 1.1 消息的生产过程 1.2 Queue选择算法 2.消息储存 2.1 存储介质 2.2 消息的存储和发送 2.3 消息存储结构 2.4 刷盘机制 1.消息生产 1.1 消息的生 ...
- 从源码分析RocketMQ系列-RocketMQ消息设计详解
1 消息存储 消息存储是RocketMQ中最为复杂和最为重要的一部分,本节将分别从RocketMQ的消息存储整体架构.PageCache与Mmap内存映射以及RocketMQ中两种不同的刷盘方式三 ...
- RocketMQ 消息消费 轮询机制 PullRequestHoldService
1. 概述 先来看看 RocketMQ 消费过程中的轮询机制是啥.首先需要补充一点消费相关的前置知识. 1.1 消息消费方式 RocketMQ 支持多种消费方式,包括 Push 模式和 Pull 模式 ...
- RocketMQ 消息订阅Subscribe—— Push Pull 模式
2019独角兽企业重金招聘Python工程师标准>>> RocketMQ 消息订阅Subscribe-- Push & Pull 模式 RocketMQ消息订阅的两种模式 R ...
最新文章
- Photoshop图像处理操作汇总
- linux设置光标位置,linux下光标定位和输出颜色设置
- python 去除字符串的标点符号 用_Python输入和输出
- python每周小测验答案_python第一周小测验答案Centos下更新Python版本
- html5设置视频显示第一帧,如何检测HTML5视频何时播放第一帧?
- 【数据库】mysql常用的数据类型
- DPDK 锁:ticketlock和mcslock
- 嵌入式分享合集101-PLC
- 现代公司制度的法理学基础 (贾登勋 王勇)
- Google Guava简介
- 符合W3C的网站的开发模型和必要性的探讨(一)
- 怎么提取PDF页面,PDF页面提取的操作步骤
- 子平真诠释疑笔记(三)
- shiro学习之错误 No realms have been configured! One or more realms must be present to execute an authori
- p12..Matplotlib:Contours等高线图
- DeepDGA:基于生成对抗网络的DGA生成与检测
- 开过全新BMW 3系 “鱼与熊掌”都可兼得
- 丹东纺专99届计算机专业,皖南医学院麻醉专业99届校友毕业20周年返校聚会
- ctp linux 查询死循环,CTP行情接入常见的问题记录
- linux和docker的capabilities介绍