Rocketmq消费分为push和pull两种方式,push为被动消费类型,pull为主动消费类型,push方式最终还是会从broker中pull消息。不同于pull的是,push首先要注册消费监听器,当监听器处触发后才开始消费消息,所以被称为“被动”消费。
具体地,以pushConsumer的测试例子展开介绍,通常使用push消费的过程如下:
public class PushConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");consumer.subscribe("Jodie_topic_1023", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//wrong time format 2017_0422_221800consumer.setConsumeTimestamp("20170422221800");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}

上述过程背后设计到的点如下: 
I. checkConfig 检查内容:
1.消费组 -- (不能与默认DEFAULT_CONSUMER同名)
2.消费模型 -- (默认CLUSTERING)
3.从何处开始消费 -- (默认CONSUME_FROM_LAST_OFFSET)
4.消费时间戳 -- (消息回溯,默认Default backtracking consumption time Half an hour ago)
5.消费负载均衡策略 -- (默认AllocateMessageQueueAveragely)
6.订阅关系 --(map类型,即可订阅多个topic;key=Topic, value=订阅描述)
7.消费监听 --(必须为orderly or concurrently类型之一)
8.消费消息的线程数量控制 -- (消费线程池最大、最小数量)
9.检查单队列并行消费允许的最大跨度 --(consumeConcurrentlyMaxSpan)
10.检查拉消息本地队列缓存消息最大数 --(pullThresholdForQueue)(processQueue.getMsgCount()记数)
11.检查拉取时间间隔 --(拉消息间隔,由于是长轮询,所以默认为0)
12.检查批量消费的个数 --(一次消费多少条消息)
13.检查批量拉取消息的个数 --(一次最多拉多少条)
II. copySubscription:
将订阅信息设置到rebalanceImpl的map中用于负载。另外,如果该消费者的消费模式为集群消费,则会将retry的topic一并放到rebalanceImpl的map中用于负载。
III. 设置rebanlance信息
IV. 实例化pull消息的包装类型
V. 如果不存在offsetStore对象,实例化offsetStore
广播模式:
public class LocalFileOffsetStore implements OffsetStore {...}
注:load()函数体不为空
集群模式:
public class RemoteBrokerOffsetStore implements OffsetStore {...}
注:load()函数体为空
VI. 获取监听器,实例化consumeMessageService服务并启动
ConsumeMessageOrderlyService启动后会对拉取下来的消息进行处理。ConsumeMessageOrderlyService有两种类型:ConsumeMessageOrderlyService和ConsumeMessageConcurrentlyService。
1). 如果消息监听器是orderly类型,则创建ConsumeMessageOrderlyService实例
ConsumeMessageOrderlyService.start()只处理消息模式为CLUSTERING的消息消费。
public void start() {if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {ConsumeMessageOrderlyService.this.lockMQPeriodically();}}, 1000 * 1, ProcessQueue.RebalanceLockInterval, TimeUnit.MILLISECONDS);}}

线程启动后会每隔20s执行lockMQPeriodicallys(),lockMQPeriodicallys()会将消费的队列上锁,然后处理,具体过程,有机会单独成文分析。
2). 如果消息监听器是concurrently类型,则创建ConsumeMessageConcurrentlyService实例
ConsumeMessageConcurrentlyService.start()会定时清除过期消息 --> cleanExpireMsg()。
VII. 注册消费组
将group和consumer注册到MQClientInstance实例。
与生产者注册生产者组类似,一个客户端进程中一个consumerGroup只能有一个实例。
MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
if (prev != null) {log.warn("the consumer group[" + group + "] exist already.");return false;
}

如果没有注册成功,则关闭消费服务,consumeMessageService.shutdown()。
VIII. 启动mQClientFactory及MQClientInstance
1). 获取client实例对象MQClientInstance -- getAndCreateMQClientInstance。一个进程只能产生一个MQClientInstance实例对象, 某个客户端的生产者与消费者共用这个实例对象。
2). 启动客户端实例的个各种服务:
public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// 1.判断NamesrvAddr是否为空,为空去远程http服务拉去地址if (null == this.clientConfig.getNamesrvAddr()) {this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());}// 2.开启通信服务this.mQClientAPIImpl.start();// 3.启动各种定时任务this.startScheduledTask();// 4.启动消息拉取服务,循环拉取阻塞队列pullRequestQueuethis.pullMessageService.start();// 5. 启动负载均衡服务this.rebalanceService.start();// 6.启动消息生产服务this.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case RUNNING:break;case SHUTDOWN_ALREADY:break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId()+ "] has been created before, and failed.", null);default:break;}}}

