RabbitMQ 学习笔记

  • RabbitMQ 学习笔记
    • 1. 中间件
      • 1.1 什么是中间件
      • 1.2 为什么要使用消息中间件
      • 1.3 中间件特点
      • 1.4 在项目中什么时候使用中间件技术
    • 2. 中间件技术及架构的概述
      • 2.1 学习中间件的方式和技巧
      • 2.2 学习目标
      • 2.3 什么是消息中间件
      • 2.4 单体架构
      • 2.5 分布式架构
    • 3. 基于消息中间件的分布式系统的架构
      • 3.1 消息中间件应用的场景
      • 3.2 常见的消息中间件
      • 3.4 消息中间件的本质及设计
      • 3.5 消息中间件的核心组成部分
      • 3.6 小结
    • 4. 消息队列协议
      • 4.1 什么是协议
      • 4.2 网络协议的三要素
      • 4.3 AMQP 协议
      • 4.4 MQTT 协议
      • 4.5 OpenMessage 协议
      • 4.6 Kafka 协议
      • 4.7 小结
    • 5. 消息队列持久化
      • 5.1 持久化
      • 5.2 常见的持久化方式
    • 6. 消息的分发策略
      • 6.2 场景分析一
      • 6.3 场景分析二
      • 6.4 消息分发策略的机制和对比
    • 7. 消息队列高可用和高可靠
      • 7.1 什么是高可用机制
      • 7.2 集群模式 1 - Master-slave 主从共享数据的部署方式
      • 7.3 集群模式 2 - Master- slave 主从同步部署方式
      • 7.4 集群模式 3 - 多主集群同步部署模式
      • 7.5 集群模式 4 - 多主集群转发部署模式
      • 7.6 集群模式 5 Master-slave 与 Broker-cluster 组合的方案
      • 7.7 什么是高可靠机制
    • 8. RabbitMQ 入门及安装
      • 8.1 概述
      • 8.2 Erlang 安装
        • 8.2.1 安装 rpm
        • 8.2.2 下载 erlang
        • 8.2.3 查看版本号
        • 8.2.4 安装 socat
      • 8.3 安装 RabbitMQ
        • 8.3.1 下载 RabbitMQ
        • 8.3.2 启动 RabbitMQ 服务
        • 8.3.3 RabbitMQ 的配置
        • 8.3.4 相关端口
    • 9. RabbitMQ Web 管理界面及授权操作
      • 9.1 RabbitMQ 管理界面
      • 9.2 授权账号和密码
      • 9.3 小结
    • 10. RabbitMQ 之 Docker 安装
      • 10.1 Docker 安装 RabbitMQ
        • 10.1.1 虚拟化容器技术—Docker 的安装
        • 10.1.2 Docker 的相关命令
        • 10.1.3 安装 RabbitMQ
      • 10.2 额外 Linux 相关排查命令
    • 11. RabbitMQ 的角色分类
      • 11.1 None
      • 11.2 Management
      • 11.3 Policymaker
      • 11.4 Monitoring
      • 11.5 Administrator
    • 12. AMQP
      • 12.1 什么是 AMQP
      • 12.2 AMQP 生产者流转过程
      • 12.3 AMQP 消费者流转过程
    • 13. RabbitMQ 的核心组成部分
      • 13.1 RabbitMQ 的核心组成部分
      • 13.2 RabbitMQ 整体架构
      • 13.3 RabbitMQ 的运行流程
      • 13.4 RabbitMQ 支持消息的模式
    • 14. RabbitMQ 入门案例 - Simple 简单模式
      • 14.1 实现步骤
      • 14.2 构建一个 Maven 工程
      • 14.3 导入 RabbitMQ 的 Maven 依赖
        • 14.3.1 Java 原生依赖
        • 14.3.2 Spring 依赖
        • 14.3.3 SpringBoot 依赖
      • 14.4 启动启动 rabbitmq-server 服务
      • 14.5 定义生产者
      • 14.6 定义消费者
    • 15. RabbitMQ 入门案例 - Fanout 模式
    • 16. RabbitMQ 入门案例 - Direct 模式
    • 17. RabbitMQ 入门案例 - Work 模式
      • 17.1 轮询模式 Round-Robin
      • 17.2 公平模式 Fair
    • 18. RabbitMQ 使用场景
      • 异步、削峰、解耦
      • 18.1 解耦
        • 18.1.1 同步异步的问题
        • 18.1.2 并行方式 异步线程池
        • 18.1.3 异步消息队列的方式
      • 18.2 解耦
      • 18.3 削峰
      • 18.4 其他场景
    • 19. RabbitMQ 入门案例 - 完整创建
    • 20. RabbitMQ-SpringBoot 案例 - Fanout 模式
    • 21. RabbitMQ-SpringBoot 案例 - Direct 模式
    • 22. RabbitMQ-SpringBoot 案例 - Topic 模式
    • 23. RabbitMQ-SpringBoot 案例 - Work 模式
    • 24. RabbitMQ 的内存警告
      • 24.1 RabbitMQ 的内存警告
      • 24.2 RabbitMQ 的内存控制
        • 24.2.1 命令的方式
        • 24.2.2 配置文件方式 rabbitmq.conf
      • 24.3 RabbitMQ 的内存换页
      • 24.4 RabbitMQ 的磁盘预警
    • 25. RabbitMQ-SpringBoot 案例 - 集群部署
    • SpringBoot 整合 RabbitMQ 集群配置详解
    • RabbitMQ - 高级 - 集群
      • 1. RabbitMQ 集群
      • 2. 集群搭建
      • 3. 单机多实例搭建
        • 3.1 启动第一个节点 rabbit-1
        • 3.2 启动第二个节点 rabbit-2
        • 3.3 验证启动
        • 3.4 rabbit-1 操作为主节点
        • 3.5 rabbit-2 操作为从节点
        • 3.6 验证集群状态
        • 3.7 Web 监控
        • 3.8 小结
    • RabbitMQ - 高级 - 过期时间 TTL
    • RabbitMQ - 高级 - 死信队列
    • RabbitMQ - 高级 - 分布式事务
      • 1. 简述
      • 2. 分布式事务的方式
        • 2.1 2PC
        • 2.2 TCC
        • 2.3 本地消息表
        • 2.4 MQ 事务消息
        • 2.5 总结
      • 3. 具体实现
        • 3.1 分布式事务的完整架构图如下
        • 3.2 分布式系统分布式事务问题
        • 3.3 基于 MQ 的分布式事务整体设计思路
        • 3.4 基于 MQ 的分布式事务消息的可靠生产问题
        • 3.5 基于 MQ 的分布式事务消息的可靠生产问题 - 定时重发
        • 3.6 基于 MQ 的分布式事务消息的可靠消费
        • 3.7 基于 MQ 的分布式事务消息的消息重发
        • 3.8 基于 MQ 的分布式事务消息的死信队列消息转移 + 人工处理
        • 3.9 基于 MQ 的分布式事务消息的死信队列消息重试注意事项
        • 3.10 基于 MQ 的分布式事务消息的定时重发
      • 4. 总结
        • 优点
        • 缺点
        • 建议
    • RabbitMQ 面试题
      • 1. RabbitMQ 为什么需要信道,为什么不是 TCP 直接通信
      • 2. QUEUE 队列到底在消费者创建还是生产者创建?
      • 3. 可以存在没有交换机的队列吗?

RabbitMQ 学习笔记

1. 中间件

1.1 什么是中间件

我国企业从 20 世纪 80 年代开始就逐渐进行信息化建设,由于方法和体系的不成熟,以及企业业务和市场需求的不断变化,一个企业可能同时运行着多个不同的业务系统,这些系统可能基于不同的操作系统、不同的数据库、异构的网络环境。现在的问题是,如何把这些信息系统结合成一个有机地协同工作的整体,真正实现企业跨平台、分布式应用。中间件便是解决之道,它用自己的复杂换取了企业应用的简单。

中间件(Middleware)是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件 = 平台 + 通信,这个定义也限定了只有用于分布式系统中才能称为中间件,同时还可以把它与支撑软件和实用软件区分开来。中间件(Middleware)是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件 = 平台 + 通信,这个定义也限定了只有用于分布式系统中才能称为中间件,同时还可以把它与支撑软件和实用软件区分开来。

举例:

  1. RMI(Remote Method Invocations, 远程调用)
  2. Load Balancing(负载均衡,将访问负荷分散到各个服务器中)
  3. Transparent Fail-over(透明的故障切换)
  4. Clustering(集群, 用多个小的服务器代替大型机)
  5. Back-end-Integration(后端集成,用现有的、新开发的系统如何去集成遗留的系统)
  6. Transaction 事务(全局 / 局部)全局事务(分布式事务)局部事务(在同一数据库联接内的事务)
  7. Dynamic Redeployment(动态重新部署, 在不停止原系统的情况下,部署新的系统)
  8. System Management(系统管理)
  9. Threading(多线程处理)
  10. Message-oriented Middleware 面向消息的中间件(异步的调用编程)
  11. Component Life Cycle(组件的生命周期管理)
  12. Resource pooling(资源池)
  13. Security(安全)
  14. Caching(缓存)

1.2 为什么要使用消息中间件

具体地说,中间件屏蔽了底层操作系统的复杂性,使程序开发人员面对一个简单而统一的开发环境,减少程序设计的复杂性,将注意力集中在自己的业务上,不必再为程序在不同系统软件上的移植而重复工作,从而大大减少了技术上的负担。中间件带给应用系统的,不只是开发的简便、开发周期的缩短,也减少了系统的维护、运行和管理的工作量,还减少了计算机总体费用的投入。

1.3 中间件特点

为解决分布异构问题,人们提出了中间件 (middleware) 的概念。中间件是位于平台 (硬件和操作系统) 和应用之间的通用服务,如下图所示,这些服务具有标准的程序接口和协议。针对不同的操作系统和硬件平台,它们可以有符合接口和协议规范的多种实现。

也许很难给中间件一个严格的定义,但中间件应具有如下的一些特点:
(1)满足大量应用的需要
(2)运行于多种硬件和 OS 平台
(3)支持分布计算,提供跨网络、硬件和 OS 平台的透明性的应用或服务的交互
(4)支持标准的协议
(5)支持标准的接口

由于标准接口对于可移植性和标准协议对于互操作性的重要性,中间件已成为许多标准化工作的主要部分。对于应用软件开发,中间件远比操作系统和网络服务更为重要,中间件提供的程序接口定义了一个相对稳定的高层应用环境,不管底层的计算机硬件和系统软件怎样更新换代,只要将中间件升级更新,并保持中间件对外的接口定义不变,应用软件几乎不需任何修改,从而保护了企业在应用软件开发和维护中的重大投资。

简单说:中间件有个很大的特点,是脱离于具体设计目标,而具备提供普遍独立功能需求的模块。这使得中间件一定是可替换的。如果一个系统设计中,中间件是不可替换的,不是架构、框架设计有问题,那么就是这个中间件,在别处可能是个中间件,在这个系统内是引擎。

1.4 在项目中什么时候使用中间件技术

在项目的架构和重构中,使用任何技术和架构的改变我们都需要谨慎斟酌和思考,因为任何技术的融入和变化都可能人员,技术,和成本的增加,中间件的技术一般现在一些互联网公司或者项目中使用比较多,如果你仅仅还只是一个初创公司建议还是使用单体架构,最多加个缓存中间件即可,不要盲目追求新或者所谓的高性能,而追求的背后一定是业务的驱动和项目的驱动,因为一旦追求就意味着你的学习成本,公司的人员结构以及服务器成本,维护和运维的成本都会增加,所以需要谨慎选择和考虑。

但是作为一个开放人员,一定要有学习中间件技术的能力和思维,否则很容易当项目发展到一个阶段在去掌握估计或者在面试中提及,就会给自己带来不小的困扰,在当今这个时代这些技术也并不是什么新鲜的东西,如果去掌握和挖掘最关键的还是自己花时间和花精力去探讨和研究。

