RabbitMQ笔记

1、RabbitMQ是什么?

1.1、概念:

​ MQ(Message Queue)消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。本质上就是一个存储、转发消息的中间件。

1.2、MQ历史发展:

​ 最早的消息队列软件TIB(The Information Bus)实现了生产者和消费者的解耦,它的成功引起了IBM的注意,之后IBM研发了自己的IBM Websphere,微软也研发了MicroSoft Message Queue,但是这些产品由于协议和API的不同,没有一套标准接口来实现它们的互通,2001年SUN公司发布JMS规范,这使得我们只要使用合适的MQ驱动即可,就像JDBC规范一样,但是这是跟JAVA语言绑定的,无法实现跨语言。2006年AMQP协议规范的发布,促进了MQ的发展,它是跨语言和跨平台的。2007年Rabbit公司基于AMQP规范开发了RabbitMQ。

1.3、MQ的主要特点:

  1. MQ是一个独立运行的服务
  2. 采用队列(先进先出)作为数据结构
  3. 具有发布订阅的功能

1.4、MQ可以实现的功能:

  1. 实现异步通信
  2. 实现系统解耦
  3. 实现流量削峰,在访问量剧增时,MQ中的队列可以减少突发访问的情况。

2、RabbitMQ的基本特性

  • 可靠性,RabbitMQ提供了发送应答、发布确认、持久化等机制来保证可靠性。
  • 灵活的路由:通过交换机来实现消息的灵活路由。
  • 支持多客户端:对主流开发语言(Python、Java、PHP、C#、JavaScript、Go)都有客户端实现。
  • 支持多协议:除了原生的AMQP协议,还支持STOMP、MQTT、HTTP and WebSockets等其它消息中间件协议。
  • 集群与扩展性:多个RabbitMQ节点可以组成集群,支持负载。
  • 高可用队列:通过镜像队列实现队列中的数据复制。
  • 权限管理:通过用户和VHost实现权限管理。
  • 插件系统:支持各种丰富的插件扩展。
  • 管理界面:提供了一个简单易用的用户界面。

3、RabbitMQ的工作模型

4、RabbitMQ的功能

4.1、消息的过期时间TTL(Time To Live)

  • Message TTL (优先级高)

    单条消息的过期时间,在发送消息时指定消息的属性来设置。

  • Queue TTL

    队列中的消息超过过期时间后,该队列的消息都会过期,通过队列属性x-message-ttl的值来设置

4.2、死信队列Dead Letter

  • 消息什么情况下变成死信

    1. 消息过期了
    2. 消息在被消费者拒绝并且没有设置重回队列(nack||Reject)&&requeue==false
    3. 队列达到了最大长度,超过了max-length或者max-length-bytes,最先入队的消息会被发送到死信队列。
  • 死信队列

    队列在创建的时候可以指定一个死信交换机DLX(Dead Letter Exchange),DLX绑定的队列就是死信队列DLQ(Dead Letter Queue),需要注意的是死信队列也是普通队列。

    通过队列属性x-dead-letter-exchange指定死信交换机,然后将死信交换机和死信队列绑定起来。

4.3、延迟队列

RabbitMQ本身不支持延迟队列,这里有几种解决方案

  1. 利用数据库实现,将消息存储到数据库,采用定时任务来发送。

  2. 利用TTL和DLX实现,整体的流程就像这样:

    这样也会带来比较明显的缺点:如果使用Message TTL可能会导致队列的消息阻塞,后面的消息无法投递(前面的过期时间大于后面的);如果使用Queue TTL在消息延迟梯度比较多的情况下,需要创建大量的交换机和队列。

  3. 利用延迟队列插件类实现(rabbitmq-delayed-message-exchange)

4.4、流量控制

当生产者的生产速率远大于消费者的消费速率时,会产生大量消息堆积的情况;当消费者处理消息的能力有限,导致消费端本地缓存消息数量过多,导致内存溢出,针对这些情况,可以通过服务端流控和消费端限流来控制。

4.4.1、服务端流控
  1. 内存控制

    RabbitMQ启动时会检测机器的内存大小,当占用超过40%(默认)的内存时,会阻塞所有连接。可以通过配置文件rabbitmq.config中的vm_memory_high_watermark属性来进行设置。

  2. 磁盘控制

    当磁盘空间低于50MB(默认)时,会触发限流,通过配置文件中的disk_free_limit.ralative和disk_free_limit.absolute属性来设置,前者是百分比,后者是绝对大小。

4.4.2、消费端限流

​ 由于消费端会缓存消息,当缓存的消息过多时,如果能够在一定数量的消息消费完成之前,不再推送消息,这样就可以完成消费端限流了。

基于Consumer或者Channel设置prefetch count值。表示consumer端的最大unacked messages数量,当超过这个数值的消息未被确认时,会停止投递新的消息给该消费者。比如channel的prefetch count值为10时,当消费端有10条消息没有发送ack后,将不再给这个消费者投递消息。

5、消息的可靠性

RabbtMQ在异步通信的时候,消息传递的可靠性可以从RabbitMQ工作模型看出,保证消息发送、消息路由、消息存储、消息投递这四个过程的可靠性,就能确保消息的可靠性。下面来逐个分析。

5.1、消息发送到服务端

RabbitMQ中提供了两种服务端确认机制,事务(Transaction)模式和确认(Confirm)模式。

5.1.1、Transaction模式
String msg = "Hello Rabbit MQ";
channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 声明队列
try {channel.txSelect();//将channel设置为事务模式// String exchange, String routingKey, BasicProperties props, byte[] bodychannel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());// int i =1/0;channel.txCommit();System.out.println("消息发送成功");
} catch (Exception e) {channel.txRollback();System.out.println("消息已经回滚");
}

这种模式下发送消息是阻塞的,对服务器性能影响很大,所以使用的很少。

5.1.2、Confirm模式

确认模式有三种:

5.1.2.1、普通确认模式
String msg = "Normal Confirm";
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.confirmSelect();// 开启确认模式
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
if (channel.waitForConfirms()) {// 发送一条,确认一条System.out.println("SUCCESS" );
}

这种模式效率不高。

5.1.2.2、批量确认模式
String msg = "Batch Confirm";
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
try {channel.confirmSelect();// 开启确认模式for (int i = 0; i < 5; i++) {channel.basicPublish("", QUEUE_NAME, null, (msg +":"+ i).getBytes());}channel.waitForConfirmsOrDie();// 批量确认结果,ACK如果是Multiple=True,表示Delivery-Tag之前的消息都被确认了//只要有一个未确认就会发生异常,之前的所有确认的消息都要重发System.out.println("BATCH SUCCESS");
} catch (Exception e) {// 发生异常,处理e.printStackTrace();
}
5.1.2.3、异步确认模式

5.2、消息从交换机路由到队列

当消息无法路由时,可以通过服务端重发给生产者或者让这些消息都路由到备份的交换机上来处理。

5.2.1、消息回发

使用mandatory参数和ReturnListener将消息回发给生产者。

channel.addReturnListener(new ReturnListener() {//指定ReturnListenerpublic void handleReturn(int replyCode,String replyText,String exchange,String routingKey,AMQP.BasicProperties properties,byte[] body)throws IOException {System.out.println("被返回的消息"+"replyText:"+replyText+"exchange:"+exchange+"routingKey:"+routingKey+"message:"+new String(body));}});
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).contentEncoding("UTF-8").build();
// 第三个参数是mandatory,如果为true,结合ReturnListener消息回发,如果false,直接丢弃消息
channel.basicPublish("","gupaodirect",true, properties,"hello future".getBytes());
5.2.2、消息路由到备份机
//在声明交换机的时候指定备份交换机
Map<String,Object> properties = new HashMap<String,Object>();
properties.put("alternate-exchange","ALTERNATE_EXCHANGE");
channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, properties);

5.3、消息在队列中存储

当服务发生故障中时,会导致内存中的数据丢失,因此需要将消息本身和元数据(交换机、队列、绑定关系)都持久化到磁盘中。

5.3.1、队列和交换机的持久化

在声明队列和交换机时将durable参数设置为true。

// 声明交换机
// 参数说明String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
channel.exchangeDeclare(EXCHANGE_NAME,"direct",true, false, null);
// 声明队列
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
5.3.2、消息的持久化

在发送消息时将消息设置为持久化消息

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2) // 2表示持久化消息.contentEncoding("UTF-8").build();// 发送消息
channel.basicPublish("", "TEST_DLX_QUEUE", properties, msg.getBytes());