分析push消费的过程,需对上述过程第3点、第4点、第5点依次介绍。
第3点、启动各种定时任务过程:
 编号  任务 周期 启动时延 
 获取namesrv地址 每隔2分钟  0.01s
2  更新路由信息 每隔3分钟  001s
3  向所有broker发送心跳包,并清除无效broker   每隔30s  1s
4  持久化消费位置offset 每隔5s  10s
5  调整消费线程池大小 每隔1分钟  1min

注:编号3中,客户端会通过心跳消息,向broker注册消费信息。Broker收到该心跳消息,把它维护在一个叫做ConsumerManager的对象里面,为之后做消费的负载均衡提供数据,负载均衡在消费端做,消费端在负载均衡时首先要从broker那获取这份全局信息。
第4点 启动pullMessageService服务
初始化客户端实例时,创建PullMessageService服务对象。
this.pullMessageService = new PullMessageService(this),其中PullMessageService继承于ServiceThread,是一个线程对象。启动消息拉取服务线程后,在线程没有阻塞的情况下会不断地从循环阻塞队列pullRequestQueue拉取PullRequest对象,然后执行this.pullMessage(pullRequest)。
那么pullRequestQueue的数据如何put进去的?核心是doRebalance ,负载均衡具体细节可以参考:http://www.cnblogs.com/chenjunjie12321/p/7913323.html。

例如当前有N个客户端同时消费一个topic下的消息队列(如上图),当前客户端( clientId = currentCId),经过负载均衡处理后得到分配给当前消费者的消息队列(如上图的qM、qN),之后将这些队列与processQueueTable中的队列进行比对分析,见下面第五点。

第5点 RebalancePushImpl 负载均衡,分发pullRequest到pullRequestQueue。
负载均衡处理后得到分配给当前消费者的消息队列,然后将这些队列进行updateProcessQueueTableInRebalance 处理。updateProcessQueueTableInRebalance 的大致逻辑为如下 I、II 两步:

I. 首先检查当前RebalancePushImpl实例processQueueTable中与mqSet的包含关系
(1)如图中processQueueTable的灰色部分,表示与mqSet集合不互不包含的队列,对这些队列首先设置Dropped为true,然后看这些队列是否可以移除出processQueueTable--removeUnnecessaryMessageQueue,即每隔1s 看是否可以拿到当前队列的消费锁(tryLock()),拿到后返回true, 如果等待1s后仍然拿不到当前队列的消费锁则返回false,如果返回true则从processQueueTable移除对应的Entry<MessageQueue, ProcessQueue>;
(2) 如图中processQueueTable的白色部分,表示与mqSet集合的交集队列,对于这些队列,如果是消费类型是pull型,则不用管,如果是push型,看这些队列是否isPullExpired,如果是这些队列首先设置Dropped为true,则可以移除出processQueueTable--removeUnnecessaryMessageQueue。
II. 经过 I 处理,processQueueTable更新之后, 将processQueueTable集合与mqSet的的相对补集: processQueueTable(mq) - mqSet 里的消息队列依次封装成pullRequest,然后dispatchPullRequest到pullRequestQueue中。
经过上述处理后,待消费的队列放在了pullRequestList中,之后遍历pullRequestList,对遍历的每个队列进行消费,代码如下:
 @Overridepublic void dispatchPullRequest(List<PullRequest> pullRequestList) {for (PullRequest pullRequest : pullRequestList) {this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);}}

executePullRequestImmediately的逻辑功能:

 public void executePullRequestImmediately(final PullRequest pullRequest) {try {this.pullRequestQueue.put(pullRequest);} catch (InterruptedException e) {log.error("executePullRequestImmediately pullRequestQueue.put", e);}}

