1. 简介

hudi采用的是mvcc设计,提供了清理工具cleaner来把旧版本的文件分片删除,默认开启了清理功能,可以防止文件系统的存储空间和文件数量的无限增长。

1.1 环境

  • flink 1.13.6
  • hudi 0.11.0

1.2 清理保留策略

清理旧文件需要考虑数据查询的情况,有些长查询会占用着旧版本的文件,需要设置合适的清理策略来保留一定数量的commit或者文件版本,以提高系统的容错性

  • KEEP_LATEST_COMMITS:默认策略,表示保留最后n次提交,默认为10,通过参数hoodie.cleaner.commits.retained​或clean.retain_commits​(flink)设置
  • KEEP_LATEST_FILE_VERSIONS:保留最后n个文件版本,默认为3,通过参数hoodie.cleaner.fileversions.retained设置​
  • KEEP_LATEST_BY_HOURS:保留最后n小时,默认24小时,通过参数hoodie.cleaner.hours.retained设置,这是0.11版本后新增的

1.3 清理触发策略

目前仅支持一种触发清理的策略:CleaningTriggerStrategy#NUM_COMMITS,即根据提交的次数,默认为1,可以通过设置参数hoodie.clean.max.commits​进行修改,在flink job的每次checkpoint时都会进行触发策略的条件判断,所以在两次chekpoint之间发生过1次或n次提交,都会触发清理动作。

2. 清理流程分析

2.1 清理器(cleaner)初始化

清理逻辑是被包装成一个flink sink,在HoodieTableSink#getSinkRuntimeProvider中进行初始化

if (StreamerUtil.needsAsyncCompaction(conf)) {return Pipelines.compact(conf, pipeline);
} else {return Pipelines.clean(conf, pipeline);
}

如果是mor表且开启了异步合并(compaction.async.enabled),则创建CompactionCommitSink,继承了CleanFunction,所以包含了清理逻辑,这是由于SQL API中一个SinkRuntimeProvider不支持多个sink.
否则,直接将CleanFunction作为sink,这种情况必需启用异步清理配置clean.async.enabled,因为CleanFunction的主要方法都判断了是否为异步清理。

2.2 清理启动入口

  1. compact成功后同步清理
    需要满足条件:1)mor表,2)启用异步合并compaction.async.enabled,3)禁用异步清理clean.async.enabled。入代码在CompactionCommitSink#doCommit中:
if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {this.writeClient.clean();
}
  1. chenpoint时异步清理
    需要满足条件:1)非mor表或启用异步合并compaction.async.enabled,2)启用异步清理clean.async.enabled。入口代码在CleanFunction#snapshotState中:
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {this.writeClient.startAsyncCleaning();this.isCleaning = true;
}

AsyncCleanerService#startService会启动一个线程放到线程池中执行

2.3 清理逻辑执行

清理逻辑的流程控制在基类方法BaseHoodieWriteClient#clean中,主要包含有三个步骤:生成清理计划、刷新ActiveTimeline、执行清理计划

//生成清理计划
scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
//刷新ActiveTimeline缓存
table.getMetaClient().reloadActiveTimeline();
//执行清理计划
metadata = table.clean(context, cleanInstantTime, skipLocking);

2.3.1 生成清理计划

scheduleTableServiceInternal是通用的方法,根据不同的tableServiceType(clean,compact,archive,cluster)调用对应的ActionExecutor去生成plan,CleanPlanActionExecutor#execute

if (!needsCleaning(config.getCleaningTriggerStrategy())) {return Option.empty();
}
return requestClean(instantTime);

先判断是否满足1.3清理触发策略,不满足表示无需清理,否则在CleanPlanActionExecutor#requestClean中生成清理计划:

final HoodieCleanerPlan cleanerPlan = requestClean(context);
table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
  1. 扫描.hoodie目录结合1.2清理保留策略计算需要保留的最小的HoodieInstant(earliestInstant),进而找到所有需要清理的分区和分区下的文件(HoodieCleanFileInfo)
  2. 将清理计划cleanerPlan序列化成arvo文件,并在.hoodie目录保存成xxx.clean.requested文件

2.3.2 刷新ActiveTimeline缓存

因为如果清理计划生成成功,表元数据目录.hoodie下会增加**instant[action=clean,state=requested]**文件,由于ActiveTimeline频繁被读取,为了避免每次从文件系统加载,需要实时保持内存与文件系统的元数据同步。

2.3.3 执行清理计划

也是调用对应的ActiveExecutor去执行清理,实现在CleanActionExecutor#execute

List<HoodieInstant> pendingCleanInstants = table.getCleanTimeline().filterInflightsAndRequested().getInstants().collect(Collectors.toList());
pendingCleanInstants.forEach(hoodieInstant -> {cleanMetadataList.add(runPendingClean(table, hoodieInstant));table.getMetaClient().reloadActiveTimeline();
});

从ActiveTimeline中过滤状态为requested和inflight的instant,这两个状态都是需要执行清理的

HoodieCleanMetadata runPendingClean(HoodieTable<T, I, K, O> table, HoodieInstant cleanInstant) {HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant);return runClean(table, cleanInstant, cleanerPlan);
}

将instant文件(requested,inflight)反序列化为清理计划,然后进入runClean

private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstant cleanInstant, HoodieCleanerPlan cleanerPlan) {final HoodieInstant inflightInstant;if (cleanInstant.isRequested()) {inflightInstant = table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant,TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));} else {inflightInstant = cleanInstant;}List<HoodieCleanStat> cleanStats = clean(context, cleanerPlan);table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant,TimelineMetadataUtils.serializeCleanMetadata(metadata));
}
  1. 如果处理的instant状态为requested需要先转换为inflight状态(生成xxx.clean.inflight文件),表示开始清理。
  2. 执行清理clean(context, cleanerPlan),根据清理计划的数据进行文件删除即可,首先删除每个分区下需要清理的文件,然后删除需清理的分区目录,最后收集统计数据返回。
  3. 清理成功后将infight状态转换为completed状态,表示清理完成。

