11. Consumer-并发消费
消息拉取后存入ProcessQueue中,然后调用consumeMessageService.submitConsumeRequest方法通知消费服务进行消费。
这是个异步过程,使消息消费和消息拉取解耦。Rocketmq消息消费支持并发消费和顺序消费,这里先介绍并发消费。
并发消费逻辑是由ConsumeMessageConcurrentlyService服务处理的。
ConsumeMessageConcurrentlyService.submitConsumeRequest方法:
public void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();if (msgs.size() <= consumeBatchSize) {ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {this.submitConsumeRequestLater(consumeRequest);}} else {for (int total = 0; total < msgs.size(); ) {List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);for (int i = 0; i < consumeBatchSize; i++, total++) {if (total < msgs.size()) {msgThis.add(msgs.get(total));} else {break;}}ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);try {this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {for (; total < msgs.size(); total++) {msgThis.add(msgs.get(total));}this.submitConsumeRequestLater(consumeRequest);}}}}
主要就是构造一个消费任务ConsumeRequest,提交给线程池处理,如果消息数超过一次消费批次大小(默认1),分批提交任务。
ConsumeRequest.run方法:
public void run() {if (this.processQueue.isDropped()) {log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);return;}MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);ConsumeConcurrentlyStatus status = null;defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());ConsumeMessageContext consumeMessageContext = null;if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setProps(new HashMap<String, String>());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}long beginTimestamp = System.currentTimeMillis();boolean hasException = false;ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;try {if (msgs != null && !msgs.isEmpty()) {for (MessageExt msg : msgs) {MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));}}status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);hasException = true;}long consumeRT = System.currentTimeMillis() - beginTimestamp;if (null == status) {if (hasException) {returnType = ConsumeReturnType.EXCEPTION;} else {returnType = ConsumeReturnType.RETURNNULL;}} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType = ConsumeReturnType.TIME_OUT;} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {returnType = ConsumeReturnType.FAILED;} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {returnType = ConsumeReturnType.SUCCESS;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}if (null == status) {log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);if (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);}}
大致分为三个步骤:
1.首先检查消息队列是否有效,如果无效则停止消费。
例如重新负载均衡后,原来的队列不属于当前消费者,则设置原队列无效。
if (this.processQueue.isDropped()) {log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);return;}
2.回调消费方法(启动consumer时设置的)进行消费。
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
3.对消费结果进行处理
if (null == status) {log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}。。。
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
如果消费后未返回消费结果,则消费结果被设置为RECONSUME_LATER,会重复消费
ConsumeMessageConcurrentlyService.processConsumeResult方法:
public void processConsumeResult(final ConsumeConcurrentlyStatus status,final ConsumeConcurrentlyContext context,final ConsumeRequest consumeRequest) {int ackIndex = context.getAckIndex();if (consumeRequest.getMsgs().isEmpty())return;switch (status) {case CONSUME_SUCCESS:if (ackIndex >= consumeRequest.getMsgs().size()) {ackIndex = consumeRequest.getMsgs().size() - 1;}int ok = ackIndex + 1;int failed = consumeRequest.getMsgs().size() - ok;this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);break;case RECONSUME_LATER:ackIndex = -1;this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),consumeRequest.getMsgs().size());break;default:break;}switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());}break;case CLUSTERING:List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);boolean result = this.sendMessageBack(msg, context);if (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);msgBackFailed.add(msg);}}if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}break;default:break;}long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}}
消费结果的处理逻辑为:
如果消费成功设置ackIndex=msgSize-1
如果消费失败设置ackIndex=-1
后面根据ackIndex将消息发送回broker,只处理集群消息,如果是广播消息不处理,该批消息有一个失败,全部需要发ACK
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);boolean result = this.sendMessageBack(msg, context);if (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);msgBackFailed.add(msg);}}if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}
如果发送失败则调用submitConsumeRequestLater,延迟5将任务重新放入线程池中
最后将消息从队列中移除,并更新消费进度。
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}
11. Consumer-并发消费相关推荐
- Apache Kafka-通过concurrency实现并发消费
文章目录 概述 演示过程 Code POM依赖 配置文件 生产者 消费者 单元测试 测试结果 方式二 @KafkaListener 配置项 分布式下的concurrency 源码地址 概述 默认情况下 ...
- Kafka Consumer多线程消费
概述 OrdinaryConsumer类 ConsumerWorker.java MultiThreadedConsumer.java MultiThreadedRebalanceListener.j ...
- Kafka学习-----Kafka消费者Consumer:消费方式,分区分配策略,RangeRoundRobin
目录 一.消费方式 二.消费者的分配模式 1.分配时机? 2.Range策略 2.RoundRobin 策略 三.代码解释 RangeAssignor: RoundRobinAssignor 一.消费 ...
- 96秒100亿!如何抗住双11高并发流量?
点击上方"朱小厮的博客",选择"设为星标" 后台回复"加群"加入公众号专属技术群 来源:uee.me/c9UsN 今年双 11 全民购物狂欢 ...
- WCF后续之旅(11): 关于并发、回调的线程关联性(Thread Affinity)
对于一般的多线程操作,比如异步地进行基于文件系统的IO操作:异步地调用Web Service:或者是异步地进行数据库访问等等,是和具体的线程无关的.也就是说,对于这些操作,任意创建一个新的线程来执行都 ...
- 第一百一十二期:96秒100亿!如何抗住双11高并发流量?
今年双 11 全民购物狂欢节进入第十一个年头,1 分 36 秒,交易额冲到 100 亿 !比 2018 年快了近 30 秒,比 2017 年快了近 1 分半!这个速度再次刷新天猫双 11 成交总额破 ...
- kong网关从入门到精通_可能国内最好的网关开源项目,支持 Dubbo、SpringCloud,经历多年双11高并发的场景验证
Soul 网关自从去年10月我开源以来,经历了一年的事情,接受到了来自社区很多朋友的建议,并进行持续不断的优化,已经提供了非常丰富的功能,很多功能都是高度自定义,可视化,高度可扩展的,现在做一个归纳总 ...
- kafka consumer 停止消费topic
现象 在kafka consumer (以 kafka1.0.0为例)消费 topic 时,常常会出现程序还在运行,但是已经不消费消息了(kafka producer正常生产消息),使用kafka命令 ...
- kafka Java客户端之 consumer API 消费消息
背景:我使用docker-compose 搭建的kafka服务 kafka的简单介绍以及docker-compose部署单主机Kafka集群 使用consumer API消费指定Topic里面的消息 ...
最新文章
- 在Ubuntu 16.04.6 LTS上升级python 3.5到3.7.3实录
- 关于开源精神和抄袭问题
- rhel6下配置ftp服务器
- Codechef Chef Cuts Tree
- AD9834 DDS 使用经验
- Android 目录
- Java内部具有原子更新的动态热交换环境
- 入门C语言10问10答
- 2020考研数学一大纲之完全解析(四)
- ashampoo(阿香婆) movie studio视频剪辑笔记
- 时域分析特征参数的计算代码(Matlab和Qt两种)
- springboot实现统一日志管理
- 超算计算机需要显卡吗,NVIDIA:中国超算性能世界第一认了 但省电我最强
- 加拿大签证办理时解释信Explanation of Letter参考
- 简单通俗的说一下什么是面向过程和面向对象
- SYN攻击原理以及防范技术
- <Zhuuu_ZZ>那些年我们踩过的Hadoop HA的坑--高可用集群
- 计算机病毒按危害程度分类可分为,计算机病毒按其危害程度可分为
- 利用菜单配置文件生成菜单
- CSR8670 spi方式软件烧录方法