首先回到DefaultMQPushConsumerImpl  start方法

public synchronized void start() throws MQClientException {switch(this.serviceState) {case CREATE_JUST:this.log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", new Object[]{this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()});this.serviceState = ServiceState.START_FAILED;this.checkConfig();this.copySubscription();if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPushConsumer.changeInstanceNameToPID();}this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);this.pullAPIWrapper = new PullAPIWrapper(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), this.isUnitMode());this.pullAPIWrapper.registerFilterMessageHook(this.filterMessageHookList);if (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();} else {switch(this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING:this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}this.offsetStore.load();if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly)this.getMessageListenerInner());} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently)this.getMessageListenerInner());}this.consumeMessageService.start();boolean registerOK = this.mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;this.consumeMessageService.shutdown();throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null);} else {this.mQClientFactory.start();this.log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());this.serviceState = ServiceState.RUNNING;}default:this.updateTopicSubscribeInfoWhenSubscriptionChanged();this.mQClientFactory.checkClientInBroker();this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();this.mQClientFactory.rebalanceImmediately();return;case RUNNING:case SHUTDOWN_ALREADY:case START_FAILED:throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null);}}

这里主要分析

this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently)this.getMessageListenerInner());

先看下consumeMessageService的构造函数

public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,MessageListenerConcurrently messageListener) {this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;this.messageListener = messageListener;this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();this.consumeExecutor = new ThreadPoolExecutor(//线程池的常驻线程数:consumeThreadMinthis.defaultMQPushConsumer.getConsumeThreadMin(),//线程池的最大线程数:consumeThreadMaxthis.defaultMQPushConsumer.getConsumeThreadMax(),1000 * 60,TimeUnit.MILLISECONDS,this.consumeRequestQueue,//线程池中的线程名:ConsumeMessageThread_new ThreadFactoryImpl("ConsumeMessageThread_"));this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));}

在这个构造函数中,new了一个名字叫consumeExecutor的线程池,在并发消费的模式下,这个线程池也就是消费消息的方式,我们先回到消息消费的入口处,我们上篇(一)中也提到了在回调函数中

//消费消息服务提交
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);

把拉取到的消息(默认为32条)提交到consumeMessageService中,进入submitConsumeRequest方法:

final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {//consumeExecutor : 消费端消费线程池this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {this.submitConsumeRequestLater(consumeRequest);}} 

第一步:获取默认的处理大小,一直觉得这个字段的命名有点歧义,这个字段是用来处理消费端每次消费消息的条数,不是从broker端拉取过来的消息的条数
第二步:判断从broker拉取过来的消息是否大于consumeBatchSize,一般consumeBatchSize都设置为1,默认值也是1,下面直接去看else逻辑

for (int total = 0; total < msgs.size(); ) {List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);for (int i = 0; i < consumeBatchSize; i++, total++) {if (total < msgs.size()) {msgThis.add(msgs.get(total));} else {break;}}ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);try {this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {for (; total < msgs.size(); total++) {msgThis.add(msgs.get(total));}this.submitConsumeRequestLater(consumeRequest);}
}

把消息按照consumeBatchSize分组,组装成ConsumeRequest对象,提交到consumeExecutor线程池中,我们看下ConsumeRequest的run方法

public void run() {if (this.processQueue.isDropped()) {log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);return;}

第一步:判断processQueue的dropped属性,这个属性在负载均衡中会处理,判断需不需要继续消费这个processQueue拉取到的消息

MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);ConsumeConcurrentlyStatus status = null;

第二步:拿到业务系统定义的消息监听listener

ConsumeMessageContext consumeMessageContext = null;if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setProps(new HashMap<String, String>());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}

第三步:判断是否有钩子函数,执行before方法

//设置消息的重试主题,并开始消费消息,并返回该批次消息消费结果:
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);if (msgs != null && !msgs.isEmpty()) {for (MessageExt msg : msgs) {MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));}}status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);hasException = true;
}

第四步:调用resetRetryTopic方法设置消息的重试主题
第五步:执行listener.consumeMessage,业务系统具体去消费消息,如果消费成功那么返回status返回CONSUME_SUCCESS,如果有异常想重试,那么返回RECONSUME_LATER

if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}

第六步:执行钩子函数after方法

//对消费结果的处理
if (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}

第七步:processConsumeResult来对消费结果进行处理,进入processConsumeResult方法

int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())return;
switch (status) {case CONSUME_SUCCESS:if (ackIndex >= consumeRequest.getMsgs().size()) {ackIndex = consumeRequest.getMsgs().size() - 1;}int ok = ackIndex + 1;int failed = consumeRequest.getMsgs().size() - ok;this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);break;case RECONSUME_LATER:ackIndex = -1;this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),consumeRequest.getMsgs().size());break;default:break;
}

第八步:定义了ackIndex,这个值初始化等于Integer.MAX_VALUE,如果返回成功,那么ackIndex=消息数-1,如果返回失败ackIndex=-1

case CLUSTERING:
//集群模式下
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);//发送sendMessageBackboolean result = this.sendMessageBack(msg, context);if (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);msgBackFailed.add(msg);}
}
if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);//更新消息消费进度,不管消费成功与否,上述这些消息消费成功,其实就是修改消费偏移量。(失败的,会进行重试,会创建新的消息)this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}

第九步:集群模式下,判断ackIndex,如果等于-1,那么就要调用sendMessageBack方法,就是消息的ACK,所以在RocketMQ中,只有失败的消息才会ACK,这个方法是把消费失败的消息重新发送给broker,broker的处理逻辑就是根据重试次数依托定时消息机制来完成消息重试,broker在重试消息的时候会创建一个条新的消息,而不是用老的消息,如果到达一定的次数,那么进入死信队列,我在工作中会把即将进入死信队列的消息拿出来以json的格式放入mongodb中,通过界面的方法展示这些失败的消息,并在界面上继续提供重试的功能来处理这些失败的消息。如果重新发送失败,那么会延迟5s后重新消费。

