文章目录

  • 什么是RMQ?架构介绍
  • 消息持久化
  • 异步刷盘
  • RocketMQ为什么速度快
  • 延迟消息
  • 事务消息
  • 过滤消息
  • 有序消息
  • 批量消息
  • 消息轨迹(msg trace)
  • 消费重试
  • 死信队列
  • 广播消息
  • offset 持久化
  • Consumer Group—Reblance机制
  • 高可用模式
    • Master/Slave
    • Dleger模式
  • LMQ
  • 为什么有的公司会去做RocketMQ的定制?怎么做比较好?
  • RocketMQ相对于Kafka好在哪里?
  • 什么情况下会出现消息丢失的情况?
  • 如何保证消息不丢失?
  • 消息降级机制
  • 消息幂等

什么是RMQ?架构介绍

RocketMQ | 解耦、异步、削峰原理解析

消息持久化

RocketMQ主要的存储文件包括commitlog文件、consumequeue文件、indexfile文件。

Broker在收到消息之后,会把消息保存到commitlog的文件当中,而同时在分布式的存储当中,每个broker都会保存一部分topic的数据。

同时,每个topic对应的messagequeue下都会生成consumequeue文件用于保存commitlog的物理位置偏移量offsetindexfile中会保存key和offset、timeStamp、NextIndexOffset的对应关系,辅助consumequeue文件进行索引。


Producer将消息写入CommitLog之后,会分发到ComsumerQueue中

简单来讲就是:

  • commitlog:顺序保存一条条的消息
  • consumequeue:保存消费者消费到哪里了
  • indexfile:辅助consumequeue文件进行索引,例如TimeStamp等,也因为indexFile,RMQ能够支持按照最新的消息进行消费

异步刷盘

RocketMQ为什么速度快

是因为使用了顺序存储、Page Cache和异步刷盘、零拷贝技术。

  • 我们在写入commitlog的时候是顺序写入的,这样比随机写入的性能就会提高很多

  • 写入commitlog的时候并不是直接写入磁盘,而是先写入操作系统的PageCache,最后由操作系统异步将缓存中的数据刷到磁盘

  • RMQ的消息是存储在磁盘中的,如果任何优化都不做,需要四次拷贝(由于文件系统、网卡都是内核态,涉及用户态到内核态、内核态到用户态),RMQ零拷贝技术基于mmap和SendFile两种实现。

  • mmap技术实现了映射,不必把磁盘搞到内存中,直接映射修改,减少了两次拷贝,但是有大小限制(1.5G到2G),也因此我们可以注意到commitLog一般大小是1G左右

  • SendFile技术更快,则基于DMA技术,能够只用一次拷贝(从源地址读到目的地址),但是SendFIle不支持对文件的操作

延迟消息

生产者生产的消息,不能被立即被消费,需要延迟一定的时间

比较常见的场景就是用户下单之后未付款,那么我们可以把这个当成是一个消息发送给下游,等到下游消费的时候,把这个订单关闭就行。

消息被发送到 broker 时,首先正常写入 commit log,在分发至 consume queue 时,会将 topic 改成 SCHEDULE_TOPIC_XXXX ,将 queue id 改成 delayTimeLevel - 1 。同时暂存原来的 topic 和 queue,通过改变TOPIC,就不会被消费了。

  1. 生产者先把消息写入到commitLog中,修改消息Topic名称和队列信息,加上SCHEDULE_TOPIC_XXXX前缀
  2. 转发消息到延迟主题的CosumeQueue
  3. 延迟服务ScheduleMessageService消费SCHEDULE_TOPIC_XXXX消息
  4. 将信息重新存储到CommitLog
  5. 将消息投递到目标Topic
  6. 消费者消费目标topic中的数据

新的TOPICSCHEDULE_TOPIC_XXXX 将被归为系统 topic,当 client 订阅系统 topic 时,会抛出异常。

事务消息

RMQ支持事务消息,与常见的数据库事务流程类似,生产者生产一条事务消息,等生产者本地操作ok了,会向MQ Server发送提交确认。

在这种情况下,事务消息初始被扔到消息队列中是没办法消费的,只有当该事务消息被提交了,这个消息才能被消费者所消费。

同理,如果rollback了,RMQ会从消息队列中把该消息删掉。

  1. 生产者生产消息给MQ,此时未提交,称该消息为HalfMsg,即半消息
  2. MQ受到该消息,会给生产者一个OK确认
  3. 生产者执行本地事务
  4. 根据本地事务执行的结果向MQ提交commit或者rollback,MQ根据commit或者rollback对消息进行相应操作,即消费或者丢弃
  5. 如果生产者提交commit或者rollback提交超时,即第四步没有收到来自生产者的请求,MQ回调检查该消息
  6. MQ回调检查本地事务的状态