2. 中间件技术及架构的概述

2.1 学习中间件的方式和技巧

  1. 理解中间件在项目架构中的作用,以及各中间件的底层实现。
  2. 可以使用一些类比的生活概念去理解中间件,
  3. 使用一些流程图或者脑图的方式去梳理各个中间件在架构中的作用
  4. 尝试用 Java 技术去实现中间件的远离
  5. 静下来去思考中间件在项目中设计的和使用的原因
  6. 如果找到对应的替代总结方案
  7. 尝试编写博文总结类同中间件技术的对比和使用场景。
  8. 学会查看中间件的源码以及开开源项目和博文。

2.2 学习目标

  • 什么是消息中间件
  • 什么是协议
  • 什么是持久化
  • 消息分发
  • 消息的高可用
  • 消息的集群
  • 消息的容错
  • 消息的冗余

2.3 什么是消息中间件

在实际的项目中,大部分的企业项目开发中,在早期都采用的是单体的架构模式,如下图:

2.4 单体架构

在企业开发的中,大部分的初期架构都采用的是单体架构的模式进行架构,而这种架构的典型的特点:就是把所有的业务和模块,源代码,静态资源文件等都放在一个一工程中,如果其中的一个模块升级或者迭代发生一个很小变动都会重新编译和重新部署项目。 这种的架构存在的问题就是:

  1. 耦合度太高
  2. 运维的成本过高
  3. 不易维护
  4. 服务器的成本高
  5. 以及升级架构的复杂度也会增大

这样就有后续的分布式架构系统。如下

2.5 分布式架构

何谓分布式系统呢:

通俗一点:就是一个请求由服务器端的多个服务(服务或者系统)协同处理完成

和单体架构不同的是,单体架构是一个请求发起 JVM 调度线程(确切的是 tomcat 线程池)分配线程 Thread 来处理请求直到释放,而分布式是系统是:一个请求是由多个系统共同来协同完成,JVM 和环境都可能是独立。如果生活中的比喻的话,单体架构就想建设一个小房子很快就能够搞定,如果你要建设一个鸟巢或者大型的建筑,你就必须是各个环节的协同和分布,这样目的也是项目发展都后期的时候要去部署和思考的问题。我们也不难看出来,分布式架构系统存在的优缺点如下:

缺点

  1. 学习成本高,技术栈过多
  2. 运维成本和服务器成本增高
  3. 人员的成本也会增高
  4. 项目的负载度也会上升
  5. 面临的错误和容错性也会成倍增加
  6. 占用的服务器端口和通讯的选择的成本高
  7. 安全性的考虑和因素逼迫可能选择 RMI/MQ 相关的服务器端通讯。

优点

  1. 服务系统的独立,占用的服务器资源减少和占用的硬件成本减少,确切的说是:可以合理的分配服务资源,不造成服务器资源的浪费
  2. 系统的独立维护和部署,耦合度降低,可插拔性。
  3. 系统的架构和技术栈的选择可以变的灵活(而不是单纯的选择 Java)
  4. 弹性的部署,不会造成平台因部署造成的瘫痪和停服的状态。

3. 基于消息中间件的分布式系统的架构

从上图中可以看出来,消息中间件的是

  1. 利用可靠的消息传递机制进行系统和系统直接的通讯
  2. 通过提供消息传递和消息的排队机制,它可以在分布式系统环境下扩展进程间的通讯。

3.1 消息中间件应用的场景

  1. 跨系统数据传递
  2. 高并发的流量削峰
  3. 数据的分发和异步处理
  4. 大数据分析与传递
  5. 分布式事务

比如你有一个数据要进行迁移或者请求并发过多的时候,比如你有 10W 的并发请求下订单,我们可以在这些订单入库之前,我们可以把订单请求堆积到消息队列中,让它稳健可靠的入库和执行。

3.2 常见的消息中间件

ActiveMQ、RabbitMQ、Kafka、RocketMQ 等。

3.4 消息中间件的本质及设计

它是一种接受数据,接受请求、存储数据、发送数据等功能的技术服务。

MQ 消息队列:负责数据的传接受,存储和传递,所以性能要过于普通服务和技术。

谁来生产消息,存储消息和消费消息呢?

3.5 消息中间件的核心组成部分

  1. 消息的协议
  2. 消息的持久化机制
  3. 消息的分发策略
  4. 消息的高可用,高可靠
  5. 消息的容错机制

3.6 小结

其实不论选择单体架构还是分布式架构都是项目开发的一个阶段,在什么阶段选择适合的架构方式,而不能盲目追求,最后造成的后果和问题都需要自己买单。但是作为一个开发人员学习和探讨新的技术是我们每个程序开发者都应该去保持和思考的问题。当我们没办法去改变社会和世界的时候,我们为了生活和生存那就必须要迎合企业和市场的需求,发挥你的价值和所学的才能,创造价值和实现自我。

4. 消息队列协议

4.1 什么是协议

我们知道消息中间件负责数据的传递,存储,和分发消费三个部分,数据的存储和分发的过程中肯定要遵循某种约定成俗的规范,你是采用底层的 TCP/IP,UDP 协议还是其他的自己取构建等,而这些约定成俗的规范就称之为:协议。

所谓协议是指:

  1. 计算机底层操作系统和应用程序通讯时共同遵守的一组约定,只有遵循共同的约定和规范,系统和底层操作系统之间才能相互交流。
  2. 和一般的网络应用程序的不同它主要负责数据的接受和传递,所以性能比较的高。
  3. 协议对数据格式和计算机之间交换数据都必须严格遵守规范。

4.2 网络协议的三要素

  1. 语法语法是用户数据与控制信息的结构与格式, 以及数据出现的顺序。
  2. 语义语义是解释控制信息每个部分的意义。它规定了需要发出何种控制信息, 以及完成的动作与做出什么样的响应。
  3. 时序时序是对事件发生顺序的详细说明。

比如我 MQ 发送一个信息,是以什么数据格式发送到队列中,然后每个部分的含义是什么,发送完毕以后的执行的动作,以及消费者消费消息的动作,消费完毕的响应结果和反馈是什么,然后按照对应的执行顺序进行处理。如果你还是不理解:大家每天都在接触的 http 请求协议:

  1. 语法:http 规定了请求报文和响应报文的格式。
  2. 语义:客户端主动发起请求称之为请求。(这是一种定义,同时你发起的是 post/get 请求)
  3. 时序:一个请求对应一个响应。(一定先有请求在有响应,这个是时序)

而消息中间件采用的并不是 http 协议,而常见的消息中间件协议有:OpenWire、AMQP、MQTT、Kafka,OpenMessage 协议。

面试题:为什么消息中间件不直接使用 http 协议呢?

  1. 因为 http 请求报文头和响应报文头是比较复杂的,包含了 cookie,数据的加密解密,状态码,响应码等附加的功能,但是对于一个消息而言,我们并不需要这么复杂,也没有这个必要性,它其实就是负责数据传递,存储,分发就行,一定要追求的是高性能。尽量简洁,快速。
  2. 大部分情况下 http 大部分都是短链接,在实际的交互过程中,一个请求到响应很有可能会中断,中断以后就不会就行持久化,就会造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取消息的过程,出现问题和故障要对数据或消息就行持久化等,目的是为了保证消息和数据的高可靠和稳健的运行。

4.3 AMQP 协议

AMQP:(全称:Advanced Message Queuing Protocol) 是高级消息队列协议。由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计基于此协议的客户端与消息中间件可传递消息,并不受客户端 / 中间件不同产品,不同的开发语言等条件的限制。Erlang 中的实现有 RabbitMQ 等。
特性:

  1. 分布式事务支持。
  2. 的持久化支持。
  3. 高性能和高可靠的消息处理优势。

AMQP 协议的支持者:

4.4 MQTT 协议

MQTT 协议:(Message Queueing Telemetry Transport)消息队列是 IBM 开放的一个即时通讯协议,物联网系统架构中的重要组成部分。
特点:

  1. 轻量
  2. 结构简单
  3. 传输快,不支持事务
  4. 没有持久化设计。

应用场景:

  1. 适用于计算能力有限
  2. 低带宽
  3. 网络不稳定的场景。

支持者:

4.5 OpenMessage 协议

是近几年由阿里、雅虎和滴滴出行、Stremalio 等公司共同参与创立的分布式消息中间件、流处理等领域的应用开发标准。
特点:

  1. 结构简单
  2. 解析速度快
  3. 支持事务和持久化设计。

4.6 Kafka 协议

Kafka 协议是基于 TCP/IP 的二进制协议。消息内部是通过长度来分割,由一些基本数据类型组成。
特点是:

  1. 结构简单
  2. 解析速度快
  3. 无事务支持
  4. 有持久化设计

4.7 小结

协议:是在 TCP/IP 协议基础之上构建的一种约定成俗的规范和机制、它的主要目的可以让客户端(应用程序 Java、GO)进行沟通和通讯,并且这种协议下规范必须具有持久性,高可用,高可靠的性能。

5. 消息队列持久化

5.1 持久化

简单来说就是将数据存入磁盘,而不是存在内存中随服务器重启断开而消失,使数据能够永久保存。

5.2 常见的持久化方式

ActiveMQ RabbitMQ Kafka RocketMQ
文件存储 支持 支持 支持 支持
数据库 支持 / / /

6. 消息的分发策略

MQ 消息队列有如下几个角色

  1. 生产者
  2. 存储消息
  3. 消费者

那么生产者生成消息以后,MQ 进行存储,消费者是如何获取消息的呢?一般获取数据的方式无外乎推(push)或者拉(pull)两种方式,典型的 git 就有推拉机制,我们发送的 http 请求就是一种典型的拉取数据库数据返回的过程。而消息队列 MQ 是一种推送的过程,而这些推机制会适用到很多的业务场景也有很多对应推机制策略。

6.2 场景分析一

比如我在 APP 上下了一个订单,我们的系统和服务很多,我们如何得知这个消息被那个系统或者那些服务或者系统进行消费,那这个时候就需要一个分发的策略。这就需要消费策略。或者称之为消费的方法论。

6.3 场景分析二

在发送消息的过程中可能会出现异常,或者网络的抖动,故障等等因为造成消息的无法消费,比如用户在下订单,消费 MQ 接受,订单系统出现故障,导致用户支付失败,那么这个时候就需要消息中间件就必须支持消息重试机制策略。也就是支持:出现问题和故障的情况下,消息不丢失还可以进行重发。

6.4 消息分发策略的机制和对比

ActiveMQ RabbitMQ Kafka RocketMQ
发布订阅 支持 支持 支持 支持
轮询分发 支持 支持 支持 /
公平分发 / 支持 支持 /
重发 支持 支持 / 支持
消息拉取 / 支持 支持 支持

7. 消息队列高可用和高可靠

7.1 什么是高可用机制

所谓高可用:是指产品在规定的条件和规定的时刻或时间内处于可执行规定功能状态的能力
当业务量增加时,请求也过大,一台消息中间件服务器的会触及硬件(CPU, 内存,磁盘)的极限,一台消息服务器你已经无法满足业务的需求,所以消息中间件必须支持集群部署。来达到高可用的目的。

7.2 集群模式 1 - Master-slave 主从共享数据的部署方式

解释:生产者讲消费发送到 Master 节点,所有的都连接这个消息队列共享这块数据区域,Master 节点负责写入,一旦 Master 挂掉,slave 节点继续服务。从而形成高可用,

7.3 集群模式 2 - Master- slave 主从同步部署方式

解释:这种模式写入消息同样在 Master 主节点上,但是主节点会同步数据到 slave 节点形成副本,和 zookeeper 或者 Redis 主从机制很类同。这样可以达到负载均衡的效果,如果消费者有多个这样就可以去不同的节点就行消费,以为消息的拷贝和同步会暂用很大的带宽和网络资源。在后续的 RabbitMQ 中会有使用。

7.4 集群模式 3 - 多主集群同步部署模式

解释:和上面的区别不是特别的大,但是它的写入可以往任意节点去写入。

