1、由于RocketMQ操作CommitLog、ConsumeQueue文件,都是基于内存映射方法并在启动的时候,会加载commitlog、ConsumeQueue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以需要一种机制来删除已过期的文件。RocketMQ顺序写Commitlog、ConsumeQueue文件,所有写操作全部落在最后一个CommitLog或ConsumeQueue文件上,之前的文件在下一个文件创建后,将不会再被更新,RocketMQ清除过期文件的方法是:如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除,RocketMQ不会管这个这个文件上的消息是否被全部消费。默认每个文件的过期时间为72小时。通过在Broker配置文件中设置fileReservedTime来改变过期时间,单位为小时。接下来详细分析RocketMQ是如何设计与实现上述机制的。

DefaultMessageStore#addScheduleTask:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

DefaultMessageStore.this.cleanFilesPeriodically();

}

}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

RocketMQ会每隔10s调度一次cleanFilesPeriodically,已检测是否需要清除过期文件。执行频率可以通过设置cleanResourceInterval,默认为10s。

DefaultMessageStore#cleanFilesPeriodically

private void cleanFilesPeriodically() {

this.cleanCommitLogService.run();

this.cleanConsumeQueueService.run();

}

主要清除CommitLog、ConsumeQueue的过期文件。

CommitLog与ConsumeQueue对于过期文件的删除算法、逻辑大同小异,本文将以CommitLog过期文件为例来详细分析其实现原理。

DefaultMessageStore$CleanCommitLogService#run

public void run() {

try {

this.deleteExpiredFiles();

this.redeleteHangedFile();

} catch (Throwable e) {

DefaultMessageStore.log.warn(this.getServiceName() + " service has

exception. ", e);

}

}

整个执行过程分为两个大的步骤,第一个步骤:尝试删除过期文件;第二个步骤:重试删除被hange(由于被其他线程引用在第一阶段未删除的文件),在这里再重试一次。

DefaultMessageStore$CleanCommitLogService#deleteExpiredFiles

long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();

int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();

int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

Step1:解释一下这个三个配置属性的含义。

fileReservedTime:文件保留时间,也就是从最后一次更新时间到现在,如果超过了该时间,则认为是过期文件,可以被删除。

deletePhysicFilesInterval:删除物理文件的间隔,因为在一次清除过程中,可能需要删除的文件不止一个,该值指定两次删除文件的间隔时间。

destroyMapedFileIntervalForcibly:在清除过期文件时,如果该文件被其他线程所占用(引用次数大于0,比如读取消息),此时会阻止此次删除任务,

同时在第一次试图删除该文件时记录当前时间戳,destroyMapedFileIntervalForcibly表示第一次拒绝删除之后能保留的最大时间,在此时间内,同样可以被拒绝删除,同时会将引用减少1000个,超过该时间间隔后,文件将被强制删除。

DefaultMessageStore$CleanCommitLogService#deleteExpiredFiles:

boolean timeup = this.isTimeToDelete();

boolean spacefull = this.isSpaceToDelete();

boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;

if (timeup || spacefull || manualDelete) {

//继续执行删除逻辑

return;

} else {

// 本次删除任务无作为。

}

Step2:RocketMQ在如下三种情况任意满足之一的情况下将继续执行删除文件操作。

1)到了删除文件的时间点,RocketMQ通过deleteWhen设置一天的固定时间执行一次删除过期文件操作,默认为凌晨4点。

2)判断磁盘空间是否充足,如果不充足,则返回true,表示应该触发过期文件删除操作。

3)预留,手工触发,可以通过调用excuteDeleteFilesManualy方法手工触发过期文件删除,目前RocketMQ暂未封装手工触发文件删除的命令。

重点分析一下磁盘不足的判断依据。

DefaultMessageStore$CleanCommitLogService#isSpaceToDelete

double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; // @1

cleanImmediately = false;

{

String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();

double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); // @2

