文章目录

  • 1 MQ思想
    • 1.1 相关概念:同步、异步通讯
      • 1.1.1 同步通讯
      • 1.1.2 异步通讯
    • 1.2 MQ思想概述
      • 1.2.1 MQ思想优缺点
      • 1.2.2 MQ技术对比
      • 1.2.3 MQ技术选型
  • 2 快速入门
    • 2.1 Docker安装RabbitMQ
      • 2.1.1 拉取RabbitMQ镜像
      • 2.1.2 安装RabbitMQ镜像
    • 2.2 RabbitMQ基本结构
    • 2.3 参考入门案例
      • 2.3.1 简单队列模型图
      • 2.3.2 实现思路
        • 2.3.2.1 Publisher生产者
        • 2.3.2.2 Consumer消费者
      • 2.3.3 Publiser实现
      • 2.3.4 Consumer实现
    • 2.4 SpringAMQP
  • 3 RabbitMQ中5种消息模型
    • 3.1 BasicQueue 简单队列模型
      • 3.1.1 Publisher发送消息
        • 3.1.1.1 引入依赖
        • 3.1.1.2 Publisher生产者配置文件添加RabbitMQ配置
        • 3.1.1.3 生产者消息发送
      • 3.1.2 消费者消息接收
        • 3.1.2.1 consumer配置文件添加配置
        • 3.1.2.2 consumer消费消息
    • 3.2 WorkQueue任务模型
      • 3.2.1 消息发送
      • 3.2.2 消息接收
      • 3.2.3 测试发现问题
      • 3.2.4 能者多劳模式
    • 3.3 发布、订阅模型
      • 3.3.1 Fanout广播模式

1 MQ思想

1.1 相关概念:同步、异步通讯

通讯方式 举例 优势 劣势
同步通讯 就像打电话,需要实时响应。 打电话可以立即得到响应。 不能跟多人同时通话。
异步通讯 就像发邮件,不需要马上回复。 可以同时与多个人收发邮件。 往往响应会有延迟。

1.1.1 同步通讯

Feign调用属于同步方式

优点 缺点
时效性强,可以立即得到结果 耦合度高:每次加入新需求,都需修改原先代码;②性能低:调用者需等待服务提供者响应;③资源浪费:调用链中的服务等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源;④级联失败:服务提供者出现问题,所有调用方会跟着出问题,最终导致微服务故障

1.1.2 异步通讯

示例:下单业务中,用户支付成功需调用订单服务、物流服务最终仓库分配响应的库存并准备发货。

事件模式中

  • 支付服务是事件发布者(publisher),在支付完成后发送支付成功事件(event),事件中带上订单id。
  • 订单服务和物流服务是事件订阅者(Consumer),订阅支付成功,监听到事件后完成自己业务。
  • 注意:为解除事件发布者与订阅者间耦合,两者间并不是直接通信,而是需要一个中间人(Broker)。发布者发布事件到Broker,不关心谁来订阅事件;订阅者从从Broker订阅事件,不关心谁发来的消息。

1.2 MQ思想概述

MQ(MessageQueue)消息队列:存放消息的队列,也就是事件驱动架构中的Broker

1.2.1 MQ思想优缺点

优点 缺点
吞吐量提升:无需等待订阅者处理完成,响应速度快速 架构复杂,不好管理
故障隔离:服务没有直接调用,不存在级联失败问题 需要依赖于Broker的可靠、安全、性能
调用间无阻塞:不会造成无效的资源占用
耦合度极低:每个服务间都可以灵活插拔,可替换
流量削峰:不论发布事件的流量波动大小,都有Broker接收,订阅者可以按照自己的速度去处理事件

1.2.2 MQ技术对比

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 一般 一般

1.2.3 MQ技术选型

需求 MQ
追求可用性 Kafka、 RocketMQ 、RabbitMQ
追求可靠性 RabbitMQ、RocketMQ
追求吞吐能力 RocketMQ、Kafka
追求消息低延迟 RabbitMQ、Kafka