7.5 集群模式 4 - 多主集群转发部署模式

解释:如果你插入的数据是 broker-1 中,元数据信息会存储数据的相关描述和记录存放的位置(队列)。
它会对描述信息也就是元数据信息就行同步,如果消费者在 broker-2 中进行消费,发现自己几点没有对应的消息,可以从对应的元数据信息中去查询,然后返回对应的消息信息,场景:比如买火车票或者黄牛买演唱会门票,比如第一个黄牛有顾客说要买的演唱会门票,但是没有但是他会去联系其他的黄牛询问,如果有就返回。

7.6 集群模式 5 Master-slave 与 Broker-cluster 组合的方案

解释:实现多主多从的热备机制来完成消息的高可用以及数据的热备机制,在生产规模达到一定的阶段的时候,这种使用的频率比较高。

这么集群模式,具体在后续的课程中会进行一个分析和讲解。他们的最终目的都是为保证:消息服务器不会挂掉,出现了故障依然可以抱着消息服务继续使用。

反正终归三句话:

  1. 要么消息共享,
  2. 要么消息同步
  3. 要么元数据共享

7.7 什么是高可靠机制

所谓高可用是指:是指系统可以无故障低持续运行,比如一个系统突然崩溃,报错,异常等等并不影响线上业务的正常运行,出错的几率极低,就称之为:高可靠。
在高并发的业务场景中,如果不能保证系统的高可靠,那造成的隐患和损失是非常严重的。
如何保证中间件消息的可靠性呢?可以从两个方面考虑:

  1. 消息的传输:通过协议来保证系统间数据解析的正确性。
  2. 消息的存储可靠:通过持久化来保证消息的可靠性。

8. RabbitMQ 入门及安装

8.1 概述

官网:https://www.rabbitmq.com/
什么是 RabbitMQ, 官方给出来这样的解释:

RabbitMQ is the most widely deployed open source message broker.
With tens of thousands of users, RabbitMQ is one of the most popular open source message brokers. From T-Mobile to Runtastic, RabbitMQ is used worldwide at small startups and large enterprises.
RabbitMQ is lightweight and easy to deploy on premises and in the cloud. It supports multiple messaging protocols. RabbitMQ can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements.
RabbitMQ runs on many operating systems and cloud environments, and provides a wide range of developer tools for most popular languages.
翻译以后:
RabbitMQ 是部署最广泛的开源消息代理。
RabbitMQ 拥有成千上万的用户,是最受欢迎的开源消息代理之一。从 T-Mobile 到 Runtastic,RabbitMQ 在全球范围内的小型初创企业和大型企业中都得到使用。
RabbitMQ 轻巧,易于在内部和云中部署。它支持多种消息传递协议。RabbitMQ 可以部署在分布式和联合配置中,以满足大规模,高可用性的要求。
RabbitMQ 可在许多操作系统和云环境上运行,并为大多数流行语言提供了广泛的开发人员工具。

简单概述:
RabbitMQ 是一个开源的遵循 AMQP 协议实现的基于 Erlang 语言编写,支持多种客户端(语言)。用于在分布式系统中存储消息,转发消息,具有高可用,高可扩性,易用性等特征

安装 RabbitMQ

  1. 下载地址:https://www.rabbitmq.com/download.html
  2. 环境准备:CentOS7.x+ / Erlang

RabbitMQ 是采用 Erlang 语言开发的,所以系统环境必须提供 Erlang 环境,第一步就是安装 Erlang。

erlang 和 RabbitMQ 版本的按照比较: https://www.rabbitmq.com/which-erlang.html

8.2 Erlang 安装

查看系统版本号

[root@iZm5eauu5f1ulwtdgwqnsbZ ~]# lsb_release -a
LSB Version:    :core-4.1-amd64:core-4.1-noarch
Distributor ID: CentOS
Description:    CentOS Linux release 8.3.2011
Release:        8.3.2011
Codename:       n/a

8.2.1 安装 rpm

参考地址:https://www.erlang-solutions.com/downloads/

wget https://packages.erlang-solutions.com/erlang-solutions-2.0-1.noarch.rpm
rpm -Uvh erlang-solutions-2.0-1.noarch.rpm

8.2.2 下载 erlang

yum install -y erlang

8.2.3 查看版本号

erl -v

8.2.4 安装 socat

yum install -y socat

8.3 安装 RabbitMQ

下载地址:https://www.rabbitmq.com/download.html

8.3.1 下载 RabbitMQ

  1. wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.13/rabbitmq-server-3.8.13-1.el8.noarch.rpm
    
  2. rpm -Uvh rabbitmq-server-3.8.13-1.el8.noarch.rpm
    
  3. yum install rabbitmq-server -y
    

8.3.2 启动 RabbitMQ 服务

# 启动服务
systemctl start rabbitmq-server
# 查看服务状态
systemctl status rabbitmq-server
# 停止服务
systemctl stop rabbitmq-server
# 开机启动服务
systemctl enable rabbitmq-server

8.3.3 RabbitMQ 的配置

RabbitMQ 默认情况下有一个配置文件,定义了 RabbitMQ 的相关配置信息,默认情况下能够满足日常的开发需求。如果需要修改需要,需要自己创建一个配置文件进行覆盖。
参考官网:

  1. https://www.rabbitmq.com/documentation.html
  2. https://www.rabbitmq.com/configure.html
  3. https://www.rabbitmq.com/configure.html#config-items
  4. https://github.com/rabbitmq/rabbitmq-server/blob/add-debug-messages-to-quorum_queue_SUITE/docs/rabbitmq.conf.example

8.3.4 相关端口

端口 用途
5672 RabbitMQ 的通讯端口
25672 RabbitMQ 的节点间的 CLI 通讯端口
15672 RabbitMQ HTTP_API 的端口,管理员用户才能访问,用于管理 RabbitMQ,需要启动 Management 插件。
1883、8883 MQTT 插件启动时的端口。
61613、61614 STOMP 客户端插件启用的时候的端口。
15674、15675 基于 webscoket 的 STOMP 端口和 MOTT 端口

一定要注意:RabbitMQ 在安装完毕以后,会绑定一些端口,如果购买的是阿里云或者腾讯云相关的服务器一定要在安全组中把对应的端口添加到防火墙

9. RabbitMQ Web 管理界面及授权操作

9.1 RabbitMQ 管理界面

  1. 默认情况下,RabbitMQ 是没有安装 web 端的客户端插件,需要安装才可以生效

    rabbitmq-plugins enable rabbitmq_management
    

    说明:RabbitMQ 有一个默认账号和密码是:guest 默认情况只能在 localhost 本机下访问,所以需要添加一个远程登录的用户。

  2. 安装完毕以后,重启服务即可

    systemctl restart rabbitmq-server
    

    一定要记住,在对应服务器 (阿里云,腾讯云等) 的安全组中开放 15672 的端口

  3. 在浏览器访问

    http://ip:15672/ 如下:

9.2 授权账号和密码

  1. 新增用户

    rabbitmqctl add_user admin admin
    
  2. 设置用户分配操作权限

    rabbitmqctl set_user_tags admin administrator
    

用户级别

  1. administrator 可以登录控制台、查看所有信息、可以对 RabbitMQ 进行管理

  2. monitoring 监控者 登录控制台,查看所有信息

  3. policymaker 策略制定者 登录控制台, 指定策略

  4. managment 普通管理员 登录控制台

  5. 为用户添加资源权限

    rabbitmqctl.bat set_permissions -p / admin ".*" ".*" ".*"
    

9.3 小结

rabbitmqctl add_user 账号 密码
rabbitmqctl set_user_tags 账号
administratorrabbitmqctl change_password Username Newpassword 修改密码
rabbitmqctl delete_user Username 删除用户
rabbitmqctl list_users 查看用户清单
rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*" 为用户设置 administrator 角色
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"

10. RabbitMQ 之 Docker 安装

10.1 Docker 安装 RabbitMQ

10.1.1 虚拟化容器技术—Docker 的安装

# yum 包更新到最新
yum update
# 安装需要的软件包, yum-util 提供 yum-config-manager 功能,另外两个是 devicemapper 驱动依赖的
yum install -y yum-utils device-mapper-persistent-data lvm2
# 设置 yum 源为阿里云
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
# 安装 docker>
yum install docker-ce -y
# 安装后查看 docker 版本
docker -v
# 安装加速镜像
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
"registry-mirrors": ["https://0wrdwnn6.mirror.aliyuncs.com"]
} EOF
sudo systemctl daemon-reload
sudo systemctl restart docker

10.1.2 Docker 的相关命令

# 启动 docker
systemctl start docker
# 停止 docker
systemctl stop docker
# 重启 docker
systemctl restart docker
# 查看 docker 状态
systemctl status docker
# 开机启动
systemctl enable docker
systemctl unenable docker
# 查看 docker 概要信息
docker info
# 查看 docker 帮助文档
docker --help
# 查看所有镜像
docker ps -a
# 根据 docker ID 来启动镜像
docker start 0a2a48c56eb7

10.1.3 安装 RabbitMQ

参考网站:

  1. https://www.rabbitmq.com/download.html

  2. https://registry.hub.docker.com/_/rabbitmq/

  3. 获取 RabbitMQ 镜像

    docker pull rabbitmq:management
    
  4. 创建并运行容器

    docker run -di --name=myrabbit -p 15672:15672 rabbitmq:management
    

    —hostname:指定容器主机名称
    —name:指定容器名称
    -p:将 mq 端口号映射到本地
    或者运行时设置用户和密码

    docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e
    RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672
    -p 61613:61613 -p 1883:1883 rabbitmq:management
    

    查看日志

    docker logs -f myrabbit
    
  5. 容器运行正常

    使用 http:// 你的 IP 地址: 15672 访问 RabbitMQ 控制台

10.2 额外 Linux 相关排查命令

# 查看日记信息
more xxx.log  >
# 查看端口是否被占用
netstat -naop | grep 5672
# 查看进程
ps -ef | grep 5672
# 停止服务
systemctl stop '服务名称'

11. RabbitMQ 的角色分类

11.1 None

  • 不能访问 management plugin(相当于没有权限,一般不会使用)

11.2 Management

  • 查看字节相关节点信息

  • 列出自己通过 AMQP 登入的虚拟机

  • 查看自己的虚拟机节点 virtual hosts 的 queues、exchanges 和 bindings 信息

  • 查看和关闭自己的 channels 和 connections

  • 查看自己有关的虚拟机节点 virtual hosts 的统计信息。包括其他用户

11.3 Policymaker

  • 包含 management 所有权限
  • 查看、创建和删除自己的 virtual hosts 所属的 polices 和 parameters 信息

11.4 Monitoring

  • 包含 management 所有权限
  • 罗列出所有的 virtual hosts,包括不能登录 virtual hosts
  • 查看其他用户的 connections 和 channels 信息
  • 查看节点级别的数据如 clustering 和 memory 使用情况
  • 查看所有的 virtual hosts 的全局统计信息

11.5 Administrator

  • 最高权限
  • 可以创建和删除 virtual hosts
  • 可以查看、创建和删除 users
  • 查看创建 permissions
  • 关闭所有用户的 connections

12. AMQP

12.1 什么是 AMQP

AMQP 全称:Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消息的中间件设计

12.2 AMQP 生产者流转过程

12.3 AMQP 消费者流转过程

13. RabbitMQ 的核心组成部分

13.1 RabbitMQ 的核心组成部分

核心概念:

*Server:又称 Broker,接受客户端的连接,实现 AMQP 实体服务。安装 rabbitmq-server
*Connection:连接,应用程序与 Broker 的网络连接。TCP/IP 的三次握手和四次挥手
*Channel:网络信道,几乎所有的操作都在 Channel 中进行,Channel 是进行消息读写的通道,客户端可以建立 Channel,每个 Channel 代表一个会话任务。
*Message:消息:服务与应用程序之间传送的数据,由 Properties 和 body 组成,Properties 可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body 则是消息体的内容。
*Virtual Host:虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理应可以有若干个 Exchange 和 Queue,同一个虚拟主机里面不能有相同名字的 Exchange。
*Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力)
*Bindings:Exchange 和 Queue 之间的虚拟连接,binding 中可以保护多个 routing key。
*Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
*Queue:队列:也称为 Message Queue,消息队列,保存消息并将它们转发给消费者。

