RocketMQ(七)RocketMQ消息生产及消息储存机制
目录
- 1、消息生产
- 1.1 消息的生产过程
- 1.2 Queue选择算法
- 2、消息储存
- 2.1 存储介质
- 2.2 消息的存储和发送
- 2.3 消息存储结构
- 2.4 刷盘机制
1、消息生产
1.1 消息的生产过程
Producer可以将消息写入到某Broker中的某Queue中,其经历了如下过程:
- Producer发送消息之前,会先向NameServer发出获取消息Topic的路由信息的请求。
- NameServer返回该Topic的路由表及Broker列表。
- Producer根据代码中指定的Queue选择策略,从Queue列表中选出一个队列,用于后续存储消息。
- Produer对消息做一些特殊处理,例如,消息本身超过4M,则会对其进行压缩。
- Producer向选择出的Queue所在的Broker发出RPC请求,将消息发送到选择出的Queue中。
路由表:
实际是一个Map,key为Topic名称,value是一个QueueData实例列表。QueueData并不是一个Queue对应一个QueueData,而是一个Broker中该Topic的所有Queue对应一个QueueData。简单来说,路由表的key为Topic名称,value则为所有涉及该Topic的BrokerName列表。
Broker列表:
实际是一个Map,key为brokerName,value为BrokerData。brokerName名称相同的Master-Slave集群对应一BrokerData。BrokerData中包含brokerName及一个map。该map的key为brokerId,value为该broker对应的地址。brokerId为0表示该broker为Master,非0表示Slave。
1.2 Queue选择算法
对于无序消息,其Queue选择算法,也称为消息投递算法,常见的有两种:轮询算法、最小投递延迟算法
轮询算法
默认选择算法。该算法保证了每个Queue中可以均匀的获取到消息。该算法存在一个问题:由于某些原因,在某些Broker上的Queue可能投递延迟较严重。从而导致Producer的缓存队列中出现较大的消息积压,影响消息的投递性能。
最小投递延迟算法
该算法会统计每次消息投递的时间延迟,然后根据统计出的结果将消息投递到时间延迟最小的Queue。如果延迟相同,则采用轮询算法投递。该算法可以有效提升消息的投递性能。但是也存在一个问题:消息在Queue上的分配不均匀。投递延迟小的Queue可能会存在大量的消息。而对该Queue的消费者压力会增大,降低消息的消费能力,可能会导致MQ中消息的堆积。
2、消息储存
分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。
- 消息生成者发送消息
- MQ收到消息,将消息进行持久化,在存储中新增一条记录
- 返回ACK给生产者
- MQ push 消息给对应的消费者,然后等待消费者返回ACK
- 如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消费成功,在存储中删除消息,即执行第6步;如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新push消息,重复执行4、5、6步骤
- MQ删除消息
2.1 存储介质
- 关系型数据库DB
Apache下开源的另外一款MQ—ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障
- 文件系统
目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。
2.2 消息的存储和发送
Linux操作系统分为【用户态】和【内核态】,文件操作、网络操作需要涉及这两种形态的切换,免不了进行数据复制。
一台服务器把本机磁盘文件的内容发送到客户端,一般分为两个步骤:
1)read;读取本地文件内容;
2)write;将读取的内容通过网络发送出去。
这两个看似简单的操作,实际进行了4 次数据复制,分别是:
从磁盘复制数据到内核态内存;
从内核态内存复制到用户态内存;
然后从用户态内存复制到网络驱动的内核态内存;
最后是从网络驱动的内核态内存复制到网卡中进行传输。
通过使用mmap的方式,可以省去向用户态的内存复制,提高速度。这种机制在Java中是通过MappedByteBuffer实现的RocketMQ充分利用了上述特性,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速度。
这里需要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了
2.3 消息存储结构
RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成 的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。
- CommitLog:存储消息的元数据
- ConsumerQueue:存储消息在CommitLog的索引
- IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程
1)CommitLog:
commitlog目录中存放着很多的mappedFile文件,当前Broker中的消息的元数据都是落盘到这些mappedFile文件中的,一个Broker中仅包含一个commitlog目录。mappedFile文件大小为1G(小于等于1G),文件名由20位十进制数构成,表示当前文件的第一条消息的起始位移偏移量。
第一个文件名一定是20位0构成的。因为第一个文件的第一条消息的偏移量commitlog offset为0当第一个文件放满时,则会自动生成第二个文件继续存放消息。假设第一个文件大小是1073741820字节(1G = 1073741824字节),则第二个文件名就是00000000001073741824。以此类推,第n个文件名应该是前n-1个文件大小之和。一个Broker中所有mappedFile文件的commitlog offset是连续的
2)ConsumeQueue:
为了提高效率,会为每个Topic在~/store/consumequeue
中创建一个目录,目录名为Topic名称。在该Topic目录下,会再为每个该Topic的Queue建立一个目录,目录名为queueId。每个目录中存放着若干consumequeue文件,consumequeue文件是commitlog的索引文件,可以根据consumequeue定位到具体的消息。
consumequeue文件名也由20位数字构成,表示当前文件的第一个索引条目的起始位移偏移量。
每个consumequeue文件可以包含30w个索引条目,每个索引条目包含了三个消息重要属性:消息在mappedFile文件中的偏移量CommitLog Offset、消息长度、消息Tag的hashcode值。这三个属性占20个字节,所以每个文件的大小是固定的30w * 20字节。
3)IndexFile:
RocketMQ提供了根据key进行消息查询的功能。该查询是通过store目录中的index子目录中的indexFile进行索引实现的快速查询。当然,这个indexFile中的索引数据是在包含了key的消息被发送到Broker时写入的。如果消息中没有包含key,则不会写入,key是发送消息时手动设置的消息标识。
4)对文件的读写:
消息写入
一条消息进入到Broker后经历了以下几个过程才最终被持久化。
- Broker根据queueId,获取到该消息对应索引条目要在consumequeue目录中的写入偏移量,即
QueueOffset - 将queueId、queueOffset等数据,与消息一起封装为消息单元
- 将消息单元写入到commitlog
- 同时,形成消息索引条目
- 将消息索引条目分发到相应的consumequeue
消息拉取
当Consumer来拉取消息时会经历以下几个步骤:
- Consumer获取到其要消费消息所在Queue的消费偏移量offset,计算出其要消费消息的消息offset
- Consumer向Broker发送拉取请求,其中会包含其要拉取消息的Queue、消息offset及消息Tag
- Broker计算在该consumequeue中的queueOffset
- 从该queueOffset处开始向后查找第一个指定Tag的索引条目
- 解析该索引条目的前8个字节,即可定位到该消息在commitlog中的commitlog offset
- 从对应commitlog offset中读取消息单元,并发送给Consumer
2.4 刷盘机制
RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。
1)同步刷盘:
在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。
2)异步刷盘:
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
3)配置:
同步刷盘与异步刷盘,都是通过Broker配置文件里的flushDiskType
参数设置的,这个参数被配置成SYNC_FLUSH(同步)、ASYNC_FLUSH(异步)中的 一个。
RocketMQ(七)RocketMQ消息生产及消息储存机制相关推荐
- 17 个方面,综合对比 Kafka、RabbitMQ、RocketMQ、ActiveMQ 四个分布式消息队列
点击上方"方志朋",选择"置顶公众号" 技术文章第一时间送达! 作者:28cm不含头(来自:知乎) 原文链接: https://www.zhihu.com/qu ...
- activemq后台管理 看topic消息_17 个方面,综合对比 Kafka、RabbitMQ、RocketMQ、ActiveMQ 四个分布式消息队列...
作者:28cm不含头(来自:知乎) 原文链接: https://www.zhihu.com/question/43557507 一.资料文档 二.开发语言 三.支持的协议 四.消息存储 五.消息事务 ...
- 消息中间件学习总结(16)——17 个方面,综合对比 Kafka、RabbitMQ、RocketMQ、ActiveMQ 四个分布式消息队列
本文将从,Kafka.RabbitMQ.ZeroMQ.RocketMQ.ActiveMQ 17 个方面综合对比作为消息队列使用时的差异. 一.资料文档 Kafka:中.有kafka作者自己写的书,网上 ...
- 详解,最新整理,RabbitMQ,RocketMQ,Kafka 事务性,消息丢失,消息顺序性和消息重复发送的处理策略
消息队列常见问题处理 分布式事务 什么是分布式事务 我们的服务器从单机发展到拥有多台机器的分布式系统,各个系统之前需要借助于网络进行通信,原有单机中相对可靠的方法调用以及进程间通信方式已经没有办法使用 ...
- RabbitMQ,RocketMQ,Kafka 事务性,消息丢失,消息顺序性和消息重复发送的处理策略
消息队列常见问题处理 分布式事务 什么是分布式事务 我们的服务器从单机发展到拥有多台机器的分布式系统,各个系统之前需要借助于网络进行通信,原有单机中相对可靠的方法调用以及进程间通信方式已经没有办法使用 ...
- RocketMQ同步消息、异步消息、单向消息详解
一.RocketMQ 支持 3 种消息发送方式 : 1.同步消息(sync message ) producer向 broker 发送消息,执行 API 时同步等待, 直到broker 服务器返回发送 ...
- 阿里云消息队列 RocketMQ 5.0 全新升级:消息、事件、流融合处理平台
从"消息"到"消息.事件.流"的大融合 消息队列作为当代应用的通信基础设施,微服务架构应用的核心依赖,通过异步解耦能力让用户更高效地构建分布式.高性能.弹性健壮 ...
- RocketMQ 千锤百炼--哈啰在分布式消息治理和微服务治理中的实践
作者|梁勇 背景 哈啰已进化为包括两轮出行(哈啰单车.哈啰助力车.哈啰电动车.小哈换电).四轮出行(哈啰顺风车.全网叫车.哈啰打车)等的综合化移动出行平台,并向酒店.到店团购等众多本地生活化生 ...
- 消息中间件:RocketMQ 介绍(特性、术语、原理、优缺点、消息顺序、消息重复)
前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家.点击跳转到教程. 消息中间件的作用 1. 应用解耦 2. 异步处理 比如用户注册场景,注册主流程完成以后,需要调用邮件 ...
最新文章
- 日本16岁编程少年,课余打造一款新冠感染追踪App
- 点云三角化之后还能贴图嘛_雪糕化了之后重新冷冻还能吃吗?宁波这个实验真相了!...
- flutter ios打包_Flutter通过BasicMessageChannel与Android iOS 的双向通信
- 跨平台桌面应用开发工具Electron v11.0.4
- Flutter PageView 使用详细概述
- Servlet实现图片读取显示
- java 按字节读写二进制文件(Base64编码解码)
- mysql4.1数据库_MySQL数据库练习-4.1
- docker安装jdk1.8
- 算法入门 13.并查集
- Win10 最下面的任务栏不显示正在打开的窗口了,打开任何东西任务栏都不显示
- L3立法试水,为自动驾驶产业带来什么?
- php给发qq消息,PHP 模拟QQ登录及发送消息实现方法
- 【C++】复制省略(Copy elision)
- 许久不动笔,,再来点人生感悟吧
- 这份 pip 使用方法,应该算是全网最全了
- Android 上实现像微信一样的用Fragment来实现的Tab切页效果 提供源码下载
- Linux文件和目录管理(3)
- shell中如何判断两个字符串相等
- MPI并行程序开发设计----------------------------------并行编程模型和算法等介绍