针对以上问题,有两个场景:使用阿里云的云服务器的RocketMQ和使用自己搭建的RocketMQ。但无论采用这两种的任何一种,都是可以在同一个topic下,通过tag来进行业务区分的。

网上有很多分析相关使用方式的文章,虽然分析的结果都是“不可以”,但我们可以通过其他的一些方案来进行解决。

自主搭建的RocketMQ

通过自主搭建RocketMQ,然后通过SpringBoot进行集成实现,可以参考在公众号【程序新视界】中的文章《Spring Boot快速集成RocketMQ实战教程》,可关注公众号搜索,也可以关注公众号之后回复“1003”,完整的实战步骤。

这里我们只摘取其中消费者的部分代码:

@Service
@RocketMQMessageListener(topic = MqTopicConstant.DEMO_TOPIC, consumerGroup = MqTopicConstant.DEMO_CONSUMER_GROUP_REGISTERED, selectorExpression = MqTopicConstant.DEMO_TAG_REGISTERED)
public class MqRegisteredListenerDemo implements RocketMQListener<String> {private static final Logger log = LoggerFactory.getLogger(MqRegisteredListenerDemo.class);@Overridepublic void onMessage(String message) {log.info("received registered message: {}", message);}
}

这是其中一个消费者,消费的topic为MqTopicConstant.DEMO_TOPIC,consumerGroup为REGISTERED的,tag便是selectorExpression指定的REGISTERED的tag。

针对同一的topic,另外一个tag的消费者的实现如下:

@Service
@RocketMQMessageListener(topic = MqTopicConstant.DEMO_TOPIC, consumerGroup = MqTopicConstant.DEMO_CONSUMER_GROUP_MODIFY, selectorExpression = MqTopicConstant.DEMO_TAG_MODIFY)
public class MqModifyListenerDemo implements RocketMQListener<String> {private static final Logger log = LoggerFactory.getLogger(MqRegisteredListenerDemo.class);@Overridepublic void onMessage(String message) {log.info("received modify message: {}", message);}
}

我们可以看到topic是同一个,但consumerGroup和tag不同。这说明什么?这说明只要消费者的consumerGroup不同,那么topic相同的情况下,也可以通过tag进行区分的。

关于其他源码就不再这里贴出了,详情可关注公众号看对应文章。

基于云服务的RocketMQ

基于云服务的RocketMQ与自主搭建的基本一致,我们只要确保groupId(阿里云的叫法)不同,那么同一topic下的tag是可以进行区分处理的。

具体处理这里也只贴出部分代码:

@Configuration
public class ConsumerClient {@Resourceprivate MqConfigProperties mqConfigProperties;@Resourceprivate EquipmentMessageListener equipmentMessageListener;@Bean(initMethod = "start", destroyMethod = "shutdown")public ConsumerBean buildConsumer() {ConsumerBean consumerBean = new ConsumerBean();// 配置文件Properties properties = mqConfigProperties.getMqProperties();properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfigProperties.getGroupId());// 将消费者线程数固定为20个 20为默认值properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");consumerBean.setProperties(properties);// 订阅关系Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();// --------业务板块开始--------Subscription subscription = new Subscription();// 设置需要消费的消息所属的topicsubscription.setTopic(MqConfigProperties.getInnerTopic());// 设置需要消费的消息所属的tagsubscription.setExpression(MqConfigProperties.getEquipmentMonitorTag());// 实现MessageListener接口,并且在consume方法中实现消费逻辑subscriptionTable.put(subscription, equipmentMessageListener);//订阅多个topic如上面设置// --------业务板块结束--------// 将订阅者消息放入consumerBean中,在Spring初始加载该bean时,监听MQ中的Topic和tag下的消息consumerBean.setSubscriptionTable(subscriptionTable);return consumerBean;}
}

在上面的代码中,重点是业务板块部分的代码,如果在订阅关系中重新将业务板块内的代码copy一份,然后修改对应的Expression值(也就是tag值),那么基本上是不会成功的。

往往发送大量消息,只能够收到一部分。其他的会被覆盖掉。当然,如果你想采用不同的topic来处理,只需将业务板块中的内容重新修改,添加到subscriptionTable中即可。

那么,如何解决标题中的问题呢?思路与第一种方案一样,阿里云这里只是创建了一个ConsumerBean,而上面的自主搭建时采用了多个Consumer。那么解决方案就是:初始化多个ConsumerBean,每个ConsumerBean中的配置不同的groupId和tag,同时注册不同的监听器。

如此一来,就可以监听一个topic下的不同tag了。

原理分析

两个一样的ConsumerGroup的Consumer订阅同一个Topic,但是是不同的tag,Consumer1订阅Topic的tag1,Consumer2订阅Topic的tag2,然后分别启动。这时候往Topic的tag1里发送10条数据,Topic的tag2里发送10条。目测应该是Consumer1和Consumer2分别收到对应的10条消息。结果却是只有Consumer2收到了消息,而且只收到了4-6条消息,不固定。

这种现象的原因是:消息的分配是Broker决定的,而不是Consumer端,Consumer端发心跳给Broker,Broker收到后存到consumerTable里(就是个Map),key是GroupName,value是ConsumerGroupInfo。ConsumerGroupInfo里面是包含topic等信息的,但是问题就出在上一步骤,key是groupName,同GroupName的话Broker心跳最后收到的Consumer会覆盖前者的。

这样同key,肯定产生了覆盖。所以Consumer1不会收到任何消息,但是Consumer2为什么只收到了一半(不固定)消息呢?

那是因为:集群模式消费,它会负载均衡分配到各个节点去消费,所以一半消息(不固定个数)跑到了Consumer1上,结果Consumer1订阅的是tag1,所以不会任何输出。

如果换成BROADCASTING,那后者会收到全部消息,而不是一半,因为广播是广播全部Consumer。

如果还有其他相关问题,也可关注公众号“程序新视界”,相互沟通学习。

原文链接:《RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?》


程序新视界
公众号“程序新视界”,一个让你软实力、硬技术同步提升的平台,提供海量资料

RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?相关推荐