5.4、消息投递到消费者

消费者在收到消息后或者处理时发生异常,都会导致消息投递失败。RabbitMQ提供了消费者的消息确认机制来确保可靠性,分为自动发送ACK和手动发送ACK两种。服务端没有收到ACK,等到消费者断开连接后,会将这条消息发送给其他消费者,如果没有其它消费者,消费者重启后会重新消费这条消息。

消费者订阅队列时,可以指定autoAck参数,若为false,则表示手工ack,服务端会等待消费端显式回复ack后才会移除消息。

(spingboot整合amqp中)通过setAcknowledgeMode(AcknowledgeMode.AUTO)来设置。取值有AUTO/NONE/MANUAL。

NONE表示自动ack。

MANUAL表示手动ack。

AUTO:如果方法没有抛出异常,则发送ack。如果抛出AmqpRejectAndDontRequeueException则消息会被拒绝,且不会重新入队。如果抛出ImmediateAcknowledgeAmqpException则发送ack。如果是其他的异常并且requeue参数值为true,则消息会被拒绝并且重新入队。

6、其他问题

6.1、生产者如何知道消费者有没有成功消费?

  • 调用生产者API
  • 发送响应给生产者

6.2、消息幂等性

对于重复发送的消息,生成一个唯一的业务ID,通过日志来做重复控制。

