同步调用

  • 优点:时效性强,可以立即得到结果
  • 缺点:
  1. 耦合度高,每次加入新需求都要该原来的代码
  2. 性能和吞吐能力下降,调用者需要等待提供者响应后才能继续下一步操作
  3. 有额外资源消耗,调用者在等待服务响应过程中,不能释放请求占用的资源
  4. 有级联失效问题,如果服务提供者出现问题,所有调用方都会跟着出现问题

异步调用——通过Broker代理,调用者在请求broker后可以立即返回,无需等待所有结果返回后再响应,这里引用黑马的图片,很直观

优点:

  1. 耦合度低
  2. 吞吐量提升
  3. 故障隔离
  4. 流量削峰:当有大量请求时,可以先放在broker里,服务器根据自己的处理速度再去broker里处理请求,以此来缓解服务器压力

缺点:

  1. 依赖于Broker的可靠性、安全性、吞吐能力,所以必须保证broker足够可靠
  2. 架构复杂,业务没有明显的业务流程线,不好追踪管理

MQ简介

MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是上述中事件驱动架构中的Broker,下面通过黑马的截图来看看几种常见的MQ区别

RabbitMQ学习

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网:https://www.rabbitmq.com/,结构如下

基本概念

  • channel:操作MQ的工具
  • exchange:路由消息到队列中
  • queue:缓存消息的队列
  • virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组

安装

这次安装是在以前安装好的docker容器上进行的安装,步骤如下

  1. 下载RabbitMQ

    docker pull rabbitmq:3-management  #冒号后面是版本号

    阿萨

  2. 下载好后运行容器

    docker run \-e RABBITMQ_DEFAULT_USER=itcast \    #用户名-e RABBITMQ_DEFAULT_PASS=123321 \    #密码  --name mq \    #容器名字--hostname mq1 \    #主机名 -p 15672:15672 \    #rabbitmq的管理平台端口,可通过该端口登录管理平台-p 5672:5672 \      #收发消息的端口  -d \    #后台运行rabbitmq:3-management #镜像名称

    完成后就安装好了

可以通过输入IP地址和端口在浏览器访问

常见的消息模型

基本消息队列,下图是官网解释

工作消息队列概念

以上两种都是通过队列来发送,没有exchange交换机的概念,并且消息被消费后立即销毁

以下三种都属于发布订阅模式(允许将同一个消息发送给多个消费者),根据交换机类型不同又分为三类,分别是

  • 广播模式(Fanout),一次向多个消费者发送消息,x指的是exchange交换机
  • 路由模式(Direct),选择性接收消息

  • 主题模式(Topic),基于模式(主题)接收消息

基本消息队列消息发送流程

  1. 创建connection
  2. 创建channel
  3. 利用channel声明队列
  4. 利用channel向队列发送消息

基本消息队列消息接收流程

  1. 创建connection
  2. 创建channel
  3. 利用channel声明队列
  4. 定义consumer的消费行为handleDelivery()
  5. 利用channel将消费者与队列绑定

SpringAMQP

AMQP:是用于在应用程序或之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中独立性的要求

SpringAMQP:是基于AMQP协议的一套api规范,提供了模板来收发消息。包括了两部分,其中spring-amqp是基础抽象,spring-rabbit是底层默认实现的,官网地址Spring AMQP

通俗来讲就是通过官方原生的方法来创建消息队列太麻烦了,spring现在通过封装底层实现来帮我们简化了创建流程和代码难度

下面通过SpringAMQP创建做一个最简单的基本消息队列

  1. 创建一个父工程并引入相关依赖

     <!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency>
  2. 创建推送者Publisher子项目,并且配置yml文件

    logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
    spring:rabbitmq:host: 192.168.65.129 # rabbitMQ的ip地址port: 5672 # rabbitmq收发消息的端口username: itcast    #  rabbitmq用户名password: 123321    #  rabbitmq密码virtual-host: /   #虚拟主机名字
  3. 写一个单元测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowiredprivate RabbitTemplate rabbitTemplate; //引入rabbitmq的协议标准对象@Testpublic void testSendMessage2SimpleQueue() {String queueName = "simple.queue"; //队列名字String message = "hello, spring amqp!";  //队列消息rabbitTemplate.convertAndSend(queueName, message);}
}

这里注意,我指定的队列名字叫做"simple.queue",但是在rabbitmq里面没有这个队列,需要我手动去创建一个,如下

