hudi介绍

Hudi将带来流式处理大数据, 提供新数据集,同时比传统批处理效率高一个数据量级。

特性

(1)快速upsert,可插入索引
(2)以原子方式操作数据并具有回滚功能
(3)写入器之间的快照隔离
(4)savepoint用户数据恢复的保存点
(5)管理文件大小,使用统计数据布局
(6)数据行的异步压缩和柱状数据
(7)时间轴数据跟踪血统

通过Spark-shell快速开始

Spark-shell启动

spark-shell启动,需要指定spark-avro模块,因为默认环境里没有,spark-avro模块版本好需要和spark版本对应,这里都是2.4.5。

[root@hadoop102 hudi]# spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.5 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --jars /opt/module/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.6.0-SNAPSHOT.jar

设置表名

设置表名,基本路径和数据生成器

scala> import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.QuickstartUtils._scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._scala> import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode._scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._scala> import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions._scala> import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig._scala> val tableName = "hudi_trips_cow"
tableName: String = hudi_trips_cowscala> val basePath = "file:///tmp/hudi_trips_cow"
basePath: String = file:///tmp/hudi_trips_cowscala> val dataGen = new DataGenerator
dataGen: org.apache.hudi.QuickstartUtils.DataGenerator = org.apache.hudi.QuickstartUtils$DataGenerator@5cdd5ff9

插入数据

新增数据,生成一些数据,将其加载到DataFrame中,然后将DataFrame写入Hudi表

scala> val inserts = convertToStringList(dataGen.generateInserts(10))
scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
scala> df.write.format("hudi").|   options(getQuickstartWriteConfigs).|   option(PRECOMBINE_FIELD_OPT_KEY, "ts").|   option(RECORDKEY_FIELD_OPT_KEY, "uuid").|   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").|   option(TABLE_NAME, tableName).|   mode(Overwrite).|   save(basePath)

Mode(overwrite)将覆盖重新创建表(如果已存在)。可以检查/tmp/hudi_trps_cow 路径下是否有数据生成。

[root@hadoop102 ~]# cd /tmp/hudi_trips_cow/
[root@hadoop102 hudi_trips_cow]# ls
americas  asia

查询数据

scala> val tripsSnapshotDF = spark.|   read.|   format("hudi").|   load(basePath + "/*/*/*/*")
scala> tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
scala> spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
+------------------+-------------------+-------------------+---+
|              fare|          begin_lon|          begin_lat| ts|
+------------------+-------------------+-------------------+---+
| 64.27696295884016| 0.4923479652912024| 0.5731835407930634|0.0|
| 33.92216483948643| 0.9694586417848392| 0.1856488085068272|0.0|
| 27.79478688582596| 0.6273212202489661|0.11488393157088261|0.0|
| 93.56018115236618|0.14285051259466197|0.21624150367601136|0.0|
|  43.4923811219014| 0.8779402295427752| 0.6100070562136587|0.0|
| 66.62084366450246|0.03844104444445928| 0.0750588760043035|0.0|
|34.158284716382845|0.46157858450465483| 0.4726905879569653|0.0|
| 41.06290929046368| 0.8192868687714224|  0.651058505660742|0.0|
+------------------+-------------------+-------------------+---+
scala> spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
+-------------------+--------------------+----------------------+---------+----------+------------------+
|_hoodie_commit_time|  _hoodie_record_key|_hoodie_partition_path|    rider|    driver|              fare|
+-------------------+--------------------+----------------------+---------+----------+------------------+
|     20200701105144|6007a624-d942-4e0...|  americas/united_s...|rider-213|driver-213| 64.27696295884016|
|     20200701105144|db7c6361-3f05-48d...|  americas/united_s...|rider-213|driver-213| 33.92216483948643|
|     20200701105144|dfd0e7d9-f10c-468...|  americas/united_s...|rider-213|driver-213|19.179139106643607|
|     20200701105144|e36365c8-5b3a-415...|  americas/united_s...|rider-213|driver-213| 27.79478688582596|
|     20200701105144|fb92c00e-dea2-48e...|  americas/united_s...|rider-213|driver-213| 93.56018115236618|
|     20200701105144|98be3080-a058-47d...|  americas/brazil/s...|rider-213|driver-213|  43.4923811219014|
|     20200701105144|3dd6ef72-4196-469...|  americas/brazil/s...|rider-213|driver-213| 66.62084366450246|
|     20200701105144|20f9463f-1c14-4e6...|  americas/brazil/s...|rider-213|driver-213|34.158284716382845|
|     20200701105144|1585ad3a-11c9-43c...|    asia/india/chennai|rider-213|driver-213|17.851135255091155|
|     20200701105144|d40daa90-cf1a-4d1...|    asia/india/chennai|rider-213|driver-213| 41.06290929046368|
+-------------------+--------------------+----------------------+---------+----------+------------------+

