在上一个教程中,我们提高了消息传递的灵活 我们使用direct交换而不是使用仅能够进行虚拟广播的fanout交换,

并且获得了基于路由key 有选择地接收消息的可能性。

虽然使用direct 交换改进了我们的系统,但它仍然有局限性 - 它不能基于多个标准进行路由。

在我们的消息传递系统中,我们可能不仅要根据路由key订阅队列,还要根据生成消息的源来订阅队列.

为了在我们的日志记录系统中实现这种灵活性,我们需要了解更复杂的topic交换。

Topic Exchange

发送到topic 交换的消息不能具有任意 routing_key - 它必须是由点分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的一些功能。一些有效的路由密钥示例:“ stock.usd.nyse ”,“ nyse.vmw ”,“ quick.orange.rabbit ”。路由密钥中可以包含任意数量的单词,最多可达255个字节。

绑定密钥也必须采用相同的形式。

topic 交换背后的逻辑 类似于direct 交换- 使用特定路由key发送的消息将被传递到与匹配绑定key绑定的所有队列。但是,绑定键有两个重要的特殊情况:

  • *(星号)可以替代一个单词。
  • #(hash)可以替换零个或多个单词。

在一个例子中解释这个是最容易的:

在这个例子中,我们将发送所有描述动物的消息。

消息将与包含三个单词(两个点)的路由键一起发送。路由键中的第一个单词将描述速度,第二个是颜色,第三个是物种:

<speed>.<colour>.<species>

我们创建了三个绑定

  • Q1   .orange.*
  • Q2   *.*.rabbit" and "lazy.#

这些绑定可以概括为:

  • Q1对所有orange橙色动物感兴趣。
  • Q2希望听到关于rabbit兔子的一切,以及关于lazy懒惰动物的一切。

路由密钥设置为“ quick.orange.rabbit ”的消息将传递到两个队列。

消息“ lazy.orange.elephant ”也将同时发送给他们。

另一方面,“ quick.orange.fox ”只会进入第一个队列,而“ lazy.brown.fox ”只会进入第二个队列。

“ lazy.pink.rabbit ”将仅传递到第二个队列一次,即使它匹配两个绑定。

“ quick.brown.fox ”与任何绑定都不匹配,因此它将被丢弃。

如果我们违反约定并发送带有一个或四个单词的消息,例如“ orange ”或“ quick.orange.male.rabbit”,会发生什么?好吧,这些消息将不匹配任何绑定,将丢失。

另一方面,“ lazy.orange.male.rabbit ”,即使它有四个单词,也会匹配最后一个绑定,并将被传递到第二个队列。

Topic Exchange

topic exchange 功能强大,可以像其他exchange一样。

当队列与“ # ”(哈希)绑定密钥绑定时 - 它将接收所有消息,而不管路由密钥 - 如扇出交换。

当特殊字符“ * ”(星号)和“ # ”(哈希)未在绑定中使用时,主题交换的行为就像直接交换一样

放在一起

主类

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.EnableScheduling;import com.xingyun.springamqp.config.RabbitAmqpTutorialsRunner;@SpringBootApplication
@EnableScheduling
public class RabbitMq0x05SpringAmqpTopicSampleApplication {public static void main(String[] args) {SpringApplication.run(RabbitMq0x05SpringAmqpTopicSampleApplication.class, args);}@Profile("usage_message")@Beanpublic CommandLineRunner usage() {return new CommandLineRunner() {@Overridepublic void run(String... arg0) throws Exception {System.out.println("This app uses Spring Profiles to control its behavior.\n");System.out.println("Sample usage: java -jar "+ "RabbitMQ_0x05_SpringAMQP_Topic_Sample-0.0.1-SNAPSHOT.jar "+ "--spring.profiles.active=topics"+ ",sender");}};}@Profile("!usage_message")@Beanpublic CommandLineRunner tutorial() {return new RabbitAmqpTutorialsRunner();}
}

