rocketmq--push消费过程
public class PushConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");consumer.subscribe("Jodie_topic_1023", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//wrong time format 2017_0422_221800consumer.setConsumeTimestamp("20170422221800");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}
public void start() {if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {ConsumeMessageOrderlyService.this.lockMQPeriodically();}}, 1000 * 1, ProcessQueue.RebalanceLockInterval, TimeUnit.MILLISECONDS);}}
MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
if (prev != null) {log.warn("the consumer group[" + group + "] exist already.");return false;
}
public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// 1.判断NamesrvAddr是否为空,为空去远程http服务拉去地址if (null == this.clientConfig.getNamesrvAddr()) {this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());}// 2.开启通信服务this.mQClientAPIImpl.start();// 3.启动各种定时任务this.startScheduledTask();// 4.启动消息拉取服务,循环拉取阻塞队列pullRequestQueuethis.pullMessageService.start();// 5. 启动负载均衡服务this.rebalanceService.start();// 6.启动消息生产服务this.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case RUNNING:break;case SHUTDOWN_ALREADY:break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId()+ "] has been created before, and failed.", null);default:break;}}}
编号 | 任务 | 周期 | 启动时延 |
1 | 获取namesrv地址 | 每隔2分钟 | 0.01s |
2 | 更新路由信息 | 每隔3分钟 | 001s |
3 | 向所有broker发送心跳包,并清除无效broker | 每隔30s | 1s |
4 | 持久化消费位置offset | 每隔5s | 10s |
5 | 调整消费线程池大小 | 每隔1分钟 | 1min |
![](/assets/blank.gif)
例如当前有N个客户端同时消费一个topic下的消息队列(如上图),当前客户端( clientId = currentCId),经过负载均衡处理后得到分配给当前消费者的消息队列(如上图的qM、qN),之后将这些队列与processQueueTable中的队列进行比对分析,见下面第五点。
![](/assets/blank.gif)
@Overridepublic void dispatchPullRequest(List<PullRequest> pullRequestList) {for (PullRequest pullRequest : pullRequestList) {this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);}}
executePullRequestImmediately的逻辑功能:
public void executePullRequestImmediately(final PullRequest pullRequest) {try {this.pullRequestQueue.put(pullRequest);} catch (InterruptedException e) {log.error("executePullRequestImmediately pullRequestQueue.put", e);}}
总之,最终会将负载均衡得到的队列存放到pullRequestQueue。
@Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {PullRequest pullRequest = this.pullRequestQueue.take();if (pullRequest != null) {this.pullMessage(pullRequest);}} catch (InterruptedException e) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}log.info(this.getServiceName() + " service end");}
调用DefaultMQPushConsumerImpl.pullMessage方法:
private void pullMessage(final PullRequest pullRequest) {final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}
pullMessage具体体拉流程如下图所示:
![](/assets/blank.gif)
下面对并发消费模型(concurrently)的消费代码进行展示: class ConsumeRequest implements Runnable ,其线程体方法如下:
![](/assets/blank.gif)
![](/assets/blank.gif)
@Overridepublic void run() {if (this.processQueue.isDropped()) {log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);return;}MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);ConsumeConcurrentlyStatus status = null;ConsumeMessageContext consumeMessageContext = null;if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setProps(new HashMap<String, String>());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}long beginTimestamp = System.currentTimeMillis();boolean hasException = false;ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;try {ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);if (msgs != null && !msgs.isEmpty()) {for (MessageExt msg : msgs) {MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));}}status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);hasException = true;}long consumeRT = System.currentTimeMillis() - beginTimestamp;if (null == status) {if (hasException) {returnType = ConsumeReturnType.EXCEPTION;} else {returnType = ConsumeReturnType.RETURNNULL;}} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType = ConsumeReturnType.TIME_OUT;} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {returnType = ConsumeReturnType.FAILED;} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {returnType = ConsumeReturnType.SUCCESS;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}if (null == status) {log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);if (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);}}
View Code
consumerRequest逻辑:
processConsumeResult -- 对消费结果进行处理:
重试队列发消息逻辑:
//这里的 this 为一个 DefaultMQPushConsumerImpl 实例对象
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
...
pullRequest.setNextOffset(offset);
其中 computePullFromWhere采用的策略有如下三种(另外还有几个已经被弃用的(@Deprecated)):
CONSUME_FROM_LAST_OFFSET(默认): 一个新的消费集群第一次启动从队列的最后位置开始消费。后续再启动接着上次消费的进度开始消费。
CONSUME_FROM_FIRST_OFFSET: 一个新的消费集群第一次启动从队列的最前位置开始消费。后续再启动接着上次消费的进度开始消费。
CONSUME_FROM_TIMESTAMP: 一个新的消费集群第一次启动从指定时间点开始消费。后续再启动接着上次消费的进度开始消费。
DefaultMQPushConsumer 中默认采用 CONSUME_FROM_LAST_OFFSET 这种方式,当然可以根据自己需要修改computePullFromWhere的策略
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
转载于:https://www.cnblogs.com/chenjunjie12321/p/7922362.html
rocketmq--push消费过程相关推荐
- rocketmq消费负载均衡--push消费为例
本文介绍了DefaultMQPushConsumerImpl消费者,客户端负载均衡相关知识点.本文从DefaultMQPushConsumerImpl启动过程到实现负载均衡,从源代码一步一步分析,共分 ...
- java rocketmq消费_rocketmq消费负载均衡--push消费详解
前言 本文介绍了DefaultMQPushConsumerImpl消费者,客户端负载均衡相关知识点.本文从DefaultMQPushConsumerImpl启动过程到实现负载均衡,从源代码一步一步分析 ...
- rocketmq 顺序消费_10 分钟看懂消息队列 RocketMQ
RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于2017年9月25日成为 Apache 的顶级项目.作为经历过多次阿里巴巴双十一这种&qu ...
- rocketmq 重复消费_消息队列 RocketMQ
引言 本文整理了RocketMQ的相关知识,方便以后查阅. 功能介绍 简单来说,消息队列就是基础数据结构课程里"先进先出"的一种数据结构,但是如果要消除单点故障,保证消息传输的可靠 ...
- RocketMQ消息消费方式 推拉模式
RocketMQ消息消费本质上是基于的拉(pull)模式,consumer主动向消息服务器broker拉取消息. consumer被分为2类:MQPullConsumer和MQPushConsumer ...
- RocketMQ 消息消费 轮询机制 PullRequestHoldService
1. 概述 先来看看 RocketMQ 消费过程中的轮询机制是啥.首先需要补充一点消费相关的前置知识. 1.1 消息消费方式 RocketMQ 支持多种消费方式,包括 Push 模式和 Pull 模式 ...
- rdkafka线程过多_Kafka/RocketMQ 多线程消费时如何保证消费顺序?
上两篇文章都在讨论顺序消息的一些知识,看到有个读者的留言如下: 这个问题问得非常棒,由于在之前的文章中并没有提及到,因此我在这篇文章中单独讲解,本文将从消费顺序性这个问题出发,深度剖析 Kafka/R ...
- 电商场景下,如何处理消费过程中的重复消息?
摘要:比如一个消费订单消息,统计下单金额的微服务.若不正确处理重复消息,就会出现重复统计.那仅靠MQ能保证消息不重复吗? 本文分享自华为云社区<如何处理消费过程中的重复消息?>,作者:Ja ...
- [RocketMQ]消息中间件—RocketMQ消息消费(一)
2019独角兽企业重金招聘Python工程师标准>>> 文章摘要:在发送消息给RocketMQ后,消费者需要消费.消息的消费比发送要复杂一些,那么RocketMQ是如何来做的呢? 在 ...
- RocketMq之消费方式
一.如何选择消息消费的方式-Pull or Push? 1.1 MQ中Pull和Push的两种消费方式 对于任何一款消息中间件而言,消费者客户端一般有两种方式从消息中间件获取消息并消费: (1)Pus ...
最新文章
- pandas中的3种基本数据类型介绍
- R行数据过滤基于dplyr包filter函数
- 为什么 React Elements 会有 $$typeof 这个属性?
- Python收发邮件
- Java纸牌拖拉机简单模拟
- 三篇文章了解 TiDB 技术内幕——说存储
- 电子工程可以报考二建_毕业证上财务管理专业,可以报考二建吗?
- php 从字符中随机挑一个数,php 对中文字符串的处理- 随机取出指定个数的汉字...
- php js 防止重复提交表单,php如何防止form重复提交
- 大数据平台应用开发的痛点有哪些
- 如何下载Visual Studio Code及配置教程
- PDF Checkpoint for mac(pdf文件批量处理工具)
- 阶段3 2.Spring_04.Spring的常用注解_2 常用IOC注解按照作用分类
- 哈萨克斯坦--出行攻略
- html小写罗马字符怎么写,如何在 LATEX 中插入大小写的罗马字符
- LM358电压跟随器
- 比特bit,字节Byte,带宽流量和流速的关系,存储容量单位
- 安装MySQL---已删除服务,却显示服务已存在
- 基因结构注释软件PASA安装全纪录
- c语言else if函数的使用方法,ELSE IF THEN 函数的用法