RocketMQ3.2.2生产者发送消息自动创建Topic队列数无法超过4个
问题现象
RocketMQ3.2.2版本,测试时尝试发送消息时自动创建Topic,设置了队列数量为8:
producer.setDefaultTopicQueueNums(8);
同时设置broker服务器的配置文件broker.properties:
defaultTopicQueueNums=16
但实际创建后从控制台及后台打印代码观察到该Topic只创建了4个队列,反复重试确认发送消息时自动创建Topic,最大创建4个队列。
查找原因,服务端与客户端配置对比
阅读源码,在TopicConfigManager的createTopicInSendMessageMethod方法,有对比TopicConfig对象中的队列数和客户端设定队列数,并选择其中较小者为新建Topic队列数的逻辑:
int queueNums = clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig.getWriteQueueNums() : clientDefaultTopicQueueNums;
定位问题在服务端TopicConfig
打印这两个变量:
客户队列数clientDefaultTopicQueueNums为8,正确;
而defaultTopicConfig.getWriteQueueNums()为4,而非broker.properties中设定的16;
由可以确定是问题出在defaultTopicConfig上。
defaultTopicConfig数据来源
defaultTopicConfig是从ConcurrentHashMap<String,TopicConfig> topicConfigTable中取得,如下:
TopicConfig defaultTopicConfig =this.topicConfigTable.get(defaultTopic);
而defaultTopic默认值为MixAll.DEFAULT_TOPIC=“TBW102”。
为了确认topicConfigTable中的为MixAll.DEFAULT_TOPIC的Config对象属性值的真实来源,继续阅读源码,发现borker有两处改写DEFAULT_TOPIC的Config对象的位置:
一处是TopicConfigManager的构造方法,在borker服务器启动时运行,会读取broker.properties里的配置,此时DEFAULT_TOPIC的Config对象里的DefaultQueueNums为正确的我所配置的16;
一处是在BrokerController的initialize方法里调用了TopicConfigManager.load方法:
该load方法继承自ConfigManager类,读取了$ROCKETMQ_HOME\store\config下保存的配置信息,并调用抽象方法decode(),配置信息作为json字符串参数传入到decode();
TopicConfigManager类的decode实现方法里,读取了$ROCKETMQ_HOME\store\config\topics.json里的配置信息,并覆写到topicConfigTable,而此前生成的topics.json的“TBW102”的配置信息里的writeQueueNums及readQueueNums均为4。
最终结论
在发送消息自动创建Topic时,对于此前已运行的borker服务器,修改配置文件的defaultTopicQueueNums属性的值不起作用。
因为发送消息自动创建Topic的实现里,队列数取小对比操作的变量——defaultTopicConfig写在topics.json的配置信息里的writeQueueNums及readQueueNums,读取自Topics.json,所以即使修改配置文件并重启borker服务器后也不会改变。而服务端最终会用topics.json的值覆盖发送消息自动创建Topic时的TopicConfig配置信息。
阿里的解释
队列是资源,所以管控权会放到服务器。
但是每个用户的默认策略又不一样,所以会有一个默认topic作为模板,在未创建默认topic前,系统会自动创建一个。
这个可以占到运维的角度思考,例如你运维了10个集群,为1000个用户服务。有些用户需要动态的创建topic,但是不能给他足够的权限,想创建多少创建多少。
所有会给他一个模板的topic,就是defaultTopic,动态创建topic继承于defaultTopic配置,队列数不能超过defaultTopic。
解决办法
通过producer.createTopic方法创建(实现的方式是将创建命发送到控制台),可以创建成功,但会报应答超时失败,原因未查;
通过控制台方式创建;
修改metaq源码重新编译borker,使用broker的配置信息覆盖defaultTopic的配置信息。
转载于:https://blog.51cto.com/quantum/1581076
RocketMQ3.2.2生产者发送消息自动创建Topic队列数无法超过4个相关推荐
- RocketMQ源码(八)Broker asyncSendMessage处理消息以及自动创建Topic
此前已经梳理了RocketMQ的broker接收Producer消息的入口源码RocketMQ(七)broker接收消息入口源码_代码---小白的博客-CSDN博客 在文章的最后我们到了SendMes ...
- 【kafka系列】kafka之生产者发送消息实践
目录 一.准备工作 二.终端命令 生产者命令 消费者命令 三.Java实践 搭建项目 异步发送-无回调 异步发送-有回调 同步发送 一.准备工作 进入实战之前先熟悉一下topic的相关命令,使用终端命 ...
- RocketMQ自动创建topic原理-TBW102
自动创建Topic原理介绍 RocketMQ在发送消息的时候,我们一般会先去Broker创建Topic信息,Producer在发送消息的时候会先去nameSrv拉取Topic信息,那么如果拉取不到 ...
- 生产者发送消息的过程?
1.Producer先连接到Broker,建立连接Connection,开启一个信道(Channel). 2.Producer声明一个交换器并设置好相关属性. 3.Producer声明一个队列并设置好 ...
- Kafka生产者发送消息的三种方式
Kafka是一种分布式的基于发布/订阅的消息系统,它的高吞吐量.灵活的offset是其它消息系统所没有的. Kafka发送消息主要有三种方式: 1.发送并忘记 2.同步发送 3.异步发送+回调函数 下 ...
- 线上问题-kafka生产者发送消息总是失败
问题解决过程 线上一个界面发现老是没有数据,排查下来时生产者没有成功发送消息所致,报错如下: org.springframework.kafka.core.KafkaProducerException ...
- Kafka实战 - 02 Kafka生产者发送消息至topic实现数据上报
文章目录 1. 项目背景 2. 依赖和配置 3. 生产者配置 KafkaConfiguration 4. 同步数据Topic枚举 SyncDataTopicEnum 5. 请求体 DataSyncQo ...
- 10 kafka生产者发送消息的原理
1.发送原理: 在消息发送的过程中,涉及到了两个线程--main 线程和 Sender 线程.在 main 线程 中创建了一个双端队列 RecordAccumulator.main 线程将消息发送给 ...
- 【Kafka消息队列】生产者发送消息流程
如何描述一条消息? 如何描述一条消息,就是在问这条消息的数据结构是什么? public class ProducerRecord<K, V> {private final String t ...
最新文章
- Dubbo -- 系统学习 笔记 -- 示例 -- 参数验证
- python爬虫代码1000行-Python 你见过三行代码的爬虫吗
- NamespaceHandler 接口
- .NET Core开发日志——Linux版本的SQL Server
- 一张报表节约几十万能耗,新华扬解密精益生产的精髓
- .axf文件_ELF文件格式与readelf命令使用
- java进阶案例下载_登录案例java实现 ---- Java进阶篇
- java集合框架中迭代器的作用_Java中的集合框架之迭代器
- uniapp开发h5应用进行微信网页授权登录获取code失败
- java7jdk官网下载安装,JDK官方网站下载、安装教程及环境配置,jdk官方网站
- python矩阵操作_python矩阵操作
- Mysql如何按照指定间隔时间查询数据
- Linux下为Calibre书库打中文目录名与文件名补丁
- 安防行业安全产品分类、趋势分析
- 面试技巧: 轻松过关10种方法
- 小程序开发-用户对自己信息的更改
- 关于使用Navicat,Mysql Workbench,PowerDesigner根据mysql数据库生成ER(实体联系图)的解决方案的总结
- python股票量化分析_python实战之股票交易量化分析
- 百度地图JavaScript API 学习之自定义标注图标(一)
- 高数笔记(十六):无条件极值,条件极值(拉格朗日乘数法),最值求法,二元函数的泰勒公式