问题现象

RocketMQ3.2.2版本,测试时尝试发送消息时自动创建Topic,设置了队列数量为8:

producer.setDefaultTopicQueueNums(8);

同时设置broker服务器的配置文件broker.properties:

defaultTopicQueueNums=16

但实际创建后从控制台及后台打印代码观察到该Topic只创建了4个队列,反复重试确认发送消息时自动创建Topic,最大创建4个队列。

查找原因,服务端与客户端配置对比

阅读源码,在TopicConfigManager的createTopicInSendMessageMethod方法,有对比TopicConfig对象中的队列数和客户端设定队列数,并选择其中较小者为新建Topic队列数的逻辑:

int queueNums = clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig.getWriteQueueNums() : clientDefaultTopicQueueNums;

定位问题在服务端TopicConfig

打印这两个变量:

客户队列数clientDefaultTopicQueueNums为8,正确;

而defaultTopicConfig.getWriteQueueNums()为4,而非broker.properties中设定的16;

由可以确定是问题出在defaultTopicConfig上。

defaultTopicConfig数据来源

defaultTopicConfig是从ConcurrentHashMap<String,TopicConfig> topicConfigTable中取得,如下:

TopicConfig defaultTopicConfig =this.topicConfigTable.get(defaultTopic);

而defaultTopic默认值为MixAll.DEFAULT_TOPIC=“TBW102”。

为了确认topicConfigTable中的为MixAll.DEFAULT_TOPIC的Config对象属性值的真实来源,继续阅读源码,发现borker有两处改写DEFAULT_TOPIC的Config对象的位置:

一处是TopicConfigManager的构造方法,在borker服务器启动时运行,会读取broker.properties里的配置,此时DEFAULT_TOPIC的Config对象里的DefaultQueueNums为正确的我所配置的16;

一处是在BrokerController的initialize方法里调用了TopicConfigManager.load方法:

该load方法继承自ConfigManager类,读取了$ROCKETMQ_HOME\store\config下保存的配置信息,并调用抽象方法decode(),配置信息作为json字符串参数传入到decode();

TopicConfigManager类的decode实现方法里,读取了$ROCKETMQ_HOME\store\config\topics.json里的配置信息,并覆写到topicConfigTable,而此前生成的topics.json的“TBW102”的配置信息里的writeQueueNums及readQueueNums均为4。

最终结论

在发送消息自动创建Topic时,对于此前已运行的borker服务器,修改配置文件的defaultTopicQueueNums属性的值不起作用。

因为发送消息自动创建Topic的实现里,队列数取小对比操作的变量——defaultTopicConfig写在topics.json的配置信息里的writeQueueNums及readQueueNums,读取自Topics.json,所以即使修改配置文件并重启borker服务器后也不会改变。而服务端最终会用topics.json的值覆盖发送消息自动创建Topic时的TopicConfig配置信息。

阿里的解释

队列是资源,所以管控权会放到服务器。

但是每个用户的默认策略又不一样,所以会有一个默认topic作为模板,在未创建默认topic前,系统会自动创建一个。

这个可以占到运维的角度思考,例如你运维了10个集群,为1000个用户服务。有些用户需要动态的创建topic,但是不能给他足够的权限,想创建多少创建多少。

所有会给他一个模板的topic,就是defaultTopic,动态创建topic继承于defaultTopic配置,队列数不能超过defaultTopic。

解决办法

  • 通过producer.createTopic方法创建(实现的方式是将创建命发送到控制台),可以创建成功,但会报应答超时失败,原因未查;

  • 通过控制台方式创建;

  • 修改metaq源码重新编译borker,使用broker的配置信息覆盖defaultTopic的配置信息。

转载于:https://blog.51cto.com/quantum/1581076

