关于 RocketMQ:The producer group has been created before, specify another name please.这个报错的解决办法...
1.在网上看了一些解决这个问题的办法,大部分朋友都说是要在实例化 DefaultMQProducer 的时候指定惟一的 instanceName 来解决,窃以为这样虽然解决了问题,但却是不应该用的解决办法。为什么这样说?因为官网介绍客户端公共参数的时候对这个instanceName有明确的说明
instanceName | DEFAULT |
客户端实例名称,客户端创建的多个Producer、Consumer实际是共用一个内部实例(这个实例包含网络连接、线程资源等) |
所以,这个 instanceName 所标识的实例会同时创建自己的网络连接,线程资源,如果每次创建一个 Producer 都指定不同的 instanceName 这样就会 浪费 更多资源 比如内存和线程,网络IO。还会降低消息处理的效率。按照说明,应该是尽可能多个Producer共用一个instanceName 才合理。
2.另外,题目上的报错,是因为 group 已被创建,为什么要用指定不同且唯一的 instanceName 来解决呢?不能因为这样能解决就这样解决。实际上,如果用 DefaultMQProducer 来实例 producer 则会把创建好的producer先放到一个 producerTable
ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
中,代码中的方法是
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
这个方法里 关键地方是
MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
在添加的时候如果发以group为键的producer已存在,则注册失败。这里的键是group所以,当我们已经创建了同group的producer时,如果这个 producer没有shutdown,则再次以同样的group创建producer的时候就会报题目中的错误。
而shutdown之后之所以不报错是因为,shutdown这个方法本身调用 的是 unregisterProducer(String group) 在类 MQClientInstance 中。这个方法是包含从 producerTable 中把已添加的producer先移除,然后再shutdown的。具体代码是下面这样的
this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup());
this.defaultAsyncSenderExecutor.shutdown();
所以先 调用 DefaultMQProducer shutdown 之后再创建新的同group的producer是不会报错的。
3.我们再看为什么每次用 DefaultMQProducer 来创建 producer的时候如果 都设置不同的instanceName为什么也不会报错呢?这是因为如果设置的instanceName是唯一的。则在注册producer之前,如果设置的group 不是默认的,则每次 获取的mQClientFactory 都是不同的,而 producerTable 是 mQClientFactory类里的一个属性,这样当然producerTable也是不同的,这样注册producer当然是注册到不同的producerTable中去了,所以不会报错。
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID(); }this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
但是,这种解决办法是不可取的。因为instanceName是一个比较重(隔离数据多,创建耗时长,消费资源多)的参数。
4.那么我们怎么更好的解决这个问题呢?我们可以参考源码中 logappender 模块中 的 ProducerInstance 类来实现。这个类在源码中位于 org.apache.rocketmq.logappender.common 下面
下面是这个类的源码
public class ProducerInstance {public static final String APPENDER_TYPE = "APPENDER_TYPE";public static final String LOG4J_APPENDER = "LOG4J_APPENDER";public static final String LOG4J2_APPENDER = "LOG4J2_APPENDER";public static final String LOGBACK_APPENDER = "LOGBACK_APPENDER";public static final String DEFAULT_GROUP = "rocketmq_appender";private ConcurrentHashMap<String, MQProducer> producerMap = new ConcurrentHashMap<String, MQProducer>();private static ProducerInstance instance = new ProducerInstance();public static ProducerInstance getProducerInstance() {return instance;}/**根据 nameServerAddress 和 group 生成 producer 在 producerMap 中的键**/ private String genKey(String nameServerAddress, String group) {return nameServerAddress + "_" + group;}/**
根据 nameServerAddress 和 group 获取已注册到producerMap中的producer,如果不存在,则调用 DefaultMQProducer 生成新的producer注册并返回
**/ public MQProducer getInstance(String nameServerAddress, String group) throws MQClientException {if (StringUtils.isBlank(group)) {group = DEFAULT_GROUP;}String genKey = genKey(nameServerAddress, group);MQProducer p = getProducerInstance().producerMap.get(genKey);if (p != null) {return p;}DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group);defaultMQProducer.setNamesrvAddr(nameServerAddress);MQProducer beforeProducer = null;beforeProducer = getProducerInstance().producerMap.putIfAbsent(genKey, defaultMQProducer);if (beforeProducer != null) {return beforeProducer;}defaultMQProducer.start();return defaultMQProducer;}/**
根据 nameServerAddress 和 group 移除已注册到producerMap中的producer,同时shutdown
**/ public void removeAndClose(String nameServerAddress, String group) {if (group == null) {group = DEFAULT_GROUP;}String genKey = genKey(nameServerAddress, group);MQProducer producer = getProducerInstance().producerMap.remove(genKey);if (producer != null) {producer.shutdown();}}/**
移除 producerMap 中所有的 producer 并全部关闭。
**/public void closeAll() {Set<Map.Entry<String, MQProducer>> entries = getProducerInstance().producerMap.entrySet();for (Map.Entry<String, MQProducer> entry : entries) {getProducerInstance().producerMap.remove(entry.getKey());entry.getValue().shutdown();}}}
可以把这个类直接复制到要使用的项目中,然后在要使用指定 nameServerAddress 和 group 的 producer 时,直接用下面的方法获取一个。
MQProducer producer = ProducerInstance.getProducerInstance().getInstance("localhost:9876", "test-group");
/*
自己生成message消息,然后下面发送
*/
producer.send(message);
/*
如果是比较频繁使用的producer,发送完消息后不用关闭和移除下次再用的时候可以直接再获取拿来就可以发送消息。
对于确定要隔比较长时间不用的producer,可以用下面的方法 移除并关闭
*/
ProducerInstance.getProducerInstance().removeAndClose("localhost:9876", "test-group");
我们会发现,这个类获取 producer 实例的时候只用了 nameServerAddress 和 group 这两个参数。如果我们确实需要操作不同的 instanceName 下的 producer 时,该怎么办呢?直接改造 这个类里的方法,添加上
instanceName 参数 即可。
加参数后的类如下,使用方式没什么差别只是多了个参数而已。
public class ProducerInstance {public static final String APPENDER_TYPE = "APPENDER_TYPE";public static final String LOG4J_APPENDER = "LOG4J_APPENDER";public static final String LOG4J2_APPENDER = "LOG4J2_APPENDER";public static final String LOGBACK_APPENDER = "LOGBACK_APPENDER";public static final String DEFAULT_GROUP = "rocketmq_appender";private ConcurrentHashMap<String, MQProducer> producerMap = new ConcurrentHashMap<String, MQProducer>();private static ProducerInstance instance = new ProducerInstance();public static ProducerInstance getProducerInstance() {return instance;}private String genKey(String nameServerAddress, String group,String instanceName) {return nameServerAddress + "_" + group + "_" + instanceName;}public MQProducer getInstance(String nameServerAddress, String group,String instanceName) throws MQClientException {if (StringUtils.isBlank(group)) {group = DEFAULT_GROUP;}if (StringUtils.isBlank(instanceName)) { instanceName = "DEFAULT"; }String genKey = genKey(nameServerAddress, group, instanceName);MQProducer p = getProducerInstance().producerMap.get(genKey);if (p != null) {return p;}DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group);defaultMQProducer.setNamesrvAddr(nameServerAddress);defaultMQProducer.setInstanceName(instanceName);MQProducer beforeProducer = null;beforeProducer = getProducerInstance().producerMap.putIfAbsent(genKey, defaultMQProducer);if (beforeProducer != null) {return beforeProducer;}defaultMQProducer.start();return defaultMQProducer;}public void removeAndClose(String nameServerAddress, String group, String instanceName) {if (group == null) {group = DEFAULT_GROUP;}if (StringUtils.isBlank(instanceName)) {instanceName = "DEFAULT";}String genKey = genKey(nameServerAddress, group,instanceName);MQProducer producer = getProducerInstance().producerMap.remove(genKey);if (producer != null) {producer.shutdown();}}public void closeAll() {Set<Map.Entry<String, MQProducer>> entries = getProducerInstance().producerMap.entrySet();for (Map.Entry<String, MQProducer> entry : entries) {getProducerInstance().producerMap.remove(entry.getKey());entry.getValue().shutdown();}}}
关于 RocketMQ:The producer group has been created before, specify another name please.这个报错的解决办法...相关推荐
- RocketMQ:The producer group has been created before, specify another name please.
RocketMQ新创建生产者时报异常: The producer group[ ] has been created before, specify another name please. 发送消息 ...
- MySQL报错1055解决办法:[Err] 1055 - Expression #1 of ORDER BY clause is not in GROUP BY clause and contains
[mysql报错1055 报错解决办法][Err] 1055 - Expression #1 of ORDER BY clause is not in GROUP BY clause and cont ...
- The producer group has been created before
为什么80%的码农都做不了架构师?>>> 编写一个help类 public class RunTimeUtil {private static AtomicInteger in ...
- sqlserver查询补全时间_mssql 按日期分组(group by)查询统计的时候,没有数据补0的解决办法...
摘要: 下文讲述一次报表制作的需求, 需制作一个月的销量的数据汇总,如果其中某一天没有数据,那么就补0处理 例: /* 统计2018-4月份的销量统计, 无数据的天补0 */ ---建立基础数据 cr ...
- Apache RocketMQ 安装、测试、报错解决
1. 准备 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家.点击跳转到教程. 64bit OS, Linux/Unix/Mac 64bit JDK 1.8+; Mav ...
- 一个NVIDIA驱动安装报错——ERROR: The nvidia kernel module was not created.
Ubuntu18系统下,在安装cuda及nvidia驱动时,安装失败,查看日志显示"ERROR: The nvidia kernel module was not created." ...
- RocketMQ特性、专业术语(Producer,Producer Group,Consumer Group,Topic,Message,Tag,Broker,Name Server)等
RocketMQ 是什么 Github 上关于 RocketMQ 的介绍:RcoketMQ 是一款低延迟.高可靠.可伸缩.易于使用的消息中间件.具有以下特性: 支持发布/订阅(Pub/Sub)和点对点 ...
- The consumer group has been created before, specify another name please RocketMQ异常
解决方案一: rocketmq 报 The consumer group has been created before, specify another name please. 错误 是因为一个s ...
- RocketMQ:Producer启动流程与消息发送源码分析
文章目录 Producer 1.方法和属性 2.启动流程 3.消息发送 3.1验证消息 3.2查找路由 3.3选择队列 3.4发送消息 3.5发送批量消息 Producer 在RocketMQ中,消息 ...
最新文章
- android 获取网卡mac_防亚马逊账号关联黑科技--如何修改我们的网卡MAC到底重要不?...
- TMG 2010 建立站对站***隧道
- 谜题81:烧焦到无法识别
- Android通过cat /sys/kernel/debug/usb/devices获取USB信息
- python中range和xrange的区别_python中range和xrange的区别
- 【MSTR产品】获取当前登陆用户的login_id
- mysql中int型的zerofill参数
- 今天除夕,给您拜年了!
- Ant Build.xml
- 【rman】list archivelog all与list backup of archivelog all
- cv mat的shape_将ndarray转换为cv::Mat的最简单方法是什么?
- linux进行显卡配置修改什么游戏,配置好Linux显卡驱动 爽快玩游戏
- mysql limit offset很大_MySQL查询中LIMIT的大offset导致性能低下浅析
- java.lang.ArrayIndexOutOfBoundsException
- 实体词典 情感词典_基于情感词典的情感分析
- 【计算机网络】湖科大微课堂 笔记目录(完结)
- PASCAL VOC数据集分析及下载、解压
- 微信小程序发送服务通知(模板消息)前后端实现代码附效果图
- 一文读懂什么是MRO采购
- gpsgate 配置过程