由于测试数据分区是 区域/国家/城市,所以load(basePath “////”)

修改数据

类似于插入新数据,使用数据生成器生成新数据对历史数据进行更新。将数据加载到DataFrame中并将DataFrame写入Hudi表中

scala> val updates = convertToStringList(dataGen.generateUpdates(10))
scala> val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
scala> df.write.format("hudi").|   options(getQuickstartWriteConfigs).|   option(PRECOMBINE_FIELD_OPT_KEY, "ts").|   option(RECORDKEY_FIELD_OPT_KEY, "uuid").|   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").|   option(TABLE_NAME, tableName).|   mode(Append).|   save(basePath)

增量查询

Hudi还提供了获取自给定提交时间戳以来以更改记录流的功能。这可以通过使用Hudi的增量查询并提供开始流进行更改的开始时间来实现。

scala> spark.|   read.|   format("hudi").|   load(basePath + "/*/*/*/*").|   createOrReplaceTempView("hudi_trips_snapshot")
scala> val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
scala> val beginTime = commits(commits.length - 2)
beginTime: String = 20200701105144
scala> val tripsIncrementalDF = spark.read.format("hudi").|   option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).|   option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).|   load(basePath)
scala> tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()

时间点查询

scala> val beginTime = "000"
beginTime: String = 000scala> val endTime = commits(commits.length - 2)
endTime: String = 20200701105144
scala> val tripsPointInTimeDF = spark.read.format("hudi").|   option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).|   option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).|   option(END_INSTANTTIME_OPT_KEY, endTime).|   load(basePath)
scala> tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()

删除数据

scala> spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
res12: Long = 10
scala> val ds = spark.sql("select uuid, partitionPath from hudi_trips_snapshot").limit(2)
scala> val deletes = dataGen.generateDeletes(ds.collectAsList())
scala> val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2));
scala> df.write.format("hudi").|   options(getQuickstartWriteConfigs).|   option(OPERATION_OPT_KEY,"delete").|   option(PRECOMBINE_FIELD_OPT_KEY, "ts").|   option(RECORDKEY_FIELD_OPT_KEY, "uuid").|   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").|   option(TABLE_NAME, tableName).|   mode(Append).|   save(basePath)
scala> val roAfterDeleteViewDF = spark.|   read.|   format("hudi").|   load(basePath + "/*/*/*/*")
scala> roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
scala> spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
res15: Long = 8

只有append模式,才支持删除功能

近实时摄取

将外部数据(例如事件日志,数据库,外部源)如何摄取到Hadoop Data Lake是一个众所周知的问题。在大多数Hadoop部署中,经常会以零碎的方式,使用多种摄取工具解决,这些数据对整个组织是最具有价值的。
对于RDBMS关系型的摄入,Hudi提供了更快的Upset操作。例如,你可以通过MySql binlog的形式或者Sqoop导入到hdfs上的对应的Hudi表中,这样操作比Sqoop批量合并job(Sqoop merge)和复杂合并工作流更加快速高效。
对于NoSql的数据库,比如Cassandra,Voldemort,Hbase,这种可以存储数十亿行的数据库。采用完全批量加载是根本不可行的,并且如果摄取数据要跟上通常较高的更新量,则需要更有效的方法。
即使对于像Kafka这样不可变数据库源,Hudi也会在HDFS 上强制执行最小文件大小,从而通过整体解决Hadoop领域中小文件过多问题,改善NameNode的运行状况。对于事件流尤为重要,因为事件流通常较大(例如:点击流),并且如果管理不善,可能会严重损害Hadoop集群。
在所有来源中,Hudi都增加了急需的功能,即通过提交概念将新数据原子推送给消费者,避免摄入数据失败。