总之,最终会将负载均衡得到的队列存放到pullRequestQueue。

回过来继续分析第4点,pullMessageService线程涉及到消费的核心过程DefaultMQPushConsumerImpl.pullMessage,pullMessageService线程线程体源码如下:
 @Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {PullRequest pullRequest = this.pullRequestQueue.take();if (pullRequest != null) {this.pullMessage(pullRequest);}} catch (InterruptedException e) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}log.info(this.getServiceName() + " service end");}

调用DefaultMQPushConsumerImpl.pullMessage方法:

 private void pullMessage(final PullRequest pullRequest) {final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);} 

pullMessage具体体拉流程如下图所示:


下面对并发消费模型(concurrently)的消费代码进行展示:

class ConsumeRequest implements Runnable ,其线程体方法如下:

@Overridepublic void run() {if (this.processQueue.isDropped()) {log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);return;}MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);ConsumeConcurrentlyStatus status = null;ConsumeMessageContext consumeMessageContext = null;if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setProps(new HashMap<String, String>());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}long beginTimestamp = System.currentTimeMillis();boolean hasException = false;ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;try {ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);if (msgs != null && !msgs.isEmpty()) {for (MessageExt msg : msgs) {MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));}}status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);hasException = true;}long consumeRT = System.currentTimeMillis() - beginTimestamp;if (null == status) {if (hasException) {returnType = ConsumeReturnType.EXCEPTION;} else {returnType = ConsumeReturnType.RETURNNULL;}} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType = ConsumeReturnType.TIME_OUT;} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {returnType = ConsumeReturnType.FAILED;} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {returnType = ConsumeReturnType.SUCCESS;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}if (null == status) {log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);if (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);}}

View Code

consumerRequest逻辑:

processConsumeResult -- 对消费结果进行处理:

重试队列发消息逻辑:

生成一个重试队列,重试队列topic =  %RETRY% + consumerGroup的形式。
附:
值得注意的是每次消费pullRequest上的一条数据后上更新消费到达的 offset,然后将pullRequest.setNextOffset(offset);
//这里的 this 为一个 DefaultMQPushConsumerImpl 实例对象
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
...
pullRequest.setNextOffset(offset);

其中 computePullFromWhere采用的策略有如下三种(另外还有几个已经被弃用的(@Deprecated)):

CONSUME_FROM_LAST_OFFSET(默认): 一个新的消费集群第一次启动从队列的最后位置开始消费。后续再启动接着上次消费的进度开始消费。
CONSUME_FROM_FIRST_OFFSET: 一个新的消费集群第一次启动从队列的最前位置开始消费。后续再启动接着上次消费的进度开始消费。
CONSUME_FROM_TIMESTAMP: 一个新的消费集群第一次启动从指定时间点开始消费。后续再启动接着上次消费的进度开始消费。

DefaultMQPushConsumer 中默认采用 CONSUME_FROM_LAST_OFFSET 这种方式,当然可以根据自己需要修改computePullFromWhere的策略

private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;

IX. updateTopicSubscribeInfoWhenSubscriptionChanged
X. sendHeartbeatToAllBrokerWithLock
XI. rebalanceImmediately

(完)

转载于:https://www.cnblogs.com/chenjunjie12321/p/7922362.html

