之前的压缩计划生成后,被保存在basePath/.hoodie下的instanttime.compaction.request文件里。 现在可以继续从执行压缩计划的角度进行。同样使用compactor类来提交spark作业,参数里可以带压缩计划对应的instantTime,也可以不带,不带的话则是找到时间最早对应的压缩计划。本文还是以MOR表,经手动异步压缩作为开始。

执行压缩计划部分,需要提前了解下该过程涉及的封装类,例如

RunCompactionActionExecutor、IndexedRecord、各种handle类等等

下方为执行异步压缩计划的提交命令,并给出了目标instantTime:

通常这个spark会跑2-10分钟就自己停了。该作业跑完并不是表示真正完成了压缩,而是把压缩作为一个特殊的commit,继续按照顺序处理各种commit。以0.10.0版本来看,这里触发的压缩过程和正常写入hudi是独占的,即同一时间只能进行写入或压缩。

省掉前面一部分调用,代码从SparkRDDWriteClient.java 的compact方法开始。

HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);pendingCompactionTimeline.containsInstant(inflightInstant)

就是确保给的instantTime,对应其inflight的instant不能也出现在pending状态的hoodieInstant,否则就要回滚这个inflight的instant。简单来说就是不允许该时间的compaction类型的hoodieInstant,又是pending又是inflight,避免逻辑上矛盾。

这步同时也获得了table。于是继续table.compact(context, compactionInstantTime);

    即用HoodieEngineContext、HoodieWriteConfig、table、compactionInstantTime、HoodieSparkMergeOnReadTableCompactor、HoodieSparkCopyOnWriteTable这些变量构造了一个RunCompactionActionExecutor对象。

HoodieEngineContext 由HoodieSparkEngineContext实现,其包括

JavaSparkContext和SQLContext,最后转为spark作业dag才会用到;
HoodieWriteConfig就是用户给定的各种参数,包括压缩策略名等等配置。
table就是HoodieSparkMergeOnReadTable,也就是前面就已经从HoodieWriteConfig和HoodieEngineContext里得到的table

compactionInstantTime是String类型的instantTime,也就是执行压缩时用户指定的(现有最早的可压缩计划)

HoodieSparkMergeOnReadTableCompactor暂时先不讲,他负责preCompact(确保目标instantTime对应的hoodieInstant被pendingCompactionInstant包含着,也就是被先前生成的压缩计划里所包含)和maybePersist(设置RDDstorage level)。
HoodieSparkCopyOnWriteTable一个根据相同HoodieWriteConfig和HoodieEngineContext构建的cow表。

并调用该对象的execute()方法

先经过上方讲过的preCompact过程,即把现存的pending状态的compaction拿出来,然后检查本次压缩时给定的instantTime在不在其中,不在其中就会抛出异常。

然后再是CompactionUtils.getCompactionPlan(table.getMetaClient(), instantTime),目的就是从输入的instantTime对应的istantTime.compaction.requested文件里获取压缩计划(计划怎么写入该文件可参考上一篇)。并将压缩计划和新版本号写入元数据中。

再进一步执行compactor.compact( context, compactionPlan, table, config, instantTime, compactionHandler); 注意这里的compactionHandler就是前面创建的HoodieSparkCopyOnWriteTable对象。

新的instant就是instantTime.compaction.requested,instantTime还是一开始给的那个目标时间。

  然后把这个requested的instant,经过transitionState方法转为inflight的instant,即开始执行压缩计划。

下方try部分,就是从一开始给定的schema里(已经在HoodieTableMetaClient)创建新的schema,注意这里指定了不添加新的元数据字段。如果添加那schema就多5个字段,包括主键(_hoodie_record_key)、分区(_hoodie_partition_path)等字段。

这里是把上方拿到的压缩计划,取出其operations(operation包含什么信息请看上篇)转成流,就能进行map操作。map里的逻辑是把每个operation从HoodieCompactionOperation转为CompactionOperation并保存到List中(前者是avro格式,其他详细的后面再区分)。