近实时分析

通常,实时数据集市由专门的分析存储(例如Druid或Memsql,OpenTSDB)提供支持,对于较低规模的数据,这绝对是完美的,可实现亚秒级响应查询。但是通常这些数据库最终会因为交互性较低的查询而被滥用,导致利用率不足。
另一方面,Hadoop上交互式的SQL解决方案有Presto和Spark sql。将数据的更新时间缩短至几分钟,Hudi可以提供多种高效的替代方案,并可以对存储在DFS中的多个大小表进行实时分析,此外Hudi没有外部依赖,例如专用实时分析的专用HBase集群,因此可以在不增加操作开销的情况下,进行更快更新鲜的分析。

增量处理管道

Hadoop提供的一项基本功能是,通过表示为工作流的DAG来构建彼此衍生的表链。工作流通常取决于多个上游工作流输出的新数据,并且传统上,新数据的可用性由新的DFS文件夹/配置单元分区指示。让我们举一个具体的例子来说明这一点。上游工作流U可以每小时创建一个Hive分区,并在每小时的末尾(processing_time)包含该小时(event_time)的数据,从而提供1小时的有效刷新。然后,下游工作流D在U完成后立即开始,并在接下来的一个小时内进行自己的处理,从而将有效延迟增加到2个小时。

上述范例只是忽略了延迟到达的数据,即,processing_time和event_time分开时。不幸的是,在当今的后移动和物联网前世界中,间歇性连接的移动设备和传感器的最新数据是正常现象,而不是反常现象。在这种情况下,保证正确性的唯一补救方法是每小时一次又一次地重新处理最后几个小时的数据,这会严重损害整个生态系统的效率。例如想象在数百个工作流程中每小时重新处理价值TB的数据。

Hudi通过提供一种以记录粒度(而不是文件夹/分区)消耗上游Hudi表HU的新数据(包括最新数据),应用处理逻辑并有效地更新/协调最新数据的方式再次进行救援。下游护桌HD。在这里,HU和HD可以以更频繁的时间表(例如15分钟)连续进行调度,并在HD上提供30分钟的终端延迟。

为了实现这一目标,Hudi从流处理框架(如Spark Streaming),发布/订阅系统(如Kafka)或数据库复制技术(如Oracle XStream)中采用了类似的概念

DFS的数据分散

Hudi可以像Kafka一样,用于数据分散,将每个管道的数据输出到Hudi表中,然后将其递增尾部以获取新数据并写入服务存储中。

与其他数据库对比

  • 与Kudu对比
    Apache Kudu是一个存储系统,其目标与Hudi类似,后者的目标是通过对Upserts的一流支持,对PB级数据进行实时分析。一个关键的区别是Kudu还试图充当OLTP工作负载的数据存储,而Hudi并不希望这样做。因此,Kudu不支持增量拉取(截至2017年初),Hudi这样做是为了启用增量处理用例。

Kudu与分布式文件系统抽象和HDFS完全不同,它自己的一组存储服务器通过RAFT相互通信。另一方面,Hudi旨在与底层Hadoop兼容文件系统(HDFS,S3或Ceph)一起使用,并且没有自己的存储服务器,而是依靠Apache Spark来完成繁重的工作。因此,就像其他Spark作业一样,Hudi可以轻松扩展,而Kudu则需要硬件和操作支持,这对于HBase或Vertica这样的数据存储来说是典型的。

  • 与HBase对比
    HBase是OLTP工作负载的关键值存储,但鉴于与Hadoop的相似性,用户通常倾向于将HBase与分析相关联。 鉴于HBase经过严格的写优化,它支持开箱即用的亚秒级更新,Hive-on-HBase允许用户查询该数据。 但是,就分析工作负载的实际性能而言,混合列式存储格式(例如Parquet / ORC)可以轻松击败HBase,因为这些工作负载主要是读取繁重的工作。 Hudi弥补了更快的数据与具有分析性存储格式之间的差距。 从操作的角度来看,与仅管理分析用的大型HBase区域服务器场相比,为用户提供可提供更快数据的库更具可扩展性。 最终,HBase不像Hudi这样的一流公民来支持诸如提交时间,增量拉动之类的增量处理原语。
    Hudi拥有更好数据分析能力。
  • 与Streaming流式处理对比
    Hudi可以与当今的批处理(写时复制表)和流式(读时合并表)作业集成,以将计算结果存储在Hadoop中。对于Spark应用程序,这可以通过将Hudi库与Spark / Spark流式DAG直接集成来实现。在非Spark处理系统(例如Flink,Hive)的情况下,可以在各个系统中进行处理,然后通过Kafka主题/ DFS中间文件将其发送到Hudi表中。从概念上讲,数据处理管道仅由三个部分组成:源,处理,接收器,用户最终针对接收器运行查询以使用管道的结果。 Hudi可以充当将数据存储在DFS上的源或接收器。 Hudi在给定流处理管道上的适用性最终归结为Presto / SparkSQL / Hive对您的查询的适用性。

