5.1 exchanges

5.1.1 概念

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。

相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

5.1.2 类型

  • 直接(direct)

  • 主题(topic)

  • 标题(headers)

  • 扇出(fanout)

5.1.3 无名exchange

在本教程的前面部分我们对 exchange 一无所知,但仍然能够将消息发送到队列。之前能实现的原因是因为我们使用的是默认交换,我们通过空字符串(“”)进行标识。

channel.basicPublish("", queueName, null, message.getBytes());

第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话。

5.2 临时队列

之前的章节我们使用的是具有特定名称的队列(还记得 hello 和 ack_queue 吗?)。队列的名称我们来说至关重要-我们需要指定我们的消费者去消费哪个队列的消息。

每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除。

创建临时队列的方式如下:

String queueName = channel.queueDeclare().getQueue();

创建出来之后长成这样:

5.3 绑定

什么是 bingding 呢,binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定

5.4 Fanout

5.4.1 介绍

Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中。系统中默认有些 exchange 类型。

5.4.2 实战

ReceiveLogs01 将接收到的消息打印在控制台

public class ReceiveLogs01 {private static final String EXCHANGE_NAME = "logs";
​public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");/*** 生成一个临时的队列 队列的名称是随机的* 当消费者断开和该队列的连接时 队列自动删除*/String queueName = channel.queueDeclare().getQueue();//把该临时队列绑定我们的 exchange 其中 routingKey(也称之为 binding key)为空字符串channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println("等待接收消息,把接收到的消息打印在屏幕.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("控制台打印接收到的消息" + message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
​
}

ReceiveLogs02 将接收到的消息存储在磁盘

public class ReceiveLogs02 {private static final String EXCHANGE_NAME = "logs";
​public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");/*** 生成一个临时的队列 队列的名称是随机的* 当消费者断开和该队列的连接时 队列自动删除*/String queueName = channel.queueDeclare().getQueue();//把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println("等待接收消息,把接收到的消息写到文件.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);File file = new File("D:\\workspace\\study_log\\rabbitmq_info.txt");FileUtils.writeStringToFile(file, message, "UTF-8", true);System.out.println("数据写入文件成功");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
​}
}

EmitLog 发送消息给两个消费者接收

public class EmitLog {private static final String EXCHANGE_NAME = "logs";
​public static void main(String[] argv) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {/*** 声明一个 exchange* 1.exchange 的名称* 2.exchange 的类型*/channel.exchangeDeclare(EXCHANGE_NAME, "fanout");Scanner sc = new Scanner(System.in);System.out.println("请输入信息");while (sc.hasNext()) {String message = sc.nextLine();channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));System.out.println("生产者发出消息" + message);}}}
}

启动生产者和消费者,发送几条消息,可以发现ReceiveLogs01和ReceiveLogs02都接收到了该消息。

5.5 Direct

5.5.1 介绍

上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志 消息避免浪费磁盘空间。Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的 routingKey 队列中去。

在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green。

在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 blackgreen 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。

5.5.2 多重绑定

当然如果 exchange 的绑定类型是 direct,但是它绑定的多个队列的 key 如果都相同,在这种情 况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多,如上图所示。

5.5.3 实战

接收error类

public class ReceiveLogsDirect01 {private static final String EXCHANGE_NAME = "direct_logs";
​public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String queueName = "disk";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "error");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" 接收绑定键 :" + delivery.getEnvelope().getRoutingKey() + ", 消息:" + message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

接收warn、info类

public class ReceiveLogsDirect02 {private static final String EXCHANGE_NAME = "direct_logs";
​public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String queueName = "console";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "info");channel.queueBind(queueName, EXCHANGE_NAME, "warning");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" 接收绑定键 :" + delivery.getEnvelope().getRoutingKey() + ", 消息:" + message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
​
}

消息发送类

public class EmitLogDirect {private static final String EXCHANGE_NAME = "direct_logs";
​public static void main(String[] argv) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//创建多个 bindingKeyMap<String, String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("info", "普通 info 信息");bindingKeyMap.put("warning", "警告 warning 信息");bindingKeyMap.put("error", "错误 error 信息");//debug 没有消费这接收这个消息 所有就丢失了bindingKeyMap.put("debug", "调试 debug 信息");for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {String bindingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME, bindingKey, null,message.getBytes(StandardCharsets.UTF_8));System.out.println("生产者发出消息:" + message);}}}
​
}

5.6 Topics

5.6.1 之前类型的问题

在上一个小节中,我们改进了日志记录系统。我们没有使用只能进行随意广播的 fanout 交换机,而是使用了 direct 交换机,从而有能实现有选择性地接收日志。

尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了。这个时候 就只能使用 topic 类型。

5.6.2 Topic的要求

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit".这种类型的。当然这个单词列表最多不能超过 255 个字节。

