CDC概念

CDC全称是Change data Cpature,即变更数据捕获,主要面向数据库的变更,是数据库领域非常常见的技术,主要用于捕获数据库的一些变更,然后可以把变更数据发送到下游。

CDC类型

1.基于查询的,客户端会通过SQL方式查询源库表变更数据,然后对外发送。
2.基于日志的,这也是业界广泛使用的一种方式,一般是通过binlog方式,变更的记录会写入binlog,解析binlog后会写入消息系统,或直接基于Flink CDC进行处理。

CDC数据入湖

基于CDC数据的入湖,架构:上游各种各样的数据源,比如DB的变更数据、事件流,以及各种外部数据源,都可以通过变更流的方式写入表中,再进行外部的查询分析
典型CDC入湖的链路:
链路1是大部分公司采取的链路,前面CDC的数据先通过CDC工具导入kafka或者Pulsar,再通过Flink或者是spark流式消费写到Hudi里
链路2是通过Flink CDC直联到MySQL上游数据源,直接写到下游hudi表。

Flink CDC Hudi概述

基于Flink CDC技术,实时采集MySQL数据库表数据,进行ETL转换处理,最终存储Hudi表

实践

MySQL数据库创建表,实时添加数据,通过Flink CDC将数据写入Hudi表,并且Hudi与Hive集成,自动在hive中创建表与添加分区信息,最后hive终端beeline查询分析数据。
hudi表与hive表自动关联集成,需要重新编译hudi源码,指定hive版本及编译时包含hive依赖jar包
1.MySQL数据库,创建表及开启binlog
2.创建flink CDC表,关联到MySQL数据库表
3.创建视图,数据来源输入表,字段与输出表相同
4.创建输出表,关联到hudi表,自动同步到hive中,字段与hudi表相同
5.查询视图数据,插入到输出表(hudi表)
6.查询hive表数据,ro类型(读优化查询)和rt类型(快照查询)

准备工作

1.编译hudi源码

修改hudi集成flink和hive编译依赖版本配置
原因:现在版本hudi,在编译的时候后本身默认已经集成了flink-SQL-connector-hive的包,会和flink lib包下的flink-SQL-connector-hive冲突。所以,编译的过程中只修改hive编译版本
文件: hudi-0.10.1/packaging/hudi-flink-bundle/pom.xml,将hive.version改为2.3.9
<include>org.apache.flink:flink-sql-connector-hive-2.3.9_${scala.binary.version}</include>
该问题从 0.10 版本已经解决

mvn clean install -DskipTests -Drat.skip=true -Dscala-2.12 -Dspark3 -Dhive.version=2.3.9 -Pflink-bundle-shade-hive2
编译完成后,有2个jar包,非常重要
hudi-flink-bundle_2.12-0.10.1.jar位于hudi-0.10.1/packaging/hudi-flink-bundle/target,flink用来写入和读取数据,将其拷贝至$FLINK_HOME/lib目录中,如果以前有同名jar包,先删除再拷贝。
hudi-hadoop-mr-bundle-0.10.1.jar位于hudi-0.10.1/packaging/hudi-hadoop-mr-bundle/target,hive需要用来读hudi数据,将其拷贝至$HIVE_HOME/lib目录中。

2.将Flink CDC MySQL对应jar包,放到$FLINK_HOME/lib目录中

建议用新版的CDC2,因为功能差了好多,CDC1的时候是把数据先全都读取到内存,再执行后续操作,CDC2是边读边执行后续操作,这么一比,CDC1被秒杀。
CDC1:
https://mvnrepository.com/artifact/com.alibaba.ververica/flink-connector-mysql-cdc/1.4.0
flink-sql-connector-mysql-cdc-1.4.0.jar
CDC2[升级版]:
flink-connector-mysql-cdc-2.0.2.jar
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.2</version>
</dependency>

实现

零、组件版本

hudi:0.10.1
flink:1.13.1
hive:2.3.9
Hadoop:2.8.0
Scala:2.12.4

一、MySQL配置

1.MySQL数据库,创建表及开启binlog
vim /etc/my.cnf
在[mysqld]下面添加内容
server-id=2
log-bin=mysql-bin
binlog_format=rot
expire_logs_day=15
binlog_row_image=full

