Hudi 0.11.0 + Flink1.14.4 + Hive + Flink CDC + Kafka 集成

一、环境准备

1.1 软件版本

Flink 1.14.4Scala 2.11CDH 6.1.0Hadoop 3.0.0Hive 2.1.1Hudi 0.11.0Flink CDC 2.2.0Mysql 5.7

1.2 Flink 准备

  1. 下载flink 1.14.4 到$HUDI_HOME
wget https://archive.apache.org/dist/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.11.tgz
  1. 解压
tar zxvf flink-1.14.4-bin-scala_2.11.tgz
  1. 下载flink-sql-connector
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.0/flink-sql-connector-mysql-cdc-2.2.0.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.14.4/flink-sql-connector-kafka_2.11-1.14.4.jar

1.3 Hadoop 准备

  1. 设置Hadoop环境
export HADOOP_CONF_DIR=/etc/hadoop/conf

1.4 Hudi 准备

  1. 下载Hudi 0.11.0 到$HUDI_HOME
wget --no-check-certificate https://dlcdn.apache.org/hudi/0.11.0/hudi-0.11.0.src.tgz
  1. 解压
tar zxvf hudi-0.11.0.src.tgz
  1. 完成后进入 packaging/hudi-flink-bundle 目录,执行命令:
mvn clean install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2
  1. 将packaging/hudi-flink-bundle/target/hudi-flink1.14-bundle_2.11-0.11.0.jar 拷贝到$HUDI_HOME/flink-1.14.4/lib/

1.5 Hive 准备

  1. 在 Hive 的根目录下创建 auxlib 文件夹
  2. 进入packaging/hudi-hadoop-mr-bundle 目录,执行命令:
    mvn clean install -DskipTests
  3. 进入packaging/hudi-hive-sync-bundle 目录,执行命令:
    mvn clean install -DskipTests
  4. 将上面两个打包好的jar包拷贝到 auxlib目录
hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.10.1.jar
hudi-hive-sync-bundle/target/hudi-hive-sync-bundle-0.10.1.jar

1.6 注意

修改hudi-flink-bundle中的pom.xml文件的Hive版本为集群对应的版本

<properties><hive.version>2.1.1-cdh6.1.0</hive.version><flink.bundle.hive.scope>compile</flink.bundle.hive.scope>
</properties><!--编译报错,在repository中加入-->
<repository><id>cloudera</id><url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>

二、kafka + flink + hudi + hive

2.1 启动Flink SQL

bin/yarn-session.sh -nm kafka2hudi -d -qu root.analysis -jm 2048 -tm 4096
bin/sql-client.sh embedded
SET execution.checkpointing.interval = 60000;

2.2 创建一个 kafka 的 source 和 hudi sink,启动 sql 流任务:

CREATE TABLE user_report_topic(uid string,userIp string,countryName string,countryCode string,regionName string,cityName string,ispName string,cVersion string,deviceId string,deviceType string,appType string,flagLevel Array<string>,visitType int,visitTime TIMESTAMP(3),WATERMARK FOR visitTime AS visitTime - INTERVAL '5' SECOND) WITH ('connector' = 'kafka','topic' = 'user_report_topic','properties.group.id' = 'user_report_topic_group2','scan.startup.mode' = 'earliest-offset','properties.bootstrap.servers' = 'xx.xx.xx.25:9092,xx.xx.xx.26:9092,xx.xx.xx.27:9092','format' = 'json');create table user_report_hudi(uid string,userIp string,countryName string,countryCode string,regionName string,cityName string,ispName string,cVersion string,deviceId string,deviceType string,appType string,PRIMARY KEY(uid) NOT ENFORCED
)
with ('connector' = 'hudi','path' = 'hdfs:///hudi/data/user_report_hudi','table.type' = 'MERGE_ON_READ','write.bucket_assign.tasks' = '1','write.tasks' = '1','hive_sync.enable'= 'true',-- 开启自动同步hive'hive_sync.mode'= 'hms',-- 自动同步hive模式,默认jdbc模式'hive_sync.metastore.uris'= 'thrift://xx.xx.xx.27:9083',-- hive metastore地址'hive_sync.jdbc_url' = 'jdbc:hive2://xx.xx.xx.27:10000',-- required, hiveServer地址'hive_sync.table'= 'user_report_hudi',-- hive 新建表名'hive_sync.db'= 'test',-- hive 新建数据库名'hive_sync.username'= 'admin',-- HMS 用户名'hive_sync.password'= 'admin',-- HMS 密码'hive_sync.support_timestamp'= 'true'-- 兼容hive timestamp类型
);insert into user_report_hudi select uid ,userIp ,countryName ,countryCode ,regionName ,cityName ,ispName ,cVersion ,deviceId ,deviceType ,appType
from user_report_topic;

