1.问题背景

一个事件中心接收网关通过ActiveMq上报的告警事件,处理后持久化到数据库,消息模型为发布订阅模式。为了实现高可用,决定将事件中心进行集群部署,运行两个实例。
但是由于消息模型为发布/订阅(publish/subscribe,topic),每个eps实例都会收到告警消息。如不加以控制,势必会造成告警消息重复消费的问题。

即我们需要不同的应用系统关心相同的消息,同时单个应用系统内部又可以部署多个实例达到负载均衡和故障转移,提高系统的健壮性。

2.解决思路

我们首先想到可以通过消费端增加检测重复消息的逻辑,来解决重复消费的问题,但是这种方式增加额外判断逻辑,且浪费消费端性能,并不可取。
实际上我们可以通过ActiveMq的一些高级特性来很好的解决此问题。ActiveMq提供了虚拟主题(Virtual Topics)的功能,如下图:

即通过一些配置后,单个应用系统内多个实例监听同一个queue,实例之间即可对消息进行平均消费,共同承担消费的功能。如果其中一个eps实例宕机,其他实例仍可以正常消费消息,消息不会丢失遗漏。
Virtual Topic这个功能特性可以关闭,即useVirtualTopics属性,默认为true,设置为false即可关闭此功能。当此功能开启,并且使用了持久化的存储时,ActiveMq的broker启动的时候会从持久化存储里拿到所有的Topic的名称,如果名称模式与VirtualTopics匹配,则把它们添加到系统的Virtual Topics列表中去。当有consumer访问此VirtualTopics时,系统会自动创建持久化的queue,并在每次Topic收到消息时,分发到具体的queue。

3.使用方法

虚拟主题有两种使用方式,下面分别进行介绍。

3.1 topic-queue名称匹配方式

此种方式需要发布者发布的Topic以“VirtualTopic.”这样的前缀来命名(大小写敏感)。比如我们定义一个VirtualTopic.event,然后发布者将消息发布到VirtualTopic.event。订阅者需要订阅名称为”Consumer.*. VirtualTopic.event”这样的队列。
下面使用springboot搭建工程进行演示,由于配置简单只涉及到topic和queue的名称,下面只显示关键步骤。
首先发布者创建名称VirtualTopic.event的topic,然后每隔3秒向此topic发布一条消息,