13.2 RabbitMQ 整体架构

13.3 RabbitMQ 的运行流程

13.4 RabbitMQ 支持消息的模式

  1. 简单模式 Simple
  2. 工作模式 Work。特点:分发机制
  3. 发布订阅模式 Fanout。特点:Fanout——发布与订阅模式是一种广播机制,它是没有路由 key 的模式。
  4. 路由模式 Direct。特点:有 routing-key 的匹配模式。
  5. 主题模式 Topic。特点,模糊的 routing-key 的匹配模式
  6. 参数模式 Headers。特点:参数匹配模式

小结:RabbitMQ 发送消息一定有一个交换机,如果队列没有交换机会默认绑定一个交换机

14. RabbitMQ 入门案例 - Simple 简单模式

14.1 实现步骤

  1. JDK 1.8
  2. 构建一个 Maven 工程
  3. 导入 rabbitmq 的 Maven 依赖
  4. 启动 rabbitmq-server 服务
  5. 定义生产者
  6. 定义消费者
  7. 观察消息在 rabbitmq-server 服务中的过程

14.2 构建一个 Maven 工程

工程的目录结构如下:

14.3 导入 RabbitMQ 的 Maven 依赖

14.3.1 Java 原生依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version>
</dependency>

14.3.2 Spring 依赖

<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-amqp</artifactId><version>2.2.5.RELEASE</version>
</dependency>
<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>2.2.5.RELEASE</version>
</dependency>

14.3.3 SpringBoot 依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.4.3</version>
</dependency>

根据自己的项目环境选择依赖导入即可。

14.4 启动启动 rabbitmq-server 服务

14.5 定义生产者

package com.example.rabbitmq.simple;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** <p>* 简单模式——消息生产者* </p>** @author YangShuKai* @date 2021/3/18*/
public class Producer {public static void main(String[] args) {// 创建连接工厂 ➡ 创建连接 ➡ 通过连接获取通道 ➡ 通过通道创建交换机 ➡ 声明队列 ➡ 准备发送消息 ➡ 发送消息给队列 ➡ 关闭通道 ➡ 关闭连接// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("47.112.191.158");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {// 2. 创建连接 Connectionconnection = connectionFactory.newConnection("生产者");// 3. 通过连接获取通道 channelchannel = connection.createChannel();// 4. 通过通道创建交换机,声明队列,绑定关系,路由 key,发送消息和接收消息String queueName = "queue1";/*** @param1 队列的名称* @param2 是否要持久化,存盘,false 非持久化,true 持久化* @param3 排他性,是否独占独立* @param4 是否自动删除,随着最后一个消费者消费完毕消息以后是否把队列自动删除* @param5 携带附属参数*/channel.queueDeclare(queueName, false, false, false, null);// 5. 准备消息内容String message = "Hello RabbitMQ!";// 6. 发送消息给队列 queue/*** @param1 交换机名称* @param2 队列名称、路由 key* @param3 消息的状态控制* @param4 消息主体*/channel.basicPublish("", queueName, null, message.getBytes());System.out.println("消息发送成功……");} catch (Exception e) {e.printStackTrace();} finally {// 7. 关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception e) {e.printStackTrace();}}// 8. 关闭连接if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception e) {e.printStackTrace();}}}}
}

14.6 定义消费者

package com.example.rabbitmq.simple;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;/*** <p>* 简单模式——消息消费者* </p>** @author YangShuKai* @date 2021/3/18*/
public class Consumer {public static void main(String[] args) {// 创建连接工厂 ➡ 创建连接 ➡ 通过连接获取通道 ➡ 通过通道创建交换机 ➡ 声明队列 ➡ 接收消息 ➡ 关闭通道 ➡ 关闭连接// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("47.112.191.158");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {// 2. 创建连接 Connectionconnection = connectionFactory.newConnection("生产者");// 3. 通过连接获取通道 channelchannel = connection.createChannel();// 4. 通过通道创建交换机,声明队列,绑定关系,路由 key,发送消息和接收消息String queueName = "queue1";channel.basicConsume(queueName, true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println("收到消息是:" + new String(message.getBody(), "UTF-8"));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息接收失败……");}});System.out.println("开始接收消息");System.in.read();System.out.println("消息接收成功……");} catch (Exception e) {e.printStackTrace();} finally {// 7. 关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception e) {e.printStackTrace();}}// 8. 关闭连接if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception e) {e.printStackTrace();}}}}
}

15. RabbitMQ 入门案例 - Fanout 模式

生产者

package com.example.rabbitmq.fanout;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** <p>* 发布订阅模式——消息生产者* 类型:fanout。发布与订阅模式,是一种广播机制,它是没有路由 key 的模式。* </p>** @author YangShuKai* @date 2021/3/18*/
public class Producer {public static void main(String[] args) {// 创建连接工厂 ➡ 创建连接 ➡ 通过连接获取通道 ➡ 通过通道创建交换机 ➡ 声明队列 ➡ 准备发送消息 ➡ 发送消息给队列 ➡ 关闭通道 ➡ 关闭连接// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("47.112.191.158");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {// 2. 创建连接 Connectionconnection = connectionFactory.newConnection("生产者");// 3. 通过连接获取通道 channelchannel = connection.createChannel();// 4. 准备交换机String exchangeName = "fanout-exchange";// 6. 指定交换机的类型String exchangeType = "fanout";channel.exchangeDeclare(exchangeName, exchangeType, true);// 5. 定义路由 keyString routeKey = "";// 7. 准备消息内容String message = "Fanout-Hello RabbitMQ!";channel.queueDeclare("queue3", true, false, false, null);channel.queueBind("queue3", exchangeName, routeKey);// 8. 发送消息给队列 queue/*** @param1 交换机名称* @param2 队列名称、路由 key* @param3 消息的状态控制* @param4 消息主体*/channel.basicPublish(exchangeName, routeKey, null, message.getBytes());System.out.println("消息发送成功……");} catch (Exception e) {e.printStackTrace();} finally {// 7. 关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception e) {e.printStackTrace();}}// 8. 关闭连接if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception e) {e.printStackTrace();}}}}
}

消费者

package com.example.rabbitmq.fanout;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;/*** <p>* 发布订阅模式——消息消费者* </p>** @author YangShuKai* @date 2021/3/18*/
public class Consumer {public static void main(String[] args) {// 创建连接工厂 ➡ 创建连接 ➡ 通过连接获取通道 ➡ 通过通道创建交换机 ➡ 声明队列 ➡ 接收消息 ➡ 关闭通道 ➡ 关闭连接// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("47.112.191.158");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {// 2. 创建连接 Connectionconnection = connectionFactory.newConnection("消费者");// 3. 通过连接获取通道 channelchannel = connection.createChannel();// 4. 通过通道创建交换机,声明队列,绑定关系,路由 key,发送消息和接收消息String queueName = "queue3";channel.basicConsume(queueName, true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println("收到消息是:" + new String(message.getBody(), "UTF-8"));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息接收失败……");}});System.out.println("开始接收消息");System.in.read();System.out.println("消息接收成功……");} catch (Exception e) {e.printStackTrace();} finally {// 7. 关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception e) {e.printStackTrace();}}// 8. 关闭连接if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception e) {e.printStackTrace();}}}}
}

16. RabbitMQ 入门案例 - Direct 模式

生产者

package com.example.rabbitmq.direct;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** <p>* 路由模式——消息生产者* 类型:direct。有 routing-key 的匹配模式* </p>** @author YangShuKai* @date 2021/3/18*/
public class Producer {public static void main(String[] args) {// 创建连接工厂 ➡ 创建连接 ➡ 通过连接获取通道 ➡ 通过通道创建交换机 ➡ 声明队列 ➡ 准备发送消息 ➡ 发送消息给队列 ➡ 关闭通道 ➡ 关闭连接// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("47.112.191.158");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {// 2. 创建连接 Connectionconnection = connectionFactory.newConnection("生产者");// 3. 通过连接获取通道 channelchannel = connection.createChannel();// 4. 准备交换机String exchangeName = "direct-exchange";// 6. 指定交换机的类型String exchangeType = "direct";/*** 第三个参数,是否持久化,所谓的持久化指的是交换机不会随着服务器重启而造成丢失,如果是 true 代表不丢失,false 会丢失*/channel.exchangeDeclare(exchangeName, exchangeType, true);// 5. 定义路由 keyString routeKey = "email";// 7. 准备消息内容String message = "Direct-Hello RabbitMQ!";channel.queueDeclare("queue2", true, false, false, null);channel.queueBind("queue2", exchangeName, routeKey);// 8. 发送消息给队列 queue/*** @param1 交换机名称* @param2 队列名称、路由 key* @param3 消息的状态控制* @param4 消息主体*/channel.basicPublish(exchangeName, routeKey, null, message.getBytes());System.out.println("消息发送成功……");} catch (Exception e) {e.printStackTrace();} finally {// 7. 关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception e) {e.printStackTrace();}}// 8. 关闭连接if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception e) {e.printStackTrace();}}}}
}

消费者

package com.example.rabbitmq.direct;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;/*** <p>* 路由模式——消息消费者* </p>** @author YangShuKai* @date 2021/3/18*/
public class Consumer {public static void main(String[] args) {// 创建连接工厂 ➡ 创建连接 ➡ 通过连接获取通道 ➡ 通过通道创建交换机 ➡ 声明队列 ➡ 接收消息 ➡ 关闭通道 ➡ 关闭连接// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("47.112.191.158");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {// 2. 创建连接 Connectionconnection = connectionFactory.newConnection("消费者");// 3. 通过连接获取通道 channelchannel = connection.createChannel();// 4. 通过通道创建交换机,声明队列,绑定关系,路由 key,发送消息和接收消息String queueName = "queue2";channel.basicConsume(queueName, true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println("收到消息是:" + new String(message.getBody(), "UTF-8"));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息接收失败……");}});System.out.println("开始接收消息");System.in.read();System.out.println("消息接收成功……");} catch (Exception e) {e.printStackTrace();} finally {// 7. 关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception e) {e.printStackTrace();}}// 8. 关闭连接if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception e) {e.printStackTrace();}}}}
}

17. RabbitMQ 入门案例 - Work 模式

当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?

主要有两种模式:

