Hudi0.10master测试流程

Hudi粗糙介绍

hudi同步hive底层大概是什么原理,都是指向同一份存储没有拷贝数据吗,hive是怎么实现update、delete逻辑。

hudi底层其实还是hdfs,只不过hudi提供对写入数据包括其metadata的管理和数据组织方式,通过hudi本身支持acid语义,这样可以确保数据写入和hive存放hdfs方式一致,再在hive增加对应metadata信息。 可以将hudi看做是table format用来组织数据存放位置和格式之类的

hive读取的时候是对hdfs进行list操作,如果分区文件太多,性能很差; hudi增加index在读取的时候,能快速进行目标文件

它承担完成组织数据及对提供组织数据时提供的格式(parquet,orc,avro,arrow等),并在完成组织数据过程中提供了acid的语义,可以将其理解为table format;

hive也是可以理解为table format;两者肯定有差异; 比如hive是否满足acid,是否提供存储一体化满足离线和实时(近实时),与hive读取相比是的性能如何? 是否支持schema变更是前后数据兼容,自动识别…这些

对象存储是不支持 append 的

本地环境

名称 版本 描述
flink(pre-job) 1.3.2 通过parcel包部署于cdh6.3.2中
cdh 6.3.2 开源版本
hive 2.1.1-cdh6.3.2 包含cdh中(更换jar升级替换)
hadoop 3.0.0-cdh6.3.2 cdh原生版本
presto 2.591 开源版本
trino 360 开源版本
hudi 0.10 master分支编译

编译hudi包

github 拉取hudi代码

git clone  https://github.com/apache/hudi.git

编译hudi

mvn clean install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2
# 如果是 hive3 需要使用 profile -Pflink-bundle-shade-hive3
# 如果是 hive1 需要使用 profile -Pflink-bundle-shade-hive1#注意1:hive1.x现在只能实现同步metadata到hive,而无法使用hive查询,如需查询可使用spark查询hive外表的方法查询。
#注意2: 使用-Pflink-bundle-shade-hive x,需要修改profile中hive的版本为集群对应版本(只需修改profile里的hive版本)。修改位置为packaging/hudi-flink-bundle/pom.xml最下面的对应profile段,找到后修改profile中的hive版本为对应版本即可。
packaging/hudi-flink-bundle/pom.xml
<profile><id>flink-bundle-shade-hive2</id><properties><hive.version>2.1.1-cdh6.3.2</hive.version><flink.bundle.hive.scope>compile</flink.bundle.hive.scope></properties><dependencies><dependency><groupId>${hive.groupid}</groupId><artifactId>hive-service-rpc</artifactId><version>${hive.version}</version><scope>${flink.bundle.hive.scope}</scope></dependency></dependencies></profile>

当flink/lib下有flink-sql-connector-hive-xxx.jar时,会出现hive包冲突,解决方法是在install时,另外再指定一个profile:-Pinclude-flink-sql-connector-hive,同时删除掉flink/lib下的flink-sql-connector-hive-xxx.jar

Note: 该问题从 0.10 版本已经解决。

编译完成之后包的位置

  • flink依赖
hudi/packaging/hudi-flink-bundle/target
  • hive依赖
hudi/packaging/hudi-hadoop-mr-bundle/target
  • 导入包

    • flink依赖
    cp hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar FLINK_HOME/lib/
    #也可以通过-j或者-l指定 但是当前仅master可指定,0.90存在bug
    
    • hive依赖
    cp hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar /opt/cloudera/parcels/CDH/lib/hive/lib/
    

Hudi 表类型

COW 表适用于离线批量更新场景,对于更新数据,会先读取旧的 base file,然后合并更新数据,生成新的 base file。

MOR 表适用于实时高频更新场景,更新数据会直接写入 log file 中,读时再进行合并。为了减少读放大的问题,会定期合并 log file 到 base file 中。

开始数据湖操作

进入flink-sql-clent

