RocketMQ源码分析(十五)之文件恢复
文章目录
- 版本
- 异常分析
- StoreCheckpoint
- ConsumeQueue与Index文件恢复
- 正常恢复
- 异常恢复
版本
- 基于
rocketmq-all-4.3.1
版本
异常分析
由于ConsumeQueue和Index文件都是根据CommitLog文件异步构建的,所以ConsumeQueue、Index与CommitLog文件的数据就是最终一致,而不是强一致的。这样在Broker重启时就可能出现不一致性的情况
- CommitLog文件同步刷盘,当准备转发给ConsumeQueue文件时突然断电或者出现故障,导致ConsumeQueue存储失败
- 在刷盘时,由于突然断电,只写入一部分数据到磁盘CommitLog文件中
- 当数据写入CommitLog文件后才会将刷盘点记录到检查点中,有可能刷盘完成,但是写入检查点文件并没有完成
RocketMQ 有两种文件恢复机制。判断异常的方式是在 broker启动的时候创建一个 abort 空文件,在正常结束的时候删掉这个文件。在下一次启动 broker 时,如果发现了 abort 文件,则认为是异常宕机,否则就是正常关机。
- 正常关机恢复:先从倒数第三个文件开始进行恢复,然后按照消息的存储格式进行查找,如果改文件中所有的消息都符合消息存储格式,则继续查找下一个文件,直到找到最后一条消息所在的位置
- 异常宕机恢复:异常停止刷盘时,从最后一个文件开始查找,在查找时读取改文件第一条消息的存储时间,如果这个存储时间小于检查点文件中的刷盘时间,就可以从这个文件开始恢复,如果这个文件中第一条消息的存储时间大于检查点,说明不能从这个文件开始恢复,需要寻找上一个文件。因为检查点文件中的刷盘点代表的是100%可靠的消息。
关机恢复机制设计的目的就是保证数据0丢失,RocketMQ通过abort和checkpoint来保证数据0丢失
- abort文件:abort文件时一个空文件,在Broker启动时会被创建,当正常关闭的时候会被删除。如果Broker是异常关闭,则不会删除此文件
- checkpoint文件:是一个检查点文件,此文件保存了Broker最后一次正常存储数据的时间,当重启Broker时,恢复程序可以从此文件获取应该从哪个时刻开始恢复数据
当索引文件刷盘成功,消息队列消费文件未刷盘成功且宕机时,会造成消息消费队列文件丢失的问题。但只要 Commitlog 文件没有丢失,就可以利用 RocketMQ 的文件恢复机制,恢复丢失的消息消费队列文件。在 RocketMQ 的文件恢复机制中,有针对异常宕机进行文件恢复的机制。当 broker 异常启动,在文件恢复过程中,RocketMQ 会将最后一个有效文件的所有消息转发到消息消费队列和索引文件,确保不丢失消息,但同时也会带来重复消费的问题,RocketMQ 保证消息不丢失但不保证消息不会重复消费,故消息消费业务方需要实现消息消费的幂等设计。
StoreCheckpoint
StoreCheckpoint(检查点)主要用于记录
CommitLog
、ConsumeQueue
、Index
文件的刷盘时间点,当上一次broker是异常结束时,会根据StoreCheckpoint的数据进行恢复。checkpoint(检查点)文件固定长度为4KB
当索引文件刷盘成功,消息队列消费文件未刷盘成功且宕机时,会造成消息消费队列文件丢失的问题。但只要 Commitlog 文件没有丢失,就可以利用 RocketMQ 的文件恢复机制,恢复丢失的消息消费队列文件。在 RocketMQ 的文件恢复机制中,有针对异常宕机进行文件恢复的机制。当 broker 异常启动,在文件恢复过程中,RocketMQ 会将最后一个有效文件的所有消息转发到消息消费队列和索引文件,确保不丢失消息,但同时也会带来重复消费的问题,RocketMQ 保证消息不丢失但不保证消息不会重复消费,故消息消费业务方需要实现消息消费的幂等设计。
StoreCheckpoint文件源码
public class StoreCheckpoint {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);private final RandomAccessFile randomAccessFile;private final FileChannel fileChannel;private final MappedByteBuffer mappedByteBuffer;//CommitLog最新一条记录的存储时间private volatile long physicMsgTimestamp = 0;//ConsumeQueue最新一条记录的存储时间private volatile long logicsMsgTimestamp = 0;//Index File最新一条记录的存储时间private volatile long indexMsgTimestamp = 0;public StoreCheckpoint(final String scpPath) throws IOException {File file = new File(scpPath);MappedFile.ensureDirOK(file.getParent());boolean fileExists = file.exists();this.randomAccessFile = new RandomAccessFile(file, "rw");//一旦建立映射(map),fileChannel其实就可以关闭了,关闭fileChannel对映射不会有影响//TODO 所以这个地方的fileChannel是不是直接关闭就好?this.fileChannel = this.randomAccessFile.getChannel();this.mappedByteBuffer = fileChannel.map(MapMode.READ_WRITE, 0, MappedFile.OS_PAGE_SIZE);if (fileExists) {log.info("store checkpoint file exists, " + scpPath);this.physicMsgTimestamp = this.mappedByteBuffer.getLong(0);this.logicsMsgTimestamp = this.mappedByteBuffer.getLong(8);this.indexMsgTimestamp = this.mappedByteBuffer.getLong(16);log.info("store checkpoint file physicMsgTimestamp " + this.physicMsgTimestamp + ", "+ UtilAll.timeMillisToHumanString(this.physicMsgTimestamp));log.info("store checkpoint file logicsMsgTimestamp " + this.logicsMsgTimestamp + ", "+ UtilAll.timeMillisToHumanString(this.logicsMsgTimestamp));log.info("store checkpoint file indexMsgTimestamp " + this.indexMsgTimestamp + ", "+ UtilAll.timeMillisToHumanString(this.indexMsgTimestamp));} else {log.info("store checkpoint file not exists, " + scpPath);}}public void shutdown() {this.flush();// unmap mappedByteBufferMappedFile.clean(this.mappedByteBuffer);try {this.fileChannel.close();} catch (IOException e) {log.error("Failed to properly close the channel", e);}}public void flush() {this.mappedByteBuffer.putLong(0, this.physicMsgTimestamp);this.mappedByteBuffer.putLong(8, this.logicsMsgTimestamp);this.mappedByteBuffer.putLong(16, this.indexMsgTimestamp);this.mappedByteBuffer.force();}public long getPhysicMsgTimestamp() {return physicMsgTimestamp;}public void setPhysicMsgTimestamp(long physicMsgTimestamp) {this.physicMsgTimestamp = physicMsgTimestamp;}public long getLogicsMsgTimestamp() {return logicsMsgTimestamp;}public void setLogicsMsgTimestamp(long logicsMsgTimestamp) {this.logicsMsgTimestamp = logicsMsgTimestamp;}public long getMinTimestampIndex() {return Math.min(this.getMinTimestamp(), this.indexMsgTimestamp);}public long getMinTimestamp() {long min = Math.min(this.physicMsgTimestamp, this.logicsMsgTimestamp);//TODO 这里为什么要减去3000?min -= 1000 * 3;if (min < 0)min = 0;return min;}public long getIndexMsgTimestamp() {return indexMsgTimestamp;}public void setIndexMsgTimestamp(long indexMsgTimestamp) {this.indexMsgTimestamp = indexMsgTimestamp;}}
ConsumeQueue与Index文件恢复
存储文件的启动时恢复主要完成成
flushedWhere
、committedWhere
指针的设置、将消息消费队列最大偏移量加载到内存,并删除flushedWhere
之后所有的文件DefaultMessageStore#load
是文件恢复的入口判断abort文件是否存在,此文件在启动时创建,正常停止后时会被删除
加载延迟日志文件
加载CommitLog文件,按照文件名进行排序。如果文件与配置中的CommitLog文件大小不一致,则直接返回,会忽略后续的文件
加载ComsumeQueue文件
加载checkpoint文件
加载Index文件,如果上次异常退出,而且Index文件刷盘时间大于检查点文件最大的消息时间戳,则立即销毁此文件
根据是否正常停止,执行不同的恢复策略
load源码
public boolean load() {boolean result = true;try {//判断abort文件是否存在,此文件在启动时创建,正常停止时会被删除boolean lastExitOK = !this.isTempFileExist();log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");if (null != scheduleMessageService) {result = result && this.scheduleMessageService.load();}// load Commit Logresult = result && this.commitLog.load();// load Consume Queueresult = result && this.loadConsumeQueue();if (result) {this.storeCheckpoint =new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));this.indexService.load(lastExitOK);this.recover(lastExitOK);log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());}} catch (Exception e) {log.error("load exception", e);result = false;}if (!result) {this.allocateMappedFileService.shutdown();}return result; }
正常恢复
正常恢复通过
CommitLog#recoverNormally
实现- 第一步:从倒数第三个文件开始恢复,如果不足3个文件,从第一个文件开始恢复
- 第二步:遍历CommitLog文件,每次取出一条消息,验证消息。
- 如果验证结果为true,并且消息长度大于0,表示消息正确并且没有达到末尾,mappedFileOffset指针向前移动本条消息的长度。继续循环,验证下一条消息。
- 如果验证结果为true,并且消息长度等于0,表示已经达到文件末尾,此时如果有下一个文件,则重置mappedFileOffset和processOffset,继续下一次循环
- 如果验证结果为false,表示文件读取错误,此时文件可能不完整。直接退出循环
- 第三步:更新mappedFileQueue的FlushedWhere和CommittedWhere指针位置
- 第四步:删除processOffset之后的所有文件(因为文件不完整,不能加载)
正常恢复
CommitLog#recoverNormally
的源码public void recoverNormally() {// 默认开启CRC验证boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();if (!mappedFiles.isEmpty()) {// Began to recover from the last third file// 从倒数第三个文件开始恢复int index = mappedFiles.size() - 3;if (index < 0)// 不足三个文件,则从第一个文件开始恢复index = 0;MappedFile mappedFile = mappedFiles.get(index);ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();//processOffset为CommitLog文件已确认的物理偏移量long processOffset = mappedFile.getFileFromOffset();//当前文件已校验通过的物理偏移量long mappedFileOffset = 0;// 遍历CommitLog文件while (true) {// 查找消息,根据配置是否验证CRCDispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);int size = dispatchRequest.getMsgSize();// Normal dataif (dispatchRequest.isSuccess() && size > 0) {// 没有到文件末尾,mappedFileOffset指针向前移动本条消息的长度mappedFileOffset += size;}// Come the end of the file, switch to the next file Since the// return 0 representatives met last hole,// this can not be included in truncate offset// 文件末尾else if (dispatchRequest.isSuccess() && size == 0) {index++;if (index >= mappedFiles.size()) {// Current branch can not happenlog.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());break;} else {// 下一个文件,重置mappedFileOffset和processOffset,继续下一次循环mappedFile = mappedFiles.get(index);byteBuffer = mappedFile.sliceByteBuffer();//processOffset为CommitLog文件已确认的物理偏移量processOffset = mappedFile.getFileFromOffset();//当前已经校验通过的偏移量mappedFileOffset = 0;log.info("recover next physics file, " + mappedFile.getFileName());}}// Intermediate file read errorelse if (!dispatchRequest.isSuccess()) {log.info("recover physics file end, " + mappedFile.getFileName());break;}}// 更新MappedFileQueue的flushedWhere和committedWhere指针processOffset += mappedFileOffset;this.mappedFileQueue.setFlushedWhere(processOffset);this.mappedFileQueue.setCommittedWhere(processOffset);this.mappedFileQueue.truncateDirtyFiles(processOffset);} }
异常恢复
异常恢复通过
CommitLog#recoverAbnormally
实现基本与正常恢复逻辑差不多- 第一步:异常停止,从最后一个文件倒序,找到第一个消息存储正常的文件。
- 第二步:遍历CommitLog文件,每次取出一条消息,验证消息。
- 如果验证结果为true,并且消息长度大于0,表示消息正确并且没有达到末尾,mappedFileOffset指针向前移动本条消息的长度。继续循环,验证下一条消息。
- 如果验证结果为true,并且消息长度等于0,表示已经达到文件末尾,此时如果有下一个文件,则重置mappedFileOffset和processOffset,继续下一次循环
- 如果验证结果为false,表示文件读取错误,此时文件可能不完整。直接退出循环
- 第三步:更新mappedFileQueue的FlushedWhere和CommittedWhere指针位置
- 第四步:删除processOffset之后的所有文件(因为文件不完整,不能加载)
- 第五步:如果CommitLog目录没有消息文件,在ConsuneQueue目录下存在的文件则需要销毁
异常恢复
CommitLog#recoverAbnormally
源码分析public void recoverAbnormally() {// recover by the minimum time stamp// 默认为ttue,即校验消息CRCboolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();if (!mappedFiles.isEmpty()) {// Looking beginning to recover from which file// 从最后一个文件开始向前遍历int index = mappedFiles.size() - 1;MappedFile mappedFile = null;for (; index >= 0; index--) {mappedFile = mappedFiles.get(index);// 找到第一个消息存储正常的文件if (this.isMappedFileMatchedRecover(mappedFile)) {log.info("recover from this mapped file " + mappedFile.getFileName());break;}}if (index < 0) {// 第一个文件index = 0;mappedFile = mappedFiles.get(index);}ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();//processOffset为CommitLog文件已确认的物理偏移量long processOffset = mappedFile.getFileFromOffset();//当前文件已校验通过的物理偏移量long mappedFileOffset = 0;// 遍历CommitLog文件while (true) {// 查找消息,根据配置是否验证CRCDispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);int size = dispatchRequest.getMsgSize();// Normal dataif (size > 0) {mappedFileOffset += size;if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {this.defaultMessageStore.doDispatch(dispatchRequest);}} else {this.defaultMessageStore.doDispatch(dispatchRequest);}}// Intermediate file read errorelse if (size == -1) {log.info("recover physics file end, " + mappedFile.getFileName());break;}// Come the end of the file, switch to the next file// Since the return 0 representatives met last hole, this can// not be included in truncate offsetelse if (size == 0) {index++;if (index >= mappedFiles.size()) {// The current branch under normal circumstances should// not happenlog.info("recover physics file over, last mapped file " + mappedFile.getFileName());break;} else {mappedFile = mappedFiles.get(index);byteBuffer = mappedFile.sliceByteBuffer();processOffset = mappedFile.getFileFromOffset();//当前已经校验通过的偏移量mappedFileOffset = 0;log.info("recover next physics file, " + mappedFile.getFileName());}}}// 更新MappedFileQueue的flushedWhere和committedWhere指针processOffset += mappedFileOffset;this.mappedFileQueue.setFlushedWhere(processOffset);this.mappedFileQueue.setCommittedWhere(processOffset);this.mappedFileQueue.truncateDirtyFiles(processOffset);// Clear ConsumeQueue redundant datathis.defaultMessageStore.truncateDirtyLogicFiles(processOffset);}// Commitlog case files are deletedelse {// 未找到有效的MappedFile,更新flushwhere和CommittedWhere为0this.mappedFileQueue.setFlushedWhere(0);this.mappedFileQueue.setCommittedWhere(0);// 删除ConsumeQueue文件this.defaultMessageStore.destroyLogics();} }
RocketMQ源码分析(十五)之文件恢复相关推荐
- RocketMQ源码分析(十二)之CommitLog同步与异步刷盘
文章目录 版本 简介 FlushCommitLogService 同步刷盘 GroupCommitService 异步刷盘 CommitRealTimeService FlushRealTimeSer ...
- 【转】ABP源码分析十五:ABP中的实用扩展方法
类名 扩展的类型 方法名 参数 作用 XmlNodeExtensions XmlNode GetAttributeValueOrNull attributeName Gets an attribu ...
- springfox 源码分析(十五) 归档得到Documentation文档对象
通过上篇的分析,我们已经得到了ApiListing的map集合,接下来最终做文档归档,得到Documentation对象 /**** 最终生成Documentation文档对象* @param con ...
- GCC源码分析(十六) — gimple转RTL(pass_expand)(下)
版权声明:本文为CSDN博主「ashimida@」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明. 原文链接:https://blog.csdn.net/lidan1 ...
- 《RocketMQ源码分析》NameServer如何处理Broker的连接
<RocketMQ源码分析>NameServer如何处理Broker的连接 NameServer 介绍 NameServer 功能 动态路由发现和注册 服务剔除 创建NameServerC ...
- CloudCompare源码分析:读取ply文件
CloudCompare源码分析_读取ply文件 写这些博客的原因,是因为打算好好研究一下点云的各种库的源码,其中比较知名的是PCL(point cloud library)和CC(CloudComp ...
- RocketMQ源码分析之延迟消息
文章目录 前言 一.延迟消息 1.特点 2.使用场景 3.demo 二.发送延迟消息 三.broker端存储延迟消息 四.总结 1.延迟消息工作原理 2.延迟消息在消费者消费重试中的应用 前言 本篇文 ...
- Flume 1.7 源码分析(五)从Channel获取数据写入Sink
Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 Flume 1.7 源码分析(四)从Source写数据到Channe ...
- Cowboy 源码分析(十八)
在上一篇中,我们整理了下cowboy_http_protocol:header/3函数,在文章的末尾留下2个没有讲到的函数,今天,我们先看下cowboy_http_protocol:error_ter ...
- spring源码分析第五天------springAOP核心原理及源码分析
spring源码分析第五天------springAOP核心原理及源码分析 1. 面向切面编程.可以通过预 编译方式和运行期动态代理实现在不修改源代码的情况下给程序动态统一添加功能的一种技术 切面(A ...
最新文章
- RT-Thread 学习笔记(一)---系统节拍tick
- FastStone Capture
- python orm框架sqlalchemy_python orm 框架中sqlalchemy用法实例详解
- html同时执行多个ajax,Ajax方法详解以及多个Ajax并发执行
- Makefile的写法
- Qt:Qt使用WM_COPYDATA消息进行进程通信
- cesium入门示例-3dTiles加载
- Abp 为Swagger接口页添加详细注释
- ROS配置ipv6方法
- python 智能抠图GUI
- 鹰眼摄像头(OV7725)的使用
- ES集群health为yellow解决办法
- fastadmin 微信支付宝整合插件 支付宝APP支付 ALIN10146
- Convert Kilometers to Miles 2010.3.6
- vue玩转移动端H5微信支付和支付宝支付
- yolov3网络(DarkNet53)结构详解以及Pytorch代码实现
- 记录一次并发情况下的redis导致服务假死的问题
- angular ngx-bootstrap
- python Django音乐推荐系统
- c语言程序中*p代表什么,C语言声明指针的时候int*p到底是什么意思? 爱问知识人...