我们使用的是direct(直接交换),而不是使用只能进行虚拟广播的 fanout(扇出交换),并且有可能选择性地接收日志。

虽然使用direct(直接交换)改进了我们的系统,但它仍然有局限性 - 它不能基于多个标准进行路由。

在我们的日志系统中,我们可能不仅要根据严重性订阅日志,还要根据发出日志的源来订阅日志。您可能从syslogunix工具中了解这个概念,该工具根据严重性(info / warn / crit ...)和facility(auth / cron / kern ...)来路由日志。

这会给我们带来很大的灵活性 - 我们可能想听听来自'cron'的关键错误以及来自'kern'的所有日志。

要在我们的日志系统中实现这一点,我们需要了解更复杂的topic (主题交换)。

Topic exchange 主题交换

发送到主题交换的消息不能具有任意routing_key- 它必须是由点分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的一些功能。一些有效的路由密钥示例:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。路由密钥中可以包含任意数量的单词,最多可达255个字节。

绑定密钥也必须采用相同的形式。主题交换背后的逻辑类似于直接交换- 使用特定路由密钥发送的消息将被传递到与匹配绑定密钥绑定的所有队列。但是,绑定键有两个重要的特殊情况:

*(星号)可以替代一个单词。

#(hash)可以替换零个或多个单词。

在一个例子中解释这个是最容易的:

在这个例子中,我们将发送所有描述动物的消息。消息将与包含三个单词(两个点)的路由键一起发送。

路由键中的第一个单词将描述速度,第二个是颜色,第三个是物种:“。。”。

我们创建了三个绑定:Q1绑定了绑定键“* .orange。*”,Q2绑定了“*。*。rabbit”和“lazy。#”。

这些绑定可以概括为:

Q1对所有橙色动物感兴趣。

Q2希望听到关于兔子的一切,以及关于懒惰动物的一切。

路由密钥设置为“quick.orange.rabbit”的消息将传递到两个队列。消息“lazy.orange.elephant”也将同时发送给他们。另一方面,“quick.orange.fox”只会进入第一个队列,而“lazy.brown.fox”只会进入第二个队列。“lazy.pink.rabbit”将仅传递到第二个队列一次,即使它匹配两个绑定。“quick.brown.fox”与任何绑定都不匹配,因此它将被丢弃。

如果我们违反合同并发送带有一个或四个单词的消息,例如“orange”或“quick.orange.male.rabbit”,会发生什么?好吧,这些消息将不匹配任何绑定,将丢失。

另一方面,“lazy.orange.male.rabbit”,即使它有四个单词,也会匹配最后一个绑定,并将被传递到第二个队列。

主题交流

主题交换功能强大,可以像其他交易所一样。

当队列与“#”(哈希)绑定密钥绑定时 - 它将接收所有消息,而不管路由密钥 - 如扇出交换。

当特殊字符“*”(星号)和“#”(哈希)未在绑定中使用时,主题交换的行为就像直接交换一样

把它们放在一起

我们将在日志记录系统中使用主题交换。我们将首先假设日志的路由键有两个词:“。”。

EmitLogTopic.java

importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;public classEmitLogTopic {private static final String EXCHANGE_NAME = "topic_logs";public static voidmain(String[] argv) {

Connection connection= null;

Channel channel= null;try{

ConnectionFactory factory= newConnectionFactory();

factory.setHost("localhost");

connection=factory.newConnection();

channel=connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

String routingKey=getRouting(argv);

String message=getMessage(argv);

channel.basicPublish(EXCHANGE_NAME, routingKey,null, message.getBytes("UTF-8"));

System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

}catch(Exception e) {

e.printStackTrace();

}finally{if (connection != null) {try{

connection.close();

}catch(Exception ignore) {

}

}

}

}private staticString getRouting(String[] strings) {if (strings.length < 1)return "anonymous.info";return strings[0];

}private staticString getMessage(String[] strings) {if (strings.length < 2)return "Hello World!";return joinStrings(strings, " ", 1);

}private static String joinStrings(String[] strings, String delimiter, intstartIndex) {int length =strings.length;if (length == 0)return "";if (length

StringBuilder words= newStringBuilder(strings[startIndex]);for (int i = startIndex + 1; i < length; i++) {

words.append(delimiter).append(strings[i]);

}returnwords.toString();

}

}

ReceiveLogsTopic.java

importjava.io.IOException;importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importcom.rabbitmq.client.Consumer;importcom.rabbitmq.client.DefaultConsumer;importcom.rabbitmq.client.Envelope;public classReceiveLogsTopic {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throwsException {

ConnectionFactory factory= newConnectionFactory();

factory.setHost("localhost");

Connection connection=factory.newConnection();

Channel channel=connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

String queueName=channel.queueDeclare().getQueue();if (argv.length < 1) {

System.err.println("Usage: ReceiveLogsTopic [binding_key]...");

System.exit(1);

}for(String bindingKey : argv) {

channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);

}

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

Consumer consumer= newDefaultConsumer(channel) {

@Overridepublic voidhandleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties,byte[] body) throwsIOException {

String message= new String(body, "UTF-8");

System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");

}

};

channel.basicConsume(queueName,true, consumer);

}

}