flink-sql-client -l /root/hudi
使用的flink为cdh on flink(pre-job)。命令为全局命令
-l #指定文件夹,加载文件夹中所有jar
-j #指定jar#/root/hudi
-rw-r--r-- 1 root root  3670520 9月  26 09:00 flink-sql-connector-kafka_2.11-1.13.2.jar
-rw-r--r-- 1 root root 57509301 9月  26 15:15 hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar

测试数据类型

类型 备注
tinyint 1字节 整数值
smallint 2字节 整数值
int 4字节 整数值
bigint 8字节 整数值
decimal(precision, scale) 精确数值,精度precision,小数点后位数scale
precision取值1~38,缺省默认为9
scale不能大于precision,缺省默认为0
float 4字节 浮点型
double 8字节 浮点型
boolean true/false
char(length) 固定长度字符,length必填(1~255)

kafka消息数据参考

{"tinyint0": 6, "smallint1": 223, "int2": 42999, "bigint3": 429450, "float4": 95.47324181659323, "double5": 340.5755392968011,"decimal6": 111.1111, "boolean7": true,  "char8": "dddddd", "varchar9": "buy0", "string10": "buy1", "timestamp11": "2021-09-13 03:08:50.810"}

创建source表

CREATE TABLE k (tinyint0 TINYINT,smallint1 SMALLINT,int2 INT,bigint3 BIGINT,float4 FLOAT,double5 DOUBLE  ,decimal6 DECIMAL(38,8),boolean7 BOOLEAN,char8 STRING,varchar9 STRING,string10 STRING,timestamp11 STRING
) WITH ('connector' = 'kafka'                             -- 使用 kafka connector, 'topic' = 'hd4'                                   -- kafka topic名称, 'scan.startup.mode' = 'earliest-offset'           -- 从起始 offset 开始读取, 'properties.bootstrap.servers' = 'cdh4:9092'      -- kafka broker 地址, 'properties.group.id' = 'testgroup1' , 'value.format' = 'json', 'value.json.fail-on-missing-field' = 'true', 'value.fields-include' = 'ALL'
);

创建Hudi(cow)sink表

CREATE TABLE hdc(tinyint0 TINYINT ,smallint1 SMALLINT,int2 INT,bigint3 BIGINT,float4 FLOAT,double5 DOUBLE  ,decimal6 DECIMAL(12,3),boolean7 BOOLEAN,char8 CHAR(64) PRIMARY KEY NOT ENFORCED,varchar9 VARCHAR(64),string10 STRING,timestamp11 TIMESTAMP(3))
PARTITIONED BY (tinyint0) WITH ('connector' = 'hudi', 'path' = 'hdfs://nameservice1/data/hudi/hdc', 'write.precombine.field' = 'timestamp11'             -- 相同的键值时,取此字段最大值,默认ts字段, 'write.tasks' = '4', 'write.rate.limit' = '2000'                          -- 限制每秒多少条, 'hive_sync.enable' = 'true'                          -- 启用hive同步, 'hive_sync.mode' = 'hms'                             -- 启用hive hms同步,默认jdbc, 'hive_sync.metastore.uris' = 'thrift://cdh3:9083'    -- required, metastore的端口, 'hive_sync.jdbc_url' = 'jdbc:hive2://cdh3:10000'     -- required, hiveServer地址, 'hive_sync.table' = 'hdc'                            -- required, hive 新建的表名, 'hive_sync.db' = 'hudi'                              -- required, hive 新建的数据库名, 'hive_sync.username' = 'hive'                        -- required, HMS 用户名, 'hive_sync.password' = ''                            -- required, HMS 密码);

创建Hudi(mor)sink表

