
Preface

This post analyzes how Hudi Clean is implemented at the source-code level. If you are new to Hudi Clean, read this article first: 一文彻底理解Apache Hudi的清理服务 (a complete guide to Apache Hudi's cleaner service).
Hudi Clean deletes history files that are no longer needed. Retention can be tuned with configuration parameters to fit the workload, but cleaning must never break queries: if a running query is still reading a file and Clean deletes it, the query fails.
Clean only removes historical versions. Hudi files carry multiple versions, and no matter which parameters or policy you configure, the latest version of a file is never deleted.
Hudi 0.9.0 ships two cleaning policies, KEEP_LATEST_COMMITS and KEEP_LATEST_FILE_VERSIONS, with KEEP_LATEST_COMMITS as the default.
KEEP_LATEST_COMMITS: retention is counted in commits. By default all files of the latest 10 commits are kept; for files older than those 10 commits, only the latest version is kept and all earlier versions are deleted.
KEEP_LATEST_FILE_VERSIONS: retention is counted in file versions; by default the latest 3 versions of each file are kept.
For details, see the WeChat (公众号) article linked above.

The latest release, 0.11.0, adds a new policy, KEEP_LATEST_BY_HOURS: it cleans by age in hours, keeping the most recent 24 hours of files by default. For the implementation, see the PR: [HUDI-349] Added new cleaning policy based on number of hours.

This analysis is based on Hudi 0.9.0, using the Java client and a COW table.
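Before the walkthrough, here is how the policies and switches above map onto cleaner configuration. This is a minimal sketch against the 0.9.0 config-builder API as I understand it (in 0.9.0 the cleaner knobs live in HoodieCompactionConfig); the base path and retention values are illustrative. A config built like this is the kind of cfg passed to HoodieJavaWriteClient in the example below.

import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie/sample-table")  // illustrative base path
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withAutoClean(true)               // hoodie.clean.automatic, default true
        .withAsyncClean(false)             // hoodie.clean.async, default false
        .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
        .retainCommits(10)                 // hoodie.cleaner.commits.retained, default 10
        // .retainFileVersions(3)          // used with KEEP_LATEST_FILE_VERSIONS instead
        .build())
    .build();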

Insert

HoodieJavaWriteClient->postWrite->postCommit->autoCleanOnCommit

We trace the code with Insert as the entry point. The Hudi source tree ships a Java client example; only the key lines are shown here.

writeClient = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
String newCommitTime = writeClient.startCommit();
writeClient.insert(records, newCommitTime);

HoodieJavaWriteClient.insert

  public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
        getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
    table.validateUpsertSchema();
    preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
    HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, instantTime, records);
    if (result.getIndexLookupDuration().isPresent()) {
      metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
    }
    return postWrite(result, instantTime, table);
  }

After table.insert has written the data, the postWrite method runs.

  @Override
  protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result,
                                        String instantTime,
                                        HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
    if (result.getIndexLookupDuration().isPresent()) {
      metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
    }
    if (result.isCommitted()) {
      // Perform post commit operations.
      if (result.getFinalizeDuration().isPresent()) {
        metrics.updateFinalizeWriteMetrics(result.getFinalizeDuration().get().toMillis(),
            result.getWriteStats().get().size());
      }
      postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty());
      emitCommitMetrics(instantTime, result.getCommitMetadata().get(), hoodieTable.getMetaClient().getCommitActionType());
    }
    return result.getWriteStatuses();
  }

postWrite in turn calls the parent-class method AbstractHoodieWriteClient.postCommit.

  protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
    try {
      // Delete the marker directory for the instant.
      WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
          .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
      // We cannot have unbounded commit files. Archive commits if we have to archive
      HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
      archiveLog.archiveIfRequired(context);
      // run auto clean as part of the commit
      autoCleanOnCommit();
      if (operationType != null && operationType != WriteOperationType.CLUSTER && operationType != WriteOperationType.COMPACT) {
        syncTableMetadata();
      }
    } catch (IOException ioe) {
      throw new HoodieIOException(ioe.getMessage(), ioe);
    } finally {
      this.heartbeatClient.stop(instantTime);
    }
  }

postCommit then calls autoCleanOnCommit() to perform the file cleaning.

AbstractHoodieWriteClient.autoCleanOnCommit

autoCleanOnCommit->clean->scheduleTableServiceInternal->HoodieJavaCopyOnWriteTable.clean

scheduleTableServiceInternal is called first. Based on the configured cleaning policy, it determines the earliest instant that must be retained (earliestInstant), collects the partition paths that need cleaning (partitionsToClean), derives the list of files to delete for each partition, and finally serializes all of this as a HoodieCleanerPlan into a newly created .clean.requested file.
Then HoodieJavaCopyOnWriteTable.clean runs. It first picks up the .clean.requested file just created, along with any leftover .clean.inflight files from previously failed attempts (if any), deserializes the stored content back into a HoodieCleanerPlan, deletes the listed files one by one via deleteFilesFunc, and returns HoodieCleanStat objects. From these it builds a HoodieCleanMetadata, which is serialized into a newly created .clean file; at that point the clean operation is essentially complete.
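Putting the two steps together, the clean instant goes through the standard requested -> inflight -> completed transition on the timeline. Under the table's .hoodie directory that looks roughly like this (the timestamp is illustrative):

.hoodie/20220101120000.clean.requested   <- serialized HoodieCleanerPlan (files to delete, earliest instant to retain)
.hoodie/20220101120000.clean.inflight    <- clean execution in progress
.hoodie/20220101120000.clean             <- serialized HoodieCleanMetadata (clean completed)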

How the list of files to delete is derived from each cleaning policy is covered below, in the section "Obtaining the list of files to delete".

  /**
   * Handle auto clean during commit.
   */
  protected void autoCleanOnCommit() {
    if (config.isAutoClean()) { // default: true
      // Call clean to cleanup if there is anything to cleanup after the commit,
      if (config.isAsyncClean()) { // default: false
        LOG.info("Cleaner has been spawned already. Waiting for it to finish");
        AsyncCleanerService.waitForCompletion(asyncCleanerService);
        LOG.info("Cleaner has finished");
      } else {
        // Do not reuse instantTime for clean as metadata table requires all changes to have unique instant timestamps.
        LOG.info("Auto cleaning is enabled. Running cleaner now");
        // run the clean operation
        clean();
      }
    }
  }

  public HoodieCleanMetadata clean() {
    return clean(HoodieActiveTimeline.createNewInstantTime());
  }

  public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
    return clean(cleanInstantTime, true);
  }

Everything above is just the call chain; the real logic starts below.

  /**
   * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
   * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be
   * cleaned). This API provides the flexibility to schedule clean instant asynchronously via
   * {@link AbstractHoodieWriteClient#scheduleTableService(String, Option, TableServiceType)} and disable inline scheduling
   * of clean.
   */
  public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline) throws HoodieIOException {
    if (scheduleInline) {
      // Main job: create .clean.requested, whose content is the serialized cleanerPlan
      // (which includes the list of files to delete, among other things)
      scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
    }
    LOG.info("Cleaner started");
    final Timer.Context timerContext = metrics.getCleanCtx();
    LOG.info("Cleaned failed attempts if any");
    // Decide whether to roll back failed writes; the default policy is EAGER,
    // under which no rollback happens during clean
    CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
        HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites());
    // Run table.clean: delete the target files, transition .clean.requested => .clean.inflight => .clean,
    // and return HoodieCleanMetadata. Here the table is HoodieJavaCopyOnWriteTable.
    HoodieCleanMetadata metadata = createTable(config, hadoopConf).clean(context, cleanInstantTime);
    if (timerContext != null && metadata != null) {
      long durationMs = metrics.getDurationInMs(timerContext.stop());
      metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
      LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
          + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
          + " cleanerElapsedMs" + durationMs);
    }
    // return the metadata
    return metadata;
  }

scheduleTableServiceInternal

  private Option<String> scheduleTableServiceInternal(String instantTime, Option<Map<String, String>> extraMetadata,
                                                      TableServiceType tableServiceType) {
    switch (tableServiceType) {
      case CLUSTER:
        LOG.info("Scheduling clustering at instant time :" + instantTime);
        Option<HoodieClusteringPlan> clusteringPlan = createTable(config, hadoopConf)
            .scheduleClustering(context, instantTime, extraMetadata);
        return clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty();
      case COMPACT:
        LOG.info("Scheduling compaction at instant time :" + instantTime);
        Option<HoodieCompactionPlan> compactionPlan = createTable(config, hadoopConf)
            .scheduleCompaction(context, instantTime, extraMetadata);
        return compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
      case CLEAN:
        LOG.info("Scheduling cleaning at instant time :" + instantTime);
        // implemented in the subclass, here `HoodieJavaCopyOnWriteTable.scheduleCleaning`
        Option<HoodieCleanerPlan> cleanerPlan = createTable(config, hadoopConf)
            .scheduleCleaning(context, instantTime, extraMetadata);
        return cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty();
      default:
        throw new IllegalArgumentException("Invalid TableService " + tableServiceType);
    }
  }

HoodieJavaCopyOnWriteTable.scheduleCleaning

  public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
    return new JavaScheduleCleanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute();
  }

  public Option<HoodieCleanerPlan> execute() {
    // Plan a new clean action
    return requestClean(instantTime);
  }
  /**
   * Creates a Cleaner plan if there are files to be cleaned and stores them in instant file.
   * Cleaner Plan contains absolute file paths.
   *
   * @param startCleanTime Cleaner Instant Time
   * @return Cleaner Plan if generated
   */
  protected Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
    // the cleanerPlan contains the list of files to be cleaned
    final HoodieCleanerPlan cleanerPlan = requestClean(context);
    if ((cleanerPlan.getFilePathsToBeDeletedPerPartition() != null)
        && !cleanerPlan.getFilePathsToBeDeletedPerPartition().isEmpty()
        && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
      // the list of files to delete is not empty
      // Only create cleaner plan which does some work
      // create .clean.requested
      final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
      // Save to both aux and timeline folder
      try {
        // Save .clean.requested; the file contains the serialized cleanerPlan,
        // i.e. the file lists and related info, which the delete step reads later
        table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
        LOG.info("Requesting Cleaning with instant time " + cleanInstant);
      } catch (IOException e) {
        LOG.error("Got exception when saving cleaner requested file", e);
        throw new HoodieIOException(e.getMessage(), e);
      }
      // return the cleanerPlan
      return Option.of(cleanerPlan);
    }
    // return empty
    return Option.empty();
  }
  /**
   * Generates List of files to be cleaned.
   *
   * @param context HoodieEngineContext
   * @return Cleaner Plan
   */
  HoodieCleanerPlan requestClean(HoodieEngineContext context) {
    try {
      CleanPlanner<T, I, K, O> planner = new CleanPlanner<>(context, table, config);
      // the earliest HoodieInstant that must be retained
      Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
      // the partition paths that need cleaning
      List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
      if (partitionsToClean.isEmpty()) {
        // no partitions to clean: return an empty HoodieCleanerPlan
        LOG.info("Nothing to clean here. It is already clean");
        return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
      }
      LOG.info("Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
      // cleaning parallelism
      int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
      LOG.info("Using cleanerParallelism: " + cleanerParallelism);
      context.setJobStatus(this.getClass().getSimpleName(), "Generates list of file slices to be cleaned");
      // Map<partition path, files to delete>; the real work happens in planner.getDeletePaths
      Map<String, List<HoodieCleanFileInfo>> cleanOps = context
          .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
          .stream()
          .collect(Collectors.toMap(Pair::getKey, y -> CleanerUtils.convertToHoodieCleanFileInfoList(y.getValue())));
      // Build and return the HoodieCleanerPlan; the arguments are:
      // earliestInstantToRetain = HoodieActionInstant built from earliestInstant
      // policy = config.getCleanerPolicy().name()
      // filesToBeDeletedPerPartition = CollectionUtils.createImmutableMap(), an empty read-only map
      // version = 2
      // filePathsToBeDeletedPerPartition = cleanOps, the file lists computed above
      return new HoodieCleanerPlan(earliestInstant
          .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
          config.getCleanerPolicy().name(), CollectionUtils.createImmutableMap(),
          CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps);
    } catch (IOException e) {
      throw new HoodieIOException("Failed to schedule clean operation", e);
    }
  }
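For intuition, the plan serialized into .clean.requested conceptually carries the following fields (shown as pseudo-JSON for readability; the actual file is Avro-serialized, the field names follow the HoodieCleanerPlan constructor arguments annotated above, and all values are illustrative):

{
  "earliestInstantToRetain": {"timestamp": "20220101100000", "action": "commit", "state": "COMPLETED"},
  "policy": "KEEP_LATEST_COMMITS",
  "filesToBeDeletedPerPartition": {},
  "version": 2,
  "filePathsToBeDeletedPerPartition": {
    "2022/01/01": [
      {"filePath": "/tmp/hoodie/sample-table/2022/01/01/some-old-base-file.parquet", "isBootstrapBaseFile": false}
    ]
  }
}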

HoodieJavaCopyOnWriteTable.clean

  public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime) {
    return new JavaCleanActionExecutor(context, config, this, cleanInstantTime).execute();
  }

BaseCleanActionExecutor.execute

  public HoodieCleanMetadata execute() {
    List<HoodieCleanMetadata> cleanMetadataList = new ArrayList<>();
    // If there are inflight(failed) or previously requested clean operation, first perform them
    // Collect clean instants in inflight or requested state.
    // Since we just created a .clean.requested above, the list contains at least that instant;
    // any additional .clean.inflight files are previously failed cleans that must also be run.
    List<HoodieInstant> pendingCleanInstants = table.getCleanTimeline()
        .filterInflightsAndRequested().getInstants().collect(Collectors.toList());
    if (pendingCleanInstants.size() > 0) {
      pendingCleanInstants.forEach(hoodieInstant -> {
        LOG.info("Finishing previously unfinished cleaner instant=" + hoodieInstant);
        try {
          cleanMetadataList.add(runPendingClean(table, hoodieInstant));
        } catch (Exception e) {
          LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
        }
      });
      table.getMetaClient().reloadActiveTimeline();
    }
    // return the last clean metadata for now
    // TODO (NA) : Clean only the earliest pending clean just like how we do for other table services
    // This requires the BaseCleanActionExecutor to be refactored as BaseCommitActionExecutor
    return cleanMetadataList.size() > 0 ? cleanMetadataList.get(cleanMetadataList.size() - 1) : null;
  }

  /**
   * Executes the Cleaner plan stored in the instant metadata.
   */
  HoodieCleanMetadata runPendingClean(HoodieTable<T, I, K, O> table, HoodieInstant cleanInstant) {
    try {
      // deserialize .clean.requested or .clean.inflight back into a cleanerPlan
      HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant);
      return runClean(table, cleanInstant, cleanerPlan);
    } catch (IOException e) {
      throw new HoodieIOException(e.getMessage(), e);
    }
  }
  private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstant cleanInstant, HoodieCleanerPlan cleanerPlan) {
    ValidationUtils.checkArgument(cleanInstant.getState().equals(HoodieInstant.State.REQUESTED)
        || cleanInstant.getState().equals(HoodieInstant.State.INFLIGHT));
    try {
      final HoodieInstant inflightInstant;
      final HoodieTimer timer = new HoodieTimer();
      timer.startTimer();
      if (cleanInstant.isRequested()) {
        // if it is .clean.requested, transition it to .clean.inflight
        inflightInstant = table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant,
            TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
      } else {
        inflightInstant = cleanInstant;
      }
      // Run the clean method, which mainly deletes files and returns a list of HoodieCleanStat.
      // The concrete implementation lives in the subclass, here JavaCleanActionExecutor.
      List<HoodieCleanStat> cleanStats = clean(context, cleanerPlan);
      if (cleanStats.isEmpty()) {
        return HoodieCleanMetadata.newBuilder().build();
      }
      table.getMetaClient().reloadActiveTimeline();
      // build the HoodieCleanMetadata
      HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata(
          inflightInstant.getTimestamp(),
          Option.of(timer.endTimer()),
          cleanStats);
      // create .clean and serialize the metadata into it
      table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant,
          TimelineMetadataUtils.serializeCleanMetadata(metadata));
      LOG.info("Marked clean started on " + inflightInstant.getTimestamp() + " as complete");
      return metadata;
    } catch (IOException e) {
      throw new HoodieIOException("Failed to clean up after commit", e);
    }
  }

JavaCleanActionExecutor.clean

  List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
    // the list of files to be deleted
    Iterator<ImmutablePair<String, CleanFileInfo>> filesToBeDeletedPerPartition =
        cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
            .flatMap(x -> x.getValue().stream().map(y -> new ImmutablePair<>(x.getKey(),
                new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile()))))
            .iterator();

    // delete the files via deleteFilesFunc and collect a PartitionCleanStat per partition
    Stream<Pair<String, PartitionCleanStat>> partitionCleanStats =
        deleteFilesFunc(filesToBeDeletedPerPartition, table)
            .collect(Collectors.groupingBy(Pair::getLeft))
            .entrySet().stream()
            .map(x -> new ImmutablePair(x.getKey(), x.getValue().stream().map(y -> y.getRight())
                .reduce(PartitionCleanStat::merge).get()));

    Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats
        .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));

    // Return PartitionCleanStat for each partition passed.
    return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
      PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath)
          ? partitionCleanStatsMap.get(partitionPath)
          : new PartitionCleanStat(partitionPath);
      HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
      return HoodieCleanStat.newBuilder()
          .withPolicy(config.getCleanerPolicy())
          .withPartitionPath(partitionPath)
          .withEarliestCommitRetained(Option.ofNullable(
              actionInstant != null
                  ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
                      actionInstant.getAction(), actionInstant.getTimestamp())
                  : null))
          .withDeletePathPattern(partitionCleanStat.deletePathPatterns())
          .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
          .withFailedDeletes(partitionCleanStat.failedDeleteFiles())
          .withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
          .withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
          .withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
          .build();
    }).collect(Collectors.toList());
  }

  private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator<ImmutablePair<String, CleanFileInfo>> iter, HoodieTable table) {
    Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
    FileSystem fs = table.getMetaClient().getFs();
    while (iter.hasNext()) {
      Pair<String, CleanFileInfo> partitionDelFileTuple = iter.next();
      String partitionPath = partitionDelFileTuple.getLeft();
      Path deletePath = new Path(partitionDelFileTuple.getRight().getFilePath());
      String deletePathStr = deletePath.toString();
      Boolean deletedFileResult = null;
      try {
        // delete the file and record whether the deletion succeeded
        deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
      } catch (IOException e) {
        LOG.error("Delete file failed");
      }
      if (!partitionCleanStatMap.containsKey(partitionPath)) {
        partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath));
      }
      boolean isBootstrapBasePathFile = partitionDelFileTuple.getRight().isBootstrapBaseFile();
      PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath);
      if (isBootstrapBasePathFile) {
        // For Bootstrap Base file deletions, store the full file path.
        partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true);
        partitionCleanStat.addDeletedFileResult(deletePath.toString(), deletedFileResult, true);
      } else {
        partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false);
        partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false);
      }
    }
    return partitionCleanStatMap.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue()));
  }

  protected static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException {
    Path deletePath = new Path(deletePathStr);
    LOG.debug("Working on delete path :" + deletePath);
    try {
      // non-recursive delete of a single file
      boolean deleteResult = fs.delete(deletePath, false);
      if (deleteResult) {
        LOG.debug("Cleaned file at path :" + deletePath);
      }
      return deleteResult;
    } catch (FileNotFoundException fio) {
      // With cleanPlan being used for retried cleaning operations, its possible to clean a file twice
      return false;
    }
  }

Obtaining the list of files to delete

This part depends on the policy configuration parameters and its logic is somewhat more involved, so only the entry-point code is shown here; a deeper dive will come in a separate post.
Returning to the call chain under AbstractHoodieWriteClient.autoCleanOnCommit, the entry points are:

  Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
  List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
  planner.getDeletePaths(partitionPathToClean)
  /**
   * Returns earliest commit to retain based on cleaning policy.
   */
  public Option<HoodieInstant> getEarliestCommitToRetain() {
    Option<HoodieInstant> earliestCommitToRetain = Option.empty();
    int commitsRetained = config.getCleanerCommitsRetained();
    if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
        && commitTimeline.countInstants() > commitsRetained) {
      earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained);
    }
    return earliestCommitToRetain;
  }
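As a quick worked example with illustrative numbers: under KEEP_LATEST_COMMITS with the default hoodie.cleaner.commits.retained = 10, a commit timeline holding 15 completed instants gives countInstants() - commitsRetained = 5, so nthInstant(5), the 6th-oldest commit, becomes the earliest commit to retain; older file versions needed only by the 5 commits before it become clean candidates. With 10 or fewer commits on the timeline, the method returns Option.empty().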
  /**
   * Returns list of partitions where clean operations needs to be performed.
   *
   * @param earliestRetainedInstant New instant to be retained after this cleanup operation
   * @return list of partitions to scan for cleaning
   * @throws IOException when underlying file-system throws this exception
   */
  public List<String> getPartitionPathsToClean(Option<HoodieInstant> earliestRetainedInstant) throws IOException {
    switch (config.getCleanerPolicy()) {
      case KEEP_LATEST_COMMITS:
        return getPartitionPathsForCleanByCommits(earliestRetainedInstant);
      case KEEP_LATEST_FILE_VERSIONS:
        return getPartitionPathsForFullCleaning();
      default:
        throw new IllegalStateException("Unknown Cleaner Policy");
    }
  }
  /**
   * Returns files to be cleaned for the given partitionPath based on cleaning policy.
   */
  public List<CleanFileInfo> getDeletePaths(String partitionPath) {
    HoodieCleaningPolicy policy = config.getCleanerPolicy();
    List<CleanFileInfo> deletePaths;
    if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
      deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
    } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
      deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
    } else {
      throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
    }
    LOG.info(deletePaths.size() + " patterns used to delete in partition path:" + partitionPath);
    return deletePaths;
  }

Annotated source code

github
gitee
