RabbitMQ·入门·壹
文章目录
- 1 MQ思想
- 1.1 相关概念:同步、异步通讯
- 1.1.1 同步通讯
- 1.1.2 异步通讯
- 1.2 MQ思想概述
- 1.2.1 MQ思想优缺点
- 1.2.2 MQ技术对比
- 1.2.3 MQ技术选型
- 2 快速入门
- 2.1 Docker安装RabbitMQ
- 2.1.1 拉取RabbitMQ镜像
- 2.1.2 安装RabbitMQ镜像
- 2.2 RabbitMQ基本结构
- 2.3 参考入门案例
- 2.3.1 简单队列模型图
- 2.3.2 实现思路
- 2.3.2.1 Publisher生产者
- 2.3.2.2 Consumer消费者
- 2.3.3 Publiser实现
- 2.3.4 Consumer实现
- 2.4 SpringAMQP
- 3 RabbitMQ中5种消息模型
- 3.1 BasicQueue 简单队列模型
- 3.1.1 Publisher发送消息
- 3.1.1.1 引入依赖
- 3.1.1.2 Publisher生产者配置文件添加RabbitMQ配置
- 3.1.1.3 生产者消息发送
- 3.1.2 消费者消息接收
- 3.1.2.1 consumer配置文件添加配置
- 3.1.2.2 consumer消费消息
- 3.2 WorkQueue任务模型
- 3.2.1 消息发送
- 3.2.2 消息接收
- 3.2.3 测试发现问题
- 3.2.4 能者多劳模式
- 3.3 发布、订阅模型
- 3.3.1 Fanout广播模式
1 MQ思想
1.1 相关概念:同步、异步通讯
通讯方式 | 举例 | 优势 | 劣势 |
---|---|---|---|
同步通讯 | 就像打电话,需要实时响应。 | 打电话可以立即得到响应。 | 不能跟多人同时通话。 |
异步通讯 | 就像发邮件,不需要马上回复。 | 可以同时与多个人收发邮件。 | 往往响应会有延迟。 |
1.1.1 同步通讯
Feign调用属于同步方式
优点 | 缺点 |
---|---|
时效性强 ,可以立即得到结果
|
①耦合度高 :每次加入新需求,都需修改原先代码;②性能低 :调用者需等待服务提供者响应;③资源浪费 :调用链中的服务等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源;④级联失败 :服务提供者出现问题,所有调用方会跟着出问题,最终导致微服务故障
|
1.1.2 异步通讯
示例:下单业务中,用户支付成功需调用订单服务、物流服务最终仓库分配响应的库存并准备发货。
事件模式中:
- 支付服务是事件发布者(publisher),在支付完成后发送支付成功事件(event),事件中带上订单id。
- 订单服务和物流服务是事件订阅者(Consumer),订阅支付成功,监听到事件后完成自己业务。
- 注意:为解除事件发布者与订阅者间耦合,两者间并不是直接通信,而是需要一个中间人
(Broker)
。发布者发布事件到Broker
,不关心谁来订阅事件;订阅者从从Broker
订阅事件,不关心谁发来的消息。
1.2 MQ思想概述
MQ
(MessageQueue)消息队列:存放消息的队列,也就是事件驱动架构中的Broker
。
1.2.1 MQ思想优缺点
优点 | 缺点 |
---|---|
吞吐量提升:无需等待订阅者处理完成,响应速度快速 | 架构复杂,不好管理 |
故障隔离:服务没有直接调用,不存在级联失败问题 | 需要依赖于Broker的可靠、安全、性能 |
调用间无阻塞:不会造成无效的资源占用 | |
耦合度极低:每个服务间都可以灵活插拔,可替换 | |
流量削峰:不论发布事件的流量波动大小,都有Broker接收,订阅者可以按照自己的速度去处理事件 |
1.2.2 MQ技术对比
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
1.2.3 MQ技术选型
需求 | MQ |
---|---|
追求可用性 | Kafka、 RocketMQ 、RabbitMQ |
追求可靠性 | RabbitMQ、RocketMQ |
追求吞吐能力 | RocketMQ、Kafka |
追求消息低延迟 | RabbitMQ、Kafka |
2 快速入门
2.1 Docker安装RabbitMQ
2.1.1 拉取RabbitMQ镜像
docker pull rabbitmq:3.8-management
2.1.2 安装RabbitMQ镜像
docker run \-e RABBITMQ_DEFAULT_USER=username\-e RABBITMQ_DEFAULT_PASS=password \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3.8-management
2.2 RabbitMQ基本结构
角色 | 解释 |
---|---|
publisher | 生产者 |
consumer | 消费者 |
exchange | 交换机,负责消息路由 |
queue | 队列,存储消息 |
virtualHost | 虚拟主机,隔离不同租户的exchange、queue、消息的隔离 |
2.3 参考入门案例
2.3.1 简单队列模型图
标识 | 解释 |
---|---|
publiser |
消息发布者(生产者 ),将消息发送至队列
|
queue |
消息队列,接受 并缓存消息
|
consumer |
订阅队列(消费者 ),消息订阅者,处理队列中消息
|
2.3.2 实现思路
2.3.2.1 Publisher生产者
- ① 建立连接,设置连接参数(RibbitMQ所在主机名、端口号、RabbitMQ用户名密码等)
- ② 创建Channel,Channel是消息读写通道,且每一个Channel代表一个会话任务。
- ③ 声明队列,在上述简单队列模型图中,需要
声明队列queue
,目的是向此队列发送消息- ④ 发送消息,向声明队列发送消息
- ⑤ 关闭连接和Channel,其都属于IO流,用完需关闭
2.3.2.2 Consumer消费者
- ① 建立连接,设置连接参数(RabbitMQ所在主机名、端口号、RabbitMQ用户名密码等)
- ② 创建Channel,Channel是消息读写通道,且每一个Channel代表一个会话任务。
- ③ 声明队列,在上述简单队列模型图中,需要
声明队列queue
,目的是从此queue中取出消息,从而消费。- ④ 订阅消息,完成消息的消费。
2.3.3 Publiser实现
属性 | 解释 |
---|---|
ConnectionFactory |
通过连接工厂建立连接,例如RabbitMQ主机名 、端口号 、vhost 、用户名 、密码
|
vhost | 相当于一个队列管道,使得各队列间不相互影响 |
Channel | 读写管道,客户端可建立多个Channel,每个Channel代表一个会话任务 |
channel.queueDeclare() |
声明队列,拥有如下参数: queueName :队列名称,必须指定且唯一 ;durable :是否需要持久化,若设置为true则服务器重启后队列依然存在,若为默认的false ,服务器重启后会删除;exclusive :是否为排外队列,若设置为true则只有声明此队列所属的连接可以使用这个队列,其他连接访问时会出错,默认false;autoDelete:是否自动删除,若设置为true则当无任何消费者使用这个队列时,队列会被自动删除,默认false;arguments : 队列其他属性,如存活时间等,默认null
|
channel.basicPublish() |
向RabbitMQ上指定exchange发布一条消息,包含如下参数:exchange :消息发送到那条交换机处理,默认交换机用 ""表示 ,交换机只做消息的路由,不存储记录消息;routing_key :路由键,即消息发往哪个队列处理,消息的投递与exchange类型有关;mandatory :是否强制发送,若为true,当消息无法路由时,将消息退回给生产者,若为false,则会将消息直接丢弃,当设置为true可以在添加一个备用交换机作为记录日志,将无法路由的消息记录到此日志交换机中;immediate :当交换机不能将消息路由到队列时,立即回退给生产者,提高消息传递的实时性,然而当生产者已经关闭或无法处理该消息会发生消息丢失 ,因此建议设置为false,让消息正常进入死信队列进行后继处理;baseProperties :消息附加属性,例如:TTL(存活时间)、优先级等
|
@Testpublic void testSendMessage() throws Exception {//1.建立连接ConnectionFactory factory = new ConnectionFactory();//1.1 设置连接参数,非别为:主机名、端口号、vhost(相当于一个队列管道,使得各队列不相互受影响)、用户名、密码factory.setHost("192.168.150.135");factory.setPort(5672);//5673为服务器数据传输;15672为浏览器客户端factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");Connection connection = factory.newConnection();//请求建立连接//2 创建通道ChannelChannel channel = connection.createChannel();//channel是消息读写通道。客户端可建立多个channel,每个channel表示一个会话任务。//3 创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);//4 发送消息String message = "你好呀,MQ";//向RabbitMQ上指定exchange发布一条消息,参数包含//exchange:消息发送到哪个交换机处理,默认交换机用 “ ” 表示//routing_key:路由键,即消息发往哪个队列,消息的投递与exchange类型有关//mandatory:是否为强制发送,TRUE:消息无法路由时返回生产者确认消息;false:丢弃消息//immediate:交换机不能把消息交给合适队列是否立即发送返回生产者(可能丢失)://basicProperties:消息附加属性,例如:消息优先级、过期时间channel.basicPublish("", queueName, null, message.getBytes());//默认交换机,指定队列名称,设置消息头信息,消息体字节数组形式System.out.println("消息发送成功" + message);//5 关闭通道和连接channel.close();connection.close();}
2.3.4 Consumer实现
属性 | 解释 |
---|---|
ConnectionFactory | 连接工厂,建立连接,设置连接参数:主机名、端口、vhost、用户名、密码 |
Channel | Channel是消息读写通道,客户端可以建立多个Channel,每个Channel代表一个会话任务 |
channel.queueDeclare() | 创建队列,参数包括:队列名称、是否持久化、当没有消费者时是否自动删除队列、是否只能被当前队列使用、队列其他属性(TTL等) |
channel.basicConsume() | 订阅消息,参数包括:监听队列名称、是否自动确认消息接收、重写方法表名对消息的处理逻辑(此处使用匿名内部类重写handleDelivery() 方法)、当有消息被接收到时,handleDelivery() 方法会被回调实现对消息的处理 |
public static void main(String[] args) throws Exception {//1建立连接ConnectionFactory factory = new ConnectionFactory();//1.1 设置连接参数:主机名、端口、vhost、用户名、密码factory.setHost("192.168.150.135");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");Connection connection = factory.newConnection();//发送连接请求//2创建通道Channel channel = connection.createChannel();//channel是消息读写通道,客户端可建立多个channel,每个channel表示一个会话任务//3创建队列String queueName = "simple.queue";//参数如下://队列名称//吃否持久化队列//当没有消费者时候是否自动删除队列//是否只能被当前连接使用//队列的其他属性(设置队列生存时间)channel.queueDeclare(queueName, false, false, false, null);//队列名称、不持久性、不自动删除、可被其他连接使用、不设置其他属性//4订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel) {//参数(监听队列名称、是否自动确认消息接收、实现对消息的处理逻辑、当有消息是回调handleDelivery()方法)@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//处理消息String message = new String(body);System.out.println("接受的消息是:" + message);}});System.out.println("等待消息接收");
2.4 SpringAMQP
SpringAMQP是基于RabbitMQ封装的一套模板,利用SpringBoot对其实现自动装配。
SpringAmqp官方地址
SpringAMQP提供了三个功能:
- ① 自动声明队列、交换机及其绑定关系
- ② 基于注解的监听器模式,异步接收消息
- ③ 封装了RabbitTemplate工具,用于发送消息
3 RabbitMQ中5种消息模型
官方示例文档
3.1 BasicQueue 简单队列模型
角色 | 解释 |
---|---|
P(publisher) | 消息发布者,将消息发送到队列queue |
queue | 消息队列,负责接受并缓存消息 |
consumer | 订阅队列,处理队列中的消息 |
3.1.1 Publisher发送消息
3.1.1.1 引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.1.1.2 Publisher生产者配置文件添加RabbitMQ配置
spring:rabbitmq:host: 192.168.150.101 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: itcast # 用户名password: 123321 # 密码
3.1.1.3 生产者消息发送
@Autowiredprivate RabbitTemplate rabbitTemplate;//注入RabbitTemplate模板方法
/*** 简单队列模型*/@Testpublic void testSimpleQueue() {//队列名称String queueName = "simple.queue";//消息String message = "你好呀,MQ,这是用AMQP方法发送的消息";//发送消息rabbitTemplate.convertAndSend(queueName, message);}
3.1.2 消费者消息接收
3.1.2.1 consumer配置文件添加配置
spring:rabbitmq:host: 192.168.150.101 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: itcast # 用户名password: 123321 # 密码
3.1.2.2 consumer消费消息
/*** 简单队列模型* @param msg AMQP发送的消息信息*/@RabbitListener(queues = "simple.queue")//声明消息队列public void listenSimpleQueueMessage(String msg) {System.out.println("AMQP方法接收到的消息:{" + msg + "}");}
3.2 WorkQueue任务模型
由来:当消息处理比较耗时,可能会使生产消息的速度大于消息消费速度,由此,消息会发生堆积现象,消息无法及时处理。
WorkQueue(Taskqueues):任务模型的出现,让多个消费者绑定到一个队列,共同消费队列中的消息。
3.2.1 消息发送
/*** workQueue* 向队列中不停发送消息,模拟消息堆积* @throws Exception 时间*/@Testpublic void testWorkQueue() throws Exception {//队列名称String queueName = "simple.queue";//消息String message = "你好MQ。测试工作队列消息堆积_";for (int i = 0; i < 1000; i++) {//发送消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}}
3.2.2 消息接收
/*** 模拟任务堆积* @param msg 任务堆积消息* @throws Exception 时间*/@RabbitListener(queues = "simple.queue")public void listenSimpleQueue1(String msg) throws Exception {System.out.println("消费者1接收到的消息::{" + msg + "}" + LocalDateTime.now());Thread.sleep(20);}/*** 模拟任务堆积* @param msg 任务堆积消息* @throws Exception 时间*/@RabbitListener(queues = "simple.queue")public void listenSimpleQueue2(String msg) throws Exception {System.err.println("消费者2接收到的消息::{" + msg + "}" + LocalDateTime.now());Thread.sleep(200);}
3.2.3 测试发现问题
测试发现:
- ① 消费者1快速消费自己的消息(间隔60s)。
- ② 消费者2缓慢处理自己的消息(间隔200s)。
因此,表明两个问题:
- ①
消息平均
分配给每个消费者。- ② 没有考虑到
消费者处理能力
。
3.2.4 能者多劳模式
修改配置文件实现:
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
通过配置
能者多劳模式
,使得:
- ① 较少消费者2处理的消息的数量。
- ② 让性能好的消费者1多处理消息。
3.3 发布、订阅模型
发布订阅模型如下图(相比于其他模型、,多了交换机):
注意:
属性 | 解释 |
---|---|
Publisher |
生产者,向exchange(交换机)发送消息,区别于其他模型 ,不是将消息发送到队列,而是发送给exchange(交换机)。
|
Exchange |
交换机,① 接收生产者发送消息;② 决定如何处理消息,例如:交给某个特别队列、递交给所有队列、或者丢弃消息,只负责转发消息,不存储 ,分别对应三种交换机类型 ,注意:若没有任何队列与exchange绑定,或者无符合路由规则的队列,那么消息会丢失。
|
Consumer |
消费者,订阅队列,消费消息
|
Queue |
消息队列,接收消息,缓存消息
|
三种交换机类型:
类型 | 解释 |
---|---|
Fanout |
广播 ,将消息交给所有绑定到交换机的队列。
|
Direct |
定向 ,把消息交给指定routing_key规则的队列。
|
Topic |
通配符 ,把消息交给符合routing_pattern(路由模式)的队列。
|
3.3.1 Fanout广播模式
RabbitMQ·入门·壹相关推荐
- RabbitMQ 入门系列(10)— RabbitMQ 消息持久化、不丢失消息
消息要保持"持久化",即不丢失,必须要使得消息.交换器.队列,必须全部 "持久化". 1. 生产者怎么确认 RabbitMQ 已经收到了消息? # 打开通道的确 ...
- rabbitMQ入门程序
1.生产者 /*** rabbitMQ入门程序消费者** @author xiaoss* @date 2020年10月27日 22:02*/ public class Producer01 {//队列 ...
- rabbitmq 入门demo
rabbitmq 入门demo http://www.cnblogs.com/jimmy-muyuan/p/5428715.html http://www.cnblogs.com/shanyou/p/ ...
- RabbitMQ 入门:2. Exchange 和 Queue
上文RabbitMQ 入门:1. Message Broker(消息代理)提到过 RabbitMQ 实现了 AMQP 这个协议(RabbitMQ 所支持的 AMQP 的版本是 0.9.1),这个协议的 ...
- RabbitMQ入门学习系列(二),单生产者消费者
友情提示 我对我的文章负责,发现好多网上的文章 没有实践,都发出来的,让人走很多弯路,如果你在我的文章中遇到无法实现,或者无法走通的问题.可以直接在公众号<爱码农爱生活 >留言.必定会再次 ...
- RabbitMQ入门:路由(Routing)
在上一篇博客<RabbitMQ入门:发布/订阅(Publish/Subscribe)>中,我们认识了fanout类型的exchange,它是一种通过广播方式发送消息的路由器,所有和exch ...
- RabbitMQ入门-Topic模式
上篇<RabbitMQ入门-Routing直连模式>我们介绍了可以定向发送消息,并可以根据自定义规则派发消息.看起来,这个Routing模式已经算灵活的了,但是,这还不够,我们还有更加多样 ...
- RabbitMQ入门:发布/订阅(Publish/Subscribe)
在前面的两篇博客中 RabbitMQ入门:Hello RabbitMQ 代码实例 RabbitMQ入门:工作队列(Work Queue) 遇到的实例都是一个消息只发送给一个消费者(工作者),他们的消息 ...
- RabbitMQ入门到精通
RabbitMQ 1. 消息中间件概述 1.1. 为什么学习消息队列 电子商务应用中,经常需要对庞大的海量数据进行监控,随着网络技术和软件开发技术的不断提高,在实战开发中MQ的使用与日俱增,特别是Ra ...
最新文章
- 大规模服务设计部署经验谈
- PandaRSS 自助服务系统安装配置
- blocked java线程_Java线程状态:BLOCKED与WAITING的区别
- Linux内核链表交换节点,[笔记]Linux内核链表:结点的插入、删除以及链表的遍历...
- 【重复制造精讲】3、成本收集
- 芯片里的CPU、GPU、NPU是什么,它们是如何工作的
- 2019腾讯广告算法大赛完美收官,算法达人鹅厂“出道”
- 不能执行已释放 Script 的代码
- mysql在linux下的完整安装
- 移动端布局,C3新增属性
- H264/AVC协议基本概况
- Android实现TTS文字转语音功能
- 怎么用燃尽图高效搞定项目进度监控?看老原这一篇就够了!
- ie9 下面输入框后面怎么出现一个黑色叉叉
- Vue 开发 Lov 组件
- 三维动画--Blender软件介绍
- 那年的少年,都在做什么
- 计算机报名qq登录用户名或密码错误,登录电脑微信显示账号或密码错误怎么办...
- 【基于EDK的嵌入式系统】 关于Xilinx EDK添加自定义IP核到PLB总线后linux无法boot的问题
- ifafu最新版本android,ifafu最新版下载