Hudi-Flink CDC将MySQL数据写入hudi
CDC概念
CDC类型
CDC数据入湖
Flink CDC Hudi概述
实践
准备工作
1.编译hudi源码
2.将Flink CDC MySQL对应jar包,放到$FLINK_HOME/lib目录中
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.2</version>
</dependency>
实现
零、组件版本
hudi:0.10.1
flink:1.13.1
hive:2.3.9
Hadoop:2.8.0
Scala:2.12.4
一、MySQL配置
server-id=2
log-bin=mysql-bin
binlog_format=rot
expire_logs_day=15
binlog_row_image=full
service mysqld restart
show master logs
create database test_hudi;
create table test_hudi.tbl_users(id bigint auto_increment primary key,name varchar(20) null,birthday timestamp default CURRENT_TIMESTAMP NOT NULL,ts timestamp default CURRENT_TIMESTAMP NOT NULL
);INSERT INTO test_hudi.tbl_users(name) VALUES(“测试”);
INSERT INTO test_hudi.tbl_users(name) VALUES(“张三”);
INSERT INTO test_hudi.tbl_users(name) VALUES(“李四”);
INSERT INTO test_hudi.tbl_users(name) VALUES(“王五”);
INSERT INTO test_hudi.tbl_users(name) VALUES(“赵六");
二、创建Flink CDC表,关联到MySQL数据库表
HDFShadoop-daemon.sh start namenodehadoop-daemon.sh start datanode
Hive./hive --service metastore &./hive --service hiveserver2 &
Flink standalonestart-cluster.sh
启动Flink SQL Client客户端sql-client.sh embedded -j /Users/FengZhen/Desktop/Hadoop/flink/flink-1.13.1/lib/flink-connector-mysql-cdc-1.4.0.jar shell
-j /Users/FengZhen/Desktop/Hadoop/flink/flink-1.13.1/lib/hudi-hive-sync-bundle-0.10.1.jar设置属性set sql-client.execution.result-mode=tableau;set execution.checkpointing.interval=3sec;
CREATE TABLE users_source_mysql(id BIGINT PRIMARY KEY NOT ENFORCED,name STRING,birthday TIMESTAMP(3),ts TIMESTAMP(3)
)WITH('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '3306','username' = 'root','password' = '1234qwer','server-time-zone'= 'Asia/Shanghai','debezium.snapshot.mode' = 'initial','database-name' = 'test_hudi','table-name' = 'tbl_users'
);
Flink SQL> select * from users_source_mysql;
+----+----------------------+--------------------------------+-------------------------+-------------------------+
| op | id | name | birthday | ts |
+----+----------------------+--------------------------------+-------------------------+-------------------------+
| +I | 1 | 测试 | 2022-03-15 22:09:00.000 | 2022-03-15 22:09:00.000 |
参数值 | 描述 |
initial(默认) | 连接器执行数据库的初始一致性快照,快照完成后,连接器开始为后续数据库更改流式传输事件记录。 |
initial_only | 连接器只执行数据库的初始一致性快照,不允许捕获任何后续更改的事件。 |
schema_only | 连接器只捕获所有相关表的表结构,不捕获初始数据,但是会同步后续数据库的更改记录 |
schema_only_recovery | 设置此选项可恢复丢失或损坏的数据库历史主题(database.history.kafka.topic)。 |
三、创建视图,查询输入表,字段与输出表相同
CREATE VIEW view_users_cdc AS
SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') AS part FROM users_source_mysql;
Flink SQL> select * From view_users_cdc;
+----+----------------------+--------------------------------+-------------------------+-------------------------+--------------------------------+
| op | id | name | birthday | ts | part |
+----+----------------------+--------------------------------+-------------------------+-------------------------+--------------------------------+
| +I | 1 | 测试 | 2022-03-15 22:09:00.000 | 2022-03-15 22:09:00.000 | 20220315 |
| +I | 2 | 张三 | 2022-03-17 13:26:05.000 | 2022-03-17 13:26:05.000 | 20220317 |
| +I | 3 | 李四 | 2022-03-17 13:26:05.000 | 2022-03-17 13:26:05.000 | 20220317 |
| +I | 4 | 王五 | 2022-03-17 13:26:05.000 | 2022-03-17 13:26:05.000 | 20220317 |
| +I | 5 | 赵六 | 2022-03-17 13:26:05.000 | 2022-03-17 13:26:05.000 | 20220317 |
四、创建CDC Hudi Sink 表,并自动同步hive分区
CREATE TABLE users_sink_hudi_hive(id bigint ,name string,birthday TIMESTAMP(3),ts TIMESTAMP(3),part VARCHAR(20),primary key(id) not enforced
)
PARTITIONED BY (part)
with('connector'='hudi','path'= 'hdfs://localhost:9000/hudi-warehouse/users_sink_hudi_hive', 'hoodie.datasource.write.recordkey.field'= 'id'-- 主键, 'write.precombine.field'= 'ts'-- 自动precombine的字段, 'write.tasks'= '1', 'compaction.tasks'= '1', 'write.rate.limit'= '2000'-- 限速, 'table.type'= 'MERGE_ON_READ'-- 默认COPY_ON_WRITE,可选MERGE_ON_READ, 'compaction.async.enabled'= 'true'-- 是否开启异步压缩, 'compaction.trigger.strategy'= 'num_commits'-- 按次数压缩, 'compaction.delta_commits'= '1'-- 默认为5, 'changelog.enabled'= 'true'-- 开启changelog变更, 'read.streaming.enabled'= 'true'-- 开启流读, 'read.streaming.check-interval'= '3'-- 检查间隔,默认60s, 'hive_sync.enable'= 'true'-- 开启自动同步hive, 'hive_sync.mode'= 'hms'-- 自动同步hive模式,默认jdbc模式, hms:hive metastore, 'hive_sync.metastore.uris'= 'thrift://localhost:9083'-- hive metastore地址-- , 'hive_sync.jdbc_url'= 'jdbc:hive2://localhost:10000'-- hiveServer地址, 'hive_sync.table'= 'users_sink_hudi_hive_sync'-- hive 新建表名, 'hive_sync.db'= 'db_hudi'-- hive 新建数据库名, 'hive_sync.username'= ''-- HMS 用户名, 'hive_sync.password'= ''-- HMS 密码, 'hive_sync.support_timestamp'= 'true'-- 兼容hive timestamp类型
);
五、视图数据写入hudi表
INSERT INTO users_sink_hudi_hive
SELECT id,name,birthday,ts,part FROM view_users_cdc;
六、Hive表查询
beeline -u jdbc:hive2://localhost:10000 -n root -p 123456
set hive.exec.mode.local.auto = true;-- 不添加的话会导致count数据和select * 数据量不一致
set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
set hive.mapred.mode = nonstrict;
Hudi-Flink CDC将MySQL数据写入hudi相关推荐
- 技术解析|Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once精准接入
本篇文档将演示如何使用 Apache Doris Flink Connector 结合 Flink CDC 以及 Doris Stream Load 的两阶段提交,实现 MySQL 数据库分库分表实时 ...
- Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once精准接入
导读:本篇文档将演示如何使用 Apache Doris Flink Connector 结合 Flink CDC 以及 Doris Stream Load 的两阶段提交,实现 MySQL 数据库分库分 ...
- Flink CDC入门实践--基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL
文章目录 前言 1.环境准备 2.准备数据 2.1 MySQL 2.2 postgres 3.启动flink和flink sql client 3.1启动flink 3.2启动flink SQL cl ...
- 关于flink cdc 抽取oracle数据 oracle表名大小写的问题
使用flink cdc 抽取oracle数据 报错ALTER TABLE XXXXX ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; 我使用的是flink cdc的 ...
- Flink CDC 将MySQL的数据写入Hudi实践
Flink CDC + Hudi实践 一.依赖关系 1.Maven依赖 2.SQL客户端JAR 二.设置MySQL服务器 1.创建MySQL用户: 2.向用户授予所需的权限: 3.最终确定用户的权限: ...
- flink定时读取mysql数据_flink时间系统系列之实例讲解:如何做定时输出
flink时间系统系列篇幅目录: 六.实例讲解:如何做定时输出 今天为大家带来flink时间系统系列最后一篇实战篇,同样也是查漏补缺篇:如何做定时输出,首先说一下定时输出的需求背景,在flink流处理 ...
- 实时数仓:基于 Flink CDC 实现 Oracle 数据实时更新到 Kudu
作者:于乐,腾讯 CSIG 工程师 解决方案描述 概述 Flink CDC 于 2021 年 11 月 15 日发布了最新版本 2.1,该版本通过引入内置 Debezium 组件,增加了对 Oracl ...
- mysql数据写入磁盘的原理_WAL(Write Ahead Log)机制解析
WAL即 Write Ahead Log,WAL的主要意思是说在将元数据的变更操作写入磁盘之前,先预先写入到一个log文件中.为什么要先写日志文件呢,我们一步一步的来探索. 基础数据了解 首先,我们需 ...
- oracle 两表两列数据对比_Oracle、PostgreSQL与Mysql数据写入性能对比
最近因为工作需要,需要对Oracle和Mysql写入性能进行对比,以前都是听说Mysql性能比Oracle不是一个级别,现在亲测后,不比不知道,一比吓一跳... 追加PostgreSql性能测试数据 ...
- oracle读写速率,Oracle、PostgreSQL与Mysql数据写入性能对比
最近因为工作需要,需要对Oracle和Mysql写入性能进行对比,以前都是听说Mysql性能比Oracle不是一个级别,现在亲测后,不比不知道,一比吓一跳... 追加PostgreSql性能测试数据 ...
最新文章
- 计算机网络管理与安全探索
- 第一章 初识Mysql
- 编译Android VNC Server
- STL Vector使用例程
- mysql util_关于mysql数据库操作工具类MySQLUtils用于连接数据提交sql脚本及结果转为JSONArray等操作...
- 交换排序-经典的快速排序算法总结
- 修改器内置脚本编写_Node.js 中实践 Redis Lua 脚本
- python特性描述_详解 Python 最优雅的特性之一 — 描述符
- 881.BoatstoSavePeople
- 绝不因寂寞而爱上别人
- 区块链学堂——公有链、私有链、联盟链、侧链、互联链
- 从音箱入门到高手必看知识
- 虚拟化安全防护系统部署在安全服务器上,虚拟化安全及解决方案
- linux cd系统下载地址,Mandriva 2008.0 正式版 CD、DVD HTTP下载地址
- Http状态405-方法不允许
- linux清理缓存和垃圾,CentOS等Linux系统如何清理系统垃圾和日志?
- kcl计算机科学与技术研究生,KCL的Data Science「伦敦大学国王学院数据科学理学硕士」...
- 图片上怎么加文字?看完就你知道了
- 欧拉路和欧拉回路知识
- python实训报告5000字_测量实训报告范文5000字