过滤消息

RMQ支持按照TAG过滤消息,例如TOPIC有TAG_A、TAG_B、TAG_C,如果只想要消费TAGA、TAG_C的消息,例如:

 consumer.subscribe("TagFilterTest", "TagA || TagC");

为了更灵活地过滤消息,RMQ支持按照SQL 92语法过滤消息:

        consumer.subscribe("SqlFilterTest",MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +"and (a is not null and a between 0 and 3)"));

sql中的a是生产者设置的一个Property:

msg.putUserProperty("a", String.valueOf(i));

有序消息

例如对一个订单来说,订单创建、订单完成,这两条消息就需要是顺序的。

消费者将锁定每个消息队列,以确保他们被逐个消费,虽然这将会导致性能下降,但是当你关心消息顺序的时候会很有用。

另一方面,如果要想实现顺序消息,就需要去保证保证生产者 - MQServer - 消费者是一对一对一的关系

但是这样设计,并行度就成为了消息系统的瓶颈(吞吐量不够),也会导致更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。

RocketMQ是怎么做的呢?

  • 一般消息是通过轮询所有队列来发送的(负载均衡策略),顺序消息可以根据业务,比如说订单号相同的消息发送到同一个队列。例如,OrderId相同的消息,会发送到同一个队列

批量消息

为了提升效率,RMQ支持Producer一次性发送多条消息到Broker,减少网络请求次数

批量消息存在限制,即单批消息大小不能大于1MB,实际上是4MB

RMQ提供了Spliter类分离大批量消息,使得单批消息不大于4MB

消息轨迹(msg trace)

消息轨迹可以理解为RMQ支持对一条消息从Producer再到Consumer这个过程的Trace,记录的内容如下所示

#Type      #ProducerGroup       #ClientHost          #SendTime            #CostTimes #Status
Pub        1623305799667        xxx.xxx.xxx.xxx       2021-06-10 14:16:40  131ms      success

消息轨迹原理是使用系统TOPIC来实现的,这个TOPIC名字也可以自定义,基于customizedTraceTopic参数实现

    public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);}

消费重试

如果在处理消息的过程中出现异常,消息消费失败,肯定是不能直接扔掉该消息的,需要进行一定次数的消费重试,次数默认是16次,重试之前会存在间隔,例如第一次重试是1s,第二次重试是2s,第十五次是1h,第十六次是2h。

死信队列

当消费重试次数达到默认的16次,会把消息扔到死信队列中,保存3天之后再删掉(与正常消息的过期时间相同),进一步保障业务的可靠性。

死信消息拥有以下特性:

  • 设置死信队列的权限是2,即不会再被消费者正常消费,留存有效期与正常消息相同,均为3天
  • 死信队列与Group ID是对应的,所以某个死信队列里存在的消息可能属于不同的topic
  • 如果一个Group ID未产生死信消息,RMQ不会为其创建相应的死信队列

广播消息

普通消息都是RMQ让一个消费者组中的一个实例来消费的,而广播消息则会发给所有订阅了该Topic的消费者实例去消费消息,需要注意的是:

  • 广播消息消费失败之后,不支持按照返回Action.ReconsumerLater进行重试,只会去消费新的消息
  • 如果要使用广播模式消费消息,可以在消费者处设置消费模式为广播模式:
consumer.setMessageModel(MessageModel.BROADCASTING);

offset 持久化

RocketMQ 维护了一个每隔 10s 运行一次的 Timer,目的是将每个队列的消费情况写入文件,等到下次重启时,可以从上次消费位置进行消费。需要注意的是,因为不是实时同步,有一定的时间间隔,所以,有可能会出现重复消息的情况。我们需要在消费端做幂等处理,避免重复消费。

Consumer Group—Reblance机制

  • 一个消费者实例可以是同一机器下的不同线程,也可以是不同机器实例下运行的线程
  • 当多个消费者实例使用同一个消费者组订阅Topic时,RocketMQ有Reblance机制,对不同的Queue进行重新分配消费者实例,事实上是一个并行处理消息的操作
  • 通过Reblance机制,就能够让多台消费者实例共同完成消费任务
  • 分配策略RMQ也支持多种,例如轮询、同机房分配策略

    前面说的是消费端负载均衡,在生产者端负载均衡比较简单,就是轮询,messagequeue是有ID例如0,1,2,3,4的,生产消息的时候,可以轮询给不同的messagequeue