在这个规则列表中,其中有两个替换符是大家需要注意的:

  • *(星号)可以代替一个单词

  • #(井号)可以替代零个或多个单词

5.6. 3 匹配案例

上图是一个队列绑定关系图,我们来看看他们之间数据接收情况是怎么样的

routing_key 消费情况
quick.orange.rabbit 被队列 Q1Q2 接收到
azy.orange.elephant 被队列 Q1Q2 接收到
quick.orange.fox 被队列 Q1 接收到
lazy.brown.fox 被队列 Q2 接收到
lazy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit 是四个单词但匹配 Q2

当队列绑定关系是下列这种情况时需要引起注意:

  • 当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了

  • 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了

RabbitMQ(四)交换机exchange相关推荐

  1. RabbitMQ 四种Exchange

    AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列.生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机.先由Exchange来接收,然后Exchang ...

  2. RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较(转)

    RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储 RabbitMQ提供了四种Exchange:fanout,direct, ...

  3. RabbitMQ中交换机的几种模式

    目录 简述 交换机模式 Fanout模式 Direct模式 Topic模式 Headers模式 简述 生产者不直接跟队列打交道,而是通过交换机.交换机类似于生产者和队列直接的一个管理者,它将生产的消息 ...

  4. RabbitMQ 四种类型发送接收数据方式

    1.基本用法 生产者 1 import pika 2 import sys 3 4 username = 'wt' #指定远程rabbitmq的用户名密码 5 pwd = '111111' 6 use ...

  5. RabbitMQ的交换机类型和工作模式

    RabbitMQ的交换机类型有四种 1.direct 直流交换机: 根据消息的路由键routingkey,将消息以完全匹配的方式路由到指定的队列中. 这里的匹配指的是消息本身携带的路由键和队列与交换机 ...

  6. RabbitMQ - 4种Exchange类型

    在rabbitmq中,exchange有4个类型:direct,topic,fanout,header. direct exchange 此类型的exchange路由规则很简单: exchange在和 ...

  7. 计算机网络交换机基本配置实验,计算机网络实验四交换机基本配置

    计算机网络实验四交换机基本配置 (7页) 本资源提供全文预览,点击全文预览即可全文预览,如果喜欢文档就下载吧,查找使用更方便哦! 9.9 积分 实验报告实验四:交换机基本配置一. 实验目的和要求(1) ...

  8. RabbitMQ之交换机的四种类型和属性

    交换机主要包括如下4种类型: Direct exchange(直连交换机) Fanout exchange(扇型交换机) Topic exchange(主题交换机) Headers exchange( ...

  9. RabbitMQ第二话 -- Springboot基于四种Exchange(Direct、Fanout、Topic、Heders、延时队列)的实现和多虚拟主机下的生产消费者实现

    本文主要分享RabbitMQ exchange类型的功能和使用.RabbitMQ延时队列.一个springboot服务发送消息到多虚拟主机 1.RabbitMQ exchange exchange交换 ...

最新文章

  1. 使用core data
  2. 实战项目三:爬取QQ群中的人员信息
  3. @RequestMapping报404错误问题解决
  4. linux 开启 自动挂载U盘 权限的设置
  5. C#(WinForm)上传图片保存到数据库和从数据库读取图片显示到窗体
  6. 【转】博客美化(3)为博客添加一个漂亮的分享按钮
  7. OpenStack 如何跨版本升级
  8. SCOM 2012知识分享-9:配置警报解决状态
  9. Symfony 框架实战教程——第一天:创建项目(转)
  10. java之public class和class声明区别详解 (转)
  11. 作为JavaScript的“超集”,感受一下TypeScript 的那些黑魔法
  12. 生成html数据字典,PHP生成html格式数据字典
  13. 专为Oracle数据库恢复而生 - PRM
  14. (13.1.3.9)PMBOK之三:十大知识领域之采购管理
  15. 如何保持精力充沛_在家工作,如何管理一支精力充沛,精力充沛的日常团队,远程站起来...
  16. Redis主从复制(master/slaver)
  17. Mezzanine多site管理问题
  18. win2008系统 安装hplaserj1010打印机驱动程序
  19. JVM笔记(三)类与类加载
  20. 【CSRF漏洞-01】跨站请求伪造漏洞靶场实战

热门文章

  1. ❗HTML引入JavaScript的三种常用方式汇总❗
  2. 【django】配置MySQL数据库【3】
  3. Linux Kernel中gicv3实现:SPIs中断routing到指定的CPU
  4. [SUCTF2018]babyre [ACTF新生赛2020]fungame
  5. [羊城杯 2020]Bytecode [UTCTF2020]babymips
  6. Windows保护模式学习笔记(一)—— 段寄存器GDT表
  7. 【Web】HTTPS 引入http资源,混合内容
  8. 2、创建视图(CREATE VIEW)
  9. isAlive()方法的作用
  10. ACM入门之【ST表/RMQ】