精彩推荐

一百期Java面试题汇总

SpringBoot内容聚合

IntelliJ IDEA内容聚合

Mybatis内容聚合

接上一篇:RocketMQ入门到入土(七 )为什么同一个消费组设置不同tag会出现奇怪现象

一、问题描述

面试官:RocketMQ的Consumer是如何做的负载均衡?比如:5个Consumer进程同时消费一个Topic,这个Topic只有4个queue会出现啥情况?反之Consumer数量小于queue的数据是啥情况?

应聘者:一脸懵逼。

二、源码剖析

1、RebalancePushImpl

public class RebalancePushImpl extends RebalanceImpl {public RebalancePushImpl(String consumerGroup, MessageModel messageModel,AllocateMessageQueueStrategy allocateMessageQueueStrategy,MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {// 可以看到很简单,调用了父类RebalanceImpl的构造器super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;}

2、RebalanceImpl

public abstract class RebalanceImpl {// 很简单,就是初始化一些东西,关键在于下面的doRebalancepublic RebalanceImpl(String consumerGroup, MessageModel messageModel,AllocateMessageQueueStrategy allocateMessageQueueStrategy,MQClientInstance mQClientFactory) {this.consumerGroup = consumerGroup;this.messageModel = messageModel;this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;this.mQClientFactory = mQClientFactory;}/*** 分配消息队列,命名抄袭spring,doXXX开始真正的业务逻辑** @param isOrder:是否是顺序消息 true:是;false:不是*/public void doRebalance(final boolean isOrder) {// 分配每个topic的消息队列Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();try {// 这个是关键了this.rebalanceByTopic(topic, isOrder);} catch (Throwable e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("rebalanceByTopic Exception", e);}}}}// 移除未订阅的topic对应的消息队列this.truncateMessageQueueNotMyTopic();}
}

2.1、rebalanceByTopic

private void rebalanceByTopic(final String topic, final boolean isOrder) {switch (messageModel) {case CLUSTERING: {// 获取topic对应的队列和consumer信息,比如mqSet如下/*** 0 = {MessageQueue@2151} "MessageQueue [topic=myTopic001, brokerName=broker-a, queueId=3]"* 1 = {MessageQueue@2152} "MessageQueue [topic=myTopic001, brokerName=broker-a, queueId=0]"* 2 = {MessageQueue@2153} "MessageQueue [topic=myTopic001, brokerName=broker-a, queueId=2]"* 3 = {MessageQueue@2154} "MessageQueue [topic=myTopic001, brokerName=broker-a, queueId=1]"*/Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);// 所有的Consumer客户端cid,比如:172.16.20.246@7832List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<MessageQueue>();// 为什么要addAll到list里,因为他要排序mqAll.addAll(mqSet);// 排序消息队列和消费者数组,因为是在进行分配队列,排序后,各Client的顺序才能保持一致。Collections.sort(mqAll);Collections.sort(cidAll);// 默认选择的是org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyAllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;// 根据队列分配策略分配消息队列List<MessageQueue> allocateResult = null;try {// 这个才是要介绍的真正C位,strategy.allocate()allocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {return;}}}}
}

3、AllocateMessageQueueAveragely

3.1、allocate

public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {private final InternalLogger log = ClientLogger.getLog();@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {/*** 参数校验的代码我删了。*/List<MessageQueue> result = new ArrayList<MessageQueue>();/*** 第几个Consumer,这也是我们上面为什么要排序的重要原因之一。* Collections.sort(mqAll);* Collections.sort(cidAll);*/int index = cidAll.indexOf(currentCID);// 取模,多少消息队列无法平均分配 比如mqAll.size()是4,代表4个queue。cidAll.size()是5,代表一个consumer,那么mod就是4int mod = mqAll.size() % cidAll.size();// 平均分配// 4 <= 5 ? 1 : (4 > 0 && 1 < 4 ? 4 / 5 + 1 : 4 / 5)int averageSize =mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()+ 1 : mqAll.size() / cidAll.size());// 有余数的情况下,[0, mod) 平分余数,即每consumer多分配一个节点;第index开始,跳过前mod余数。int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;// 分配队列数量。之所以要Math.min()的原因是,mqAll.size() <= cidAll.size(),部分consumer分配不到消息队列。int range = Math.min(averageSize, mqAll.size() - startIndex);for (int i = 0; i < range; i++) {result.add(mqAll.get((startIndex + i) % mqAll.size()));}return result;}
}

3.2、解释

看着这算法凌乱的很,太复杂了!说实话,确实挺复杂,蛮罗嗦的,但是代数法可以得到如下表格:

假设4个queue Consumer有2个 可以整除 Consumer有3个 不可整除 Consumer有5个 无法都分配
queue[0] Consumer[0] Consumer[0] Consumer[0]
queue[1] Consumer[0] Consumer[0] Consumer[1]
queue[2] Consumer[1] Consumer[1] Consumer[2]
queue[3] Consumer[1] Consumer[2] Consumer[3]

所以得出如下真香定律(也是回击面试官的最佳答案):

  • queue个数大于Consumer个数,且queue个数能整除Consumer个数的话, 那么Consumer会平均分配queue。(比如上面表格的Consumer有2个 可以整除部分)

  • queue个数大于Consumer个数,且queue个数不能整除Consumer个数的话, 那么会有一个Consumer多消费1个queue,其余Consumer平均分配。(比如上面表格的Consumer有3个 不可整除部分)

  • queue个数小于Consumer个数,那么会有Consumer闲置,就是浪费掉了,其余Consumer平均分配到queue上。(比如上面表格的Consumer有5个 无法都分配部分)

4、补充

queue选择算法也就是负载均衡算法有很多种可选择:

  • AllocateMessageQueueAveragely:是前面讲的默认方式

  • AllocateMessageQueueAveragelyByCircle:每个消费者依次消费一个partition,环状。

  • AllocateMessageQueueConsistentHash:一致性hash算法

  • AllocateMachineRoomNearby:就近元则,离的近的消费

  • AllocateMessageQueueByConfig:是通过配置的方式

三、何时Rebalance

那就得从Consumer启动的源码开始看起,先看Consumer的启动方法start()

public class DefaultMQPushConsumerImpl implements MQConsumerInner {private MQClientInstance mQClientFactory;// 启动Consumer的入口函数public synchronized void start() throws MQClientException {this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);// 调用MQClientInstance的start方法,追进去看看。mQClientFactory.start();}
}

看看mQClientFactory.start();都干了什么

public class MQClientInstance {private final RebalanceService rebalanceService;public void start() throws MQClientException {synchronized (this) {// 调用RebalanceService的start方法,别慌,继续追进去看看this.rebalanceService.start();}}
}

看看rebalanceService.start();都干了什么,先看下他的父类ServiceThread

/** 首先可以发现他是个线程的任务,实现了Runnable接口* 其次发现上步调用的start方法居然就是thread.start(),那就相当于调用了RebalanceService的run方法*/
public abstract class ServiceThread implements Runnable {public void start() {this.thread = new Thread(this, getServiceName());this.thread.setDaemon(isDaemon);this.thread.start();}
}

最后来看看RebalanceService.run()

public class RebalanceService extends ServiceThread {/*** 等待时间的间隔,毫秒,默认是20s*/private static long waitInterval =Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));@Overridepublic void run() {while (!this.isStopped()) {// 等待20s,然后超时自动释放锁执行doRebalancethis.waitForRunning(waitInterval);this.mqClientFactory.doRebalance();}}
}

到这里真相大白了。

当一个consumer出现宕机后,默认最多20s,其它机器将重新消费已宕机的机器消费的queue,同样当有新的Consumer连接上后,20s内也会完成rebalance使得新的Consumer有机会消费queue里的msg。

等等,好像有问题:新上线一个Consumer要等20s才能负载均衡?这不是搞笑呢吗?肯定有猫腻。

确实,新启动Consumer的话会立即唤醒沉睡的线程, 让他立马进行this.mqClientFactory.doRebalance();,源码如下

public class DefaultMQPushConsumerImpl implements MQConsumerInner {// 启动Consumer的入口函数public synchronized void start() throws MQClientException {        // 看到了没!!!, 见名知意,立即rebalance负载均衡this.mQClientFactory.rebalanceImmediately();}
}

END

我知道你 “在看”

从入门到入土(八)RocketMQ的Consumer是如何做的负载均衡的相关推荐

  1. 【RocketMQ】消息的高可用与负载均衡

    消息生产的高可用机制 在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同brokerId的机器组成一个Broker组),这样当一 ...

  2. (八)企业部分之nginx+tomcat+memcached负载均衡集群搭建

    [server1] vim /usr/local/lnmp/tomcat/conf/context.xml <Context> ...... <Manager className=& ...

  3. 2018最新Nginx从入门到精通5阶段视频教程附代码(Http代理+负载均衡+静动分离)

    课程介绍 系统讲解Nginx ,课程通用性非常高,几乎所有与后台相关的技术人员都会用到,了解Nginx 的人很多,但真正掌握Nginx 的却很少,系统掌握Nginx配置与快速搭建高可用架构的技术方法. ...

  4. nginx一篇入门:安装、静态网站部署、反向代理、负载均衡

    前言: 本文章的nginx和tomcat是在Linux中,使用docker来安装和讲解 本人刚学完nginx,如有不对地方,欢迎指正 目录 ⼀.Nginx的安装与启动 1.什么是Nginx Nginx ...

  5. 从入门到入土(九)手摸手教你搭建RocketMQ双主双从同步集群,不信学不会!...

    精彩推荐 一百期Java面试题汇总 SpringBoot内容聚合 IntelliJ IDEA内容聚合 Mybatis内容聚合 接上一篇:从入门到入土(八)RocketMQ的Consumer是如何做的负 ...

  6. RocketMQ入门到入土(四)producer生产消息源码剖析

    精彩推荐 一百期Java面试题汇总 SpringBoot内容聚合 IntelliJ IDEA内容聚合 Mybatis内容聚合 接上一篇:从入门到入土(三)RocketMQ 怎么保证的消息不丢失? 篇幅 ...

  7. RocketMQ消息存储、刷盘、负载均衡

    消息存储 消息存储是RocketMQ中最为复杂和最为重要的一部分. 消息存储总体架构 消息存储架构图: minOffset:当前队列的最小消息偏移量,如果消费时指定从最早消费,就是从该偏移量消费. m ...

  8. rocketmq消费负载均衡--push消费为例

    本文介绍了DefaultMQPushConsumerImpl消费者,客户端负载均衡相关知识点.本文从DefaultMQPushConsumerImpl启动过程到实现负载均衡,从源代码一步一步分析,共分 ...

  9. java rocketmq消费_rocketmq消费负载均衡--push消费详解

    前言 本文介绍了DefaultMQPushConsumerImpl消费者,客户端负载均衡相关知识点.本文从DefaultMQPushConsumerImpl启动过程到实现负载均衡,从源代码一步一步分析 ...

最新文章

  1. STM32 RS485 和串口 只能接收不能发送问题解决
  2. Java-接口的定义与实现
  3. 装有支付宝的手机丢了可咋办
  4. java 指令接口架构,JavaSE 基础大纲
  5. Apache OpenNLP提供的文档
  6. ctags 的最简单使用
  7. ajax显示dataframe,如何使用ajax在运行时显示shell脚本的输出
  8. VBA抓取双色球、大乐透开奖数据
  9. 大麦DW33D路由器假死
  10. 基于遗传算法的柔性车间调度优化研究附Matlab代码
  11. Eclipse 输入的汉字都变成了繁体
  12. 4GL+T100程序设计开发常用方法、技巧总结
  13. 红楼梦人物分析系统c语言,Gephi分析红楼梦
  14. 梳妆谐振器的matlab实现,基于HOPF振荡器的CPG单元模型matlab实现
  15. 网站分析高级细分六脉神剑之第五脉
  16. react-emotion_如何使用Web Speech API和Node.js构建语音转Emotion Converter
  17. java vips_Java IConfigManager.getAllVIPs方法代碼示例
  18. JPA的Repository详解
  19. 「Charles 应用」通过 Charles 分析社区话题功能
  20. 如何使用Fresco

热门文章

  1. 中国联通公布8月运营数据出炉 4G用户大幅度提升
  2. iPhone 11外壳保护套曝光:噢,这个浓厚的老干部风格
  3. 乐视网:公司董事、总经理、财务总监张巍因个人原因辞职
  4. 神马专车喊话特斯拉:修不修、赔不赔、认不认?
  5. 晨哥真有料丨这样的你很掉价!
  6. go 访问数据库mysql基础
  7. freetype在Linux平台编译小记
  8. mysql用的sql标准_标准SQL语言的用法_MySQL
  9. 【Clickhouse】Clickhouse Cannot create table with column ‘Int256‘ because experimental bigint types
  10. 「kafka」kafka增加主分区