http://blog.csdn.net/zhu_tianwei/article/details/40887775

参考:http://blog.csdn.NET/lmj623565791/article/details/37706355

direct类型的消息通过绑定键转发到队列,但是存在一些局限性:它不能够基于多重条件进行路由选择,有可能希望不仅根据日志的级别而且想根据日志的来源进行订阅,这就需要主题类型的转发器来实现。

发往主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但是一般都与消息的某些特性相关。一些合法的选择键的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你可以定义任何数量的标识符,上限为255个字节。
绑定键和选择键的形式一样。主题类型的转发器背后的逻辑和直接类型的转发器很类似:一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。需要注意的是:关于绑定键有两种特殊的情况。
*可以匹配一个标识符。
#可以匹配0个或多个标识符。

1.发送日志消息SendLogTopic,发送4个消息绑定不同的绑定键, "kernal.info", "cron.warning",  "auth.info", "kernel.critical"

[java] view plaincopy print?
  1. package cn.slimsmart.rabbitmq.demo.topic;
  2. import java.util.UUID;
  3. import com.rabbitmq.client.AMQP;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.Connection;
  6. import com.rabbitmq.client.ConnectionFactory;
  7. //发送消息端
  8. public class SendLogTopic {
  9. private static final String EXCHANGE_NAME = "topic_logs";
  10. public static void main(String[] args) throws Exception {
  11. // 创建连接和频道
  12. ConnectionFactory factory = new ConnectionFactory();
  13. factory.setHost("192.168.101.174");
  14. factory.setUsername("admin");
  15. factory.setPassword("admin");
  16. factory.setPort(AMQP.PROTOCOL.PORT);
  17. Connection connection = factory.newConnection();
  18. Channel channel = connection.createChannel();
  19. // 声明转发器
  20. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  21. //定义绑定键
  22. String[] routing_keys = new String[] { "kernal.info", "cron.warning",
  23. "auth.info", "kernel.critical" };
  24. for (String routing_key : routing_keys)
  25. {
  26. //发送4条不同绑定键的消息
  27. String msg = UUID.randomUUID().toString();
  28. channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg
  29. .getBytes());
  30. System.out.println(" [x] Sent routingKey = "+routing_key+" ,msg = " + msg + ".");
  31. }
  32. channel.close();
  33. connection.close();
  34. }
  35. }

2.定义接收kernel.*消息的消费者

[java] view plaincopy print?
  1. package cn.slimsmart.rabbitmq.demo.topic;
  2. import com.rabbitmq.client.AMQP;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import com.rabbitmq.client.QueueingConsumer;
  7. //接收kernel.*消息
  8. public class ReceiveLogsTopicForKernel {
  9. private static final String EXCHANGE_NAME = "topic_logs";
  10. public static void main(String[] args) throws Exception {
  11. // 创建连接和频道
  12. ConnectionFactory factory = new ConnectionFactory();
  13. factory.setHost("192.168.101.174");
  14. factory.setUsername("admin");
  15. factory.setPassword("admin");
  16. factory.setPort(AMQP.PROTOCOL.PORT);
  17. Connection connection = factory.newConnection();
  18. Channel channel = connection.createChannel();
  19. // 声明转发器
  20. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  21. // 随机生成一个队列
  22. String queueName = channel.queueDeclare().getQueue();
  23. //接收所有与kernel相关的消息
  24. channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*");
  25. System.out.println(" [*] Waiting for messages about kernel. To exit press CTRL+C");
  26. QueueingConsumer consumer = new QueueingConsumer(channel);
  27. channel.basicConsume(queueName, true, consumer);
  28. while (true)
  29. {
  30. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  31. String message = new String(delivery.getBody());
  32. String routingKey = delivery.getEnvelope().getRoutingKey();
  33. System.out.println(" [x] Received routingKey = " + routingKey
  34. + ",msg = " + message + ".");
  35. }
  36. }
  37. }

3.接收*.critical消息消费者

[java] view plaincopy print?
  1. package cn.slimsmart.rabbitmq.demo.topic;
  2. import com.rabbitmq.client.AMQP;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import com.rabbitmq.client.QueueingConsumer;
  7. //接收*.critical消息
  8. public class ReceiveLogsTopicForCritical {
  9. private static final String EXCHANGE_NAME = "topic_logs";
  10. public static void main(String[] args) throws Exception {
  11. // 创建连接和频道
  12. ConnectionFactory factory = new ConnectionFactory();
  13. factory.setHost("192.168.101.174");
  14. factory.setUsername("admin");
  15. factory.setPassword("admin");
  16. factory.setPort(AMQP.PROTOCOL.PORT);
  17. Connection connection = factory.newConnection();
  18. Channel channel = connection.createChannel();
  19. // 声明转发器
  20. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  21. // 随机生成一个队列
  22. String queueName = channel.queueDeclare().getQueue();
  23. // 接收所有与kernel相关的消息
  24. channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");
  25. System.out
  26. .println(" [*] Waiting for critical messages. To exit press CTRL+C");
  27. QueueingConsumer consumer = new QueueingConsumer(channel);
  28. channel.basicConsume(queueName, true, consumer);
  29. while (true)
  30. {
  31. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  32. String message = new String(delivery.getBody());
  33. String routingKey = delivery.getEnvelope().getRoutingKey();
  34. System.out.println(" [x] Received routingKey = " + routingKey
  35. + ",msg = " + message + ".");
  36. }
  37. }
  38. }