2.重启MySQL
service mysqld restart

查看是否生效
show master logs

3.建表
create database test_hudi;
create table test_hudi.tbl_users(id bigint auto_increment primary key,name varchar(20) null,birthday timestamp default CURRENT_TIMESTAMP NOT NULL,ts timestamp default CURRENT_TIMESTAMP NOT NULL
);INSERT INTO test_hudi.tbl_users(name) VALUES(“测试”);
INSERT INTO test_hudi.tbl_users(name) VALUES(“张三”);
INSERT INTO test_hudi.tbl_users(name) VALUES(“李四”);
INSERT INTO test_hudi.tbl_users(name) VALUES(“王五”);
INSERT INTO test_hudi.tbl_users(name) VALUES(“赵六");

二、创建Flink CDC表,关联到MySQL数据库表

1.启动相关服务
HDFShadoop-daemon.sh start namenodehadoop-daemon.sh start datanode
Hive./hive --service metastore &./hive --service hiveserver2  &
Flink standalonestart-cluster.sh
启动Flink SQL Client客户端sql-client.sh embedded -j /Users/FengZhen/Desktop/Hadoop/flink/flink-1.13.1/lib/flink-connector-mysql-cdc-1.4.0.jar shell
-j /Users/FengZhen/Desktop/Hadoop/flink/flink-1.13.1/lib/hudi-hive-sync-bundle-0.10.1.jar设置属性set  sql-client.execution.result-mode=tableau;set execution.checkpointing.interval=3sec;

2.创建输入表,关联MySQL表,采用MySQL CDC关联
CREATE TABLE users_source_mysql(id BIGINT PRIMARY KEY NOT ENFORCED,name STRING,birthday TIMESTAMP(3),ts TIMESTAMP(3)
)WITH('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '3306','username' = 'root','password' = '1234qwer','server-time-zone'= 'Asia/Shanghai','debezium.snapshot.mode' = 'initial','database-name' = 'test_hudi','table-name' = 'tbl_users'
);
Flink SQL> select * from users_source_mysql;
+----+----------------------+--------------------------------+-------------------------+-------------------------+
| op |                   id |                           name |                birthday |                      ts |
+----+----------------------+--------------------------------+-------------------------+-------------------------+
| +I |                    1 |                           测试 | 2022-03-15 22:09:00.000 | 2022-03-15 22:09:00.000 |

使用CDC2必须配置一下主键,因为参数【scan.incremental.snapshot.enabled】默认为true,增量读取就必须配置PK,如果不做增量读取,直接改为false即可。
debezium.snapshot.mode枚举值:
参数值 描述
initial(默认) 连接器执行数据库的初始一致性快照,快照完成后,连接器开始为后续数据库更改流式传输事件记录。
initial_only 连接器只执行数据库的初始一致性快照,不允许捕获任何后续更改的事件。
schema_only 连接器只捕获所有相关表的表结构,不捕获初始数据,但是会同步后续数据库的更改记录
schema_only_recovery 设置此选项可恢复丢失或损坏的数据库历史主题(database.history.kafka.topic)。 

三、创建视图,查询输入表,字段与输出表相同

创建视图,增加分区列part,方便后续同步hive分区表
CREATE VIEW view_users_cdc AS
SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') AS part FROM users_source_mysql;
Flink SQL> select * From view_users_cdc;
+----+----------------------+--------------------------------+-------------------------+-------------------------+--------------------------------+
| op |                   id |                           name |                birthday |                      ts |                           part |
+----+----------------------+--------------------------------+-------------------------+-------------------------+--------------------------------+
| +I |                    1 |                           测试 | 2022-03-15 22:09:00.000 | 2022-03-15 22:09:00.000 |                       20220315 |
| +I |                    2 |                           张三 | 2022-03-17 13:26:05.000 | 2022-03-17 13:26:05.000 |                       20220317 |
| +I |                    3 |                           李四 | 2022-03-17 13:26:05.000 | 2022-03-17 13:26:05.000 |                       20220317 |
| +I |                    4 |                           王五 | 2022-03-17 13:26:05.000 | 2022-03-17 13:26:05.000 |                       20220317 |
| +I |                    5 |                           赵六 | 2022-03-17 13:26:05.000 | 2022-03-17 13:26:05.000 |                       20220317 |

四、创建CDC Hudi Sink 表,并自动同步hive分区


CREATE TABLE users_sink_hudi_hive(id bigint ,name string,birthday TIMESTAMP(3),ts TIMESTAMP(3),part VARCHAR(20),primary key(id) not enforced
)
PARTITIONED BY (part)
with('connector'='hudi','path'= 'hdfs://localhost:9000/hudi-warehouse/users_sink_hudi_hive', 'hoodie.datasource.write.recordkey.field'= 'id'-- 主键, 'write.precombine.field'= 'ts'-- 自动precombine的字段, 'write.tasks'= '1', 'compaction.tasks'= '1', 'write.rate.limit'= '2000'-- 限速, 'table.type'= 'MERGE_ON_READ'-- 默认COPY_ON_WRITE,可选MERGE_ON_READ, 'compaction.async.enabled'= 'true'-- 是否开启异步压缩, 'compaction.trigger.strategy'= 'num_commits'-- 按次数压缩, 'compaction.delta_commits'= '1'-- 默认为5, 'changelog.enabled'= 'true'-- 开启changelog变更, 'read.streaming.enabled'= 'true'-- 开启流读, 'read.streaming.check-interval'= '3'-- 检查间隔,默认60s, 'hive_sync.enable'= 'true'-- 开启自动同步hive, 'hive_sync.mode'= 'hms'-- 自动同步hive模式,默认jdbc模式, hms:hive metastore, 'hive_sync.metastore.uris'= 'thrift://localhost:9083'-- hive metastore地址-- , 'hive_sync.jdbc_url'= 'jdbc:hive2://localhost:10000'-- hiveServer地址, 'hive_sync.table'= 'users_sink_hudi_hive_sync'-- hive 新建表名, 'hive_sync.db'= 'db_hudi'-- hive 新建数据库名, 'hive_sync.username'= ''-- HMS 用户名, 'hive_sync.password'= ''-- HMS 密码, 'hive_sync.support_timestamp'= 'true'-- 兼容hive timestamp类型
);

五、视图数据写入hudi表

INSERT INTO users_sink_hudi_hive
SELECT id,name,birthday,ts,part FROM view_users_cdc;

六、Hive表查询

需要将hudi-hadoop-mr-bundle-0.10.1.jar包,放到$HIVE_HOME/lib下
启动beeline客户端,连接hiveserver2
beeline -u jdbc:hive2://localhost:10000 -n root -p 123456

我吐了,我的hive没生成下边这两张表,我各种操作都TM不行,这个情况先保留,以后再看
会自动生成hudi MOR模式的两张表
users_sink_hudi_hive_ro:ro表全称read optimized table,对于MOR表同步的xxx_ro表,只暴露压缩后的parquet。其查询方式和COW表类似。设置完hiveInputformat之后和普通的hive表一样查询即可。
users_sink_hudi_rt:rt表示增量视图,主要针对增量查询的rt表;ro表只能查parquet文件数据,rt表parquet文件数据和log文件数据都可查。
查看hive表数据
set hive.exec.mode.local.auto = true;-- 不添加的话会导致count数据和select * 数据量不一致
set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
set hive.mapred.mode = nonstrict; 

Hudi-Flink CDC将MySQL数据写入hudi相关推荐

  1. 技术解析|Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once精准接入

    本篇文档将演示如何使用 Apache Doris Flink Connector 结合 Flink CDC 以及 Doris Stream Load 的两阶段提交,实现 MySQL 数据库分库分表实时 ...

  2. Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once精准接入

    导读:本篇文档将演示如何使用 Apache Doris Flink Connector 结合 Flink CDC 以及 Doris Stream Load 的两阶段提交,实现 MySQL 数据库分库分 ...

  3. Flink CDC入门实践--基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL

    文章目录 前言 1.环境准备 2.准备数据 2.1 MySQL 2.2 postgres 3.启动flink和flink sql client 3.1启动flink 3.2启动flink SQL cl ...

  4. 关于flink cdc 抽取oracle数据 oracle表名大小写的问题

    使用flink cdc 抽取oracle数据 报错ALTER TABLE XXXXX ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; 我使用的是flink cdc的 ...

  5. Flink CDC 将MySQL的数据写入Hudi实践

    Flink CDC + Hudi实践 一.依赖关系 1.Maven依赖 2.SQL客户端JAR 二.设置MySQL服务器 1.创建MySQL用户: 2.向用户授予所需的权限: 3.最终确定用户的权限: ...

  6. flink定时读取mysql数据_flink时间系统系列之实例讲解:如何做定时输出

    flink时间系统系列篇幅目录: 六.实例讲解:如何做定时输出 今天为大家带来flink时间系统系列最后一篇实战篇,同样也是查漏补缺篇:如何做定时输出,首先说一下定时输出的需求背景,在flink流处理 ...

  7. 实时数仓:基于 Flink CDC 实现 Oracle 数据实时更新到 Kudu

    作者:于乐,腾讯 CSIG 工程师 解决方案描述 概述 Flink CDC 于 2021 年 11 月 15 日发布了最新版本 2.1,该版本通过引入内置 Debezium 组件,增加了对 Oracl ...

  8. mysql数据写入磁盘的原理_WAL(Write Ahead Log)机制解析

    WAL即 Write Ahead Log,WAL的主要意思是说在将元数据的变更操作写入磁盘之前,先预先写入到一个log文件中.为什么要先写日志文件呢,我们一步一步的来探索. 基础数据了解 首先,我们需 ...

  9. oracle 两表两列数据对比_Oracle、PostgreSQL与Mysql数据写入性能对比

    最近因为工作需要,需要对Oracle和Mysql写入性能进行对比,以前都是听说Mysql性能比Oracle不是一个级别,现在亲测后,不比不知道,一比吓一跳... 追加PostgreSql性能测试数据 ...

  10. oracle读写速率,Oracle、PostgreSQL与Mysql数据写入性能对比

    最近因为工作需要,需要对Oracle和Mysql写入性能进行对比,以前都是听说Mysql性能比Oracle不是一个级别,现在亲测后,不比不知道,一比吓一跳... 追加PostgreSql性能测试数据 ...

最新文章

  1. 计算机网络管理与安全探索
  2. 第一章 初识Mysql
  3. 编译Android VNC Server
  4. STL Vector使用例程
  5. mysql util_关于mysql数据库操作工具类MySQLUtils用于连接数据提交sql脚本及结果转为JSONArray等操作...
  6. 交换排序-经典的快速排序算法总结
  7. 修改器内置脚本编写_Node.js 中实践 Redis Lua 脚本
  8. python特性描述_详解 Python 最优雅的特性之一 — 描述符
  9. 881.BoatstoSavePeople
  10. 绝不因寂寞而爱上别人
  11. 区块链学堂——公有链、私有链、联盟链、侧链、互联链
  12. 从音箱入门到高手必看知识
  13. 虚拟化安全防护系统部署在安全服务器上,虚拟化安全及解决方案
  14. linux cd系统下载地址,Mandriva 2008.0 正式版 CD、DVD HTTP下载地址
  15. Http状态405-方法不允许
  16. linux清理缓存和垃圾,CentOS等Linux系统如何清理系统垃圾和日志?
  17. kcl计算机科学与技术研究生,KCL的Data Science「伦敦大学国王学院数据科学理学硕士」...
  18. 图片上怎么加文字?看完就你知道了
  19. 欧拉路和欧拉回路知识
  20. python实训报告5000字_测量实训报告范文5000字

热门文章

  1. sklearn.neighbors常用API介绍
  2. 给出三个坐标点,求三角形周长。
  3. 文件转换-----(类型,格式)
  4. 手机抠图怎么变透明底?这个抠图方法快收藏起来
  5. 2018.11.22!今天重温一遍知识点,捋一捋思路
  6. Quorum快速部署
  7. 算法——Horner scheme
  8. c语言中next如何用,C++ STL prev()和next()函数用法详解
  9. Hbase shell练习
  10. 安装gms 的拨号盘