数据湖Hudi-8-Hudi集成Flink-入门

  • Hudi集成Flink入门
    • 1.Hudi集成Flink版本对照关系
    • 2.Flink环境准备
    • 3.Flink SQL Client方式处理任务
      • 1.修改配置
      • 2.创建表格,插入数据
      • 3.流式插入数据
    • 4.Flink IDEA编码方式处理任务
      • 1.环境准备
      • 2.创建Maven工程,并编写代码
      • 3.提交运行
    • 5.Flink和Hudi类型映射关系

Hudi集成Flink入门

1.Hudi集成Flink版本对照关系


0.11.x不建议使用,如果要用请使用补丁分支:https://github.com/apache/hudi/pull/6182

2.Flink环境准备

1)拷贝编译好的jar包到Flink的lib目录下

cp /opt/software/hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.13-bundle_2.12-0.12.0.jar /opt/module/flink-1.13.6/lib/

2)拷贝guava包,解决依赖冲突

cp /opt/module/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar /opt/module/flink-1.13.6/lib/

3)配置Hadoop环境变量

sudo vim /etc/profile.d/my_env.shexport HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoopsource /etc/profile

3.Flink SQL Client方式处理任务

1.修改配置

  • 1)修改flink-conf.yaml配置
vim /opt/module/flink-1.13.6/conf/flink-conf.yamlclassloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 4 # hudi写出数据默认taskslots是4,如果不调整hudi,就在这里调整state.backend: rocksdb
execution.checkpointing.interval: 30000
state.checkpoints.dir: hdfs://hadoop1:8020/ckps
state.backend.incremental: true
  • 2)yarn-session模式
    (1)解决依赖问题
    注意:
    下面包依赖问题的处理,主要是解决 flink集成Hudi的时候,flink任务在执行的时候,需要进行 compaction,但是 compaction不会成功,且此错误不会上报到总日志服务器上,所以需要进入到Flink对应的单独的任务里面,查看报错,报错信息如下,实际上在flink集成hudi里面有这个包,最终原因是以来冲突问题。
cp /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.3.jar /opt/module/flink-1.13.6/lib/

(2)启动yarn-session

/opt/module/flink-1.13.6/bin/yarn-session.sh -d

(3)启动sql-client

/opt/module/flink-1.13.6/bin/sql-client.sh embedded -s yarn-session

2.创建表格,插入数据

set sql-client.execution.result-mode=tableau;

– 创建hudi表

CREATE TABLE t1(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH ('connector' = 'hudi','path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t1','table.type' = 'MERGE_ON_READ' –- 默认是COW
);
或如下写法
CREATE TABLE t1(uuid VARCHAR(20),name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20),PRIMARY KEY(uuid) NOT ENFORCED
)
PARTITIONED BY (`partition`)
WITH ('connector' = 'hudi','path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t1','table.type' = 'MERGE_ON_READ'
);
  • 插入数据
INSERT INTO t1 VALUES('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
  • 查询数据
select * from t1;
  • 更新数据
insert into t1 values ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');

注意,保存模式现在是Append。通常,除非是第一次创建表,否则请始终使用追加模式。现在再次查询数据将显示更新的记录。每个写操作都会生成一个用时间戳表示的新提交。查找前一次提交中相同的_hoodie_record_keys在_hoodie_commit_time、age字段中的变化。

3.流式插入数据

  • 1)创建测试表
CREATE TABLE sourceT (uuid varchar(20),name varchar(10),age int,ts timestamp(3),`partition` varchar(20)
) WITH ('connector' = 'datagen','rows-per-second' = '1'
);create table t2(uuid varchar(20),name varchar(10),age int,ts timestamp(3),`partition` varchar(20)
)
with ('connector' = 'hudi','path' = '/tmp/hudi_flink/t2','table.type' = 'MERGE_ON_READ'
);
  • 2)执行插入
