RocketMQ 知识点整理
文章目录
- 什么是RMQ?架构介绍
- 消息持久化
- 异步刷盘
- RocketMQ为什么速度快
- 延迟消息
- 事务消息
- 过滤消息
- 有序消息
- 批量消息
- 消息轨迹(msg trace)
- 消费重试
- 死信队列
- 广播消息
- offset 持久化
- Consumer Group—Reblance机制
- 高可用模式
- Master/Slave
- Dleger模式
- LMQ
- 为什么有的公司会去做RocketMQ的定制?怎么做比较好?
- RocketMQ相对于Kafka好在哪里?
- 什么情况下会出现消息丢失的情况?
- 如何保证消息不丢失?
- 消息降级机制
- 消息幂等
什么是RMQ?架构介绍
RocketMQ | 解耦、异步、削峰原理解析
消息持久化
RocketMQ主要的存储文件包括commitlog
文件、consumequeue
文件、indexfile
文件。
Broker在收到消息之后,会把消息保存到commitlog
的文件当中,而同时在分布式的存储当中,每个broker都会保存一部分topic的数据。
同时,每个topic对应的messagequeue下都会生成consumequeue
文件用于保存commitlog
的物理位置偏移量offset
,indexfile
中会保存key和offset、timeStamp、NextIndexOffset的对应关系,辅助consumequeue
文件进行索引。
Producer将消息写入CommitLog之后,会分发到ComsumerQueue中
简单来讲就是:
- commitlog:顺序保存一条条的消息
- consumequeue:保存消费者消费到哪里了
- indexfile:辅助consumequeue文件进行索引,例如TimeStamp等,也因为indexFile,RMQ能够支持按照最新的消息进行消费
异步刷盘
RocketMQ为什么速度快
是因为使用了顺序存储、Page Cache
和异步刷盘、零拷贝技术。
我们在写入commitlog的时候是顺序写入的,这样比随机写入的性能就会提高很多
写入commitlog的时候并不是直接写入磁盘,而是先写入操作系统的PageCache,最后由操作系统异步将缓存中的数据刷到磁盘
RMQ的消息是存储在磁盘中的,如果任何优化都不做,需要四次拷贝(由于文件系统、网卡都是内核态,涉及用户态到内核态、内核态到用户态),RMQ零拷贝技术基于mmap和SendFile两种实现。
mmap
技术实现了映射,不必把磁盘搞到内存中,直接映射修改,减少了两次拷贝,但是有大小限制(1.5G到2G),也因此我们可以注意到commitLog一般大小是1G左右SendFile
技术更快,则基于DMA技术,能够只用一次拷贝(从源地址读到目的地址),但是SendFIle不支持对文件的操作
延迟消息
生产者生产的消息,不能被立即被消费,需要延迟一定的时间
比较常见的场景就是用户下单之后未付款,那么我们可以把这个当成是一个消息发送给下游,等到下游消费的时候,把这个订单关闭就行。
消息被发送到 broker 时,首先正常写入 commit log,在分发至 consume queue 时,会将 topic 改成 SCHEDULE_TOPIC_XXXX
,将 queue id 改成 delayTimeLevel - 1 。同时暂存原来的 topic 和 queue,通过改变TOPIC,就不会被消费了。
- 生产者先把消息写入到commitLog中,修改消息Topic名称和队列信息,加上
SCHEDULE_TOPIC_XXXX
前缀 - 转发消息到延迟主题的
CosumeQueue
中 - 延迟服务
ScheduleMessageService
消费SCHEDULE_TOPIC_XXXX
消息 - 将信息重新存储到
CommitLog
中 - 将消息投递到目标
Topic
中 - 消费者消费目标
topic
中的数据
新的TOPICSCHEDULE_TOPIC_XXXX
将被归为系统 topic,当 client 订阅系统 topic 时,会抛出异常。
事务消息
RMQ支持事务消息,与常见的数据库事务流程类似,生产者生产一条事务消息,等生产者本地操作ok了,会向MQ Server发送提交确认。
在这种情况下,事务消息初始被扔到消息队列中是没办法消费的,只有当该事务消息被提交了,这个消息才能被消费者所消费。
同理,如果rollback了,RMQ会从消息队列中把该消息删掉。
- 生产者生产消息给MQ,此时未提交,称该消息为
HalfMsg
,即半消息 - MQ受到该消息,会给生产者一个OK确认
- 生产者执行本地事务
- 根据本地事务执行的结果向MQ提交commit或者rollback,MQ根据commit或者rollback对消息进行相应操作,即消费或者丢弃
- 如果生产者提交commit或者rollback提交超时,即第四步没有收到来自生产者的请求,MQ回调检查该消息
- MQ回调检查本地事务的状态
过滤消息
RMQ支持按照TAG过滤消息,例如TOPIC有TAG_A、TAG_B、TAG_C,如果只想要消费TAGA、TAG_C的消息,例如:
consumer.subscribe("TagFilterTest", "TagA || TagC");
为了更灵活地过滤消息,RMQ支持按照SQL 92语法过滤消息:
consumer.subscribe("SqlFilterTest",MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +"and (a is not null and a between 0 and 3)"));
sql中的a是生产者设置的一个Property:
msg.putUserProperty("a", String.valueOf(i));
有序消息
例如对一个订单来说,订单创建、订单完成,这两条消息就需要是顺序的。
消费者将锁定每个消息队列,以确保他们被逐个消费,虽然这将会导致性能下降,但是当你关心消息顺序的时候会很有用。
另一方面,如果要想实现顺序消息,就需要去保证保证生产者 - MQServer - 消费者是一对一对一的关系
但是这样设计,并行度就成为了消息系统的瓶颈(吞吐量不够),也会导致更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。
RocketMQ是怎么做的呢?
- 一般消息是通过轮询所有队列来发送的(负载均衡策略),顺序消息可以根据业务,比如说订单号相同的消息发送到同一个队列。例如,OrderId相同的消息,会发送到同一个队列
批量消息
为了提升效率,RMQ支持Producer一次性发送多条消息到Broker,减少网络请求次数
批量消息存在限制,即单批消息大小不能大于1MB,实际上是4MB
RMQ提供了Spliter类分离大批量消息,使得单批消息不大于4MB
消息轨迹(msg trace)
消息轨迹可以理解为RMQ支持对一条消息从Producer再到Consumer这个过程的Trace,记录的内容如下所示
#Type #ProducerGroup #ClientHost #SendTime #CostTimes #Status
Pub 1623305799667 xxx.xxx.xxx.xxx 2021-06-10 14:16:40 131ms success
消息轨迹原理是使用系统TOPIC来实现的,这个TOPIC名字也可以自定义,基于customizedTraceTopic参数实现
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);}
消费重试
如果在处理消息的过程中出现异常,消息消费失败,肯定是不能直接扔掉该消息的,需要进行一定次数的消费重试,次数默认是16次,重试之前会存在间隔,例如第一次重试是1s,第二次重试是2s,第十五次是1h,第十六次是2h。
死信队列
当消费重试次数达到默认的16次,会把消息扔到死信队列中,保存3天之后再删掉(与正常消息的过期时间相同),进一步保障业务的可靠性。
死信消息拥有以下特性:
- 设置死信队列的权限是2,即不会再被消费者正常消费,留存有效期与正常消息相同,均为3天
- 死信队列与
Group ID
是对应的,所以某个死信队列里存在的消息可能属于不同的topic - 如果一个Group ID未产生死信消息,RMQ不会为其创建相应的死信队列
广播消息
普通消息都是RMQ让一个消费者组中的一个实例来消费的,而广播消息则会发给所有订阅了该Topic的消费者实例去消费消息,需要注意的是:
- 广播消息消费失败之后,不支持按照返回Action.ReconsumerLater进行重试,只会去消费新的消息
- 如果要使用广播模式消费消息,可以在消费者处设置消费模式为广播模式:
consumer.setMessageModel(MessageModel.BROADCASTING);
offset 持久化
RocketMQ 维护了一个每隔 10s 运行一次的 Timer,目的是将每个队列的消费情况写入文件,等到下次重启时,可以从上次消费位置进行消费。需要注意的是,因为不是实时同步,有一定的时间间隔,所以,有可能会出现重复消息的情况。我们需要在消费端做幂等处理,避免重复消费。
Consumer Group—Reblance机制
- 一个消费者实例可以是同一机器下的不同线程,也可以是不同机器实例下运行的线程
- 当多个消费者实例使用同一个消费者组订阅Topic时,RocketMQ有Reblance机制,对不同的Queue进行重新分配消费者实例,事实上是一个并行处理消息的操作
- 通过Reblance机制,就能够让多台消费者实例共同完成消费任务
- 分配策略RMQ也支持多种,例如轮询、同机房分配策略
前面说的是消费端负载均衡,在生产者端负载均衡比较简单,就是轮询,messagequeue是有ID例如0,1,2,3,4的,生产消息的时候,可以轮询给不同的messagequeue
高可用模式
Master/Slave
RMQ支持Master/Slave模式的高可用,slave通过同步复制或者异步复制的方式去同步Master的数据。
当Master节点的Broker挂掉时,Producer可以选择新的Master去投递消息,但是故障主节点需要人为切换才能解决故障。
一般来说同步复制、异步复制的选型需要注意:
同步复制:缺点在于需要所有的从节点都拷贝一份主节点的数据,一但有某个从节点出了延迟,主节点这边的业务就只能等。
异步复制:看起来数据是不太可靠的,当主节点挂掉的时候,从节点的数据不一定是ok的,存在丢数据的风险。异步复制也较为适合异地节点的情况,当从节点与主节点之间的距离过于远的时候,异步是一个不错的选择。
半同步复制:当大于一半的葱节点复制数据是ok的时候,即认为是ok的
Dleger模式
为了解决Master/Slave模式不能自动切主的问题, RMQ也支持Dleger模式,其基于Raft
协议实现,能够基于通信自动切主。
如果想要搞懂Dleger模式,一定要先搞清楚Raft一致性协议:分布式共识算法Raft
Dleger模式下,消息从生产者投递到Master是按照Raft
协议来的:
- 消息来到Master节点,消息的状态会是
UNCOMMITED
- 在Master向Slave节点同步复制之后,当收到半数Slave节点已经同步完成的响应时,才会把这条消息的状态设置成
COMMITED
,进而才会给消费者消费 - 当Master挂掉之后,Slave通过选举切换为Master,会重新把
UNCOMMITED
的消息搞到新的Master节点进行同步
RMQ曾考虑过使用ZK
或者ETCD
去实现选主复制,但这样会引入重量级的组件,会给之后的维护和诊断增加难度。
LMQ
LMQ的作用在于万物互联场景下,移动边缘设备向我们的数据中心传输数据所用的微消息队列。
RocketMQ在最新的版本中,基于MQTT
协议对LMQ进行了支持。
为什么有的公司会去做RocketMQ的定制?怎么做比较好?
- 社区版还是不能完全适配公司内部鸡架的需求,例如有的公司会在Client与Server之间加层Proxy,以满足定制化的需求
RocketMQ相对于Kafka好在哪里?
- 可以看出,在topic 量小的情况下,kafka的性能是优于rocketMQ的
- 但随着topic量的增大,kafka的消息处理能力急剧下降,而RocketMQ则几乎没有变化
在topic数量较小时,kafka在发送端做了批处理,将一些消息封装为一批,发送给broker,所以性能较优
在topic数量较大时,kafka的文件模型是一个partipation对应一个文件,而rocketmq则是所有的queue共用一个commit log,kafka散落文件导致磁盘IO慢
什么情况下会出现消息丢失的情况?
- 涉及到网络通信就可能出现消息丢失的情况,包括生产者/消费者与Broker之间的网络通信、Broker主从复制之间的网络通信、异步刷盘PageCache时机器宕机等情况
如何保证消息不丢失?
- MQ ACK机制,Broker收到生产者的消息才返回ACK,如果发送端没收到ACK,超时重发。同时,事务消息机制可以进一步保证消息不丢失,通过Half消息起到一个嗅探MQ服务是否可用的作用
- 消费者同步消费,即当消费者业务处理完成之后再返回“消费成功”给Broker
- Broker同步复制+Dledger架构自动切主
消息降级机制
如果RMQ直接挂掉了,但生产者本地事务还是在执行的,消息会发不出去,那么这里最好有一个消息降级机制:
- 尝试多次发给MQ失败,那么就利用Redis或者磁盘存起来这些消息
- 同时搞一个线程,定时去尝试把这些消息重新发给MQ
消息幂等
- 对于一条消息就是一个完整的数据:唯一索引+Redis幂等可以实现
- 对于多条消息才能组成一个完整的数据:可以设置多个ready字段,当ready=0的时候可以插入,当ready=1的时候不插入,结合sql的
on duplicate key
来实现
RocketMQ 知识点整理相关推荐
- RocketMQ知识点整理
RocketMQ知识点整理 一.消息队列 二.RocketMQ简介 RocketMQ-组件 RocketMQ架构: 三. RocketMQ理解性问题整理 1.使用消息中间件之前需要先了解"同 ...
- Java进阶3 - 易错知识点整理(待更新)
Java进阶3 - 易错知识点整理(待更新) 该章节是Java进阶2- 易错知识点整理的续篇: 在前一章节中介绍了 ORM框架,中间件相关的面试题,而在该章节中主要记录关于项目部署中间件,监控与性能优 ...
- 面试题引出的知识点整理
1.自旋锁&可重复锁&公平锁&共享锁&分段锁你都知道吗? 2.无锁&偏向锁&轻量级锁&重量级锁如何膨胀升级? 3.Lock底层AQS实现与Syn ...
- C语言考研复试知识点整理
C语言考研复试知识点整理 1.由float x=3e-6,y=3e-6;不能得到x= =y的逻辑值为真. 解析:float类型的变量只能进行>或<运算,不能进行==运算 2.自增和自减运算 ...
- 【Android 面试基础知识点整理】
针对Android面试中常见的一些知识点整理,Max 仅仅是个搬运工.感谢本文中引用文章的各位作者,给大家分享了这么多优秀文章.对于当中的解析,是原作者个人见解,有错误和不准确的地方,也请大家积极指正 ...
- mysql 存储引擎 面试_搞定PHP面试 - MySQL基础知识点整理 - 存储引擎
MySQL基础知识点整理 - 存储引擎 0. 查看 MySQL 支持的存储引擎 可以在 mysql 客户端中,使用 show engines; 命令可以查看MySQL支持的引擎: mysql> ...
- mysql 全面知识点_Mysql知识点整理
1.存储引擎区别 MyISAM:不支持事物.仅支持表级锁.支持B+树索引 MEMORY:不支持事物.仅支持表级锁.支持B+树和HASH索引 InnoDB:支持事物.支持行级锁.支持B+树索引 2.锁机 ...
- 04741计算机网络原理知识点,04741计算机网络原理知识点整理.doc
04741计算机网络原理知识点整理 1.计算机网络大发展 计算机网络从20世纪70年代开始发展,他的演变可以概括为 面向终端的计算机网络.计算机-计算机网络.开放式标准化网络以及因特网广泛应用和高速网 ...
- python基础知识整理-python入门基础知识点整理-20171214
一.知识点整理 1.python2与python3的区别: (1)宏观比对 python2 源码不标准,较为混乱,并且重复的代码很多. python3 源码统一了标准,同时也去除了重复代码. (2)编 ...
最新文章
- 微信小程序设置域名、不校验域名
- 新浪微博开放平台开发-android客户端(2)
- PHP学习笔记--抽象类和抽象方法的应用
- linux显示中文乱码
- Apollo 分布式配置中心 搭建篇
- centos上安装adobe flash
- 【转】简单的解释XSS攻击
- Hive性能优化(全面)
- ​小米 11 发布,售价 3999 元起;罗永浩回应败诉半导体公司;deepin 20.1(1010) 发布|极客头条...
- 线条边框简笔画图片大全_表情包丨表情包简笔画图片大全可爱
- [译] 在浏览器里使用 TenserFlow.js 实时估计人体姿态
- 扇贝有道180628每日一句
- 利用VBS脚本让qq永远在线
- 黑苹果系统镜像稳定版 10.9.5 - 10.15.6 整合下载
- 无法启动游戏 因为计算机,win7电脑无法启动游戏怎么办?
- 安卓逆向,Python爬虫,网页逆向和其他学习计划
- 智能合约漏洞检测论文整理
- html5获取坐标高德,vue 单纯的获取经纬度 百度与高德 H5
- NPOI之Excel——合并单元格、设置样式、输入公式
- 手把手教你如何用python制作自动翻译程序
热门文章
- Pandas 秘籍:1~5
- 关于python学成之后如何接单
- 大专计算机知识,大专院校计算机教学
- 计算机硬件维护实验报告,计算机硬件维护实验报告.pdf
- qemu创建linux虚拟机(亲测有效,virt-manger方式)
- word无法保存此文件,因为它已经在别处打开。(C:/…/STARTUP/Powerword.dot)
- iOS 静态库制作,Framework制作,Bundle制作
- 右键新建里面没有记事本和word以及excel简单解决
- 为了快速响应全球用户请求,汇量科技选择了上云
- 华为与Qualcomm率先完成TDD制式LTE Cat.1 MDM对接测试