内容翻译自:RabbitMQ Tutorials Java版


RabbitMQ(一):Hello World程序

RabbitMQ(二):Work Queues、循环分发、消息确认、持久化、公平分发

RabbitMQ(三):Exchange交换器--fanout

RabbitMQ(四):Exchange交换器--direct

RabbitMQ(五):Exchange交换器--topic

RabbitMQ(六):回调队列callback queue、关联标识correlation id、实现简单的RPC系统

RabbitMQ(七):常用方法说明 与 学习小结


Topics:

在上一篇博客中我们改进了我们的日志系统:使用direct路由器替代了fanout路由器,从而可以选择性地接收日志。

尽管使用direct路由器给我们的日志系统带来了改进,但仍然有一些限制:不能基于多种标准进行路由。

在我们的日志系统中,我们可能不仅需要根据日志的严重级别来接收日志,而且有时想基于日志来源进行路由。如果你知道syslog这个Unix工具,你可能了解这个概念,sysylog会基于日志严重级别(info/warn/crit...)和设备(auth/cron/kern...)进行日志分发。

如果我们可以监听来自corn的错误日志,同时也监听kern的所有日志,那么我们的日志系统就会更加灵活。

为了实现这个功能,我们需要了解一个复杂的路由器:topic路由器。


主题路由器(Topic Exchange):

发送到topic路由器的消息的路由键routing_key不能任意给定:它必须是一些单词的集合,中间用点号.分割。这些单词可以是任意的,但通常会体现出消息的特征。一些有效的路由键示例:stock.usd.nysenyse.vmwquick.orange.rabbit。这些路由键可以包含很多单词,但路由键总长度不能超过255个字节。

绑定键binding key也必须是这种形式。topic路由器背后的逻辑与direct路由器类似:以特定路由键发送的消息将会发送到所有绑定键与之匹配的队列中。但绑定键有两种特殊的情况:

(1)*(星号)仅代表一个单词

(2)#(井号)代表任意个单词

下图可以很好地解释这两个符号的含义:

对于上图的例子,我们将会发送描述动物的消息。这些消息将会以由三个单词组成的路由键发送。路由键中的第一个单词描述了速度,第二个描述了颜色,第三个描述了物种:<speed>.<colour>.<species>

我们创建了三个绑定,Q1的绑定键为*.orange.*,Q2的绑定键有两个,分别是*.*.rabbitlazy.#

上述绑定关系可以描述为:

(1)Q1关注所有颜色为orange的动物。

(2)Q2关注所有的rabbit,以及所有的lazy的动物。

如果一个消息的路由键是quick.orange.rabbit,那么Q1和Q2都可以接收到,路由键是lazy.orange.elephant的消息同样如此。但是,路由键是quick.orange.fox的消息只会到达Q1,路由键是lazy.brown.fox的消息只会到达Q2。注意,路由键为lazy.pink.rabbit的消息只会到达Q2一次,尽管它匹配了两个绑定键。路由键为quick.brown.fox的消息因为不和任意的绑定键匹配,所以将会被丢弃。

假如我们不按常理出牌:发送一个路由键只有一个单词或者四个单词的消息,像orange或者quick.orange.male.rabbit,这样的话,这些消息因为不和任意绑定键匹配,都将会丢弃。但是,lazy.orange.male.rabbit消息因为和lazy.#匹配,所以会到达Q2,尽管它包含四个单词。

Topic exchange::

Topic exchange非常强大,可以实现其他任意路由器的功能。

当一个队列以绑定键#绑定,它将会接收到所有的消息,而无视路由键(实际是绑定键#匹配了任意的路由键)。----这和fanout路由器一样了。

*#这两个特殊的字符不出现在绑定键中,Topic exchange就会和direct exchange类似了。


放在一块:

我们将会在我们的日志系统中使用主题路由器Topic exchange,并假设所有的日志消息以两个单词<facility>.<severity>为路由键。

代码和上个教程几乎一样。

生产者EmitLogTopic.java