6.3、消息的顺序性

当一个队列有多个消费者时,由于不同的消费者的消费速度是不同的,顺序性无法保证,因此只有当一个队列仅有一个消费者的情况下才能保证顺序性。

7、集群

RabbitMQ集群用于实现高可用和负载均衡。集群有两种模式:普通集群和镜像集群。集群中的结点有两种类型:磁盘结点和内存结点。磁盘结点会将元数据放在磁盘中,内存结点将元数据放在内存中。集群中至少需要一个磁盘结点来持久化元数据,默认情况下结点类型为磁盘结点。集群通过25672端口进行通信。

7.1、普通集群

这种模式下,不同节点之间只会相互同步元数据。由于消息是存储在队列中的,因此不能保证队列的高可用,如果节点失效将导致相关队列不可用。

7.2、镜像集群

镜像队列模式下,消息的内容也会在镜像节点间同步,可用性提高,但是会降低系统的性能。

这里简单总结一下学习RabbitMQ的知识点,demo演示后面在补充上。

一文看懂RabbitMQ相关推荐

  1. docker rabbitmq_一文看懂Rabbitmq,从安装到实战演练

    Rabbitmq的初步使用 随着微服务概念发展,大应用逐步拆分为小应用,提高开发效率,专门的人做专门的事情,逐渐的流行起来. 在微服务上实现通信的方式大部分是采用rpc方式,也有升级版本的grpc. ...

  2. 一文看懂 AI 训练集、验证集、测试集(附:分割方法+交叉验证)

    2019-12-20 20:01:00 数据在人工智能技术里是非常重要的!本篇文章将详细给大家介绍3种数据集:训练集.验证集.测试集. 同时还会介绍如何更合理的讲数据划分为3种数据集.最后给大家介绍一 ...

  3. 一文看懂计算机视觉-CV(基本原理+2大挑战+8大任务+4个应用)

    2020-03-06 20:00:00 计算机视觉(Computer Vision)是人工智能领域的一个重要分支.它的目的是:看懂图片里的内容. 本文将介绍计算机视觉的基本概念.实现原理.8 个任务和 ...

  4. 一文看懂人脸识别(4个特点+4个实现步骤+5个难点+算法发展轨迹)

    2020-03-09 20:01:00 人脸识别是身份识别的一种方式,目的就是要判断图片和视频中人脸的身份时什么. 本文将详细介绍人脸识别的4个特点.4个步骤.5个难点及算法的发展轨迹. 什么是人脸识 ...

  5. 一文看懂卷积神经网络-CNN(基本原理+独特价值+实际应用)

    http://blog.itpub.net/29829936/viewspace-2648775/ 2019-06-25 21:31:18 卷积神经网络 – CNN 最擅长的就是图片的处理.它受到人类 ...

  6. 【深度学习理论】一文看懂卷积神经网络

    [深度学习理论]一文看懂卷积神经网络 https://mp.weixin.qq.com/s/wzpMtMFkVDDH6scVcAdhlA 选自Medium 作者: Pranjal Yadav 经机器之 ...

  7. python读取excelsheet-一文看懂用Python读取Excel数据

    原标题:一文看懂用Python读取Excel数据 导读:现有的Excel分为两种格式:xls(Excel 97-2003)和xlsx(Excel 2007及以上). Python处理Excel文件主要 ...

  8. ​【Python基础】一文看懂 Pandas 中的透视表

    作者:来源于读者投稿 出品:Python数据之道 一文看懂 Pandas 中的透视表 透视表在一种功能很强大的图表,用户可以从中读取到很多的信息.利用excel可以生成简单的透视表.本文中讲解的是如何 ...

  9. angular 字符串转换成数字_一文看懂Python列表、元组和字符串操作

    好文推荐,转自CSDN,原作星辰StarDust,感觉写的比自己清晰-大江狗荐语. 序列 序列是具有索引和切片能力的集合. 列表.元组和字符串具有通过索引访问某个具体的值,或通过切片返回一段切片的能力 ...

