AMQ 虚拟topic
业务场景:
为了做到高可用性,topic的consumer服务通常是多台服务。如果用普通的Topic,则多个consumer的服务就会出现重复消费的情况。
解决方案:
AMQ引入了虚拟Topic,如果Topic的名字是以"VirtualTopic."开头,则AMQ自动将其识别为虚拟主题的Topic,如 VirtualTopic.NORMAL。
其对应的consumer则需要以 "Consumer.groupName.VirtualTopic.X"的格式命名,其中groupName是为了标记consumer的分组,如 Consumer.normal.VirtualTopic.NORMAL。
原理:
AMQ是通过Queue来实现这个功能。
package com.hayden.amq;import java.util.concurrent.atomic.AtomicInteger;import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Test;public class TestVirtualTopic { @Testpublic void testNormalTopic() { try { ActiveMQConnectionFactory factoryA = getAMQConnectionFactory(); ActiveMQTopic queue = new ActiveMQTopic(getNormalTopicName()); ActiveMQConnection conn = (ActiveMQConnection) factoryA.createConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer1 = session.createConsumer( queue ); MessageConsumer consumer2 = session.createConsumer( queue ); final AtomicInteger aint1 = new AtomicInteger(0); MessageListener listenerA = new MessageListener() { public void onMessage(Message message) { try { int index = aint1.incrementAndGet();System.out.println(index+ " => receive from "+ getNormalTopicName() +": " + message); Thread.sleep(10L);} catch (Exception e) { e.printStackTrace(); } } }; consumer1.setMessageListener(listenerA); consumer2.setMessageListener(listenerA); MessageProducer producer = session.createProducer(new ActiveMQTopic(getNormalTopicName())); int index = 0; while (index++ < 100) {
// System.out.println("Start to send msg");TextMessage message = session.createTextMessage(index + " message."); producer.send(message); Thread.sleep(5L);} } catch (Exception e) { e.printStackTrace(); } }@Testpublic void testNoralVirtualTopic() { try { ActiveMQConnectionFactory factoryA = getAMQConnectionFactory(); Queue queue = new ActiveMQQueue(getVirtualTopicConsumerName()); ActiveMQConnection conn = (ActiveMQConnection) factoryA.createConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer1 = session.createConsumer( queue ); MessageConsumer consumer2 = session.createConsumer( queue ); final AtomicInteger aint1 = new AtomicInteger(0); MessageListener listenerA = new MessageListener() { public void onMessage(Message message) { try { int index = aint1.incrementAndGet();System.out.println(index+ " => receive from "+ getNormalVirtualTopicName() +": " + message);
// if(index % 2==0){
// this.wait(1000L);
// }} catch (Exception e) { e.printStackTrace(); } } }; consumer1.setMessageListener(listenerA); consumer2.setMessageListener(listenerA); MessageProducer producer = session.createProducer(new ActiveMQTopic(getNormalVirtualTopicName())); int index = 0; while (index++ < 100) { TextMessage message = session.createTextMessage(index + " message."); producer.send(message); } } catch (Exception e) { e.printStackTrace(); } }@Testpublic void testVirtualTopic() { try { ActiveMQConnectionFactory factoryA = getAMQConnectionFactory(); Queue queue = new ActiveMQQueue(getVirtualTopicConsumerNameA()); ActiveMQConnection conn = (ActiveMQConnection) factoryA.createConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer1 = session.createConsumer( queue ); MessageConsumer consumer2 = session.createConsumer( queue ); MessageConsumer consumer3 = session.createConsumer( new ActiveMQQueue(getVirtualTopicConsumerNameB()) ); final AtomicInteger aint1 = new AtomicInteger(0); MessageListener listenerA = new MessageListener() { public void onMessage(Message message) { try { System.out.println(aint1.incrementAndGet() + " => receive from "+ getVirtualTopicConsumerNameA() +": " + message); } catch (Exception e) { e.printStackTrace(); } } }; consumer1.setMessageListener(listenerA); consumer2.setMessageListener(listenerA); final AtomicInteger aint2 = new AtomicInteger(0); MessageListener listenerB = new MessageListener() { public void onMessage(Message message) { try { System.out.println(aint2.incrementAndGet() + " => receive from "+ getVirtualTopicConsumerNameB() +": " + message); } catch (Exception e) { e.printStackTrace(); } } }; consumer3.setMessageListener(listenerB); MessageProducer producer = session.createProducer(new ActiveMQTopic(getVirtualTopicName())); int index = 0; while (index++ < 10) { TextMessage message = session.createTextMessage(index + " message."); producer.send(message); } } catch (Exception e) { e.printStackTrace(); } }private ActiveMQConnectionFactory getAMQConnectionFactory() {return new ActiveMQConnectionFactory( "tcp://127.0.0.1:61616");} protected static String getNormalTopicName() { return "nomal.TEST"; } protected static String getNormalVirtualTopicName() { return "VirtualTopic.NORMAL"; } protected static String getVirtualTopicName() { return "VirtualTopic.TEST"; } protected static String getVirtualTopicConsumerName() { return "Consumer.normal.VirtualTopic.NORMAL"; }protected static String getVirtualTopicConsumerNameA() { return "Consumer.A.VirtualTopic.TEST"; } protected static String getVirtualTopicConsumerNameB() { return "Consumer.B.VirtualTopic.TEST"; } }
AMQ 虚拟topic相关推荐
- RabbitMq系列(九):主题交换Topic Exchange
系列文章 RabbitMq系列(一):服务器搭建 RabbitMq系列(二):最简单的例子 RabbitMq系列(三):工作队列 RabbitMq系列(四):消息确认和持久性 RabbitMq系列(五 ...
- Rabbitmq小书
Rabbitmq小书 RabbitMQ 安装 Docker安装 Rabbitmq初识 AMQP 0.9.1 协议解析 AMQP协议简介 消息代理和他们所扮演的角色 AMQP 0-9-1 模型简介 AM ...
- RabbitMQ简介以及AMQP协议
RabbitMQ能为你做些什么? 消息系统允许软件.应用相互连接和扩展.这些应用可以相互链接起来组成一个更大的应用,或者将用户设备和数据进行连接.消息系统通过将消息的发送和接收分离来实现应用程序的异步 ...
- RabbitMq详解之AMQP协议
AMQP(高级消息队列协议)是一个网络协议.它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信.通过了解Rabb ...
- AMQP协议简介(源自官网的翻译)
感谢作者的翻译:AMQP 0 9 1 Model Explained - RabbitMQ 中文 对AMQP有了更深刻的认识. 目录 AMQP 0-9-1 和 AMQP 模型高阶概述 AMQP是什么 ...
- ActiveMQ学习(七)
2019独角兽企业重金招聘Python工程师标准>>> Destination的高级特性 1.组合destinations(虚拟queue) 指的是组合队列,就是虚拟destinat ...
- 张洋:浅析PageRank算法
本文引自http://blog.jobbole.com/23286/ 很早就对Google的PageRank算法很感兴趣,但一直没有深究,只有个轮廓性的概念.前几天趁团队outing的机会,在动车上看 ...
- RabbitMQ入门教程——发布/订阅
什么是发布订阅 发布订阅是一种设计模式定义了一对多的依赖关系,让多个订阅者对象同时监听某一个主题对象.这个主题对象在自身状态变化时,会通知所有的订阅者对象,使他们能够自动更新自己的状态. 为了描述这种 ...
- ActiveMQ Destination高级特性
1.1通配符 1"." 用于作为路径上名字间的分割符 2">" 用于递归的匹配任何以这个名字开始的Destination 3 "*& ...
最新文章
- 关于线程执行顺序的问题
- linux获取网卡的ip,Windows和Linux系统下获取多网卡的ip地址
- iphone保修期多久_卖疯了!开发区9.9元起换iphone原厂电池!
- nginx 配置SSL/HTTPS
- 经典题目螺旋方阵的详解
- git使用---工作区和暂存区
- webserver之使用数组实现阻塞队列
- html2canvas在安卓端微信里截取从相册打开的图片空白问题
- SAP 43亿美元收购Ariba 拓展云计算市场
- Win10 迅雷9精简优化设置方法
- Android根据包名获取APP名称
- selenium docker
- HTML班级网站实例(笔记2)
- 收到大量垃圾短信怎么办?如何屏蔽垃圾短信?
- 蓝牙连接测试系统软件,蓝牙测试软件官方版
- C#中的lock(锁)应用例子
- 裸奔的智能插座:博联Broadlink SP2/SP mini的分析、破解
- 121. 买卖股票的最佳时机_面试题63. 股票的最大利润_[找出数组中一个元素和它后面最大的元素的差值]
- Scrum立会报告+燃尽图(Beta阶段第二周第五次)
- ArcGIS批量出图操作流程(附练习数据下载)
热门文章
- Assignment | 05-week1 -Character level language model - Dinosaurus land
- 清新黄色背景薪酬绩效考核管理模版PPT模板
- 关于DIV中display属性误区以及牵扯出来的两个问题
- oracle 表分区,根据时间按月分区做一次记录
- HTML+CSS+JS`管理系统网站设计——学生信息管理系统模板 (13页) HTML+CSS+JavaScript html网页设计期末大作业_网页设计平时作业
- UE4反射原理(转)
- 大数据量的兴趣点如何在Cesium快速加载?(weixin公众号【图说GIS】)
- python二郎成长笔记(三)(matlab标定工具箱详解,旋转矩阵旋转向量,matlab标定数据传入opencv)
- Project2007操作手册(原创)
- macOS Final Cut Pro X 导入视频、剪切视频