文章目录

  • 版本
  • 异常分析
  • StoreCheckpoint
  • ConsumeQueue与Index文件恢复
    • 正常恢复
    • 异常恢复

版本

  1. 基于rocketmq-all-4.3.1版本

异常分析

  1. 由于ConsumeQueue和Index文件都是根据CommitLog文件异步构建的,所以ConsumeQueue、Index与CommitLog文件的数据就是最终一致,而不是强一致的。这样在Broker重启时就可能出现不一致性的情况

    • CommitLog文件同步刷盘,当准备转发给ConsumeQueue文件时突然断电或者出现故障,导致ConsumeQueue存储失败
    • 在刷盘时,由于突然断电,只写入一部分数据到磁盘CommitLog文件中
    • 当数据写入CommitLog文件后才会将刷盘点记录到检查点中,有可能刷盘完成,但是写入检查点文件并没有完成
  2. RocketMQ 有两种文件恢复机制。判断异常的方式是在 broker启动的时候创建一个 abort 空文件,在正常结束的时候删掉这个文件。在下一次启动 broker 时,如果发现了 abort 文件,则认为是异常宕机,否则就是正常关机。

    • 正常关机恢复:先从倒数第三个文件开始进行恢复,然后按照消息的存储格式进行查找,如果改文件中所有的消息都符合消息存储格式,则继续查找下一个文件,直到找到最后一条消息所在的位置
    • 异常宕机恢复:异常停止刷盘时,从最后一个文件开始查找,在查找时读取改文件第一条消息的存储时间,如果这个存储时间小于检查点文件中的刷盘时间,就可以从这个文件开始恢复,如果这个文件中第一条消息的存储时间大于检查点,说明不能从这个文件开始恢复,需要寻找上一个文件。因为检查点文件中的刷盘点代表的是100%可靠的消息。
  3. 关机恢复机制设计的目的就是保证数据0丢失,RocketMQ通过abort和checkpoint来保证数据0丢失

    • abort文件:abort文件时一个空文件,在Broker启动时会被创建,当正常关闭的时候会被删除。如果Broker是异常关闭,则不会删除此文件
    • checkpoint文件:是一个检查点文件,此文件保存了Broker最后一次正常存储数据的时间,当重启Broker时,恢复程序可以从此文件获取应该从哪个时刻开始恢复数据
  4. 当索引文件刷盘成功,消息队列消费文件未刷盘成功且宕机时,会造成消息消费队列文件丢失的问题。但只要 Commitlog 文件没有丢失,就可以利用 RocketMQ 的文件恢复机制,恢复丢失的消息消费队列文件。在 RocketMQ 的文件恢复机制中,有针对异常宕机进行文件恢复的机制。当 broker 异常启动,在文件恢复过程中,RocketMQ 会将最后一个有效文件的所有消息转发到消息消费队列和索引文件,确保不丢失消息,但同时也会带来重复消费的问题,RocketMQ 保证消息不丢失但不保证消息不会重复消费,故消息消费业务方需要实现消息消费的幂等设计。

