前语:不要为了读文章而读文章,一定要带着问题来读文章,勤思考。

作者:kinnylee   来源:http://1t.click/g26

# 背景介绍

项目组使用阿里RocketMQ,对同一个消费组设置不同的tag订阅关系,出现消息丢失的问题,本文从rocketmq源码研究消息发布与订阅原理,并分析导致该问题的原因。

# 官方说明

  • 告诉使用者:同一个消费组,必须保持订阅关系一致

  • 为什么?它没有说!只能从源码找答案


# 问题复现

  • 启动消费者1,消费组为group1,订阅topicA的消息,tag设置为tag1 || tag2

  • 启动消费者2,消费组也为group1,也订阅topicA的消息,但是tag设置为tag3

  • 启动生产者,生产者发送含有tag1,tag2,tag3的消息各10条

  • 消费者1没有收到任何消息,消费者2收到部分消息

# 结论

  • 同一个消费组中,设置不同tag时,后启动的消费者会覆盖先启动的消费者设置的tag

  • tag决定了消息过滤的条件,经过服务端和客户端两层过滤,最后只有后启动的消费者才能收到部分消息

# 原理说明

1、消息如何保存

CommitLog

  • 保存所有topic的原始消息

  • CommitLog分为多个文件,每个文件默认最大为1G

  • 每条记录包括:消息长度和消息文本(消息体,属性,uid等等)

  • 因每条消息长度不一致,每个commitLog的记录长度也不一致


ConsumerQueue

  • 保存某个Topic下某个Queue的索引信息

  • 每条记录包括:消息在commitLog中的offset,消息大小,消息tag的哈希值

  • 每条记录长度固定为20byte

  • producer发送消息后,先保存到commitLog,再异步建立该条消息对应的topic + queue对应的ConsumerQueue索引

  • 第三部分的Hash(tag)是服务端过滤消息的重要依据


2、consumer如何订阅消息?

注册订阅信息

  • consumer订阅时,会将订阅信息注册到到服务端

  • 保存订阅信息的是Map类,key为topic,value主要是tag

  • subVersion取当前时间。

这里的key是topic,subVersion版本号,这两点很关键!后面有用到!


拉取消息并过滤

  • 拉取消息时,首先从服务端获取订阅关系,得到tag的hash集合codeSet

  • 然后从ConsumerQueue获取一条记录,判断记录的hashCode是否在codeSet中,以达到消息过滤的目的,决定是否将该消息发送给consumer

  • 总之一句话:tag决定了消息是否发到客户端

3、消息过滤

服务端过滤

  • 过滤:tag的hash值过滤

  • 优点:

    • 减少不必要消息占用流量

  • 缺点:

    • Hash存在冲突,过滤不完全准确


客户端过滤

  • 服务端过滤存在不准确性,客户端再次精确过滤

  • 客户度过滤:tag的字符串值做对比。不相等的不返回给消费者

原因总结

  • 同一个consumer group的订阅关系,保存在RebalanceImpl类的Map中。key为topic

  • 不同的消费者启动后,依次注册订阅关系,因为tag不一样,导致Map中同一topic的tag被覆盖。比如:消费者1订阅tag1,消费者2订阅tag2。最后map中只保存tag2.

  • 过滤的核心是是tag,tag被更新,过滤条件被改变。服务端过滤后只返回tag2的消息

  • 客户端接收消息后,再次过滤。先启动的消费者1订阅tagA,但是服务端返回tag2,所以消费者1收不到任何消息。消费者2能收到一半的消息(集群模式,假设消息平均分配,另外一半分给tag2)

# 源码分析

1、订阅关系数据结构


2、消费者1启动时注册的订阅关系


3、消费者2后启动覆盖订阅关系


4、服务端过滤时取出ConsumerQueue的Hash(tag)


5、对比消息的Hash(tag)和之前保存的订阅关系


7、客户端过滤


热文推荐

这份5G PPT这几天在我的朋友圈刷屏了。

作为一名Java程序员,你竟然不知道Intrumentation!

