Spark SQL增量查询Hudi表
前言
由于项目上主要用Hive查询Hudi,所以之前总结过一篇:Hive增量查询Hudi表。最近可能会有Spark SQL增量查询Hudi表的需求,并且我发现目前用纯Spark SQL的形式还不能直接增量查询Hudi表,于是进行学习总结一下。
编程方式(DF+SQL)
先看一下官方文档上Spark SQL增量查询的方式,地址:https://hudi.apache.org/cn/docs/quick-start-guide#incremental-query 和 https://hudi.apache.org/cn/docs/querying_data#incremental-query
它是先通过spark.read中添加增量参数的形式读Hudi表为DF,然后将DF注册成临时表,最后通过Spark SQL查询临时表的形式,实现增量查询的。
参数
- hoodie.datasource.query.type=incremental 查询类型,值为incremental时代表增量查询,默认值snapshot,增量查询时,该参数必填
- hoodie.datasource.read.begin.instanttime 增量查询开始时间,必填 例如:20221126170009762
- hoodie.datasource.read.end.instanttime 增量查询结束时间,非必填 例如:20221126170023240
- hoodie.datasource.read.incr.path.glob 增量查询指定分区路径,非必填 例如 /dt=2022-11*/*
查询范围 (BEGIN_INSTANTTIME,END_INSTANTTIME],也就是大于开始时间(不包含),小于等于结束时间(包含),如果没有指定结束时间,那么查询大于BEGIN_INSTANTTIME到现在为止最新的数据,如果指定INCR_PATH_GLOB,那么只在指定分区路径下面查询对应的数据。
代码示例
完整代码地址:https://github.com/dongkelun/hudi-demo/blob/master/hudi0.12_spark3.1/src/main/scala/com/dkl/hudi/spark3_1/IncrementalQuery.scala
import org.apache.hudi.DataSourceReadOptions.{BEGIN_INSTANTTIME, END_INSTANTTIME, INCR_PATH_GLOB, QUERY_TYPE, QUERY_TYPE_INCREMENTAL_OPT_VAL}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifierval tableName = "test_hudi_incremental"spark.sql(s"""|create table $tableName (| id int,| name string,| price double,| ts long,| dt string|) using hudi| partitioned by (dt)| options (| primaryKey = 'id',| preCombineField = 'ts',| type = 'cow'| )|""".stripMargin)spark.sql(s"insert into $tableName values (1,'hudi',10,100,'2022-11-25')")
spark.sql(s"insert into $tableName values (2,'hudi',10,100,'2022-11-25')")
spark.sql(s"insert into $tableName values (3,'hudi',10,100,'2022-11-26')")
spark.sql(s"insert into $tableName values (4,'hudi',10,100,'2022-12-26')")
spark.sql(s"insert into $tableName values (5,'hudi',10,100,'2022-12-27')")val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
val basePath = table.storage.properties("path")// incrementally query data
val incrementalDF = spark.read.format("hudi").option(QUERY_TYPE.key, QUERY_TYPE_INCREMENTAL_OPT_VAL).option(BEGIN_INSTANTTIME.key, beginTime).option(END_INSTANTTIME.key, endTime).option(INCR_PATH_GLOB.key, "/dt=2022-11*/*").load(basePath)
// table(tableName)incrementalDF.createOrReplaceTempView(s"temp_$tableName")spark.sql(s"select * from temp_$tableName").show()
spark.stop()
结果:
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| id|name|price| ts| dt|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
| 20221126165954300|20221126165954300...| id:1| dt=2022-11-25|de99b299-b9de-423...| 1|hudi| 10.0|100|2022-11-25|
| 20221126170009762|20221126170009762...| id:2| dt=2022-11-25|de99b299-b9de-423...| 2|hudi| 10.0|100|2022-11-25|
| 20221126170030470|20221126170030470...| id:5| dt=2022-12-27|75f8a760-9dc3-452...| 5|hudi| 10.0|100|2022-12-27|
| 20221126170023240|20221126170023240...| id:4| dt=2022-12-26|4751225d-4848-4dd...| 4|hudi| 10.0|100|2022-12-26|
| 20221126170017119|20221126170017119...| id:3| dt=2022-11-26|2272e513-5516-43f...| 3|hudi| 10.0|100|2022-11-26|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------++-----------------+
| commit_time|
+-----------------+
|20221126170030470|
|20221126170023240|
|20221126170017119|
|20221126170009762|
|20221126165954300|
+-----------------+20221126170009762
20221126170023240
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| id|name|price| ts| dt|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
| 20221126170017119|20221126170017119...| id:3| dt=2022-11-26|2272e513-5516-43f...| 3|hudi| 10.0|100|2022-11-26|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
注释掉INCR_PATH_GLOB,结果:
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| id|name|price| ts| dt|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
| 20221127155346067|20221127155346067...| id:4| dt=2022-12-26|33e7a2ed-ea28-428...| 4|hudi| 10.0|100|2022-12-26|
| 20221127155339981|20221127155339981...| id:3| dt=2022-11-26|a5652ae0-942a-425...| 3|hudi| 10.0|100|2022-11-26|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
继续注释掉END_INSTANTTIME,结果:
20221127161253433
20221127161311831
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| id|name|price| ts| dt|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
| 20221127161320347|20221127161320347...| id:5| dt=2022-12-27|7b389e57-ca44-4aa...| 5|hudi| 10.0|100|2022-12-27|
| 20221127161311831|20221127161311831...| id:4| dt=2022-12-26|2707ce02-548a-422...| 4|hudi| 10.0|100|2022-12-26|
| 20221127161304742|20221127161304742...| id:3| dt=2022-11-26|264bc4a9-930d-4ec...| 3|hudi| 10.0|100|2022-11-26|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+---+----------+
可以看到不包含起始时间,包含结束时间。
纯SQL方式
一般项目上都采用纯SQL方式进行增量查询,这样比较方便,纯SQL的方式参数和上面讲的参数是一样的,接下来看一下怎么用纯SQL方式实现
建表造数
create table hudi.test_hudi_incremental (id int,name string,price double,ts long,dt string
) using hudipartitioned by (dt)options (primaryKey = 'id',preCombineField = 'ts',type = 'cow'
);insert into hudi.test_hudi_incremental values (1,'a1', 10, 1000, '2022-11-25');
insert into hudi.test_hudi_incremental values (2,'a2', 20, 2000, '2022-11-25');
insert into hudi.test_hudi_incremental values (3,'a3', 30, 3000, '2022-11-26');
insert into hudi.test_hudi_incremental values (4,'a4', 40, 4000, '2022-12-26');
insert into hudi.test_hudi_incremental values (5,'a5', 50, 5000, '2022-12-27');
看一下有哪些commit_time
select distinct(_hoodie_commit_time) from test_hudi_incremental order by _hoodie_commit_time
+----------------------+
| _hoodie_commit_time |
+----------------------+
| 20221130163618650 |
| 20221130163703640 |
| 20221130163720795 |
| 20221130163726780 |
| 20221130163823274 |
+----------------------+
纯SQL方式(一)
使用Call Procedures:copy_to_temp_view
、copy_to_table
,目前这两个命令已经合到master,由scxwhite 苏乘祥
贡献,这俩参数差不多,建议使用copy_to_temp_view
,因为copy_to_table
会先将数据落盘而copy_to_temp_view
是创建的临时表,效率会高一点,且数据落盘无意义,后面还要将落盘的表删掉。
支持的参数
- table
- query_type
- view_name
- begin_instance_time
- end_instance_time
- as_of_instant
- replace
- global
测试SQL:
call copy_to_temp_view(table => 'test_hudi_incremental', query_type => 'incremental',
view_name => 'temp_incremental', begin_instance_time=> '20221130163703640', end_instance_time => '20221130163726780');select _hoodie_commit_time, id, name, price, ts, dt from temp_incremental;
结果:
+----------------------+-----+-------+--------+-------+-------------+
| _hoodie_commit_time | id | name | price | ts | dt |
+----------------------+-----+-------+--------+-------+-------------+
| 20221130163726780 | 4 | a4 | 40.0 | 4000 | 2022-12-26 |
| 20221130163720795 | 3 | a3 | 30.0 | 3000 | 2022-11-26 |
+----------------------+-----+-------+--------+-------+-------------+
可以看到这种方式是可以实现增量查询的,但是需要注意,如果需要修改增量查询的起始时间,那么就需要重复执行copy_to_temp_view
,但是因为临时表temp_incremental已经存在,要么新起个表名,要么先删掉,再创建新的,我建议先删掉,通过下面的命令删除
drop view if exists temp_incremental;
纯SQL方式(二)
PR地址:https://github.com/apache/hudi/pull/7182,这个PR同样由scxwhite
贡献,目前只支持Spark3.2以上的版本(目前社区未合并)
增量查询SQL
select id, name, price, ts, dt from tableName
[
'hoodie.datasource.query.type'=>'incremental',
'hoodie.datasource.read.begin.instanttime'=>'$instant1',
'hoodie.datasource.read.end.instanttime'=>'$instant2'
]
这种方式,是支持了一种新的语法,在查询SQL后通过在[]添加参数的形式,感兴趣的话可以拉一下代码,自己打包试一下。
纯SQL方式(三)
使用 Spark SQL Hint实现,具体实现方式,请查看KnightChess
的这篇文章如何使用 Spark SQL Hint 对 Hudi 进行增量查询、时间旅行
最终的效果如下
select/*+hoodie_prop('default.h1',map('hoodie.datasource.read.begin.instanttime', '20221127083503537', 'hoodie.datasource.read.end.instanttime', '20221127083506081')),hoodie_prop('default.h2',map('hoodie.datasource.read.begin.instanttime', '20221127083508715', 'hoodie.datasource.read.end.instanttime', '20221127083511803'))*/id, name, price, ts
from (select id, name, price, tsfrom default.h1union allselect id, name, price, tsfrom default.h2
)
是在hint中添加增量查询相关的参数,先指定表名再写参数,但是文章好像未给出完整的代码地址,大家有时间可以自己试一下。
纯SQL方式(四)
这种方式,是我按照Hive增量查询Hudi的方式修改的源码,通过set的方式实现增量查询。
PR地址:https://github.com/apache/hudi/pull/7339
关于为啥目前不能通过set参数进行增量查询,这里说明一下:根据文章Hudi Spark SQL源码学习总结-select(查询),可知Hudi的
DefaultSource.createRelation
中的optParams
参数为readDataSourceTable
中的options = table.storage.properties ++ pathOption,也就是表本身属性中的配置参数+path,之后在createRelation
并没有接收其他参数,所以不能通过set参数的形式进行查询
和Hive增量查询一样,指定具体表名的增量查询参数
set hoodie.test_hudi_incremental.datasource.query.type=incremental
set hoodie.test_hudi_incremental.datasource.read.begin.instanttime=20221130163703640;
select _hoodie_commit_time, id, name, price, ts, dt from test_hudi_incremental;
+----------------------+-----+-------+--------+-------+-------------+
| _hoodie_commit_time | id | name | price | ts | dt |
+----------------------+-----+-------+--------+-------+-------------+
| 20221130163823274 | 5 | a5 | 50.0 | 5000 | 2022-12-27 |
| 20221130163726780 | 4 | a4 | 40.0 | 4000 | 2022-12-26 |
| 20221130163720795 | 3 | a3 | 30.0 | 3000 | 2022-11-26 |
+----------------------+-----+-------+--------+-------+-------------+
如果不同的库下面有相同的表名,则可以通过库名.表名的形式:
## 需要先开启使用数据库名称限定表名的配置,开启后上面不加库名的配置就失效了
set hoodie.query.use.database = true;
set hoodie.hudi.test_hudi_incremental.datasource.query.type=incremental;
set hoodie.hudi.test_hudi_incremental.datasource.read.begin.instanttime=20221130163703640;
set hoodie.hudi.test_hudi_incremental.datasource.read.end.instanttime=20221130163726780;
set hoodie.hudi.test_hudi_incremental.datasource.read.incr.path.glob=/dt=2022-11*/*;
refresh table test_hudi_incremental;
select _hoodie_commit_time, id, name, price, ts, dt from test_hudi_incremental;
+----------------------+-----+-------+--------+-------+-------------+
| _hoodie_commit_time | id | name | price | ts | dt |
+----------------------+-----+-------+--------+-------+-------------+
| 20221130163720795 | 3 | a3 | 30.0 | 3000 | 2022-11-26 |
+----------------------+-----+-------+--------+-------+-------------+
大家可以自己试一下,不同的库表关联的情形
这里需要注意一点,更新参数后,需要先refresh table
,再查询,否则查询时修改的参数不生效,因为会使用缓存中的参数。
这种方式只是简单地修改了一下源码,使set的参数对查询生效。
为了避免有些读者嫌打包麻烦,这里给大家提供了hudi-spark3.1-bundle_2.12-0.13.0-SNAPSHOT.jar的下载地址:https://download.csdn.net/download/dkl12/87221476
总结
本文总结了Spark SQL增量查询Hudi表的一些参数设置,并给出了示例,介绍了使用纯Spark SQL实现增量查询Hudi表的几种方式,不确定未来社区会采用哪种方式,大家目前如果有这种需求的话,可以先选择一种自己喜欢的方式,等未来社区版本支持后,再升级版本。本文没有涉及增量查询的原理,暂未验证增量查询的效率,是否可以起到文件过滤的效果,以后如果有时间会单独整理一篇。
相关阅读
- Apache Hudi 入门学习总结
- Hudi Spark SQL总结
- Hudi Spark SQL Call Procedures学习总结(一)(查询统计表文件信息)
- Spark3.12+Kyuubi1.5.2+kyuubi-spark-authz源码编译打包+部署配置HA
- Hudi Spark SQL源码学习总结-Create Table
- Hudi Spark SQL源码学习总结-CTAS
- Hudi Spark源码学习总结-df.write.format(“hudi”).save
- Hudi Spark源码学习总结-spark.read.format(“hudi”).load
- Hudi Spark源码学习总结-spark.read.format(“hudi”).load(2)
- Hudi Spark SQL源码学习总结-select(查询)
Spark SQL增量查询Hudi表相关推荐
- sql server 查询某个表被哪些存储过程调用
sql server 查询某个表被哪些存储过程调用 select distinct object_name(id) from syscomments where id in (select id fr ...
- MySQL、Spark SQL 嵌套查询(二层、三层、多层)
MySQL.Spark SQL 嵌套查询(二层.三层.多层) 二层查询 select * from(select * from(select substring(``,1,3) from `big1` ...
- sparksql删除MySQL数据_Databricks 第6篇:Spark SQL 维护数据库和表
Spark SQL 表的命名方式是db_name.table_name,只有数据库名称和数据表名称.如果没有指定db_name而直接引用table_name,实际上是引用default 数据库下的表. ...
- Databricks 第6篇:Spark SQL 维护数据库和表
Spark SQL 表的命名方式是db_name.table_name,只有数据库名称和数据表名称.如果没有指定db_name而直接引用table_name,实际上是引用default 数据库下的表. ...
- 关于oracle sql语句查询时表名和字段名要加双引号的问题
oracle初学者一般会遇到这个问题. 用navicat可视化创建了表,可是就是不能查到! 后来发现②语句可以查询到 ①select * from user; 但是,我们如果给user加上双引号就可以 ...
- 关于oracle sql语句查询时表名和字段名要加双引号的问题详解
转自:http://www.2cto.com/database/201504/387184.html 作为oracle的初学者相信大家一定会遇到这个问题,如图: 明明就是navicat可视化创建了表, ...
- SQL数据查询——单表查询(二)
集合查询+单表查询相关实例 集合查询 1.UNION 2.INTERSECT 3.EXCEPT 相关实例 实例1 实例2 集合查询 SQL SERVER集合操作主要包括 并操作 UNION 交操作 I ...
- spark sql优化:小表大表关联优化 union替换or broadcast join
----原语句(运行18min) SELECTbb.ipFROM(SELECTip ,sum(click) click_num,round(sum(click) / sum(imp), 4) user ...
- SQL Server 查询数据表数据
查询数据表数据 1.查询数据表所有字段的数据信息: 2.查询部分列的所有数据: 3.查询部分列的所有数据,过滤重复行信息:
最新文章
- 初学者必学的C++项目!花3天搞定
- 如何构造强度较高的密码
- php aapt apk 包名,aapt 命令可应用于查看apk包名、主activity、版本等很多信息
- 【Linux系统编程】线程同步与互斥:互斥锁
- python篇第6天【数据类型】
- 详解python可迭代对象、迭代器和生成器
- 6.5使用外部环境的属性文件
- 工业互联网平台基本架构
- (转)mysql 字符串 拼接 截取 替换
- 区块链与程序员:赚钱还是创业
- 在自己订阅的GOOGLE快讯中,看到自己的文章
- 一名大学生选择军哥的乾颐堂是如何顺利通过华为HCIE的,又如何应对HCIE面试呢?...
- 【白板动画制作软件】万彩手影大师教程 | 调节动作播放时长
- 雅虎将收购什么类型公司?梅耶尔:移动!移动!移动!
- 软件配置管理岗位职责说明
- 我学ERP 之 金蝶ERP-K3_第4章 销售管理
- word的使用学习笔记(一)
- u盘怎么装电脑系统 u盘启动盘系统重装教学
- winmail不用服务器系统可以吗,用Winmail架设安全可靠的邮件服务器
- arm对应hex_ASCII与HEX对照转换表