CREATE TABLE hdm2(tinyint0 TINYINT,smallint1 SMALLINT,int2 INT,bigint3 BIGINT,float4 FLOAT,double5 DOUBLE  ,decimal6 DECIMAL(12,3),boolean7 BOOLEAN,char8 CHAR(64),varchar9 VARCHAR(64),string10 STRING,timestamp11 TIMESTAMP(3))
PARTITIONED BY (tinyint0) WITH ('connector' = 'hudi', 'path' = 'hdfs://nameservice1/data/hudi/hdm2', 'hoodie.datasource.write.recordkey.field' = 'char8'  -- 主键, 'write.precombine.field' = 'timestamp11'             -- 相同的键值时,取此字段最大值,默认ts字段, 'write.tasks' = '1', 'read.tasks' = '4', 'compaction.tasks' = '2', 'write.rate.limit' = '2000'                          -- 限制每秒多少条, 'table.type' = 'MERGE_ON_READ'                       -- 默认COPY_ON_WRITE, 'compaction.async.enabled' = 'true'                  -- 在线压缩, 'compaction.trigger.strategy' = 'num_commits'        -- 按次数压缩, 'compaction.delta_commits' = '5'                     -- 默认为5, 'hive_sync.enable' = 'true'                          -- 启用hive同步, 'hive_sync.mode' = 'hms'                             -- 启用hive hms同步,默认jdbc, 'hive_sync.metastore.uris' = 'thrift://cdh3:9083'    -- required, metastore的端口, 'hive_sync.jdbc_url' = 'jdbc:hive2://cdh3:10000'     -- required, hiveServer地址, 'hive_sync.table' = 'hdm2'                            -- required, hive 新建的表名, 'hive_sync.db' = 'hudi'                              -- required, hive 新建的数据库名, 'hive_sync.username' = 'hive'                        -- required, HMS 用户名, 'hive_sync.password' = ''                            -- required, HMS 密码, 'hive_sync.skip_ro_suffix' = 'true'                  -- 去除ro后缀);

插入source数据

insert into hdm
select   cast(tinyint0 as TINYINT), cast(smallint1 as SMALLINT), cast(int2 as INT), cast(bigint3 as BIGINT), cast(float4 as FLOAT), cast(double5 as DOUBLE), cast(decimal6 as DECIMAL(38,18)), cast(boolean7 as BOOLEAN), cast(char8 as CHAR(64)), cast(varchar9 as VARCHAR(64)), cast(string10 as STRING), cast(timestamp11 as TIMESTAMP(3)) from  k;

插入单条数据测试

INSERT INTO hd VALUES(cast(1218 as TINYINT), cast(295 as SMALLINT), cast(-210121792 as INT), cast(-3697946268377828253 as BIGINT), cast(1.123459111111 as FLOAT), cast(1111111.123411 as DOUBLE), cast(1111.1234111 as DECIMAL(12, 3) ), cast(123123123123 as BOOLEAN), cast('`[s1tX213ysdasdasdgfq3wqwdqwqd速度速度pGPYl`AggMaHNRJv\[CkIYzcgMlmVvLSjtYmnlBEcwH^kEgDSxGIwGNLDP' as CHAR(64)), cast('daQOIE[n_eJsYLBJLttyFHnBXiCoT`RWeCO\G[JZZTdFFnFZFCODoI`X[SbMVAjq' as VARCHAR(64)), cast('e1916697-e626-4446-bd18-0142bfb9417b' as STRING), cast('2021-09-13 03:08:50.810' as TIMESTAMP(3))
);

流读hudi

参数 默认 描述
read.streaming.enabled false 流读
read.streaming.check-interval 60 流读检查秒数
read.streaming.start-commit 设置此参数将从提供的时间后开始读取数据

设置查询模式

SET sql-client.execution.result-mode=table;
SET sql-client.execution.result-mode=changelog;
SET sql-client.execution.result-mode=tableau;

presto查询hudi配置

presto 可以直接通过hive-catalog查询hudi

connector.name=hive-hadoop2
hive.metastore.uri=thrift://cdh2:9083
hive.config.resources=/etc/alternatives/hadoop-conf/core-site.xml,/etc/alternatives/hadoop-conf/hdfs-site.xml
hive.parquet.use-column-names=true

当 Presto-server-xxx 版本 < 0.233 时,hudi-presto-bundle.jar需要手动导入到{presto_install_dir}/plugin/hive-hadoop2/.

hudi sync hive presto表数据类型测试