Hudi中的名称概念

  • Timeline
    Hudi的核心是维护不同时间对表执行的所有操作的事件表,这有助于提供表的即时视图,同时还有效地支持按到达顺序进行数据检索。Hudi包含以下组件:
    (1)Instant action:在表上的操作类型
    (2)Instant time:操作开始的一个时间戳,该时间戳会按照开始时间顺序单调递增
    (3)state:即时状态

Hudi保证在时间轴上执行的操作都是原先性的,所有执行的操作包括:
(1)commits:原子的写入一张表的操作
(2)cleans:后台消除了表中的旧版本数据,即表中不在需要的数据
(3)delta_commit:增量提交,将一批数据原子写入到MergeOnRead表中,并且只记录到增量日志中
(4)compaction:后台协调Hudi中的差异数据
(5)rollback:回滚,删除在写入过程中的数据
(6)savepoint:将某些文件标记“已保存”,以便清理数据时不会删除它们,一般用于表的还原,可以将数据还原到某个时间点

任何操作都可以处于以下状态
(1)Requested:表示已安排操作行为,但是尚未开始
(2)Inflight:表示正在执行当前操作
(3)Completed:表示已完成操作

Timeline

Hudi的核心是维护不同时间对表执行的所有操作的事件表,这有助于提供表的即时视图,同时还有效地支持按到达顺序进行数据检索。Hudi包含以下组件:
(1)Instant action:在表上的操作类型
(2)Instant time:操作开始的一个时间戳,该时间戳会按照开始时间顺序单调递增
(3)state:即时状态

Hudi保证在时间轴上执行的操作都是原先性的,所有执行的操作包括:
(1)commits:原子的写入一张表的操作
(2)cleans:后台消除了表中的旧版本数据,即表中不在需要的数据
(3)delta_commit:增量提交,将一批数据原子写入到MergeOnRead表中,并且只记录到增量日志中
(4)compaction:后台协调Hudi中的差异数据
(5)rollback:回滚,删除在写入过程中的数据
(6)savepoint:将某些文件标记“已保存”,以便清理数据时不会删除它们,一般用于表的还原,可以将数据还原到某个时间点

任何操作都可以处于以下状态
(1)Requested:表示已安排操作行为,但是尚未开始
(2)Inflight:表示正在执行当前操作
(3)Completed:表示已完成操作

File management

Hudi将表组织成DFS上基本路径下的目录结构。表分位几个分区,与hive类似,每个分区均有唯一标示。
在每个分区内,有多个数据组,每个数据组包含几个文件片,其中文件片包含基本文件和日志文件。Hudi采用MVCC设计,其中压缩操作将日志文件和基本数据文件合并成新的文件片,而清楚操作则将未使用的文件片去除。

索引

Hudi通过使用索引机制,生成hoodie密钥映射对应文件ID,从而提供高效upsert操作。

表类型

(1)Copy on Write:仅使用列式存储,例如parquet。仅更新版本号,通过写入过程中执行同步合并来重写文件
(2)Merge on Read:基于列式存储(parquet)和行式存储(arvo)结合的文件更始进行存储。更新记录到增量文件,压缩同步和异步生成新版本的文件

以下是对比

