RocketMQ源码解析-PushConsumer(1)
PushConsumer的启动。
DefaultMQPushConusmer执行start()方法,然后直接调用DefaultMQPushConusmer的start()方法。
public void start() throws MQClientException {switch (this.serviceState) {case CREATE_JUST:log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}",this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(),this.defaultMQPushConsumer.isUnitMode());this.serviceState = ServiceState.START_FAILED;this.checkConfig();this.copySubscription();if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPushConsumer.changeInstanceNameToPID();}this.mQClientFactory =MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer,this.rpcHook);this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);this.pullAPIWrapper = new PullAPIWrapper(//mQClientFactory,//this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);if (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();}else {switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:this.offsetStore =new LocalFileOffsetStore(this.mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING:this.offsetStore =new RemoteBrokerOffsetStore(this.mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}}this.offsetStore.load();if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this,(MessageListenerOrderly) this.getMessageListenerInner());}else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this,(MessageListenerConcurrently) this.getMessageListenerInner());}this.consumeMessageService.start();boolean registerOK =mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;this.consumeMessageService.shutdown();throw new MQClientException("The consumer group["+ this.defaultMQPushConsumer.getConsumerGroup()+ "] has been created before, specify another name please."+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);}mQClientFactory.start();log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//+ this.serviceState//+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);default:break;}this.updateTopicSubscribeInfoWhenSubscriptionChanged();this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();this.mQClientFactory.rebalanceImmediately();
}
首先还是与pullConsumer的启动一样,需要调用checkConfig()方法来确认Consumer的配置是否合法。主要还是确认消费者的消息模型(广播or集群),组名的合法与否,是否是顺序消费,平衡分配消息队列的策略等。
之后copySubscription()方法与pull一样,将订阅信息从DefaultMQPullConsumer当中复制在rebalanceImpl当中。
接下里仍旧是尝试获得消费者的客户端MQClientInstance实例的过程。
配置完毕reblanceImpl以及pullAPIWrapper之后,根据消费者属于广播还是集群,用以选择消费进度的存储方式是本地文件方式的存储还是远程存储在Broker当中。
以LocalFileOffsetStore为例子来说,是以Map的方式来存储各个消息队列的存储进度的。
private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =new ConcurrentHashMap<MessageQueue, AtomicLong>();@Override
public void load() throws MQClientException {OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);log.info("load consumer's offset, {} {} {}",//this.groupName,//mq,//offset.get());}}
}
在接下来的load()方法中,LocalFileOffsetStore将会从本地路径读取本来已经储存的消费进度(json),保存在当前类当中。在后面的客户端实例也会运行一个定时任务来定时持久化各个消费队列的消费进度在本地的json文件当中。
之后会跟据配置的是否是顺序消费选择消费消息的方式。如果选择了顺序消费,那么将会以ConsumeMessageOrderlyService类作为consumeMessageService成员,并在接下来直接执行start()方法。
如果没有选择顺序消费,那么生成的ConsumeMessageConcur’rentlyService的start()方法并没有任何操作,而在顺序消费的情况下则会给相关的消费队列根据Broker给出的信息进行加锁和解锁。
public void start() {if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {ConsumeMessageOrderlyService.this.lockMQPeriodically();}}, 1000 * 1, ProcessQueue.RebalanceLockInterval, TimeUnit.MILLISECONDS);}
}
在start()方法当中,在集群模式的情况下,将会在定时任务线程池丢入一个线程定时来执行lockMQPeriodically()来进行加锁的操作。
public synchronized void lockMQPeriodically() {if (!this.stopped) {this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();}
}
真正锁的操作在defaultMQPushConsumerImpl的rebalanceImpl的lockAll()方法当中。
public void lockAll() {HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();while (it.hasNext()) {Entry<String, Set<MessageQueue>> entry = it.next();final String brokerName = entry.getKey();final Set<MessageQueue> mqs = entry.getValue();if (mqs.isEmpty())continue;FindBrokerResult findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);if (findBrokerResult != null) {LockBatchRequestBody requestBody = new LockBatchRequestBody();requestBody.setConsumerGroup(this.consumerGroup);requestBody.setClientId(this.mQClientFactory.getClientId());requestBody.setMqSet(mqs);try {Set<MessageQueue> lockOKMQSet =this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);for (MessageQueue mq : lockOKMQSet) {ProcessQueue processQueue = this.processQueueTable.get(mq);if (processQueue != null) {if (!processQueue.isLocked()) {log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);}processQueue.setLocked(true);processQueue.setLastLockTimestamp(System.currentTimeMillis());}}for (MessageQueue mq : mqs) {if (!lockOKMQSet.contains(mq)) {ProcessQueue processQueue = this.processQueueTable.get(mq);if (processQueue != null) {processQueue.setLocked(false);log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup,mq);}}}} catch (Exception e) {log.error("lockBatchMQ exception, " + mqs, e);}}}
}
在lockAll()方法当中,会遍历所有的BrokerName,从客户端根据brokerName获取相应的地址,将消费者组名,相应消费者的客户端id,以及该broker下面的消费者队列作为参数发送给Broker通过clientAPIInstance发送给broker获取需要加锁的消费队列作为结果返回。
而后根据得到的消费队列获取对应的processQueue,并依次加锁。接下来遍历该Broker下面的所有消费队列,对不需要加锁的消费队列解锁。这样的操作在定时任务线程池中每隔一秒会执行一次。
接下里继续回到defaultMQPushConsumerImpl的start()方法。
接下来会执行MQClientInstance的start()方法。
在Push消费者里,这里的start()方法很关键。
public void start() throws MQClientException {PackageConflictDetect.detectFastjson();synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());}// Start request-response channelthis.mQClientAPIImpl.start();// Start various schedule tasksthis.startScheduledTask();// Start pull servicethis.pullMessageService.start();// Start rebalance servicethis.rebalanceService.start();// Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case RUNNING:break;case SHUTDOWN_ALREADY:break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}
}
首先,如果一开始没有配置地址服务的地址,这里会主动去请求地址服务的地址,以及APIImpl的启动与netty客户端的启动。
private void startScheduledTask() {if (null == this.clientConfig.getNamesrvAddr()) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();}catch (Exception e) {log.error("ScheduledTask fetchNameServerAddr exception", e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);}this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.updateTopicRouteInfoFromNameServer();}catch (Exception e) {log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);}}}, 10, this.clientConfig.getPollNameServerInteval(), TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.cleanOfflineBroker();MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();}catch (Exception e) {log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);}}}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.persistAllConsumerOffset();}catch (Exception e) {log.error("ScheduledTask persistAllConsumerOffset exception", e);}}}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.adjustThreadPool();}catch (Exception e) {log.error("ScheduledTask adjustThreadPool exception", e);}}}, 1, 1, TimeUnit.MINUTES);
}
接下来会起来五个定时任务。
可以分别来看。此时,如果client仍旧没有设置相关的地址服务地址,会每隔10秒去尝试获取一次地址服务的地址。接下来是定时更新路由数据,以及发送心跳数据的两个定时任务,这两个在前面的文章中都已经解释过。
接下来会每隔十秒会对每个消息队列的存储进度进行持久化,每隔十秒执行persistAllConsumer()方法。
private void persistAllConsumerOffset() {Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();impl.persistConsumerOffset();}
}
对所有的Consumer执行persistConsumerOffset(0方法。
@Override
public void persistConsumerOffset() {try {this.makeSureStateOK();Set<MessageQueue> mqs = new HashSet<MessageQueue>();Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();if (allocateMq != null) {mqs.addAll(allocateMq);}this.offsetStore.persistAll(mqs);}catch (Exception e) {log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup()+ " persistConsumerOffset exception", e);}
}
在offsetStore执行persistAll()方法,如果选择广播模式,则会将消息队列消费的进度持久化在本地。
@Override
public void persistAll(Set<MessageQueue> mqs) {if (null == mqs || mqs.isEmpty())return;OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();for (MessageQueue mq : this.offsetTable.keySet()) {if (mqs.contains(mq)) {AtomicLong offset = this.offsetTable.get(mq);offsetSerializeWrapper.getOffsetTable().put(mq, offset);}}String jsonString = offsetSerializeWrapper.toJson(true);if (jsonString != null) {try {MixAll.string2File(jsonString, this.storePath);} catch (IOException e) {log.error("persistAll consumer offset Exception, " + this.storePath, e);}}
}
在这里,将会将所有消费队列的消费进度存储在本地的json文件里。
如果是集群模式呢,将会相对复杂一点。
@Override
public void persistAll(Set<MessageQueue> mqs) {if (null == mqs || mqs.isEmpty())return;final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();long times = this.storeTimesTotal.getAndIncrement();if (mqs != null && !mqs.isEmpty()) {for (MessageQueue mq : this.offsetTable.keySet()) {AtomicLong offset = this.offsetTable.get(mq);if (offset != null) {if (mqs.contains(mq)) {try {this.updateConsumeOffsetToBroker(mq, offset.get());if ((times % 12) == 0) {log.info("Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", //this.groupName,//this.mQClientFactory.getClientId(),//mq, //offset.get());}} catch (Exception e) {log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);}} else {unusedMQ.add(mq);}}}}if (!unusedMQ.isEmpty()) {for (MessageQueue mq : unusedMQ) {this.offsetTable.remove(mq);log.info("remove unused mq, {}, {}", mq, this.groupName);}}
}
在这里,会将各个消息队列的消费进度更新到broker当中远程存储在broker上(需要的参数topic,消费组名,消费队列id,消费进度),并且每隔12次记录一次。在这之中,将没有消费进度的消息队列移除。
这就是消费队列的持久化。
在这之后,会每隔1分钟根据配置的拉取消息的进程进行数量调整。
接下来将是push消费者最核心的地方。
RocketMQ源码解析-PushConsumer(1)相关推荐
- RocketMQ源码解析-PushConsumer(2)
继续之前文章的内容. PushConsumer的启动已经到了mqClientInstance. public void start() throws MQClientException {Packag ...
- 6、RocketMQ 源码解析之 Broker 启动(上)
上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...
- RocketMQ源码解析之broker文件清理
原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...
- RocketMQ源码解析之消息消费者(consume Message)
原创不易,转载请注明出处 文章目录 前言 1.消息流程介绍 2.源码解析 2.1 并发消费 2.2 顺序消费 前言 我们在<RocketMQ源码解析之消息消费者(pullMessage)> ...
- rocketmq源码解析之name启动(一)
2019独角兽企业重金招聘Python工程师标准>>> 说在前面 主要解析namrsrv启动部分,namesrv配置加载.netty server创建.注册出处理器. 正文 源码解析 ...
- RocketMQ源码解析:Filtersrv
???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...
- 消息中间件RocketMQ源码解析-- --调试环境搭建
1. 依赖工具 JDK :1.8+ Maven IntelliJ IDEA 2. 源码拉取 从官方仓库 [https://github.com/apache/rocketmq) Fork 出属于自己的 ...
- RocketMQ源码解析-Consumer启动(1)
DefaultMQPullConsumer继承了ClientConfig类,作为主动拉获取消息的消费者实现接口的管理与相关属性的配置(与PushConsumer对应).相比生产者,消费者配置的属性要复 ...
- RocketMQ源码解析-Broker的HA实现
以master异步复制为例子. 在rocketmq的slave broker机子当中,会在DefaultMessageStore的启动当中启动自己的HaService来进行自己的ha服务. publi ...
最新文章
- 如何用PS软件取得色块的颜色值?
- java流程图什么代表活动_举例分析流程图与活动图的区别与联系
- Java Annotaion认识
- RejectedExecutionException 分析
- 使用jQuery的load()进行页面模块化加载
- 空间变换与计算_02_3x3矩阵
- 郝斌PHP视频教程,郝斌数据结构自学视频_IT教程网
- jq实现点击复制文本功能
- java 异或表示状态
- 四则运算当中的加号“+”有常见的三种用法
- OOM and SOF
- 全志D1-H裸奔工具XFEL
- java计算机毕业设计失物招领信息交互平台源代码+数据库+系统+lw文档
- 数据库实验报告【学会使用企业管理器和查询分析器管理工具】
- 四川一度智信:电商商家实效获取流量,不要花钱做无用功
- ECharts 修改背景格子线条的颜色
- python编写我的世界
- 测试管理工具列表大全
- 单数复数php单元格背景颜色,PHP-php 如何实现 英语单词 单复数 转换
- 电脑突然没声音了怎么办?
热门文章
- Python学习笔记之元组
- Linux查看网卡带宽
- cachestat、cachetop、pcstat-linux系统缓存命中率分析工具
- linux系统软件包依赖关系,Ubuntu解决包依赖关系
- java作业不能运行_从Windows运行时,YARN作业失败
- ContextLoaderListener的作用详解
- POJ 3154 Graveyard【多解,数论,贪心】
- [LintCode] 最长上升子序列
- java.lang.IncompatibleClassChangeError:
- vsftp 简易部署使用