rocketmq--push消费过程相关推荐

  1. rocketmq消费负载均衡--push消费为例

    本文介绍了DefaultMQPushConsumerImpl消费者,客户端负载均衡相关知识点.本文从DefaultMQPushConsumerImpl启动过程到实现负载均衡,从源代码一步一步分析,共分 ...

  2. java rocketmq消费_rocketmq消费负载均衡--push消费详解

    前言 本文介绍了DefaultMQPushConsumerImpl消费者,客户端负载均衡相关知识点.本文从DefaultMQPushConsumerImpl启动过程到实现负载均衡,从源代码一步一步分析 ...

  3. rocketmq 顺序消费_10 分钟看懂消息队列 RocketMQ

    RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于2017年9月25日成为 Apache 的顶级项目.作为经历过多次阿里巴巴双十一这种&qu ...

  4. rocketmq 重复消费_消息队列 RocketMQ

    引言 本文整理了RocketMQ的相关知识,方便以后查阅. 功能介绍 简单来说,消息队列就是基础数据结构课程里"先进先出"的一种数据结构,但是如果要消除单点故障,保证消息传输的可靠 ...

  5. RocketMQ消息消费方式 推拉模式

    RocketMQ消息消费本质上是基于的拉(pull)模式,consumer主动向消息服务器broker拉取消息. consumer被分为2类:MQPullConsumer和MQPushConsumer ...

  6. RocketMQ 消息消费 轮询机制 PullRequestHoldService

    1. 概述 先来看看 RocketMQ 消费过程中的轮询机制是啥.首先需要补充一点消费相关的前置知识. 1.1 消息消费方式 RocketMQ 支持多种消费方式,包括 Push 模式和 Pull 模式 ...

  7. rdkafka线程过多_Kafka/RocketMQ 多线程消费时如何保证消费顺序?

    上两篇文章都在讨论顺序消息的一些知识,看到有个读者的留言如下: 这个问题问得非常棒,由于在之前的文章中并没有提及到,因此我在这篇文章中单独讲解,本文将从消费顺序性这个问题出发,深度剖析 Kafka/R ...

  8. 电商场景下,如何处理消费过程中的重复消息?

    摘要:比如一个消费订单消息,统计下单金额的微服务.若不正确处理重复消息,就会出现重复统计.那仅靠MQ能保证消息不重复吗? 本文分享自华为云社区<如何处理消费过程中的重复消息?>,作者:Ja ...

  9. [RocketMQ]消息中间件—RocketMQ消息消费(一)

    2019独角兽企业重金招聘Python工程师标准>>> 文章摘要:在发送消息给RocketMQ后,消费者需要消费.消息的消费比发送要复杂一些,那么RocketMQ是如何来做的呢? 在 ...

  10. RocketMq之消费方式

    一.如何选择消息消费的方式-Pull or Push? 1.1 MQ中Pull和Push的两种消费方式 对于任何一款消息中间件而言,消费者客户端一般有两种方式从消息中间件获取消息并消费: (1)Pus ...

最新文章

  1. pandas中的3种基本数据类型介绍
  2. R行数据过滤基于dplyr包filter函数
  3. 为什么 React Elements 会有 $$typeof 这个属性?
  4. Python收发邮件
  5. Java纸牌拖拉机简单模拟
  6. 三篇文章了解 TiDB 技术内幕——说存储
  7. 电子工程可以报考二建_毕业证上财务管理专业,可以报考二建吗?
  8. php 从字符中随机挑一个数,php 对中文字符串的处理- 随机取出指定个数的汉字...
  9. php js 防止重复提交表单,php如何防止form重复提交
  10. 大数据平台应用开发的痛点有哪些
  11. 如何下载Visual Studio Code及配置教程
  12. PDF Checkpoint for mac(pdf文件批量处理工具)
  13. 阶段3 2.Spring_04.Spring的常用注解_2 常用IOC注解按照作用分类
  14. 哈萨克斯坦--出行攻略
  15. html小写罗马字符怎么写,如何在 LATEX 中插入大小写的罗马字符
  16. LM358电压跟随器
  17. 比特bit,字节Byte,带宽流量和流速的关系,存储容量单位
  18. 安装MySQL---已删除服务,却显示服务已存在
  19. 基因结构注释软件PASA安装全纪录
  20. c语言else if函数的使用方法,ELSE IF THEN 函数的用法

热门文章

  1. Android 窗口全屏
  2. 关于 单窗口服务模型模拟 进行的小测试
  3. gdb对应vc调试命令
  4. aspxgridview将所选项导出ASPxGridViewExporter1
  5. Java 使用BigDecimal类处理高精度计算
  6. CentOS安装系统时硬盘分区建议
  7. HDU1712:ACboy needs your help(分组背包)
  8. 分布式团队中沟通引发的问题, itest 解决之道
  9. JAVA多线程之男朋友和女朋友之间的故事
  10. mysql备份的三种方式