自动创建Topic原理介绍

  RocketMQ在发送消息的时候,我们一般会先去Broker创建Topic信息,Producer在发送消息的时候会先去nameSrv拉取Topic信息,那么如果拉取不到Topic,会如何处理呢?这里RocketMQ提供一种自动创建Topic的机制。

Producer流程

  • 先从本地缓存topicPublishInfoTable中获取Topic。
  • 如果本地缓存不存在或者失效,则调用**mQClientFactory.updateTopicRouteInfoFromNameServer(topic)**取nameSrv获取Topic。
  • 如果nameSrv也没有获取到Topic,最后调用**mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer)**获取默认TTBW102。
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);if (null == topicPublishInfo || !topicPublishInfo.ok()) {this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicPublishInfo = this.topicPublishInfoTable.get(topic);}if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo;} else {// 如果没有Topic,并且自动创建topic,则先投递到TBW102主题,在Broker再创建topic           this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;}}

在调用org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)的时候,第二个参数isDefault为true标识拉取TBW102,这时候如果Broker开启了自动创建Topic,那么TBW102是肯定可以拉取成功的。

 if (isDefault && defaultMQProducer != null) {// 拉取TBW102topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3);if (topicRouteData != null) {for (QueueData data : topicRouteData.getQueueDatas()) {int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}}} else {topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);}

Broker流程

  Broker在启动初始化TopicConfigManager的时候,autoCreateTopicEnable配置为true(默认也是开启的,但是生产上一般为了防止Topic随意创建和安全性,一般会选择关闭),在初始化Broker的时候会自动创建TBW102.

 public TopicConfigManager(BrokerController brokerController) {this.brokerController = brokerController;// 忽略前面的代码{// MixAll.AUTO_CREATE_TOPIC_KEY_TOPICif (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;TopicConfig topicConfig = new TopicConfig(topic);this.systemTopicList.add(topic);topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig().getDefaultTopicQueueNums());topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig().getDefaultTopicQueueNums());int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;topicConfig.setPerm(perm);this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);}}

  在接收到消息的时候会分发到org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage处理,其中调用了msgCheck方法来检查Topic信息。

 protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,final SendMessageRequestHeader requestHeader, final RemotingCommand response) {// 省略代码TopicConfig topicConfig =this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (null == topicConfig) {int topicSysFlag = 0;if (requestHeader.isUnitMode()) {if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {topicSysFlag = TopicSysFlag.buildSysFlag(false, true);} else {topicSysFlag = TopicSysFlag.buildSysFlag(true, false);}}log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());// 创建TopictopicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(requestHeader.getTopic(),requestHeader.getDefaultTopic(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.getDefaultTopicQueueNums(), topicSysFlag);}

总结

整个流程总结一下,Producer发送一个不存在的Topic消息时,首先会从NameServer拉取Topic路由数据,第一次拉取必然失败,第二次会直接拉取TBW102的路由数据,基于它创建TopicPublishInfo并缓存到本地,进行正常的消息发送,在Header里将defaultTopic设置为TBW102。Broker接收到消息时,先对消息做Check,检查到Topic不存在,会基于defaultTopic的配置去创建该Topic,然后注册到NameServer上,这样一个全新的Topic就被自动创建了。

RocketMQ自动创建topic原理-TBW102相关推荐

  1. RocketMQ源码(八)Broker asyncSendMessage处理消息以及自动创建Topic

    此前已经梳理了RocketMQ的broker接收Producer消息的入口源码RocketMQ(七)broker接收消息入口源码_代码---小白的博客-CSDN博客 在文章的最后我们到了SendMes ...

  2. RocketMQ3.2.2生产者发送消息自动创建Topic队列数无法超过4个

    问题现象 RocketMQ3.2.2版本,测试时尝试发送消息时自动创建Topic,设置了队列数量为8: producer.setDefaultTopicQueueNums(8); 同时设置broker ...

  3. kafka-0.10.2.1:Producer生产时无法自动创建Topic

    https://www.cnblogs.com/likethis/p/9988741.html

  4. RocketMQ 客户端 解决 No topic route info in name server for the topic:TBW102

    Apache-RocketMQ 版本 4.7.1 项目部署好了 ,也能正常的 收发消息,但是 rocketmq_client 日志 在 ${user.home}/logs/rocketmqlogs/r ...

  5. 生产环境下 RocketMQ 为什么不能开启自动创建主题?

    作者 | 丁威 责编 | 胡巍巍 现象 很多网友会问,为什么明明集群中有多台Broker服务器,autoCreateTopicEnable设置为true,表示开启Topic自动创建,但新创建的Topi ...

  6. java 创建topic,RocketMQ在Java代码之中手动创建Topic

    Rocketmq在Java代码之中手动创建Topic [原创,转载请注明出处] 我的 [博客园主页] [CSDN主页] [简书主页] 加V进Java交流群,备注Java交流:w1129574379 * ...

  7. RocketMQ快速入门之手动创建topic

    package cn.learn.rocketmq.topic;import org.apache.rocketmq.client.producer.DefaultMQProducer;public ...

  8. rocketmq消息存储原理_RocketMQ到底快在哪里?深入探索RocketMQ消息存储和查询原理...

    RocketMQ 作为一款优秀的分布式消息中间件,可以为业务方提供高性能低延迟的稳定可靠的消息服务.其核心优势是可靠的消费存储.消息发送的高性能和低延迟.强大的消息堆积能力和消息处理能力. 从存储方式 ...

  9. RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?

    针对以上问题,有两个场景:使用阿里云的云服务器的RocketMQ和使用自己搭建的RocketMQ.但无论采用这两种的任何一种,都是可以在同一个topic下,通过tag来进行业务区分的. 网上有很多分析 ...

最新文章

  1. 01 使用AFN3 0上传图片时间慢的问题
  2. 【十五分钟Talkshow】fmplan(十五分钟计划)的初步想法
  3. 同一MODBUS读写多(两)个BH32角度传感器
  4. 优秀大数据GitHub项目一览
  5. Selenium2+python自动化58-读取Excel数据(xlrd)
  6. 表现层(jsp)、持久层(类似dao)、业务层(逻辑层、service层)、模型(javabean)、控制层(action)...
  7. Notepad++的安装和基本使用
  8. 华为云PB级数据库GaussDB(for Redis)揭秘第13期:如何搞定推荐系统存储难题
  9. 星巴克饮品中竟喝出活蟑螂?官方回应了...
  10. php编写一个学生类_PHP 结合 Boostrap 结合 js 实现学生列表删除编辑及搜索功能
  11. [算法]华为笔试题——字母和十进制数映射
  12. LSOF 安装与使用
  13. 点云定义、PCL数据类型和点云处理方式
  14. iOS UIImageView设置为圆形
  15. BIN转HEX,HEX转BIN,互相转换工具,PIC
  16. 因为迁移,所以出卖(1)
  17. python之控制台版本(电影)增删改查
  18. 手机充值业务python_小伙利用Python爆破某会员网站,充会员?不存在的!
  19. 词云图制作:15张炫酷的词云图海报、PPT报告词云图、3D词云图,MagicCloud词云图一键制作软件
  20. TMS320C6678开发笔记---IBL编译与分析1

热门文章

  1. 面试之-----SSR优缺点
  2. 雷林鹏分享:MySQL 序列使用
  3. Nginx服务安装与启动脚本配置
  4. 基于51单片机列车车门自动开关测试台开发
  5. Unity基础 单点和多点触摸
  6. android 链接网络成功,Android之网络连接判断是否成功
  7. 软件系统的架构,反映人是公司组织结构
  8. android游戏母包打渠道包jar包无法编译为dex解决思路
  9. 智能优化算法对TSP问题的求解研究
  10. echart地图下钻