long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}

第十步:不管是消费成功还是消费失败的消息,都会更新消费进度,首先从processQueue中移除所有消费成功的消息并返回offset,这里要注意一点,就是这个offset是processQueue中的msgTreeMap的最小的key,为什么要这样做呢,我的理解也是无奈之举,因为消费进度的推进是offset决定的,因为是线程池消费,不能保证先消费的是offset大的那条消息,所以推进消费进度只能取最小的那条消息的offset,这样在消费端重启的时候就可能会导致消息重复消费。

RocketMQ消息消费源码分析(二消息的消费)相关推荐

  1. 9. 源码分析之消息消费

    源码分析之消息消费 Rebalance(针对集群消费模式) (1)消费Group下的所有消费者 (2)Topic的所有Queue队列 (3)Queue分配策略 触发时机 (1)消费者启动 (2)消费者 ...

  2. 【Android 事件分发】ItemTouchHelper 源码分析 ( OnItemTouchListener 事件监听器源码分析 二 )

    Android 事件分发 系列文章目录 [Android 事件分发]事件分发源码分析 ( 驱动层通过中断传递事件 | WindowManagerService 向 View 层传递事件 ) [Andr ...

  3. 物联网协议之MQTT源码分析(二)

    此篇文章继上一篇物联网协议之MQTT源码分析(一)而写的第二篇MQTT发布消息以及接收Broker消息的源码分析,想看MQTT连接的小伙伴可以去看我上一篇哦. juejin.im/post/5cd66 ...

  4. ENS最新合约源码分析二

    ENS(以太坊域名服务)智能合约源码分析二 0.简介 ​ 本次分享直接使用线上实际注册流程来分析最新注册以太坊域名的相关代码.本次主要分析最新的关于普通域名注册合约和普通域名迁移合约,短域名竞拍合约不 ...

  5. SpringBoot源码分析(二)之自动装配demo

    SpringBoot源码分析(二)之自动装配demo 文章目录 SpringBoot源码分析(二)之自动装配demo 前言 一.创建RedissonTemplate的Maven服务 二.创建测试服务 ...

  6. gSOAP 源码分析(二)

    gSOAP 源码分析(二) 2012-5-24 flyfish 一 gSOAP XML介绍 Xml的全称是EXtensible Markup Language.可扩展标记语言.仅仅是一个纯文本.适合用 ...

  7. Android Q 10.1 KeyMaster源码分析(二) - 各家方案的实现

    写在之前 这两篇文章是我2021年3月初看KeyMaster的笔记,本来打算等分析完KeyMaster和KeyStore以后再一起做成一系列贴出来,后来KeyStore的分析中断了,这一系列的文章就变 ...

  8. 【投屏】Scrcpy源码分析二(Client篇-连接阶段)

    Scrcpy源码分析系列 [投屏]Scrcpy源码分析一(编译篇) [投屏]Scrcpy源码分析二(Client篇-连接阶段) [投屏]Scrcpy源码分析三(Client篇-投屏阶段) [投屏]Sc ...

  9. Nouveau源码分析(二):Nouveau结构体的基本框架

    Nouveau源码分析(二) 在讨论Nouveau对Nvidia设备的初始化前,我准备先说一下Nouveau结构体的基本框架 Nouveau的很多结构体都可以看作是C++中的类,之间有很多相似的东西, ...

最新文章

  1. Mac环境下配置Java开发环境(jdk+maven+tomcat+idea)
  2. Linux Centos6.5 SVN服务器搭建 以及客户端安装
  3. A 01 如何理解会计中的借和贷
  4. 如何利用阿里云安全产品加强你的网站防护能力
  5. 2018-2019-1 20165315 实验三 实时系统
  6. 【STM32】FreeRTOS创建和删除任务示例(动态方法)
  7. ES6语法大全 export,import,for.of循环,promise等等
  8. jmeter性能分析_使用JMeter和Yourkit进行REST / HTTP服务的性能分析
  9. 30分钟,让你成为一个更好的程序员
  10. 1163 最高的奖励(贪心+优先队列)
  11. 分享一款好看的城市选择器
  12. 语音情感识别--RNN
  13. Google Chrome源码剖析【序】
  14. SQL Server存储过程实例
  15. 微信APP支付开发步骤及要点
  16. jclasslib插件_IDEA安装jclasslib 插件查看字节码详解
  17. MacBook Pro 2017版(带multi-touch bar)安装使用 windows10
  18. 中国移动 烽火HG6543C5光猫 获取超级密码教程
  19. 【拓扑学知识】1.拓扑空间与度量拓扑
  20. Array.prototype.pop()

热门文章

  1. [HEOI2013] 钙铁锌硒维生素
  2. 第二届“中国制造(深圳)高峰论坛”举行
  3. matlab2018在图片上添加文字并保存且图片没有白边
  4. UG NX 10 草图之草图基准设置
  5. 【Lesson 4】 和弦的大小增减属
  6. 量子计算机亨通光电,在量子领域新成果:在半导体“自旋量子位元”(量子计算机的一种基本组件) 利用悉尼...
  7. c#语言中怎么样把文本转换成数字,如何将字符串转换为数字 - C# 编程指南 | Microsoft Docs...
  8. 基于fastdfs-zyc搭建分布式监控系统
  9. CCF-CSP 201912-2 回收站选址(python实现)
  10. 使用selenium进行模拟登录