本文介绍了DefaultMQPushConsumerImpl消费者,客户端负载均衡相关知识点。本文从DefaultMQPushConsumerImpl启动过程到实现负载均衡,从源代码一步一步分析,共分为6个部分进行介绍,其中第6个部分 rebalanceByTopic 为负载均衡的核心逻辑模块,具体过程运用了图文进行阐述。
介绍之前首先抛出几个问题:
1. 要做负载均衡,首先要解决的一个问题是什么?
2. 负载均衡是Client端处理还是Broker端处理?
个人理解:
1. 要做负载均衡,首先要做的就是信号收集。
所谓信号收集,就是得知道每一个consumerGroup有哪些consumer,对应的topic是谁。信号收集分为Client端信号收集与Broker端信号收集两个部分。
2. 负载均衡放在Client端处理。
具体做法是:消费者客户端在启动时完善rebalanceImpl实例,同时拷贝订阅信息存放rebalanceImpl实例对象中,另外也是很重要的一个步骤 -- 通过心跳消息,不停的上报自己到所有Broker,注册RegisterConsumer,等待上述过程准备好之后在Client端不断执行的负载均衡服务线程从Broker端获取一份全局信息(该consumerGroup下所有的消费Client),然后分配这些全局信息,获取当前客户端分配到的消费队列。
本文具体的内容:

I. copySubscription
Client端信号收集,拷贝订阅信息。

在DefaultMQPushConsumerImpl.start()时,会将消费者的topic订阅关系设置到rebalanceImpl的SubscriptionInner的map中用于负载:

private void copySubscription() throws MQClientException {try {//注:一个consumer对象可以订阅多个topicMap<String, String> sub = this.defaultMQPushConsumer.getSubscription();if (sub != null) {for (final Map.Entry<String, String> entry : sub.entrySet()) {final String topic = entry.getKey();final String subString = entry.getValue();SubscriptionData subscriptionData =FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
                                topic, subString);this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);}}if (null == this.messageListenerInner) {this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();}switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:break;case CLUSTERING:final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());SubscriptionData subscriptionData =FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
                            retryTopic, SubscriptionData.SUB_ALL);this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);break;default:break;}}catch (Exception e) {throw new MQClientException("subscription exception", e);}}

FilterAPI.buildSubscriptionData接口将订阅关系转换为SubscriptionData 数据,其中subString包含订阅tag等信息。另外,如果该消费者的消费模式为集群消费,则会将retry的topic一并放到。
II. 完善rebalanceImpl实例
Client继续收集信息:
 this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
 this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

本文以DefaultMQPushConsumerImpl为例,因此this对象类型为DefaultMQPushConsumerImp。
III. this.rebalanceService.start()
开启负载均衡服务。this.rebalanceService是一个RebalanceService实例对象,它继承与ServiceThread,是一个线程类。 this.rebalanceService.start()执行时,也即执行RebalanceService线程体:
   @Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStoped()) {this.waitForRunning(WaitInterval);this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");}

IV. this.mqClientFactory.doRebalance
客户端遍历消费组table,对该客户端上所有消费者独立进行负载均衡,分发消费队列:
 public void doRebalance() {for (String group : this.consumerTable.keySet()) {MQConsumerInner impl = this.consumerTable.get(group);if (impl != null) {try {impl.doRebalance();} catch (Exception e) {log.error("doRebalance exception", e);}}}}

V. MQConsumerInner.doRebalance
由于本文以DefaultMQPushConsumerImpl消费过程为例,即DefaultMQPushConsumerImpl.doRebalance:
@Overridepublic void doRebalance() {if (this.rebalanceImpl != null) {this.rebalanceImpl.doRebalance();}}

步骤II 中完善了rebalanceImpl实例,为调用rebalanceImpl.doRebalance()提供了初始数据。
rebalanceImpl.doRebalance()过程如下:
public void doRebalance() {// 前文copySubscription中初始化了SubscriptionInnerMap<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();try {this.rebalanceByTopic(topic);} catch (Exception e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("rebalanceByTopic Exception", e);}}}}this.truncateMessageQueueNotMyTopic();}

VI. rebalanceByTopic -- 核心步骤之一
rebalanceByTopic方法中根据消费者的消费类型为BROADCASTING或CLUSTERING做不同的逻辑处理。CLUSTERING逻辑包括BROADCASTING逻辑,本部分只介绍集群消费负载均衡的逻辑。
集群消费负载均衡逻辑主要代码如下(省略了log等代码):

//1.从topicSubscribeInfoTable列表中获取与该topic相关的所有消息队列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//2. 从broker端获取消费该消费组的所有客户端clientId
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);f (null == mqSet) { ... }
if (null == cidAll) { ... }
if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);// 3.创建DefaultMQPushConsumer对象时默认设置为AllocateMessageQueueAveragelyAllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try {// 4.调用AllocateMessageQueueAveragely.allocate方法,获取当前client分配消费队列allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll,cidAll);} catch (Throwable e) {return;}// 5. 将分配得到的allocateResult 中的队列放入allocateResultSet 集合Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}
、//6. 更新updateProcessQueueboolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet);if (changed) {this.messageQueueChanged(topic, mqSet, allocateResultSet);}
}