context.parallelize(operations).map(operation -> compact(compactionHandler, metaClient, config, operation, compactionInstantTime, taskContextSupplier)).flatMap(List::iterator);

context.parallelize(operations)就是把operation转为spark的JavaRDD(不准确,本身还是HoodieData)。之后对每个RDD都单独调用一次中间的compact( compactionHandler, metaClient, config, operation, compactionInstantTime, taskContextSupplier)

到此,compactionHandler即上方的cow表、HoodieTableMetaClient、writeConfig、压缩计划operation、目标instantTime以及SparkTaskContextSupplier都参与进一步的针对每个operation,如下所示:

先给上一步拿到的schema增加提到的5个元数据字段,是否额外添加第6个字段operation,则要看用户配置的writeConfig中是否明确相关参数为true,否则还是默认的只加5个字段。

maxInstantTime是在所有active的timeline里拿出deltacommit commit rollback这三种action且timeline对应的instant为completed状态的timeline,并且取其最新(最大的时间值)。

maxMemoryPerCompaction是从writeConfig里读取到的参数,表示压缩可用最大的内存,超出这个阈值的则会被暂存到本地磁盘上。

logFiles说的就是FileSlice里哪些avro的*.log*文件。这里他从CompactionOperation的deltaFileNames里获得。那么这个daltaFileNames当初怎么初始化的值呢?其实同样也是从logFiles里折腾出来的:

回到daltaFileNames,他经过给自己加上bastPath(含表名)和分区目录后,也就是重新回到了logFile。

这里的scanner是比较麻烦的部分,包含了很多用户指定(或default)参数,以及提到的logfile、带meta字段的新schema等等。下方会用到一个scanner.getRecords()

this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);
ExternalSpillableMap类按照其类最上方的解释来看,是为了防止内存满时,以kv形式存储数据,同时也存储了Key-ValueMetadata即保存在磁盘上的位置。这一点还需要进一步确认

其参数分别是

hoodie.memory.compaction.max.size、
hoodie.memory.spillable.map.path、也就是baseFilepath(待补全)
DefaultSizeEstimator用来评估key大小、
HoodieRecordSizeEstimator(readerSchema)用来评估value大小、

diskMapType(分为BITCASK或ROCKS_DB)、

hoodie.common.diskmap.compression.enabled默认是true
    oldDataFileOpt暂时理解为basefile,是否为空取决于是只有logfile还是logfile和basefile都存在。前者情况下oldDataFileOpt为空,后者则为basefile。(因为oldDataFileOpt也是来在option中的bootstrapFilePath,所以他跟basefile一样可能有也可能不存在)

所以这里走else的逻辑,即handleInsert()。下方继续构建HoodieCreateHandle实例,注意

taskContextSupplier就是上方构建过的,recordMap就是上面提到的map类型的数据。

write方法如下:

hoodieTable.requireSortedRecords()首先是false,因为当前basefile都是.parquet文件,只有当为.hfile时才为true。

再把上方说的map形式的recordMap,其key的集合搞Iterator<String>,通过该iterator来循环处理所有map中的key对应的value。具体value也就是代码中的"record",HoodieRecord类型。

关于useWriterSchema,上方new这个HoodieCreateHandle对象时,已经把useWriterSchema写死了。

所以继续 write(record, record.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()))。注意后续一系列方法调用中都是这两个参数,均来自value:"record"这个值。

其中,record.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps())

tableSchemaWithMetaFields变量就是之前写入writeConfig的schema并且是有5个元数据字段的schema。config.getProps()就是所有writeConfig的配置。

后续会走processNotMatchedRecord来处理这些消息,把byte消息转为avro,再结合schema生成GenericRecord以及后面的SqlTypedRecord,省掉getEvaluator等逻辑,最后会把这些消息的结果返回(avro的IndexedRecord类型,可以理解为avro包下实现了一种对数据的封装,可以set和get其中的值)。

