消费者从Broker中获取消息的方式有两种:pull拉取方式和push推动方式。消费者组对于消息消费的模
式又分为两种:集群消费Clustering和广播消费Broadcasting。

获取消费类型

拉取式消费

Consumer主动从Broker中拉取消息,主动权由Consumer控制。一旦获取了批量消息,就会启动消费过
程。不过,该方式的实时性较弱,即Broker中有了新的消息时消费者并不能及时发现并消费。

由于拉取时间间隔是由用户指定的,所以在设置该间隔时需要注意平稳:间隔太短,空请求比
例会增加;间隔太长,消息的实时性太差

推送式消费

该模式下Broker收到数据后会主动推送给Consumer。该获取方式一般实时性较高。
该获取方式是典型的发布-订阅模式,即Consumer向其关联的Queue注册了监听器,一旦发现有新的消息到来就会触发回调的执行,回调方法是Consumer去Queue中拉取消息。而这些都是基于Consumer与Broker间的长连接的。长连接的维护是需要消耗系统资源的。

对比

  • pull:需要应用去实现对关联Queue的遍历,实时性差;但便于应用控制消息的拉取
  • push:封装了对关联Queue的遍历,实时性强,但会占用较多的系统资源

消费模式

广播消费

广播消费模式下,相同Consumer Group的每个Consumer实例都接收同一个Topic的全量消息。即每条
消息都会被发送到Consumer Group中的每个Consumer。

集群消费

集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊同一个Topic的消息。即每条消
息只会被发送到Consumer Group中的某个Consumer。

消息进度保存

  • 广播模式:消费进度保存在consumer端。因为广播模式下consumer group中每个consumer都会
    消费所有消息,但它们的消费进度是不同。所以consumer各自保存各自的消费进度。
  • 集群模式:消费进度保存在broker中。consumer group中的所有consumer共同消费同一个Topic
    中的消息,同一条消息只会被消费一次。消费进度会参与到了消费的负载均衡中,故消费进度是
    需要共享的。下图是broker中存放的各个Topic的各个Queue的消费进度。

Rebalance机制

Rebalance机制讨论的前提是:集群消费。

什么是Rebalance

Rebalance即再均衡,指的是,将⼀个Topic下的多个Queue在同⼀个Consumer Group中的多个
Consumer间进行重新分配的过程。

Rebalance机制的本意是为了提升消息的并行消费能力。例如,⼀个Topic下5个队列,在只有1个消费
者的情况下,这个消费者将负责消费这5个队列的消息。如果此时我们增加⼀个消费者,那么就可以给
其中⼀个消费者分配2个队列,给另⼀个分配3个队列,从而提升消息的并行消费能力。

Rebalance限制

由于⼀个队列最多分配给⼀个消费者,因此当某个消费者组下的消费者实例数量大于队列的数量时,
多余的消费者实例将分配不到任何队列。

Rebalance危害

Rebalance的在提升消费能力的同时,也带来一些问题:

  • 消费暂停:在只有一个Consumer时,其负责消费所有队列;在新增了一个Consumer后会触发
    Rebalance的发生。此时原Consumer就需要暂停部分队列的消费,等到这些队列分配给新的Consumer后,这些暂停消费的队列才能继续被消费。
  • 消费重复:Consumer 在消费新分配给自己的队列时,必须接着之前Consumer 提交的消费进度的offset继续消费。然而默认情况下,offset是异步提交的,这个异步性导致提交到Broker的offset与Consumer实际消费的消息并不一致。这个不一致的差值就是可能会重复消费的消息。

同步提交:consumer提交了其消费完毕的一批消息的offset给broker后,需要等待broker的成功
ACK。当收到ACK后,consumer才会继续获取并消费下一批消息。在等待ACK期间,consumer
是阻塞的。
异步提交:consumer提交了其消费完毕的一批消息的offset给broker后,不需要等待broker的成
功ACK。consumer可以直接获取并消费下一批消息。
对于一次性读取消息的数量,需要根据具体业务场景选择一个相对均衡的是很有必要的。因为
数量过大,系统性能提升了,但产生重复消费的消息数量可能会增加;数量过小,系统性能会
下降,但被重复消费的消息数量可能会减少。

  • 消费突刺:由于Rebalance可能导致重复消费,如果需要重复消费的消息过多,或者因为Rebalance暂停时间过长从而导致积压了部分消息。那么有可能会导致在Rebalance结束之后瞬间需要消费很多消息。

Rebalance产生的原因

导致Rebalance产生的原因,无非就两个:消费者所订阅Topic的Queue数量发生变化,或消费者组中消
费者的数量发生变化。

1)Queue数量发生变化的场景:
Broker扩容或缩容
Broker升级运维
Broker与NameServer间的网络异常
Queue扩容或缩容
2)消费者数量发生变化的场景:
Consumer Group扩容或缩容
Consumer升级运维
Consumer与NameServer间网络异常

Rebalance过程

在Broker中维护着多个Map集合,这些集合中动态存放着当前Topic中Queue的信息、Consumer Group
中Consumer实例的信息。一旦发现消费者所订阅的Queue数量发生变化,或消费者组中消费者的数量
发生变化,立即向Consumer Group中的每个实例发出Rebalance通知

