RocketMQ源码解析-Consumer启动(1)
DefaultMQPullConsumer继承了ClientConfig类,作为主动拉获取消息的消费者实现接口的管理与相关属性的配置(与PushConsumer对应)。相比生产者,消费者配置的属性要复杂得多。由于在RocketMQ中,生产者消费者共用一个客户端实现类MQClient,所以在前文中没有解析的方法是属于服务于消费者的。首先以主动拉取方式获得消息的消费者PullConsumer为例子。
在DefaultMQPullConsumer的构造方法中,实现的跟生产者一样简单,只是简单的配置了传入了的ConsumerGroup用以确认该消费者究竟是处于哪个消费者组名,然后调用DefaultMQPullConsumerImpl的构造方法。而DefaultMQPullConsumerImpl的构造方法也是非常简单,只是配置了调用他的消费者配置类,以及传进来的rpcHook。
既然DefaultMQPullConsumer在整个过程中充当着配置者的角色,那么显然可以直接在这里配置消费者相关的订阅的topic,维护着一个set用来存放订阅的topic。
private Set<String> registerTopics = new HashSet<String>();
整个消费者的启动由调用DefaultMQPullConsumer的start()方法开始。而在其中的start()也只是简单的调用了DefaultMQPullConsumerImpl的start(),其他全无任何额外操作,那么就可以直接从DefaultMQPullConsumerImpl的start()方法开始看起。
public void start() throws MQClientException {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;this.checkConfig();this.copySubscription();if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPullConsumer.changeInstanceNameToPID();}this.mQClientFactory =MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer,this.rpcHook);this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);this.pullAPIWrapper = new PullAPIWrapper(//mQClientFactory,//this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);if (this.defaultMQPullConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();} else {switch (this.defaultMQPullConsumer.getMessageModel()) {case BROADCASTING:this.offsetStore =new LocalFileOffsetStore(this.mQClientFactory,this.defaultMQPullConsumer.getConsumerGroup());break;case CLUSTERING:this.offsetStore =new RemoteBrokerOffsetStore(this.mQClientFactory,this.defaultMQPullConsumer.getConsumerGroup());break;default:break;}}this.offsetStore.load();boolean registerOK =mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The consumer group["+ this.defaultMQPullConsumer.getConsumerGroup()+ "] has been created before, specify another name please."+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);}mQClientFactory.start();log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PullConsumer service state not OK, maybe started once, "//+ this.serviceState//+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);default:break;}
}
一开始DefaultMQPullConsumerImpl的状态量就是CREAT_JUST,所以进入下面的开始阶段。
在checkConfig()方法里先是简单的对消费者的ConsumerGroup进行检查,防止为空或者非法,接下来也会防止与默认的消费者集群名冲突。接下来将要配置消费者的通知方式(MessageModel),在MQ的消费者当中,通知方式分为BROADCASTING(广播模式),以及CLUSTERINT(集群模式),默认为集群模式配置在DefaultMQPullConsumer,接下来先以集群模式为例子接下去消费者的启动。
消费者的消息队列分配策略
然后将会对消息队列分配策略进行检查。在DefaultMQPullConsumer中实现有默认的消息队列分配策略(Average平均分配策略),消息队列分配策略都实现了AllocateMessageQueneStrategy接口实现了相应的allocate()方法。以默认的平均分配方式为例子来看他的allocat()方法。
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {if (currentCID == null || currentCID.length() < 1) {throw new IllegalArgumentException("currentCID is empty");}if (mqAll == null || mqAll.isEmpty()) {throw new IllegalArgumentException("mqAll is null or mqAll empty");}if (cidAll == null || cidAll.isEmpty()) {throw new IllegalArgumentException("cidAll is null or cidAll empty");}List<MessageQueue> result = new ArrayList<MessageQueue>();if (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", //consumerGroup, //currentCID,//cidAll);return result;}int index = cidAll.indexOf(currentCID);int mod = mqAll.size() % cidAll.size();int averageSize =mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()+ 1 : mqAll.size() / cidAll.size());int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;int range = Math.min(averageSize, mqAll.size() - startIndex);for (int i = 0; i < range; i++) {result.add(mqAll.get((startIndex + i) % mqAll.size()));}return result;
}
在平均分配消息队列的方式下,传入的参数有当前消费者id,当前所有消费者数组,以及所有消息队列数组。在平均分配的策略下,将会根据消费者数组的大小以及消息队列数组的大小,以及该生产者在消费者数组的位置确定具体的生产者获取该消费者数组的哪一部分消费者队列。
而RocketMQ给出的分配策略还有循环平均分配,按照配置分配,按照机房分配等分配方式,这里采用默认的平均分配。
在检查完上述的消费者集群名,消息通知方式,消息队列分配方式之后,checkConfig()方法结束。
接下里将会调用copySubscription()方法,将DefaultMQPullConsumer里配置的注册topic复制过来并抽象成消费者具体能够接受的形式。
private void copySubscription() throws MQClientException {try {Set<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics();if (registerTopics != null) {for (final String topic : registerTopics) {SubscriptionData subscriptionData =FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),//topic, SubscriptionData.SUB_ALL);this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);}}} catch (Exception e) {throw new MQClientException("subscription exception", e);}
}
在这里会根据每一个注册在DefaultMQPusllConsumer里的topic,创建SubscriptionData来完成topic数据的转变。
public final static String SUB_ALL = "*";
private boolean classFilterMode = false;
private String topic;
private String subString;
private Set<String> tagsSet = new HashSet<String>();
private Set<Integer> codeSet = new HashSet<Integer>();
private long subVersion = System.currentTimeMillis();
从SubscriptionData的数据结构可以看出每一个topic对应一个相应的SubscriptionData,里面存储着相应的tag之类的数据。在创建完毕之后,DefaultMQPullConsumerImpl里面的rebalanceImpl将会把topic和SubscriptionData作为键值对存放在里面。
rebalanceImpl的作用将会在具体发挥作用的时候解释。
到这里,开始阶段的配置检查与准备告一段落,接下里将会开启消费者客户端。
RocketMQ源码解析-Consumer启动(1)相关推荐
- RocketMQ源码解析-Consumer启动(2)
接着上文的Pull消费者启动继续讲. public void start() throws MQClientException {switch (this.serviceState) {case CR ...
- RocketMQ源码解析-Producer启动
RocketMQ中生产者通过DefaultProducer来创建. protected final transient DefaultMQProducerImpl defaultMQProducerI ...
- 6、RocketMQ 源码解析之 Broker 启动(上)
上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...
- rocketmq源码解析之name启动(一)
2019独角兽企业重金招聘Python工程师标准>>> 说在前面 主要解析namrsrv启动部分,namesrv配置加载.netty server创建.注册出处理器. 正文 源码解析 ...
- RocketMQ源码(4)—Broker启动加载消息文件以及恢复数据源码【一万字】
详细介绍了Broker启动加载消息文件以及恢复数据源码. 此前我们学习了Broker的启动源码:RocketMQ源码(3)-Broker启动流程源码解析[一万字],Broker的启动过程中,在Defa ...
- RocketMQ源码解析之broker文件清理
原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...
- RocketMQ源码解析之消息消费者(consume Message)
原创不易,转载请注明出处 文章目录 前言 1.消息流程介绍 2.源码解析 2.1 并发消费 2.2 顺序消费 前言 我们在<RocketMQ源码解析之消息消费者(pullMessage)> ...
- RocketMQ源码解析:Filtersrv
???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...
- 消息中间件RocketMQ源码解析-- --调试环境搭建
1. 依赖工具 JDK :1.8+ Maven IntelliJ IDEA 2. 源码拉取 从官方仓库 [https://github.com/apache/rocketmq) Fork 出属于自己的 ...
最新文章
- mysql安装好需要优化配置一下_Mysql的安装、配置、优化
- [Oracle]为何Archivelog 没有马上被删除
- SQL SERVER 2008 利用发布订阅方式实现数据库同步
- 在WinRT中读取资源文件
- Windows Phone 7 浏览器打开新窗口问题
- MySQL用中间件ProxySQL实现读写分离和主节点故障应用无感应
- ATL中的C++模板
- 收藏 | 机器学习中需要了解的 5 种采样方法
- 下载地址jquery upload file demo (C#)
- Single sign-on,什么是单点登陆?
- JQuery的click、bind、delegate、off、unbind
- SQL Server 环形缓冲区(Ring Buffer) -- 环形缓冲在AlwaysOn的应用
- windows 远程桌面服务器,Windows系统的远程桌面服务是什么
- python房地产成本管理软件_大型房地产成本管理软件
- Apache JMeter使用教程
- Linux挂载Linux网络共享文件夹
- App登录方式和测试重点总结
- Linux C/C++编程之(十四)文件操作相关函数
- matlab DFA算法计算Hurst指数
- TIBCO.Rendezvous简单的发消息的过程