Hudi数据类型 hive数据类型 presto数据类型 备注 极值 插入不符合值结果 备注
tinyint int integer(10) 1字节 整数值 (-128~127) 数值溢出 超过20位Flink-JOb返回异常
smallint int integer(10) 2字节 整数值 (-32768~32767) 数值溢出 超过20位Flink-JOb返回异常
int int integer(10) 4字节 整数值 (-2147483648~
2147483647)
数值溢出 超过20位Flink-JOb返回异常
bigint bigint bigint(19) 8字节 整数值 (±9.22*10的18次方) 数值溢出 超过20位Flink-JOb返回异常
decimal(m, d) decimal(m,d) decimal(m, d) 精确数值,精度m,小数点后位数d
m取值1~38,缺省默认为9
d不能大于m,缺省默认为0
参数m<65 是总个数
d<30且 d<m 是小数位
小数位超出按位截取,整数位超出指定为值为NULL 超过20位Flink-JOb返回异常
float float real(24) 4字节 浮点型 8位精度(4字节) 小数保留八位,超出截取,整数位异常 超过20位Flink-JOb返回异常
double double double(53) 8字节 浮点型 16位精度(8字节) 小数位插入正常,整数位异常 超过20位Flink-JOb返回异常
boolean boolean boolean true/false true/false 插入数值为true,插入字符为NULL 插入任何仅返回true和flase
char(length) string varchar 固定长度字符,length必填(1~255) 最多255个字符 可随意插入,与设定长度无关
varchar(max_length) string varchar 可变长度字符,max_length必填(1~65535) 可随意插入,与设定长度无关
string string varchar 字符串 无异常,可随意插入
timestamp bigint(19) bigint(19) 时间戳 hive自动转类型

hudi(mor)离线压缩

  • 单机flink压缩提交

    ./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor /root/hudi/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar --path hdfs:///data/hudi/hd2 --compaction-tasks 4
    
  • Flink on yarn(pre-job)离线压缩

    flink run -t yarn-per-job -Djobmanager.memory.process.size=1024m -Dtaskmanager.memory.process.size=2048m -Dtaskmanager.numberOfTaskSlots=2 -Denv.java.opts="-Dfile.encoding=UTF-8" -c org.apache.hudi.sink.compact.HoodieFlinkCompactor /root/hudi/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar --path  hdfs:///data/hudi/hd7  --compaction-tasks 4
    

多引擎同时操作hudi

  • Spark操作数据

    Flink正常可见,不可操作,且Spark操作数据会刷新Flink插入数据

  • FLink操作数据

    Spark不可见

支持查询矩阵

Copy-On-Write #

查询引擎 快照查询 增量查询
Hive Y Y
Spark SQL Y Y
Spark Datasource Y Y
Flink SQL Y N
PrestoDB Y N
Trino Y N
Impala3.4 或更高版本 Y N

Merge-On-Read #

查询引擎 快照查询 增量查询 读优化查询
Hive Y(有bug 社区在修复) Y Y
Spark SQL Y Y Y
Spark Datasource Y Y Y
Flink SQL Y Y Y
PrestoDB Y N Y
Trino N N Y
Impala N N Y

注:

在线压缩策略没起之前占用内存资源,推荐离线压缩,但离线压缩需手动根据压缩策略才可触发

cow写少读多的场景 mor 相反

MOR表压缩在线压缩按照配置压缩,如压缩失败,会有重试压缩操作,重试压缩操作延迟一小时后重试