if (physicRatio > diskSpaceWarningLevelRatio) { // @3

boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();

if (diskok) {

DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");

}

cleanImmediately = true;

} else if (physicRatio > diskSpaceCleanForciblyRatio) {

cleanImmediately = true;

} else {

boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();

if (!diskok) {

DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");

}

}

if (physicRatio < 0 || physicRatio > ratio) {

DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);

return true;

}

}

代码@1:获取maxUsedSpaceRatio,表示commitlog、consumequeue文件所在磁盘分区的最大使用量,如果超过该值,则需要立即清除过期文件。

代码@2:通过File#getTotalSpace()获取commitlog所在磁盘分区总的存储容量,通过File#getFreeSpace()获取commitlog目录所在磁盘文件剩余容量并得出当前该分区的物理磁盘使用率physicRatio 。

代码@3:RocketMQ另外提供了两个与磁盘空间使用率相关的系统级参数:

-Drocketmq.broker.diskSpaceWarningLevelRatio=0.90:如果磁盘分区使用率超过该阔值,将设置磁盘不可写,此时会拒绝新消息的写入。

-Drocketmq.broker.diskSpaceCleanForciblyRatio=0.85:如果磁盘分区使用超过该阔值,建议立即执行过期文件清除,但不会拒绝新消息的写入。

判断磁盘是否可用,用当前已使用物理磁盘率maxUsedSpaceRatio、diskSpaceWarningLevelRatio、diskSpaceCleanForciblyRatio,如果当前磁盘使用率达到上述阔值,将返回true表示磁盘已满,需要进行过期文件删除操作。

MappedFile#destroy

Step3:然后根据文件的最后一次更新时间与当前时间做比较,判断是否过期,如果已过期,调用MappedFile的destory。

MappedFile#shutdown

public void shutdown(final long intervalForcibly) {

if (this.available) {

this.available = false;

this.firstShutdownTimestamp = System.currentTimeMillis();

this.release();

} else if (this.getRefCount() > 0) {

if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {

this.refCount.set(-1000 - this.getRefCount());

this.release();

}

}

}

如果available为true,表示第一次执行shutdown方法,首先设置available为false,并记录firstShutdownTimestamp时间戳,如果当前该文件被其他线程引用,则本次不强制删除,如果没有其他线程在使用该文件,则清除MappedFile相关资源,并最终执行File#delete()方法清除文件。在拒绝被删除保护期内(destroyMapedFileIntervalForcibly)每执行一次清理任务,将引用次数减去1000,引用数小于1后,该文件最终将被删除。

关于ConsumeQueue的过期文件删除机制与Commitlog文件机制类似,本文就不重复讲解。

本文重点是理解如下参数的含义:

fileReservedTime、deletePhysicFilesInterval、destroyMapedFileIntervalForcibly、-Drocketmq.broker.diskSpaceWarningLevelRatio

-Drocketmq.broker.diskSpaceCleanForciblyRatio与获取磁盘分区总容量与剩余容量的方法。