2 快速入门

2.1 Docker安装RabbitMQ

2.1.1 拉取RabbitMQ镜像

docker pull rabbitmq:3.8-management

2.1.2 安装RabbitMQ镜像

docker run \-e RABBITMQ_DEFAULT_USER=username\-e RABBITMQ_DEFAULT_PASS=password \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3.8-management

2.2 RabbitMQ基本结构

角色 解释
publisher 生产者
consumer 消费者
exchange 交换机,负责消息路由
queue 队列,存储消息
virtualHost 虚拟主机,隔离不同租户的exchange、queue、消息的隔离

2.3 参考入门案例

2.3.1 简单队列模型图

标识 解释
publiser 消息发布者(生产者),将消息发送至队列
queue 消息队列,接受缓存消息
consumer 订阅队列(消费者),消息订阅者,处理队列中消息

2.3.2 实现思路

2.3.2.1 Publisher生产者

  • ① 建立连接,设置连接参数(RibbitMQ所在主机名、端口号、RabbitMQ用户名密码等)
  • ② 创建Channel,Channel是消息读写通道,且每一个Channel代表一个会话任务。
  • ③ 声明队列,在上述简单队列模型图中,需要声明队列queue,目的是向此队列发送消息
  • ④ 发送消息,向声明队列发送消息
  • ⑤ 关闭连接和Channel,其都属于IO流,用完需关闭

2.3.2.2 Consumer消费者

  • ① 建立连接,设置连接参数(RabbitMQ所在主机名、端口号、RabbitMQ用户名密码等)
  • ② 创建Channel,Channel是消息读写通道,且每一个Channel代表一个会话任务。
  • ③ 声明队列,在上述简单队列模型图中,需要声明队列queue,目的是从此queue中取出消息,从而消费。
  • ④ 订阅消息,完成消息的消费。

2.3.3 Publiser实现