创建成功后,启动单元测试,点击队列名字,看一下效果

以上就是发送消息到队列的过程,但并没有接收消息,现在写如何接收队列里的消息

  1. 创建Consumer消费者springboot子工程
  2. 引入依赖(父工程引过了所以可以不用引)
  3. 配置yml文件
    logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
    spring:rabbitmq:host: 192.168.65.129 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /
  4. 创建一个监听类,这里取名listener,

    @Component  //注入为spring的一个bean,好让spring监控到
    public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")  //这里是指定我们监听的是哪一个消息队列public void listenSimpleQueue(String msg){System.out.println("消费者接收到的消息是:"+msg);}
    }

    完成后启动application启动类,然后就可以接收消息了,如下

    注意:消费完后队列里的消息会被销毁,所以再去rabbitmq的队列里就看不见已经消费过的消息了

下面通过SpringAMQP创建做一个工作队列(work模型),就是将多个消费者绑定到同一队列,同一条消息只会被一个消费者处理

任务:在一秒内由p端发送50条数据,C1处理速度是40条每秒,C2是10条每秒,让两个消费者协同处理

  1. 在发送端写好发送代码

      @Testpublic void WorkQueue() throws InterruptedException {String queueName="simple.queue";String message="工作队列";for (int i = 0; i <50 ; i++) {rabbitTemplate.convertAndSend(queueName,message+i);Thread.sleep(20);//这里表示50条数据在一秒钟发完}
  2. 在消费者端写好两个消费者代码

    @RabbitListener(queues = "simple.queue")public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到的消息是:"+msg+LocalTime.now());Thread.sleep(25);//表示消费者1每秒处理40个消息}@RabbitListener(queues = "simple.queue")public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2接收到的消息是:"+msg+LocalTime.now());Thread.sleep(100); //表示消费者2每秒处理10个消息}

    这里要注意的是,rabbitmq默认才是消息预取的方式,就是两个消费者都先把所有消息从队列里拿过来再处理,如果有50个消息,每个消费者就先拿25个,到手后再一起处理,如果两个消费者的消费能力不一样,这显然是有弊端的,所以需要进行如下配置

  3. 在yml文件做配置

    logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
    spring:rabbitmq:host: 192.168.65.129 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /listener:simple:prefetch: 1   #这是新加的配置,表示每个消费者最多先预取1个消息,消费完后再取下一个,保证了每个消费者都根据自己的消费速度来处理消息
  4. 然后分别启动消息发送者和消费者,查看情况,如果电脑处理器正常一般就是消费者一接收了40条消息,消费者2接受了10条消息,耗时在一秒内

发布订阅模式案列学习

首先是广播模式案列实现,Fanout exchange会将收到的消息路由到每一个跟其绑定的queue

步骤如下:

  1. 在消息发送端写发送代码,以前都是直接发给队列,现在是发送给交换机,所以代码如下

    @Testpublic void testSendFanoutExchange() {// 交换机名称String exchangeName = "exchange01";// 消息String message = "hello, every one!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "", message);}
  2. 在消费者端新建一个配置类,用于建立交换机和队列名称,并将交换机和队列绑定,代码如下

    @Configuration//表示该类为配置类
    public class FanoutConfig {// 交换机有多种类型,这里是广播模式,所以声明FanoutExchange交换机名字叫做"exchange01"@Bean  //声明为一个bean交给spring管理public FanoutExchange fanoutExchange(){return new FanoutExchange("exchange01");}// 声明队列名字叫做fanout.queue1@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}// 绑定队列1到交换机@Beanpublic Binding fanoutBinding(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);//表示将fanoutQueue1队列绑定到fanoutExchange交换机}// fanout.queue2@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}// 绑定队列2到交换机@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
    }
    
  3. 完成消费者代码的编写

    @RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到fanout.queue1的消息是:"+"{"+msg+"}");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到fanout.queue2的消息是:"+"{"+msg+"}");}

    完成后重启服务,效果如下

如此便实现了一个消息被多个消费者消费的目的

路由模式(routes)

前面介绍的广播模式,可以将一个消息发送给多个与路由器绑定的队列;而现在介绍的路由模式,是把消息发送给与路由器绑定了相同key值的队列,实现了消息的指定发送,即选择性接收消息