  1. 轮询模式的分发:一个消费者一条,按均分配;
  2. 公平分发:根据消费者的消费能力进行公平分发,处理快的处理得多,处理慢得处理得少;按劳分配;

17.1 轮询模式 Round-Robin

特点:该模式接收消息是当有多个消费者接入时,消息的分配模式是一个消费者分配一条,直至消息消费完成;

生产者

package com.example.rabbitmq.work.roundRobin;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** <p>* 工作模式——消息生产者* </p>** @author YangShuKai* @date 2021/3/18*/
public class Producer {public static void main(String[] args) {// 创建连接工厂 ➡ 创建连接 ➡ 通过连接获取通道 ➡ 通过通道创建交换机 ➡ 声明队列 ➡ 准备发送消息 ➡ 发送消息给队列 ➡ 关闭通道 ➡ 关闭连接// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("47.112.191.158");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {// 2. 创建连接 Connectionconnection = connectionFactory.newConnection("生产者");// 3. 通过连接获取通道 channelchannel = connection.createChannel();// 4. 通过通道创建交换机,声明队列,绑定关系,路由 key,发送消息和接收消息String queueName = "queue-roundRobin";/*** @param1 队列的名称* @param2 是否要持久化,存盘,false 非持久化,true 持久化* @param3 排他性,是否独占独立* @param4 是否自动删除,随着最后一个消费者消费完毕消息以后是否把队列自动删除* @param5 携带附属参数*/channel.queueDeclare(queueName, false, false, false, null);// 5. 准备消息内容// 6. 发送消息给队列 queue/*** @param1 交换机名称* @param2 队列名称、路由 key* @param3 消息的状态控制* @param4 消息主体*/for (int i = 0; i < 20; i++) {String message = "RoundRobin Hello RabbitMQ==>"+i;channel.basicPublish("", queueName, null, message.getBytes());}System.out.println("消息发送成功……");} catch (Exception e) {e.printStackTrace();} finally {// 7. 关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception e) {e.printStackTrace();}}// 8. 关闭连接if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception e) {e.printStackTrace();}}}}
}

消费者 1

package com.example.rabbitmq.work.roundRobin;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;/*** <p>* 工作模式——消息消费者* </p>** @author YangShuKai* @date 2021/3/18*/
public class Consumer1 {public static void main(String[] args) {// 创建连接工厂 ➡ 创建连接 ➡ 通过连接获取通道 ➡ 通过通道创建交换机 ➡ 声明队列 ➡ 接收消息 ➡ 关闭通道 ➡ 关闭连接// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("47.112.191.158");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {// 2. 创建连接 Connectionconnection = connectionFactory.newConnection("生产者");// 3. 通过连接获取通道 channelchannel = connection.createChannel();// 4. 通过通道创建交换机,声明队列,绑定关系,路由 key,发送消息和接收消息String queueName = "queue-roundRobin";channel.basicConsume(queueName, true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {// 使用线程延迟 2 秒只是为了观察轮询模式接收消息的过程,不会影响结果try {System.out.println("Consumer1 - 收到消息是:" + new String(message.getBody(), "UTF-8"));Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息接收失败……");}});System.out.println("Consumer1 - 开始接收消息");System.in.read();System.out.println("消息接收成功……");} catch (Exception e) {e.printStackTrace();} finally {// 7. 关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception e) {e.printStackTrace();}}// 8. 关闭连接if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception e) {e.printStackTrace();}}}}
}

消费者 2

package com.example.rabbitmq.work.roundRobin;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import java.io.IOException;/*** <p>* 工作模式——消息消费者* </p>** @author YangShuKai* @date 2021/3/18*/
public class Consumer2 {public static void main(String[] args) {// 创建连接工厂 ➡ 创建连接 ➡ 通过连接获取通道 ➡ 通过通道创建交换机 ➡ 声明队列 ➡ 接收消息 ➡ 关闭通道 ➡ 关闭连接// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("47.112.191.158");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {// 2. 创建连接 Connectionconnection = connectionFactory.newConnection("生产者");// 3. 通过连接获取通道 channelchannel = connection.createChannel();// 4. 通过通道创建交换机,声明队列,绑定关系,路由 key,发送消息和接收消息String queueName = "queue-roundRobin";channel.basicConsume(queueName, true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println("Consumer2 - 收到消息是:" + new String(message.getBody(), "UTF-8"));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息接收失败……");}});System.out.println("开始接收消息");System.in.read();System.out.println("消息接收成功……");} catch (Exception e) {e.printStackTrace();} finally {// 7. 关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception e) {e.printStackTrace();}}// 8. 关闭连接if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception e) {e.printStackTrace();}}}}
}

17.2 公平模式 Fair

特点:该模式接收消息是当有多个消费者接入时,消息的分配模式是按劳分配,多劳多得,直至消息消费完成;

生产者

package com.example.rabbitmq.work.fair;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** <p>* 工作模式——消息生产者* </p>** @author YangShuKai* @date 2021/3/18*/
public class Producer {public static void main(String[] args) {// 创建连接工厂 ➡ 创建连接 ➡ 通过连接获取通道 ➡ 通过通道创建交换机 ➡ 声明队列 ➡ 准备发送消息 ➡ 发送消息给队列 ➡ 关闭通道 ➡ 关闭连接// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("47.112.191.158");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {// 2. 创建连接 Connectionconnection = connectionFactory.newConnection("生产者");// 3. 通过连接获取通道 channelchannel = connection.createChannel();// 4. 通过通道创建交换机,声明队列,绑定关系,路由 key,发送消息和接收消息String queueName = "queue-fair";/*** @param1 队列的名称* @param2 是否要持久化,存盘,false 非持久化,true 持久化* @param3 排他性,是否独占独立* @param4 是否自动删除,随着最后一个消费者消费完毕消息以后是否把队列自动删除* @param5 携带附属参数*/channel.queueDeclare(queueName, false, false, false, null);// 5. 准备消息内容// 6. 发送消息给队列 queue/*** @param1 交换机名称* @param2 队列名称、路由 key* @param3 消息的状态控制* @param4 消息主体*/for (int i = 0; i < 20; i++) {String message = "Fair Hello RabbitMQ==>"+i;channel.basicPublish("", queueName, null, message.getBytes());}System.out.println("消息发送成功……");} catch (Exception e) {e.printStackTrace();} finally {// 7. 关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception e) {e.printStackTrace();}}// 8. 关闭连接if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception e) {e.printStackTrace();}}}}
}

消费者 1

package com.example.rabbitmq.work.fair;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;/*** <p>* 工作模式——消息消费者* </p>** @author YangShuKai* @date 2021/3/18*/
public class Consumer1 {public static void main(String[] args) {// 创建连接工厂 ➡ 创建连接 ➡ 通过连接获取通道 ➡ 通过通道创建交换机 ➡ 声明队列 ➡ 接收消息 ➡ 关闭通道 ➡ 关闭连接// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("47.112.191.158");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {// 2. 创建连接 Connectionconnection = connectionFactory.newConnection("生产者");// 3. 通过连接获取通道 channelchannel = connection.createChannel();Channel finalChannel = channel;// 4. 通过通道创建交换机,声明队列,绑定关系,路由 key,发送消息和接收消息String queueName = "queue-fair";channel.basicQos(1);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer1 - 收到消息是:" + message);// 使用线程延迟 2 秒只是为了观察轮询模式接收消息的过程,不会影响结果try {Thread.sleep(1000);// 一定要使用收到应答} catch (InterruptedException e) {e.printStackTrace();}finally {finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}};finalChannel.basicConsume(queueName, false, deliverCallback,consumerTag -> { });System.out.println("开始接收消息");System.in.read();System.out.println("消息接收成功……");} catch (Exception e) {e.printStackTrace();} finally {// 7. 关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception e) {e.printStackTrace();}}// 8. 关闭连接if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception e) {e.printStackTrace();}}}}
}

消费者 2

package com.example.rabbitmq.work.fair;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;/*** <p>* 工作模式——消息消费者* </p>** @author YangShuKai* @date 2021/3/18*/
public class Consumer2 {public static void main(String[] args) {// 创建连接工厂 ➡ 创建连接 ➡ 通过连接获取通道 ➡ 通过通道创建交换机 ➡ 声明队列 ➡ 接收消息 ➡ 关闭通道 ➡ 关闭连接// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("47.112.191.158");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {// 2. 创建连接 Connectionconnection = connectionFactory.newConnection("生产者");// 3. 通过连接获取通道 channelchannel = connection.createChannel();Channel finalChannel = channel;// 4. 通过通道创建交换机,声明队列,绑定关系,路由 key,发送消息和接收消息String queueName = "queue-fair";channel.basicQos(1);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer2 - 收到消息是:" + message);// 使用线程延迟 2 秒只是为了观察轮询模式接收消息的过程,不会影响结果try {Thread.sleep(2000);// 一定要使用收到应答} catch (InterruptedException e) {e.printStackTrace();}finally {finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}};finalChannel.basicConsume(queueName, false, deliverCallback,consumerTag -> { });System.out.println("开始接收消息");System.in.read();System.out.println("消息接收成功……");} catch (Exception e) {e.printStackTrace();} finally {// 7. 关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception e) {e.printStackTrace();}}// 8. 关闭连接if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception e) {e.printStackTrace();}}}}
}

18. RabbitMQ 使用场景

异步、削峰、解耦

18.1 解耦

18.1.1 同步异步的问题

串行方式:将订单信息写入数据库成功后发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。

代码

public void makeOrder() {// 1. 保存订单orderService.saveOrder();// 2. 发送短信服务messageService.sendSms("order");// 3. 发送 email 服务emailService.sendEmail("order");// 4. 发送 app 服务appService.sendApp("order");
}

18.1.2 并行方式 异步线程池

并行方式:将订单信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间

代码

public void makeOrder() {// 1. 保存订单orderService.saveOrder();// 相关发送relationMessage();
}public void relationMessage() {// 异步threadPool.submit(new Callable<Object>{public Object call() {// 2. 发送短信服务messageService.sendSMS("order");}});// 异步threadPool.submit(new Callable<Object>{public Object call() {// 3. 发送短信服务emailService.sendEmail("order");}});// 异步threadPool.submit(new Callable<Object>{public Object call() {// 4. 发送 app 服务appService.sendApp("order");}});
}

存在问题:

  1. 耦合度高
  2. 需要自己写线程池,自己维护成本太高
  3. 出现了消息可能会丢失,需要自己做消息补偿
  4. 如何保证消息的可靠性需要自己写
  5. 如果服务器承载不了,需要自己去写高可用

18.1.3 异步消息队列的方式

代码

public void makeOrder() {// 1. 保存订单orderService.saveOrder();rabbitTemplate.convertSend("ex","2","消息内容");
}

好处

  1. 完全解耦,用 MQ 建立连接
  2. 有独立的线程池和运行模型
  3. 出现了消息可能会丢失,MQ 有持久化功能
  4. 如何保证消息的可靠性,死信队列和消息转移等
  5. 如果服务器承载不了,需要自己去写高可用,HA 镜像模型高可用

按照以上约定,用户的响应时间相当于是订单消息写入数据库的时间,也就是 50 毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是 50 毫秒。因此架构改变后,系统的吞吐量提高到每秒 20 QPS。比串行提高了 3 倍,比并行提高了 2 倍。

18.2 解耦

18.3 削峰

18.4 其他场景