StoreCheckpoint

  1. StoreCheckpoint(检查点)主要用于记录CommitLogConsumeQueueIndex文件的刷盘时间点,当上一次broker是异常结束时,会根据StoreCheckpoint的数据进行恢复。checkpoint(检查点)文件固定长度为4KB

  2. 当索引文件刷盘成功,消息队列消费文件未刷盘成功且宕机时,会造成消息消费队列文件丢失的问题。但只要 Commitlog 文件没有丢失,就可以利用 RocketMQ 的文件恢复机制,恢复丢失的消息消费队列文件。在 RocketMQ 的文件恢复机制中,有针对异常宕机进行文件恢复的机制。当 broker 异常启动,在文件恢复过程中,RocketMQ 会将最后一个有效文件的所有消息转发到消息消费队列和索引文件,确保不丢失消息,但同时也会带来重复消费的问题,RocketMQ 保证消息不丢失但不保证消息不会重复消费,故消息消费业务方需要实现消息消费的幂等设计。

  3. StoreCheckpoint文件源码

    public class StoreCheckpoint {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);private final RandomAccessFile randomAccessFile;private final FileChannel fileChannel;private final MappedByteBuffer mappedByteBuffer;//CommitLog最新一条记录的存储时间private volatile long physicMsgTimestamp = 0;//ConsumeQueue最新一条记录的存储时间private volatile long logicsMsgTimestamp = 0;//Index File最新一条记录的存储时间private volatile long indexMsgTimestamp = 0;public StoreCheckpoint(final String scpPath) throws IOException {File file = new File(scpPath);MappedFile.ensureDirOK(file.getParent());boolean fileExists = file.exists();this.randomAccessFile = new RandomAccessFile(file, "rw");//一旦建立映射(map),fileChannel其实就可以关闭了,关闭fileChannel对映射不会有影响//TODO 所以这个地方的fileChannel是不是直接关闭就好?this.fileChannel = this.randomAccessFile.getChannel();this.mappedByteBuffer = fileChannel.map(MapMode.READ_WRITE, 0, MappedFile.OS_PAGE_SIZE);if (fileExists) {log.info("store checkpoint file exists, " + scpPath);this.physicMsgTimestamp = this.mappedByteBuffer.getLong(0);this.logicsMsgTimestamp = this.mappedByteBuffer.getLong(8);this.indexMsgTimestamp = this.mappedByteBuffer.getLong(16);log.info("store checkpoint file physicMsgTimestamp " + this.physicMsgTimestamp + ", "+ UtilAll.timeMillisToHumanString(this.physicMsgTimestamp));log.info("store checkpoint file logicsMsgTimestamp " + this.logicsMsgTimestamp + ", "+ UtilAll.timeMillisToHumanString(this.logicsMsgTimestamp));log.info("store checkpoint file indexMsgTimestamp " + this.indexMsgTimestamp + ", "+ UtilAll.timeMillisToHumanString(this.indexMsgTimestamp));} else {log.info("store checkpoint file not exists, " + scpPath);}}public void shutdown() {this.flush();// unmap mappedByteBufferMappedFile.clean(this.mappedByteBuffer);try {this.fileChannel.close();} catch (IOException e) {log.error("Failed to properly close the channel", e);}}public void flush() {this.mappedByteBuffer.putLong(0, this.physicMsgTimestamp);this.mappedByteBuffer.putLong(8, this.logicsMsgTimestamp);this.mappedByteBuffer.putLong(16, this.indexMsgTimestamp);this.mappedByteBuffer.force();}public long getPhysicMsgTimestamp() {return physicMsgTimestamp;}public void setPhysicMsgTimestamp(long physicMsgTimestamp) {this.physicMsgTimestamp = physicMsgTimestamp;}public long getLogicsMsgTimestamp() {return logicsMsgTimestamp;}public void setLogicsMsgTimestamp(long logicsMsgTimestamp) {this.logicsMsgTimestamp = logicsMsgTimestamp;}public long getMinTimestampIndex() {return Math.min(this.getMinTimestamp(), this.indexMsgTimestamp);}public long getMinTimestamp() {long min = Math.min(this.physicMsgTimestamp, this.logicsMsgTimestamp);//TODO 这里为什么要减去3000?min -= 1000 * 3;if (min < 0)min = 0;return min;}public long getIndexMsgTimestamp() {return indexMsgTimestamp;}public void setIndexMsgTimestamp(long indexMsgTimestamp) {this.indexMsgTimestamp = indexMsgTimestamp;}}
    

