业务场景:

为了做到高可用性,topic的consumer服务通常是多台服务。如果用普通的Topic,则多个consumer的服务就会出现重复消费的情况。

解决方案:

AMQ引入了虚拟Topic,如果Topic的名字是以"VirtualTopic."开头,则AMQ自动将其识别为虚拟主题的Topic,如 VirtualTopic.NORMAL

其对应的consumer则需要以 "Consumer.groupName.VirtualTopic.X"的格式命名,其中groupName是为了标记consumer的分组,如 Consumer.normal.VirtualTopic.NORMAL。

原理:

    AMQ是通过Queue来实现这个功能。

package com.hayden.amq;import java.util.concurrent.atomic.AtomicInteger;import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Test;public class TestVirtualTopic {  @Testpublic void testNormalTopic() {  try {  ActiveMQConnectionFactory factoryA = getAMQConnectionFactory();  ActiveMQTopic queue = new ActiveMQTopic(getNormalTopicName());  ActiveMQConnection conn = (ActiveMQConnection) factoryA.createConnection();  conn.start();  Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);  MessageConsumer consumer1 = session.createConsumer( queue );  MessageConsumer consumer2 = session.createConsumer( queue );  final AtomicInteger aint1 = new AtomicInteger(0);  MessageListener listenerA = new MessageListener() {  public void onMessage(Message message) {  try {  int index = aint1.incrementAndGet();System.out.println(index+ " => receive from "+ getNormalTopicName() +": " + message);  Thread.sleep(10L);} catch (Exception e) {  e.printStackTrace();  }  }  };  consumer1.setMessageListener(listenerA);  consumer2.setMessageListener(listenerA);  MessageProducer producer = session.createProducer(new ActiveMQTopic(getNormalTopicName()));  int index = 0;  while (index++ < 100) {
//              System.out.println("Start to send msg");TextMessage message = session.createTextMessage(index  + " message.");  producer.send(message); Thread.sleep(5L);}  } catch (Exception e) {  e.printStackTrace();  }  }@Testpublic void testNoralVirtualTopic() {  try {  ActiveMQConnectionFactory factoryA = getAMQConnectionFactory();  Queue queue = new ActiveMQQueue(getVirtualTopicConsumerName());  ActiveMQConnection conn = (ActiveMQConnection) factoryA.createConnection();  conn.start();  Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);  MessageConsumer consumer1 = session.createConsumer( queue );  MessageConsumer consumer2 = session.createConsumer( queue );  final AtomicInteger aint1 = new AtomicInteger(0);  MessageListener listenerA = new MessageListener() {  public void onMessage(Message message) {  try {  int index = aint1.incrementAndGet();System.out.println(index+ " => receive from "+ getNormalVirtualTopicName() +": " + message);
//                          if(index % 2==0){
//                              this.wait(1000L);
//                          }} catch (Exception e) {  e.printStackTrace();  }  }  };  consumer1.setMessageListener(listenerA);  consumer2.setMessageListener(listenerA);  MessageProducer producer = session.createProducer(new ActiveMQTopic(getNormalVirtualTopicName()));  int index = 0;  while (index++ < 100) {  TextMessage message = session.createTextMessage(index  + " message.");  producer.send(message);  }  } catch (Exception e) {  e.printStackTrace();  }  }@Testpublic void testVirtualTopic() {  try {  ActiveMQConnectionFactory factoryA = getAMQConnectionFactory();  Queue queue = new ActiveMQQueue(getVirtualTopicConsumerNameA());  ActiveMQConnection conn = (ActiveMQConnection) factoryA.createConnection();  conn.start();  Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);  MessageConsumer consumer1 = session.createConsumer( queue );  MessageConsumer consumer2 = session.createConsumer( queue );  MessageConsumer consumer3 = session.createConsumer( new ActiveMQQueue(getVirtualTopicConsumerNameB()) );  final AtomicInteger aint1 = new AtomicInteger(0);  MessageListener listenerA = new MessageListener() {  public void onMessage(Message message) {  try {  System.out.println(aint1.incrementAndGet()  + " => receive from "+ getVirtualTopicConsumerNameA() +": " + message);  } catch (Exception e) {  e.printStackTrace();  }  }  };  consumer1.setMessageListener(listenerA);  consumer2.setMessageListener(listenerA);  final AtomicInteger aint2 = new AtomicInteger(0);  MessageListener listenerB = new MessageListener() {  public void onMessage(Message message) {  try {  System.out.println(aint2.incrementAndGet()  + " => receive from "+ getVirtualTopicConsumerNameB() +": " + message);  } catch (Exception e) {  e.printStackTrace();  }  }  };  consumer3.setMessageListener(listenerB);  MessageProducer producer = session.createProducer(new ActiveMQTopic(getVirtualTopicName()));  int index = 0;  while (index++ < 10) {  TextMessage message = session.createTextMessage(index  + " message.");  producer.send(message);  }  } catch (Exception e) {  e.printStackTrace();  }  }private ActiveMQConnectionFactory getAMQConnectionFactory() {return new ActiveMQConnectionFactory(  "tcp://127.0.0.1:61616");}  protected static String getNormalTopicName() {  return "nomal.TEST";  }  protected static String getNormalVirtualTopicName() {  return "VirtualTopic.NORMAL";  }  protected static String getVirtualTopicName() {  return "VirtualTopic.TEST";  }  protected static String getVirtualTopicConsumerName() {  return "Consumer.normal.VirtualTopic.NORMAL";  }protected static String getVirtualTopicConsumerNameA() {  return "Consumer.A.VirtualTopic.TEST";  }  protected static String getVirtualTopicConsumerNameB() {  return "Consumer.B.VirtualTopic.TEST";  }  }  


AMQ 虚拟topic相关推荐

  1. RabbitMq系列(九):主题交换Topic Exchange

    系列文章 RabbitMq系列(一):服务器搭建 RabbitMq系列(二):最简单的例子 RabbitMq系列(三):工作队列 RabbitMq系列(四):消息确认和持久性 RabbitMq系列(五 ...

  2. Rabbitmq小书

    Rabbitmq小书 RabbitMQ 安装 Docker安装 Rabbitmq初识 AMQP 0.9.1 协议解析 AMQP协议简介 消息代理和他们所扮演的角色 AMQP 0-9-1 模型简介 AM ...

  3. RabbitMQ简介以及AMQP协议

    RabbitMQ能为你做些什么? 消息系统允许软件.应用相互连接和扩展.这些应用可以相互链接起来组成一个更大的应用,或者将用户设备和数据进行连接.消息系统通过将消息的发送和接收分离来实现应用程序的异步 ...

  4. RabbitMq详解之AMQP协议

    AMQP(高级消息队列协议)是一个网络协议.它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信.通过了解Rabb ...

  5. AMQP协议简介(源自官网的翻译)

    感谢作者的翻译:AMQP 0 9 1 Model Explained - RabbitMQ 中文 对AMQP有了更深刻的认识. 目录 AMQP 0-9-1 和 AMQP 模型高阶概述 AMQP是什么 ...

  6. ActiveMQ学习(七)

    2019独角兽企业重金招聘Python工程师标准>>> Destination的高级特性 1.组合destinations(虚拟queue) 指的是组合队列,就是虚拟destinat ...

  7. 张洋:浅析PageRank算法

    本文引自http://blog.jobbole.com/23286/ 很早就对Google的PageRank算法很感兴趣,但一直没有深究,只有个轮廓性的概念.前几天趁团队outing的机会,在动车上看 ...

  8. RabbitMQ入门教程——发布/订阅

    什么是发布订阅 发布订阅是一种设计模式定义了一对多的依赖关系,让多个订阅者对象同时监听某一个主题对象.这个主题对象在自身状态变化时,会通知所有的订阅者对象,使他们能够自动更新自己的状态. 为了描述这种 ...

  9. ActiveMQ Destination高级特性

    1.1通配符 1"." 用于作为路径上名字间的分割符 2">" 用于递归的匹配任何以这个名字开始的Destination 3       "*& ...

最新文章

  1. 关于线程执行顺序的问题
  2. linux获取网卡的ip,Windows和Linux系统下获取多网卡的ip地址
  3. iphone保修期多久_卖疯了!开发区9.9元起换iphone原厂电池!
  4. nginx 配置SSL/HTTPS
  5. 经典题目螺旋方阵的详解
  6. git使用---工作区和暂存区
  7. webserver之使用数组实现阻塞队列
  8. html2canvas在安卓端微信里截取从相册打开的图片空白问题
  9. SAP 43亿美元收购Ariba 拓展云计算市场
  10. Win10 迅雷9精简优化设置方法
  11. Android根据包名获取APP名称
  12. selenium docker
  13. HTML班级网站实例(笔记2)
  14. 收到大量垃圾短信怎么办?如何屏蔽垃圾短信?
  15. 蓝牙连接测试系统软件,蓝牙测试软件官方版
  16. C#中的lock(锁)应用例子
  17. 裸奔的智能插座:博联Broadlink SP2/SP mini的分析、破解
  18. 121. 买卖股票的最佳时机_面试题63. 股票的最大利润_[找出数组中一个元素和它后面最大的元素的差值]
  19. Scrum立会报告+燃尽图(Beta阶段第二周第五次)
  20. ArcGIS批量出图操作流程(附练习数据下载)

热门文章

  1. Assignment | 05-week1 -Character level language model - Dinosaurus land
  2. 清新黄色背景薪酬绩效考核管理模版PPT模板
  3. 关于DIV中display属性误区以及牵扯出来的两个问题
  4. oracle 表分区,根据时间按月分区做一次记录
  5. HTML+CSS+JS`管理系统网站设计——学生信息管理系统模板 (13页) HTML+CSS+JavaScript html网页设计期末大作业_网页设计平时作业
  6. UE4反射原理(转)
  7. 大数据量的兴趣点如何在Cesium快速加载?(weixin公众号【图说GIS】)
  8. python二郎成长笔记(三)(matlab标定工具箱详解,旋转矩阵旋转向量,matlab标定数据传入opencv)
  9. Project2007操作手册(原创)
  10. macOS Final Cut Pro X 导入视频、剪切视频