Flink CDC 系列 - 同步 MySQL 分库分表,构建 Iceberg 实时数据湖
▼ 关注「Apache Flink」,获取更多技术干货 ▼
摘要:本篇教程将展示如何使用 Flink CDC 构建实时数据湖,并处理分库分表合并同步的场景。
Flink-CDC 项目地址:
https://github.com/ververica/flink-cdc-connectors
Tips:点击「阅读原文」查看更多技术干货~
在 OLTP 系统中,为了解决单表数据量大的问题,通常采用分库分表的方式将单个大表进行拆分以提高系统的吞吐量。
但是为了方便数据分析,通常需要将分库分表拆分出的表在同步到数据仓库、数据湖时,再合并成一个大表。
这篇教程将展示如何使用 Flink CDC 构建实时数据湖来应对这种场景,本教程的演示基于 Docker,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE,你可以很方便地在自己的电脑上完成本教程的全部内容。
接下来将以数据从 MySQL 同步到 Iceberg[1] 为例展示整个流程,架构图如下所示:
一、准备阶段
准备一台已经安装了 Docker 的 Linux 或者 MacOS 电脑。
1.1 准备教程所需要的组件
接下来的教程将以 docker-compose
的方式准备所需要的组件。
使用下面的内容创建一个 docker-compose.yml
文件:
version: '2.1'
services:sql-client:user: flink:flinkimage: yuxialuo/flink-sql-client:1.13.2.v1 depends_on:- jobmanager- mysqlenvironment:FLINK_JOBMANAGER_HOST: jobmanagerMYSQL_HOST: mysqlvolumes:- shared-tmpfs:/tmp/icebergjobmanager:user: flink:flinkimage: flink:1.13.2-scala_2.11ports:- "8081:8081"command: jobmanagerenvironment:- |FLINK_PROPERTIES=jobmanager.rpc.address: jobmanagervolumes:- shared-tmpfs:/tmp/icebergtaskmanager:user: flink:flinkimage: flink:1.13.2-scala_2.11depends_on:- jobmanagercommand: taskmanagerenvironment:- |FLINK_PROPERTIES=jobmanager.rpc.address: jobmanagertaskmanager.numberOfTaskSlots: 2volumes:- shared-tmpfs:/tmp/icebergmysql:image: debezium/example-mysql:1.1ports:- "3306:3306"environment:- MYSQL_ROOT_PASSWORD=123456- MYSQL_USER=mysqluser- MYSQL_PASSWORD=mysqlpwvolumes:shared-tmpfs:driver: localdriver_opts:type: "tmpfs"device: "tmpfs"
该 Docker Compose 中包含的容器有:
SQL-Client:Flink SQL Client, 用来提交 SQL 查询和查看 SQL 的执行结果;
Flink Cluster:包含 Flink JobManager 和 Flink TaskManager,用来执行 Flink SQL;
MySQL:作为分库分表的数据源,存储本教程的
user
表。
在 docker-compose.yml 所在目录下执行下面的命令来启动本教程需要的组件:
docker-compose up -d
该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。你可以通过docker ps 来观察上述的容器是否正常启动了,也可以通过访问 http://localhost:8081/ 来查看 Flink 是否运行正常。
注意:
1. 本教程接下来用到的容器相关的命令都需要在 docker-compose.yml
所在目录下执行。
2. 为了简化整个教程,本教程需要的 jar 包都已经被打包进 SQL-Client 容器中了,镜像的构建脚本可以在 GitHub[2] 上找到。
如果你想要在自己的 Flink 环境运行本教程,需要下载下面列出的包并且把它们放在 Flink 所在目录的 lib 目录下,即 FLINK_HOME/lib/
。
flink-sql-connector-mysql-cdc-2.1.0.jar
https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1.0/flink-sql-connector-mysql-cdc-2.1.0.jar
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-10.0/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar
https://raw.githubusercontent.com/luoyuxia/flink-cdc-tutorial/main/flink-cdc-iceberg-demo/sql-client/lib/iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar
截止目前支持 Flink 1.13 的 iceberg-flink-runtime
jar 包还没有发布,所以我们在这里提供了一个支持 Flink 1.13 的 iceberg-flink-runtime
jar 包,这个 jar 包是基于 Iceberg 的 master 分支打包的。
当 Iceberg 0.13.0 版本发布后,你也可以在 apache official repository[3] 下载到支持 Flink 1.13 的 iceberg-flink-runtime
jar 包。
1.2 准备数据
进入 MySQL 容器中:
docker-compose exec mysql mysql -uroot -p123456
创建数据和表,并填充数据。
创建两个不同的数据库,并在每个数据库中创建两个表,作为 user
表分库分表下拆分出的表。
CREATE DATABASE db_1;USE db_1;CREATE TABLE user_1 (id INTEGER NOT NULL PRIMARY KEY,name VARCHAR(255) NOT NULL DEFAULT 'flink',address VARCHAR(1024),phone_number VARCHAR(512),email VARCHAR(255));INSERT INTO user_1 VALUES (110,"user_110","Shanghai","123567891234","user_110@foo.com");CREATE TABLE user_2 (id INTEGER NOT NULL PRIMARY KEY,name VARCHAR(255) NOT NULL DEFAULT 'flink',address VARCHAR(1024),phone_number VARCHAR(512),email VARCHAR(255));
INSERT INTO user_2 VALUES (120,"user_120","Shanghai","123567891234","user_120@foo.com");
CREATE DATABASE db_2;
USE db_2;
CREATE TABLE user_1 (id INTEGER NOT NULL PRIMARY KEY,name VARCHAR(255) NOT NULL DEFAULT 'flink',address VARCHAR(1024),phone_number VARCHAR(512),email VARCHAR(255)
);
INSERT INTO user_1 VALUES (110,"user_110","Shanghai","123567891234", NULL);CREATE TABLE user_2 (id INTEGER NOT NULL PRIMARY KEY,name VARCHAR(255) NOT NULL DEFAULT 'flink',address VARCHAR(1024),phone_number VARCHAR(512),email VARCHAR(255)
);
INSERT INTO user_2 VALUES (220,"user_220","Shanghai","123567891234","user_220@foo.com");
二、在 Flink SQL CLI 中
使用 Flink DDL 创建表
首先,使用如下的命令进入 Flink SQL CLI 容器中:
docker-compose exec sql-client ./sql-client
我们可以看到如下界面:
然后,进行如下步骤:
1. 开启 checkpoint
Checkpoint 默认是不开启的,我们需要开启 Checkpoint 来让 Iceberg 可以提交事务。并且,mysql-cdc 在 binlog 读取阶段开始前,需要等待一个完整的 checkpoint 来避免 binlog 记录乱序的情况。
-- Flink SQL
-- 每隔 3 秒做一次 checkpoint
Flink SQL> SET execution.checkpointing.interval = 3s;
2. 创建 MySQL 分库分表 source 表
创建 source 表 user_source
来捕获MySQL中所有 user
表的数据,在表的配置项 database-name
, table-name
使用正则表达式来匹配这些表。并且,user_source
表也定义了 metadata 列来区分数据是来自哪个数据库和表。
-- Flink SQL
Flink SQL> CREATE TABLE user_source (database_name STRING METADATA VIRTUAL,table_name STRING METADATA VIRTUAL,`id` DECIMAL(20, 0) NOT NULL,name STRING,address STRING,phone_number STRING,email STRING,PRIMARY KEY (`id`) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = 'mysql','port' = '3306','username' = 'root','password' = '123456','database-name' = 'db_[0-9]+','table-name' = 'user_[0-9]+');
3. 创建 Iceberg sink 表
创建 sink 表 all_users_sink
,用来将数据加载至 Iceberg 中。在这个 sink 表,考虑到不同的 MySQL 数据库表的 id
字段的值可能相同,我们定义了复合主键 (database_name
, table_name
, id
)。
-- Flink SQL
Flink SQL> CREATE TABLE all_users_sink (database_name STRING,table_name STRING,`id` DECIMAL(20, 0) NOT NULL,name STRING,address STRING,phone_number STRING,email STRING,PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED) WITH ('connector'='iceberg','catalog-name'='iceberg_catalog','catalog-type'='hadoop', 'warehouse'='file:///tmp/iceberg/warehouse','format-version'='2');
三、流式写入 Iceberg
1. 使用下面的 Flink SQL 语句将数据从 MySQL 写入 Iceberg 中:
-- Flink SQL
Flink SQL> INSERT INTO all_users_sink select * from user_source;
上述命令将会启动一个流式作业,源源不断将 MySQL 数据库中的全量和增量数据同步到 Iceberg 中。在 Flink UI (http://localhost:8081/#/job/running)上可以看到这个运行的作业:
然后我们就可以使用如下的命令看到 Iceberg 中的写入的文件:
docker-compose exec sql-client tree /tmp/iceberg/warehouse/default_database/
如下所示:
在你的运行环境中,实际的文件可能与上面的截图不相同,但是整体的目录结构应该相似。
2. 使用下面的 Flink SQL 语句查询表 all_users_sink
中的数据:
-- Flink SQL
Flink SQL> SELECT * FROM all_users_sink;
在 Flink SQL CLI 中我们可以看到如下查询结果:
修改 MySQL 中表的数据,Iceberg 中的表 all_users_sink
中的数据也将实时更新:
(3.1) 在 db_1.user_1
表中插入新的一行
--- db_1
INSERT INTO db_1.user_1 VALUES
(111,"user_111","Shanghai","123567891234","user_111@foo.com");
(3.2) 更新 db_1.user_2
表的数据
--- db_1
UPDATE db_1.user_2 SET address='Beijing' WHERE id=120;
(3.3) 在 db_2.user_2
表中删除一行
--- db_2
DELETE FROM db_2.user_2 WHERE id=220;
每执行一步,我们就可以在 Flink Client CLI 中使用 SELECT * FROM all_users_sink
查询表 all_users_sink
来看到数据的变化。
最后的查询结果如下所示:
从 Iceberg 的最新结果中可以看到新增了 (db_1, user_1, 111)
的记录,(db_1, user_2, 120)
的地址更新成了Beijing
,且 (db_2, user_2, 220)
的记录被删除了,与我们在 MySQL 做的数据更新完全一致。
四、环境清理
本教程结束后,在 docker-compose.yml
文件所在的目录下执行如下命令停止所有容器:
docker-compose down
五、总结
在本文中,我们展示了如何使用 Flink CDC 同步 MySQL 分库分表的数据,快速构建 Icberg 实时数据湖。用户也可以同步其他数据库(Postgres/Oracle)的数据到 Hudi 等数据湖中。最后希望通过本文,能够帮助读者快速上手 Flink CDC 。
更多 Flink CDC 相关技术问题,可扫码加入社区钉钉交流群~
注释:
[1] https://iceberg.apache.org/
[2] https://github.com/luoyuxia/flink-cdc-tutorial/tree/main/flink-cdc-iceberg-demo/sql-client
[3] https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime/
相关文章
Flink CDC 系列 - 实时抽取 Oracle 数据,排雷和调优实践
Flink CDC 系列 - 实现 MySQL 数据实时写入 Apache Doris
Flink CDC 系列 - 构建 MySQL 和 Postgres 上的 Streaming ETL
Flink Forward Asia 2021
2022 年 1 月 8-9 日,FFA 2021 重磅开启,全球 40+ 多行业一线厂商,80+ 干货议题,带来专属于开发者的技术盛宴。
大会官网:
https://flink-forward.org.cn
大会线上观看地址 (记得预约哦):
https://developer.aliyun.com/special/ffa2021/live
更多 Flink 相关技术问题,可扫码加入社区钉钉交流群~
戳我,查看更多技术干货~
Flink CDC 系列 - 同步 MySQL 分库分表,构建 Iceberg 实时数据湖相关推荐
- Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once精准接入
导读:本篇文档将演示如何使用 Apache Doris Flink Connector 结合 Flink CDC 以及 Doris Stream Load 的两阶段提交,实现 MySQL 数据库分库分 ...
- 技术解析|Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once精准接入
本篇文档将演示如何使用 Apache Doris Flink Connector 结合 Flink CDC 以及 Doris Stream Load 的两阶段提交,实现 MySQL 数据库分库分表实时 ...
- Mysql系列七:分库分表技术难题之分布式全局唯一id解决方案
Mysql系列七:分库分表技术难题之分布式全局唯一id解决方案 参考文章: (1)Mysql系列七:分库分表技术难题之分布式全局唯一id解决方案 (2)https://www.cnblogs.com/ ...
- 高可用Mysql架构_Mysql主从复制、Mysql双主热备、Mysql双主双从、Mysql读写分离(Mycat中间件)、Mysql分库分表架构(Mycat中间件)的演变...
[Mysql主从复制] 解决的问题 数据分布:比如一共150台机器,分别往电信.网通.移动各放50台,这样无论在哪个网络访问都很快.其次按照地域,比如国内国外,北方南方,这样地域性访问解决了. 负载均 ...
- 如何彻底解决烦人的 MySQL 分库分表问题?写一个更好的数据库!
作者 | 黄东旭 责编 | 郭 芮 我还清楚记得,五年前的这个时候,当时还在豌豆荚,午后与刘奇和崔秋的闲聊关于未来数据库的想象,就像一粒种子一样,到了今天看起来也竟枝繁叶茂郁郁葱葱,有点感慨.按照 ...
- MySQL第六讲 MySQL分库分表方案
分库分表概念 分库分表就是业务系统将数据写请求分发到master节点,而读请求分发到slave 节点的一种方案,可以大大提高整个数据库集群的性能.但是要注意,分库分表的 一整套逻辑全部是 ...
- 【MySQL】MySQL分库分表详解
目录 一.前言 1.1 数据量 1.2 磁盘 1.3 数据库连接 二.垂直拆分 or 水平拆分? 三.垂直拆分 3.1 垂直分库 3.2 垂直分表 3.3 垂直拆分的优缺点 四.水平拆分 4.1 水平 ...
- MySQL分库分表面试知识总结
场景分析 Web开发工作,亦或是海量数据开发工作,学习分库.分表.分区等知识都是很有必要的 . 面试的时候,也有可能也会被问到.不过作为一个有经验的Coder,不熟悉分库.分表技术确实有些 low. ...
- mysql 分库分表策略_【数据库】分库分表策略
关系型数据库本身比较容易成为系统瓶颈,单机存储容量.连接数.处理能力都有限.当单表的数据量达到1000W或100G以后,由于查询维度较多,即使添加从库.优化索引,做很多操作时性能仍下降严重.此时就要考 ...
- MySQL分库分表会带来哪些问题?分库分表问题
MySQL分库分表会带来哪些问题? 分库分表能有效的环节单机和单库带来的性能瓶颈和压力,突破网络IO.硬件资源.连接数的瓶颈,同时也带来了一些问题.下面将描述这些技术挑战以及对应的解决思路. 分库分表 ...
最新文章
- 26.2. Web UI
- apache 启动故障(httpd: apr_sockaddr_info_get() failed fo)
- Gif(1)-加载视图-交替圆效果
- [2016-04-19 15:46:03 - IceHoloReader1.0] Installation error: INSTALL_FAILED_CONFLICTING_PROVIDER [20
- VMware vSphere学习笔记二
- 位运算(按位与、按位或、异或、取反)以及原码、反码、补码
- C语言按下列公式计算 求A20的值,2011年全国计算机二级C语言模拟试题及答案(10)...
- 【MySql】MySql存储,游标,循环的简单使用
- 喵哈哈村的魔法考试 Round #1 (Div.2) C 喵哈哈村的魔法石(II) 背包dp
- 最常用的网络应用工具之寻线仪
- excel删除奇数行或者偶数行
- 全国IT标准化技术委员会教育技术分会CETSC介绍 (公号回复“CETSC”下载PDF资料,欢迎转发、赞赏支持)
- KY-RTI分布仿真技术:第二章 系统安装
- sql语句中count(*),count(1),count(id)区别详解
- 响应式极简新闻发布系统源码
- MySQL 8.0 初学与基础项目实践
- 上位机开发——数据库系列问题一网打尽
- 快速了解做一款App所用到的技术,及实现流程
- 视频教程-SpringBoot实战教程:SpringBoot入门及前后端分离项目开发-Java
- 读南师《金刚经说什么》有感
热门文章
- jQuery特效:实现瀑布流
- Java 13新特性:switch表达式,文本块
- 算法笔记_面试_0.刷leetcode攻略
- Axis生成wsdl的三种方法以及注意事项
- 最新开源:BundleTrack - 无需任何CAD模型的6D物体姿态跟踪算法(谷歌X实习生)...
- 「平衡树splay」学习笔记#1
- CF280C Game on tree(期望dp)
- 【转】HEIF图片存储格式探秘
- asp.net ajax客户端框架如何调用Page Method
- flask前端优化:css/js/html压缩