消息偏移量 Offset

queue0  offset 0 0-20 offset 1 20-40

纠错:每条消息的tag对应的HashCode.

queue1  offset 0 0-20 offset 1 20-40

queue2  offset 0 0-20 offset 1 20-40

queue3  offset 0 0-20 offset 1 20-40

SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2BFB0000, offsetMsgId=C0A81F9800002A9F0000000000000000(每条消息偏移量,以十六进制表示消息长度,转换成十进制0), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0(队列偏移量,当前队列增加1条就自增1)] SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DE20001, offsetMsgId=C0A81F9800002A9F00000000000000CB(十六进制表示消息长度,转换成十进制203), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0] SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DEC0002, offsetMsgId=C0A81F9800002A9F0000000000000196(十六进制表示消息长度,转换成十进制406), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=0] SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DEC0003, offsetMsgId=C0A81F9800002A9F0000000000000261(十六进制表示消息长度,转换成十进制609), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=0] SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DF60004, offsetMsgId=C0A81F9800002A9F000000000000032C, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=1] SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DF60005, offsetMsgId=C0A81F9800002A9F00000000000003F7, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=1] SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DF60006, offsetMsgId=C0A81F9800002A9F00000000000004C2, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=1] SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2E000007, offsetMsgId=C0A81F9800002A9F000000000000058D, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=1]

概念

  • message queue 是无限长的数组,一条消息进来下标就会涨1,下标就是 offset,消息在某个 MessageQueue 里的位置,通过 offset 的值可以定位到这条消息,或者指示 Consumer 从这条消息开始向后处理。

  • message queue 中的 maxOffset 表示消息的最大 offset,maxOffset 并不是最新的那条消息的 offset,而是最新消息的 offset+1,minOffset 则是现存在的最小 offset。

  • fileReserveTime=48 默认消息存储48小时后,消费会被物理地从磁盘删除,message queue 的 minOffset 也就对应增长。所以比 minOffset 还要小的那些消息已经不在 broker上了,就无法被消费

类型(父类是OffsetStore):

  • 本地文件类型

  • DefaultMQPushConsumer 的 BROADCASTING 广播模式,各个 Consumer 没有互相干扰,使用 LocalFileOffsetStore,把 Offset 存储在本地

  • Broker 代存储类型

  • DefaultMQPushConsumer 的 CLUSTERING 集群模式,由 Broker 端存储和控制 Offset 的值,使用 RemoteBrokerOffsetStore

作用

  • 主要是记录消息的偏移量,有多个消费者进行消费

  • 集群模式下采用 RemoteBrokerOffsetStore,broker 控制 offset 的值

  • 广播模式下采用 LocalFileOffsetStore,消费端存储

建议采用 pushConsumer,RocketMQ 自动维护 OffsetStore,如果用另外一种 pullConsumer 需要自己进行维护 OffsetStore

消息存储 CommitLog

消息存储是由 ConsumeQueue 和 CommitLog 配合完成

  • ConsumeQueue 是逻辑队列,CommitLog 是真正存储消息文件的,ConsumeQueue 存储的是指向物理存储的地址。Topic 下的每个 message queue 都有对应的 ConsumeQueue 文件,内容也会被持久化到磁盘。默认地址:store/consumequeue/{topicName}/{queueid}/fileName

  • CommitLog:存储消息真正内容的文件。

  • 生成规则:

  • 每个文件的默认1G =1024 * 1024 * 1024,commitlog 的文件名 fileName,名字长度为20位,左边补零,剩余为起始偏移量;比如 00000000000000000000 代表了第一个文件,起始偏移量为0,文件大小为1G=1 073 741 824 Byte;当这个文件满了,第二个文件名字为00000000001073741824,起始偏移量为1073741824,消息存储的时候会顺序写入文件,当文件满了则写入下一个文件。

  • 判断消息存储在哪个 CommitLog 上

  • 例如 1073742827 为物理偏移量,则其对应的相对偏移量为 1003 = 1073742827 - 1073741824,并且该偏移量位于第二个 CommitLog。

Broker 里面一个 Topic 里面有多个 MesssageQueue,每个 MessageQueue 对应一个 ConsumeQueue,ConsumeQueue 里面记录的是消息在 CommitLog 里面的物理存储地址。

IndexFile 消息索引文件

ConsumerQueue是通过偏移量offset去CommitLog文件中查找消息,但实际工作应用中,我们想查找某条具体的消息,并不知道offset值,那该怎么办呢?那IndexFile作用就来了。 IndexFile是消息索引文件,如果一个生产者发送的消息包含key值的话,会使用IndexFile存储消息索引,主要用于使用key来查询消息。文件的内容结构如图

在Broker端,通过Key来计算Hash槽的位置,从而找到Index索引数据。从Index索引中拿到消息的物理偏移量,最后根据这个物理偏移量,直接到CommitLog文件中去找就可以了。另外说明下,通过IndexFile来查找消息的方法不影响RocketMQ的正常生产-消费流程,它只是查询定位消息的方法而已。