import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;public class EmitLogTopic {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) {Connection connection = null;Channel channel = null;try {//建立连接和通道ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");connection = factory.newConnection();channel = connection.createChannel();//声明路由器和路由器类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//定义路由键和消息String routingKey = "";String message = "msg.....";//发布消息channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");} catch (Exception e) {e.printStackTrace();} finally {if (connection != null) {try {connection.close();} catch (Exception ignore) {}}}}
}

消费者ReceiveLogsTopic.java

import com.rabbitmq.client.*;import java.io.IOException;public class ReceiveLogsTopic {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {//建立连接和通道ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明路由器和路由器类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);String queueName = channel.queueDeclare().getQueue();//String bingingKeys[] = {""};for (String bindingKey : bingingKeys) {channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");//监听消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");}};channel.basicConsume(queueName, true, consumer);}
}

现在,可以动手实验了。
开头提到的:日志严重级别info/warn/crit...和设备auth/cron/kern...

消费者:
String bingingKeys[] = {""}改为String bingingKeys[] = {"#"},启动第一个消费者;
再改为String bingingKeys[] = {"kern.*"},启动第二个消费者;
再改为String bingingKeys[] = {"*.critical"},启动第三个消费者;
再改为String bingingKeys[] = {"kern.*", "*.critical"},启动第四个消费者。

生产者,发送多个消息,如:
路由键为kern.critical 的消息:A critical kernel error
路由键为kern.info 的消息:A kernel info
路由键为kern.warn 的消息:A kernel warning
路由键为auth.critical 的消息:A critical auth error
路由键为cron.warn 的消息:A cron waning
路由键为cron.critical 的消息:A critical cron error

试试最后的结果:第一个消费者将会接收到所有的消息,第二个消费者将会kern的所有严重级别的日志,第三个消费者将会接收到所有设备的critical消息,第四个消费者将会接收到kern设备的所有消息和所有critical消息。

下篇博客中,我们将会学习如何让消息往返,以此来作为一个远程过程调用(RPC)。


说明

①与原文略有出入,如有疑问,请参阅原文

②原文均是编译后通过javacp命令直接运行程序,我是在IDE中进行的,相应的操作做了修改。

RabbitMQ(五):Exchange交换器--topic相关推荐

  1. springboot rabbitmq direct exchange和topic exchange 写法上关于路由键的区别

    这是direct exchange写法中消息发送写法,可见下图红色框中路由键是queue队列中定义的路由键 这是topic exchange写法中消息发送写法,可见下图红色框中路由键是exchange ...

  2. RabbitMQ(四):Exchange交换器--direct

    内容翻译自:RabbitMQ Tutorials Java版 RabbitMQ(一):Hello World程序 RabbitMQ(二):Work Queues.循环分发.消息确认.持久化.公平分发 ...

  3. RabbitMQ(三):Exchange交换器--fanout

    内容翻译自:RabbitMQ Tutorials Java版 RabbitMQ(一):Hello World程序 RabbitMQ(二):Work Queues.循环分发.消息确认.持久化.公平分发 ...

  4. RabbitMQ入门学习系列(六) Exchange的Topic类型

    快速阅读 介绍exchange的topic类型,和Direct类型相似,但是增加了"."和"#"的匹配.比Direct类型灵活 Topic消息类型 特点是:to ...

  5. RabbitMQ的三大交换器详解

    pom文件都是相同的 <?xml version="1.0" encoding="UTF-8"?> <project xmlns=" ...

  6. RabbitMQ五种工作模式

    RabbitMQ五种工作模式 1.简单队列 一个生产者对应一个消费者!! 2.work 模式 一个生产者对应多个消费者,但是一条消息只能有一个消费者获得消息!!! 轮询分发就是将消息队列中的消息,依次 ...

  7. RabbitMQ——使用Exchange中的fanout交换机实现消息发送和接收

    文章目录: 1.写在前面 2.使用fanout交换机实现消息的发送和接收 2.1 编写消息接收类(有两个) 2.2 编写消息发送类 1.写在前面 所有 MQ 产品从模型抽象上来说都是一样的过程: 消费 ...

  8. RabbitMQ主题模式(Topic)

    主题模式(Topic) 主体模式其实就是在路由模式的基础上,支持了对key的通配符匹配(星号以及井号),以满足更加复杂的消息分发场景. "#" : 匹配一个或者多个 "* ...

  9. RabbitMQ入门学习系列(五) Exchange的Direct类型

    快速阅读 利用Exchange的Direct类型,实现对队列的过滤,消费者启动以后,输入相应的key值,攻取该key值对应的在队列中的消息 . 从一节知道Exchange有四种类型 Direct,To ...

最新文章

  1. win10 anaconda 下pcl库的安装
  2. 用户故事与敏捷方法pdf
  3. 将 iOS 应用的体积控制在 20MB 以内对于其下载量有很明显的影响吗?
  4. WebService中文件传输
  5. [转载] java提取字符串中的字母数字
  6. hive获取mysql里的文件_apache – 如何在hive中获取数据库用户名和密码
  7. 面向对象,面向服务,面向组件三种编程模式有什么区别
  8. 【Flink】ValidationException: Could not find any factory for identifier json
  9. python的根号运算_python怎么表示根号运算
  10. 怎么解决IPA processing failed错误, 用xcode 11打包
  11. python计算最大公约数函数_python如何分享解两数的最大公约数 python代码 最大公约和最小公倍数数计算?...
  12. CAD关于线型操作添加线型(com接口c#语言)
  13. ora 01033 解决
  14. 方法重写和重载的规则
  15. 如何搭建自己的网站别人可以直接访问
  16. 【Java】GUI图形化界面中,setBounds()中参数的含义
  17. TTS交易所的STO(证券化通证)、资产证券化(ABS)与ICO之间的区别
  18. mysql的between and的用法
  19. 联想 ThinkPadE480无法调节亮度
  20. 身份认证技术的产业发展

热门文章

  1. 九十五、轻松搞定Python中的Excel办公自动化系列
  2. 数据科学篇| Seaborn库的使用(四)
  3. Vue 学习第八天
  4. 一文详解神经网络与激活函数的基本原理
  5. PW Live 直播 | 北邮博士生纪厚业:异质图神经网络之模型和应用
  6. 文档扫描:深度神经网络在移动端的实践
  7. 综述 | 知识图谱向量化表示
  8. 计算平方根【牛顿迭代法】
  9. nginx: [error] invalid PID number in /run/nginx.pid
  10. 机票预定系统类图_电商系统延时任务机制源码分享