  1. 分布式事务的可靠消费和可靠生产
  2. 索引、缓存、静态化处理的数据同步
  3. 流量监控
  4. 日志监控(ELK)
  5. 下单、订单分发、抢票

19. RabbitMQ 入门案例 - 完整创建

生产者

package com.example.rabbitmq.all;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** <p>* 路由模式——消息生产者* 类型:direct。有 routing-key 的匹配模式* </p>** @author YangShuKai* @date 2021/3/18*/
public class Producer {public static void main(String[] args) {// 创建连接工厂 ➡ 创建连接 ➡ 通过连接获取通道 ➡ 通过通道创建交换机 ➡ 声明队列 ➡ 准备发送消息 ➡ 发送消息给队列 ➡ 关闭通道 ➡ 关闭连接// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("47.112.191.158");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {// 2. 创建连接 Connectionconnection = connectionFactory.newConnection("生产者");// 3. 通过连接获取通道 channelchannel = connection.createChannel();// 4. 准备交换机String exchangeName = "all-direct-exchange";// 6. 指定交换机的类型String exchangeType = "direct";/*** 第三个参数,是否持久化,所谓的持久化指的是交换机不会随着服务器重启而造成丢失,如果是 true 代表不丢失,false 会丢失*/channel.exchangeDeclare(exchangeName, exchangeType, true);// 5. 定义路由 keyString routeKey = "email";// 7. 准备消息内容String message = "All-Direct-Hello RabbitMQ!";channel.queueDeclare("queue5", true, false, false, null);channel.queueDeclare("queue6", true, false, false, null);channel.queueDeclare("queue7", true, false, false, null);channel.queueBind("queue5", exchangeName, "email");channel.queueBind("queue6", exchangeName, "email");channel.queueBind("queue7", exchangeName, "order");// 8. 发送消息给队列 queue/*** @param1 交换机名称* @param2 队列名称、路由 key* @param3 消息的状态控制* @param4 消息主体*/channel.basicPublish(exchangeName, "order", null, message.getBytes());System.out.println("消息发送成功……");} catch (Exception e) {e.printStackTrace();} finally {// 7. 关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception e) {e.printStackTrace();}}// 8. 关闭连接if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception e) {e.printStackTrace();}}}}
}

消费者

package com.example.rabbitmq.all;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;/*** <p>* 消息消费者* </p>** @author YangShuKai* @date 2021/3/18*/
public class Consumer {public static void main(String[] args) {// 创建连接工厂 ➡ 创建连接 ➡ 通过连接获取通道 ➡ 通过通道创建交换机 ➡ 声明队列 ➡ 接收消息 ➡ 关闭通道 ➡ 关闭连接// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("47.112.191.158");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {// 2. 创建连接 Connectionconnection = connectionFactory.newConnection("消费者");// 3. 通过连接获取通道 channelchannel = connection.createChannel();// 4. 通过通道创建交换机,声明队列,绑定关系,路由 key,发送消息和接收消息String queueName = "queue7";channel.basicConsume(queueName, true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println("收到消息是:" + new String(message.getBody(), "UTF-8"));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息接收失败……");}});System.out.println("开始接收消息");System.in.read();System.out.println("消息接收成功……");} catch (Exception e) {e.printStackTrace();} finally {// 7. 关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception e) {e.printStackTrace();}}// 8. 关闭连接if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception e) {e.printStackTrace();}}}}
}

20. RabbitMQ-SpringBoot 案例 - Fanout 模式

21. RabbitMQ-SpringBoot 案例 - Direct 模式

22. RabbitMQ-SpringBoot 案例 - Topic 模式

23. RabbitMQ-SpringBoot 案例 - Work 模式

24. RabbitMQ 的内存警告

24.1 RabbitMQ 的内存警告

当内存使用超过配置的阈值或者磁盘空间剩余空间少于配置的阈值时,RabbitMQ 会暂时阻塞客户端的连接,并且停止接收从客户端发来的消息,以此避免服务器的崩溃,客户端与服务器端的心跳检测机制也会失效。

如下图:

当出现 blocking 或 blocked 的时候说明到达了阈值以及高负荷运行了。

24.2 RabbitMQ 的内存控制

参考帮助文档:https://ww.rabbitmq.com/configure.html

当出现警告的时候,可以通过配置去修改和调整。

24.2.1 命令的方式

rabbitmqctl set_vm_memory_high_watermark <fraction>
rabbitmqctl set_vm_memory_high_watermark absolute 50MB

fraction/value 为内存阈值。默认情况是:0.4/2GB,代表的含义是:当 RabbitMQ 的内存值超过内存的 40% 时,就会产生警告并且阻塞所有生产者的连接。通过此命令修改阈值在 Broker 重启以后将会失效,通过修改配置文件方式设置的阈值则不会随着重启而消失,但修改了配置文件一样要重启 broker 才会生效。

24.2.2 配置文件方式 rabbitmq.conf

当前配置文件:/etc/rabbitmq/rabbitmq.conf

# 默认
# vm_memory_high_watermark.relative = 0.4
# 使用 relative 相对值进行设置 fraction,建议取值在 0.4~0.7 之间,不建议超过 0.7
vm_memory_high_watermark.relative = 0.6
# 使用 absolute 的绝对值的方式,但是是 KB、MB、GB 对应得命令如下
vm_memory_high_watermark.absolute = 2GB

24.3 RabbitMQ 的内存换页

在某个 Broker 节点及内存阻塞生产者之前,它会尝试将队列中的消息换页到磁盘以释放内存空间,持久化和非持久化的消息都会写入磁盘中,其中持久化的消息本身就在磁盘中有一个副本,所以在转移的过程中持久化的消息会先从内存中清除掉。

默认情况下,内存到达的阈值是 50% 时就会换页处理。

也就是说,在默认情况下该内存的阈值是 0.4 的情况下,当内存超过 0.4*0.5=0.2 时,会进行换页动作。

比如有 1000MB 内存,当内存的使用达到了 400MB,已经达到了极限,但是因为配置的换页内存 0.5,这个时候会在达到极限 400MB 之前,会把内存中的 200MB 进行转移到磁盘中。从而达到稳健的运行。

可以通过设置 vm_memory_high_watermark_paging_ration 来进行调整

vm_memory_high_watermark.relative = 0.4
vm_memory_high_watermark_paging_ratio = 0.7(设置小于 1 的值)

为什么设置小于 1,因为如果设置为 1 的阈值。内存都已经到达了极限了,再去换页意义不是很大了。

24.4 RabbitMQ 的磁盘预警

当磁盘的剩余空间低于确定的阈值,RabbitMQ 同样会阻塞生产者,这样可以避免因非持久化的消息持续换页而耗尽磁盘空间导致服务器崩溃。

默认情况下:磁盘预警为 50MB 的时候会进行预警。表示当前磁盘空间低于 50MB 的时候会阻塞生产者并且停止内存消息换页到磁盘的过程。

这个阈值可以减小,但是不能完全的消除因磁盘耗尽而导致崩溃的可能性。比如在两次磁盘空间的检查空隙内,第一次检查是:60MB,第二检查可能就是 1MB,就会出现警告。

通过命令方式修改如下:

rabbitmqctl set_disk_free_limit <disk_limit>
rabbitmqctl set_disk_free_limit memory_limit <fraction>disk_limit:固定单位 KB、MB、GB
fraction:是相对阈值,建议范围在 1.0~2.0 之间。(相对于内存)

通过配置文件配置如下:

disk_free_limit.relative = 3.0
disk_free_limit.absolute = 50mb

25. RabbitMQ-SpringBoot 案例 - 集群部署

SpringBoot 整合 RabbitMQ 集群配置详解

  1. 引入 starter

    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.6.RELEASE</version><relativePath/>
    </parent>
    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  2. 详细配置如下

    # 配置 RabbitMQ 服务
    spring:rabbitmq:host: 47.112.191.158port: 5672username: adminpassword: adminvirtual-host: /addresses: 127.0.0.1:6605,127.0.0.1:6606,127.0.0.1:6607 #指定 client 连接到的 server 地址,多个以逗号分隔(优先取 address,然后再取 host)# addresses: ip:port,ip:port #集群配置 addressed 之间用逗号隔开requested-heartbeat: #指定心跳超时,单位秒,0 为不指定;默认为 60spublisher-confirms: false #是否启用发布确认:true 是,false 否publisher-returns: false #是否启用发布返回:true 是,false 否connection-timeout: #连接超时,单位毫秒,0 表示无穷大,不超时cache:channel:size: #缓存中保持的 channel 数量checkout-timeout: #当缓存数量被设置时,从缓存中获取一个 channel 的超时时间,单位毫秒;如果为 0,则总是创建一个新 channelconnection:size: #缓存中保持的 channel 数量mode: #连接工厂缓存模式:CHANNEL 和 CONNECTIONlistener:simple:auto-startup: false #是否启动时自动启动容器:true 是,false 否acknowledge-mode: #表示消息确认方式,其有三种配置方式,分别是 none、manual 和 auto;默认为 autoconcurrency: #最小的消费数量max-concurrency: #最大的消费者数量prefetch: #指定一个请求能处理多少个消息,如果有事务的话,必须大于等于 transaction 数量transaction-size: #指定一个事务处理的消息数量,最好是小于等于 prefetch 的数量default-requeue-rejected: #决定被拒绝的消息是否重新入队;默认是 true(与参数 acknowledge-mode 有关系)idle-event-interval: #多长时间发布空闲容器时间,单位毫秒retry:enabled: #监听重试是否可用:true 是,false 否max-attempts: #最大重试次数initial-interval: #第一次和第二次尝试发布或传递消息之间的间隔multiplier: #应用于上一重试间隔的乘数max-interval: #最大重试时间间隔stateless: #重试是有状态 or 无状态:true 是,false 否template:mandatory: #是否启用强制信息:true 是,false 否;默认为 falsereceive-timeout: #receive()操作的超时时间reply-timeout: #sendAndReceive()操作的超时时间retry:enabled: #发送重试是否可用:true 是,false 否max-attempts: #最大重试次数initial-interval: #第一次和第二次尝试发布或传递消息之间的间隔multiplier: #应用于上一重试间隔的乘数max-interval: #最大重试时间间隔
    

    相关配置很多,只需要关注一些常用的配置即可

    对于发送方而言,需要做以下配置:

    1. 配置 CachingConnectionFactory
    2. 配置 Exchange/Queue/Binding
    3. 配置 RabbitAdmin 创建上一步的 Exchange/Queue/Binding
    4. 配置 RabbitTemplate 用于发送消息,RabbitTemplate 通过 CachingConnectionFactory 获取到 Connection,然后想指定 Exchange 发送

    对于消费方而言,需要做以下配置:

    1. 配置 CachingConnectionFactory
    2. 配置 Exchange/Queue/Binding
    3. 配置 RabbitAdmin 创建上一步的 Exchange/Queue/Binding
    4. 配置 RabbitListenerContainerFactory
    5. 配置 @RabbitListener/@RabbitHandler 用于接收消息

    在默认情况下主要的配置如下:

    配置项 默认值 作用
    host localhost RabbitMQ 服务器地址
    port 5672 RabbitMQ 服务器端口
    username 账户名 guest
    password 密码 guest
    virtualHost RabbitMQ 虚拟主机名 /
    publisherConfirms false 设置是否启用生产方确认
    publisherReturns false 设置是否启用生产方消息返回
    ssl 对象 设置 SSL,默认停用
    template 对象 配置 RabbitTemplate
    template.retry 默认停用 设置 RabbitTemplate 发送消息时的重试,主要用于 RabbitTemplate 与 RabbitMQ 之间的网络连接
    template.mandatory false 设置发送消息失败时(无接收)是否 return 消息,与 return callback 一并使用
    template.exchange - 默认发送的 exchange
    template.routingKey - 默认发送的 routingKey
    template.defaultReceiveQueue null 默认接收消息的 queue
    listener.simple 对象 设置 SimpleRabbitListenerContainerFactory
    listener.direct 对象 设置 DirectRabbitListenerContainerFactory
    listener.simple.concurrency null 并发消费方数量
    listener.simple.acknowledgeMode AUTO 设置消息方确认模式,这里的 AUTO 与 RabbitMQ 的自动确认不是一回事
    listener.simple.prefetch 250 设置消费方一次性接收消息的条数
    listener.simple.defaultRequeueRejected true 当 Listener 发生异常时是否 requeue
    listener.simple.retry 对象 设置 Listener 的重试机制,默认停用,当启用时,Listener 对于消息处理过程中的异常将进行 requeue 重试,超过重试次数再抛弃,此时 AmqpRejectAndDontRequeueException 异常也会被重试
  3. Spring AMQP 的主要对象(如果不了解 AMQP 可前往官网了解)