insert into t2 select * from sourceT;
  • 3)查询结果
set sql-client.execution.result-mode=tableau;
select * from t2 limit 10;

4.Flink IDEA编码方式处理任务

1.环境准备

  • 1.手动install依赖
    在hudi-flink1.13-bundle-0.12.0.jar所在目录下,打开cmd,执行此命令,然后查看idea中settings的maven中 local repository多对应的本地依赖库目录跟执行完下面命令所对应的目录是否一致,如果不一致,需要将下面命令编译完的jar移动到刚刚目录下面。
mvn install:install-file -DgroupId=org.apache.hudi -DartifactId=hudi-flink_2.12 -Dversion=0.12.0 -Dpackaging=jar -Dfile=./hudi-flink1.13-bundle-0.12.0.jar

2.创建Maven工程,并编写代码

代码如下:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment;import java.util.concurrent.TimeUnit;public class HudiDemo {public static void main(String[] args) {//IDEA运行时,提供WEBUI
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置状态后端 RocksDBEmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);//idea本地运行时,指定rocksdb存储路径
//        embeddedRocksDBStateBackend.setDbStoragePath("file:///E:/rocksdb");embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);env.setStateBackend(embeddedRocksDBStateBackend);//checkpoint配置env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5), CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig = env.getCheckpointConfig();checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/ckps");checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(2));checkpointConfig.setTolerableCheckpointFailureNumber(5);checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);tableEnvironment.executeSql("CREATE TABLE sourceT (\n" +"  uuid varchar(20),\n" +"  name varchar(10),\n" +"  age int,\n" +"  ts timestamp(3),\n" +"  `partition` varchar(20)\n" +") WITH (\n" +"  'connector' = 'datagen',\n" +"  'rows-per-second' = '1'\n" +")");tableEnvironment.executeSql("create table t2(\n" +"  uuid varchar(20),\n" +"  name varchar(10),\n" +"  age int,\n" +"  ts timestamp(3),\n" +"  `partition` varchar(20)\n" +")\n" +"with (\n" +"  'connector' = 'hudi',\n" +"  'path' = 'hdfs://hadoop102:8020/tmp/hudi_flink/t2',\n" +"  'table.type' = 'MERGE_ON_READ'\n" +")");tableEnvironment.executeSql("insert into t2 select * from sourceT");}
}

3.提交运行

将代码打成jar包,上传到目录myjars,执行提交命令:

flink run -t yarn-per-job \
-c com.yang.hudi.flink.HudiDemo \
./myjars/flink-hudi-demo-1.0-SNAPSHOT.jar

5.Flink和Hudi类型映射关系

