Hudi 0.11.0 + Flink1.14.4 + Hive + Flink CDC + Kafka 集成
Hudi 0.11.0 + Flink1.14.4 + Hive + Flink CDC + Kafka 集成
一、环境准备
1.1 软件版本
Flink 1.14.4Scala 2.11CDH 6.1.0Hadoop 3.0.0Hive 2.1.1Hudi 0.11.0Flink CDC 2.2.0Mysql 5.7
1.2 Flink 准备
- 下载flink 1.14.4 到$HUDI_HOME
wget https://archive.apache.org/dist/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.11.tgz
- 解压
tar zxvf flink-1.14.4-bin-scala_2.11.tgz
- 下载flink-sql-connector
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.0/flink-sql-connector-mysql-cdc-2.2.0.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.14.4/flink-sql-connector-kafka_2.11-1.14.4.jar
1.3 Hadoop 准备
- 设置Hadoop环境
export HADOOP_CONF_DIR=/etc/hadoop/conf
1.4 Hudi 准备
- 下载Hudi 0.11.0 到$HUDI_HOME
wget --no-check-certificate https://dlcdn.apache.org/hudi/0.11.0/hudi-0.11.0.src.tgz
- 解压
tar zxvf hudi-0.11.0.src.tgz
- 完成后进入 packaging/hudi-flink-bundle 目录,执行命令:
mvn clean install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2
- 将packaging/hudi-flink-bundle/target/hudi-flink1.14-bundle_2.11-0.11.0.jar 拷贝到$HUDI_HOME/flink-1.14.4/lib/
1.5 Hive 准备
- 在 Hive 的根目录下创建 auxlib 文件夹
- 进入packaging/hudi-hadoop-mr-bundle 目录,执行命令:
mvn clean install -DskipTests - 进入packaging/hudi-hive-sync-bundle 目录,执行命令:
mvn clean install -DskipTests - 将上面两个打包好的jar包拷贝到 auxlib目录
hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.10.1.jar
hudi-hive-sync-bundle/target/hudi-hive-sync-bundle-0.10.1.jar
1.6 注意
修改hudi-flink-bundle中的pom.xml文件的Hive版本为集群对应的版本
<properties><hive.version>2.1.1-cdh6.1.0</hive.version><flink.bundle.hive.scope>compile</flink.bundle.hive.scope>
</properties><!--编译报错,在repository中加入-->
<repository><id>cloudera</id><url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
二、kafka + flink + hudi + hive
2.1 启动Flink SQL
bin/yarn-session.sh -nm kafka2hudi -d -qu root.analysis -jm 2048 -tm 4096
bin/sql-client.sh embedded
SET execution.checkpointing.interval = 60000;
2.2 创建一个 kafka 的 source 和 hudi sink,启动 sql 流任务:
CREATE TABLE user_report_topic(uid string,userIp string,countryName string,countryCode string,regionName string,cityName string,ispName string,cVersion string,deviceId string,deviceType string,appType string,flagLevel Array<string>,visitType int,visitTime TIMESTAMP(3),WATERMARK FOR visitTime AS visitTime - INTERVAL '5' SECOND) WITH ('connector' = 'kafka','topic' = 'user_report_topic','properties.group.id' = 'user_report_topic_group2','scan.startup.mode' = 'earliest-offset','properties.bootstrap.servers' = 'xx.xx.xx.25:9092,xx.xx.xx.26:9092,xx.xx.xx.27:9092','format' = 'json');create table user_report_hudi(uid string,userIp string,countryName string,countryCode string,regionName string,cityName string,ispName string,cVersion string,deviceId string,deviceType string,appType string,PRIMARY KEY(uid) NOT ENFORCED
)
with ('connector' = 'hudi','path' = 'hdfs:///hudi/data/user_report_hudi','table.type' = 'MERGE_ON_READ','write.bucket_assign.tasks' = '1','write.tasks' = '1','hive_sync.enable'= 'true',-- 开启自动同步hive'hive_sync.mode'= 'hms',-- 自动同步hive模式,默认jdbc模式'hive_sync.metastore.uris'= 'thrift://xx.xx.xx.27:9083',-- hive metastore地址'hive_sync.jdbc_url' = 'jdbc:hive2://xx.xx.xx.27:10000',-- required, hiveServer地址'hive_sync.table'= 'user_report_hudi',-- hive 新建表名'hive_sync.db'= 'test',-- hive 新建数据库名'hive_sync.username'= 'admin',-- HMS 用户名'hive_sync.password'= 'admin',-- HMS 密码'hive_sync.support_timestamp'= 'true'-- 兼容hive timestamp类型
);insert into user_report_hudi select uid ,userIp ,countryName ,countryCode ,regionName ,cityName ,ispName ,cVersion ,deviceId ,deviceType ,appType
from user_report_topic;
通过 Flink UI 可以查看作业运行状态。
2.3 Hive 查询
- MOR 生成两个表,COW 只生成一个表
--MOR rt表会比ro表多查询未合并的log数据,(合并的策略可以根据commits数量or时间调整,默认compaction.delta_commits=5)
test.user_report_hudi_ro --查询parquet
test.user_report_hudi_rt --查询parquet 和 log
--COW
test.user_report_hudi
- 查询 _rt表报错
- 日志如下
Caused by: java.lang.IllegalArgumentException: HoodieRealtimeRecordReader can only work on RealtimeSplit and not with hdfs://nameservice1/hudi/data/user_report_hudi/7445853b-1f0d-4d34-9638-74ffe7e99664_0-4-0_20220808134127564.parquet:0+452025at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:40)at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:310)at org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.<init>(CombineHiveRecordReader.java:68)... 16 more
- 解决方法
set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
三、flink cdc + kafka + flink + hudi + hive
3.1 MySQL
- 数据准备
USE test;
CREATE TABLE test.products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512)
);
ALTER TABLE test.products AUTO_INCREMENT = 101;INSERT INTO test.products
VALUES (default,"scooter","Small 2-wheel scooter"),(default,"car battery","12V car battery"),(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),(default,"hammer","12oz carpenter's hammer"),(default,"hammer","14oz carpenter's hammer"),(default,"hammer","16oz carpenter's hammer"),(default,"rocks","box of assorted rocks"),(default,"jacket","water resistent black wind breaker"),(default,"spare tire","24 inch spare tire");CREATE TABLE test.orders (order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,order_date DATETIME NOT NULL,customer_name VARCHAR(255) NOT NULL,price DECIMAL(10, 5) NOT NULL,product_id INTEGER NOT NULL,order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;INSERT INTO test.orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
3.2 启动Flink SQL
bin/yarn-session.sh -nm mysql2hudi -d -qu root.analysis -jm 2048 -tm 4096
bin/sql-client.sh embedded
SET execution.checkpointing.interval = 60000;
3.3 Flink CDC
Mysql数据库中的表 products, orders 使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据
CREATE TABLE products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = 'xx.xx.xx.73','port' = '3306','username' = 'root','password' = '123','database-name' = 'test','table-name' = 'products');CREATE TABLE orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = 'xx.xx.xx.73','port' = '3306','username' = 'root','password' = '123','database-name' = 'test','table-name' = 'orders');
3.4 Kafka
- 创建kafka表
CREATE TABLE products_cdc2kafka(id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'kafka','topic' = 'products_cdc2kafka_topic','properties.group.id' = 'products_cdc2kafka_group','scan.startup.mode' = 'earliest-offset','properties.bootstrap.servers' = 'xx.xx.xx.25:9092,xx.xx.xx.26:9092,xx.xx.xx.27:9092','format' = 'debezium-json');CREATE TABLE orders_cdc2kafka(order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'kafka','topic' = 'orders_cdc2kafka_topic','properties.group.id' = 'orders_cdc2kafka_group','scan.startup.mode' = 'earliest-offset','properties.bootstrap.servers' = 'xx.xx.xx.25:9092,xx.xx.xx.26:9092,xx.xx.xx.27:9092','format' = 'debezium-json');
- 将数据写入kafka
insert into products_cdc2kafka
select * from products;insert into orders_cdc2kafka
select * from orders;
3.5 Hudi
- 创建hudi表 MOR
create table products_2hudi (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED
)
WITH ('connector' = 'hudi','path' = 'hdfs:///hudi/data/products_2hudi','table.type' = 'COPY_ON_WRITE','write.bucket_assign.tasks' = '1','write.tasks' = '1','hive_sync.enable'= 'true',-- 开启自动同步hive'hive_sync.mode'= 'hms',-- 自动同步hive模式,默认jdbc模式'hive_sync.metastore.uris'= 'thrift://xx.xx.xx.27:9083',-- hive metastore地址'hive_sync.jdbc_url' = 'jdbc:hive2://xx.xx.xx.27:10000', -- required, hiveServer地址'hive_sync.table'= 'products_2hudi',-- hive 新建表名'hive_sync.db'= 'test',-- hive 新建数据库名'hive_sync.username'= 'admin',-- HMS 用户名'hive_sync.password'= 'admin',-- HMS 密码'hive_sync.support_timestamp'= 'true'-- 兼容hive timestamp类型
);create table orders_2hudi (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED
)
WITH ('connector' = 'hudi','path' = 'hdfs:///hudi/data/orders_2hudi','table.type' = 'MERGE_ON_READ','write.bucket_assign.tasks' = '1','write.tasks' = '1','hive_sync.enable'= 'true',-- 开启自动同步hive'hive_sync.mode'= 'hms',-- 自动同步hive模式,默认jdbc模式'hive_sync.metastore.uris'= 'thrift://xx.xx.xx.27:9083',-- hive metastore地址'hive_sync.jdbc_url' = 'jdbc:hive2://xx.xx.xx.27:10000', -- required, hiveServer地址'hive_sync.table'= 'orders_2hudi',-- hive 新建表名'hive_sync.db'= 'test',-- hive 新建数据库名'hive_sync.username'= 'admin',-- HMS 用户名'hive_sync.password'= 'admin',-- HMS 密码'hive_sync.support_timestamp'= 'true'-- 兼容hive timestamp类型
);
- 将数据写入hudi
insert into products_2hudi
select * from products_cdc2kafka;insert into orders_2hudi
select * from orders_cdc2kafka;
- 相同数据源MOR表和COW对比
create table orders_2hudi_cow2 (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED
)
WITH ('connector' = 'hudi','path' = 'hdfs:///hudi/data/orders_2hudi_cow2','table.type' = 'COPY_ON_WRITE','write.bucket_assign.tasks' = '1','write.tasks' = '1','hive_sync.enable'= 'true',-- 开启自动同步hive'hive_sync.mode'= 'hms',-- 自动同步hive模式,默认jdbc模式'hive_sync.metastore.uris'= 'thrift://xx.xx.xx.27:9083',-- hive metastore地址'hive_sync.jdbc_url' = 'jdbc:hive2://xx.xx.xx.27:10000', -- required, hiveServer地址'hive_sync.table'= 'orders_2hudi_cow2',-- hive 新建表名'hive_sync.db'= 'test',-- hive 新建数据库名'hive_sync.username'= 'admin',-- HMS 用户名'hive_sync.password'= 'admin',-- HMS 密码'hive_sync.support_timestamp'= 'true'-- 兼容hive timestamp类型
);insert into orders_2hudi_cow2
select * from orders_cdc2kafka;
- 对比结果
在初始化数据时,MOR两个表Hive查询都没数据,但是目录hdfs:///hudi/data/orders_2hudi 中有log文件,并且log文件中有数据。
COW的表已经生成了parquet文件,并且Hive查询有数据
3.6 Hive查询
select * from test.products_2hudi;
select * from test.orders_2hudi_rt;
select * from test.orders_2hudi_ro;
3.7 数据变更测试
- Insert
--写入数据
INSERT INTO test.orders VALUES (default, '2022-04-30 10:08:22', 'Raj', 50.50, 101, false);
--查看HDFS上的数据 hadoop fs -ls /hudi/data/orders_2hudi*
--COW 有新增 ,新增一个parquet 文件
--MOR 无新增,有一个log文件INSERT INTO test.orders VALUES (default, '2022-04-30 10:11:09', 'Terry', 15.00, 102, false);
--COW 有新增,新增一个parquet 文件
--MOR 无新增INSERT INTO test.orders VALUES (default, '2022-04-30 12:00:30', 'Jackson', 25.25, 103, false);
--COW 有新增,新增一个parquet 文件
--MOR 无新增--Commits5次
INSERT INTO test.orders VALUES (default, '2022-04-30 12:01:30', 'xiaoming', 25.25, 104, false);
--COW 有新增,新增一个parquet 文件
--MOR rt和ro都有数据 ,新增一个parquet 文件INSERT INTO test.orders VALUES (default, '2022-04-30 12:04:30', 'Walet', 25.25, 104, false);
--COW 有新增,新增一个parquet 文件
--MOR rt有新增 ro无新增 新增一个log文件INSERT INTO test.orders VALUES (default, '2022-04-30 12:05:30', 'jassy', 25.25, 104, false);
--COW 有新增,新增一个parquet 文件
--MOR rt有新增 ro无新增INSERT INTO test.orders VALUES (default, '2022-04-30 12:06:30', 'xiahua', 25.25, 104, false);
--COW 有新增,新增一个parquet 文件
--MOR rt有新增 ro无新增INSERT INTO test.orders VALUES (default, '2022-04-30 12:07:30', 'Tommmmm', 25.25, 104, false);
--COW 有新增,新增一个parquet 文件
--MOR rt有新增 ro无新增-- 默认5次Commits会合并一次
INSERT INTO test.orders VALUES (default, '2022-04-30 12:07:30', 'kkk', 25.25, 104, false);
--COW 有新增,新增一个parquet 文件
--MOR rt有新增 ro有新增(比COW和rt晚checkpoint时间) ,新增一个parquet 文件
- Update
--每次Update超过一次Checkpoint时间间隔update test.orders set price=1000 where order_id =10030;update test.orders set customer_name='tomy' where order_id =10029;update test.orders set customer_name='xh' where order_id =10028;update test.orders set customer_name='jasssssssy' where order_id =10027;update test.orders set customer_name='waletttt' where order_id =10026;--Update 结果
--COW 数据正常被修改
--MOR 修改的那条数据rt和ro的数据都丢失了
--在dd群里问玉兆老师,最近有一个改动会在0.12.0发布,https://github.com/apache/hudi/pull/6286
- Delete
--每次Delete超过一次Checkpoint时间间隔
delete from test.orders where order_id=10005;delete from test.orders where order_id=10006;delete from test.orders where order_id=10007;delete from test.orders where order_id=10008;delete from test.orders where order_id=10009;delete from test.orders where order_id=10010;--Delete 结果
--COW 数据正常被删除
--MOR 数据正常被删除
The End.
Hudi 0.11.0 + Flink1.14.4 + Hive + Flink CDC + Kafka 集成相关推荐
- 金蝶天燕行政事业GKIS标准版10.0 金蝶天燕GKIS标准版V10.0 金蝶天燕V9.0 金蝶GKIS高级版9.0 金蝶KIS行政事业14.0 13.0 12.1 行政事业12.0 11.0 9.0
金蝶天燕GKIS标准版V10.0 金蝶天燕行政事业GKIS标准版V10.0 金蝶天燕10.0标准版 金蝶天燕V9.0标准版 金蝶KIS行政事业版14.0 金蝶KIS行政事业13.0 12.1 12.0 ...
- frp 0.11.0 发布新版,支持很多新功能
2019独角兽企业重金招聘Python工程师标准>>> 内网穿透工具 frp 0.11.0 发布了. 新增: 增加支持 unix域套接字 的 Plugin. 增加 http prox ...
- 假设用于通信的电文由字符集{a,b,c,d,e,f,g}中的字母构成。 它们在电文中出现的频度分别为{0.31,0.16,0.10,0.08,0.11,0.20,0.04}。【MOOC答案】
目 录 1.题目 2.答案and详细题解过程 1)为这7个字母设计哈夫曼编码: 1.1.答案 1.2.详细题解过程 2)为这7个字母设计等长编码,至少需要几位二进制数?[3位] 2.1.答案 2. ...
- TagScanner(MP3标签编辑批量更名)v6.0.11.0免费版
名称:TagScanner(MP3标签编辑批量更名)v6.0.11.0免费版 版本:6.0.11.0 软件大小:2.96 MB 软件语言:多国语言 软件授权:免费版 应用平台:WinXP/Win7/W ...
- Android 10.0 11.0 12.0 启动模拟器教程
<<返回总目录 Android 10.0 11.0 12.0 启动模拟器教程 一.android 12.0 模拟器 二.安装android 10.0 11.0 12.0 SDK平台 三.创 ...
- OMNI USDT 0.11.0 环境部署
文章目录 一.生成Omni Core v0.11.0版本镜像 二.启动Omni Core v0.11.0版本容器 三.查看日志 一.生成Omni Core v0.11.0版本镜像 编写Dockerfi ...
- Android 9.0 10.0 11.0 开机动画支持mp4 视频播放
1.概述 在9.0 10.0 11.0 的产品定制开发中,在开机流程中,是在开机kenel部分都是播放的开机log,等kenel启动完成后进入系统后这时播放的是开机动画,由于开发需要要求开机动画换成支 ...
- hive 0.11 mysql_Hive的升级(0.8.0到0.11.0)
hive-0.11.0出来后,得知有将row_number进行封装,对于我这等不懂java的人士来说,是莫大的幸福啊!毫不犹豫的将0.8.0抛弃了! hive-0.11.0出来后,得知有将row_nu ...
- Apache Doris 0.11.x 版本升级
背景 项目计划基于 Apache Doris 构建在线实时数据查询平台,目前准生产环境部署的版本为 0.10.13-release ,来自百度内部发布的分支版本,和 Apache 开源社区版本兼容,百 ...
最新文章
- forms oracle runtime_FRM-92101:forms Server在启动过程中失败
- webApp之meta标签
- java 获取文件权限_Java中的文件权限,检查权限和更改权限 - Break易站
- 「干货」什么Linux是邮件服务器?
- 在JUnit中超越核心Hamcrest
- 苦B的程序猿道路数据验证
- Linux 命令(106)—— chkconfig 命令
- 【R图秀-6】地震来了
- 2021计算机组装视频,2021年做影视后期电脑配置单推荐PR AE软件视频后期电脑组装...
- 高等数学:第八章 多元函数的微分法及其应用(6)微分法在几何上的应用
- springmvc ResponseEntity 下载文件损坏问题解决方法
- 你知道中国历届奥运会中获得了多少金牌吗?
- 〖Python 数据库开发实战 - Python与Redis交互篇⑮〗- 综合案例 - 新闻管理系统 - 更新所编辑新闻状态(根据输入内容进行保存操作)
- QT开发MQTT 之 MQTT 编译
- 本地笔记本连接sqlserver数据库连接失败 证书链是由不受信任的颁发机构颁发的
- [CSS]常见布局技巧
- 微信小程序 # 轮播图swiper滑动到最后一页进行页面跳转
- 优秀录屏工具:Screenium 3 for Mac
- IOC容器中bean的生命周期,iocbean生命周期
- 聚焦边缘创新,Rancher推出全新开源力作Octopus