再回到这里的write方法延申,注意preserveHoodieMetadata参数为false,创建HoodieCreateHandle对象时依然保持其为false。

    IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get());大致是把老数据重新写入一个新schema里,依旧是IndexedRecord 。(待确认:过程中writeSchemaWithMetaFields 这个schema是怎么生成的?)fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);这个就是把参数里的recordWithMetadataInSchema调用org.apache.parquet.hadoop的write方法去写入。写parquet文件的同时再把对应的消息的key更新到布隆过滤器中(顺便也判断下是否再插入后修改布隆过滤器的最大最小边界为刚才的key)

record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId()));

给本条消息确认出他归属的位置(文件名中的instant和Fileid)并保存。

到此insert场景大致流程已经有了,下方继续update分支

回到 HoodieCompactor.java此处

新创建HoodieMergeHandle对象,其中
keyToNewRecords和insert过程一样,scanner.getRecords()返回的一个ExternalSpillableMap
oldDataFile是现存的baseFile

config.populateMetaFields()这个值默认为true,于是keyGeneratorOpt暂时为空。

requireSortedRecords还是false,因为是parquet不是hfile。

再创建对象:

new HoodieMergeHandle(config, instantTime, this, keyToNewRecords, partitionPath, fileId,dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);

注意dataFileToBeMerged就是oldDataFile。

重点说下init方法

baseFileToMerge为dataFileToBeMerged 现存的basefile。

String latestValidFilePath = baseFileToMerge.getFileName();获得baseFile名且包含了完整的路径。
String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension());

大致就是根据提供的instantTIme等因素,拼凑出一个.parquet文件的名字例如

/user/ocdp/test/hudi/表名/time_15min=20220329134500/5eaef46b-f581-41a2-8012-c66f608b3070-0_8-514-118306_20220329133607800.parquet
上方就先后获取了老的和新baseFile的完整名字,再进行
makeOldAndNewFilePaths(partitionPath, latestValidFilePath, newFileName);

也就是为HoodieMergeHandle的两个path oldFilePath、newFilePath附上各自处理后的值。也就是新老各自的basePath+分区名+xxx.parquet。

到此init方法结束,继续把包括空的keyGeneratorOpt等变量都继续赋给HoodieMergeHandle对象upsertHandle。再回到handleUpdateInternal(upsertHandle, instantTime, fileId)

因为upsertHandle.getOldFilePath()是上一步刚获得的oldFilePath,正常情况下肯定不为空。

所以走分支:SparkMergeHelper.newInstance().runMerge(this, upsertHandle)

(未完待续)