@Service
public class JmsSendMessageDemo {@Autowiredprivate JmsTemplate jmsTemplate;private Integer i = 0;@Scheduled(fixedRate = 3000)public void sendMessage(){i++;Event event = new Event("触碰", i, new Date());jmsTemplate.convertAndSend(new ActiveMQTopic("VirtualTopic.event"),JsonUtil.object2Json(event));//System.out.println("发送消息: "+event);}}

然后建立两个应用作为消费者分别监听名为“Consumer.A.VirtualTopic.event“的queue,
第一个消费者:

@Service
public class JmsMessageListener {@JmsListener(destination = "Consumer.A.VirtualTopic.event",containerFactory = "QueueContainerFactory")public void receiveMessage(String message){System.out.println("1  接收到消息: "+JsonUtil.json2Object(message,Event.class));}
}

第二个消费者:

@Service
public class JmsMessageListener {@JmsListener(destination = "Consumer.A.VirtualTopic.event",containerFactory = "QueueContainerFactory")public void receiveMessage(String message){System.out.println(" 2  接收到消息: "+JsonUtil.json2Object(message,Event.class));}
}

最后启动两个消费者和发布者,通过两个消费者的输出窗口可以看到他们共同承担了消息的消费:

1  接收到消息: Event{name='触碰', id=2, date=Tue Aug 28 21:18:50 CST 2018}
1  接收到消息: Event{name='触碰', id=4, date=Tue Aug 28 21:18:56 CST 2018}
1  接收到消息: Event{name='触碰', id=6, date=Tue Aug 28 21:19:02 CST 2018}
1  接收到消息: Event{name='触碰', id=8, date=Tue Aug 28 21:19:08 CST 2018}
1  接收到消息: Event{name='触碰', id=10, date=Tue Aug 28 21:19:14 CST 2018}
1  接收到消息: Event{name='触碰', id=12, date=Tue Aug 28 21:19:20 CST 2018}
1  接收到消息: Event{name='触碰', id=14, date=Tue Aug 28 21:19:26 CST 2018}
1  接收到消息: Event{name='触碰', id=16, date=Tue Aug 28 21:19:32 CST 2018}
1  接收到消息: Event{name='触碰', id=18, date=Tue Aug 28 21:19:38 CST 2018}
2  接收到消息: Event{name='触碰', id=1, date=Tue Aug 28 21:18:47 CST 2018}2  接收到消息: Event{name='触碰', id=3, date=Tue Aug 28 21:18:53 CST 2018}2  接收到消息: Event{name='触碰', id=5, date=Tue Aug 28 21:18:59 CST 2018}2  接收到消息: Event{name='触碰', id=7, date=Tue Aug 28 21:19:05 CST 2018}2  接收到消息: Event{name='触碰', id=9, date=Tue Aug 28 21:19:11 CST 2018}2  接收到消息: Event{name='触碰', id=11, date=Tue Aug 28 21:19:17 CST 2018}2  接收到消息: Event{name='触碰', id=13, date=Tue Aug 28 21:19:23 CST 2018}2  接收到消息: Event{name='触碰', id=15, date=Tue Aug 28 21:19:29 CST 2018}2  接收到消息: Event{name='触碰', id=17, date=Tue Aug 28 21:19:35 CST 2018}2  接收到消息: Event{name='触碰', id=19, date=Tue Aug 28 21:19:41 CST 2018}2  接收到消息: Event{name='触碰', id=21, date=Tue Aug 28 21:19:47 CST 2018}2  接收到消息: Event{name='触碰', id=23, date=Tue Aug 28 21:19:53 CST 2018}

然后我们将消费者2关闭(宕机),可以看到消费者1消费了所有的消息

1  接收到消息: Event{name='触碰', id=134, date=Tue Aug 28 21:25:26 CST 2018}
1  接收到消息: Event{name='触碰', id=135, date=Tue Aug 28 21:25:29 CST 2018}
1  接收到消息: Event{name='触碰', id=136, date=Tue Aug 28 21:25:32 CST 2018}
1  接收到消息: Event{name='触碰', id=137, date=Tue Aug 28 21:25:35 CST 2018}
1  接收到消息: Event{name='触碰', id=138, date=Tue Aug 28 21:25:38 CST 2018}

通过以上试验,我们看到通过虚拟主题的方式,两个实例可以通过监听同一个queue来平均消费消息,如果其中一个宕机,另一个会承担起所有的消息消费。
上面的方式需要发布者和订阅者统一对命名规范,如果发布者和订阅者已经存在,就需要统一升级,比较麻烦。实际上我们还可以拦截器的方法。

3.2 拦截器方式

这种方式需要修改ActiveMq的配置文件/conf/activemq.xml,添加以下拦截配置:

<destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <virtualTopic name="event" prefix="demo.*." selectorAware="false"/>    </virtualDestinations></virtualDestinationInterceptor>
</destinationInterceptors>

然后发布者只需要往event主题上发布消息。订阅者通过订阅类似demo.A. event的队列的方式来消费。其他没有集群部署的应用仍可以订阅event主题进行消费。
此种方式的优点在于我们不需要对发布者和不需要改造的订阅者做任何变动,需要增加或者改造的订阅者使用虚拟主题的方式进行订阅即可达到负载均衡和故障转移的目的,当无法约束发布者发布的topic规范时,这种方式很有用。当然缺点就是需要修改ActiveMQ的配置,也就是说需要重启ActiveMQ,这对于已经上线的平台来说可能造成消息丢失。

参考:虚拟主题开发

ActiveMq消费端实现集群部署相关推荐

  1. ActiveMQ 集群部署

    ActiveMQ 集群部署 本章演示 ActiveMQ 集群部署,默认您已经安装了 zookeeper 集群,并在各服务器上成功安装了 ActiveMQ 单节点实例如果您的环境还不满足条件请参考前面的 ...

  2. apollo集群部署_ribbon+apollo实现灰度发布

    一.前言 在一般情况下,升级服务器端应用,需要将应用源码或程序包上传到服务器,然后停止掉老版本服务,再启动新版本.但是这种简单的发布方式存在两个问题,一方面,在新版本升级过程中,服务是暂时中断的,另一 ...

  3. Kafka 入门之集群部署遇到问题

    最近,因为上级主管部门需要通过使用Kafka向其传输文件,又因为此前没有接触过kafka,所以在部署测试kafka程序期间遇到很多问题,在这里总结4个问题与1个建议,方便入门者参考也便于遇到类似问题进 ...

