接着上文的Pull消费者启动继续讲。

public void start() throws MQClientException {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;this.checkConfig();this.copySubscription();if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPullConsumer.changeInstanceNameToPID();}this.mQClientFactory =MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer,this.rpcHook);this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);this.pullAPIWrapper = new PullAPIWrapper(//mQClientFactory,//this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);if (this.defaultMQPullConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();} else {switch (this.defaultMQPullConsumer.getMessageModel()) {case BROADCASTING:this.offsetStore =new LocalFileOffsetStore(this.mQClientFactory,this.defaultMQPullConsumer.getConsumerGroup());break;case CLUSTERING:this.offsetStore =new RemoteBrokerOffsetStore(this.mQClientFactory,this.defaultMQPullConsumer.getConsumerGroup());break;default:break;}}this.offsetStore.load();boolean registerOK =mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The consumer group["+ this.defaultMQPullConsumer.getConsumerGroup()+ "] has been created before, specify another name please."+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);}mQClientFactory.start();log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PullConsumer service state not OK, maybe started once, "//+ this.serviceState//+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);default:break;}
}

在前文的start()方法中,消费者已经通过checkConfig()和copySubscription()方法已经检查或复制了一部分配置类DefaultMQPullConsumer的属性,接下来会从MQClientMananger中尝试获取mq客户端实例。其实这一步,与生产者中获取客户端的实例一模一样,由于消费者和生产者的配置类都继承了clientConfig类,所以可以调用一个方法实现客户端的获取。自然,两者所实现的客户端其实一模一样,区别在于是否配置了rebalanceImpl等相关的配置。在就客户端而言上,其实消费者生产者的逻辑都是同一套。

接下来是对平衡接口实现的一系列配置,rebalanceImpl在DefaultMQClientImpl中一开始就初始化完毕,将这个消费者作为参数传入。

private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);

在pull消费者中,实现的是RebalancePullImpl继承自RebalanceImpl,在之前的copySubscription()方法中可以看到,在消费者配置类里所存放的topic信息在里面都被转化为SubscriptionData,作为键值对存放在rebalanceImpl下的map中。

接下来会将消费者的相关属性配置在rebalanceImpl中,比如消费者的consumerGroup(组名),消息模型(广播or集群,这里先默认采用集群),分配消息队列的策略(前文有解释,这里采用默认的平均分配策略),以及消费者客户端实例。

protected static final Logger log = ClientLogger.getLog();
protected final ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable =new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
protected final ConcurrentHashMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =new ConcurrentHashMap<String, Set<MessageQueue>>();
protected final ConcurrentHashMap<String /* topic */, SubscriptionData> subscriptionInner =new ConcurrentHashMap<String, SubscriptionData>();
protected String consumerGroup;
protected MessageModel messageModel;
protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
protected MQClientInstance mQClientFactory;

以上是rebalanceImpl的结构,可以看到相关消费者启动的配置在这里都已经配置完毕。

接下来,建立pullAPIWrapper,这里调用了构造方法,只是简单的配置,在这里还没有具体的使用到。作为pull消费者,在pull具体的message的时候,正是要调用pullAPIWrapper的相关方法。

接下来将会初始化这个消费者的offsetStore,这里会根据选取的是广播模式还是消费者模式,来选取相关的策略。在广播模式下,所有的消费者都会收到所订阅的消息,那么显然,在这个模式下面的所有消费者都会将自己消费消费消息队列的进度保存在自己本地上。而在集群模式下,所有的消费者来平均消费消息,那么相应的,这里的消费进度将会保存在远程。所以,如果采用了广播模式,offset采用的是LocalFileOffsetStroe,对应的,集群模式采用的是,RemoteBrokerOffsetStore。

在向消费者客户端实例注册完毕当前的正处于启动过程的消费者之后,将会调用客户端MQClientInstance的start()方法来完成客户端的启动。可以看客户端的start()方法。

public void start() throws MQClientException {PackageConflictDetect.detectFastjson();synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());}// Start request-response channelthis.mQClientAPIImpl.start();// Start various schedule tasksthis.startScheduledTask();// Start pull servicethis.pullMessageService.start();// Start rebalance servicethis.rebalanceService.start();// Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case RUNNING:break;case SHUTDOWN_ALREADY:break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}
}

客户端的启动代码与生产者客户端启动的是同一段代码。

在之前,与生产者一样,解析路由消息,完成路由消息的配置,启动netty客户端完成消息的发送,启动定时任务定时更新路由数据,定时发送心跳,调整线程数量大小,在前半部分过程与生产者一模一样,在前文生产者的启动中都已经详细解释过。我想,在pull模式下的消费者,应该更关心RebalanceService的启动。