注:BROADCASTING逻辑只包含上述的1、6。
集群消费负载均衡逻辑中的1、2、4这三个点相关知识为其核心过程,各个点相关知识如下:
第1点:从topicSubscribeInfoTable列表中获取与该topic相关的所有消息队列
第2点: 从broker端获取消费该消费组的所有客户端clientId
首先,消费者对象不断地向所有broker发送心跳包,上报自己,注册并更新订阅关系以及客户端ChannelInfoTable;之后,客户端在做消费负载均衡时获取那些消费客户端,对这些客户端进行负载均衡,分发消费的队列。具体过程如下图所示:

第4点:调用AllocateMessageQueueAveragely.allocate方法,获取当前client分配消费队列
注:上图中cId1、cId2、...、cIdN通过 getConsumerIdListByGroup 获取,它们在这个ConsumerGroup下所有在线客户端列表中。
当前消费对进行负载均衡策略后获取对应的消息消费队列。具体的算法很简单,可以看源码。

转载于:https://www.cnblogs.com/chenjunjie12321/p/7913323.html

rocketmq消费负载均衡--push消费为例相关推荐

  1. java rocketmq消费_rocketmq消费负载均衡--push消费详解

    前言 本文介绍了DefaultMQPushConsumerImpl消费者,客户端负载均衡相关知识点.本文从DefaultMQPushConsumerImpl启动过程到实现负载均衡,从源代码一步一步分析 ...

  2. RocketMQ的负载均衡

    在了解了RocketMQ的发送与接收后,也好奇RocketMQ内部是如何处理好生产端.消费端的负载均衡的,下面通过分析源码.查阅相关文档资料以及结合自己的理解,做了下归纳总结. RocketMQ的消息 ...

  3. RocketMQ Consumer 负载均衡算法源码学习 -- AllocateMessageQueueConsistentHash

    RocketMQ 提供了一致性hash 算法来做Consumer 和 MessageQueue的负载均衡. 源码中一致性hash 环的实现是很优秀的,我们一步一步分析. 一个Hash环包含多个节点, ...

  4. RocketMQ 消息负载均衡策略解析——图解、源码级解析

  5. 消费者广播模式和负载均衡模式

    消费消息 1)负载均衡模式 消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同 2)广播模式 消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的 packa ...

  6. 【2】SCN-Ribbon负载均衡

    [2]SCN-Ribbon负载均衡 关键词 消费客户端负载均衡: Ribbon,利⽤从Eureka获取服务列表 负载策略:区域权衡策略,轮询,随机,重试,最小连接数,可用过滤策略 原理:Ribbon给 ...

  7. Nginx反向代理和负载均衡部署指南

    nginx不单能够作为强大的webserver,也能够作为一个反向代理server,并且nginx还能够依照调度规则实现动态.静态页面的分离.能够依照轮询.ip哈希.URL哈希.权重等多种方式对后端s ...

  8. LoadBalance自定义负载均衡策略

    LoadBalance已有策略   LoadBalance的源码中已有两种策略,RandomLoadBalancer(随机).RoundRobinLoadBalancer(轮询,默认的负载均衡策略). ...

  9. RocketMQ:消费端的消息消息队列负载均衡与重新发布机制源码解析

    文章目录 前言 流程解析 总结 前言 在上一篇博客中我们了解到,PullMessageService线程主要是负责从pullRequestQueue中获得拉取消息请求并进行请求处理的. PullMes ...

最新文章

  1. css选择器匹配没有属性x的元素[重复]
  2. 富文本NSMutableAttributedString用法大全
  3. 全排列—leetcode46
  4. 1057 数零壹(PAT乙级 C++实现)
  5. 视频理解新方向:时域语言定位 综述
  6. 如何自动填充网页表单_流量型称重型充绒机充棉机如何选择,教你轻松辨别全自动填充机器...
  7. 线程安全的set_「Java」 - 多线程四 amp; ThreadLocal
  8. 二十年后的回眸(1)——报到上班
  9. C语言小项目——动手打造属于自己的C语言IDE
  10. 安装服务器系统提示没有驱动程序,安装windows server系统提示“无法在此驱动器上安装windows”的解决办法 安装Win...
  11. 取自开源,分享于开源 —— 利用CVE-2017-8890漏洞ROOT天猫魔屏A1
  12. Unity TalkingData接入
  13. DPDK-实战之load_balancer(负载均衡)
  14. 2019年电赛准备程序STM32版本
  15. vue 实现电子签名功能 支持生成图片
  16. 森林的先序和中序遍历
  17. MYSQL加密解密 中文、数字 AES_ENCRYPT,AES_DECRYPT
  18. [树形dp] Jzoj P5906 传送门
  19. 计算机一打开就卡在更新失败,电脑卡在配置Windows Update失败界面怎么解决
  20. 网页端视频倍速修改方法

热门文章

  1. 等价类划分方法的应用
  2. 一些有关计组实验中Quartus中的名词或术语的解释
  3. ubuntu 18.04设置系统自带系统截图快捷键
  4. asp.net程序调用NTFS分区下Oracle客户端
  5. 重写equals所要遵守的约定
  6. 蓝桥杯C/C++ 带分数
  7. DBA巡检常用的SQL语句
  8. JSON和JSONP【JS+AJAX跨域原理和实现】
  9. BT项目的运作之一项目建设方案与BT总包方的选择
  10. 成功修改fastreport3.15,使其支持中日韩四(CJK)中编码PDF的导出