  1. 阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?

    面试官:同一个消费组内的消费者,如果订阅了相同的 topic,但是订阅的 tag 不一样,会有什么问题吗? 我:会出现丢消息的情况. 面试官:能详细说一说吗? 我:RocketMQ 要求同一个消费组内 ...

  2. rocketmq延时消息自定义配置;topic下tag使用

    概述 使用的是开源版本的rocketmq4.9.4 rocketmq也是支持延时消息的. rocketmq一般是4个部分: nameserver:保存路由信息 broker:保存消息 生产者:生产消息 ...

  3. 消息中间件学习总结(11)——Kafka与RocketMQ的Topic数量对单机性能的影响比较分析

    引言 上一期我们对比了三类消息产品(Kafka.RabbitMQ.RocketMQ)单纯发送小消息的性能,受到了程序猿们的广泛关注,其中大家对这种单纯的发送场景感到并不过瘾,因为没有任何一个网站的业务 ...

  4. RocketMQ 中Topic、Tag、GroupName基本概念介绍

    本文主要介绍RocketMQ中Topic.Tag.GroupName的概念.设计初衷以及使用方法. 一.Topic 首先看看官方的定义: Topic是生产者在发送消息和消费者在拉取消息的类别.Topi ...

  5. [Azure][Event hub]Kafka无法同时连接到同一个namespace下的两个Event hub

    1.问题背景 有一个应用需要用kafka消费event hub的消息,其中两个kafka consumer同时连接到了同一个namespace下的两个event hub. kafka:consumer ...

  6. IntelliJ IDEA导入多个eclipse项目到同一个workspace下

    IntelliJ IDEA 与eclipse在新建项目上工作区的叫法略有不同,区别见下图. 我们在eclipse都是在新建的workspace目录下新建我们的项目,但是在IDEA中没有workspac ...

  7. php只显示一部分文章,typecho同一个页面下调用不同分类的文章但是却只显示一个分类文章...

    typecho同一个页面下调用不同分类的文章但是却只显示一个分类文章 作者:佚名 来源:爱好者 时间:2018-04-30 问题描述: 同页面调用分类下文章,只显示一第一个分类下的文章 在一个页面中, ...

  8. 启动mq命令 linux,RocketMQ:Linux下启动server和broker的命令

    目录 QUESTION:RocketMQ:Linux下启动server和broker的命令? ANSWER: 一.启动mqnamesrv 1.1当前执行 1.2后台运行 二.启动mqbroker 2. ...

  9. RocketMQ:Linux下启动server和broker的命令

    目录 QUESTION:RocketMQ:Linux下启动server和broker的命令? ANSWER: 一.启动mqnamesrv 1.1当前执行 1.2后台运行 二.启动mqbroker 2. ...

最新文章

  1. 通信信号处理中为什么要分为I、Q两路
  2. 信息检索的评价指标(Precision、Recall、F-score、MAP、ROC、AUC)
  3. 对话农民丰收节交易会-林裕豪:从玉农业2021新年贺词
  4. 内向者优势 原版_未来内向的人具有越来越高的竞争力——心理学:学会运用性格优势...
  5. [云炬python3玩转机器学习笔记] 2-1机器学习基础概念
  6. 用C语言写HMI程序,HMI画面元素组成设计及代码生成方法与流程
  7. AUTOSAR从入门到精通100讲(一)-SPI、UART、I2C总线详解
  8. 应用架构设计“着火”“防火”经验之谈
  9. 支付宝Android接口4.0以上报错Failure calling remote service
  10. centos设置时间时区
  11. Leetcode每日一题:面试题17.12 binode
  12. 20171103_Python学习三周五次课
  13. 详解基于图卷积的半监督学习(附代码)
  14. java jpa 字段 关联_jpa查询关联表懒加载数据initialize proxy no session
  15. MySQLl优化【附带优化视频教程全套】
  16. 基本知识 100028
  17. 《Expert C Programming》(C专家编程)读书笔记
  18. 为什么正定矩阵等于转置_正定矩阵的定义性质-正定矩阵的判定方法-正定矩阵转置和本身...
  19. 圣诞节计算机老师贺卡祝福语,[圣诞节贺卡教师祝福语]2020圣诞节贺卡祝福语
  20. iis6 增加PHP+MYSQL等时,记得 PHP目录要给EVERYONE权限

热门文章

  1. NXP_RT1172 eFlexPWM 模块简介
  2. 魔兽世界:在网吧玩WOW,遇到很多旁观者(旁)
  3. 随机预言机(random oracle)和PRF(Pseudorandom Function)是什么,区别在哪里?
  4. 番茄花园站长被拘幕后:微软向个人盗版开刀
  5. 超经典爆笑的人生格言
  6. 专科毕业学习python有前途吗_专科生学IT有前途吗?
  7. Excel原封不动导入txt文件
  8. 如何打造一支无法跨部门合作的团队?
  9. text-justify属于css3,css3 text-justify属性怎么用
  10. 羊皮卷的故事-第八章-羊皮卷之一