可以看到RebalanceService的run()方法。

public void run() {log.info(this.getServiceName() + " service started");while (!this.isStoped()) {this.waitForRunning(WaitInterval);this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");
}

只要客户端没有被关闭,那么将会一直循环调用客户端的doRebalance()方法。

public void doRebalance() {for (String group : this.consumerTable.keySet()) {MQConsumerInner impl = this.consumerTable.get(group);if (impl != null) {try {impl.doRebalance();}catch (Exception e) {log.error("doRebalance exception", e);}}}
}

在MQClientInstance的doRebalance方法里,将会循环调用每个consumerGroup下面的消费者的doRebalance()方法,用来试图达到平衡的目的。而doRebalance(0方法直接在DefaultMQPullConsumerImpl里实现。

public void doRebalance() {if (this.rebalanceImpl != null) {this.rebalanceImpl.doRebalance();}
}

直接调用了rebalanceImpl的doRebalance()方法。现在可以来仔细看rebalanceImpl的实现。

public void doRebalance() {Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();try {this.rebalanceByTopic(topic);} catch (Exception e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("rebalanceByTopic Exception", e);}}}}this.truncateMessageQueueNotMyTopic();
}

首先根据所有在一开始根据topic所生成的订阅信息SubscriptionData都将会被遍历。分别调用reBalanceByTopic()根据topic来依次重新平衡负载。