【数据湖Hudi-8-Hudi集成Flink-入门】相关推荐

  1. 数据湖技术之Hudi 集成 Spark

    数据湖技术之Hudi 集成 Spark 数据湖框架Hudi,从诞生之初支持Spark进行操作,后期支持Flink,接下来先看看与Spark整合使用,并且在0.9.0版本中,提供SparkSQL支持,编 ...

  2. 数据湖架构开发-Hudi入门教程

    当下大数据技术在工业环境中应用,主要构建离线数仓和实时数仓,进行大规模数据管理和分析.为了更好管理数据,出现数据湖DataLake:用来存储大量的原始数据.当下最流行数据湖框架Apache Hudi, ...

  3. 大数据Hadoop之——新一代流式数据湖平台 Apache Hudi

    文章目录 一.概述 二.Hudi 架构 三.Hudi的表格式 1)Copy on Write(写时复制) 2)Merge On Read(读时合并) 3)COW vs MOR 四.元数据表(Metad ...

  4. 数据湖浅析(以hudi为例)

    数据湖定义 业界对于数据湖的定义存在一定争议,个人认为数据湖就是针对传统hive数仓不支持acid.upsert.schema evolution等痛点上,提出的一种数据存储库. hive的痛点:hi ...

  5. 数据湖(十七):Flink与Iceberg整合DataStream API操作

    文章目录 Flink与Iceberg整合DataStream API操作 一.DataStream API 实时写入Iceberg表 1.首先在Maven中导入以下依赖

  6. Apache Iceberg数据湖与Trino的集成

    目录 1. 介绍 2. 配置Connector 1. 介绍 Trino连接Iceberg的Connector,需要通过Hive的metastore,查询每个partition的HDFS locatio ...

  7. 百信银行基于 Apache Hudi 实时数据湖演进方案

    简介:本文介绍了百信银行实时计算平台的建设情况,实时数据湖构建在 Hudi 上的方案和实践方法,以及实时计算平台集成 Hudi 和使用 Hudi 的方式. 本文介绍了百信银行实时计算平台的建设情况,实 ...

  8. Apache Hudi 在 B 站构建实时数据湖的实践

    简介: B 站选择 Flink + Hudi 的数据湖技术方案,以及针对其做出的优化. 本文作者喻兆靖,介绍了为什么 B 站选择 Flink + Hudi 的数据湖技术方案,以及针对其做出的优化.主要 ...

  9. 深度对比Apache CarbonData、Hudi和Open Delta三大开源数据湖方案

    摘要:今天我们就来解构数据湖的核心需求,同时深度对比Apache CarbonData.Hudi和Open Delta三大解决方案,帮助用户更好地针对自身场景来做数据湖方案选型. 背景 我们已经看到, ...

  10. Apache Hudi 数据湖概述

    文章目录 前言 hudi是什么 hudi 实现更新的基本原理 基础文件 增量日志文件 文件组 文件的版本 COW表数据的更新 MOR表数据的更新 MOR 表的compact hudi 不同表格式的读取 ...

最新文章

  1. 使用Python,OpenCV计算图像直方图(cv2.calcHist)
  2. Bash 脚本:`(反引号)运算符和 $()的使用方式
  3. SQL Server数据库镜像部署 错误1418’处理及证书验证
  4. ZLMS教学管理平台系统V1.2.0最新版本发布,支持纯Web视频直播点播,还带运营在线支付功能!完全免费提供!...
  5. 统计学习方法第九章作业:三硬币EM算法、GMM高维高斯混合模型 代码实现
  6. SQL Server2005 ROW_NUMBER() OVER 使用
  7. php 取出多重数组中的一列_PHP获取数组中指定的一列实例
  8. Linux内核的Nand驱动流程分析
  9. php5.5 pdo mysql_PHP5中PDO的简单使用
  10. 使用 urllib 构造请求对象
  11. 信创终端高效运维与安全管理方案
  12. 计算机文件夹怎样显示隐藏文件,显示隐藏文件夹,教您电脑如何显示隐藏文件夹...
  13. go module使用教程:使用go mod的方法.报错go: cannot determine module path for source directory E:\tttttt (outside
  14. 利用Python进行博客图片压缩
  15. android设计计算等级程序,Android自定义View仿QQ等级天数进度
  16. 钱符号怎么打出来(如何在文档中输入人民币符号?)
  17. 前端面试经典js题目Foo 与 getName
  18. 微信公众平台的运营管理
  19. springboot线上教学平台计算机毕业设计(源码、运行环境)
  20. Java利用Ant包进行解压缩文件

热门文章

  1. recyclerView + editTextView 编辑图片和文字标题 问题
  2. Feature Selective Anchor-Free(FSAF)
  3. C语言之利用文件保存数据
  4. 【算法系列】数据预处理全面介绍
  5. ZYT and LBC
  6. 毕马威明确从事区块链行业所需的四大技能
  7. 树莓派3B 之IIC通信
  8. 微信活码系统程序源码
  9. java程序笑脸怎么打_我编写的JAVA程序为什么在编译执行后会先打出一个笑脸呢?...
  10. word中为代码块儿添加背景色