启动2个消费者,再启动发送4类消息生产者。观察接收到的消息,都收到对应的消息。可以看出使用topic类型的转发器,成功实现了多重条件选择的订阅。

转载于:https://www.cnblogs.com/telwanggs/p/7124621.html

(转)RabbitMQ学习之主题topic(java)相关推荐

  1. (转) RabbitMQ学习之helloword(java)

    http://blog.csdn.net/zhu_tianwei/article/details/40835555 amqp-client:http://www.rabbitmq.com/java-c ...

  2. (转) RabbitMQ学习之工作队列(java)

    http://blog.csdn.net/zhu_tianwei/article/details/40887717 参考:http://blog.csdn.NET/lmj623565791/artic ...

  3. (转)RabbitMQ学习之路由(java)

    http://blog.csdn.net/zhu_tianwei/article/details/40887755 参考:http://blog.csdn.NET/lmj623565791/artic ...

  4. 使用java实现MQTT协议客户端的接收、发布消息和订阅、退订主题topic

    记录一下我实习的第一个任务,学习MQTT协议 首先呢得了解MQTT是什么,这里推荐一个我学习MQTT的中文文档 MQTT协议的基于TCP/IP协议的一个物联网协议,有几个概念必须要弄懂得主题(topi ...

  5. RabbitMQ 学习笔记

    RabbitMQ 学习笔记 RabbitMQ 学习笔记 1. 中间件 1.1 什么是中间件 1.2 为什么要使用消息中间件 1.3 中间件特点 1.4 在项目中什么时候使用中间件技术 2. 中间件技术 ...

  6. Rabbitmq学习笔记(尚硅谷2021)

    Rabbitmq学习笔记 (尚硅谷) 1.MQ 的概念 1.1 什么是 MQ? 1.2 为什么要用 MQ? 削峰 解耦 异步 1.3 MQ 的分类 ActiveMQ Kafka RocketMQ Ra ...

  7. RabbitMQ学习笔记

    目录 一.MQ 的相关概念 MQ是什么? MQ三大优势 MQ的劣势 MQ 的产品 RabbitMQ核心 JMS 安装 二.HelloWorld 三.Work Queues(轮训) 消息应答 Rabbi ...

  8. 乐行学院RabbitMQ学习教程 第一章 RabbitMQ介绍(可供技术选型时使用)

    乐行学院RabbitMQ学习教程 第一章 RabbitMQ介绍 RabbitMQ介绍 1.RabbitMQ技术简介 2.RabbitMQ其他扩展插件 2.1监控工具rabbitmq-managemen ...

  9. Rabbitmq学习笔记教程-尚硅谷

    Rabbitmq学习笔记 (尚硅谷) 尚硅谷 rabbitmq 教程 1.MQ 的概念 1.1 什么是 MQ? 存放消息的队列,互联网架构中常见的一种服务与服务之间通信的方式. 1.2 为什么要用 M ...

最新文章

  1. 导入外部项目无法识别为Web项目无法部署到tomcat
  2. 实现SELECT的全选,反选,AB选的JAVASCRIPT代码
  3. Sql Server 邮件日志 操作
  4. POJ 2823-Sliding Window单调队列解题报告
  5. IOS开发基础之解压缩文件技术
  6. tensorflow环境下的识别食物_在win10环境下进行tensorflow物体识别(ObjectDetection)训练...
  7. 使用gdb和core dump迅速定位段错误
  8. Springboot+idea的一个bug(Unregistering JMX-exposed beans on shutdown)
  9. 用gdb来学习c语言(linux环境下)
  10. Windows系统查看svg缩略图插件
  11. python基础(一):入门必备知识
  12. movielens电影数据分析
  13. Java获取图片大小 及 尺寸 图片压缩 jpg压缩
  14. 6.0系统机器Xposed框架安装经验
  15. Markdown链接及脚注
  16. Mysql查询性能优化-善用Explain语句
  17. 如何重设或更改Verizon FIOS路由器的密码
  18. LoadRunner 常用函数大全+1
  19. Qt 实现Unicode字符表情包显示到界面 Emoji
  20. 【开源毕设】一款精美的家校互动APP分享——爱吖校推 [你关注的,我们才推](持续开源更新2)

热门文章

  1. 关于嵌入式可执行程序,你了解多少?
  2. 用java编写一个课表串口,安卓课程表源代码
  3. s7-300 400plc应用技术_西门子S7300/400顺序功能图设计教程,看完豁然开朗!
  4. Arduino-ESP8266环境配置及点灯
  5. ci php做多图上传,CodeIgniter快速实现图片上传
  6. Keil(MDK-ARM-STM32)系列教程(一)_新建软件工程详细过程
  7. Linux内核分析 - 网络[七]:NetFilter
  8. varnish关于Grace mode和Saint mode这两中模式配置
  9. python + opencv: kalman 跟踪
  10. 服务器双cpu性能强不,双CPU的电脑用起来,性能和功耗都是原来的两倍?