目录

  • RabbitMQ
    • MQ的相关概念
      • 消息队列协议
      • 消息持久化
      • 消息的分发策略
      • docker安装RabbitMQ
      • AMQP协议
    • RabbitMQ的几种模式
      • 简单simple模式
      • 发布/订阅fanout模式
      • 路由direct模式
      • 主题topic模式
      • 工作work模式
      • 参数header模式
    • MQ的使用场景
    • SpringBoot整合RabbitMQ
    • TTL过期时间
    • 死信队列
    • 内存磁盘的监控
      • 内存警告
      • 内存换页
      • 磁盘预警
    • 分布式事务
      • 分布式事务的方式
      • 生产可靠性
      • 消费可靠性

RabbitMQ

MQ的相关概念

什么是MQ

MQ(message queue),从字面意思上看,本质是个队列, FIFO先入先出,只不过队列中存放的内容是message而已,还是一-种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。

MQ的特点

  • 流量消峰(使用队列的性质)
  • 应用解耦(使用中间件的性质)
  • 异步处理(使用消息的性质)

RabbitMQ:RabbitMQ是- -个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裏时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑RabbitMQ一个快递站, 一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。

四大核心概念:

  • 生产者:产生数据发送消息的程序是生产者
  • 交换机是RabbitMQ非常重要的一个部件, -方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
  • 消费者:消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。 请注意生产者,消费
    者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
  • 队列:队列是RabbitMQ内部使用的一-种数据结构,尽管消息流经RabbitMQ和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式。

内部构造:

消息队列协议

消息中间件使用什么协议:Openwire,AMQP,MQTT,Kafka,OpenMessage协议。

消息中间件协议和http协议的区别

  • 消息中间件协议较为简洁和高效
  • http一般是短连接,消息中间件协议是一个长期获取消息的行为,出现问题就要对数据进行持久化,用来保证高可用。

消息持久化

将数据存入磁盘,而不是存在内存中随服务器重启断开而消失,使数据能够永久保存。

消息的分发策略

发布订阅,轮询分发,公平分发,重发,消息拉取

RabbitMQ核心

  • Hello World:简单事件。单生产者,单消费者,单队列(简单模式)
  • Work Queue:单队列分发任务。单生产者,多消费者,单队列(工作模式)
  • Publish Subscribe:多队列分发任务。单生产者,单交换机,多队列,多消费者(发布/订阅模式)
  • Routing:路由模式。
  • Topics:主题模式。

docker安装RabbitMQ

# 下载镜像
docker pull rabbitmq

官网下载: rabbitmq官网

rabbitmq三个端口

4369/tcp, 5671-5672/tcp, 15691-15692/tcp, 25672-15672/tcp
  • 5672:client端通信口

  • 15672:管理界面ui端口

  • 25672:server间内部通信口

添加用户

# 创建用户
rabbitmqctl add_user username password
# 设置用户角色
rabbitmqctl set_user_tags username administrator
# 设置用户权限
rabbitmqctl set_permissions -p "/" username ".*" ".*" ".*"
# 列出用户列表
rabbitmqctl list_users

AMQP协议

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息网络协议

这个中间件服务器是一个数据服务器,他能接收消息,以及依赖任意的标准把他们(数据)按照指定的路线送到不同的消费方和当消费方不能很快的接收消息时存储他们(数据)在内存或者磁盘里面这两个主要功能。

AMQP主要做了三件事

  • 创造任意类型交换机和消息队列的能力
  • 连接交换机和消息队列创造一个理想的消息处理系统的能力
  • 完全通过协议控制的能力

AMQP协议:

AMQP permits peers to create multiple independent threads of control. Each channel acts as a virtual connection that share a single socket.

AMQP允许程序创建多个独立的控制线程,每个信号都共享一个socket端口进行虚拟连接,最小信道数量为1,尽量保证每一个信道流量平衡,不应该允许一个非常忙碌的信道而最后饿死了(字面意思理解就好)一个非常贫乏的信道。

一个消息可能存在许多个消息队列中,可以通过引用计数或者复制这个消息等等方法。

RabbitMQ的几种模式