高可用模式

Master/Slave

RMQ支持Master/Slave模式的高可用,slave通过同步复制或者异步复制的方式去同步Master的数据。

当Master节点的Broker挂掉时,Producer可以选择新的Master去投递消息,但是故障主节点需要人为切换才能解决故障。

一般来说同步复制、异步复制的选型需要注意:

  • 同步复制:缺点在于需要所有的从节点都拷贝一份主节点的数据,一但有某个从节点出了延迟,主节点这边的业务就只能等。

  • 异步复制:看起来数据是不太可靠的,当主节点挂掉的时候,从节点的数据不一定是ok的,存在丢数据的风险。异步复制也较为适合异地节点的情况,当从节点与主节点之间的距离过于远的时候,异步是一个不错的选择。

  • 半同步复制:当大于一半的葱节点复制数据是ok的时候,即认为是ok的

Dleger模式

为了解决Master/Slave模式不能自动切主的问题, RMQ也支持Dleger模式,其基于Raft协议实现,能够基于通信自动切主。

如果想要搞懂Dleger模式,一定要先搞清楚Raft一致性协议:分布式共识算法Raft

Dleger模式下,消息从生产者投递到Master是按照Raft协议来的:

  • 消息来到Master节点,消息的状态会是UNCOMMITED
  • 在Master向Slave节点同步复制之后,当收到半数Slave节点已经同步完成的响应时,才会把这条消息的状态设置成COMMITED,进而才会给消费者消费
  • 当Master挂掉之后,Slave通过选举切换为Master,会重新把UNCOMMITED的消息搞到新的Master节点进行同步

RMQ曾考虑过使用ZK或者ETCD去实现选主复制,但这样会引入重量级的组件,会给之后的维护和诊断增加难度。

LMQ

LMQ的作用在于万物互联场景下,移动边缘设备向我们的数据中心传输数据所用的微消息队列

RocketMQ在最新的版本中,基于MQTT协议对LMQ进行了支持。

为什么有的公司会去做RocketMQ的定制?怎么做比较好?

  • 社区版还是不能完全适配公司内部鸡架的需求,例如有的公司会在Client与Server之间加层Proxy,以满足定制化的需求

RocketMQ相对于Kafka好在哪里?

  • 可以看出,在topic 量小的情况下,kafka的性能是优于rocketMQ的
  • 但随着topic量的增大,kafka的消息处理能力急剧下降,而RocketMQ则几乎没有变化

在topic数量较小时,kafka在发送端做了批处理,将一些消息封装为一批,发送给broker,所以性能较优

在topic数量较大时,kafka的文件模型是一个partipation对应一个文件,而rocketmq则是所有的queue共用一个commit log,kafka散落文件导致磁盘IO慢

什么情况下会出现消息丢失的情况?

  • 涉及到网络通信就可能出现消息丢失的情况,包括生产者/消费者与Broker之间的网络通信、Broker主从复制之间的网络通信、异步刷盘PageCache时机器宕机等情况

如何保证消息不丢失?

  • MQ ACK机制,Broker收到生产者的消息才返回ACK,如果发送端没收到ACK,超时重发。同时,事务消息机制可以进一步保证消息不丢失,通过Half消息起到一个嗅探MQ服务是否可用的作用
  • 消费者同步消费,即当消费者业务处理完成之后再返回“消费成功”给Broker
  • Broker同步复制+Dledger架构自动切主

消息降级机制

如果RMQ直接挂掉了,但生产者本地事务还是在执行的,消息会发不出去,那么这里最好有一个消息降级机制:

  • 尝试多次发给MQ失败,那么就利用Redis或者磁盘存起来这些消息
  • 同时搞一个线程,定时去尝试把这些消息重新发给MQ

消息幂等

  • 对于一条消息就是一个完整的数据:唯一索引+Redis幂等可以实现
  • 对于多条消息才能组成一个完整的数据:可以设置多个ready字段,当ready=0的时候可以插入,当ready=1的时候不插入,结合sql的on duplicate key来实现