#世界读书日

京东【自营】百万图书钜惠

每满100减50  满减叠券享300减200

活动地址:https://u.jd.com/Q2S68Cz

更多文章请关注公众号"七星课堂"

RocketMq 消息偏移量 Offset相关推荐

  1. RocketMQ消息的存储结构

    RocketMQ就是采用文件系统的方式来存储消息,消息的存储是由ConsumeQueue和CommitLog配合完成的.CommitLog是消息真正的物理存储文件.ConsumeQueue是消息的逻辑 ...

  2. 从源码分析RocketMQ系列-RocketMQ消息持久化源码详解

    导语   在上篇分析中,提到了一个概念处理器,并且在进入到最终NettyIO的时候看到了一个Pair的对象,这个对象存储了两个对象,一个是执行器,一个是处理器,在进入Runable对象的时候看到封装到 ...

  3. [RocketMQ]消息中间件—RocketMQ消息消费(一)

    2019独角兽企业重金招聘Python工程师标准>>> 文章摘要:在发送消息给RocketMQ后,消费者需要消费.消息的消费比发送要复杂一些,那么RocketMQ是如何来做的呢? 在 ...

  4. rocketmq 消息指定_rocketmq-常见问题总结(消息的顺序、重复、消费模式)

    参考: http://www.cnblogs.com/wxd0108/p/6038543.html https://www.cnblogs.com/520playboy/p/6750023.html ...

  5. RocketMQ消息消费源码分析(二消息的消费)

    首先回到DefaultMQPushConsumerImpl  start方法 public synchronized void start() throws MQClientException {sw ...

  6. RocketMQ(七)RocketMQ消息生产及消息储存机制

    目录 1.消息生产 1.1 消息的生产过程 1.2 Queue选择算法 2.消息储存 2.1 存储介质 2.2 消息的存储和发送 2.3 消息存储结构 2.4 刷盘机制 1.消息生产 1.1 消息的生 ...

  7. 从源码分析RocketMQ系列-RocketMQ消息设计详解

    1 消息存储   消息存储是RocketMQ中最为复杂和最为重要的一部分,本节将分别从RocketMQ的消息存储整体架构.PageCache与Mmap内存映射以及RocketMQ中两种不同的刷盘方式三 ...

  8. RocketMQ 消息消费 轮询机制 PullRequestHoldService

    1. 概述 先来看看 RocketMQ 消费过程中的轮询机制是啥.首先需要补充一点消费相关的前置知识. 1.1 消息消费方式 RocketMQ 支持多种消费方式,包括 Push 模式和 Pull 模式 ...

  9. RocketMQ 消息订阅Subscribe—— Push Pull 模式

    2019独角兽企业重金招聘Python工程师标准>>> RocketMQ 消息订阅Subscribe-- Push & Pull 模式 RocketMQ消息订阅的两种模式 R ...

最新文章

  1. Photoshop图像处理操作汇总
  2. linux设置光标位置,linux下光标定位和输出颜色设置
  3. python 去除字符串的标点符号 用_Python输入和输出
  4. python每周小测验答案_python第一周小测验答案Centos下更新Python版本
  5. html5设置视频显示第一帧,如何检测HTML5视频何时播放第一帧?
  6. 【数据库】mysql常用的数据类型
  7. DPDK 锁:ticketlock和mcslock
  8. 嵌入式分享合集101-PLC
  9. 现代公司制度的法理学基础 (贾登勋 王勇)
  10. Google Guava简介
  11. 符合W3C的网站的开发模型和必要性的探讨(一)
  12. 怎么提取PDF页面,PDF页面提取的操作步骤
  13. 子平真诠释疑笔记(三)
  14. shiro学习之错误 No realms have been configured! One or more realms must be present to execute an authori
  15. p12..Matplotlib:Contours等高线图
  16. DeepDGA:基于生成对抗网络的DGA生成与检测
  17. 开过全新BMW 3系 “鱼与熊掌”都可兼得
  18. 丹东纺专99届计算机专业,皖南医学院麻醉专业99届校友毕业20周年返校聚会
  19. ctp linux 查询死循环,CTP行情接入常见的问题记录
  20. linux和docker的capabilities介绍

热门文章

  1. 避开这十个坑 | 自学编程很轻松
  2. 手把手教Kaggle账号注册-不翻墙
  3. STL 配置器(allocator)
  4. 1326: 取余运算(mod)
  5. leetcode(1.算数之和)【简单】
  6. 算法训练第五十一天 | 309.最佳买卖股票时机含冷冻期、714.买卖股票的最佳时机含手续费、股票问题总结
  7. 美通社中国网络与社交媒体监测产品(CMM)升级
  8. JavaScript字符串替换
  9. 2022年中国餐饮加盟行业白皮书
  10. 链表的10个题目——第2周打卡