rabbitmq+topic+java_译:5.RabbitMQ Java Client 之 Topics (主题)相关推荐

  1. 译:6.RabbitMQ Java Client 之 Remote procedure call (RPC,远程过程调用)

    在  译:2. RabbitMQ 之Work Queues (工作队列)  我们学习了如何使用工作队列在多个工作人员之间分配耗时的任务. 但是如果我们需要在远程计算机上运行一个函数并等待结果呢?嗯,这 ...

  2. 译:1. RabbitMQ Java Client 之 Hello World

    这些教程介绍了使用RabbitMQ创建消息传递应用程序的基础知识.您需要安装RabbitMQ服务器才能完成教程 1. 打造第一个Hello World 程序 RabbitMQ是一个消息代理:它接受和转 ...

  3. (RabbitMQ) Java Client API Guide

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 本篇翻译的是RabbitMQ官方文档关 ...

  4. RabbitMq topic

    目录 概述 代码 结果 概述 上篇direct必须是生产者发布消息指定的routingKey和消费者在队列绑定时指定的routingKey完全相等时才能匹配到队列上.topic与direct不同,可以 ...

  5. (转) RabbitMQ学习之发布/订阅(java)

    http://blog.csdn.net/zhu_tianwei/article/details/40887733 参考:http://blog.csdn.NET/lmj623565791/artic ...

  6. kerberos java_[Kerberos] Java client访问kerberos-secured cluste

    使用java client访问kerberos-secured cluster,最重要的是先从admin那里拿到可用的keytab文件,用来作认证.接下来就是调整连接的配置问题.以下先用连接hdfs为 ...

  7. mq 接口 java_Rabbitmq Java Client Api详解

    AMQP AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现. 基础概念快速入门 每个rabbitmq-server叫做一个Broker,等着tcp连接进入. 在rabbi ...

  8. RabbitMQ入门篇、介绍RabbitMQ常用的五种模式

    RabbitMQ 认识RabbitMQ AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为 ...

  9. RabbitMQ狂神说笔记(RabbitMQ B站狂神说笔记、KuangStudy、学相伴飞哥)

    一. 引用文章 RabbitMQ狂神说笔记(B站狂神说笔记.KuangStudy.学相伴飞哥) RabbitMQ狂神说笔记(B站狂神说笔记.KuangStudy.学相伴飞哥)百度云盘地址,提取码:07 ...

最新文章

  1. python-for循环
  2. 计算机网络-TCP运输连接管理
  3. Android中点击按钮启动另一个Activity以及Activity之间传值
  4. C/C++头文件大全
  5. android 修改 dpi_鼠标DPI是什么意思?鼠标CPI和DPI之间区别知识科普
  6. 谷歌浏览器如何设置flash访问权限
  7. 阿里花17.7亿 买的才不是ofo一堆没用的单车
  8. 算法-第四版-练习1.2.3解答
  9. 手足之爱,平生一人:他们是中国历史上感情最好的一对兄弟 (苏轼苏辙,邓林武邓林飞)
  10. vue怎么改logo_vue项目添加网页logo
  11. 坑人的青旅乐山峨眉两日游
  12. 制作席慕蓉的诗html,席慕容诗歌集
  13. 乒乓球基本站姿站位和步伐
  14. 检验方法的验证、确认步骤及详细计算方法
  15. 计算机主机启动 显示器不动什么原因,主机开了电脑屏幕不亮怎么回事?电脑开机后显示器不亮的解决方案...
  16. 笔记本电脑通过手机蓝牙上网
  17. php怎么读取txt文件_PHP读取文件内容的五种方式
  18. 服务器固态支持热插拔吗,固态硬盘支持热插拔吗?
  19. 分布式系统常见问题总结
  20. android开发过程中遇到的问题

热门文章

  1. Java类类getDeclaredMethod()方法及示例
  2. 面试官:int和Integer有什么区别?为什么要有包装类?
  3. 90% 的人都会答错的面试题 == 和 equals 的区别
  4. 使用ab进行页面的压力测试
  5. 算法复习第三章分治法
  6. 自定义设置一个屏保程序
  7. zoj 1088 System Overload
  8. 配置设备作为DHCP 服务器(基于全局地址池)
  9. Python界面 PyQT可视化开发(python3+PyQt5+Qt Designer)
  10. CentOS7.0下Hadoop2.7.3的集群搭建