3. 整体流程图

hudi系列-旧文件清理(clean)相关推荐

  1. 一键清理系统垃圾文件脚本: clean.bat

    一键清理系统垃圾文件脚本: clean.bat 最后更新于:2007-09-03 09:09 版权声明:可以任意转载,转载时请务必以超链接形式标明文章 原始出处和作者信息及 本版权声明. http:/ ...

  2. 「Hudi系列」Hudi查询写入常见问题汇总

    点击上方蓝色字体,选择"设为星标" 回复"面试"获取更多惊喜 八股文教给我,你们专心刷题和面试 阅读本文前必读: 1. 「Apache Hudi系列」核心概念与 ...

  3. vc6.0垃圾文件清理工具_MacClean360 for Mac(mac系统清理软件)

    MacClean360 for Mac能够帮助您更好的清理您的mac电脑,MacClean360 for Mac版包含了让您可以安全深度清理mac硬盘所需的所有工具,它们让您保持你的Mac正常运行,清 ...

  4. Mac重复文件清理软件—Cisdem Duplicate Finder for mac

    对于Mac用户来说,Cisdem Duplicate Finder是一款出色的重复查找工具,可以检测和查找所有重复内容,并批量删除它们以快速释放外部/内部硬盘空间.所有重复的照片,音乐,文档,视频或其 ...

  5. git 还原文件到其他版本_如何在Git中还原旧文件版本

    git 还原文件到其他版本 读: 第1部分:什么是Git? 第2部分:Git入门 第3部分:创建第一个Git存储库 第4部分:如何在Git中还原旧文件版本 第5部分:3个用于Git的图形工具 第6部分 ...

  6. 旧版java_Java旧版本清理|JavaRa旧版本清理下载_V2.4 官方版_9号软件下载

    JavaRa 是一款非常实用的Java旧版清理工具,可以帮你检查目前系统安装的 Java 是否为最新版本,并且自动移除最新版本以外的所有旧版本,让这些旧版可以不要再占据硬盘的空间了. 软件特点: 1. ...

  7. RocketMQ源码解析之broker文件清理

    原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...

  8. channelsftp 上传文件为空_文件上传踩坑记及文件清理原理探究

    目录 1. 糟糕的异步存储文件实现 2. 异常原因推理 3. 问题解决方式 4. spring清理文件原理 5. tomcat清理文件原理 最近搞一个文件上传功能,由于文件太大,或者说其中包含了比较多 ...

  9. vscode老编译之前的文件_vscode单击新文件时覆盖旧文件的解决方法

    vscode单击新文件时覆盖旧文件的解决方法 当一个项目很大的时候我们去找某一个文件经常使用搜索功能,本人经常使用快捷键ctrl+p进行某个文件的搜索,或者单机一个文件时会覆盖掉原来窗口中打开的文件, ...

  10. Windows下批量删除旧文件、清除缓存文件、解救C盘、拒绝C盘爆炸

    Windows下批量删除旧文件.清除缓存文件.解救C盘.拒绝C盘爆炸 目录 Windows下批量删除旧文件.清除缓存文件.解救C盘.拒绝C盘爆炸 #删除旧文件1 #删除旧文件2 #删除旧文件1 rem ...

最新文章

  1. React是如何在后台运行的
  2. 图数据库Titan安装与部署
  3. b+树时间复杂度_深入理解数据库系统之存储存引擎(二叉搜索树)
  4. lr:lr中错误解决方法19种
  5. 为衣服添加NFC功能:挥下袖子就能安全支付,打开车门坐进去就能启动汽车|Nature子刊...
  6. Markdown的基本语法
  7. LINQ体验系列文章导航
  8. Java在使用时需要注意那些问题_java使用String.split方法时要注意的问题
  9. 计算机文化基础操作考试,(计算机文化基础上机考试操作指南.doc
  10. Spark创建RDD的四种方式(一):从集合(内存)中创建 RDD代码示例
  11. java用beaninfo_JavaBeanInfo 和 Spring 之间的关系
  12. 阿里云机器学习怎么玩?这本新手入门指南揭秘了!
  13. 2018上IEC计算机高级语言(C)作业 第0次作业
  14. 栈实现二叉树的前中后序遍历
  15. 基础 | 管理视图、序列、同义词
  16. 泰戈尔《飞鸟集》节选
  17. 2020家用千兆路由器哪款好_什么路由器比较好(2020年最好千兆路由器)
  18. Eidetic:助你提升记忆力的酷应用
  19. 读《混世小农民》有感
  20. 安卓怎么实现计算纪念日

热门文章

  1. 格式工厂采样率,比特率怎样设置才能使音频声音大容量小
  2. 毕设看的硕博士论文速记
  3. 如何填写台式计算机参数,教你如何看懂电脑各配置参数
  4. 发送邮件 显示对方服务器未响应,邮件对方服务器未响应
  5. wireshark抓包分析POP3协议
  6. 【渗透技巧】pop3协议渗透
  7. Video Classification with Channel-Separated Convolutional Netwroks 论文阅读
  8. 武汉大学计算机学院乒乓球室,武汉大学经济管理学院教职工乒乓球队在武汉大学师生乒乓球赛中获佳绩...
  9. AntV G6设置高亮
  10. 关于Tungsten Fabic版本问题,这一篇文章说清了