RabbitMQ交换机(Fanout、Direct、Topic)三种模式详解
一. 交换机
1.1 Exchanges
1.1.1 Exchanges概念
RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产 者甚至都不知道这些消息传递传递到了哪些队列中。
相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
1.1.2 Exchanges的类型
Exchanges总共有以下类型
- 直接(direct)
- 主题(topic)
- 标题(header)
- 扇出(fanout)
1.1.3 无名exchange
无名类型的exchange为默认类型,通常使用空字符串进行标识
在本教程的前面部分我们对 exchange 一无所知,但仍然能够将消息发送到队列。之前能实现的 原因是因为我们使用的是默认交换,我们通过空字符串(“”)进行标识。
之前我们使用发送消息的代码为
//其中第一个参数就是exchange交换机的无名类型,也就是默认类型
channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));
第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实 是由 routingKey(bindingkey)绑定 key 指定的
1.2 临时队列
每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称 的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连 接,队列将被自动删除。
临时队列和非临时队列主要看队列是否持久化
创建临时队列的方式是我们在创建信道的时候不给队列设置名称即可
String queueName = channel.queueDeclare().getQueue();
这个时候我们就创建了一个随机字符串的队列名称
1.3 绑定(bindings)
什么是 bingding 呢,binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队 列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定
1.4 Fanout
1.4.1 Fanout介绍
Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的 所有队列中。系统中默认有些exchange 类型
就类似于村里面的大喇叭一样,只要发出了,所有人都可以听得见
1.4.2 Fanout代码实现
消费者01
package com.rabbitmq.eason.fanout;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.eason.utils.RabbitMQUtils;import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;/*** 扇出类型的消费者1** @author HuangSiYuan* @since 2021-07-09*/
public class Consumer01 {//交换机的名称public static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMQUtils.getChannel();//声明一个交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//声明一个临时队列/*** 生成一个临时的队列、队列的名称是随机的* 当消费者断开与队列的连接的时候, 队列就自动删除了* */String queueName = channel.queueDeclare().getQueue();//绑定交换机/** 参数一: 队列名称* 参数二: 交换机名称* 参数三: 路由key* */channel.queueBind(queueName,EXCHANGE_NAME,"");System.out.println("C1等待接收消息");//接收消息DeliverCallback deliverCallback = (consumerTag,message)-> {System.out.println("C1成功接收到消息: " + new String(message.getBody()));/** 消息手动应答* */channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = consumerTag -> {System.out.println("C1接收失败的消息的队列标记值: " + consumerTag);};//消费者取消消息时触发/*** 参数一: 队列名称* 参数二: 是否自动应答* 参数三: 成功确认消息回调函数* 参数四: 失败消息回调函数* */boolean autoAck = false;channel.basicConsume(queueName,autoAck,deliverCallback,cancelCallback);}}
消费者02
package com.rabbitmq.eason.fanout;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.eason.utils.RabbitMQUtils;import java.util.Arrays;/*** 扇出类型的消费者2** @author HuangSiYuan* @since 2021-07-09*/
public class Consumer02 {//交换机的名称public static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMQUtils.getChannel();//声明一个交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//声明一个临时队列/*** 生成一个临时的队列、队列的名称是随机的* 当消费者断开与队列的连接的时候, 队列就自动删除了* */String queueName = channel.queueDeclare().getQueue();//绑定交换机/** 参数一: 队列名称* 参数二: 交换机名称* 参数三: 路由key* */channel.queueBind(queueName,EXCHANGE_NAME,"");System.out.println("C2等待接收消息");//接收消息DeliverCallback deliverCallback = (consumerTag,message)-> {System.out.println("C2成功接收到消息: " + new String(message.getBody()));/** 消息手动应答* */channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = consumerTag -> {System.out.println("C2接收失败的消息的队列标记值: " + consumerTag);};//消费者取消消息时触发/*** 参数一: 队列名称* 参数二: 是否自动应答* 参数三: 成功确认消息回调函数* 参数四: 失败消息回调函数* */boolean autoAck = false;channel.basicConsume(queueName,autoAck,deliverCallback,cancelCallback);}}
生产者实现
package com.rabbitmq.eason.fanout;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.eason.utils.RabbitMQUtils;import java.nio.charset.StandardCharsets;
import java.util.Scanner;/*** Fanout交换机的生产者* 发消息给交换机** @author HuangSiYuan* @since 2021-07-09*/
public class Produce {//交换机的名称public static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMQUtils.getChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = scanner.next();//发送消息/** 参数一: 交换机名称* 参数二: 交换机路由key值* 参数三: 额外参数* 参数四: 消息* */channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));}}}
1.5 Direct
对于开始Direct之前我们先回顾一下bindings
绑定是交换机和队列之间的桥梁关系。也可以这么理解: 队列只对它绑定的交换机的消息感兴趣。绑定用参数:routingKey 来表示也可称该参数为 binding key, 创建绑定我们用代码
channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");
绑定之后的意义由其交换类型决定。
1.5.1 Direct exchange 介绍
我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志 消息避免浪费磁盘空间。Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的 广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的 routingKey 队列中去
例如上图中我们的交换机类型时direct类型,交换机中绑定了两个队列,一个是Q1队列一个是Q2队列,Q1绑定的routingKey键位orange,Q2绑定的routingKey键为black和green
在这种绑定的情况下,我们发送消息的时候我们都会绑定一个路由键,如果某一条消息绑定的路由键为orange,那么交换机就会将这条信息传送给Q1队列,如果绑定的路由键为black或者green,那么此时交换机就会将这些消息传送给Q2队列进行处理
1.5.2 多重绑定
当然如果 exchange 的绑定类型是 direct,但是它绑定的多个队列的 key 如果都相同,在这种情 况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多
1.5.3 代码实现
交换机接收01编写
package com.rabbitmq.eason.direct;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.eason.utils.RabbitMQUtils;/*** Direct交换机01** @author HuangSiYuan* @since 2021-07-09*/
public class Consumer01 {public static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMQUtils.getChannel();//声明一个交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//声明一个队列channel.queueDeclare("console",false,false,false,null);//绑定交换机channel.queueBind("console",EXCHANGE_NAME,"info");channel.queueBind("console",EXCHANGE_NAME,"warning");DeliverCallback deliverCallback = (consumerTag, message)-> {System.out.println("console成功接收到消息: " + new String(message.getBody()));/** 消息手动应答* */channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = consumerTag -> {System.out.println("C1接收失败的消息的队列标记值: " + consumerTag);};channel.basicConsume("console",true,deliverCallback,cancelCallback);}
}
交换机02编写
package com.rabbitmq.eason.direct;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.eason.utils.RabbitMQUtils;/*** Direct交换机01** @author HuangSiYuan* @since 2021-07-09*/
public class Consumer02 {public static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMQUtils.getChannel();//声明一个交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//声明一个队列channel.queueDeclare("disk",false,false,false,null);//绑定交换机channel.queueBind("disk",EXCHANGE_NAME,"error");DeliverCallback deliverCallback = (consumerTag, message)-> {System.out.println("disk队列名成功接收到消息: " + new String(message.getBody()));/** 消息手动应答* */channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = consumerTag -> {System.out.println("C1接收失败的消息的队列标记值: " + consumerTag);};channel.basicConsume("disk",true,deliverCallback,cancelCallback);}
}
生产者编写
package com.rabbitmq.eason.direct;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.eason.utils.RabbitMQUtils;import java.nio.charset.StandardCharsets;
import java.util.Scanner;/*** 指定交换机生产者** @author HuangSiYuan* @since 2021-07-09*/
public class Produce {public static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMQUtils.getChannel();//这个类型的交换机只是多了一个用于标识的RoutingKey标识某一个交换机//声明一个交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = scanner.next();//发送消息//此时所有发送都会发送给队列console/*channel.basicPublish(EXCHANGE_NAME,"warning",null,message.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes(StandardCharsets.UTF_8));*///此时所有发送都会发送给队列diskchannel.basicPublish(EXCHANGE_NAME,"error",null,message.getBytes(StandardCharsets.UTF_8));}}}
可以根据RoutingKey的不同传输给不同的队列进行消息的处理
1.6 Topic
当我们使用了Direct交换机能够大大的活跃了我们分配消息给不同队列的消息处理,但是我们还是没有办法完全的完成我们的需求,尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了。这个时候 就只能使用 topic 类型
1.6.1 Topic主题队列的要求
发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节。
在这个规则列表中,其中有两个替换符是大家需要注意的
- *(星号)可以代替一个单词
- #(井号)可以替代零个或多个单词
1.6.2 主题队列匹配规则
如上图所示
Q1队列绑定的是* .orange. * ,表示中间带 orange 带 3 个单词的字符串( * .orange. )
Q2队列绑定的是*. *.rabbit和lazy.#
*最后一个单词是 rabbit 的 3 个单词(*. .rabbit)
第一个单词是 lazy 的多个单词(lazy.#)
测试routingKey | 匹配的队列 |
---|---|
quick.orange.rabbit | 被队列 Q1Q2 接收到 |
lazy.orange.elephant | 被队列 Q1Q2 接收到 |
quick.orange.fox | 被队列 Q1 接收到 |
lazy.brown.fox | 被队列 Q2 接收到 |
azy.pink.rabbit | 虽然满足两个绑定但只被队列 Q2 接收一次 |
quick.brown.fox | 不匹配任何绑定不会被任何队列接收到会被丢弃 |
quick.orange.male.rabbit | 是四个单词不匹配任何绑定会被丢弃 |
lazy.orange.male.rabbit | 是四个单词但匹配 Q2 |
当队列绑定关系是下列这种情况时需要引起注意
当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了
1.6.3 主题队列代码实现
生产者
package com.rabbitmq.eason.topic;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.eason.utils.RabbitMQUtils;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;/*** 主题队列生产者** @author HuangSiYuan* @since 2021-07-09*/
public class Produce {public static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMQUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);Map<String,String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");bindingKeyMap.forEach((key1, value) -> {try {//发送消息channel.basicPublish(EXCHANGE_NAME,key1,null,value.getBytes(StandardCharsets.UTF_8));System.out.println("生产者发送消息: " + value);} catch (IOException e) {e.printStackTrace();}});}}
主题队列交换机01
package com.rabbitmq.eason.topic;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.eason.utils.RabbitMQUtils;/*** 声明主题交换机及相关队列** @author HuangSiYuan* @since 2021-07-09*/
public class TopicExchange01 {public static final String EXCHANGE_NAME = "topic_exchange";//接收消息public static void main(String[] args) throws Exception {Channel channel = RabbitMQUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);String orangeKey = "*.orange.*";String queueName = "q1";//声明一个队列channel.queueDeclare(queueName,false,false,false,null);//绑定队列并且指定routingKeychannel.queueBind(queueName,EXCHANGE_NAME,orangeKey);DeliverCallback deliverCallback = ((consumerTag, message) -> {System.out.println("交换机成功收到消息: " + new String(message.getBody()) + ",获取的键为: " + message.getEnvelope().getRoutingKey());});CancelCallback cancelCallback = consumerTag -> {System.out.println("交换机失败消息标记: " + consumerTag);};//接收消息channel.basicConsume(queueName,true,deliverCallback,cancelCallback);}
}
主题队列交换机02
package com.rabbitmq.eason.topic;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.eason.utils.RabbitMQUtils;/*** 声明主题交换机及相关队列** @author HuangSiYuan* @since 2021-07-09*/
public class TopicExchange02 {public static final String EXCHANGE_NAME = "topic_exchange";//接收消息public static void main(String[] args) throws Exception {Channel channel = RabbitMQUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);String rabbitKey = "*.*.rabbit";String lazyKey = "lazy.#";String queueName = "q2";//声明一个队列channel.queueDeclare(queueName,false,false,false,null);//绑定队列并且指定routingKeychannel.queueBind(queueName,EXCHANGE_NAME,rabbitKey);channel.queueBind(queueName,EXCHANGE_NAME,lazyKey);DeliverCallback deliverCallback = ((consumerTag, message) -> {System.out.println("交换机成功收到消息: " + new String(message.getBody())+ ",绑定的键为: " + message.getEnvelope().getRoutingKey());});CancelCallback cancelCallback = consumerTag -> {System.out.println("交换机失败消息标记: " + consumerTag);};//接收消息channel.basicConsume(queueName,true,deliverCallback,cancelCallback);}
}
小结
以上就是本次博客的全部内容,主要介绍有RabbitMQ交换机的三种模式,广播模式(Fanout),直接交换模式(Direct),主题模式(Topic)三种,其中主题模式使用占比较大,因为相较于其他两种模式来说主题模式较为灵活,可用性更高
大家在看本篇博客的时候,如果有什么观点有误,或者有些需要改正的地方大家可以多多提出,大家一起学习,共同进步!!!
RabbitMQ交换机(Fanout、Direct、Topic)三种模式详解相关推荐
- 【夏目鬼鬼分享】StringBoot整合RabbitMQ,使用Direct、Fanout、Topic三种模式
RabbitMQ简介 AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的 ...
- vmware网卡三种模式详解
NAT(Network Address Translation) ,网络地址交换,NAT模式是比较简单的实现虚拟机上网的方式,NAT模式的虚拟机就是通过宿主机(物理电脑)上网和交换数据的. Bride ...
- 网页中的三种地址详解
网页中的a标签具有地址跳转的功能,href属性指向跳转的地址. 一.三种地址模式 网络地址分为两种,一种是绝对地址,一种是相对地址. 但是相对地址又可以细分为两种,一种是基于当前目录的相对地址,一种是 ...
- Beini的6种***模式详解
Aireplay-ng的6种***模式详解 -0Deautenticate冲突模式 使已经连接的合法客户端强制断开与路由端的连接,使其重新连接.在重新连接过程中获得验证数据包,从而产生有效ARPreq ...
- python 命令-python解析命令行参数的三种方法详解
这篇文章主要介绍了python解析命令行参数的三种方法详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 python解析命令行参数主要有三种方法: ...
- Maven精选系列--三种仓库详解
转载自 Maven精选系列--三种仓库详解 仓库分类 1.本地仓库 本地仓库就是开发者本地已经下载下来的或者自己打包所有jar包的依赖仓库,本地仓库路径配置在maven对应的conf/settings ...
- 全站仪与计算机之间的数据传输,必看!全站仪数据传输的三种方式详解,都安排得明明白白(上)...
原标题:必看!全站仪数据传输的三种方式详解,都安排得明明白白(上) 科力达全站仪数据传输 一般而言,全站仪的数据传输方式有三种,分别是通过串口.USB.SD卡三种方式,因为电脑配置等因素的不同,一些数 ...
- 查看登陆系统用户的信息的三种方法详解
查看登陆系统用户的信息的三种方法详解 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 一.who这个命令显示可以谁在登陆,但是这个有很多的花式玩法,这个命令超简单 语法:who [O ...
- FTP协议主动(Port)模式和被动(Passive)两种模式详解
FTP协议主动(Port)模式和被动(Passive)两种模式详解 FTP(File Transfer Protocol)是文件传输协议的简称. 正如其名所示: FTP的主要作用,就是让用户连接上一个 ...
最新文章
- 机器学习各领域必读经典综述
- centos下面安装mysql_centos下安装mysql
- 吴良超 融合 cnn+lstm
- 基于matlab/simulink的双闭环PMSM控制
- 串结构练习——字符串连接
- Android Broadcast Security
- 洛谷——P1092 虫食算
- 话里话外:成功的ERP需要全程的流程变革(三)
- libsvm中数据归一化的重要性
- linux下mysql中文乱码_linux下mysql中文乱码
- Raki的读paper小记:RoBERTa: A Robustly Optimized BERT Pretraining Approach
- java5 64_java8 64位(官方免费版下载2020)
- 史上最全电子科技大学858信号与系统考研要了解的常识
- idea Lombok插件下载
- 【Python常见 面试题】实现三位数的水仙花数,Python入门案例学习
- 关于软件测试的基础认知分享
- Kali 安装 Nessus 详细过程
- 《音乐达人秀:Adobe Audition CC实战222例》——1.2 从双卡录音机到多轨录音软件...
- 针对电子企业的仓储需求,提出WMS仓储管理系统解决方案
- Spark MLlib机器学习 | 算法综合实战(一)(史上最详细)