Tut5Config.java

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;import com.xingyun.springamqp.business.Tut5Receiver;
import com.xingyun.springamqp.business.Tut5Sender;@Profile({"tut5","topics"})
@Configuration
public class Tut5Config {@Beanpublic TopicExchange topic() {return new TopicExchange("tut.topic");}@Profile("receiver")private static class ReceiverConfig {@Beanpublic Tut5Receiver receiver() {return new Tut5Receiver();}@Beanpublic Queue autoDeleteQueue1() {return new AnonymousQueue();}@Beanpublic Queue autoDeleteQueue2() {return new AnonymousQueue();}@Beanpublic Binding binding1a(TopicExchange topic, Queue autoDeleteQueue1) {return BindingBuilder.bind(autoDeleteQueue1).to(topic).with("*.orange.*");}@Beanpublic Binding binding1b(TopicExchange topic, Queue autoDeleteQueue1) {return BindingBuilder.bind(autoDeleteQueue1).to(topic).with("*.*.rabbit");}@Beanpublic Binding binding2a(TopicExchange topic, Queue autoDeleteQueue2) {return BindingBuilder.bind(autoDeleteQueue2).to(topic).with("lazy.#");}}@Profile("sender")@Beanpublic Tut5Sender sender() {return new Tut5Sender();}}