RocketMQ3.2.2生产者发送消息自动创建Topic队列数无法超过4个相关推荐

  1. RocketMQ源码(八)Broker asyncSendMessage处理消息以及自动创建Topic

    此前已经梳理了RocketMQ的broker接收Producer消息的入口源码RocketMQ(七)broker接收消息入口源码_代码---小白的博客-CSDN博客 在文章的最后我们到了SendMes ...

  2. 【kafka系列】kafka之生产者发送消息实践

    目录 一.准备工作 二.终端命令 生产者命令 消费者命令 三.Java实践 搭建项目 异步发送-无回调 异步发送-有回调 同步发送 一.准备工作 进入实战之前先熟悉一下topic的相关命令,使用终端命 ...

  3. RocketMQ自动创建topic原理-TBW102

    自动创建Topic原理介绍   RocketMQ在发送消息的时候,我们一般会先去Broker创建Topic信息,Producer在发送消息的时候会先去nameSrv拉取Topic信息,那么如果拉取不到 ...

  4. 生产者发送消息的过程?

    1.Producer先连接到Broker,建立连接Connection,开启一个信道(Channel). 2.Producer声明一个交换器并设置好相关属性. 3.Producer声明一个队列并设置好 ...

  5. Kafka生产者发送消息的三种方式

    Kafka是一种分布式的基于发布/订阅的消息系统,它的高吞吐量.灵活的offset是其它消息系统所没有的. Kafka发送消息主要有三种方式: 1.发送并忘记 2.同步发送 3.异步发送+回调函数 下 ...

  6. 线上问题-kafka生产者发送消息总是失败

    问题解决过程 线上一个界面发现老是没有数据,排查下来时生产者没有成功发送消息所致,报错如下: org.springframework.kafka.core.KafkaProducerException ...

  7. Kafka实战 - 02 Kafka生产者发送消息至topic实现数据上报

    文章目录 1. 项目背景 2. 依赖和配置 3. 生产者配置 KafkaConfiguration 4. 同步数据Topic枚举 SyncDataTopicEnum 5. 请求体 DataSyncQo ...

  8. 10 kafka生产者发送消息的原理

    1.发送原理: 在消息发送的过程中,涉及到了两个线程--main 线程和 Sender 线程.在 main 线程 中创建了一个双端队列 RecordAccumulator.main 线程将消息发送给 ...

  9. 【Kafka消息队列】生产者发送消息流程

    如何描述一条消息? 如何描述一条消息,就是在问这条消息的数据结构是什么? public class ProducerRecord<K, V> {private final String t ...

最新文章

  1. Dubbo -- 系统学习 笔记 -- 示例 -- 参数验证
  2. python爬虫代码1000行-Python 你见过三行代码的爬虫吗
  3. NamespaceHandler 接口
  4. .NET Core开发日志——Linux版本的SQL Server
  5. 一张报表节约几十万能耗,新华扬解密精益生产的精髓
  6. .axf文件_ELF文件格式与readelf命令使用
  7. java进阶案例下载_登录案例java实现 ---- Java进阶篇
  8. java集合框架中迭代器的作用_Java中的集合框架之迭代器
  9. uniapp开发h5应用进行微信网页授权登录获取code失败
  10. java7jdk官网下载安装,JDK官方网站下载、安装教程及环境配置,jdk官方网站
  11. python矩阵操作_python矩阵操作
  12. Mysql如何按照指定间隔时间查询数据
  13. Linux下为Calibre书库打中文目录名与文件名补丁
  14. 安防行业安全产品分类、趋势分析
  15. 面试技巧: 轻松过关10种方法
  16. 小程序开发-用户对自己信息的更改
  17. 关于使用Navicat,Mysql Workbench,PowerDesigner根据mysql数据库生成ER(实体联系图)的解决方案的总结
  18. python股票量化分析_python实战之股票交易量化分析
  19. 百度地图JavaScript API 学习之自定义标注图标(一)
  20. 高数笔记(十六):无条件极值,条件极值(拉格朗日乘数法),最值求法,二元函数的泰勒公式

热门文章

  1. C# Winform 启动和停止进程
  2. 用Inno Setup来解决.NetFramework安装问题
  3. 2017-1-25总结 主框架设计
  4. 用函数制作简单的选项卡
  5. PostgreSQL 使用 pgbench 测试 sysbench 相关case
  6. ABBYY在MS Office中创建PDF文件的方法
  7. 汇编语言第二课作业-实验1
  8. 二级域名session 共享方案
  9. 请教一个算法问题,有两个数组A,B,判断A中是否至少有一个元素和B中元素相同...
  10. 微软,您的.net为中国程序员带来了什么?