RocketMQ 知识点整理相关推荐

  1. RocketMQ知识点整理

    RocketMQ知识点整理 一.消息队列 二.RocketMQ简介 RocketMQ-组件 RocketMQ架构: 三. RocketMQ理解性问题整理 1.使用消息中间件之前需要先了解"同 ...

  2. Java进阶3 - 易错知识点整理(待更新)

    Java进阶3 - 易错知识点整理(待更新) 该章节是Java进阶2- 易错知识点整理的续篇: 在前一章节中介绍了 ORM框架,中间件相关的面试题,而在该章节中主要记录关于项目部署中间件,监控与性能优 ...

  3. 面试题引出的知识点整理

    1.自旋锁&可重复锁&公平锁&共享锁&分段锁你都知道吗? 2.无锁&偏向锁&轻量级锁&重量级锁如何膨胀升级? 3.Lock底层AQS实现与Syn ...

  4. C语言考研复试知识点整理

    C语言考研复试知识点整理 1.由float x=3e-6,y=3e-6;不能得到x= =y的逻辑值为真. 解析:float类型的变量只能进行>或<运算,不能进行==运算 2.自增和自减运算 ...

  5. 【Android 面试基础知识点整理】

    针对Android面试中常见的一些知识点整理,Max 仅仅是个搬运工.感谢本文中引用文章的各位作者,给大家分享了这么多优秀文章.对于当中的解析,是原作者个人见解,有错误和不准确的地方,也请大家积极指正 ...

  6. mysql 存储引擎 面试_搞定PHP面试 - MySQL基础知识点整理 - 存储引擎

    MySQL基础知识点整理 - 存储引擎 0. 查看 MySQL 支持的存储引擎 可以在 mysql 客户端中,使用 show engines; 命令可以查看MySQL支持的引擎: mysql> ...

  7. mysql 全面知识点_Mysql知识点整理

    1.存储引擎区别 MyISAM:不支持事物.仅支持表级锁.支持B+树索引 MEMORY:不支持事物.仅支持表级锁.支持B+树和HASH索引 InnoDB:支持事物.支持行级锁.支持B+树索引 2.锁机 ...

  8. 04741计算机网络原理知识点,04741计算机网络原理知识点整理.doc

    04741计算机网络原理知识点整理 1.计算机网络大发展 计算机网络从20世纪70年代开始发展,他的演变可以概括为 面向终端的计算机网络.计算机-计算机网络.开放式标准化网络以及因特网广泛应用和高速网 ...

  9. python基础知识整理-python入门基础知识点整理-20171214

    一.知识点整理 1.python2与python3的区别: (1)宏观比对 python2 源码不标准,较为混乱,并且重复的代码很多. python3 源码统一了标准,同时也去除了重复代码. (2)编 ...

最新文章

  1. 微信小程序设置域名、不校验域名
  2. 新浪微博开放平台开发-android客户端(2)
  3. PHP学习笔记--抽象类和抽象方法的应用
  4. linux显示中文乱码
  5. Apollo 分布式配置中心 搭建篇
  6. centos上安装adobe flash
  7. 【转】简单的解释XSS攻击
  8. Hive性能优化(全面)
  9. ​小米 11 发布,售价 3999 元起;罗永浩回应败诉半导体公司;deepin 20.1(1010) 发布|极客头条...
  10. 线条边框简笔画图片大全_表情包丨表情包简笔画图片大全可爱
  11. [译] 在浏览器里使用 TenserFlow.js 实时估计人体姿态
  12. 扇贝有道180628每日一句
  13. 利用VBS脚本让qq永远在线
  14. 黑苹果系统镜像稳定版 10.9.5 - 10.15.6 整合下载
  15. 无法启动游戏 因为计算机,win7电脑无法启动游戏怎么办?
  16. 安卓逆向,Python爬虫,网页逆向和其他学习计划
  17. 智能合约漏洞检测论文整理
  18. html5获取坐标高德,vue 单纯的获取经纬度 百度与高德 H5
  19. NPOI之Excel——合并单元格、设置样式、输入公式
  20. 手把手教你如何用python制作自动翻译程序

热门文章

  1. Pandas 秘籍:1~5
  2. 关于python学成之后如何接单
  3. 大专计算机知识,大专院校计算机教学
  4. 计算机硬件维护实验报告,计算机硬件维护实验报告.pdf
  5. qemu创建linux虚拟机(亲测有效,virt-manger方式)
  6. word无法保存此文件,因为它已经在别处打开。(C:/…/STARTUP/Powerword.dot)
  7. iOS 静态库制作,Framework制作,Bundle制作
  8. 右键新建里面没有记事本和word以及excel简单解决
  9. 为了快速响应全球用户请求,汇量科技选择了上云
  10. 华为与Qualcomm率先完成TDD制式LTE Cat.1 MDM对接测试