创建一个消息队列

/**
* queue 消息队列名称
* durable 当为true时,如果server重启则消息队列不变,可能会丢失没有持久化的数据
* exclusive 是否供一个消费者消费
* autoDelete 当所有的client使用完这个消息队列后是否自动删除这个消息队列
* arguments 其他参数
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;

创建一个交换机

/**
* exchange 交换机名称
* type 交换机模式
*/
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;

绑定交换机和队列

/**
* queue 消息队列名称
* exchange 交换机名称
* routingKey 路由键
*/
Queue.BindOk queueBind(String queue, String exchange 交换机名称, String routingKey) throws IOException;

发布消息

/**
* exchange 交换机名称
* routingKey 路由键
* props其他属性
* body 发送内容
*/
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

接收消息

/**
* queue 消息队列名称
* autoAck 消费成功之后是否自动应答
* deliverCallback 消费者未成功消费的回调
* cancelCallback 消费完的回调
*/
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

简单simple模式

当不指定交换机的时,一般会存在默认交换机,默认交换机的模式的direct路由模式。

特点:生产者-消息队列-消费者

发布/订阅fanout模式

特点:广播机制,没有路由key的模式

路由direct模式

特点:有routing-key 匹配模式

主题topic模式

特点:模糊的routing-key匹配模式

工作work模式

特点:分发机制

当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?

主要有两种模式:

  • 轮询模式的分发:一个消费者-条,按均分配;
  • 公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少,按劳分配;

参数header模式

特点:参数匹配模式

MQ的使用场景

  • 解耦、削峰、异步

    • 同步、异步的问题(串行):串行执行时间较长
    • 并行方式,异步线程池:需要自己维护线程池、持久性、高可用都需要自己实现,最重要的是耦合在应用程序中
    • 异步消息队列:
  • 高内聚、低耦合

  • 流量的削峰

  • 分布式事务的可靠消息和可靠生产

  • 索引、缓存、静态化处理的数据同步

  • 流量监控

  • 日志监控(ELK)

  • 下单、订单分发、抢票

SpringBoot整合RabbitMQ

以fanout订阅/发布模式为例

加入依赖包

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>
</dependencies>

生产者类

// server.serverOrderService
package com.liuhao.springproducer;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;@Service
public class serviceOrderService {private RabbitTemplate rabbitTemplate;@Autowiredpublic void setRabbitTemplate(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}public void makeOrder(String userId, String productId, Integer num) {String s = UUID.randomUUID().toString();String exchangeName = "fanout_order";String routingKey = "";rabbitTemplate.convertAndSend(exchangeName, routingKey, s);}
}

配置类

package com.liuhao.springproducer;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 创建交换机@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanout_order", true, false);}// 创建消息队列@Beanpublic Queue smsQueue() {return new Queue("sms.fanout.queue", true);}@Beanpublic Queue emailQueue() {return new Queue("email.fanout.queue", true);}@Beanpublic Queue wxQueue() {return new Queue("wx.fanout.queue", true);}// 绑定交换机和消息队列@Beanpublic Binding smsBinding() {return BindingBuilder.bind(smsQueue()).to(fanoutExchange());}@Beanpublic Binding emailBinding() {return BindingBuilder.bind(emailQueue()).to(fanoutExchange());}@Beanpublic Binding wxBinding() {return BindingBuilder.bind(wxQueue()).to(fanoutExchange());}
}

消费者

package com.liuhao.demo1.service;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;@RabbitListener(queues = {"email.fanout.queue"})
@Service
public class EmailService {@RabbitHandlerpublic void receive(String msg) {System.out.println(msg);}
}

TTL过期时间

过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除

RabbitMQ可以对消息队列设置TTL。目前有两种方法可以设置。

  • 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
  • 第二种方法是对消息进行单独设置,每条消息TTL可以不同。

如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列,消费者将无法再收到该消息。

对队列进行设置

@Bean
public Queue wxQueue() {Map<String, Object> args = new HashMap<>();args.put("x-message-ttl", 5000);  // 这个队列的消息存在时间为5000秒return new Queue("wx.fanout.queue", true, false, false, args);
}