private void rebalanceByTopic(final String topic) {switch (messageModel) {case BROADCASTING: {Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);if (mqSet != null) {boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet);if (changed) {this.messageQueueChanged(topic, mqSet, mqSet);log.info("messageQueueChanged {} {} {} {}",//consumerGroup,//topic,//mqSet,//mqSet);}} else {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}break;}case CLUSTERING: {Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null == mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}if (null == cidAll) {log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);}if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try {allocateResult = strategy.allocate(//this.consumerGroup, //this.mQClientFactory.getClientId(), //mqAll,//cidAll);} catch (Throwable e) {log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}",strategy.getName(), e);return;}Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet);if (changed) {log.info("rebalanced allocate source. allocateMessageQueueStrategyName={}, group={}, topic={}, mqAllSize={}, cidAllSize={}, mqAll={}, cidAll={}",strategy.getName(), consumerGroup, topic, mqSet.size(), cidAll.size(), mqSet, cidAll);log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, ConsumerId={}, rebalanceSize={}, rebalanceMqSet={}",strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(),allocateResultSet.size(), mqAll.size(), cidAll.size(), allocateResultSet);this.messageQueueChanged(topic, mqSet, allocateResultSet);}}break;}default:break;}
}

根据广播模式和集群模式都各有做法,先来看集群模式。

集群模式中,先通过topic得到topic对应的messageQueue数据,messageQueue的数据结构如下。

private String topic;
private String brokerName;
private int queueId;

保存着topic的Broker名以及具体的队列id。

之后根据topic来获取所有订阅这个topic的消费者。

接下来就会根据在之前配置的消息队列分配策略,调用分配策略的allocate()方法,完成消息队列的重新分配。(默认采用的是平均分配策略,具体分配策略的解释在上一篇)。

接下来调用updateProcessQueueTableInRebalance()方法来根据重新平衡的而结果来更新处理队列。

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet) {boolean changed = false;Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();while (it.hasNext()) {Entry<MessageQueue, ProcessQueue> next = it.next();MessageQueue mq = next.getKey();ProcessQueue pq = next.getValue();if (mq.getTopic().equals(topic)) {if (!mqSet.contains(mq)) {pq.setDropped(true);if (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();changed = true;log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);}}else if (pq.isPullExpired()) {switch (this.consumeType()) {case CONSUME_ACTIVELY:break;case CONSUME_PASSIVELY:pq.setDropped(true);if (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();changed = true;log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",consumerGroup, mq);}break;default:break;}}}}List<PullRequest> pullRequestList = new ArrayList<PullRequest>();for (MessageQueue mq : mqSet) {if (!this.processQueueTable.containsKey(mq)) {PullRequest pullRequest = new PullRequest();pullRequest.setConsumerGroup(consumerGroup);pullRequest.setMessageQueue(mq);pullRequest.setProcessQueue(new ProcessQueue());long nextOffset = this.computePullFromWhere(mq);if (nextOffset >= 0) {pullRequest.setNextOffset(nextOffset);pullRequestList.add(pullRequest);changed = true;this.processQueueTable.put(mq, pullRequest.getProcessQueue());log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);} else {log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);}}}this.dispatchPullRequest(pullRequestList);return changed;
}

首先,通过遍历所有消息队列与处理队列的对应关系,如果在新的分配之后,该消息队列已经不再是负责原来topic下的消息传送,那么这一对应关系将会被清除,这一消息队列的数据messageQueue也会被相应的从消费者的存储中remove掉。既然有旧的无用消息队列被清除,那自然有新的消息队列需要建立新的处理队列processQueue与其建立对应关系。在这里将会生成pullRequest来建立新的对应关系。并通过computePullFromWhere()得到下次拉取数据的位置,在RebalancePullImpl中具体实现了这一方法,直接返回0,来确认下一次拉取数据的位置。并将新的处理队列与对应的messageQueue放入map保存。

如果消息队列在这次rebalance的过程中发生了修改,那么则会调用messageQueueChanged()方法来处理相应的改变。具体实现在RebalancePullImpl中。

集群模式的队列rebalance就此结束。

在客户端启动完毕之后,PullConsumer也启动完毕。

RocketMQ源码解析-Consumer启动(2)相关推荐

  1. RocketMQ源码解析-Consumer启动(1)

    DefaultMQPullConsumer继承了ClientConfig类,作为主动拉获取消息的消费者实现接口的管理与相关属性的配置(与PushConsumer对应).相比生产者,消费者配置的属性要复 ...

  2. RocketMQ源码解析-Producer启动

    RocketMQ中生产者通过DefaultProducer来创建. protected final transient DefaultMQProducerImpl defaultMQProducerI ...

  3. 6、RocketMQ 源码解析之 Broker 启动(上)

    上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...

  4. rocketmq源码解析之name启动(一)

    2019独角兽企业重金招聘Python工程师标准>>> 说在前面 主要解析namrsrv启动部分,namesrv配置加载.netty server创建.注册出处理器. 正文 源码解析 ...

  5. RocketMQ源码(4)—Broker启动加载消息文件以及恢复数据源码【一万字】

    详细介绍了Broker启动加载消息文件以及恢复数据源码. 此前我们学习了Broker的启动源码:RocketMQ源码(3)-Broker启动流程源码解析[一万字],Broker的启动过程中,在Defa ...

  6. RocketMQ源码解析之broker文件清理

    原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...

  7. RocketMQ源码解析之消息消费者(consume Message)

    原创不易,转载请注明出处 文章目录 前言 1.消息流程介绍 2.源码解析 2.1 并发消费 2.2 顺序消费 前言 我们在<RocketMQ源码解析之消息消费者(pullMessage)> ...

  8. RocketMQ源码解析:Filtersrv

    ???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...

  9. 消息中间件RocketMQ源码解析-- --调试环境搭建

    1. 依赖工具 JDK :1.8+ Maven IntelliJ IDEA 2. 源码拉取 从官方仓库 [https://github.com/apache/rocketmq) Fork 出属于自己的 ...

最新文章

  1. 整理一下自己手撸的博客
  2. 增量式pid调节方式有何优点_增量式PID的“假抗饱和”性
  3. BZOJ3998:[TJOI2015]弦论——题解
  4. 04.MyBatis别名的设置和类型转换器
  5. 测试员不可不知的几款bug管理工具
  6. H264解码的一个測试程序
  7. Vue3+Cli4 中使用 Echarts 5
  8. 听说你不会用datetime处理时间?
  9. Thread中的静态代理
  10. 20155207 2016-2017-2 《Java程序设计》第七周学习总结
  11. java使用python爬虫,如何使用 Python 爬虫爬取 Java 题库?
  12. 使用OBS录屏神器,完美录制第二块屏幕。
  13. 迅速崛起 盘点2018年中国AI芯片“四小龙”
  14. linux开发板网口连接测试方法
  15. 云服务器需要芯片吗,什么时候手机不再需要芯片——计算云端化是不可逆转的未来...
  16. 几个常见的 Socket 连接错误及原因[转]
  17. kdj超卖_如何正确理解KDJ?
  18. 科学计数法 转换为数值
  19. RxJava过滤操作符
  20. KA算法:一种低复杂度的预编码/接收机设计思路

热门文章

  1. 获取iview中表单组件Table的选中数据
  2. echarts map 点击地图区域变色_pyecharts 地图可视化
  3. 数据结构(二)——堆
  4. 计算机控制考试试卷及答案,计算机控制系统A考试试卷带答案.doc
  5. 小学python还是c_python和c先学哪个
  6. Golang 学习笔记(08)—— 文件操作
  7. iOS 权限判断 跳转对应设置界面
  8. [20160606]windows下使用bbed的疑问.txt
  9. 【转】shell十三问,为linux学习打基础(上)
  10. Optimizing regular expressions in Java