发布者发送消息时,会指定一个RoutingKey,而队列则会绑定一个bindingKey,当这两个参数的值相同时,路由器才会将消息路由到队列中去,现在做一个案列,来实现消息的指定发送

  1. 这次先写发送端代码

     @Testpublic void testSendDirectExchange() {// 交换机名称String exchangeName = "direct exchange";// 消息String message = "发给所有队列";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "common" /*这里的common就是routingKey,要和路由器的bindingkey相对应才能成功接收到值*/, message);}
  2. 写接收端代码,这次不通过bean的方式来写,直接在注解上配置队列名字和路由器名字及类型

    //路由模式@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"), //声明队列的名字exchange = @Exchange(name = "direct exchange", type = ExchangeTypes.DIRECT), //声明路由器的名字和路由器的类型,这里是direct类型key = {"common", "队列A的key"} //声明路由器的key值,这里声明一个共有的common和一个特有的”队列A的key“))public void listenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value= @Queue(name = "direct.queue2"),exchange = @Exchange(name = "direct exchange",type = ExchangeTypes.DIRECT),key = {"common","队列B的key"}))public void listenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息"+"【" + msg + "】");}

    注意看接收端的代码,有一个key值是common,即是共有的key值,我们在发送端也写有common这个值,启动服务看效果

可以发现通过共有的key值,路由器把消息分别发送给了两个队列,由两个消费者端接收到了一样的消息,这和广播模式很像

现在修改routingKey值,改为某个队列特有的key,将key值改为"队列A的key",看效果

现在就只有direct.queue1队列才能收到消息了,另外一个队列因为key值不同所有没能收到消息,由此实现了路由模式的选择性接收消息的效果

话题模式(Topic)

TopicExchange和DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割

并且在绑定时可以以#或*作为通配符

#:代指一个或多个单词

*:代指一个单词

话题模式就是要同一个话题的消息才能被接收,比如有中国天气和外国天气,中国军事和外国军事,这时如果设置routingKey为中国.#,则发送中国天气和中国军事这两个板块。如果设置为#.军事,则发送中国军事和外国军事这两个板块,下面举例

  1. 先写发送端

        @Testpublic void testSendTopicExchange() {// 交换机名称String exchangeName = "Topic exchange";// 消息String message = "这是中国军事频道";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.junshi" , message);}@Testpublic void testSendTopicExchange2() {// 交换机名称String exchangeName = "Topic exchange";// 消息String message = "这是外国军事频道";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "waiguo.junshi" , message);}
    }
  2. 写接收端代码,和路由模式差不多,就是bandingKey要改一下

        @RabbitListener(bindings = @QueueBinding(value = @Queue(name= "topce.queue1"),exchange = @Exchange(name= "Topic exchange" ,type = ExchangeTypes.TOPIC),key = "china.#"))public void listenTopicQueue1(String msg){System.out.println("只接收到与china相关的消息,不管是什么内容"+"【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name= "topce.queue2"),exchange = @Exchange(name= "Topic exchange" ,type = ExchangeTypes.TOPIC),key = "#.junshi"))public void listenTopicQueue2(String msg){System.out.println("只接收到junshi相关的消息,不管是哪个国家的"+"【" + msg + "】");}

    先写启动中国军事频道的测试类,会发现接收端所有中国的话题会报道中国军事,军事话题会报道军事

    再把发送端代码的中国军事改为天气

     @Testpublic void testSendTopicExchange() {// 交换机名称String exchangeName = "Topic exchange";// 消息String message = "今天中国气温适中";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.tianqi" , message);}

    效果如下

消息转换器

在springAMQP的发送方法中,接收消息的类型是Object,也就是所我们可以发送任意对象的消息,SpringAMQP会自动序列化字节后发送

下面通过创建对象的方式,将对象从发送端传递到消费端,看看如何实现

  1. 在夫父工程引入相关依赖

    <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
  2. 在消费者端配置类声明一个队列

     @Beanpublic Queue objectQueue(){return new Queue("object.queue");}
  3. 在消费者端写好接收代码

    @RabbitListener(queues = "object.queue")public void listenObjectQueue2(List msg){System.out.println("接收到object.queue的消息:" + msg);}
  4. 在发送端写发送代码,以对象形势发送,这里用list对象

       @Testpublic void testSendObject() {List list =new ArrayList();list.add("小鱼");list.add("大鱼");// 发送消息rabbitTemplate.convertAndSend( "object.queue",list);}
  5. 分别在发送端和接收端的启动类写转换工具的方法

     @Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}

    然后启动服务,效果如下

MQ介绍,RabbitMQ在SpringAMQP中的使用相关推荐

  1. [喵咪MQ(1)]RabbitMQ简单介绍准备工作

    [喵咪MQ(1)]RabbitMQ简单介绍准备工作 前言 哈喽大家好呀! 看标题就知道我们这次要讲MQ,之前博客中有提到的KafKa理论上来说也是一个优秀的MQ队列软件,比较知名的MQ有:Go语言编写 ...

  2. 初识MQ和RabbitMQ

    MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列. 几种常见MQ的对比: RabbitMQ ActiveMQ RocketMQ Kafka 公司/社区 Rabbit Apa ...

  3. RabbitMQ入门篇、介绍RabbitMQ常用的五种模式

    RabbitMQ 认识RabbitMQ AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为 ...

  4. leyou商城day10 MQ介绍及详情静态页

    02.商品详情:分析渲染商品详情页需要的数据 分析item.html页面,得到渲染所需要的数据如下: List categories:三个分类对象的集合 Brand brand:品牌对象 String ...

  5. 【MQ】MQ消息中间件RabbitMQ

    第一部分:RabbitMQ 一.MQ 概念 MQ,Message Queue,消息队列.本质是队列,遵循FIFO先进先出原则.只不过队列中存放的内容是message而已,还是一种跨进程的通信机制,用于 ...

  6. MQ原理、使用场景、IBM WebSphere MQ介绍及spring集成配置

    一.MQ简介及特点 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们 ...

  7. 常用MQ原理、使用场景和IBM WebSphere MQ介绍

    一.MQ简介及特点 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们 ...

  8. 一篇文章学会RabbitMQ。SpringAMQP操作RabbitMQ。RabbitMQ五种模式及其代码实现。

    目录 一.同步与异步调用: 一)同步调用: 二)异步调用: 三)使用建议: 四)MQ种类 二.SpringAMQP 1.导入依赖: 2.启动相关服务: 3.配置序列化: 三.Rabbit五种关系模式: ...

  9. RabbitMQ指南(中)

    原文出处: Listen 在上一篇文章中,介绍了使用RabbitMQ的Hello World例子, 以及如何创建一个work queue.在work queue的例子中每条消息都只会被传递到一个wor ...