消息队列的默认设置TTL的参数key值为x-message-ttl,可自动识别。

对消息进行设置

public void makeOrder(String userId, String productId, Integer num) {String s = UUID.randomUUID().toString();String exchangeName = "fanout_order";String routingKey = "";MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("5000");  // 设置过期时间message.getMessageProperties().setContentEncoding("utf-8");return message;}};rabbitTemplate.convertAndSend(exchangeName, routingKey, s, messagePostProcessor);
}

此处convertAndSend是一个重载方法

public void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException {this.convertAndSend(exchange, routingKey, message, messagePostProcessor, (CorrelationData)null);
}

死信队列

DLX,全称为Dead-Letter- Exchange,可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列就称之为死信队列

消息变成死信,可能是由于以下的原因:

  • 消息被拒绝
  • 消息过期
  • 队列达到最大长度

DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属
性。当这个队列中存在死信时,Rabbitmq就 会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另-个队
列,即死信队列。

要想使用死信队列,只需要在定义队列的时候设置队列参数x-dead-letter-exchange指定交换机即可。

@Bean
public Queue wxQueue() {Map<String, Object> args = new HashMap<>();args.put("x-message-ttl", 5000);  // 这个队列的消息存在时间为5000秒args.put("x-dead-letter-exchange", "DLX");args.put("x-dead-letter-routing-key", "DLX_routing_key");return new Queue("wx.fanout.queue", true, false, false, args);
}

fanout不需要配置。

内存磁盘的监控

内存警告

当内存使用超过配置的阈值或者磁盘空间剩余空间对于配置的阈值时,RabbitMQ会暂时阻塞 客户端的连接,并且停止
接收从客户端发来的消息,以此避免服务器的崩溃,客户端与服务端的心态检测机制也会失效。

设置方式:命令

rabbitmqctl set_vm_memory_high_watermark <fraction>
rabbitmqctl set_vm_memory_high_watermark absolute 50MB

fraction/value为内存阈值。默认情况是: 0.4/2GB, 代表的含义是:当RabbitMQ的内存超过40%时,就会产生警告
并且阻塞所有生产者的连接。

通过此命令修改阈值在Broker重启以后将会失效,通过修改配置文件方式设置的阈值则不会随着重启而消失,但修改了配置文件一样要重启broker才会生效。

设置方式:配置文件

# 相对值
vm_memory_high_watermark.relative = 0.6
# 绝对值
vm_memory_high_watermark.absolute = 50MB

内存换页

在某个Broker节点及内存阻塞生产者之前,白会尝试将队到中的消息换页到磁盘以释放内存空间,持久化和非持久化的消息都会写入磁盘中,其中持久化的消息本身就在磁盘中有-个副本,所以在转移的过程中持久化的消息会先从内存中清除掉。

默认情况下,内存到达的阈值是50%时就会换页处理。

也就是说,在默认情况下该内存的阈值是0.4的情况下,当内存超过0.4*0.5=0.2时, 会进行换页动作。

# 一般小于1
vm_memory_high_watermark_paging_ratio = 0.7

磁盘预警

当磁盘的剩余空间低于确定的阈值时,RabbitMQ同样会阻塞生产者,这样可以避免因非持久化的消息持续换页而耗尽磁盘空间导致服务器崩溃。

默认情况下:磁盘预警为50MB的时候会进行预警。表示当前磁盘空间第50MB的时候会阻塞生产者并且停止内存消息换页到磁盘的过程。

这个阈值可以减小,但是不能完全的消除因磁盘耗尽而导致崩溃的可能性。比如在两次磁盘空间的检查空隙内,第一
次检查是: 60MB,第二检查可能就是1MB,就会出现警告。

分布式事务

分布式事务概念:分布式事务指事务的操作位于不同的节点上,需要保证事务的AICD特性。例如在下单场景下,库存和订单如果不在同一个节点上,就涉及分布式事务。

分布式事务的方式

两阶段提交(2PC),通过引入协调者(coordinator)来协调参与者的行为,并决定这些参与者是否真正要执行事务。

  • 缺点:同步阻塞、单点问题、数据不一致、没有容错机制。

