【RocketMQ工作原理】消息的消费
消费者从Broker中获取消息的方式有两种:pull拉取方式和push推动方式。消费者组对于消息消费的模
式又分为两种:集群消费Clustering和广播消费Broadcasting。
获取消费类型
拉取式消费
Consumer主动从Broker中拉取消息,主动权由Consumer控制。一旦获取了批量消息,就会启动消费过
程。不过,该方式的实时性较弱,即Broker中有了新的消息时消费者并不能及时发现并消费。
由于拉取时间间隔是由用户指定的,所以在设置该间隔时需要注意平稳:间隔太短,空请求比
例会增加;间隔太长,消息的实时性太差
推送式消费
该模式下Broker收到数据后会主动推送给Consumer。该获取方式一般实时性较高。
该获取方式是典型的发布-订阅模式,即Consumer向其关联的Queue注册了监听器,一旦发现有新的消息到来就会触发回调的执行,回调方法是Consumer去Queue中拉取消息。而这些都是基于Consumer与Broker间的长连接的。长连接的维护是需要消耗系统资源的。
对比
- pull:需要应用去实现对关联Queue的遍历,实时性差;但便于应用控制消息的拉取
- push:封装了对关联Queue的遍历,实时性强,但会占用较多的系统资源
消费模式
广播消费
广播消费模式下,相同Consumer Group的每个Consumer实例都接收同一个Topic的全量消息。即每条
消息都会被发送到Consumer Group中的每个Consumer。
集群消费
集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊同一个Topic的消息。即每条消
息只会被发送到Consumer Group中的某个Consumer。
消息进度保存
- 广播模式:消费进度保存在consumer端。因为广播模式下consumer group中每个consumer都会
消费所有消息,但它们的消费进度是不同。所以consumer各自保存各自的消费进度。 - 集群模式:消费进度保存在broker中。consumer group中的所有consumer共同消费同一个Topic
中的消息,同一条消息只会被消费一次。消费进度会参与到了消费的负载均衡中,故消费进度是
需要共享的。下图是broker中存放的各个Topic的各个Queue的消费进度。
Rebalance机制
Rebalance机制讨论的前提是:集群消费。
什么是Rebalance
Rebalance即再均衡,指的是,将⼀个Topic下的多个Queue在同⼀个Consumer Group中的多个
Consumer间进行重新分配的过程。
Rebalance机制的本意是为了提升消息的并行消费能力。例如,⼀个Topic下5个队列,在只有1个消费
者的情况下,这个消费者将负责消费这5个队列的消息。如果此时我们增加⼀个消费者,那么就可以给
其中⼀个消费者分配2个队列,给另⼀个分配3个队列,从而提升消息的并行消费能力。
Rebalance限制
由于⼀个队列最多分配给⼀个消费者,因此当某个消费者组下的消费者实例数量大于队列的数量时,
多余的消费者实例将分配不到任何队列。
Rebalance危害
Rebalance的在提升消费能力的同时,也带来一些问题:
- 消费暂停:在只有一个Consumer时,其负责消费所有队列;在新增了一个Consumer后会触发
Rebalance的发生。此时原Consumer就需要暂停部分队列的消费,等到这些队列分配给新的Consumer后,这些暂停消费的队列才能继续被消费。 - 消费重复:Consumer 在消费新分配给自己的队列时,必须接着之前Consumer 提交的消费进度的offset继续消费。然而默认情况下,offset是异步提交的,这个异步性导致提交到Broker的offset与Consumer实际消费的消息并不一致。这个不一致的差值就是可能会重复消费的消息。
同步提交:consumer提交了其消费完毕的一批消息的offset给broker后,需要等待broker的成功
ACK。当收到ACK后,consumer才会继续获取并消费下一批消息。在等待ACK期间,consumer
是阻塞的。
异步提交:consumer提交了其消费完毕的一批消息的offset给broker后,不需要等待broker的成
功ACK。consumer可以直接获取并消费下一批消息。
对于一次性读取消息的数量,需要根据具体业务场景选择一个相对均衡的是很有必要的。因为
数量过大,系统性能提升了,但产生重复消费的消息数量可能会增加;数量过小,系统性能会
下降,但被重复消费的消息数量可能会减少。
- 消费突刺:由于Rebalance可能导致重复消费,如果需要重复消费的消息过多,或者因为Rebalance暂停时间过长从而导致积压了部分消息。那么有可能会导致在Rebalance结束之后瞬间需要消费很多消息。
Rebalance产生的原因
导致Rebalance产生的原因,无非就两个:消费者所订阅Topic的Queue数量发生变化,或消费者组中消
费者的数量发生变化。
1)Queue数量发生变化的场景:
Broker扩容或缩容
Broker升级运维
Broker与NameServer间的网络异常
Queue扩容或缩容
2)消费者数量发生变化的场景:
Consumer Group扩容或缩容
Consumer升级运维
Consumer与NameServer间网络异常
Rebalance过程
在Broker中维护着多个Map集合,这些集合中动态存放着当前Topic中Queue的信息、Consumer Group
中Consumer实例的信息。一旦发现消费者所订阅的Queue数量发生变化,或消费者组中消费者的数量
发生变化,立即向Consumer Group中的每个实例发出Rebalance通知
TopicCon
【RocketMQ工作原理】消息的消费相关推荐
- rocketMQ —— 02(集群搭建、rocketmq工作原理)
目录标题 一.相关推荐 二.基本架构图: 三.集群模式 1.单Master模式(这种单节点的理论上不叫集群) 2.多Master模式 3.多Master多Slave模式(异步) 4.多Master多S ...
- RocketMQ如何保证消息顺序消费?又为何不解决消息重复消费问题?
消息的顺序消费对于业务系统来说非常重要,一笔订单产生了3条消息,分别是订单创建.订单付款.订单完成.消费时,必须按照顺序消费才有意义,与此同时多笔订单之间又是可以并行消费的. 如何保证消息顺序消费? ...
- 【RocketMQ工作原理】消息堆积与消费延迟
概念 消息处理流程中,如果Consumer的消费速度跟不上Producer的发送速度,MQ中未处理的消息会越来 越多(进的多出的少),这部分消息就被称为堆积消息.消息出现堆积进而会造成消息的消费延迟. ...
- 分布式消息队列RocketMQ工作原理与应用(一)
第 1 章 RocketMQ概述 一.MQ概述 1 .MQ简介 MQ,Message Queue,是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生产.存储.消费全过程API的软件系 ...
- 【RocketMQ工作原理】消息的清理
消息被消费过后会被清理掉吗?不会的. 消息是被顺序存储在commitlog文件的,且消息大小不定长,所以消息的清理是不可能以消息为单位进行清理的,而是以commitlog文件为单位进行清理的.否则会急 ...
- 【RocketMQ工作原理】消息的存储
RocketMQ中的消息存储在本地文件系统中,这些相关文件默认在当前用户主目录下的store目录中. abort:该文件在Broker启动后会自动创建,正常关闭Broker,该文件会自动消失.若在没有 ...
- 【RocketMQ工作原理】消息的生产过程
1 消息的生产过程 Producer可以将消息写入到某Broker中的某Queue中,其经历了如下过程: Producer发送消息之前,会先向NameServer发出获取消息Topic的路由信息的请求 ...
- RocketMQ工作原理 高级功能介绍
1.1 消息存储 分布式队列因为有高可靠性的要求,所以数据要进行持久化存储. 消息生成者发送消息 MQ收到消息,将消息进行持久化,在存储中新增一条记录 返回ACK给生产者 MQ push 消息给对应的 ...
- 【RocketMQ工作原理】
什么是消费幂等 当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消 费并未对业务系统产生任何负面影响,那么这个消费过程就是消费幂等的. 幂等:若某操作执行多次与 ...
- 【RocketMQ工作原理】offset管理
这里的offset指的是Consumer的消费进度offset 消费进度offset是用来记录每个Queue的不同消费组的消费进度的.根据消费进度记录器的不同,可以 分为两种模式:本地模式和远程模式. ...
最新文章
- Spring Roo 2 使用分析
- 已经到了快元旦,可是总是不自在
- Android下载文件
- 【ArcGIS遇上Python】ArcGIS Python获取Shapefile矢量数据字段名称
- CSS--选择符大全(常用css选择符)
- DynamipsGUI下CISCO SDM的安装配置
- 推荐分享一个自定义绑定控件(附源码)
- 对付ring0 inline hook
- 前端json转对象和数组
- 深圳房价三连跌,国内的房地产价格或将持续下跌,该持现金过冬了
- #10015. 「一本通 1.2 练习 2」扩散
- 房屋托管平台“朴邻”签约法大大,电子合同提升客户签约体验
- phaser3场景中的图片缩放scale
- 铜九铁路客运将于2008年9月1日正式开通
- Gallery3d 学习笔记(13)
- Android GPS 简介
- 记一次阿里云ECS服务器centos6.5无法使用epel源的爬坑
- minecraft 局域网联机问题一个可能的解决办法
- java通过http代理获取FTP的文件
- 叶君--国画大师笔下的“风韵传情”
热门文章
- 华中农大在土壤矿物-微生物相互作用研究方面取得新进展
- 一站式论文提升服务,助您顺利发文章!
- Nature:Gordon组采用甘露糖苷选择性抑制尿路致病性大肠杆菌
- QIIME 2用户文档. 12数据筛选Filtering data(2019.7)
- R语言list.dirs函数获取目录列表实战
- 使用神经网络做二分类,输出层需要几个神经元?应该选择哪一种激活函数?如果要处理minst数据、输出层需要几个神经元?使用那种激活函数?如果使用神经网络预测房价,输出层需要几个神经元、使用什么激活函数?
- python使用base64编码解码数据
- 语音识别、传统语音识别、带权有限转态转换器、深度语音识别、时序分类、CTC解码
- C++所提供的类模板应用(堆栈)
- 嵌入式linux设计报告,嵌入式linux课程设计报告