属性 解释
ConnectionFactory 通过连接工厂建立连接,例如RabbitMQ主机名端口号vhost用户名密码
vhost 相当于一个队列管道,使得各队列间不相互影响
Channel 读写管道,客户端可建立多个Channel,每个Channel代表一个会话任务
channel.queueDeclare() 声明队列,拥有如下参数: queueName:队列名称,必须指定且唯一durable:是否需要持久化,若设置为true则服务器重启后队列依然存在,若为默认的false,服务器重启后会删除;exclusive:是否为排外队列,若设置为true则只有声明此队列所属的连接可以使用这个队列,其他连接访问时会出错,默认false;autoDelete:是否自动删除,若设置为true则当无任何消费者使用这个队列时,队列会被自动删除,默认false;arguments: 队列其他属性,如存活时间等,默认null
channel.basicPublish() 向RabbitMQ上指定exchange发布一条消息,包含如下参数:exchange:消息发送到那条交换机处理,默认交换机用 ""表示,交换机只做消息的路由,不存储记录消息;routing_key:路由键,即消息发往哪个队列处理,消息的投递与exchange类型有关;mandatory:是否强制发送,若为true,当消息无法路由时,将消息退回给生产者,若为false,则会将消息直接丢弃,当设置为true可以在添加一个备用交换机作为记录日志,将无法路由的消息记录到此日志交换机中;immediate:当交换机不能将消息路由到队列时,立即回退给生产者,提高消息传递的实时性,然而当生产者已经关闭或无法处理该消息会发生消息丢失,因此建议设置为false,让消息正常进入死信队列进行后继处理;baseProperties:消息附加属性,例如:TTL(存活时间)、优先级等
 @Testpublic void testSendMessage() throws Exception {//1.建立连接ConnectionFactory factory = new ConnectionFactory();//1.1 设置连接参数,非别为:主机名、端口号、vhost(相当于一个队列管道,使得各队列不相互受影响)、用户名、密码factory.setHost("192.168.150.135");factory.setPort(5672);//5673为服务器数据传输;15672为浏览器客户端factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");Connection connection = factory.newConnection();//请求建立连接//2 创建通道ChannelChannel channel = connection.createChannel();//channel是消息读写通道。客户端可建立多个channel,每个channel表示一个会话任务。//3 创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);//4 发送消息String message = "你好呀,MQ";//向RabbitMQ上指定exchange发布一条消息,参数包含//exchange:消息发送到哪个交换机处理,默认交换机用 “ ” 表示//routing_key:路由键,即消息发往哪个队列,消息的投递与exchange类型有关//mandatory:是否为强制发送,TRUE:消息无法路由时返回生产者确认消息;false:丢弃消息//immediate:交换机不能把消息交给合适队列是否立即发送返回生产者(可能丢失)://basicProperties:消息附加属性,例如:消息优先级、过期时间channel.basicPublish("", queueName, null, message.getBytes());//默认交换机,指定队列名称,设置消息头信息,消息体字节数组形式System.out.println("消息发送成功" + message);//5 关闭通道和连接channel.close();connection.close();}

2.3.4 Consumer实现

属性 解释
ConnectionFactory 连接工厂,建立连接,设置连接参数:主机名、端口、vhost、用户名、密码
Channel Channel是消息读写通道,客户端可以建立多个Channel,每个Channel代表一个会话任务
channel.queueDeclare() 创建队列,参数包括:队列名称、是否持久化、当没有消费者时是否自动删除队列、是否只能被当前队列使用、队列其他属性(TTL等)
channel.basicConsume() 订阅消息,参数包括:监听队列名称、是否自动确认消息接收、重写方法表名对消息的处理逻辑(此处使用匿名内部类重写handleDelivery() 方法)、当有消息被接收到时,handleDelivery() 方法会被回调实现对消息的处理
    public static void main(String[] args) throws Exception {//1建立连接ConnectionFactory factory = new ConnectionFactory();//1.1 设置连接参数:主机名、端口、vhost、用户名、密码factory.setHost("192.168.150.135");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");Connection connection = factory.newConnection();//发送连接请求//2创建通道Channel channel = connection.createChannel();//channel是消息读写通道,客户端可建立多个channel,每个channel表示一个会话任务//3创建队列String queueName = "simple.queue";//参数如下://队列名称//吃否持久化队列//当没有消费者时候是否自动删除队列//是否只能被当前连接使用//队列的其他属性(设置队列生存时间)channel.queueDeclare(queueName, false, false, false, null);//队列名称、不持久性、不自动删除、可被其他连接使用、不设置其他属性//4订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel) {//参数(监听队列名称、是否自动确认消息接收、实现对消息的处理逻辑、当有消息是回调handleDelivery()方法)@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//处理消息String message = new String(body);System.out.println("接受的消息是:" + message);}});System.out.println("等待消息接收");

2.4 SpringAMQP

SpringAMQP是基于RabbitMQ封装的一套模板,利用SpringBoot对其实现自动装配。
SpringAmqp官方地址

SpringAMQP提供了三个功能:

  • ① 自动声明队列、交换机及其绑定关系
  • ② 基于注解的监听器模式,异步接收消息
  • ③ 封装了RabbitTemplate工具,用于发送消息

3 RabbitMQ中5种消息模型

官方示例文档

3.1 BasicQueue 简单队列模型

角色 解释
P(publisher) 消息发布者,将消息发送到队列queue
queue 消息队列,负责接受并缓存消息
consumer 订阅队列,处理队列中的消息

3.1.1 Publisher发送消息

3.1.1.1 引入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.1.1.2 Publisher生产者配置文件添加RabbitMQ配置

spring:rabbitmq:host: 192.168.150.101 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: itcast # 用户名password: 123321 # 密码

3.1.1.3 生产者消息发送

 @Autowiredprivate RabbitTemplate rabbitTemplate;//注入RabbitTemplate模板方法
    /*** 简单队列模型*/@Testpublic void testSimpleQueue() {//队列名称String queueName = "simple.queue";//消息String message = "你好呀,MQ,这是用AMQP方法发送的消息";//发送消息rabbitTemplate.convertAndSend(queueName, message);}

3.1.2 消费者消息接收

3.1.2.1 consumer配置文件添加配置

spring:rabbitmq:host: 192.168.150.101 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: itcast # 用户名password: 123321 # 密码

3.1.2.2 consumer消费消息

    /*** 简单队列模型* @param msg AMQP发送的消息信息*/@RabbitListener(queues = "simple.queue")//声明消息队列public void listenSimpleQueueMessage(String msg) {System.out.println("AMQP方法接收到的消息:{" + msg + "}");}

3.2 WorkQueue任务模型

由来:当消息处理比较耗时,可能会使生产消息的速度大于消息消费速度,由此,消息会发生堆积现象,消息无法及时处理。
WorkQueue(Taskqueues):任务模型的出现,让多个消费者绑定到一个队列,共同消费队列中的消息。

3.2.1 消息发送

    /*** workQueue* 向队列中不停发送消息,模拟消息堆积* @throws Exception 时间*/@Testpublic void testWorkQueue() throws Exception {//队列名称String queueName = "simple.queue";//消息String message = "你好MQ。测试工作队列消息堆积_";for (int i = 0; i < 1000; i++) {//发送消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}}

3.2.2 消息接收

  /*** 模拟任务堆积* @param msg 任务堆积消息* @throws Exception 时间*/@RabbitListener(queues = "simple.queue")public void listenSimpleQueue1(String msg) throws Exception {System.out.println("消费者1接收到的消息::{" + msg + "}" + LocalDateTime.now());Thread.sleep(20);}/*** 模拟任务堆积* @param msg 任务堆积消息* @throws Exception 时间*/@RabbitListener(queues = "simple.queue")public void listenSimpleQueue2(String msg) throws Exception {System.err.println("消费者2接收到的消息::{" + msg + "}" + LocalDateTime.now());Thread.sleep(200);}

3.2.3 测试发现问题

测试发现:

  • ① 消费者1快速消费自己的消息(间隔60s)。
  • ② 消费者2缓慢处理自己的消息(间隔200s)。

因此,表明两个问题:

  • 消息平均分配给每个消费者。
  • ② 没有考虑到消费者处理能力

3.2.4 能者多劳模式

修改配置文件实现:

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

通过配置能者多劳模式,使得:

  • ① 较少消费者2处理的消息的数量。
  • ② 让性能好的消费者1多处理消息。

3.3 发布、订阅模型

发布订阅模型如下图(相比于其他模型、,多了交换机):

注意:

属性 解释
Publisher 生产者,向exchange(交换机)发送消息,区别于其他模型不是将消息发送到队列,而是发送给exchange(交换机)
Exchange 交换机,① 接收生产者发送消息;② 决定如何处理消息,例如:交给某个特别队列递交给所有队列、或者丢弃消息只负责转发消息,不存储,分别对应三种交换机类型 ,注意:若没有任何队列与exchange绑定,或者无符合路由规则的队列,那么消息会丢失。
Consumer 消费者,订阅队列,消费消息
Queue 消息队列,接收消息,缓存消息

三种交换机类型:

类型 解释
Fanout 广播,将消息交给所有绑定到交换机的队列。
Direct 定向,把消息交给指定routing_key规则的队列。
Topic 通配符,把消息交给符合routing_pattern(路由模式)的队列。

3.3.1 Fanout广播模式

RabbitMQ·入门·壹相关推荐

  1. RabbitMQ 入门系列(10)— RabbitMQ 消息持久化、不丢失消息

    消息要保持"持久化",即不丢失,必须要使得消息.交换器.队列,必须全部 "持久化". 1. 生产者怎么确认 RabbitMQ 已经收到了消息? # 打开通道的确 ...

  2. rabbitMQ入门程序

    1.生产者 /*** rabbitMQ入门程序消费者** @author xiaoss* @date 2020年10月27日 22:02*/ public class Producer01 {//队列 ...

  3. rabbitmq 入门demo

    rabbitmq 入门demo http://www.cnblogs.com/jimmy-muyuan/p/5428715.html http://www.cnblogs.com/shanyou/p/ ...

  4. RabbitMQ 入门:2. Exchange 和 Queue

    上文RabbitMQ 入门:1. Message Broker(消息代理)提到过 RabbitMQ 实现了 AMQP 这个协议(RabbitMQ 所支持的 AMQP 的版本是 0.9.1),这个协议的 ...

  5. RabbitMQ入门学习系列(二),单生产者消费者

    友情提示 我对我的文章负责,发现好多网上的文章 没有实践,都发出来的,让人走很多弯路,如果你在我的文章中遇到无法实现,或者无法走通的问题.可以直接在公众号<爱码农爱生活 >留言.必定会再次 ...

  6. RabbitMQ入门:路由(Routing)

    在上一篇博客<RabbitMQ入门:发布/订阅(Publish/Subscribe)>中,我们认识了fanout类型的exchange,它是一种通过广播方式发送消息的路由器,所有和exch ...

  7. RabbitMQ入门-Topic模式

    上篇<RabbitMQ入门-Routing直连模式>我们介绍了可以定向发送消息,并可以根据自定义规则派发消息.看起来,这个Routing模式已经算灵活的了,但是,这还不够,我们还有更加多样 ...

  8. RabbitMQ入门:发布/订阅(Publish/Subscribe)

    在前面的两篇博客中 RabbitMQ入门:Hello RabbitMQ 代码实例 RabbitMQ入门:工作队列(Work Queue) 遇到的实例都是一个消息只发送给一个消费者(工作者),他们的消息 ...

  9. RabbitMQ入门到精通

    RabbitMQ 1. 消息中间件概述 1.1. 为什么学习消息队列 电子商务应用中,经常需要对庞大的海量数据进行监控,随着网络技术和软件开发技术的不断提高,在实战开发中MQ的使用与日俱增,特别是Ra ...

最新文章

  1. 大规模服务设计部署经验谈
  2. PandaRSS 自助服务系统安装配置
  3. blocked java线程_Java线程状态:BLOCKED与WAITING的区别
  4. Linux内核链表交换节点,[笔记]Linux内核链表:结点的插入、删除以及链表的遍历...
  5. 【重复制造精讲】3、成本收集
  6. 芯片里的CPU、GPU、NPU是什么,它们是如何工作的
  7. 2019腾讯广告算法大赛完美收官,算法达人鹅厂“出道”
  8. 不能执行已释放 Script 的代码
  9. mysql在linux下的完整安装
  10. 移动端布局,C3新增属性
  11. H264/AVC协议基本概况
  12. Android实现TTS文字转语音功能
  13. 怎么用燃尽图高效搞定项目进度监控?看老原这一篇就够了!
  14. ie9 下面输入框后面怎么出现一个黑色叉叉
  15. Vue 开发 Lov 组件
  16. 三维动画--Blender软件介绍
  17. 那年的少年,都在做什么
  18. 计算机报名qq登录用户名或密码错误,登录电脑微信显示账号或密码错误怎么办...
  19. 【基于EDK的嵌入式系统】 关于Xilinx EDK添加自定义IP核到PLB总线后linux无法boot的问题
  20. ifafu最新版本android,ifafu最新版下载

热门文章

  1. 在接口中利用匿名内部类实现接口
  2. 【爬虫实践】记一次Scrapy框架入门使用爬取豆瓣电影数据
  3. 输出二叉树的叶子结点
  4. 小米、百度等巨头加速布局,物联网成争夺新风口
  5. 关于家用摄像头,你想知道的都在这
  6. 分享77个PHP源码,总有一款适合您
  7. 一段式、两段式以及三段式状态机(FSM)设计实例
  8. <PTA>7-4 以英里计的平均速度 (10 分)
  9. 国外计算机考研,牛人计算机考研复习计划(经典)(国外英文资料).doc
  10. Vuepress使用指南(reco)