补偿事务(TCC),TCC其实就是采用的补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销) 操作。它分为三个阶段:

  • Try阶段主要是对业务系统做检测及资源预留
  • Confirm阶段主要是对业务系统做确认提交,下ry阶段执行成功并开始执行Confirm阶段时,默认一Confirm阶段是不会出错的,郎只要Try成功, Confirm一定成功。
  • Cancel阶段主要是在业务执行错误,需要 回滚的状态下执行的业务取消1预留资源释放。
  • 缺点:2、3步可能失败

本地消息表(异步确保),本地消息表与业务数据表处于同一个数据库中,这样就能利用本地事务来保证在对这两个表的操作满足事务特性,并且使用了消息队列来保证最终一致性。

  • 缺点:消息表耦合到业务系统中,杂活较多。

MQ事务消息,异步场景,通用性较强,拓展性较高。

  • 第一阶段Prepared消息,会拿到消息的地址。第二阶段执行本地事务,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。
  • 也就是说在业务方法内要想消息队列提交两次请求,一次发送消息和一次确认消息。如果确认消息发送失败了RabbitMQ会定期扫描消息集群中的事务消息,这时候发现了Prepared消息,它会向消息发送者确认,所以生产方需要实现一个check接口,RabbitMQ会 根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

生产可靠性

  • 为了保证数据一定发送到MQ中
  • 在同一事务中,增加一个冗余表(字段一般为消息内容和消息状态)的记录订单数据每条数据是否发送成功的状态
  • 然后利用RabbitMQ提供的publish/confirm机制开启确认机制后,如果消息正常发送到MQ就会获取到回执消息,然后把发送状态改成已发送状态
  • 如果

使用消息确认机制,给予生产者的消息回执,来确保生产者的可靠性。

打开确认机制:publisher-confirm-type: correlated

若消息确认机制收到的是一个未传输到MQ中的回执消息,则使用一个定时器,每隔一定时间便往MQ中进行投递,这样保证了消息一定可以投递到消息队列,然后进行数据修改。