最新文章

  1. SqlDataReader执行带输出参数存储过程 错误分析
  2. 远程桌面连接管理问题解决方法大全
  3. python中统计计数的几种方法和Counter的介绍
  4. java 高级泛型_Java 泛型高级
  5. unity 是厘米还是米_乔丹19岁才1.75米,2年增高近20公分,这个长高方法你能坚持多久...
  6. 循环依赖 三级缓存解决
  7. 7.16 c++自学笔记
  8. 机器视觉硬件选型——镜头选型
  9. oracle停数据库服务器,优化Oracle停机时间及数据库恢复
  10. python语言是非开源语言_python是非开源语言吗
  11. 在网页中实现:手势解锁密码
  12. SWFObject 2.0 官方文档二
  13. 从程序员到项目经理(28):该死的结果导向(只看结果,不问过程到底行不行?)
  14. 供应链金融你了解多少?
  15. 房屋管理系统简单Damo
  16. qt:qt5.12警告消除大法之 warning: zero as null pointer constant
  17. Autovue v21.0.2新功能简介
  18. 考研词汇测试软件,考研词汇 - 在线打字测试(dazi.kukuw.com)
  19. C语言n层嵌套平方根的计算n
  20. 【轴承故障分解】基于 ITD实现轴承故障信号分解含Matlab源码

热门文章

  1. 图像的空间分辨率和幅度分辨率
  2. 2018.3版本 CLion的激活码
  3. (转)Visual SourceSafe (VSS的使用方法)使用方法
  4. 前端页面查看PDF文档内容总结
  5. CSS实现图片自适应布局
  6. (已完善)基于Python的TCP 协议实现人机聊天(程序具有服务端和客户端)
  7. 机器学习-特征归一化
  8. 数学知识——欧拉函数
  9. 利用微信API将你的微信变为聊天机器人
  10. 数据结构 严薇敏 队列 的实现及其使用方法详解