Hudi Clean: An Analysis of the File-Cleaning Implementation
Preface
This post analyzes how Hudi Clean is implemented at the source-code level. If you are not yet familiar with Hudi Clean, start with this article: 一文彻底理解Apache Hudi的清理服务 ("Thoroughly Understanding Apache Hudi's Cleaning Service").
Hudi Clean removes history files that are no longer needed. The retention parameters should be tuned to the actual workload, and cleaning must never disturb queries: if a running query is still reading a file and Clean deletes that file, the query fails.
Clean only deletes historical files. Hudi keeps multiple versions of each file, and no matter which parameters or policy you configure, the current latest version of a file is never deleted.
Hudi 0.9.0 provides two cleaning policies, KEEP_LATEST_COMMITS and KEEP_LATEST_FILE_VERSIONS, with KEEP_LATEST_COMMITS as the default.
- KEEP_LATEST_COMMITS: retention by number of commits. By default, all files of the latest 10 commits are kept; for data older than those 10 commits, only the latest file version is kept and all other historical versions are deleted.
- KEEP_LATEST_FILE_VERSIONS: retention by number of file versions, keeping 3 versions of each file by default.
See the WeChat article referenced above for the details.
The latest release, 0.11.0, adds a new policy, KEEP_LATEST_BY_HOURS: retention by hours, keeping the most recent 24 hours of files by default. For the implementation see PR [HUDI-349] Added new cleaning policy based on number of hours. A configuration sketch follows below.
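For reference, here is a minimal sketch of configuring these policies through the write-config builder, assuming Hudi 0.9.0 (where the cleaner options live under HoodieCompactionConfig); the table path and retention values are placeholders:

import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie/sample-table") // placeholder table path
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
        .retainCommits(10) // keep all files of the latest 10 commits
        // for KEEP_LATEST_FILE_VERSIONS, use .retainFileVersions(3) instead
        .build())
    .build();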
This analysis is based on Hudi 0.9.0, the Java client, and a COW table.
Insert
HoodieJavaWriteClient -> postWrite -> postCommit -> autoCleanOnCommit
We trace the code using Insert as the entry point. The Hudi source tree ships a Java client code example; only the key lines are pasted here, with a slightly fuller sketch after the snippet.
writeClient = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
String newCommitTime = writeClient.startCommit();
writeClient.insert(records, newCommitTime);
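For completeness, a condensed sketch of how the official HoodieJavaWriteClientExample wires these pieces together; the Avro schema string and the generateRecords helper are placeholders, not part of the original post:

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;

Configuration hadoopConf = new Configuration();
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie/sample-table") // placeholder table path
    .withSchema(avroSchemaString)         // placeholder Avro schema string
    .forTable("sample_table")
    .build();
HoodieJavaWriteClient<HoodieAvroPayload> writeClient =
    new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
String newCommitTime = writeClient.startCommit();
List<HoodieRecord<HoodieAvroPayload>> records = generateRecords(newCommitTime); // placeholder helper
List<WriteStatus> statuses = writeClient.insert(records, newCommitTime);
writeClient.close();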
HoodieJavaWriteClient.insert
public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
  HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
      getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
  table.validateUpsertSchema();
  preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
  HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, instantTime, records);
  if (result.getIndexLookupDuration().isPresent()) {
    metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
  }
  return postWrite(result, instantTime, table);
}
After table.insert has written the data, the postWrite method runs:
@Override
protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result,
                                      String instantTime,
                                      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
  if (result.getIndexLookupDuration().isPresent()) {
    metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
  }
  if (result.isCommitted()) {
    // Perform post commit operations.
    if (result.getFinalizeDuration().isPresent()) {
      metrics.updateFinalizeWriteMetrics(result.getFinalizeDuration().get().toMillis(),
          result.getWriteStats().get().size());
    }
    postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty());
    emitCommitMetrics(instantTime, result.getCommitMetadata().get(), hoodieTable.getMetaClient().getCommitActionType());
  }
  return result.getWriteStatuses();
}
postWrite in turn calls the parent class's AbstractHoodieWriteClient.postCommit method:
protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata,
                          String instantTime, Option<Map<String, String>> extraMetadata) {
  try {
    // Delete the marker directory for the instant.
    WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
    // We cannot have unbounded commit files. Archive commits if we have to archive.
    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
    archiveLog.archiveIfRequired(context);
    // Run auto-clean as part of the commit.
    autoCleanOnCommit();
    if (operationType != null && operationType != WriteOperationType.CLUSTER && operationType != WriteOperationType.COMPACT) {
      syncTableMetadata();
    }
  } catch (IOException ioe) {
    throw new HoodieIOException(ioe.getMessage(), ioe);
  } finally {
    this.heartbeatClient.stop(instantTime);
  }
}
Inside postCommit, autoCleanOnCommit() is invoked to perform the file cleaning.
AbstractHoodieWriteClient.autoCleanOnCommit
autoCleanOnCommit -> clean -> scheduleTableServiceInternal -> HoodieJavaCopyOnWriteTable.clean
First, scheduleTableServiceInternal is invoked. Based on the cleaning-policy configuration, it determines the earliest instant that must be retained (earliestInstant), collects the list of partition paths to clean (partitionsToClean), derives from those partitions the list of files to delete, and finally wraps this information into a HoodieCleanerPlan that is serialized into a newly created .clean.requested file.
Then HoodieJavaCopyOnWriteTable.clean runs. It picks up the .clean.requested file just created, together with any leftover .clean.inflight files from previously failed attempts (if there are any), deserializes the saved .clean.requested content back into a HoodieCleanerPlan, deletes each file in the plan's delete list via the deleteFilesFunc method, and returns a HoodieCleanStat. The HoodieCleanStat is then used to build a HoodieCleanMetadata, which is serialized into a newly created .clean file. With that, the whole clean operation is essentially done.
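To observe this requested -> inflight -> completed lifecycle on an actual table, one can list the clean instants off the timeline. A minimal sketch, assuming Hudi 0.9.0 APIs and a placeholder table path:

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;

Configuration hadoopConf = new Configuration();
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
    .setConf(hadoopConf)
    .setBasePath("/tmp/hoodie/sample-table") // placeholder table path
    .build();
// Print every clean instant with its state (REQUESTED / INFLIGHT / COMPLETED).
metaClient.getActiveTimeline().getCleanerTimeline().getInstants()
    .forEach(instant -> System.out.println(
        instant.getState() + " " + instant.getTimestamp() + " " + instant.getFileName()));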
How the files to delete are selected according to the cleaning policy is covered in the later section: Getting the list of files to delete.
/**
 * Handle auto clean during commit.
 */
protected void autoCleanOnCommit() {
  if (config.isAutoClean()) { // defaults to true
    // Call clean to cleanup if there is anything to cleanup after the commit.
    if (config.isAsyncClean()) { // defaults to false
      LOG.info("Cleaner has been spawned already. Waiting for it to finish");
      AsyncCleanerService.waitForCompletion(asyncCleanerService);
      LOG.info("Cleaner has finished");
    } else {
      // Do not reuse instantTime for clean as metadata table requires all changes to have unique instant timestamps.
      LOG.info("Auto cleaning is enabled. Running cleaner now");
      // Run the clean action.
      clean();
    }
  }
}

public HoodieCleanMetadata clean() {
  return clean(HoodieActiveTimeline.createNewInstantTime());
}

public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
  return clean(cleanInstantTime, true);
}
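The two flags checked above correspond to hoodie.clean.automatic and hoodie.clean.async. As a hedged sketch (0.9.0 builder API, placeholder path), auto-clean can also be turned off and the cleaner driven by hand via the public clean() API:

import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie/sample-table") // placeholder table path
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withAutoClean(false) // postCommit will then skip the clean() call in autoCleanOnCommit
        .build())
    .build();
// ... write with a client built from cfg, then trigger cleaning explicitly:
HoodieCleanMetadata cleanMetadata = writeClient.clean(); // schedules and executes a clean inline
if (cleanMetadata != null) { // null when there was nothing to clean
  System.out.println("Deleted " + cleanMetadata.getTotalFilesDeleted() + " files");
}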
Everything above is just the call chain; the real logic starts below.
/**
 * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
 * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be
 * cleaned). This API provides the flexibility to schedule clean instant asynchronously via
 * {@link AbstractHoodieWriteClient#scheduleTableService(String, Option, TableServiceType)} and disable inline scheduling
 * of clean.
 */
public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline) throws HoodieIOException {
  if (scheduleInline) {
    // Main job: create .clean.requested, whose content is the serialized cleaner plan
    // (which carries the list of files to delete, among other things).
    scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
  }
  LOG.info("Cleaner started");
  final Timer.Context timerContext = metrics.getCleanCtx();
  LOG.info("Cleaned failed attempts if any");
  // Decide whether to roll back failed writes; the default policy is EAGER, so no rollback happens during clean.
  CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
      HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites());
  // Run table.clean: delete the planned files, transition .clean.requested => .clean.inflight => .clean,
  // and return HoodieCleanMetadata. The table here is a HoodieJavaCopyOnWriteTable.
  HoodieCleanMetadata metadata = createTable(config, hadoopConf).clean(context, cleanInstantTime);
  if (timerContext != null && metadata != null) {
    long durationMs = metrics.getDurationInMs(timerContext.stop());
    metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
    LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
        + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
        + " cleanerElapsedMs" + durationMs);
  }
  // Return the metadata.
  return metadata;
}
scheduleTableServiceInternal
private Option<String> scheduleTableServiceInternal(String instantTime, Option<Map<String, String>> extraMetadata,
                                                    TableServiceType tableServiceType) {
  switch (tableServiceType) {
    case CLUSTER:
      LOG.info("Scheduling clustering at instant time :" + instantTime);
      Option<HoodieClusteringPlan> clusteringPlan = createTable(config, hadoopConf)
          .scheduleClustering(context, instantTime, extraMetadata);
      return clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty();
    case COMPACT:
      LOG.info("Scheduling compaction at instant time :" + instantTime);
      Option<HoodieCompactionPlan> compactionPlan = createTable(config, hadoopConf)
          .scheduleCompaction(context, instantTime, extraMetadata);
      return compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
    case CLEAN:
      LOG.info("Scheduling cleaning at instant time :" + instantTime);
      // Implemented in the subclass, here HoodieJavaCopyOnWriteTable.scheduleCleaning.
      Option<HoodieCleanerPlan> cleanerPlan = createTable(config, hadoopConf)
          .scheduleCleaning(context, instantTime, extraMetadata);
      return cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty();
    default:
      throw new IllegalArgumentException("Invalid TableService " + tableServiceType);
  }
}
HoodieJavaCopyOnWriteTable.scheduleCleaning
public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime,
                                                  Option<Map<String, String>> extraMetadata) {
  return new JavaScheduleCleanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute();
}

public Option<HoodieCleanerPlan> execute() {
  // Plan a new clean action.
  return requestClean(instantTime);
}
/**
 * Creates a Cleaner plan if there are files to be cleaned and stores them in instant file.
 * Cleaner Plan contains absolute file paths.
 *
 * @param startCleanTime Cleaner Instant Time
 * @return Cleaner Plan if generated
 */
protected Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
  // The cleaner plan carries the list of files to be cleaned.
  final HoodieCleanerPlan cleanerPlan = requestClean(context);
  if ((cleanerPlan.getFilePathsToBeDeletedPerPartition() != null)
      && !cleanerPlan.getFilePathsToBeDeletedPerPartition().isEmpty()
      && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
    // The list of files to delete is non-empty.
    // Only create cleaner plan which does some work.
    // Create the .clean.requested instant.
    final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
    // Save to both aux and timeline folder
    try {
      // Save .clean.requested; the file holds the serialized cleaner plan, including the
      // file lists and related information that the delete step will use later.
      table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
      LOG.info("Requesting Cleaning with instant time " + cleanInstant);
    } catch (IOException e) {
      LOG.error("Got exception when saving cleaner requested file", e);
      throw new HoodieIOException(e.getMessage(), e);
    }
    // Return the cleaner plan.
    return Option.of(cleanerPlan);
  }
  // Return empty.
  return Option.empty();
}
/**
 * Generates List of files to be cleaned.
 *
 * @param context HoodieEngineContext
 * @return Cleaner Plan
 */
HoodieCleanerPlan requestClean(HoodieEngineContext context) {
  try {
    CleanPlanner<T, I, K, O> planner = new CleanPlanner<>(context, table, config);
    // The earliest HoodieInstant that must be retained.
    Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
    // The partition paths that need cleaning.
    List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
    if (partitionsToClean.isEmpty()) {
      // No partitions to clean: return an empty HoodieCleanerPlan.
      LOG.info("Nothing to clean here. It is already clean");
      return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
    }
    LOG.info("Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
    // Parallelism for the cleaning work.
    int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
    LOG.info("Using cleanerParallelism: " + cleanerParallelism);
    context.setJobStatus(this.getClass().getSimpleName(), "Generates list of file slices to be cleaned");
    // Map<partition path, files to delete>; the real work happens in planner.getDeletePaths.
    Map<String, List<HoodieCleanFileInfo>> cleanOps = context
        .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
        .stream()
        .collect(Collectors.toMap(Pair::getKey, y -> CleanerUtils.convertToHoodieCleanFileInfoList(y.getValue())));
    // Build and return the HoodieCleanerPlan. The arguments are:
    //   earliestInstantToRetain = a HoodieActionInstant derived from earliestInstant
    //   policy = config.getCleanerPolicy().name()
    //   filesToBeDeletedPerPartition = CollectionUtils.createImmutableMap(), an empty read-only map
    //   version = 2
    //   filePathsToBeDeletedPerPartition = cleanOps, i.e. the file lists collected above
    return new HoodieCleanerPlan(earliestInstant
        .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
        config.getCleanerPolicy().name(), CollectionUtils.createImmutableMap(),
        CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps);
  } catch (IOException e) {
    throw new HoodieIOException("Failed to schedule clean operation", e);
  }
}
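Because the plan is simply Avro-serialized into the instant file, it can be read back with the same CleanerUtils helper the executor uses below. A short sketch, reusing the metaClient from the earlier timeline example and a placeholder instant time:

import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CleanerUtils;

HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED,
    HoodieTimeline.CLEAN_ACTION, "20220101000000"); // placeholder instant time
// Deserialize the cleaner plan stored in the .clean.requested instant file.
HoodieCleanerPlan plan = CleanerUtils.getCleanerPlan(metaClient, requested);
plan.getFilePathsToBeDeletedPerPartition()
    .forEach((partition, files) -> System.out.println(partition + " -> " + files.size() + " files"));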
HoodieJavaCopyOnWriteTable.clean
public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime) {
  return new JavaCleanActionExecutor(context, config, this, cleanInstantTime).execute();
}
BaseCleanActionExecutor.execute
public HoodieCleanMetadata execute() {
  List<HoodieCleanMetadata> cleanMetadataList = new ArrayList<>();
  // If there are inflight(failed) or previously requested clean operation, first perform them.
  // Because we just created a .clean.requested, that instant is always included here;
  // any other .clean.inflight files are remnants of earlier failed operations and must be cleaned too.
  List<HoodieInstant> pendingCleanInstants = table.getCleanTimeline()
      .filterInflightsAndRequested().getInstants().collect(Collectors.toList());
  if (pendingCleanInstants.size() > 0) {
    pendingCleanInstants.forEach(hoodieInstant -> {
      LOG.info("Finishing previously unfinished cleaner instant=" + hoodieInstant);
      try {
        cleanMetadataList.add(runPendingClean(table, hoodieInstant));
      } catch (Exception e) {
        LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
      }
    });
    table.getMetaClient().reloadActiveTimeline();
  }
  // Return the last clean metadata for now.
  // TODO (NA) : Clean only the earliest pending clean just like how we do for other table services
  // This requires the BaseCleanActionExecutor to be refactored as BaseCommitActionExecutor
  return cleanMetadataList.size() > 0 ? cleanMetadataList.get(cleanMetadataList.size() - 1) : null;
}

/**
 * Executes the Cleaner plan stored in the instant metadata.
 */
HoodieCleanMetadata runPendingClean(HoodieTable<T, I, K, O> table, HoodieInstant cleanInstant) {
  try {
    // Deserialize .clean.requested or .clean.inflight back into a cleaner plan.
    HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant);
    return runClean(table, cleanInstant, cleanerPlan);
  } catch (IOException e) {
    throw new HoodieIOException(e.getMessage(), e);
  }
}
private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstant cleanInstant, HoodieCleanerPlan cleanerPlan) {
  ValidationUtils.checkArgument(cleanInstant.getState().equals(HoodieInstant.State.REQUESTED)
      || cleanInstant.getState().equals(HoodieInstant.State.INFLIGHT));
  try {
    final HoodieInstant inflightInstant;
    final HoodieTimer timer = new HoodieTimer();
    timer.startTimer();
    if (cleanInstant.isRequested()) {
      // For .clean.requested, transition it to .clean.inflight.
      inflightInstant = table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant,
          TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
    } else {
      inflightInstant = cleanInstant;
    }
    // Run the clean method, which mainly deletes files and returns a list of HoodieCleanStat.
    // The implementation lives in the concrete executor, here JavaCleanActionExecutor.
    List<HoodieCleanStat> cleanStats = clean(context, cleanerPlan);
    if (cleanStats.isEmpty()) {
      return HoodieCleanMetadata.newBuilder().build();
    }
    table.getMetaClient().reloadActiveTimeline();
    // Build the HoodieCleanMetadata.
    HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata(
        inflightInstant.getTimestamp(),
        Option.of(timer.endTimer()),
        cleanStats);
    // Create .clean and serialize the metadata into it.
    table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant,
        TimelineMetadataUtils.serializeCleanMetadata(metadata));
    LOG.info("Marked clean started on " + inflightInstant.getTimestamp() + " as complete");
    return metadata;
  } catch (IOException e) {
    throw new HoodieIOException("Failed to clean up after commit", e);
  }
}
JavaCleanActionExecutor.clean
List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
  // The list of files to be deleted.
  Iterator<ImmutablePair<String, CleanFileInfo>> filesToBeDeletedPerPartition =
      cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
          .flatMap(x -> x.getValue().stream().map(y -> new ImmutablePair<>(x.getKey(),
              new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile()))))
          .iterator();

  // deleteFilesFunc performs the actual deletions and yields a PartitionCleanStat per partition.
  Stream<Pair<String, PartitionCleanStat>> partitionCleanStats =
      deleteFilesFunc(filesToBeDeletedPerPartition, table)
          .collect(Collectors.groupingBy(Pair::getLeft))
          .entrySet().stream()
          .map(x -> new ImmutablePair(x.getKey(), x.getValue().stream().map(y -> y.getRight())
              .reduce(PartitionCleanStat::merge).get()));

  Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats
      .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));

  // Return PartitionCleanStat for each partition passed.
  return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
    PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath)
        ? partitionCleanStatsMap.get(partitionPath)
        : new PartitionCleanStat(partitionPath);
    HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
    return HoodieCleanStat.newBuilder()
        .withPolicy(config.getCleanerPolicy())
        .withPartitionPath(partitionPath)
        .withEarliestCommitRetained(Option.ofNullable(actionInstant != null
            ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
                actionInstant.getAction(), actionInstant.getTimestamp())
            : null))
        .withDeletePathPattern(partitionCleanStat.deletePathPatterns())
        .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
        .withFailedDeletes(partitionCleanStat.failedDeleteFiles())
        .withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
        .withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
        .withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
        .build();
  }).collect(Collectors.toList());
}

private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator<ImmutablePair<String, CleanFileInfo>> iter, HoodieTable table) {
  Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
  FileSystem fs = table.getMetaClient().getFs();
  while (iter.hasNext()) {
    Pair<String, CleanFileInfo> partitionDelFileTuple = iter.next();
    String partitionPath = partitionDelFileTuple.getLeft();
    Path deletePath = new Path(partitionDelFileTuple.getRight().getFilePath());
    String deletePathStr = deletePath.toString();
    Boolean deletedFileResult = null;
    try {
      // Delete the file and record whether the deletion succeeded.
      deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
    } catch (IOException e) {
      LOG.error("Delete file failed");
    }
    if (!partitionCleanStatMap.containsKey(partitionPath)) {
      partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath));
    }
    boolean isBootstrapBasePathFile = partitionDelFileTuple.getRight().isBootstrapBaseFile();
    PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath);
    if (isBootstrapBasePathFile) {
      // For Bootstrap Base file deletions, store the full file path.
      partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true);
      partitionCleanStat.addDeletedFileResult(deletePath.toString(), deletedFileResult, true);
    } else {
      partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false);
      partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false);
    }
  }
  return partitionCleanStatMap.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue()));
}

protected static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException {
  Path deletePath = new Path(deletePathStr);
  LOG.debug("Working on delete path :" + deletePath);
  try {
    boolean deleteResult = fs.delete(deletePath, false);
    if (deleteResult) {
      LOG.debug("Cleaned file at path :" + deletePath);
    }
    return deleteResult;
  } catch (FileNotFoundException fio) {
    // With cleanPlan being used for retried cleaning operations, it's possible to clean a file twice.
    return false;
  }
}
Getting the list of files to delete
This part depends on the policy configuration parameters and its logic is somewhat more involved, so for now I only show the entry points without going deeper; I will cover it separately later. Returning to the path from AbstractHoodieWriteClient.autoCleanOnCommit, the planner is driven through these three calls:
Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
planner.getDeletePaths(partitionPathToClean)
/**
 * Returns earliest commit to retain based on cleaning policy.
 */
public Option<HoodieInstant> getEarliestCommitToRetain() {
  Option<HoodieInstant> earliestCommitToRetain = Option.empty();
  int commitsRetained = config.getCleanerCommitsRetained();
  if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
      && commitTimeline.countInstants() > commitsRetained) {
    earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained);
  }
  return earliestCommitToRetain;
}
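A quick worked example with the default of 10 retained commits: if the commit timeline holds 15 completed commits c1 through c15, countInstants() is 15, so nthInstant(15 - 10) returns the instant at 0-based index 5, i.e. c6. c6 becomes the earliest commit to retain, and only file versions older than what commits c6 and later reference are candidates for deletion.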
/**
 * Returns list of partitions where clean operations needs to be performed.
 *
 * @param earliestRetainedInstant New instant to be retained after this cleanup operation
 * @return list of partitions to scan for cleaning
 * @throws IOException when underlying file-system throws this exception
 */
public List<String> getPartitionPathsToClean(Option<HoodieInstant> earliestRetainedInstant) throws IOException {
  switch (config.getCleanerPolicy()) {
    case KEEP_LATEST_COMMITS:
      return getPartitionPathsForCleanByCommits(earliestRetainedInstant);
    case KEEP_LATEST_FILE_VERSIONS:
      return getPartitionPathsForFullCleaning();
    default:
      throw new IllegalStateException("Unknown Cleaner Policy");
  }
}
/**
 * Returns files to be cleaned for the given partitionPath based on cleaning policy.
 */
public List<CleanFileInfo> getDeletePaths(String partitionPath) {
  HoodieCleaningPolicy policy = config.getCleanerPolicy();
  List<CleanFileInfo> deletePaths;
  if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
    deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
  } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
    deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
  } else {
    throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
  }
  LOG.info(deletePaths.size() + " patterns used to delete in partition path:" + partitionPath);
  return deletePaths;
}
Annotated source code: github / gitee