SpringCloud源码探析(六)-消息队列RabbitMQ
1.概述
RabbitMQ是一个开源的消息代理和队列服务器,它是基于Erlang语言开发,并且是基于AMQP协议的。由于Erlang语言最初使用与交换机领域架构,因此使得RabbitMQ在Broker之间的数据交互具有良好的性能。AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一种消息队列应用层协议,专门面向消息的中间件而设计,类似于JAVA的JMS协议,基于此规范能够开发出各种各样的消息中间件。RabbitMQ能够与SpringAMQP完美整合,具有较强的扩展性和丰富的API,且具有高可靠性和低延时的优点,被业界广泛使用。本文将介绍SpringAMQP与Rabbit的使用,帮助读者更好地理解消息队列。
2.消息队列RabbitMQ使用
2.1 MQ对比
由上图可知,从可靠性和消息延迟的角度来说,RabbitMQ都是较为突出的,从吞吐量的角度来说可能一般。因此在选用消息队列中间件时,应根据使用场景选择更为合适的。
2.2 AMQP核心概念
名称 | 概念 |
---|---|
Server/Broker | 可以理解为消息队列实体 |
VHost | 虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离 |
Exchange | 消息交换机,它指定消息按一定规则,路由到对应队列 |
Queue | 消息队列载体,每个消息可能会被投到一个或多个队列 |
Producer | 消息生产者,消息投递方 |
Consumer | 消息消费者,接收消息的程序 |
Binding | 绑定器,它将exchange和queue按照路由规则绑定起来 |
channel | 消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务 |
Routing Key | 路由关键字,exchange根据关键字将消息投递到指定queue |
2.3 RabbitMQ架构图
由上图可知,生产者通过channel连接到Broker,将消息投送到Exchange,Exchange再按照指定规则将消息投送到对应的queue,消费者通过监听指定的queue来获取消息。生产者不需要关注投递到那个队列,消费者也不关心消息是从哪个Exchange发送而来,两者之间是松耦合的关系。Exchange的类型有:Fanout交换机、Direct交换机、Topic交换机。Fanout交换机是一种广播模式,消息进来时会投递到所有与之绑定的队列。Direct交换机是完全根据key进行匹配队列,key一致时才会投送。Topic交换机可以按一定的规则进行匹配key,匹配成功进行投递。
2.4 SpringBoot中使用RabbitMQ
2.4.1 引入pom文件
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2.4.2 添加配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
2.4.3 编写生产者
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitApplication.class)
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, world!";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}}
2.4.3 编写消费者
@Component
public class RabbitMQListener {@RabbitListener(queues = "simple.queue")public void receiveMessage(String message) {System.out.println("接收到消息:" + message);}
}
2.4.4 测试结果
运行完成测试代码后,消费者收到消息。上述代码展示的是RabbitMQ最基础的发送和接收模型,生成者将消息发送到指定队列,消费者订阅指定队列获取消息。
2.5 RabbitMQ核心消息发送模型
2.5.1 WorkQueue模型
该模型一个队列可以对应多个消费者,适用于发送者发送大量消息,容易发送积压,因此需要多个消费者来消费。
2.workQueue测试代码
测试类:
@Testpublic void testWorkQueue() throws InterruptedException {// 队列名称String queueName = "work.queue";// 消息String message = "hello, world";for (int i = 1; i <= 50; i++) {// 发送消息String messageInfo = message + "..." + i;rabbitTemplate.convertAndSend(queueName, messageInfo);log.info("消息发送成功:{}", messageInfo);Thread.sleep(50);}}
消费代码:
@RabbitListener(queues = "work.queue")public void receiveWorkMessage1(String message) throws InterruptedException {System.out.println("消息队列1...接收到消息:" + message);Thread.sleep(10);}@RabbitListener(queues = "work.queue")public void receiveWorkMessage2(String message) throws InterruptedException {System.out.println("消息队列2...接收到消息:" + message);Thread.sleep(15);}
3.workQueue运行结果
2.5.2 FanoutExchange模型
1.FanoutExchange消息发送模型
FanoutExchanger模型比简单模型和WorkQueue模型多了一个交换机(Exchange),消息先会发送到Exchange,然后Exchange将消息路由到每一个与其绑定的queue。
2.代码配置
@Configuration
public class FanoutConfig {@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanout.exchange");}@Beanpublic Queue fanoutQueue1() {return new Queue("fanout.queue1");}@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Queue fanoutQueue2() {return new Queue("fanout.queue2");}@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}
发送部分测试代码:
@Testpublic void testFanout() {// 队列名称String exchangeName = "fanout.exchange";// 消息String message = "hello, world!";// 发送消息rabbitTemplate.convertAndSend(exchangeName,"", message);}
3.运行结果
2.5.3 DirectExchange模型
DirectExchange的消息发送模式与FanoutExchange模型基本一致,区别在于它会将收到的消息根据规则路由到指定的Queue,也就是说中间多了一层根据规则筛选发送队列,它的每一个Queue都与Exchange设置一个BindingKey,发布者发送消息时,指定消息的RoutingKey,Exchange将消息路由到BindingKey与消息RoutingKey一致的队列。
1.代码如下
监听者部分代码如下:
@RabbitListener(bindings = @QueueBinding(value = @Queue("direct.queue1"),exchange = @Exchange(value = "direct.exchange",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void receiveDirectQueue1(String message) throws InterruptedException {System.out.println("消息队列1...接收到消息:" + message);}@RabbitListener(bindings = @QueueBinding(value = @Queue("direct.queue2"),exchange = @Exchange(value = "direct.exchange",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void receiveDirectQueue2(String message) throws InterruptedException {System.out.println("消息队列2...接收到消息:" + message);}
测试代码如下:
@Testpublic void testDirect() {// 队列名称String exchangeName = "direct.exchange";// 消息String message = "hello, world!";// 发送消息rabbitTemplate.convertAndSend(exchangeName,"red", message);}@Testpublic void testDirectYellow() {// 队列名称String exchangeName = "direct.exchange";// 消息String message = "hello, world!";// 发送消息rabbitTemplate.convertAndSend(exchangeName,"yellow", message);}
2.测试结果
由上述结果可知,当队列订阅同一个Exchange时,向指定Exchange并携带RoutingKey,只有包含对应RoutingKey的队列才能收到消息。
2.5.4 TopicExchange模型
TopicExchange与DirectExchange类似,区别在于RoutingKey必须是多个单词的列表,并且以.分割,Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词。
1.测试代码
@RabbitListener(bindings = @QueueBinding(value = @Queue("topic.queue2"),exchange = @Exchange(value = "topic.exchange",type = ExchangeTypes.TOPIC),key = {"#.news"}))public void receiveTopicQueue2(String message) throws InterruptedException {System.out.println("消息队列2...接收到消息:" + message);}
发送消息部分代码:
@Testpublic void testTopicQueue() {// 队列名称String exchangeName = "topic.exchange";// 消息String message = "湖人总冠军";// 发送消息rabbitTemplate.convertAndSend(exchangeName,"sports.news", message);}
2.测试结果
3.小结
1.RabbitMQ是一个遵循AMQP协议的消息中间件,消息从生产者发送到消费者,他能根据规则指定消息发送对象,缓存或进行持久化;
2.Spring AMQP是基于AMQP协议开放的接口,能够无缝对接各种基于AMQP的协议,快速利用Spring进行开发;
3.RabbitMQ延时低,而可靠性高,适用于吞吐量较大且对信息实时性要求较高的场景。
4.参考文献
1.https://www.bilibili.com/video/BV1LQ4y127n4
2.https://juejin.cn/post/6844903903637536775
3.https://www.cnblogs.com/tinmh/p/6026726.html
5.附录
https://gitee.com/Marinc/nacos.git
SpringCloud源码探析(六)-消息队列RabbitMQ相关推荐
- SpringCloud源码探析(三)-Nacos集群搭建与配置管理
1.概述 上一篇文章SpringCloud源码探析(二)-Nacos注册中心分析了nacos单机版的部署以及SpringBoot整合nacos,nacos不仅仅可以作为注册中心,也可以作为配置中心.本 ...
- SpringCloud源码探析(四)-OpenFeign使用及其原理
1.概述 在SpringCloud中,服务之间的调用方式可以通过ResTemplate进行调用,也可以通过Feign调用.ResTemplate的缺陷在于需要指定请求url,存在硬编码问题,导致代码难 ...
- matplotlib工具栏源码探析四(自定义工具项图标)
在matplotlib工具栏源码探析二(添加.删除内置工具项)和matplotlib工具栏源码探析三(添加.删除自定义工具项)两篇文章中,仔细观察会发现,不论内置工具项还是自定义工具项都没有图标,但是 ...
- Selenium3 Python WebDriver API源码探析(19)加载FireFox用户配置文件
FireFox用户配置文件 Firefox 将用户个人信息(例如书签.密码.首选项.扩展.Cookie.证书等)保存在一系列文件中,它们被叫做用户配置文件,它们与 Firefox 的程序文件保存在不同 ...
- Forest源码探析
Forest 是一个开源的 Java HTTP 客户端框架,它能够将 HTTP 的所有请求信息(包括 URL.Header 以及 Body 等信息)绑定到您自定义的 Interface 方法上,能够通 ...
- Selenium3 Python WebDriver API源码探析(10):动作链(ActionChains):鼠标事件和键盘事件
鼠标.键盘事件是我们利用Selenium操控浏览器的重要交互手段,主要由selenium\webdriver\common\action_chains.py中的ActionChains类实现.该类通过 ...
- zookeeper笔记+源码刨析
会不断更新!冲冲冲!跳转连接 https://blog.csdn.net/qq_35349982/category_10317485.html zookeeper 1.介绍 Zookeeper 分布式 ...
- ffmpeg实战教程(十三)iJKPlayer源码简析
要使用封装优化ijk就必须先了解ffmpeg,然后看ijk对ffmpeg的C层封装! 这是我看ijk源码时候的笔记,比较散乱.不喜勿喷~ ijk源码简析: 1.ijkplayer_jni.c 封装的播 ...
- 从源码分析RocketMQ系列-消息拉取PullMessageProcessor详解
导语 在之前的分析中分析了关于SendMessageProcessor,并且提供了对应的源码分析分析对于消息持久化的问题,下面来看另外一个PullMessageProcessor,在RocketM ...
最新文章
- Spring+hibernate+struts
- Node安装node-sass总是下载超时问题解决
- 算法---------搜索二维矩阵 II(Java 版)
- 如何设置可见性Android软键盘
- 第六届北京八大处新春祈福庙会大年初一迎客
- 重设忘记的Mysql密码
- Kubernetes入门——Kubernetes日志采集与监控告警
- html鼠标滑轮换图片,JavaScript实现鼠标滚轮控制页面图片切换
- 三大性质总结:原子性,有序性,可见性
- wps vba模块压缩包_01_创建第一个VBA小程序:你好,世界
- delphi xe5 android,android – 发送电子邮件Delphi XE5
- Win10环境下 Cad插件使用失败 解决方法
- VUE(混入mixin、计算属性computed、监听watch)
- 支付宝飞行模式/转卡/转账/h5拉起支付
- 计算机假期计划内容,2019寒假计划,超详细学习计划表
- 解决VM虚拟机导致硬盘灯常亮,很卡的问题
- 安徽外国语学院计算机毕业大补考,学生缓考、补考及重修最终成绩计算办法
- java计时器StopWatch
- 如何设置QTableWideget和行高和列宽
- oracle 监听服务启动后停止