通过 Flink UI 可以查看作业运行状态。

2.3 Hive 查询

  1. MOR 生成两个表,COW 只生成一个表
--MOR rt表会比ro表多查询未合并的log数据,(合并的策略可以根据commits数量or时间调整,默认compaction.delta_commits=5)
test.user_report_hudi_ro --查询parquet
test.user_report_hudi_rt --查询parquet 和 log
--COW
test.user_report_hudi
  1. 查询 _rt表报错

    • 日志如下
Caused by: java.lang.IllegalArgumentException: HoodieRealtimeRecordReader can only work on RealtimeSplit and not with hdfs://nameservice1/hudi/data/user_report_hudi/7445853b-1f0d-4d34-9638-74ffe7e99664_0-4-0_20220808134127564.parquet:0+452025at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:40)at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:310)at org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.<init>(CombineHiveRecordReader.java:68)... 16 more
  • 解决方法
set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;

三、flink cdc + kafka + flink + hudi + hive

3.1 MySQL

  1. 数据准备
USE test;
CREATE TABLE test.products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512)
);
ALTER TABLE test.products AUTO_INCREMENT = 101;INSERT INTO test.products
VALUES (default,"scooter","Small 2-wheel scooter"),(default,"car battery","12V car battery"),(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),(default,"hammer","12oz carpenter's hammer"),(default,"hammer","14oz carpenter's hammer"),(default,"hammer","16oz carpenter's hammer"),(default,"rocks","box of assorted rocks"),(default,"jacket","water resistent black wind breaker"),(default,"spare tire","24 inch spare tire");CREATE TABLE test.orders (order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,order_date DATETIME NOT NULL,customer_name VARCHAR(255) NOT NULL,price DECIMAL(10, 5) NOT NULL,product_id INTEGER NOT NULL,order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;INSERT INTO test.orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

3.2 启动Flink SQL

bin/yarn-session.sh -nm mysql2hudi -d -qu root.analysis -jm 2048 -tm 4096
bin/sql-client.sh embedded
SET execution.checkpointing.interval = 60000;

3.3 Flink CDC

Mysql数据库中的表 products, orders 使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据

CREATE TABLE products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = 'xx.xx.xx.73','port' = '3306','username' = 'root','password' = '123','database-name' = 'test','table-name' = 'products');CREATE TABLE orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = 'xx.xx.xx.73','port' = '3306','username' = 'root','password' = '123','database-name' = 'test','table-name' = 'orders');

3.4 Kafka

  1. 创建kafka表
 CREATE TABLE products_cdc2kafka(id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'kafka','topic' = 'products_cdc2kafka_topic','properties.group.id' = 'products_cdc2kafka_group','scan.startup.mode' = 'earliest-offset','properties.bootstrap.servers' = 'xx.xx.xx.25:9092,xx.xx.xx.26:9092,xx.xx.xx.27:9092','format' = 'debezium-json');CREATE TABLE orders_cdc2kafka(order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'kafka','topic' = 'orders_cdc2kafka_topic','properties.group.id' = 'orders_cdc2kafka_group','scan.startup.mode' = 'earliest-offset','properties.bootstrap.servers' = 'xx.xx.xx.25:9092,xx.xx.xx.26:9092,xx.xx.xx.27:9092','format' = 'debezium-json');
  1. 将数据写入kafka
insert into products_cdc2kafka
select * from products;insert into orders_cdc2kafka
select * from orders;

3.5 Hudi

  1. 创建hudi表 MOR