数据湖技术Hudi0.10master测试流程相关推荐

  1. 数据湖技术 Iceberg 的探索与实践

    随着大数据存储和处理需求的多样化,如何构建一个统一的数据湖存储,并在其上进行多种形式的数据分析成了企业构建大数据生态的一个重要方向.Netflix 发起的 Apache Iceberg 项目具备 AC ...

  2. 数据湖技术之Hudi 集成 Spark

    数据湖技术之Hudi 集成 Spark 数据湖框架Hudi,从诞生之初支持Spark进行操作,后期支持Flink,接下来先看看与Spark整合使用,并且在0.9.0版本中,提供SparkSQL支持,编 ...

  3. 数据湖04:数据湖技术架构演进

    系列专题:数据湖系列文章 1. 背景 国内的大型互联网公司,每天都会生成几十.几百TB,甚至几PB的原始数据.这些公司通常采用开源的大数据组件来搭建大数据平台.大数据平台经历过"以Hadoo ...

  4. 【推荐】数据湖技术及实践与案例资料汇总合集47篇

    数据湖或hub的概念最初是由大数据厂商提出的,表面上看,数据都是承载在基于可向外扩展的HDFS廉价存储硬件之上的.但数据量越大,越需要各种不同种类的存储.最终,所有的企业数据都可以被认为是大数据,但并 ...

  5. 大数据架构师——数据湖技术(二)

    文章目录 数据湖技术 数据湖技术之Iceberg Spark 与 Iceberg 整合 1. Spark3.2.1 与 Iceberg0.13.2整合 添加依赖 Spark 设置 Catalog 配置 ...

  6. 上万规模数据湖如何在实验室测试

    摘要:上万规模的数据湖如何在进行实验室进行功能.性能.可靠性等方面的测试,也成为研发团队需要考虑的问题. 本文分享自华为云社区<如何在实验室进行MRS大集群规模测试>,作者: 老人与海 . ...

  7. 数据湖技术在某行业的实践

    与传统的数据架构要求整合.面向主题.固定分层等特点不同,数据湖为企业全员独立参与数据运营和应用创新提供了极大的灵活性,并可优先确保数据的低时延.高质量和高可用,给运营商数据架构优化提供了很好的参考思路 ...

  8. 开源大数据:Iceberg新一代数据湖技术实践

    数据湖三剑客 1.Iceberg 基本结构 1-1.Iceberg 表格式 Apache Iceberg是一种用于大型分析数据集的开放表格格式.Iceberg向Trino和Spark添加了使用高性能格 ...

  9. 技术干货—敏捷测试流程

    与传统测试组织相比,敏捷开发团队不再细分小组,敏捷开发团队构建时即确定了测试工程师,因此,敏捷测试中不存在测试团队构建环节. 同时,因产品规划期间已经设定了具体的实现目标,大部分软件公司不再要求测试工 ...

最新文章

  1. 前沿科技山雨欲来,四大领域存创新机会
  2. NR 5G 用户平面协议
  3. python怎么写csv文件_python怎么写csv文件
  4. boost::edge_connectivity用法的测试程序
  5. 什么是回调地狱以及promise的链式调用和aysnc/await
  6. Hive注册表那点事(5.0 VS 6.0)
  7. 关于安装deepin+window10双系统有时没有声音的问题
  8. Python算法题----在列表中找到和为s的两个数字
  9. 动态规划--牛客网19校招--魔法深渊
  10. 贷中风控调额方法与策略详解
  11. 小白到高级程序员,进阶过程中都需必备些什么。
  12. css学习_文本有关的样式属性、sublime快捷生成标签
  13. 【统计学习方法】统计学习方法概论(2)
  14. combo是什么意思啊(combo卡是什么意思)
  15. 直截了当地解释 ERC-3525 与 ERC-1155 的差别
  16. 微分方程解析解+数值解
  17. 技术赋能数字经济释放巨大潜力
  18. 计算机主板提炼金,你知道如何从废旧主板中提炼金子吗?
  19. Air Kiss(飞吻)技术简介,AP配网简介,airkissdebugger.apk app给智能设备通过airkiss配网过程
  20. android 动画 图片从指定位置飞到指定位置

热门文章

  1. 《炬丰科技-半导体工艺》湿法蚀刻工艺对铜及其合金蚀刻剂的评述
  2. 使用YonBuilder移动开发平台开发视频会议App
  3. MAC os x 修改文件夹图标
  4. 高频、射频傻傻分不清楚?看完这个你就懂了
  5. 初入神经网络剪枝量化4(大白话)
  6. SQL入门经典第5版(Sams Teach Yourself SQL in 24 Hours, 5th)随书习题的建表和插入
  7. egret 实现图片一次闪光效果
  8. MySQL 数据库死锁问题-Deadlock found when trying to get lock
  9. Effective C++条款20:宁以pass-by-reference-to-const替换pass-by-value
  10. python用函数计算个人所得税_用if函数计算个人所得税