【数据湖Hudi-8-Hudi集成Flink-入门】
数据湖Hudi-8-Hudi集成Flink-入门
- Hudi集成Flink入门
- 1.Hudi集成Flink版本对照关系
- 2.Flink环境准备
- 3.Flink SQL Client方式处理任务
- 1.修改配置
- 2.创建表格,插入数据
- 3.流式插入数据
- 4.Flink IDEA编码方式处理任务
- 1.环境准备
- 2.创建Maven工程,并编写代码
- 3.提交运行
- 5.Flink和Hudi类型映射关系
Hudi集成Flink入门
1.Hudi集成Flink版本对照关系
0.11.x不建议使用,如果要用请使用补丁分支:https://github.com/apache/hudi/pull/6182
2.Flink环境准备
1)拷贝编译好的jar包到Flink的lib目录下
cp /opt/software/hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.13-bundle_2.12-0.12.0.jar /opt/module/flink-1.13.6/lib/
2)拷贝guava包,解决依赖冲突
cp /opt/module/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar /opt/module/flink-1.13.6/lib/
3)配置Hadoop环境变量
sudo vim /etc/profile.d/my_env.shexport HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoopsource /etc/profile
3.Flink SQL Client方式处理任务
1.修改配置
- 1)修改flink-conf.yaml配置
vim /opt/module/flink-1.13.6/conf/flink-conf.yamlclassloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 4 # hudi写出数据默认taskslots是4,如果不调整hudi,就在这里调整state.backend: rocksdb
execution.checkpointing.interval: 30000
state.checkpoints.dir: hdfs://hadoop1:8020/ckps
state.backend.incremental: true
- 2)yarn-session模式
(1)解决依赖问题
注意:
下面包依赖问题的处理,主要是解决 flink集成Hudi的时候,flink任务在执行的时候,需要进行 compaction,但是 compaction不会成功,且此错误不会上报到总日志服务器上,所以需要进入到Flink对应的单独的任务里面,查看报错,报错信息如下,实际上在flink集成hudi里面有这个包,最终原因是以来冲突问题。
cp /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.3.jar /opt/module/flink-1.13.6/lib/
(2)启动yarn-session
/opt/module/flink-1.13.6/bin/yarn-session.sh -d
(3)启动sql-client
/opt/module/flink-1.13.6/bin/sql-client.sh embedded -s yarn-session
2.创建表格,插入数据
set sql-client.execution.result-mode=tableau;
– 创建hudi表
CREATE TABLE t1(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH ('connector' = 'hudi','path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t1','table.type' = 'MERGE_ON_READ' –- 默认是COW
);
或如下写法
CREATE TABLE t1(uuid VARCHAR(20),name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20),PRIMARY KEY(uuid) NOT ENFORCED
)
PARTITIONED BY (`partition`)
WITH ('connector' = 'hudi','path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t1','table.type' = 'MERGE_ON_READ'
);
- 插入数据
INSERT INTO t1 VALUES('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
- 查询数据
select * from t1;
- 更新数据
insert into t1 values ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
注意,保存模式现在是Append。通常,除非是第一次创建表,否则请始终使用追加模式。现在再次查询数据将显示更新的记录。每个写操作都会生成一个用时间戳表示的新提交。查找前一次提交中相同的_hoodie_record_keys在_hoodie_commit_time、age字段中的变化。
3.流式插入数据
- 1)创建测试表
CREATE TABLE sourceT (uuid varchar(20),name varchar(10),age int,ts timestamp(3),`partition` varchar(20)
) WITH ('connector' = 'datagen','rows-per-second' = '1'
);create table t2(uuid varchar(20),name varchar(10),age int,ts timestamp(3),`partition` varchar(20)
)
with ('connector' = 'hudi','path' = '/tmp/hudi_flink/t2','table.type' = 'MERGE_ON_READ'
);
- 2)执行插入
insert into t2 select * from sourceT;
- 3)查询结果
set sql-client.execution.result-mode=tableau;
select * from t2 limit 10;
4.Flink IDEA编码方式处理任务
1.环境准备
- 1.手动install依赖
在hudi-flink1.13-bundle-0.12.0.jar所在目录下,打开cmd,执行此命令,然后查看idea中settings的maven中 local repository多对应的本地依赖库目录跟执行完下面命令所对应的目录是否一致,如果不一致,需要将下面命令编译完的jar移动到刚刚目录下面。
mvn install:install-file -DgroupId=org.apache.hudi -DartifactId=hudi-flink_2.12 -Dversion=0.12.0 -Dpackaging=jar -Dfile=./hudi-flink1.13-bundle-0.12.0.jar
2.创建Maven工程,并编写代码
代码如下:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment;import java.util.concurrent.TimeUnit;public class HudiDemo {public static void main(String[] args) {//IDEA运行时,提供WEBUI
// StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置状态后端 RocksDBEmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);//idea本地运行时,指定rocksdb存储路径
// embeddedRocksDBStateBackend.setDbStoragePath("file:///E:/rocksdb");embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);env.setStateBackend(embeddedRocksDBStateBackend);//checkpoint配置env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5), CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig = env.getCheckpointConfig();checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/ckps");checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(2));checkpointConfig.setTolerableCheckpointFailureNumber(5);checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);tableEnvironment.executeSql("CREATE TABLE sourceT (\n" +" uuid varchar(20),\n" +" name varchar(10),\n" +" age int,\n" +" ts timestamp(3),\n" +" `partition` varchar(20)\n" +") WITH (\n" +" 'connector' = 'datagen',\n" +" 'rows-per-second' = '1'\n" +")");tableEnvironment.executeSql("create table t2(\n" +" uuid varchar(20),\n" +" name varchar(10),\n" +" age int,\n" +" ts timestamp(3),\n" +" `partition` varchar(20)\n" +")\n" +"with (\n" +" 'connector' = 'hudi',\n" +" 'path' = 'hdfs://hadoop102:8020/tmp/hudi_flink/t2',\n" +" 'table.type' = 'MERGE_ON_READ'\n" +")");tableEnvironment.executeSql("insert into t2 select * from sourceT");}
}
3.提交运行
将代码打成jar包,上传到目录myjars,执行提交命令:
flink run -t yarn-per-job \
-c com.yang.hudi.flink.HudiDemo \
./myjars/flink-hudi-demo-1.0-SNAPSHOT.jar
5.Flink和Hudi类型映射关系
【数据湖Hudi-8-Hudi集成Flink-入门】相关推荐
- 数据湖技术之Hudi 集成 Spark
数据湖技术之Hudi 集成 Spark 数据湖框架Hudi,从诞生之初支持Spark进行操作,后期支持Flink,接下来先看看与Spark整合使用,并且在0.9.0版本中,提供SparkSQL支持,编 ...
- 数据湖架构开发-Hudi入门教程
当下大数据技术在工业环境中应用,主要构建离线数仓和实时数仓,进行大规模数据管理和分析.为了更好管理数据,出现数据湖DataLake:用来存储大量的原始数据.当下最流行数据湖框架Apache Hudi, ...
- 大数据Hadoop之——新一代流式数据湖平台 Apache Hudi
文章目录 一.概述 二.Hudi 架构 三.Hudi的表格式 1)Copy on Write(写时复制) 2)Merge On Read(读时合并) 3)COW vs MOR 四.元数据表(Metad ...
- 数据湖浅析(以hudi为例)
数据湖定义 业界对于数据湖的定义存在一定争议,个人认为数据湖就是针对传统hive数仓不支持acid.upsert.schema evolution等痛点上,提出的一种数据存储库. hive的痛点:hi ...
- 数据湖(十七):Flink与Iceberg整合DataStream API操作
文章目录 Flink与Iceberg整合DataStream API操作 一.DataStream API 实时写入Iceberg表 1.首先在Maven中导入以下依赖
- Apache Iceberg数据湖与Trino的集成
目录 1. 介绍 2. 配置Connector 1. 介绍 Trino连接Iceberg的Connector,需要通过Hive的metastore,查询每个partition的HDFS locatio ...
- 百信银行基于 Apache Hudi 实时数据湖演进方案
简介:本文介绍了百信银行实时计算平台的建设情况,实时数据湖构建在 Hudi 上的方案和实践方法,以及实时计算平台集成 Hudi 和使用 Hudi 的方式. 本文介绍了百信银行实时计算平台的建设情况,实 ...
- Apache Hudi 在 B 站构建实时数据湖的实践
简介: B 站选择 Flink + Hudi 的数据湖技术方案,以及针对其做出的优化. 本文作者喻兆靖,介绍了为什么 B 站选择 Flink + Hudi 的数据湖技术方案,以及针对其做出的优化.主要 ...
- 深度对比Apache CarbonData、Hudi和Open Delta三大开源数据湖方案
摘要:今天我们就来解构数据湖的核心需求,同时深度对比Apache CarbonData.Hudi和Open Delta三大解决方案,帮助用户更好地针对自身场景来做数据湖方案选型. 背景 我们已经看到, ...
- Apache Hudi 数据湖概述
文章目录 前言 hudi是什么 hudi 实现更新的基本原理 基础文件 增量日志文件 文件组 文件的版本 COW表数据的更新 MOR表数据的更新 MOR 表的compact hudi 不同表格式的读取 ...
最新文章
- 使用Python,OpenCV计算图像直方图(cv2.calcHist)
- Bash 脚本:`(反引号)运算符和 $()的使用方式
- SQL Server数据库镜像部署 错误1418’处理及证书验证
- ZLMS教学管理平台系统V1.2.0最新版本发布,支持纯Web视频直播点播,还带运营在线支付功能!完全免费提供!...
- 统计学习方法第九章作业:三硬币EM算法、GMM高维高斯混合模型 代码实现
- SQL Server2005 ROW_NUMBER() OVER 使用
- php 取出多重数组中的一列_PHP获取数组中指定的一列实例
- Linux内核的Nand驱动流程分析
- php5.5 pdo mysql_PHP5中PDO的简单使用
- 使用 urllib 构造请求对象
- 信创终端高效运维与安全管理方案
- 计算机文件夹怎样显示隐藏文件,显示隐藏文件夹,教您电脑如何显示隐藏文件夹...
- go module使用教程:使用go mod的方法.报错go: cannot determine module path for source directory E:\tttttt (outside
- 利用Python进行博客图片压缩
- android设计计算等级程序,Android自定义View仿QQ等级天数进度
- 钱符号怎么打出来(如何在文档中输入人民币符号?)
- 前端面试经典js题目Foo 与 getName
- 微信公众平台的运营管理
- springboot线上教学平台计算机毕业设计(源码、运行环境)
- Java利用Ant包进行解压缩文件