create table products_2hudi (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED
)
WITH ('connector' = 'hudi','path' = 'hdfs:///hudi/data/products_2hudi','table.type' = 'COPY_ON_WRITE','write.bucket_assign.tasks' = '1','write.tasks' = '1','hive_sync.enable'= 'true',-- 开启自动同步hive'hive_sync.mode'= 'hms',-- 自动同步hive模式,默认jdbc模式'hive_sync.metastore.uris'= 'thrift://xx.xx.xx.27:9083',-- hive metastore地址'hive_sync.jdbc_url' = 'jdbc:hive2://xx.xx.xx.27:10000',     -- required, hiveServer地址'hive_sync.table'= 'products_2hudi',-- hive 新建表名'hive_sync.db'= 'test',-- hive 新建数据库名'hive_sync.username'= 'admin',-- HMS 用户名'hive_sync.password'= 'admin',-- HMS 密码'hive_sync.support_timestamp'= 'true'-- 兼容hive timestamp类型
);create table orders_2hudi (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED
)
WITH ('connector' = 'hudi','path' = 'hdfs:///hudi/data/orders_2hudi','table.type' = 'MERGE_ON_READ','write.bucket_assign.tasks' = '1','write.tasks' = '1','hive_sync.enable'= 'true',-- 开启自动同步hive'hive_sync.mode'= 'hms',-- 自动同步hive模式,默认jdbc模式'hive_sync.metastore.uris'= 'thrift://xx.xx.xx.27:9083',-- hive metastore地址'hive_sync.jdbc_url' = 'jdbc:hive2://xx.xx.xx.27:10000',     -- required, hiveServer地址'hive_sync.table'= 'orders_2hudi',-- hive 新建表名'hive_sync.db'= 'test',-- hive 新建数据库名'hive_sync.username'= 'admin',-- HMS 用户名'hive_sync.password'= 'admin',-- HMS 密码'hive_sync.support_timestamp'= 'true'-- 兼容hive timestamp类型
);
  1. 将数据写入hudi
insert into products_2hudi
select * from products_cdc2kafka;insert into orders_2hudi
select * from orders_cdc2kafka;
  1. 相同数据源MOR表和COW对比
create table orders_2hudi_cow2 (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED
)
WITH ('connector' = 'hudi','path' = 'hdfs:///hudi/data/orders_2hudi_cow2','table.type' = 'COPY_ON_WRITE','write.bucket_assign.tasks' = '1','write.tasks' = '1','hive_sync.enable'= 'true',-- 开启自动同步hive'hive_sync.mode'= 'hms',-- 自动同步hive模式,默认jdbc模式'hive_sync.metastore.uris'= 'thrift://xx.xx.xx.27:9083',-- hive metastore地址'hive_sync.jdbc_url' = 'jdbc:hive2://xx.xx.xx.27:10000',     -- required, hiveServer地址'hive_sync.table'= 'orders_2hudi_cow2',-- hive 新建表名'hive_sync.db'= 'test',-- hive 新建数据库名'hive_sync.username'= 'admin',-- HMS 用户名'hive_sync.password'= 'admin',-- HMS 密码'hive_sync.support_timestamp'= 'true'-- 兼容hive timestamp类型
);insert into orders_2hudi_cow2
select * from orders_cdc2kafka;
  • 对比结果
    在初始化数据时,MOR两个表Hive查询都没数据,但是目录hdfs:///hudi/data/orders_2hudi 中有log文件,并且log文件中有数据。
    COW的表已经生成了parquet文件,并且Hive查询有数据

3.6 Hive查询

select * from  test.products_2hudi;
select * from  test.orders_2hudi_rt;
select * from  test.orders_2hudi_ro;

3.7 数据变更测试

  1. Insert