最新文章

  1. 装饰器模式与java.io包
  2. 数据结构哪本书比较好_东莞工厂电动伸缩门固定在哪一边比较好?
  3. 获取客户端IP和MAC地址
  4. Arcgis Server初学笔记(一)
  5. settimeout(fn(),0)
  6. 小型电商Web架构!小而美!值得学习!
  7. 一个软件工程师在北京的反省
  8. python config文件_Python如何配置config文件?
  9. 你知道门禁卡的原理吗?手机模拟门禁卡研究
  10. html二级菜单:DIV+CSS制作二级菜单(横排二级下拉菜单)以及二级菜单出现错位怎么解决
  11. win 10 设置静态ip 子网前缀长度
  12. HBuilder创建App并打包发布
  13. 智源社区AI周刊No.97:Bengio新论文用GFlowNets统一生成模型;北大发布AI for EDA数据集...
  14. 千万千万不可运行的Linux命令
  15. STM32单片机语音声控智能台灯可调光冷暖光人检测锂电池供电太阳能和USB充电
  16. esc键 qt 退出菜单_qt之esc键
  17. linux常用命令整理-02-服务器-系统-内存-磁盘-优化
  18. 【WLAN】【测试】盘点如何查看系统连接过的WIFI密码(包括手机、电脑及不同系统)
  19. kvc实践一:核心方法和进阶
  20. 考研真有那么难吗?过来人分享一下

热门文章

  1. 【Pytorch】RuntimeError: one of the variables needed for gradient computation has been modified by
  2. 直播|一小时轻松学会抖音爆款视频制作
  3. 西门子PLC学习笔记十一-(装入与传送指令)
  4. Wiki(中文翻译为维基)
  5. 基于LDA模型的主题分析
  6. 【找工作资料】英文求职信相关
  7. TextView实现自动上下滚动的效果(TextSwitcher)
  8. 用MATLAB写出蒙特卡洛仿真
  9. 负值瞬时频率出现原因及解决方法总结
  10. 施工员培训建筑八大员培训施工员房屋建筑工程现场施工管理剖析