02、RabbitMQ之交换机
一、 Exchange(交换机)的作用
在RabbitMQ中,生产者发送消息不会直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,
队列再将消息以推送或者拉取方式给消费者进行消费
创建消息 路由键 pull/push生产者------------>交换机------------>队列------------>消费者
二、 Exchange(交换机)的类型
1)直连交换机:Direct Exchange
直连交换机是一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个routing_key,当消息被发送的时候,需要指定一个binding_key,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。同样的一个binding_key也是支持应用到多个队列中的。
2)主题交换机:Topic Exchange
弥补直连交换机的缺点!
直连交换机的routing_key方案非常简单,如果我们希望一条消息发送给多个队列,那么这个交换机需要绑定上非常多的routing_key,
假设每个交换机上都绑定一堆的routing_key连接到各个队列上。那么消息的管理就会异常地困难。
所以RabbitMQ提供了一种主题交换机,发送到主题交换机上的消息需要携带指定规则的routing_key,
主题交换机会根据这个规则将数据发送到对应的(多个)队列上。
主题交换机的routing_key需要有一定的规则,交换机和队列的binding_key需要采用*.#.*…的格式,每个部分用.分开,其中
*表示一个单词
#表示任意数量(零个或多个)单词。
示例:队列Q1绑定键为 *.TT.*队列Q2绑定键为TT.#如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到 如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到
3)扇形交换机:Fanout Exchange
扇形交换机是最基本的交换机类型,它所能做的事情非常简单———广播消息。
扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要“思考”,
所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。
这个交换机没有路由键概念,就算你绑了路由键也是无视的。
4)首部交换机:Headers exchange
5)默认交换机
6)Dead Letter Exchange(死信交换机)
本文只讲前三种交换机
三、交换机的属性
除交换机类型外,在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:
Name:交换机名称
Durability:是否持久化。如果持久性,则RabbitMQ重启后,交换机还存在
Auto-delete:当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它
Arguments:扩展参数
**四、案例:交换机的使用 **
打开linux,开启docker,开启容器,容器的创建请参照
https://blog.csdn.net/qq_43469718/article/details/103671005
输入界面版RabbitMQ
创建springcloud项目
rabbitmq02 #主模块
rabbitmq-provider #生产者
rabbitmq-consumer #消费者给父模块添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.9.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.zking</groupId><artifactId>rabbitmq02</artifactId><version>0.0.1-SNAPSHOT</version><name>rabbitmq02</name><packaging>pom</packaging><description>Demo project for Spring Boot</description><modules><module>rabbitmq-provider</module><module>rabbitmq-consumer</module></modules><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.10</version><scope>provided</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
子模块pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.zking</groupId><artifactId>rabbitmq02</artifactId><version>0.0.1-SNAPSHOT</version></parent><artifactId>rabbitmq-provider</artifactId><version>0.0.1-SNAPSHOT</version><name>rabbitmq-provider</name><packaging>jar</packaging><description>Demo project for Spring Boot</description></project>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.zking</groupId><artifactId>rabbitmq02</artifactId><version>0.0.1-SNAPSHOT</version></parent><artifactId>rabbitmq-consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>rabbitmq-consumer</name><packaging>jar</packaging><description>Demo project for Spring Boot</description></project>
配置yml
rabbitmq-provider
server:port: 8081servlet:context-path: /rabbitmq-provider
spring:rabbitmq:virtual-host: my_vhosthost: 192.168.208.130port: 5672username: adminpassword: admin
rabbitmq-consumer
server:port: 8082servlet:context-path: /rabbitmq-consumer
spring:rabbitmq:virtual-host: my_vhosthost: 192.168.208.130port: 5672username: adminpassword: admin
配置完毕后
1.演示直连交换机:Direct Exchange
在生产者rabbitmq-provider
新建包rabbitmq
,这个包专门用来放配置类
新建类RabbitDirect
配置类
package com.zking.rabbitmqprovider.rabbitmq;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/** 功能描述: <br>* 〈〉用于绑定队列交换机路由键的关系* 示例:直接交换机(Driect Exchange)* @Param:* @Return:* @Author: 骄傲的骨傲天* @Date: 2019/12/23 22:27*/
@Configuration
public class RabbitDirect {/** 功能描述: <br>* 〈〉1.定义一队列* @Param:* @Return:* @Author: 骄傲的骨傲天* @Date: 2019/12/23 22:34*/@Beanpublic Queue queue(){return new Queue("queue-1",true);}/** 功能描述: <br>* 〈〉2.定义直接交换机* @Param:* @Return:* @Author: 骄傲的骨傲天* @Date: 2019/12/23 22:34*/@Beanpublic DirectExchange directExchange(){return new DirectExchange("direct-exchange");}/** 功能描述: <br>* 〈〉3.将队列与直连交换机进行绑定,并以路由键来串联* @Param:* @Return:* @Author: 骄傲的骨傲天* @Date: 2019/12/23 22:36*/@Beanpublic Binding directBinding(){return BindingBuilder.bind(queue()).to(directExchange()).with("direct-routing-key");}
}
新建controller
包与rabbitmq
同级目录
新建类SendController
用户来向交换机发送消息根据路由键传到消息队列中
此类头上有个@RestController注解
@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/senderByDricet")public Map<String,Object> senderByDricet(){Map<String,Object> map=new HashMap<String,Object>();Map<String, Object> data = this.createData();log.info("生产者发送消息,queue={},directExchange={},routingkey={}","queue-1","direct-exchange","direct-routing-key");log.info("msg={}",data);//第一个参数:交换机的名称//第二个参数:路由键//第三个参数:数据rabbitTemplate.convertAndSend("direct-exchange","direct-routing-key",data);map.put("code",1);map.put("msg","消息发送成功");return map;}public Map<String,Object> createData(){Map<String,Object> data=new HashMap<String,Object>();String ceateDate= LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));String message="hello,rabbitmq!!";data.put("createDate",ceateDate);data.put("massage",message);return data;}
运行
postman测试一下
再看下RabbitMQ web版有没有此数据列表
可以看到名为queue-1的消息队列中是有一条消息的
往消费者rabbitmq-consumer
写入代码,消费此消息
创建rabbitmq
包
创建RabbitDirectReceiver
类
开始消费
package com.zking.rabbitmqconsumer.rabbitmq;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
@RabbitListener(queues = {"queue-1"})
@Slf4j
public class RabbitDirectReceiver {@RabbitHandlerpublic void handlerMessage(Map<String,Object> map){log.info("消费者接收消息。。。");log.info("msg={}",map);}}
看下消息已经被消费了
关闭消费者rabbitmq-consumer
,不让进行消费,因为演示主题交换机需要看到效果
2.演示主题交换机:Topic Exchange
同理我们先来制造消息
在生产者rabbitmq-provider
中的rabbitmq
包下创建主题交换机配置类RabbitTopicConfig
package com.zking.rabbitmqprovider.rabbitmq;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/** 功能描述: <br>* 〈〉主题交换器* @Param:* @Return:* @Author: 骄傲的骨傲天* @Date: 2019/12/23 23:33*/
@Configuration
public class RabbitTopicConfig {@Beanpublic Queue queueX(){return new Queue("queue-x",true);}@Beanpublic Queue queueY(){return new Queue("queue-y",true);}@Beanpublic Queue queueZ(){return new Queue("queue-z",true);}@Beanpublic TopicExchange topicExchange(){return new TopicExchange("topic-exchange");}@Beanpublic Binding bindingX(){return BindingBuilder.bind(queueX()).to(topicExchange()).with("topic.person.xxx");}@Beanpublic Binding bindingY(){return BindingBuilder.bind(queueY()).to(topicExchange()).with("topic.person.yyy");}@Beanpublic Binding bindingZ(){return BindingBuilder.bind(queueZ()).to(topicExchange()).with("topic.person.*");}
}
可以看到我创建了三个消息队列:queue-x
queue-y
queue-z
注意看bindingZ
方法的路由键规则topic.person.*
主题交换机的routing_key需要有一定的规则,交换机和队列的binding_key
需要采用*.#.*.....
的格式,每个部分用.
分开,其中
*
表示一个单词
#
表示任意数量(零个或多个)单词。
然后再controller
包下的SendController
类中添加senderByTopic
方法(演示直连交换机所创建的)
@RequestMapping("/senderByTopic")public Map<String,Object> senderByTopic(String routingKey){Map<String,Object> map=new HashMap<String,Object>();Map<String, Object> data = this.createData();log.info("生产者发送消息,directExchange={},routingkey={}","topic-exchange",routingKey);log.info("msg={}",data);//第一个参数:交换机的名称//第二个参数:路由键//第三个参数:数据rabbitTemplate.convertAndSend("topic-exchange",routingKey,data);map.put("code",1);map.put("msg","消息发送成功");return map;}
此方法中有个routingKey,这就是路由键,是活的,从前端传过的
重启rabbitmq-provider
,使用postman测试一下
结果:
可以发现我的路由键参数为:topic.person.yyy
同时匹配queue-y
和queue-z
的规则,所以两个消息队列中都有消息
往消费者rabbitmq-consumer
写入代码,消费此两条消息
在rabbitmq
包下创建三个类:RabbitTopicReceiverX
RabbitTopicReceiverY
RabbitTopicReceiverZ
package com.zking.rabbitmqconsumer.rabbitmq;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
@RabbitListener(queues = {"queue-x"})
@Slf4j
public class RabbitTopicReceiverX {@RabbitHandlerpublic void handlerMessage(Map<String,Object> map){log.info("消费者接收消息。。。");log.info("RabbitTopicReceiverX.handlermessage={}",map);}
}
package com.zking.rabbitmqconsumer.rabbitmq;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
@RabbitListener(queues = {"queue-y"})
@Slf4j
public class RabbitTopicReceiverY {@RabbitHandlerpublic void handlerMessage(Map<String,Object> map){log.info("消费者接收消息。。。");log.info("RabbitTopicReceiverY.handlermessage={}",map);}
}
package com.zking.rabbitmqconsumer.rabbitmq;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
@RabbitListener(queues = {"queue-z"})
@Slf4j
public class RabbitTopicReceiverZ {@RabbitHandlerpublic void handlerMessage(Map<String,Object> map){log.info("消费者接收消息。。。");log.info("RabbitTopicReceiverZ.handlermessage={}",map);}
}
运行消费者
可以看到 y z消息队列已经被消费了
3.演示扇形交换机
扇形交换机是与路由键无关的
首先在生产者rabbitmq-provider
下的rabbitmq
中创建扇形交换机的配置类RabbitFanoutConfig
package com.zking.rabbitmqprovider.rabbitmq;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.validation.executable.ValidateOnExecution;/** 功能描述: <br>* 〈〉扇形交换机,用于广播消息,于路由键无关* @Param:* @Return:* @Author: 骄傲的骨傲天* @Date: 2019/12/23 23:32*/
@Configuration
public class RabbitFanoutConfig {@Beanpublic Queue queueA(){return new Queue("queue-a",true);}@Beanpublic Queue queueB(){return new Queue("queue-b",true);}@Beanpublic Queue queueC(){return new Queue("queue-c",true);}@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("fanout-exchange");}@Beanpublic Binding bindingA(){return BindingBuilder.bind(queueA()).to(fanoutExchange());}@Beanpublic Binding bindingB(){return BindingBuilder.bind(queueB()).to(fanoutExchange());}@Beanpublic Binding bindingC(){//可以看到这里不需要withreturn BindingBuilder.bind(queueC()).to(fanoutExchange());}
}
在controller
包中的SendController
中添加方法
@RequestMapping("/senderByFanout")public Map<String,Object> senderByFanout(){Map<String,Object> map=new HashMap<String,Object>();Map<String, Object> data = this.createData();log.info("生产者发送消息,fanoutExchange={}","fanout-exchange");log.info("msg={}",data);//第一个参数:交换机的名称//第二个参数:路由键//第三个参数:数据rabbitTemplate.convertAndSend("fanout-exchange",null,data);map.put("code",1);map.put("msg","消息发送成功");return map;}
可以看到rabbitTemplate.convertAndSend("fanout-exchange", null,data);
路由键是null
因此,此方法发送消息所有与fanout-exchange
交换机关联的消息队列都会被发送消息
运行生产者rabbitmq-provider
postman测试一下
三个消息队列都有消息
往消费者rabbitmq-consumer
写入代码,消费消息
在rabbitmq
包下写入三个类:RabbitFanoutReceiverA
RabbitFanoutReceiverB
RabbitFanoutReceiverC
package com.zking.rabbitmqconsumer.rabbitmq;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
@RabbitListener(queues = {"queue-a"})
@Slf4j
public class RabbitFanoutReceiverA {@RabbitHandlerpublic void handlerMessage(Map<String,Object> map){log.info("消费者接收消息。。。");log.info("RabbitTopicReceiverA.handlermessage={}",map);}
}
package com.zking.rabbitmqconsumer.rabbitmq;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
@RabbitListener(queues = {"queue-b"})
@Slf4j
public class RabbitFanoutReceiverB {@RabbitHandlerpublic void handlerMessage(Map<String,Object> map){log.info("消费者接收消息。。。");log.info("RabbitTopicReceiverB.handlermessage={}",map);}
}
package com.zking.rabbitmqconsumer.rabbitmq;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
@RabbitListener(queues = {"queue-c"})
@Slf4j
public class RabbitFanoutReceiverC {@RabbitHandlerpublic void handlerMessage(Map<String,Object> map){log.info("消费者接收消息。。。");log.info("RabbitTopicReceiverC.handlermessage={}",map);}
}
启动消费者
效果演示完毕!!!
给大家看下项目最终的结构
02、RabbitMQ之交换机相关推荐
- RabbitMQ的交换机类型和工作模式
RabbitMQ的交换机类型有四种 1.direct 直流交换机: 根据消息的路由键routingkey,将消息以完全匹配的方式路由到指定的队列中. 这里的匹配指的是消息本身携带的路由键和队列与交换机 ...
- RabbitMQ中交换机的几种模式
目录 简述 交换机模式 Fanout模式 Direct模式 Topic模式 Headers模式 简述 生产者不直接跟队列打交道,而是通过交换机.交换机类似于生产者和队列直接的一个管理者,它将生产的消息 ...
- RabbitMQ之交换机的四种类型和属性
交换机主要包括如下4种类型: Direct exchange(直连交换机) Fanout exchange(扇型交换机) Topic exchange(主题交换机) Headers exchange( ...
- RabbitMQ持久化交换机队列
持久化 将交换机或队列的数据保存到磁盘 服务器宕机或重启之后依然存在 读写速度比较慢 非持久化 将交换机或队列的数据保存到内存 服务器宕机或重启之后将不存在 读写速度比较快 配置方式 <!-- ...
- 003 Rabbitmq中交换机的类型
一 注意点 在Rabbitmq之中,存在绑定键和路由键的概念. [1]绑定键 : 交换机和队列关系的一种描述. [2]路由键: 消息之中消息标签的内容,描述了消息最终到达哪些队列之中. 在Rabbit ...
- RabbitMQ exchange交换机机制
目录 RabbitMQ 概念 exchange交换机机制 什么是交换机 binding? Direct Exchange交换机 Topic Exchange交换机 Fanout Exchange交换机 ...
- RabbitMQ之交换机
目录 一.RabbitMQ交换机 1.交换机的由来 2.交换机类型 2.1直连交换机(Direct Exchange) 2.2主题交换机(Topic Exchange) 2.3扇形交换机 ...
- RabbitMQ之交换机的讲解
一.交换机 1.Exchange 在RabbitMQ中,生产者发送消息不会直接将消息投递到队列中,而是先将消息投递到交换机中, 在由交换机转发到具体的队列, 队列再将消息以推送或者拉取方式给消费者进行 ...
- RabbitMQ Topic交换机(结果成功)
public class EmitLogTopic {//交换机的名称public static final String EXCHANGE_NAME = "topic_logs" ...
- RabbitMQ Topic交换机(生产者)
/* * 声明主题交换机及相关队列 * 消费者C1 * */ public class ReceiveLogsTopic01 {//交换机名称public static final String EX ...
最新文章
- MII 功能简介(论坛整理)
- 突然记起我也遇到过一个麻花姐
- ios 开发账号 退出协作_如何在iOS 10中的Notes上进行协作
- 浏览器对象模型(BOM)
- 收藏 | 一文看尽2020AI论文
- Spring Boot详情
- 深度学习笔记(五):LSTM
- Linux 实现 Google Authenticator 动态密码 + SSH 密码双重认证
- FH153C6常用一键开关机芯片 ON/OFF单键开关IC 美容仪电子开关IC
- mysql update无效_Mysql update记录无效如何解决
- NTC电阻短路(高温)电池未停止充电分析
- 日本首相会见奥特曼,考虑引入 ChatGPT 技术
- 阿里云产品试用更新,产品组合试用装更划算,快来免费上云吧
- 在国内,如何优雅的使用ChatGPT??
- 计算机同S7-300PLC通讯,S7-300PLC主站之间的PROFIBUS-DP通讯详解
- python中options是什么意思_在OPTIONS方法中显示棉花糖模式的描述
- 小程序今日头条demo
- 三子棋游戏(超级详解,附加电脑下棋优化)
- Streaming的介绍
- Python常见问题解决记录1-Non-ASCII character '\xe7'错误