--写入数据
INSERT INTO test.orders VALUES (default, '2022-04-30 10:08:22', 'Raj', 50.50, 101, false);
--查看HDFS上的数据 hadoop fs -ls /hudi/data/orders_2hudi*
--COW 有新增 ,新增一个parquet 文件
--MOR 无新增,有一个log文件INSERT INTO test.orders VALUES (default, '2022-04-30 10:11:09', 'Terry', 15.00, 102, false);
--COW 有新增,新增一个parquet 文件
--MOR 无新增INSERT INTO test.orders VALUES (default, '2022-04-30 12:00:30', 'Jackson', 25.25, 103, false);
--COW 有新增,新增一个parquet 文件
--MOR 无新增--Commits5次
INSERT INTO test.orders VALUES (default, '2022-04-30 12:01:30', 'xiaoming', 25.25, 104, false);
--COW 有新增,新增一个parquet 文件
--MOR rt和ro都有数据 ,新增一个parquet 文件INSERT INTO test.orders VALUES (default, '2022-04-30 12:04:30', 'Walet', 25.25, 104, false);
--COW 有新增,新增一个parquet 文件
--MOR rt有新增 ro无新增 新增一个log文件INSERT INTO test.orders VALUES (default, '2022-04-30 12:05:30', 'jassy', 25.25, 104, false);
--COW 有新增,新增一个parquet 文件
--MOR rt有新增 ro无新增INSERT INTO test.orders VALUES (default, '2022-04-30 12:06:30', 'xiahua', 25.25, 104, false);
--COW 有新增,新增一个parquet 文件
--MOR rt有新增 ro无新增INSERT INTO test.orders VALUES (default, '2022-04-30 12:07:30', 'Tommmmm', 25.25, 104, false);
--COW 有新增,新增一个parquet 文件
--MOR rt有新增 ro无新增-- 默认5次Commits会合并一次
INSERT INTO test.orders VALUES (default, '2022-04-30 12:07:30', 'kkk', 25.25, 104, false);
--COW 有新增,新增一个parquet 文件
--MOR rt有新增 ro有新增(比COW和rt晚checkpoint时间) ,新增一个parquet 文件
  1. Update
--每次Update超过一次Checkpoint时间间隔update test.orders set price=1000 where order_id =10030;update test.orders set customer_name='tomy' where order_id =10029;update test.orders set customer_name='xh' where order_id =10028;update test.orders set customer_name='jasssssssy' where order_id =10027;update test.orders set customer_name='waletttt' where order_id =10026;--Update 结果
--COW 数据正常被修改
--MOR 修改的那条数据rt和ro的数据都丢失了
--在dd群里问玉兆老师,最近有一个改动会在0.12.0发布,https://github.com/apache/hudi/pull/6286
  1. Delete
--每次Delete超过一次Checkpoint时间间隔
delete from test.orders where order_id=10005;delete from test.orders where order_id=10006;delete from test.orders where order_id=10007;delete from test.orders where order_id=10008;delete from test.orders where order_id=10009;delete from test.orders where order_id=10010;--Delete 结果
--COW 数据正常被删除
--MOR 数据正常被删除

The End.