  4. ActiveMQ的几种集群配置

    ActiveMQ是一款功能强大的消息服务器,它支持许多种开发语言,例如Java, C, C++, C#等等.企业级消息服务器无论对服务器稳定性还是速度,要求都很高,而ActiveMQ的分布式集群则能很 ...

  5. Kafka集群部署详细步骤(包含zookeeper安装步骤)

    Kafka集群部署 注意:如果jdk1.8和zookeeper都安装设置过之后可以直接安装kafka跳过其它步骤 kafka基础简介及基本命令 1.环境准备 1.1集群规划 node01  node0 ...

  6. RocketMQ初探(五)之RocketMQ4.2.6集群部署(单Master+双Master+2m+2s+async异步复制)

    以下部署方式结合众多博友的博客,经过自己一步一步实际搭建,如有雷同,侵权行为,请见谅...其中遇到不少的坑,希望能帮到更多的人,现在很少能找到一份完整版4.2.6版本的搭建教程了,如果你有幸遇见,那么 ...

  7. Kafka集群部署搭建完美标准版

    Kafka集群部署并启动 在本文中将从演示如何搭建一个Kafka集群开始,然后简要介绍一下关于Kafka集群的一些基础知识点.但本文仅针对集群做介绍,对于Kafka的基本概念不做过多说明,这里假设读者 ...

  8. Kafka SASL/SCRAM动态认证集群部署

    Kafka SASL/SCRAM动态认证集群部署 目的:配置SASL/PLAIN验证,实现了对Kafka的权限控制.但SASL/PLAIN验证有一个问题:只能在JAAS文件KafkaServer中配置 ...

  9. K8S 学习笔记三 核心技术 Helm nfs prometheus grafana 高可用集群部署 容器部署流程

    K8S 学习笔记三 核心技术 2.13 Helm 2.13.1 Helm 引入 2.13.2 使用 Helm 可以解决哪些问题 2.13.3 Helm 概述 2.13.4 Helm 的 3 个重要概念 ...

最新文章

  1. 3种时间序列混合建模方法的效果对比和代码实现
  2. Swift3.0语言教程比较、判断字符串
  3. Android性能优化之APK优化,内容太过真实
  4. redis的关键路径和lazy-free
  5. 九九乘法表编程上三角python_java语言打印上三角和下三角,进一步得到九九乘法表...
  6. 水题(water)(非详细解答)
  7. mysql 两个查询结果合并去重_《MySQL 入门教程》第 21 篇 集合操作符
  8. LeetCode 646. 最长数对链(区间 贪心)
  9. jdbc显示mysql的数据_JDBC链接mysql插入数据后显示问号的原因及解决办法
  10. C语言基础教程之常量
  11. 我的第一个Python程序:Luogu1001 A+B Problem
  12. 大数据分页实现与性能优化【转】
  13. 微服务架构-实现技术之六大基础组件:服务通信+事件驱动+负载均衡+服务路由+API网关+配置管理
  14. 计算机丢失GetU,u盘启动引导文件丢失如何修复
  15. 导致联想拯救者y7000触控板失灵的一种可能
  16. Java之QQ界面实现
  17. 计算机ram数据原理,每日一科普:了解RAM是什么?有何用?
  18. 易语言大漠找字FindStrE系列
  19. 2022.06.07 前端-uniApp跨域解决办法
  20. 电脑杀蚊器:超级蚊霸+电子蚊香

热门文章

  1. cad文本改宋体字型lisp_CAD的40个常用命令和20个常见问题解决方法 撩妹必备技能...
  2. 两个python文件怎么联系在一起_【新手求助】怎样把两个程序连接在一起?老师作业,谢谢啦...
  3. java获取环境路径方法_JAVA获取服务器路径的方法
  4. java图片转成字符串_JAVA将图片(本地或者网络资源)转为Base64字符串,将base64字符串存储为本地图片...
  5. 面具卡米怎么删模块_魔兽8.3咋肥事——面具带几个收益高?对小怪宝箱水晶有加成吗?...
  6. 五十九、如何求N个数的最大公约数和最小公倍数
  7. 二十三、Java类中重载和重写的区别
  8. 新赛题上线!2021CCF大数据与计算智能大赛全面开赛!
  9. 理论+技术+代码已经准备完毕!2021年啃透花书!
  10. 知乎问题:概率图模型是否有必要系统地学习