rocketmq 消息删除_RocketMQ源码分析之文件过期删除机制相关推荐

  1. MyBatis 源码分析 - 映射文件解析过程

    1.简介 在上一篇文章中,我详细分析了 MyBatis 配置文件的解析过程.由于上一篇文章的篇幅比较大,加之映射文件解析过程也比较复杂的原因.所以我将映射文件解析过程的分析内容从上一篇文章中抽取出来, ...

  2. LinkedList中查询(contains)和删除(remove)源码分析

    一.contains源码分析 本文分析双向链表LinkedList的查询操作源码实现.jdk中源程序中,LinkedList的查询操作,通过contains(Object o)函数实现.具体见下面两部 ...

  3. Tornado源码分析 --- 静态文件处理模块

    每个web框架都会有对静态文件的处理支持,下面对于Tornado的静态文件的处理模块的源码进行分析,以加强自己对静态文件处理的理解. 先从Tornado的主要模块 web.py 入手,可以看到在App ...

  4. tfs_client php,TFS 源码分析 写文件操作 Client端

    整个写文件的总体流程这里有介绍 主要分析了写文件时,NameServer端的源码分析 这篇文章介绍写文件时,Client端的源码分析 本文描述的内容涉及TFS写入流程图中的step1, step2, ...

  5. Dubbo源码分析系列-深入Dubbo SPI机制

    导语   在之前的博客中介绍过关于Java中SPI的机制,也简单的分析了关于Java中SPI怎么去使用.SPI的全称Service Provider Interface,是一种服务发现机制.SPI的本 ...

  6. java类加载机制为什么双亲委派_[五]类加载机制双亲委派机制 底层代码实现原理 源码分析 java类加载双亲委派机制是如何实现的...

    Launcher启动类 本文是双亲委派机制的源码分析部分,类加载机制中的双亲委派模型对于jvm的稳定运行是非常重要的不过源码其实比较简单,接下来简单介绍一下我们先从启动类说起有一个Launcher类 ...

  7. mq补偿机制java代码_RocketMQ源码分析之消息消费机制-消费端消息负载均衡机制与重新分布 - Java 技术驿站-Java 技术驿站...

    1.消息消费需要解决的问题 首先再次重复啰嗦一下RocketMQ消息消费的一些基本元素的关系 主题 ---> 消息队列(MessageQueue) 1 对多 主题 ----> 消息生产者, ...

  8. 进程句柄表初始化,扩展,插入删除句柄源码分析

    一.为什么要有句柄 句柄是一个8字节的结构体,用途是指向内核对象.3环程序无法通过地址直接访问内核对象,所以需要用句柄来间接访问. 本文重点介绍句柄表,句柄本身则留到下一篇博客介绍.但因为接下来介绍句 ...

  9. View系列 :源码分析:RecyclerView滑动删除 全解析

    1:效果展示 效果很简单,就是 RecycleView的 滑动删除功能 2:效果分析 主要是三个步骤: 步骤一:是RecyclerView 的每一个条目上增加 删除 View控件,这个是静态xml页面 ...

最新文章

  1. Java SPI机制分析
  2. spring 源代码地址
  3. 有没有可以在JavaScript里可以用的锁?
  4. CF1497D Genius
  5. python3导入_Python3导入相对还是绝对的正确方法?
  6. Fast-SCNN:多分支结构共享低级特征的语义分割网络
  7. python编程(ply库)
  8. DB2 多表空间 重定向 还原
  9. UNIX高级环境编程(11)进程控制(Process Control)- 进程快照,用户标识符,进程调度...
  10. 做了三年Java,java参考文献近五年图书
  11. Python 文字转语音(TTS)
  12. Python处理图片缩略图
  13. 通过 web 录制视频(摄像头)并上传
  14. 阅读《吴军·硅谷来信》一年的回顾与思考
  15. 99乘法表,读写文件,函数
  16. 史上好电影集合--百度云
  17. 基于JAVA小微企业人事管理系统计算机毕业设计源码+数据库+lw文档+系统+部署
  18. python-requests官网_Python-Requests1-批量登录获取uid
  19. 如何拆台式计算机光驱,如何拆开光驱
  20. Adb shell命令直接打开语言设置界面

热门文章

  1. 【持续更新中】可用“海马王模拟器”来模拟安卓、苹果的环境
  2. python学习笔记-马哥2017
  3. unity3d android包太大了,unity/unity3d编译成android apk包瘦身方法
  4. DzzOffice管理员登陆方法和管理员应用介绍
  5. java 随机生成简体汉字_Java代码实现随机生成汉字的方法|chu
  6. 萌宠大作战服务器维护,萌宠大作战攻略 萌宠大作战部分常见疑问介绍
  7. 国产安卓模拟器无法安装Android9以上应用APP,你可以试试这一招!
  8. 脸部识别,简单图片识别
  9. iphone 强制刷新drawRect方法
  10. android scrollview 滚动高度,如何更改Android ScrollView的大小