// 被@PostConstruct修饰的方法会在服务器加载servlet的时候运行,并且只会被服务器执行一次,
// 在构造函数之后执行,在init()之前执行
@PostConstruct
public void regCallback() {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String orderId = correlationData.getId();if (!ack) {System.out.println("消息传输失败");return ;}try {// 更新数据库(sql, orderId)}catch (Exception e) {e.printStackTrace();}}});
}

消费可靠性

如果消息队列已经到达了消息队列中,那么消费者可以直接从消息队列中取得数据,但是当一直没有想要的数据的时候,会产生死循环。

解决方案

  • 控制死循环的次数 + 死信队列(retry
  • try+catch+手动ack(acknowledge-mode:manual
  • try+catch+手动ack+死信队列处理

手动ack

/* 手动ack告诉mq消息正常消费 */
channel.basicAck(tag, false);
/**
* 第二个false表示当发生了nack时不重发
*/
channel.basicNAck(tag, false, false);
@RabbitListener(queues = {"", ""})
public void consumerDoAck(String data, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException {System.out.println("consumerDoAck: " + data);if (data.contains("success")) {// RabbitMQ的ack机制中,第二个参数返回true,表示需要将这条消息投递给其他的消费者重新消费channel.basicAck(deliveryTag, false);} else {// 第三个参数true,表示这个消息会重新进入队列channel.basicNack(deliveryTag, false, true);}
}

RabbitMQ学习笔记和AMQP协议浅析相关推荐

  1. RabbitMQ 学习笔记

    RabbitMQ 学习笔记 RabbitMQ 学习笔记 1. 中间件 1.1 什么是中间件 1.2 为什么要使用消息中间件 1.3 中间件特点 1.4 在项目中什么时候使用中间件技术 2. 中间件技术 ...

  2. Rabbitmq学习笔记(尚硅谷2021)

    Rabbitmq学习笔记 (尚硅谷) 1.MQ 的概念 1.1 什么是 MQ? 1.2 为什么要用 MQ? 削峰 解耦 异步 1.3 MQ 的分类 ActiveMQ Kafka RocketMQ Ra ...

  3. 分布式消息中间件之RabbitMQ学习笔记[一]

    写在前面 嗯,陆续的整理一些中间件的笔记 今天和小伙伴们分享RabbitMQ 相关笔记 博文偏理论,内容涉及: RabbitMQ的简单介绍 AMQP协议标准介绍 RabbitMQ Demo 食用方式: ...

  4. Rabbitmq学习笔记教程-尚硅谷

    Rabbitmq学习笔记 (尚硅谷) 尚硅谷 rabbitmq 教程 1.MQ 的概念 1.1 什么是 MQ? 存放消息的队列,互联网架构中常见的一种服务与服务之间通信的方式. 1.2 为什么要用 M ...

  5. RabbitMQ学习笔记(3)----RabbitMQ Worker的使用

    1. Woker队列结构图 这里表示一个生产者生产了消息发送到队列中,但是确有两个消费者在消费同一个队列中的消息. 2. 创建一个生产者 Producer如下: package com.wangx.r ...

  6. RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决)

    RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决) 参考文章: (1)RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决) (2)https://www.cnblogs. ...

  7. IOS之学习笔记十五(协议和委托的使用)

    1.协议和委托的使用 1).协议可以看下我的这篇博客 IOS之学习笔记十四(协议的定义和实现) https://blog.csdn.net/u011068702/article/details/809 ...

  8. RabbitMQ MQTT协议和AMQP协议

    RabbitMQ MQTT协议和AMQP协议 1        序言... 1 1.1     RabbitMq结构... 1 1.2     RabbitMq消息接收... 4 1.3     Ex ...

  9. RabbitMq学习笔记001---RabbitMq在Windows下安装配置

    rabbitMQ是一个在AMQP协议标准基础上完整的,可服用的企业消息系统.它遵循Mozilla Public License开源协议,采用 Erlang 实现的工业级的消息队列(MQ)服务器,Rab ...

最新文章

  1. 设置select下拉框不可修改的→“四”←种方法
  2. arm ida 伪代码 安卓 符号表_IDA调试界面介绍及快捷键
  3. 微软技术能力测试工具V0.1试用
  4. 判断windows进程是否存在
  5. 容器created状态_docker容器状态的转换实现
  6. airpods2突然变得很小声_11岁女孩胸部发育被同学取笑,穿束胸衣上课突然晕倒...
  7. 面试题,你做了哪些事情来提升自己的沟通能力?
  8. Python编程学习笔记:列表
  9. 配置管理系统和整体变更系统的区别与联系
  10. 苹果电脑表格取消自动计算机,苹果电脑excel序列被隐藏怎么办
  11. 4939: [Ynoi2016]掉进兔子洞 莫队 压位
  12. 西游记中牛魔王的雄厚实力和家业地盘
  13. SlidesJS基本使用方法和官方文档解释 【Jquery幻灯片插件 Jquery相册插件】
  14. bootstrap的学习-基础样式和排版一
  15. ROS 简单的跨浏览器通信
  16. 无法打开包括文件: “opencv2/opencv_modules.hpp”
  17. 计算机教师教育笔记,信息技术教师的读书笔记
  18. h5前端开发培训,html5学习笔记
  19. 企业微信可以转移好友吗?
  20. 【荐号】有了它们,成功创业,成就事业巅峰,迎娶白富美,指日可待!

热门文章

  1. 【元胞自动机】元胞自动机交通流仿真【含Matlab源码 827期】
  2. JavaWeb项目打包成桌面程序,内嵌浏览器、tomcat、jre、mysql,实现一键安装
  3. Linux常用命令及使用方法(非常详细!!!)
  4. 杂散干扰解决办法_最全干扰解释-杂散-互调-阻塞
  5. MIKE 21 教程 2.8 水中构筑物(堰、涵洞、阀门、堤防、桥墩、涡轮机)
  6. STC51烧录程序时序分析
  7. KDJB1200六相继电保护测试仪
  8. 互联网行业常用数据分析指标
  9. What is Machine ID?
  10. SPSS常用的10种统计分析