RabbitAmqpTutorialsRunner.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ConfigurableApplicationContext;public class RabbitAmqpTutorialsRunner implements CommandLineRunner {/*** application.properties文件中配置tutorial.client.duration=10000 需要* */@Value("${tutorial.client.duration:0}")private int duration;@Autowiredprivate ConfigurableApplicationContext ctx;@Overridepublic void run(String... args) throws Exception {// TODO Auto-generated method stubSystem.out.println("Ready ... running for " + duration + "ms");Thread.sleep(duration);ctx.close();}}

Tut5Sender.java

import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;public class Tut5Sender {@Autowiredprivate RabbitTemplate template;@Autowiredprivate TopicExchange topic;private int index;private int count;private final String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox","lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};@Scheduled(fixedDelay = 1000, initialDelay = 500)public void send() {StringBuilder builder = new StringBuilder("Hello to ");if (++this.index == keys.length) {this.index = 0;}String key = keys[this.index];builder.append(key).append(' ');builder.append(Integer.toString(++this.count));String message = builder.toString();template.convertAndSend(topic.getName(), key, message);System.out.println(" [x] Sent '" + message + "'");}}

Tut5Receiver.java

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;public class Tut5Receiver {@RabbitListener(queues = "#{autoDeleteQueue1.name}")public void receive1(String in) throws InterruptedException {receive(in, 1);}@RabbitListener(queues = "#{autoDeleteQueue2.name}")public void receive2(String in) throws InterruptedException {receive(in, 2);}public void receive(String in, int receiver) throws InterruptedException {StopWatch watch = new StopWatch();watch.start();System.out.println("instance " + receiver + " [x] Received '" + in + "'");doWork(in);watch.stop();System.out.println("instance " + receiver + " [x] Done in " + watch.getTotalTimeSeconds() + "s");}private void doWork(String in) throws InterruptedException {for (char ch : in.toCharArray()) {if (ch == '.') {Thread.sleep(1000);}}}
}

查看用法

java -jar RabbitMQ_0x05_SpringAMQP_Topic_Sample-0.0.1-SNAPSHOT.jar

启动生产者

java -jar RabbitMQ_0x05_SpringAMQP_Topic_Sample-0.0.1-SNAPSHOT.jar --spring.profiles.active=topics,sender

启动消费者

java -jar RabbitMQ_0x05_SpringAMQP_Topic_Sample-0.0.1-SNAPSHOT.jar --spring.profiles.active=topics,receiver

转载于:https://www.cnblogs.com/xingyunblog/p/10007231.html

译: 5. RabbitMQ Spring AMQP 之 Topic 主题相关推荐

  1. 译: 3. RabbitMQ Spring AMQP 之 Publish/Subscribe 发布和订阅

    在第一篇教程中,我们展示了如何使用start.spring.io来利用Spring Initializr创建一个具有RabbitMQ starter dependency的项目来创建spring-am ...

  2. (八)RabbitMQ消息队列-通过Topic主题模式分发消息

    前两章我们讲了RabbitMQ的direct模式和fanout模式,本章介绍topic主题模式的应用.如果对direct模式下通过routingkey来匹配消息的模式已经有一定了解那fanout也很好 ...

  3. RabbitMQ消息分发模式----Topic主题模式

    前面虽然有Direct类型和Fanout的转换器.但它们仍然有一定的局限性--不能根据多重条件进行路由选择. Topic exchange(主题转发器) 发送给主题转发器的消息不能是任意设置的选择键, ...

  4. rabbitmq+topic+java_译:5.RabbitMQ Java Client 之 Topics (主题)

    我们使用的是direct(直接交换),而不是使用只能进行虚拟广播的 fanout(扇出交换),并且有可能选择性地接收日志. 虽然使用direct(直接交换)改进了我们的系统,但它仍然有局限性 - 它不 ...

  5. php stomp rabbitmq,docker环境下的RabbitMQ部署,Spring AMQP使用

    AMQP简介 AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦和 ...

  6. RabbitMQ(6)-Spring AMQP,Spring集成RabbitMQ

    2019独角兽企业重金招聘Python工程师标准>>> 一.Qucik Start 1.rabbitmq-producer.xml <?xml version="1. ...

  7. day72 JavaWeb框架阶段——RabbitMQ消息队列【了解常见的MQ产品,了解RabbitMQ的5种消息模型,会使用Spring AMQP】

    文章目录 0.学习目标 1.RabbitMQ 1.1.搜索与商品服务的问题 1.2.消息队列(MQ) 1.2.1.什么是消息队列 1.2.2.AMQP和JMS 1.2.3.常见MQ产品 1.2.4.R ...

  8. 深入剖析 RabbitMQ —— Spring 框架下实现 AMQP 高级消息队列协议

    前言 消息队列在现今数据量超大,并发量超高的系统中是十分常用的.本文将会对现时最常用到的几款消息队列框架 ActiveMQ.RabbitMQ.Kafka 进行分析对比. 详细介绍 RabbitMQ 在 ...

  9. spring amqp rabbitmq fanout配置

    基于spring amqp rabbitmq fanout配置如下: 发布端 <rabbit:connection-factory id="rabbitConnectionFactor ...

最新文章

  1. MYSQL备份与恢复精华篇
  2. oracle 某天 减一天,案例一:shell脚本指定日期减去一天
  3. 为什么c相电路在前面_三相电路分析
  4. QPS、TPS、并发用户数、吞吐量的关系
  5. 精诚合作 共创未来——阿里云数据智能合作策略介绍
  6. MS Sql中取每个表的大小,行数
  7. exchange server 2010 OWA 附件功能只支持IE浏览器
  8. Java实现拖拉/滑动图片验证码
  9. 飞机大战python素材_python飞机大战源码和素材
  10. 如果找活跃IP段!抓肉鸡必须的!
  11. 《女士品茶》读书笔记
  12. 多移动机器人(阿克曼小车)在gazebo中的配置
  13. word使用上角标超链接到引用的参考论文
  14. 淘宝6.18叠猫猫赚猫币自动生成
  15. 刘夏真的简历中国科学院计算机所,刘夏_广西医科大学研究生导师信息
  16. 菲尔人格测试今天你测了吗?
  17. 比子弹速度快十倍的导弹是怎么被拦截的?
  18. Grammer -- 助动词
  19. 强制卸载VS2013
  20. 安装 ABAQUS2020时出错

热门文章

  1. Ubuntu使用tzselect修改时区
  2. open***在Windows客户端权限那些事
  3. 设计模式系列(一)单例模式
  4. Struts2教程2:处理一个form多个submit
  5. Shell之sed用法 转滴
  6. 自定义评分器Similarity,提高搜索体验
  7. 今天收到上海某公司的全英文笔试题(some question of interview )
  8. netty源码深度分析
  9. 【Latex】一些使用
  10. mybatis的注解开发之三种动态sql