Hudi 0.11.0 + Flink1.14.4 + Hive + Flink CDC + Kafka 集成相关推荐

  1. 金蝶天燕行政事业GKIS标准版10.0 金蝶天燕GKIS标准版V10.0 金蝶天燕V9.0 金蝶GKIS高级版9.0 金蝶KIS行政事业14.0 13.0 12.1 行政事业12.0 11.0 9.0

    金蝶天燕GKIS标准版V10.0 金蝶天燕行政事业GKIS标准版V10.0 金蝶天燕10.0标准版 金蝶天燕V9.0标准版 金蝶KIS行政事业版14.0 金蝶KIS行政事业13.0 12.1 12.0 ...

  2. frp 0.11.0 发布新版,支持很多新功能

    2019独角兽企业重金招聘Python工程师标准>>> 内网穿透工具 frp 0.11.0 发布了. 新增: 增加支持 unix域套接字 的 Plugin. 增加 http prox ...

  3. 假设用于通信的电文由字符集{a,b,c,d,e,f,g}中的字母构成。 它们在电文中出现的频度分别为{0.31,0.16,0.10,0.08,0.11,0.20,0.04}。【MOOC答案】

    目   录 1.题目 2.答案and详细题解过程 1)为这7个字母设计哈夫曼编码: 1.1.答案 1.2.详细题解过程 2)为这7个字母设计等长编码,至少需要几位二进制数?[3位] 2.1.答案 2. ...

  4. TagScanner(MP3标签编辑批量更名)v6.0.11.0免费版

    名称:TagScanner(MP3标签编辑批量更名)v6.0.11.0免费版 版本:6.0.11.0 软件大小:2.96 MB 软件语言:多国语言 软件授权:免费版 应用平台:WinXP/Win7/W ...

  5. Android 10.0 11.0 12.0 启动模拟器教程

    <<返回总目录 Android 10.0 11.0 12.0 启动模拟器教程 一.android 12.0 模拟器 二.安装android 10.0 11.0 12.0 SDK平台 三.创 ...

  6. OMNI USDT 0.11.0 环境部署

    文章目录 一.生成Omni Core v0.11.0版本镜像 二.启动Omni Core v0.11.0版本容器 三.查看日志 一.生成Omni Core v0.11.0版本镜像 编写Dockerfi ...

  7. Android 9.0 10.0 11.0 开机动画支持mp4 视频播放

    1.概述 在9.0 10.0 11.0 的产品定制开发中,在开机流程中,是在开机kenel部分都是播放的开机log,等kenel启动完成后进入系统后这时播放的是开机动画,由于开发需要要求开机动画换成支 ...

  8. hive 0.11 mysql_Hive的升级(0.8.0到0.11.0)

    hive-0.11.0出来后,得知有将row_number进行封装,对于我这等不懂java的人士来说,是莫大的幸福啊!毫不犹豫的将0.8.0抛弃了! hive-0.11.0出来后,得知有将row_nu ...

  9. Apache Doris 0.11.x 版本升级

    背景 项目计划基于 Apache Doris 构建在线实时数据查询平台,目前准生产环境部署的版本为 0.10.13-release ,来自百度内部发布的分支版本,和 Apache 开源社区版本兼容,百 ...

最新文章

  1. forms oracle runtime_FRM-92101:forms Server在启动过程中失败
  2. webApp之meta标签
  3. java 获取文件权限_Java中的文件权限,检查权限和更改权限 - Break易站
  4. 「干货」什么Linux是邮件服务器?
  5. 在JUnit中超越核心Hamcrest
  6. 苦B的程序猿道路数据验证
  7. Linux 命令(106)—— chkconfig 命令
  8. 【R图秀-6】地震来了
  9. 2021计算机组装视频,2021年做影视后期电脑配置单推荐PR AE软件视频后期电脑组装...
  10. 高等数学:第八章 多元函数的微分法及其应用(6)微分法在几何上的应用
  11. springmvc ResponseEntity 下载文件损坏问题解决方法
  12. 你知道中国历届奥运会中获得了多少金牌吗?
  13. 〖Python 数据库开发实战 - Python与Redis交互篇⑮〗- 综合案例 - 新闻管理系统 - 更新所编辑新闻状态(根据输入内容进行保存操作)
  14. QT开发MQTT 之 MQTT 编译
  15. 本地笔记本连接sqlserver数据库连接失败 证书链是由不受信任的颁发机构颁发的
  16. [CSS]常见布局技巧
  17. 微信小程序 # 轮播图swiper滑动到最后一页进行页面跳转
  18. 优秀录屏工具:Screenium 3 for Mac
  19. IOC容器中bean的生命周期,iocbean生命周期
  20. 聚焦边缘创新,Rancher推出全新开源力作Octopus

热门文章

  1. 对口河北高考计算机试题,河北对口高考计算机模拟题四
  2. TFN T300 H系列 (6GHZ) 手持天馈线测试仪 ——基站测试分析专家
  3. 2008新年新开始,搞点搞笑谜语猜一猜
  4. 海天蚝油《挑战不可能》最强记忆大师夫妻迎战31200种可能
  5. VMware 10设备CentOs 6.5
  6. 我的vim配置和solardark主题
  7. srtp移植到android平台
  8. 第一次迭代开发总结-汽车租赁系统
  9. 我的世界服务器玩家在线指令,我的世界指令大全 各类指令汇总
  10. 自定义View-自制简单的钟表