Apache hudi 0.10.1学习笔记之压缩Compaction(下篇)——执行压缩计划相关推荐

  1. 【C#8.0 and .NET Core 3.0 高级编程学习笔记】

    @C#8.0 and .NET Core 3.0 高级编程学习笔记 前言 为了能精细地完成对C#语言的学习,我决定选择一本书,精读它,理解它,记录它.我想选择什么书并不是最重要的,最重要的是持之以恒的 ...

  2. Apache Hudi 0.7.0 和 0.8.0 新功能已在 Amazon EMR 中可用

    文末限时福利倒计时3天,不要错过! 前言 Apache Hudi 是一个开源事务性数据湖框架,通过提供记录级插入.更新和删除功能,极大地简化了增量数据处理和数据管道开发.如果您要在 Amazon Si ...

  3. 2017-2-15从0开始前端学习笔记(HTML)-图片-表格-表单

    2017-2-15从0开始前端学习笔记-图片-表格-表单 标签 图片 图片<img src="#" alt="文本说明 不能加载图片时显示" title= ...

  4. ISUP5.0语音对讲学习笔记

    ISUP5.0语音对讲学习笔记 文章目录 ISUP5.0语音对讲学习笔记 准备阶段 CMS初始化 SMS初始化 CMS监听 CMS注册事件回调 SMS语音对讲监听 工作阶段 发送对讲命令 发送推流命令 ...

  5. 英宝通Unity4.0公开课学习笔记Vol.0

    英宝通Unity4.0公开课学习笔记 公开课地址 学习笔记目录 公开课地址 公开课地址:游戏蛮牛网. 源码素材:游戏蛮牛网. 公开课在官网上刷新不出来,只能去B站上找资源了.视频链接:哔哩哔哩. Un ...

  6. Apache Hudi 0.8.0 版本发布,Flink 集成有重大提升以及支持并行写

    4月初,Apache Hudi 发布了 0.8 版本,这个版本供解决了 97 个 ISSUES,下面简单介绍一下这个版本的迁移以及重要特性. 迁移指南 •如果从 0.5.3 以下版本迁移,请检查这个版 ...

  7. ET6.0服务器框架学习笔记(二、一条登录协议)

    ET6.0服务器框架学习笔记(二.一条登录协议) 上一篇主要记录ET6.0的服务器启动功能,本篇主要记录ET6.0完整的一条协议,从配置到生成协议数据,到从客户端发送给服务端,再发送回客户端的流程 文 ...

  8. Linux内存从0到1学习笔记(4,TLB)

    一.TLB简介 Kernel初始化的时候,会在初始化内存中创建页表:而处理器读取指令和数据的时候需要首先通过MMU查表得到物理地址,然后在访问物理地址读取指令或数据.MMU查表过程汇中需要4次访问内存 ...

  9. Linux学习笔记(七):文件压缩、打包与备份

    Linux学习笔记(七):文件压缩.打包与备份 常见的压缩指令 gzip, zcat/zmore/zless/zgrep bzip2, bzcat/bzmore/bzless/bzgrep xz, x ...

最新文章

  1. oracle怎么查别的库,ORACLE_SID、实例名和数据库名的区别及查看方法
  2. 第二讲 无穷级数的性质
  3. 口令加密算法 - Java加密与安全
  4. python画二次函数图像的顶点_画二次函数图像的步骤
  5. oracle数据库全数据库名,Oracle数据库中的 数据库域名、数据库名、全局数据库名、SID、数据库实例名、服务名 解释...
  6. php 删除文件夹及文件夹,php删除一个路径下的所有文件夹和文件的方法
  7. MySQL工作笔记-检索出某一时间段中的数据,并更新
  8. C#使用正则表达式检测数字 char 和韩文
  9. Linux,vi编辑器使用手册
  10. 单片机p2.0引脚c语言,单片机p2.0?
  11. ros的插件库 pluginlib 的简介
  12. java.lang.UnsupportedOperationException WebView is not allowed in privileged processes
  13. PreferenceActivity
  14. 高校社团管理系统的设计与开发学习论文
  15. python怎么安装lxml库_lxml解析库的安装和使用
  16. 矿产资源勘查评价的新进展——GIS在矿产资源评价中的应用
  17. 实时展示摄像头内容(go server + electron-vue client)
  18. ARC093F Dark Horse 容斥原理+DP
  19. 【开学季】30款高质量的自学网站,总有一款适合你
  20. 亚马逊风控从哪些方面检测的?

热门文章

  1. 金蝶KIS,存货核算出库核算报错
  2. 解决:Error: Cannot retrieve metalink for repository: epel. Please verify its path and try again
  3. 注意力机制(Attention)最新综述论文及相关源码
  4. 一棵高度为h的满m叉树,根节点所在的层次为第1层。若按层次自顶向下,同一层自左向右,顺序从1开始对全部结点进行编号,则结点i的第1个子结点编号j为
  5. 利用uni-app 开发的iOS app 发布到App Store全流程
  6. MySQL删除外键、增加外键及删除主键、增加主键
  7. 搭建手游联运平台都需要具备什么功能呢?
  8. 微信小程序即时聊天对话窗口静态源码
  9. 如何确保API 的稳定性与正确性?你只需要这一招
  10. halcon学习笔记(一)毛边检测 仿射变换+标准区域登陆检测内外边缘毛边