    作用
    Queue 对应 RabbitMQ 中 Queue
    AmqpTemplate 接口,用于向 RabbitMQ 发送和接收 Message
    RabbitTemplate AmqpTemplate 的实现类
    @RabbitListener 指定消息接收方,可以配置在类和方法上
    @RabbitHandler 指定消息接收方,只能配置在方法上,可以与 RabbitListener 一起使用
    Message 对 RabbitMQ 消息的封装
    Exchange 对 RabbitMQ 的 Exchange 的封装,子类有 TopicExchange、FanoutExchange 和 DirectExchange 等
    Binding 将一个 Queue 绑定到某个 Exchange,本身只是一个声明,并不做实际绑定操作
    AmqpAdmin 接口,用于 Exchange 和 Queue 的管理,比如创建 / 删除 / 绑定等,自动检查 Binding 类并完成绑定操作
    RabbitAdmin AmqpAdmin 的实现类
    ConnectionFactory 创建 Connection 的工厂类,RabbitMQ 也有一个名为 ConnectionFactory 的类。但二者没有继承关系,Spring ConnectionFactory 可以认为是对 RabbitMQ ConnectionFactory 的封装
    CachingConnectionFactory Spring ConnectionFactory 的实现类,可以用于缓存 Channel 和 Connection
    Connection Spring 中用于创建 Channel 的连接类,RabbitMQ 也有一个名为 Connection 的类,但二者没有继承关系,Spring Connection 是对 RabbitMQ Connection 的封装
    SimpleConnection Spring Connection 的实现类,将实际工作代理给 RabbitMQ 的 Connection 类
    MessageListenerContainer 接口,消费端负责与 RabbitMQ 服务器保持连接并将 Message 传递给实际的 @RabbitListener/@RabbitHandler 处理
    RabbitListenerContainerFactory 接口,用于创建 MessageListenerContainer
    SimpleMessageListenerContainer MessageListenerContainer 的实现类
    SimpleRabbitListenerContainerFactory RabbitListenerContainerFactory 的实现类
    RabbitProperties 用于配置 Spring AMQP 的 Property 类
  4. RabbitMQ 配置类

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;import java.util.HashMap;
    import java.util.Map;/*** <p></p>** @author YangShuKai* @date 2021/3/26*/
    @Configuration
    public class RabbitConfig {private static final Logger LOGGER = LoggerFactory.getLogger(RabbitConfig.class);public static final String RECEIVED_EXCHANGE = "spring-exchange";public static final String RECEIVED_QUEUE = "spring-queue";public static final String RECEIVED_ROUTING_KEY = "spring-key";public static final String DIRECT_EXCHANGE = "spring-exchange";public static final String MDM_QUEUE = "mdmQueue";public static final String TOPIC_EXCHANGE = "spring-top";@Value("${spring.rabbitmq.addresses}")private String hosts;@Value("${spring.rabbitmq.username}")private String userName;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.virtual-host}")private String virtualHost;/*@Value("${spring.rabbitmq.channelCacheSize}")private int channelCacheSize;@Value("${spring.rabbitmq.addresses}")private int port;@Autowiredprivate ConfirmCallbackListener confirmCallbackListener;@Autowiredprivate ReturnCallbackListener returnCallbackListener;*/@Beanpublic ConnectionFactory connectionFactory(){CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses(hosts);connectionFactory.setUsername(userName);connectionFactory.setPassword(password);/*connectionFactory.setChannelCacheSize(channelCacheSize);connectionFactory.setPort(port);*/connectionFactory.setVirtualHost(virtualHost);// 设置连接工厂缓存模式connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);// 缓存连接数connectionFactory.setConnectionCacheSize(3);// 设置连接限制connectionFactory.setConnectionLimit(6);LOGGER.info("连接工厂设置完成,连接地址:[{}]",hosts);LOGGER.info("连接工厂设置完成,连接用户:[{}]",userName);return connectionFactory;}@Beanpublic RabbitAdmin rabbitAdmin() {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());rabbitAdmin.setAutoStartup(true);rabbitAdmin.setIgnoreDeclarationExceptions(true);rabbitAdmin.declareBinding(bindingMdmQueue());// 声明 topic 交换器rabbitAdmin.declareExchange(directExchange());LOGGER.info("管理员设置完成");return rabbitAdmin;}@Beanpublic RabbitListenerContainerFactory listenerContainerFactory() {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory());factory.setMessageConverter(new Jackson2JsonMessageConverter());// 最小消费者数量factory.setConcurrentConsumers(10);// 最大消费者数量factory.setMaxConcurrentConsumers(10);// 一个请求最大处理的消息数量factory.setPrefetchCount(10);factory.setChannelTransacted(true);// 默认不排队factory.setDefaultRequeueRejected(true);// 手动确认接收到了消息factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);LOGGER.info("监听者设置完成");return factory;}@Beanpublic DirectExchange directExchange() {return new DirectExchange(DIRECT_EXCHANGE, true, false);}@Beanpublic Queue mdmQueue() {Map args = new HashMap();// 绑定该队列到死信交换机args.put("x-dead-letter-exchange", RECEIVED_EXCHANGE);args.put("x-dead-letter-routing-key", RECEIVED_ROUTING_KEY);LOGGER.info("队列交换机绑定完成");return new Queue(RECEIVED_QUEUE, true, false, false, args);}@Beanpublic Binding bindingMdmQueue() {return BindingBuilder.bind(mdmQueue()).to(directExchange()).with("");}@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());rabbitTemplate.setMandatory(true);/*// 发布确认rabbitTemplate.setConfirmCallback(confirmCallbackListener);// 启用发布返回rabbitTemplate.setReturnsCallback(returnCallbackListener);*/LOGGER.info("连接模板设置完成");return rabbitTemplate;}/*@Beanpublic TopicExchange topicExchange() {return new TopicExchange(TOPIC_EXCHANGE, true, false);}@Beanpublic Queue directQueue() {return new Queue(RECEIVED_QUEUE, true);}@Beanpublic Binding binding() {return BindingBuilder.bind(directQueue()).to(directExchange()).with(RECEIVED_ROUTING_KEY);}*/
    }
    

    通过两种方式加载

    1. 通过配置文件
    2. 通过配置类

    说明:上面是通过配置文件与配置类的方式去加载,常用的配置如上所示。实际使用中生产方与消费方要分开配置,相关配置也会有小变动,大体配置不变。更多信息可查看官网配置。

RabbitMQ - 高级 - 集群

1. RabbitMQ 集群

RabbitMQ 这款消息队列中间件产品本身是基于 Erlang 编写,Erlang 语言天生具备分布式特性(通过同步 Erlang 集群各节点的 magic cookie 来实现)。因此,RabbitMQ 天然支持 Clustering。这使得 RabbitMQ 本身不需要像 ActiveMQ、Kafka 那样通过 Zookeeper 分布来实现 HA 方案和保存集群的元数据。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。

在实际使用过程中多采取多机多实例部署部署方式,为了便于练习搭建,有时候不得不在一台机器上去搭建一个 RabbitMQ 集群,本章主要针对单机多实例这种方式来进行开展。

主要参考官方文档:https://www.rabbitmq.com/clustering.html

2. 集群搭建

配置的前提是 RabbitMQ 可以运行起来,比如 ps aux|grep rabbitmq 命令可以看到相关进程,又比如运行 rabbitmqctl status 命令可以看到类似如下信息,而不报错:

执行下面命令进行查看:

ps aux|grep rabbitmq

或者

systemctl status rabbitmq-server

注意:确保 RabbitMQ 可以运行,确保完成后,把单机版的 RabbitMQ 服务停止,后台看不到 RabbitMQ 的进程为止

停止单机版的 RabbitMQ 服务

systemctl stop rabbitmq-server

3. 单机多实例搭建

场景:假设有两个 RabbitMQ 节点,分别为 rabbit-1,rabbit-2,rabbit-1 作为主节点,rabbit-2 作为从节点。

启动命令RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server -detached

结束命令rabbitmqctl -n rabbit-1 stop

3.1 启动第一个节点 rabbit-1

sudo RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server start &

至此节点 rabbit-1 启动完成

3.2 启动第二个节点 rabbit-2

注意:web 管理插件端口占用,所以还要指定其 web 插件占用的端口号

RABBITMQ_SERVER_START_ARGS=“-rabbitmq_management listener [{port,15673}]"

sudo RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit-2 rabbitmq-server start &

至此节点 rabbit-2 启动完成

3.3 验证启动

**使用 ps aux|grep rabbitmq**命令验证启动

3.4 rabbit-1 操作为主节点

# 停止应用
sudo rabbitmqctl -n rabbit-1 stop_app
# 目的是清楚节点上的历史数据(如果不清除,无法将节点加入到集群)
sudo rabbitmqctl -n rabbit-1 reset
# 启动应用
sudo rabbitmqctl -n rabbit-1 start_app

3.5 rabbit-2 操作为从节点

# 停止应用
sudo rabbitmqctl -n rabbit-2 stop_app
# 目的是清楚节点上的历史数据(如果不清除,无法将节点加入到集群)
sudo rabbitmqctl -n rabbit-2 reset
# 将 rabbit-2 节点加入到 rabbit-1(主节点)集群当中【centos7->服务器的主机名】
sudo rabbitmqctl -n rabbit-2 join_cluster rabbit-1@'centos7'
# 启动应用
sudo rabbitmqctl -n rabbit-2 start_app

3.6 验证集群状态

sudo rabbitmqctl cluster_status -n rabbit-1

3.7 Web 监控

注意在访问的时候:Web 界面的管理需要给 15672 node-1 和 15673 node-2 设置用户名和密码。如下:

rabbitmqctl -n rabbit-1 add_user admin admin
rabbitmqctl -n rabbit-1 set_user_tags admin administrator
rabbitmqctl -n rabbit-1 set_permissions -p / admin ".*" ".*" ".*"rabbitmqctl -n rabbit-2 add_user admin admin
rabbitmqctl -n rabbit-2 set_user_tags admin administrator
rabbitmqctl -n rabbit-2 set_permissions -p / admin ".*" ".*" ".*"

3.8 小结

Tips:

如果采用多级部署方式,需读取其中一个节点的 cookie,并复制到其他节点(节点之间通过 cookie 确定相互是否可通信)。cookie 存放在 / var/lib/rabbitmq/.erlang.cookie。

例如:主机名分别为 rabbit-1、rabbit-2

  1. 逐个启动各节点

  2. 配置各节点的 hosts 文件(vim /etc/hosts)

    ip1:rabbit-1