大数据数据湖之hudi相关推荐

  1. 技术干货|基于Apache Hudi 的CDC数据入湖

    简介:阿里云技术专家李少锋(风泽)在Apache Hudi 与 Apache Pulsar 联合 Meetup 杭州站上的演讲整理稿件,本议题将介绍典型 CDC 入湖场景,以及如何使用 Pulsar/ ...

  2. 技术干货|基于Apache Hudi 的CDC数据入湖「内附干货PPT下载渠道」

    简介: 阿里云技术专家李少锋(风泽)在Apache Hudi 与 Apache Pulsar 联合 Meetup 杭州站上的演讲整理稿件,本议题将介绍典型 CDC 入湖场景,以及如何使用 Pulsar ...

  3. 数据湖存储格式Hudi原理与实践

    今天给大家分享阿里云DLA团队技术专家李伟所做的分享<数据湖存储格式Hudi原理yu .pdf>,对数据湖及Apache Hudigan兴趣的伙伴别错过啦!(到省时查报告小程序中搜索&qu ...

  4. 大数据_湖仓一体:下一代存储解决方案

    目录 一.什么是湖仓一体 二.湖仓一体架构的特点 三.常见框架 1.Apache Hudi 2.Apache Iceberg 3.Delta Lake 数据库早已解决了数据问题,但无法满足现代使用场景 ...

  5. 数据湖架构Hudi(五)Hudi集成Flink案例详解

    五.Hudi集成Flink案例详解 5.1 hudi集成flink flink的下载地址: https://archive.apache.org/dist/flink/ Hudi Supported ...

  6. Flink + Iceberg,腾讯百亿级实时数据入湖实战

    简介:上海站 Flink Meetup 分享内容,腾讯数据湖的百亿级数据场景落地的案例分享. 本文整理自腾讯数据湖研发高级工程师陈俊杰在 4 月 17 日 上海站 Flink Meetup 分享的&l ...

  7. Flink CDC 系列 - Flink CDC 如何简化实时数据入湖入仓

    摘要:本文整理自伍翀 (云邪).徐榜江 (雪尽) 在 Flink Forward Asia 2021 的分享,该分享以 5 个章节详细介绍如何使用 Flink CDC 来简化实时数据的入湖入仓, 文章 ...

  8. Flink 和 Iceberg 如何解决数据入湖面临的挑战

    简介:4.17 上海站 Meetup 胡争老师分享内容:数据入湖的挑战有哪些,以及如何用 Flink + Iceberg 解决此类问题. GitHub 地址 https://github.com/ap ...

  9. 大数据数据量估算_如何估算数据科学项目的数据收集成本

    大数据数据量估算 (Notes: All opinions are my own) (注:所有观点均为我自己) 介绍 (Introduction) Data collection is the ini ...

最新文章

  1. RHEL6入门系列之九,常用命令2
  2. php开发 linux作用是什么,linux有什么用?
  3. logm--求矩阵的对数
  4. VMware Fusion DHCP方式下如何指定虚拟机IP地址
  5. [转]在WPF中自定义控件 UserControl
  6. 数据科学 IPython 笔记本 8.10 自定义颜色条
  7. 面试中经常会被问到的70个问题
  8. linux内存管理:kmap、vmap、ioremap
  9. 2018-05-02 os.path
  10. Android:沉浸式状态栏(一)工具类
  11. MySQL中Join算法实现原理通俗易懂
  12. php5 mysql 源_thinkphp6:访问多个mysql数据源(thinkphp6.0.5 / php 7.4.9)
  13. poj1637 Sightseeing tour 混合图欧拉回路判定
  14. foobar2000 用了那么久 才学会设置 好音质设置
  15. FPGA代码规则检查工具
  16. python抓包超星网课试卷_2020网络数据采集与Python爬虫【带实验】高校邦网课答案...
  17. 浅谈智能客服机器人的产品设计
  18. 对话微软大中华区CEO梁念坚:WP7为云而生
  19. 模拟赛DAY 2 T2不老梦
  20. 年终总结:2021年最有用的数据清洗 Python 库

热门文章

  1. 今天是我在csdn的1265天
  2. Marlin固件之—:基础入门与测试
  3. 修改Unity3d Asset Store 默认下载文件位置?
  4. 《数据结构》实验报告(一)——顺序表存储结构及实现
  5. 论文翻译-基于深度残差收缩网络的故障诊断 Deep Residual Shrinkage Networks for Fault Diagnosis
  6. QT 支持多客户端网络视频监控的实现
  7. 论文写作总结(中外文对比)
  8. 【java华为机试】HJ10 字符个数统计
  9. HTTP协议——URL
  10. COPA和利润中心PCA的区别