消息长度_填坑笔记:RocketMQ消息订阅失败问题?相关推荐

  1. 即将上线的Hive服务器面临的一系列填坑笔记

    即将上线的Spark服务器面临的一系列填坑笔记 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 一.18/10/19 16:36:31 WARN metastore.ObjectSt ...

  2. 即将上线的flume服务器面临的一系列填坑笔记

      即将上线的flume服务器面临的一系列填坑笔记 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任.   一.flume缺少依赖包导致启动失败! 报错信息如下: 2018-10-17 ...

  3. [react-native]react-native填坑笔记

    填坑笔记 开始入坑RN,从最开始的学起难免有不少乱七八糟的问题,记录在这里. 1. 8081端口占用问题 按照官网教程搭建开发环境并按照下面代码运行时候有报错,显示8081端口的问题 react-na ...

  4. unity代码更换ui图片_Unity3d 低分辨率UI素材换高分辨率素材填坑笔记

    迷糊 RectTransform PosX.PosY.Left .Right . Top .Bottom 以及 AchorMax/AnchorMin 的,开卷有益哟~ 背景 笔者开发的(PC)APP ...

  5. 填坑笔记-linux下安装cadence

    在Linux下有很多spice的仿真软件, 我之前用过ngspice, 也还在折腾. cadence也是一个比较优秀的软件,主要是有Linux版本支持,我于是又来了. windows之所以那么好用,给 ...

  6. tcp前4字节消息长度_网络基础篇之TCP

    ​网络分层 什么是 TCP TCP 是面向连接的.可靠的.基于字节流的传输层通信协议. - 面向连接:通过三次握手建立一对一的连接( UDP 协议 可以一个主机同时向多个主机发送消息,即一对多): - ...

  7. rocketmq 消息 自定义_跟我学RocketMQ[1-4]之消息消费及支持spring

    博客地址:朝·闻·道​www.wuwenliang.net 本文我将继续讲解如何使用DefaultMQPushConsumer对RocketMQ中的消息进行消费,同时在文章的第二部分将继续带领读者朋友 ...

  8. mq发送消息到两个服务器问题,RocketMQ消息发送常见错误与解决方案

    本文将结合本身使用RocketMQ的经验,对消息发送常见的问题进行分享,基本会遵循出现问题,分析问题.解决问题.web 一.No route info of this topic 没法找到路由信息,其 ...

  9. 企业微信推送消息延迟_企业微信发送应用消息的实现

    企业号升级到企业微信后,发送应用消息的接口也变化了不少,除了原来的文本.图片.文件.语音.视频.图文消息等消息外,增加了文本卡片.markdown消息.小程序通知消息等内容,不过它们都可以共用一个接口 ...

最新文章

  1. Pycharm 和 vscode 多光标、折叠代码和代码格式化快捷键
  2. python---骰子游戏
  3. 使用Docker swarm构建wordpress集群
  4. Active Directory的复制拓扑,Active Directory系列之八
  5. 安全市场五巨头将面临新兴厂商的挑战
  6. GetProcAddress()用法
  7. 【Elasticsearch】在Elasticsearch中查询Term Vectors词条向量信息
  8. 中维云视通录像文件存储及设置_视频监控存储方式选择,磁盘阵列与NVR优势对比...
  9. utl_file包的使用
  10. 用JavaScript实现更复杂的交互
  11. chown、chgrp 更改文件属主属组
  12. spring boot设置http https端口
  13. 真正菜鸟用教程之WQSG Scrip Export WQSG (脚本导出导入工具,PSP、NDS汉化必备 )
  14. cocos2dx fnt字体制作
  15. Java 正则表达式匹配规则
  16. Maven镜像仓库替换为阿里云镜像仓库
  17. 两个小米路由器mini无线桥接
  18. 数据结构总结(线性结构,树型结构,图型结构,顺序结构,链式结构)
  19. 制作CAB包以及文件签名
  20. vue3 如何给动态渲染的组件添加ref

热门文章

  1. 前端学习(1022):jquery学习目标
  2. 前端学习(610):js执行过程
  3. mybatis学习(8):The server time zone value '???ú±ê×??±??' is unrecognized or represents more
  4. 第一百一十四期:盘点十大最新Web UI测试工具
  5. spring学习(34):构造函数依赖注入
  6. eclipse没有server怎么办
  7. java学习(97):中断线程的另一种处理
  8. C 如何对指针进行指定字节的偏置操作
  9. linux下有四个作业优先级,第一次作业:对Linux系统分析
  10. Linux scp 免密码 传输文件