ConsumeQueue与Index文件恢复

  1. 存储文件的启动时恢复主要完成成flushedWherecommittedWhere指针的设置、将消息消费队列最大偏移量加载到内存,并删除flushedWhere之后所有的文件

  2. DefaultMessageStore#load是文件恢复的入口

    • 判断abort文件是否存在,此文件在启动时创建,正常停止后时会被删除

    • 加载延迟日志文件

    • 加载CommitLog文件,按照文件名进行排序。如果文件与配置中的CommitLog文件大小不一致,则直接返回,会忽略后续的文件

    • 加载ComsumeQueue文件

    • 加载checkpoint文件

    • 加载Index文件,如果上次异常退出,而且Index文件刷盘时间大于检查点文件最大的消息时间戳,则立即销毁此文件

    • 根据是否正常停止,执行不同的恢复策略

  3. load源码

    public boolean load() {boolean result = true;try {//判断abort文件是否存在,此文件在启动时创建,正常停止时会被删除boolean lastExitOK = !this.isTempFileExist();log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");if (null != scheduleMessageService) {result = result && this.scheduleMessageService.load();}// load Commit Logresult = result && this.commitLog.load();// load Consume Queueresult = result && this.loadConsumeQueue();if (result) {this.storeCheckpoint =new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));this.indexService.load(lastExitOK);this.recover(lastExitOK);log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());}} catch (Exception e) {log.error("load exception", e);result = false;}if (!result) {this.allocateMappedFileService.shutdown();}return result;
    }
    

正常恢复

  1. 正常恢复通过CommitLog#recoverNormally实现

    • 第一步:从倒数第三个文件开始恢复,如果不足3个文件,从第一个文件开始恢复
    • 第二步:遍历CommitLog文件,每次取出一条消息,验证消息。
      • 如果验证结果为true,并且消息长度大于0,表示消息正确并且没有达到末尾,mappedFileOffset指针向前移动本条消息的长度。继续循环,验证下一条消息。
      • 如果验证结果为true,并且消息长度等于0,表示已经达到文件末尾,此时如果有下一个文件,则重置mappedFileOffset和processOffset,继续下一次循环
      • 如果验证结果为false,表示文件读取错误,此时文件可能不完整。直接退出循环
    • 第三步:更新mappedFileQueue的FlushedWhere和CommittedWhere指针位置
    • 第四步:删除processOffset之后的所有文件(因为文件不完整,不能加载)
  2. 正常恢复CommitLog#recoverNormally的源码

    public void recoverNormally() {// 默认开启CRC验证boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();if (!mappedFiles.isEmpty()) {// Began to recover from the last third file// 从倒数第三个文件开始恢复int index = mappedFiles.size() - 3;if (index < 0)// 不足三个文件,则从第一个文件开始恢复index = 0;MappedFile mappedFile = mappedFiles.get(index);ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();//processOffset为CommitLog文件已确认的物理偏移量long processOffset = mappedFile.getFileFromOffset();//当前文件已校验通过的物理偏移量long mappedFileOffset = 0;// 遍历CommitLog文件while (true) {// 查找消息,根据配置是否验证CRCDispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);int size = dispatchRequest.getMsgSize();// Normal dataif (dispatchRequest.isSuccess() && size > 0) {// 没有到文件末尾,mappedFileOffset指针向前移动本条消息的长度mappedFileOffset += size;}// Come the end of the file, switch to the next file Since the// return 0 representatives met last hole,// this can not be included in truncate offset// 文件末尾else if (dispatchRequest.isSuccess() && size == 0) {index++;if (index >= mappedFiles.size()) {// Current branch can not happenlog.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());break;} else {// 下一个文件,重置mappedFileOffset和processOffset,继续下一次循环mappedFile = mappedFiles.get(index);byteBuffer = mappedFile.sliceByteBuffer();//processOffset为CommitLog文件已确认的物理偏移量processOffset = mappedFile.getFileFromOffset();//当前已经校验通过的偏移量mappedFileOffset = 0;log.info("recover next physics file, " + mappedFile.getFileName());}}// Intermediate file read errorelse if (!dispatchRequest.isSuccess()) {log.info("recover physics file end, " + mappedFile.getFileName());break;}}// 更新MappedFileQueue的flushedWhere和committedWhere指针processOffset += mappedFileOffset;this.mappedFileQueue.setFlushedWhere(processOffset);this.mappedFileQueue.setCommittedWhere(processOffset);this.mappedFileQueue.truncateDirtyFiles(processOffset);}
    }
    

异常恢复

  1. 异常恢复通过CommitLog#recoverAbnormally实现基本与正常恢复逻辑差不多

    • 第一步:异常停止,从最后一个文件倒序,找到第一个消息存储正常的文件。
    • 第二步:遍历CommitLog文件,每次取出一条消息,验证消息。
      • 如果验证结果为true,并且消息长度大于0,表示消息正确并且没有达到末尾,mappedFileOffset指针向前移动本条消息的长度。继续循环,验证下一条消息。
      • 如果验证结果为true,并且消息长度等于0,表示已经达到文件末尾,此时如果有下一个文件,则重置mappedFileOffset和processOffset,继续下一次循环
      • 如果验证结果为false,表示文件读取错误,此时文件可能不完整。直接退出循环
    • 第三步:更新mappedFileQueue的FlushedWhere和CommittedWhere指针位置
    • 第四步:删除processOffset之后的所有文件(因为文件不完整,不能加载)
    • 第五步:如果CommitLog目录没有消息文件,在ConsuneQueue目录下存在的文件则需要销毁
  2. 异常恢复CommitLog#recoverAbnormally源码分析

    public void recoverAbnormally() {// recover by the minimum time stamp// 默认为ttue,即校验消息CRCboolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();if (!mappedFiles.isEmpty()) {// Looking beginning to recover from which file// 从最后一个文件开始向前遍历int index = mappedFiles.size() - 1;MappedFile mappedFile = null;for (; index >= 0; index--) {mappedFile = mappedFiles.get(index);// 找到第一个消息存储正常的文件if (this.isMappedFileMatchedRecover(mappedFile)) {log.info("recover from this mapped file " + mappedFile.getFileName());break;}}if (index < 0) {// 第一个文件index = 0;mappedFile = mappedFiles.get(index);}ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();//processOffset为CommitLog文件已确认的物理偏移量long processOffset = mappedFile.getFileFromOffset();//当前文件已校验通过的物理偏移量long mappedFileOffset = 0;// 遍历CommitLog文件while (true) {// 查找消息,根据配置是否验证CRCDispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);int size = dispatchRequest.getMsgSize();// Normal dataif (size > 0) {mappedFileOffset += size;if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {this.defaultMessageStore.doDispatch(dispatchRequest);}} else {this.defaultMessageStore.doDispatch(dispatchRequest);}}// Intermediate file read errorelse if (size == -1) {log.info("recover physics file end, " + mappedFile.getFileName());break;}// Come the end of the file, switch to the next file// Since the return 0 representatives met last hole, this can// not be included in truncate offsetelse if (size == 0) {index++;if (index >= mappedFiles.size()) {// The current branch under normal circumstances should// not happenlog.info("recover physics file over, last mapped file " + mappedFile.getFileName());break;} else {mappedFile = mappedFiles.get(index);byteBuffer = mappedFile.sliceByteBuffer();processOffset = mappedFile.getFileFromOffset();//当前已经校验通过的偏移量mappedFileOffset = 0;log.info("recover next physics file, " + mappedFile.getFileName());}}}// 更新MappedFileQueue的flushedWhere和committedWhere指针processOffset += mappedFileOffset;this.mappedFileQueue.setFlushedWhere(processOffset);this.mappedFileQueue.setCommittedWhere(processOffset);this.mappedFileQueue.truncateDirtyFiles(processOffset);// Clear ConsumeQueue redundant datathis.defaultMessageStore.truncateDirtyLogicFiles(processOffset);}// Commitlog case files are deletedelse {// 未找到有效的MappedFile,更新flushwhere和CommittedWhere为0this.mappedFileQueue.setFlushedWhere(0);this.mappedFileQueue.setCommittedWhere(0);// 删除ConsumeQueue文件this.defaultMessageStore.destroyLogics();}
    }
    

RocketMQ源码分析(十五)之文件恢复相关推荐

  1. RocketMQ源码分析(十二)之CommitLog同步与异步刷盘

    文章目录 版本 简介 FlushCommitLogService 同步刷盘 GroupCommitService 异步刷盘 CommitRealTimeService FlushRealTimeSer ...

  2. 【转】ABP源码分析十五:ABP中的实用扩展方法

    类名 扩展的类型 方法名 参数 作用 XmlNodeExtensions XmlNode GetAttributeValueOrNull attributeName Gets an   attribu ...

  3. springfox 源码分析(十五) 归档得到Documentation文档对象

    通过上篇的分析,我们已经得到了ApiListing的map集合,接下来最终做文档归档,得到Documentation对象 /**** 最终生成Documentation文档对象* @param con ...

  4. GCC源码分析(十六) — gimple转RTL(pass_expand)(下)

    版权声明:本文为CSDN博主「ashimida@」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明. 原文链接:https://blog.csdn.net/lidan1 ...

  5. 《RocketMQ源码分析》NameServer如何处理Broker的连接

    <RocketMQ源码分析>NameServer如何处理Broker的连接 NameServer 介绍 NameServer 功能 动态路由发现和注册 服务剔除 创建NameServerC ...

  6. CloudCompare源码分析:读取ply文件

    CloudCompare源码分析_读取ply文件 写这些博客的原因,是因为打算好好研究一下点云的各种库的源码,其中比较知名的是PCL(point cloud library)和CC(CloudComp ...

  7. RocketMQ源码分析之延迟消息

    文章目录 前言 一.延迟消息 1.特点 2.使用场景 3.demo 二.发送延迟消息 三.broker端存储延迟消息 四.总结 1.延迟消息工作原理 2.延迟消息在消费者消费重试中的应用 前言 本篇文 ...

  8. Flume 1.7 源码分析(五)从Channel获取数据写入Sink

    Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 Flume 1.7 源码分析(四)从Source写数据到Channe ...

  9. Cowboy 源码分析(十八)

    在上一篇中,我们整理了下cowboy_http_protocol:header/3函数,在文章的末尾留下2个没有讲到的函数,今天,我们先看下cowboy_http_protocol:error_ter ...

  10. spring源码分析第五天------springAOP核心原理及源码分析

    spring源码分析第五天------springAOP核心原理及源码分析 1. 面向切面编程.可以通过预 编译方式和运行期动态代理实现在不修改源代码的情况下给程序动态统一添加功能的一种技术 切面(A ...

最新文章

  1. RT-Thread 学习笔记(一)---系统节拍tick
  2. FastStone Capture
  3. python orm框架sqlalchemy_python orm 框架中sqlalchemy用法实例详解
  4. html同时执行多个ajax,Ajax方法详解以及多个Ajax并发执行
  5. Makefile的写法
  6. Qt:Qt使用WM_COPYDATA消息进行进程通信
  7. cesium入门示例-3dTiles加载
  8. Abp 为Swagger接口页添加详细注释
  9. ROS配置ipv6方法
  10. python 智能抠图GUI
  11. 鹰眼摄像头(OV7725)的使用
  12. ES集群health为yellow解决办法
  13. fastadmin 微信支付宝整合插件 支付宝APP支付 ALIN10146
  14. Convert Kilometers to Miles 2010.3.6
  15. vue玩转移动端H5微信支付和支付宝支付
  16. yolov3网络(DarkNet53)结构详解以及Pytorch代码实现
  17. 记录一次并发情况下的redis导致服务假死的问题
  18. angular ngx-bootstrap
  19. python Django音乐推荐系统
  20. c语言程序中*p代表什么,C语言声明指针的时候int*p到底是什么意思? 爱问知识人...

热门文章

  1. 重装战姬服务器维护,《重装战姬》10月29日更新维护公告
  2. java计算机毕业设计学生成绩管理系统源程序+mysql+系统+lw文档+远程调试
  3. 近14年美股各个板块收益之间的差异
  4. vue + Electron 制作桌面应用
  5. 小波图像处理 —— 奇异点(不连续点)检测
  6. 笔记本电脑加装内存条和固态硬盘的前期准备工作
  7. android 自定义剪裁,Android自定义View实现照片裁剪框与照片裁剪功能
  8. 随机森林 OOB理解
  9. python3.5.2 mysql Exccel
  10. 臀部肌群锻炼方法大全