一篇文章学会RabbitMQ。SpringAMQP操作RabbitMQ。RabbitMQ五种模式及其代码实现。
目录
一、同步与异步调用:
一)同步调用:
二)异步调用:
三)使用建议:
四)MQ种类
二、SpringAMQP
1、导入依赖:
2、启动相关服务:
3、配置序列化:
三、Rabbit五种关系模式:
一、简单模式编辑
二、工作模式:
三、发布、订阅模型:
一)Fantout模式:
二)路由模式:
三)Topic模式:
一、同步与异步调用:
一)同步调用:
支付服务调用订单服务时,只有当订单服务执行完成之后,才会接着调用仓储服务,以此类推,直到支付服务需要的全部服务都执行完毕之后才会给用户返回执行成功指令,所需时间较长。
存在问题:
1、耦合度高。给系统添加新功能时,必须修改原来的代码,例订单服务调用成功之后要给用户发一个短信,此时就必须去修改订单服务的代码。
2、性能下降。调用者的等待时间=每个子系统调用的时间之和。
3、资源浪费。每个服务在调用完成后等待响应的过程中不能释放请求所占用的资源,并发强度下及其浪费系统资源。
4、级联失败。当仓储服务出现故障之后,之后的短信服务就无法调用了,因为存在阻塞,只有仓储服务完成之后才会调用短信服务。
二)异步调用:
同样调用者调用支付服务之后,系统会立即通知Broker,之后Broker就会通知依赖于支付服务的所有服务,这些服务同时执行,不存在互相等待。只要支付服务将消息推送给Broker此次程序就相当于执行完毕。
优点 :
1、服务解耦。仓储服务执行之后不想再调用短信服务了,不用修改代码,直接将短信服务删除即可。
2、吸能提升。之前的同步调用500ms,异步调用只需60ms
3、没有级联失败问题。仓储服务损坏不影响其他服务的执行。
4、流量削峰。下图中假设订单服务能同时处理2两个,仓储服务能同时处理1个,此时一下子来了3个支付服务,则broker会将两个订单服务分配过去,剩下的那一个在队列等待,将一个仓储服务分配过去两外两个在队列等待。起到了数据缓冲的作用。
将高并发转化为低并发:
缺点:
1、对Broker的可靠性,安全性,吞吐能力有很高的要求。
2、依赖于Broker,一旦Borker崩溃整个微服务都崩溃了。
3、架构复杂,业务没有明显流程线,不易追踪管理。
三)使用建议:
1、想立刻得到结果,就必须使用同步调用。因为异步只是通知系统让系统要办这件事,但是什么时候办完用户并不知道,而同步不一样,同步是等待执行完之后才算调用完毕。
2、对吞吐量、并发、低耦合关系,则使用异步调用。
四)MQ种类
在上面异步调用时,我们讲到了Broker,MQ实际就是异步调用的Broker。下图显示了常见MQ的消息队列的对比。
二、SpringAMQP
SpringAMQP是一个简化Spring框架操作RabbitMQ的一个依赖,弥补了使用原生RabbitMQ书写代码时的不便。
1、导入依赖:
<!-- AMPQ依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2、启动相关服务:
如果服务器的MQ没有启动要先启动。 我使用的是docker容器,所以先启动了docker在启动mq
1、启动docker:
systemctl start docker
2、查看自己的mq容器名称:
docker ps -a
3、启动容器: docker start 容器名
docker start mq
3、配置项目的yml文件:
spring:rabbitmq:host: 192.168.77.135 # rabbit的主机地址port: 5672 #端口username: itcast # 用户名password: 123321 # 密码virtual-host: / # 虚拟主机
3、配置序列化:
SpringBoot向MQ发送和接收默认以字节为基本类型,所以现在我们要发送Map对象的JSON类型就需要进行序列化配置。
一、发送端:
1、配置MAVEN:
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
2、在启动类添加一个Bean配置:(用JSON方式替换默认的JDK方式)
@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}
3、发送消息:
@Test
public void testSendMap() throws InterruptedException {// 准备消息Map<String,Object> msg = new HashMap<>();msg.put("name", "Jack");msg.put("age", 21);// 发送消息rabbitTemplate.convertAndSend("simple.queue","", msg);
}
二、接收端:
1,2步相同,添加依赖,覆盖原有配置。
3、接收端,只是将方法参数的String类型转化为Map类型即可:
@RabbitListener(queues = "fanout.queue2")
public void listenQueue(Map<String,Object> msg) {System.out.println("接收到消息:【" + msg + "】");
}
三、Rabbit五种关系模式:
一、简单模式
1、发送消息
1、写队列名称。
2、写要发送的内容
3、发送
@SpringBootTest
public class AMQPTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue(){String queueName = "simple.queue"; //发送的队列名称String message = "hello amqp!"; //要发送的消息rabbitTemplate.convertAndSend(queueName,message); //发送}
}
二、接收消息:
1、书写一个单独的类,打上注解。
2、在方法上打上监听注解,参数是队列的名称
3、方法的参数与发送的参数保持一致。
@Component //注入到SpringBoot中
public class SpringListener {@RabbitListener(queues = "simple.queue") //队列的名称public void listenSimpleQueue(String msg){ //发送消息是什么类型的,这里参数就写什么类型System.out.println("我监听到了【"+msg+"】消息!");}
}
二、工作模式:
假设只有一个消费者,他最多消费20条消息,队列保存数据的上线为30条,现在发送端发来了70条消息,则前20条给消费者,21-50存在队列中,51-70则丢失,因为队列无法存放更多的消息,所以此时引入了多消费者来缓解消息丢失问题。
案例需求:
首先我们在消费者的yml文件中配置一下listener属性
spring:rabbitmq:host: 192.168.77.135 # rabbit的主机地址port: 5672 #端口username: itcast # 用户名password: 123321 # 密码virtual-host: / # 虚拟主机listener:simple:prefetch: 1 # 每次只处理一个消息,处理完成之后在给你下一个消息
1、发送代码:
@Testpublic void testWorkQueue() throws InterruptedException {String queueName = "simple.queue"; //发送的队列名称String message = "work queue"; //要发送的消息for (int i = 0; i < 50; i++) {rabbitTemplate.convertAndSend(queueName,message); //发送Thread.sleep(20);}}
2、接收端代码:
@RabbitListener(queues = "simple.queue")public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("一监听到了【"+msg+"】消息!");Thread.sleep(20);}@RabbitListener(queues = "simple.queue")public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("二监听到了----【"+msg+"】消息!");Thread.sleep(200);}
三、发布、订阅模型:
在上面的简单和工作模式中,一个消息只能被一个用户消费。
针对订单系统,一条下单消息要发送给多个服务即多个消费者消费,所以此时引入了发布、订阅模式。
一)Fantout模式:
Fantout交换机会将消息路由到每一个跟其绑定的队列。
案例:
1、在消费者中创建一个类,声明队列和交换机:
package cn.itcast.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {/*** 声明交换机* @return Fanout类型交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}/*** 第1个队列*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 绑定队列和交换机*/@Bean //此处的函数名一定要与上面书写的一致public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
接收部分:
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
2、消息的发送端:
@Test
public void testFanoutExchange() {// 队列名称String exchangeName = "itcast.fanout";// 消息String message = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, "", message);//发送的第二个参数写空
}
二)路由模式:
每一个队列都有对应的bindingKey(可有多个),交换机只会将消息发送给key相同的队列。
案例代码:
1、接收端:
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}
2、发送端:
@Test
public void testSendDirectExchange() {// 交换机名称String exchangeName = "itcast.direct";// 消息String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
三)Topic模式:
相比于路由模式,Topic模式进行了升级,简化了key的书写方式,让接收部分变得更简单。
代码案例:
1、接收端:
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}
2、发送端:
/*** topicExchange*/
@Test
public void testSendTopicExchange() {// 交换机名称String exchangeName = "itcast.topic";// 消息String message = "喜报!孙悟空大战哥斯拉,胜!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
一篇文章学会RabbitMQ。SpringAMQP操作RabbitMQ。RabbitMQ五种模式及其代码实现。相关推荐
- rabbitMQ概述/在springboot下测试五种模式
一.应用场景: (1) 异步操作: 任务异步处理将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理.提高了应用程序的响应时间. (2) 解耦: 应用程序解耦合MQ相当于一个中介,生 ...
- 这五种糟糕的代码实践,程序员要学会规避
点击上方"程序猿技术大咖",关注并选择"设为星标" 回复"加群"获取入群讨论资格! 作者丨Marcin Gajda 译者丨马可薇 策划 | ...
- Springboot整合RabbitMQ,包含direct,topic,fanout三种模式的整合
一 RabbitMQ的介绍 RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿 ...
- 一篇文章学会写作,自媒体人的必经之路
自媒体离不开好内容. 能否持续性的创作出好内容是每一个自媒体人绕不过的坎. 今天从图文自媒体分享下写作. 写作应该是花80%的时间思考,花20%的时间把思考的内容写出来. 例如,我要写一篇知乎带货的文 ...
- php写一个shell脚本文件格式,一篇文章学会——shell脚本编写
用了caffe有一段时间了,感觉自己写shell脚本的能力有待提高,特地从菜鸟笔记处系统的看了一遍,其实学习基础就可,内容也不多,我就不总结了.把网站上的内容用markdown重新编辑了一下,各位可以 ...
- RabbitMQ入门篇、介绍RabbitMQ常用的五种模式
RabbitMQ 认识RabbitMQ AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为 ...
- 一篇文章学会Python函数重写,每天进步一个知识点
函数重写(override) 前提:必须有继承性 原因: 父类中的功能(函数),子类需要用,但是父类中函数的函数体内容和我现在要执行的逻辑还不相符 那么可以将函数名保留(功能还是此功能),但是将函数体 ...
- Linux复习资料——一篇文章学会安装Java(免环境配置)以及tomcat服务
目录 手动安装 需求文件 配置脚本 全自动安装 完整脚本地址(包含下载地址): 测试 手动安装 需求文件 Tomcat下载地址:https://download.csdn.net/download/f ...
- java 最少使用(lru)置换算法_一篇文章学会如何基于LRU-K算法设计本地缓存实现流量削峰...
专注于Java领域优质技术号,欢迎关注 作者:一个Java菜鸟 1.背景介绍 1.1.现象 QPS突然增长2倍以上(45w~60w每分钟) 将产生下面一些问题: 1)响应接口响应时长增加了5倍(qps ...
最新文章
- Firefox 的一个HTTP分析器扩展
- 南阳理工大学 gnns 新基准
- [PHP] 深度解析Nginx下的PHP框架路由实现
- MySQL 高水位update_Oracle delete 高水位线处理问题
- 都有什么行业是“三年不开张,开张吃三年”?或是稳赚不赔的暴利行业?
- c#excel导入mysql_(转)C# Excel导入Access数据库的源码
- 20个最常用的Windows命令行
- Nginx-配置https虚拟服务(访问http时自动跳转https)
- 协程asyncio_迭代器,生成器,协程
- PHP与JS互相加密解密方法2.0
- iPad mini Retina越狱小结【2014年02月06日 - 初稿】
- 做企业管理软件的,怎能不读这本1965年的书呢?
- 如何给三线表格(图片)添加标题?
- 聊聊Dotnetty
- winxp计算机如何连接win7计算机,Win7电脑连接XP系统共享打印机的操作方法
- 情侣博客源码php,分享SEO WordPress的标题Title
- 麦克劳林级数与麦克劳林公式(泰勒公式)
- 数据中心远程集中解决方案有哪些?
- AI-K210 开发家庭万用宝模组(1)
- SCON:串行控制寄存器 之 格式
热门文章
- SAP-QM QA32/QA33权限控制IQC/OQC
- oracle order by 走索引吗,oracle order by 索引是否使用的情况
- 用ps实现小风车的动画效果
- 如何采用javaj导出word
- MySQL- 23- 视图
- yum安装最新版php7
- java实现图片与base64字符串之间的转换(不适用SUN公司的sun.misc.BASE64Encoder)
- 2021 年你需要知道的 CSS 工程化技术
- 实习生必学git以及详细下载安装步骤
- 戴尔外星人m16r1国行中文原厂Windows11系统自带Support Assist OS Recovery恢复出厂设置