After messages are pulled, they are stored in the ProcessQueue, and consumeMessageService.submitConsumeRequest is then called to hand them over to the consume service.

This is an asynchronous step that decouples message consumption from message pulling. RocketMQ supports both concurrent consumption and orderly consumption; this article covers concurrent consumption first.

Concurrent consumption is handled by the ConsumeMessageConcurrentlyService.

The ConsumeMessageConcurrentlyService.submitConsumeRequest method:

public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }

            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}

It essentially wraps the messages into a ConsumeRequest task and submits it to the consume thread pool. If the number of messages exceeds the batch size for a single consumption (consumeMessageBatchMaxSize, default 1), the messages are split across multiple tasks.
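For reference, this batch size is configured on the consumer itself. Below is a minimal sketch of a push consumer that raises it; the group name, topic, and NameServer address are placeholders, not values from the source:

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class BatchSizeExample {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876"); // assumed local NameServer address
        consumer.subscribe("DemoTopic", "*");

        // Each ConsumeRequest (one listener callback) will carry at most 16 messages.
        consumer.setConsumeMessageBatchMaxSize(16);

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("consumed %d messages in one call%n", msgs.size());
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }
}

With this setting, a single ConsumeRequest, and therefore a single listener callback, can receive up to 16 messages instead of 1.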

The ConsumeRequest.run method:

public void run() {
    if (this.processQueue.isDropped()) {
        log.info("the message queue not be able to consume, because it's dropped. group={} {}",
            ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
        return;
    }

    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    ConsumeConcurrentlyStatus status = null;
    defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

    ConsumeMessageContext consumeMessageContext = null;
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
        consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
        consumeMessageContext.setProps(new HashMap<String, String>());
        consumeMessageContext.setMq(messageQueue);
        consumeMessageContext.setMsgList(msgs);
        consumeMessageContext.setSuccess(false);
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
    }

    long beginTimestamp = System.currentTimeMillis();
    boolean hasException = false;
    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
    try {
        if (msgs != null && !msgs.isEmpty()) {
            for (MessageExt msg : msgs) {
                MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
            }
        }
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
            RemotingHelper.exceptionSimpleDesc(e),
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        hasException = true;
    }
    long consumeRT = System.currentTimeMillis() - beginTimestamp;

    if (null == status) {
        if (hasException) {
            returnType = ConsumeReturnType.EXCEPTION;
        } else {
            returnType = ConsumeReturnType.RETURNNULL;
        }
    } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
        returnType = ConsumeReturnType.TIME_OUT;
    } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
        returnType = ConsumeReturnType.FAILED;
    } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
        returnType = ConsumeReturnType.SUCCESS;
    }

    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
    }

    if (null == status) {
        log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }

    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.setStatus(status.toString());
        consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
    }

    ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
        .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

    if (!processQueue.isDropped()) {
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    } else {
        log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}",
            messageQueue, msgs);
    }
}

It can be broken down into three steps:

1. First check whether the message queue is still valid; if it has been dropped, stop consuming.

For example, after a rebalance, if a queue no longer belongs to the current consumer, its ProcessQueue is marked as dropped.

if (this.processQueue.isDropped()) {
    log.info("the message queue not be able to consume, because it's dropped. group={} {}",
        ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
    return;
}

2. Invoke the message listener (registered when the consumer was started) to consume the messages.

status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);

3. Process the consume result.

if (null == status) {
    log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
        ConsumeMessageConcurrentlyService.this.consumerGroup,
        msgs,
        messageQueue);
    status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

...

ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);

If the listener returns no result (null), the status is set to RECONSUME_LATER and the messages will be consumed again later.
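Put differently, returning RECONSUME_LATER, returning null, or letting an exception escape all end in a retry. A minimal listener sketch illustrating this; handle is a hypothetical business method, not part of the source:

import java.util.List;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class RetryAwareListener implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) {
        try {
            for (MessageExt msg : msgs) {
                handle(msg); // hypothetical business processing
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            // Returning RECONSUME_LATER, returning null, or throwing all lead to a retry:
            // ConsumeRequest.run converts a null result or an exception into RECONSUME_LATER.
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }

    private void handle(MessageExt msg) {
        // placeholder for real business logic
    }
}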

The ConsumeMessageConcurrentlyService.processConsumeResult method:

public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest) {
    int ackIndex = context.getAckIndex();

    if (consumeRequest.getMsgs().isEmpty())
        return;

    switch (status) {
        case CONSUME_SUCCESS:
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            int ok = ackIndex + 1;
            int failed = consumeRequest.getMsgs().size() - ok;
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
            break;
        case RECONSUME_LATER:
            ackIndex = -1;
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                consumeRequest.getMsgs().size());
            break;
        default:
            break;
    }

    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING:
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }

            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);
                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }

    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}

The consume result is handled as follows:

If consumption succeeded, ackIndex is set (capped) to msgSize - 1.

If consumption failed, ackIndex is set to -1.

The messages after ackIndex are then sent back to the broker. This only happens in the clustering model; in the broadcasting model failed messages are not sent back (they are just logged and dropped). Because the status applies to the whole batch, a single failure means the entire batch needs to be ACKed back, as the loop below shows.

for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
    MessageExt msg = consumeRequest.getMsgs().get(i);
    boolean result = this.sendMessageBack(msg, context);
    if (!result) {
        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
        msgBackFailed.add(msg);
    }
}

if (!msgBackFailed.isEmpty()) {
    consumeRequest.getMsgs().removeAll(msgBackFailed);
    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
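Besides the all-or-nothing statuses, a listener running in clustering mode can report a partial success by calling context.setAckIndex inside consumeMessage: processConsumeResult then counts messages up to ackIndex as OK and sends back only the rest. A sketch of that usage, assuming a hypothetical process method that reports per-message success:

import java.util.List;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class PartialAckListener implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) {
        for (int i = 0; i < msgs.size(); i++) {
            if (!process(msgs.get(i))) {
                // Messages 0..i-1 are treated as consumed; message i and the ones after it
                // fall into the "ackIndex + 1 .. size - 1" send-back loop shown above.
                context.setAckIndex(i - 1);
                break;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    private boolean process(MessageExt msg) {
        return true; // hypothetical per-message business logic
    }
}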

If sendMessageBack fails for some messages, submitConsumeRequestLater is called to put those messages back into the consume thread pool after a 5-second delay.
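The delayed re-submit is conceptually just a scheduled task that hands the batch back to the consume thread pool. A simplified sketch of the idea (not the actual RocketMQ implementation), assuming a consumeExecutor pool and a consumeRequest task:

import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SubmitLaterSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Re-submit the consume task to the consume thread pool after a fixed 5-second delay.
    public void submitLater(ThreadPoolExecutor consumeExecutor, Runnable consumeRequest) {
        scheduler.schedule(() -> {
            try {
                consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                submitLater(consumeExecutor, consumeRequest); // still full, try again later
            }
        }, 5, TimeUnit.SECONDS);
    }
}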

Finally, the messages are removed from the ProcessQueue and the consume offset is updated.

long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
    this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
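Note that removeMessage does not simply return the offset of the last removed message: conceptually, the ProcessQueue keeps pending messages sorted by queue offset and returns the smallest offset that is still unconsumed, so the committed offset never skips over a message that is still being processed or was re-submitted. A simplified sketch of that idea, not the actual ProcessQueue code:

import java.util.List;
import java.util.TreeMap;

// Pending messages are kept in a TreeMap keyed by queue offset; the value returned
// after removal is the smallest offset still pending (or max + 1 if nothing is pending),
// which is the offset that is safe to commit.
public class OffsetWindowSketch {
    private final TreeMap<Long, Object> pending = new TreeMap<>();
    private long maxOffsetSeen = -1;

    public synchronized void put(long queueOffset, Object msg) {
        pending.put(queueOffset, msg);
        maxOffsetSeen = Math.max(maxOffsetSeen, queueOffset);
    }

    /** Remove consumed offsets and return the offset that is now safe to commit. */
    public synchronized long remove(List<Long> consumedOffsets) {
        consumedOffsets.forEach(pending::remove);
        return pending.isEmpty() ? maxOffsetSeen + 1 : pending.firstKey();
    }
}

This is also why a single slow or failed message in a batch can hold back the committed offset for the whole queue until it is finally processed or sent back.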