TopicCon

【RocketMQ工作原理】消息的消费相关推荐

  1. rocketMQ —— 02(集群搭建、rocketmq工作原理)

    目录标题 一.相关推荐 二.基本架构图: 三.集群模式 1.单Master模式(这种单节点的理论上不叫集群) 2.多Master模式 3.多Master多Slave模式(异步) 4.多Master多S ...

  2. RocketMQ如何保证消息顺序消费?又为何不解决消息重复消费问题?

    消息的顺序消费对于业务系统来说非常重要,一笔订单产生了3条消息,分别是订单创建.订单付款.订单完成.消费时,必须按照顺序消费才有意义,与此同时多笔订单之间又是可以并行消费的. 如何保证消息顺序消费? ...

  3. 【RocketMQ工作原理】消息堆积与消费延迟

    概念 消息处理流程中,如果Consumer的消费速度跟不上Producer的发送速度,MQ中未处理的消息会越来 越多(进的多出的少),这部分消息就被称为堆积消息.消息出现堆积进而会造成消息的消费延迟. ...

  4. 分布式消息队列RocketMQ工作原理与应用(一)

    第 1 章 RocketMQ概述 一.MQ概述 1 .MQ简介 MQ,Message Queue,是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生产.存储.消费全过程API的软件系 ...

  5. 【RocketMQ工作原理】消息的清理

    消息被消费过后会被清理掉吗?不会的. 消息是被顺序存储在commitlog文件的,且消息大小不定长,所以消息的清理是不可能以消息为单位进行清理的,而是以commitlog文件为单位进行清理的.否则会急 ...

  6. 【RocketMQ工作原理】消息的存储

    RocketMQ中的消息存储在本地文件系统中,这些相关文件默认在当前用户主目录下的store目录中. abort:该文件在Broker启动后会自动创建,正常关闭Broker,该文件会自动消失.若在没有 ...

  7. 【RocketMQ工作原理】消息的生产过程

    1 消息的生产过程 Producer可以将消息写入到某Broker中的某Queue中,其经历了如下过程: Producer发送消息之前,会先向NameServer发出获取消息Topic的路由信息的请求 ...

  8. RocketMQ工作原理 高级功能介绍

    1.1 消息存储 分布式队列因为有高可靠性的要求,所以数据要进行持久化存储. 消息生成者发送消息 MQ收到消息,将消息进行持久化,在存储中新增一条记录 返回ACK给生产者 MQ push 消息给对应的 ...

  9. 【RocketMQ工作原理】

    什么是消费幂等 当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消 费并未对业务系统产生任何负面影响,那么这个消费过程就是消费幂等的. 幂等:若某操作执行多次与 ...

  10. 【RocketMQ工作原理】offset管理

    这里的offset指的是Consumer的消费进度offset 消费进度offset是用来记录每个Queue的不同消费组的消费进度的.根据消费进度记录器的不同,可以 分为两种模式:本地模式和远程模式. ...

最新文章

  1. Spring Roo 2 使用分析
  2. 已经到了快元旦,可是总是不自在
  3. Android下载文件
  4. 【ArcGIS遇上Python】ArcGIS Python获取Shapefile矢量数据字段名称
  5. CSS--选择符大全(常用css选择符)
  6. DynamipsGUI下CISCO SDM的安装配置
  7. 推荐分享一个自定义绑定控件(附源码)
  8. 对付ring0 inline hook
  9. 前端json转对象和数组
  10. 深圳房价三连跌,国内的房地产价格或将持续下跌,该持现金过冬了
  11. #10015. 「一本通 1.2 练习 2」扩散
  12. 房屋托管平台“朴邻”签约法大大,电子合同提升客户签约体验
  13. phaser3场景中的图片缩放scale
  14. 铜九铁路客运将于2008年9月1日正式开通
  15. Gallery3d 学习笔记(13)
  16. Android GPS 简介
  17. 记一次阿里云ECS服务器centos6.5无法使用epel源的爬坑
  18. minecraft 局域网联机问题一个可能的解决办法
  19. java通过http代理获取FTP的文件
  20. 叶君--国画大师笔下的“风韵传情”

热门文章

  1. 华中农大在土壤矿物-微生物相互作用研究方面取得新进展
  2. 一站式论文提升服务,助您顺利发文章!
  3. Nature:Gordon组采用甘露糖苷选择性抑制尿路致病性大肠杆菌
  4. QIIME 2用户文档. 12数据筛选Filtering data(2019.7)
  5. R语言list.dirs函数获取目录列表实战
  6. 使用神经网络做二分类,输出层需要几个神经元?应该选择哪一种激活函数?如果要处理minst数据、输出层需要几个神经元?使用那种激活函数?如果使用神经网络预测房价,输出层需要几个神经元、使用什么激活函数?
  7. python使用base64编码解码数据
  8. 语音识别、传统语音识别、带权有限转态转换器、深度语音识别、时序分类、CTC解码
  9. C++所提供的类模板应用(堆栈)
  10. 嵌入式linux设计报告,嵌入式linux课程设计报告