    ip2:rabbit-2

其他步骤雷同单机部署方式。

RabbitMQ - 高级 - 过期时间 TTL

过期时间 TTL 表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ 可以对消息和队列设置 TTL。目前有两种方法可以设置。

*第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
*第二种方法是对消息进行单独设置,每条消息 TTL 可以不同

如果上述两种方法同时使用,则消息的过期时间以两者之间 TTL 较小的那个数值为准。消息在队列的生存时间一旦超过设置的 TTL 值,就称为 dead message 被投递到死信队列,消费者将无法再收到该消息。

设置队列 TTL

package com.example.springboot.rabbitmq.order.producer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class TTLRabbitMqConfiguration {/*** 1. 声明注册 direct 模式的交换机*/@Beanpublic DirectExchange ttlDirectExchange() {return new DirectExchange("ttl_direct_exchange", true, false);}/*** 2. 设置队列的过期时间*/@Beanpublic Queue directTTLQueue() {// 设置过期时间Map<String, Object> args = new HashMap<>();// 这里一定是 int 类型args.put("x-message-ttl", 5000);return new Queue("ttl.direct.queue", true, false, false, args);}@Beanpublic Binding directTTLBinding() {return BindingBuilder.bind(directTTLQueue()).to(ttlDirectExchange()).with("ttl");}
}

设置消息 TTL

消息的过期时间;只需要在发送消息(可以发送到任何队列,不管该队列是否属于某个交换机)的时候设置过期时间即可。

package com.example.springboot.rabbitmq.order.producer.service;import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;@Service
public class OrderService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void makeOrderMessageTtl(String userId, String productId, int number) {// 1、根据商品 ID 查询库存是否充足// 2、保存订单// 3、通过 MQ 来完成消息的分发String orderId = UUID.randomUUID().toString();System.out.println("订单生成成功" + orderId);String exchangeName = "ttl_direct_exchange";String routingKey = "ttlMessage";// 给消息设置过期时间MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 这里就是设置字符串message.getMessageProperties().setExpiration("5000");message.getMessageProperties().setContentEncoding("UTF-8");return message;}};// 参数 1:交换机;参数 2:路由 key/queue 队列名称;参数 3:消息内容rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);}
}

RabbitMQ - 高级 - 死信队列

DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是 DLX,绑定 DLX 的队列就称之为死信队列。消息变成死信,可能是由于以下的原因:

  • 消息被拒绝
  • 消息过期
  • 消息达到最大长度

DLX 也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,RabbitMQ 就会自动地将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。

要想使用死信队列,只需要在定义队列的时候设置队列参数 x-dead-letter-exchange 指定交换机即可。

队列不可以修改,只可以删除,要修改时,应先删除再新增新的队列。

在实际场景中,可以增加新的队列并发送消息到新队列中,将旧的队列启用。

在 Web 界面中查看死信队列

消息创建时,设置 5 秒的过期时间,存在于 test.dead.direct.queue 队列中

5 秒后消息未被消费,消息存放到死信队列中

死信队列流程示例

RabbitMQ - 高级 - 分布式事务

1. 简述

分布式事务指事务的操作位于不同的节点上,需要保证事务的 ACID 特性。例如在下单场景下,库存和订单如果不在同一个节点上,就涉及分布式事务

2. 分布式事务的方式

2.1 2PC

两阶段提交(Two-phase Commit,2PC),需要数据库厂商的支持,Java 组件有 atomikos 等。2PC 通过引入协调者(Coordinator)来协调参与者的行为,并最终决定这些参与者是否真正执行事务。

准备阶段

协调者询问参与者事务是否执行成功,参与者发回事务执行的结果。

存在的问题

  1. 同步阻塞所有事务参与者在等待其它参与者响应的时候都处于同步阻塞状态,无法进行其他操作。
  2. 单点问题协调者在 2PC 中起到非常大的作用,发生故障将会造成很大影响。特别是在阶段二发生故障,所有参与者会一直等待状态,无法完成其他操作。
  3. 数据不一致在阶段二,如果协调者只发送了部分 Commit 消息,此时网络异常,那么只有部分参与者接收到 Commit 消息,也就是说只有部分参与者提交了事务,使得系统数据不一致。
  4. 太过保守任意一个节点失败就会导致整个事务失败,没有完善的容错机制。

2.2 TCC

补偿事务(TCC)其实就是采用的补偿机制(严选、阿里、蚂蚁金服),其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。它分为三个阶段:

  1. Try 阶段主要是对业务系统做检测及资源预留
  2. Confirm 阶段主要是对业务系统做确认提交,Try 阶段执行成功并开始执行 Confirm 阶段时,默认 Confirm 阶段是不会出错的。即:只要 Try 成功,Confirm 一定成功。
  3. Cancel 阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。

举个例子,假如 Bob 要向 Smith 转账,思路大概是:我们有一个本地方法,里面依次调用

  1. 首先在 Try 阶段,要先调用远程接口把 Smith 和 Bob 的钱给冻结起来。
  2. 在 Confirm 阶段,执行远程调用的转账操作,转账成功进行解冻。
  3. 如果第 2 步执行成功,那么转账成功,如果第二步执行失败,则调用远程冻结接口对应的解冻方法(Cancel)。

优点:跟 2PC 比起来,实现以及流程相对简单了一些,但数据的一致性比 2PC 也要差一些

缺点:缺点还是比较明显的,在 2,3 步中都有可能失败。TCC 属于应用层的一种补偿方式,所以需要程序员在实现的时候多写很多补偿的代码,在一些场景中,一些业务流程可能用 TCC 不太好定义及处理。

2.3 本地消息表

本地消息表(异步确保)与业务数据表处于同一个数据库中,这样就能利用本地事务来保证在对这两个表的操作满足事务特性,并且使用了消息队列来保证最终一致性。(比如:支付宝、微信支付主动查询支付状态,对账单的形式)

  • 在分布式事务操作的一方完成写业务数据的操作之后向本地消息表发送一个消息,本地事务能保证这个消息一定会被写入本地消息表中。
  • 之后将本地消息表中的消息转发到 Kafka 等消息队列中,如果转发成功则将消息从本地消息表中删除,否则继续重新转发。
  • 在分布式事务操作的另一方从消息队列中读取一个消息,并执行消息中的操作。

优点:一种非常经典的实现,避免了分布式事务,实现了最终一致性。

缺点:消息表会耦合到业务系统中,如果没有封装号的解决方案,会有很多杂活需要处理。

2.4 MQ 事务消息

MQ 事务消息使用与异步场景,通用性较强,拓展性较高。有一些第三方的 MQ 是支持事务消息的,比如 RocketMQ,他们支持事务消息的方式也是类似于采用的二阶段提交,但是市面上一些主流的 MQ 都是不支持事务消息的,比如 Kafka 不支持。

以阿里的 RabbitMQ 中间件为例,其思路大致为:

  • 第一阶段 Prepared 消息,会拿到消息的地址。第二阶段执行本地事务,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。
  • 也就是说在业务方法内要想消息队列提交两次请求,一次发送消息和一次确认消息。如果确认消息发送失败了,RabbitMQ 会定期扫描消息集群中的事务消息,这时候发现了 Prepared 消息,它会向消息发送者确认,所以生产方需要实现一个 check 接口,RabbitMQ 会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

优点:实现了最终一致性,不需要依赖本地数据库事务。

缺点:实现难度大,主流 MQ 不支持,RocketMQ 事务消息部分代码也未开源。

2.5 总结

分布式事务本身是一个技术难题,是没有一种完美的方案应对所有场景的,具体还是要根据业务场景去选择。阿里 RocketMQ 去实现的分布式事务,现在也有除了很多分布式事务的协调器,比如 LCN 等,可以多去尝试。

3. 具体实现

3.1 分布式事务的完整架构图如下

3.2 分布式系统分布式事务问题

3.3 基于 MQ 的分布式事务整体设计思路

3.4 基于 MQ 的分布式事务消息的可靠生产问题

3.5 基于 MQ 的分布式事务消息的可靠生产问题 - 定时重发

3.6 基于 MQ 的分布式事务消息的可靠消费

3.7 基于 MQ 的分布式事务消息的消息重发

3.8 基于 MQ 的分布式事务消息的死信队列消息转移 + 人工处理

如果死信队列报错就进行人工处理

3.9 基于 MQ 的分布式事务消息的死信队列消息重试注意事项

3.10 基于 MQ 的分布式事务消息的定时重发

4. 总结

优点

基于 MQ 的分布式事务解决方案优点

  1. 通用性强
  2. 拓展方便
  3. 耦合度低,方案也比较成熟

缺点

基于 MQ 的分布式事务解决方案缺点

  1. 基于消息中间件,只适合异步场景
  2. 消息会延迟处理,需要业务上能够容忍

建议

  1. 尽量去避免分布式事务
  2. 尽量将非核心业务做成异步

RabbitMQ 面试题

1. RabbitMQ 为什么需要信道,为什么不是 TCP 直接通信

  1. TCP 的创建和销毁,开销大,需要三次握手,销毁要 4 次挥手。
  2. 如果不用信道,那应用程序就会 TCP 连接到 RabbitMQ 服务器,高峰时每秒成千上万连接就会造成资源的巨大浪费,而且底层操作系统每秒处理 TCP 连接数也是有限制的,必定造成性能瓶颈。
  3. 信道的原理是一条线程一条信道,多条线程多条信道同用一条 TCP 连接,一条 TCP 连接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能瓶颈

2. QUEUE 队列到底在消费者创建还是生产者创建?

  1. 一般建议是在 RabbitMQ 操作面板创建。这是一种稳妥的做法。
  2. 按照常理来说,确实应该消费者这边创建是最好,消息的消费是在消费者这边。这样你承受一个后果,可能我生产在生产消息可能会丢失消息。
  3. 在生产者创建队列也是可以,这样是稳妥的方法,消息是不会出现丢失。
  4. 如果你生产者和消费者都创建队列,谁先启动谁先创建,后面启动就覆盖前面的。

3. 可以存在没有交换机的队列吗?

不可能,虽然没有指定交换机但是一定会存在一个默认的交换机。

RabbitMQ 学习笔记相关推荐

  1. RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决)

    RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决) 参考文章: (1)RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决) (2)https://www.cnblogs. ...

  2. RabbitMQ学习笔记(3)----RabbitMQ Worker的使用

    1. Woker队列结构图 这里表示一个生产者生产了消息发送到队列中,但是确有两个消费者在消费同一个队列中的消息. 2. 创建一个生产者 Producer如下: package com.wangx.r ...

  3. Rabbitmq学习笔记(尚硅谷2021)

    Rabbitmq学习笔记 (尚硅谷) 1.MQ 的概念 1.1 什么是 MQ? 1.2 为什么要用 MQ? 削峰 解耦 异步 1.3 MQ 的分类 ActiveMQ Kafka RocketMQ Ra ...

  4. RabbitMQ学习笔记(高级篇)

    RabbitMQ学习笔记(高级篇) 文章目录 RabbitMQ学习笔记(高级篇) RabbitMQ的高级特性 消息的可靠投递 生产者确认 -- confirm确认模式 生产者确认 -- return确 ...

  5. Rabbitmq学习笔记教程-尚硅谷

    Rabbitmq学习笔记 (尚硅谷) 尚硅谷 rabbitmq 教程 1.MQ 的概念 1.1 什么是 MQ? 存放消息的队列,互联网架构中常见的一种服务与服务之间通信的方式. 1.2 为什么要用 M ...

  6. 分布式消息中间件之RabbitMQ学习笔记[一]

    写在前面 嗯,陆续的整理一些中间件的笔记 今天和小伙伴们分享RabbitMQ 相关笔记 博文偏理论,内容涉及: RabbitMQ的简单介绍 AMQP协议标准介绍 RabbitMQ Demo 食用方式: ...

  7. 官网英文版学习——RabbitMQ学习笔记(二)RabbitMQ安装

    一.安装RabbitMQ的依赖Erlang 要进行RabbitMQ学习,首先需要进行RabbitMQ服务的安装,安装我们可以根据官网指导进行http://www.rabbitmq.com/downlo ...

  8. RabbitMQ学习笔记(一)

    前言: 学习B站UP主狂神说视频笔记整理视频链接 什么是中间件 中间件( Middleware ) 是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分.人们在使用中间件时,往往 ...

  9. RabbitMQ学习笔记-RabbitMQ深入理解

    导语    RabbitMQ 整体是一个与其他中间件相同的模型,主要是负责接收.存储和转发消息.可以把消息传递的过程想想成快递:将一个快递送到快递公司,快递公司由快递员送到收件人的人中,RabbitM ...

最新文章

  1. mysql5.5数据库名_mysql5.5数据库名
  2. 写一个函数,2 个参数,1 个字符串,1 个字节数,返回截取的字符串,要求字符串中的中文不能出现乱码
  3. C# API中的模型和它们的接口设计
  4. 使用YII2 构建一个定时任务
  5. 关于随机验证码的一些小见解。
  6. JQuery获取元素的N种方法
  7. 关于静态联编和动态联编
  8. VTK:不透明度用法实战
  9. FreeSql (三十三)CodeFirst 类型映射
  10. 计算机的原理两条分别是,微机原理习题答案4
  11. [Django]Windows下的Django安装——通过pip
  12. Eclipse切换回中文
  13. 报错:信息:INFO: Error parsing HTTP request header
  14. C语言实现逆波兰表示法(栈)
  15. 如何把晨光计算机调成音乐模式,伴着晨光走向你——广播《晨光音乐行》栏目运作心得...
  16. 带有鸿蒙logo的壁纸,鸿蒙OS Logo曝光
  17. 计算机网络调试记录表,计算机网络管理员中级操作技能考核评分记录表.doc
  18. CJT长江连接器公司的A2541系列线对板连接器PCB封装库
  19. (2016/02/19)多传感器数据融合算法---9轴惯性传感器
  20. hadoop文件读写示例

热门文章

  1. 深度学习系列(四):什么是稀疏编码
  2. 阿里云直播鉴权和直播地址算法
  3. 【译】css动画里的steps()用法详解
  4. 【转】一个游戏程序员的学习资料
  5. Xilinx XC7Z020双核ARM+FPGA开发板试用合集——自定义硬件工程
  6. Selenium 循环删除页面元素
  7. 【Java】【系列篇】【Spring源码解析】【三】【体系】【BeanFactory体系】
  8. windows路由表完全掌握(内容:路由表解析,读懂路由表)
  9. lo4net的简单运用
  10. 计算机科班与培训开发编程的区别在哪里?