▼ 关注「Apache Flink」,获取更多技术干货 ▼

摘要:本篇教程将展示如何使用 Flink CDC 构建实时数据湖,并处理分库分表合并同步的场景。

Flink-CDC 项目地址:

https://github.com/ververica/flink-cdc-connectors

Tips:点击「阅读原文」查看更多技术干货~

在 OLTP 系统中,为了解决单表数据量大的问题,通常采用分库分表的方式将单个大表进行拆分以提高系统的吞吐量。

但是为了方便数据分析,通常需要将分库分表拆分出的表在同步到数据仓库、数据湖时,再合并成一个大表。

这篇教程将展示如何使用 Flink CDC 构建实时数据湖来应对这种场景,本教程的演示基于 Docker,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE,你可以很方便地在自己的电脑上完成本教程的全部内容。

接下来将以数据从 MySQL 同步到 Iceberg[1] 为例展示整个流程,架构图如下所示:

一、准备阶段

准备一台已经安装了 Docker 的 Linux 或者 MacOS 电脑。

1.1 准备教程所需要的组件

接下来的教程将以 docker-compose 的方式准备所需要的组件。

使用下面的内容创建一个 docker-compose.yml 文件:

version: '2.1'
services:sql-client:user: flink:flinkimage: yuxialuo/flink-sql-client:1.13.2.v1 depends_on:- jobmanager- mysqlenvironment:FLINK_JOBMANAGER_HOST: jobmanagerMYSQL_HOST: mysqlvolumes:- shared-tmpfs:/tmp/icebergjobmanager:user: flink:flinkimage: flink:1.13.2-scala_2.11ports:- "8081:8081"command: jobmanagerenvironment:- |FLINK_PROPERTIES=jobmanager.rpc.address: jobmanagervolumes:- shared-tmpfs:/tmp/icebergtaskmanager:user: flink:flinkimage: flink:1.13.2-scala_2.11depends_on:- jobmanagercommand: taskmanagerenvironment:- |FLINK_PROPERTIES=jobmanager.rpc.address: jobmanagertaskmanager.numberOfTaskSlots: 2volumes:- shared-tmpfs:/tmp/icebergmysql:image: debezium/example-mysql:1.1ports:- "3306:3306"environment:- MYSQL_ROOT_PASSWORD=123456- MYSQL_USER=mysqluser- MYSQL_PASSWORD=mysqlpwvolumes:shared-tmpfs:driver: localdriver_opts:type: "tmpfs"device: "tmpfs"

该 Docker Compose 中包含的容器有:

  • SQL-Client:Flink SQL Client, 用来提交 SQL 查询和查看 SQL 的执行结果;

  • Flink Cluster:包含 Flink JobManager 和 Flink TaskManager,用来执行 Flink SQL;

  • MySQL:作为分库分表的数据源,存储本教程的 user 表。

在 docker-compose.yml 所在目录下执行下面的命令来启动本教程需要的组件:

docker-compose up -d

该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。你可以通过docker ps 来观察上述的容器是否正常启动了,也可以通过访问 http://localhost:8081/ 来查看 Flink 是否运行正常。

注意:

1. 本教程接下来用到的容器相关的命令都需要在 docker-compose.yml 所在目录下执行。

2. 为了简化整个教程,本教程需要的 jar 包都已经被打包进 SQL-Client 容器中了,镜像的构建脚本可以在 GitHub[2] 上找到。

如果你想要在自己的 Flink 环境运行本教程,需要下载下面列出的包并且把它们放在 Flink 所在目录的 lib 目录下,即 FLINK_HOME/lib/

  • flink-sql-connector-mysql-cdc-2.1.0.jar

    https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1.0/flink-sql-connector-mysql-cdc-2.1.0.jar

  • flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

    https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-10.0/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

  • iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar

    https://raw.githubusercontent.com/luoyuxia/flink-cdc-tutorial/main/flink-cdc-iceberg-demo/sql-client/lib/iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar

截止目前支持 Flink 1.13 的 iceberg-flink-runtime jar 包还没有发布,所以我们在这里提供了一个支持 Flink 1.13 的 iceberg-flink-runtime jar 包,这个 jar 包是基于 Iceberg 的 master 分支打包的。

当 Iceberg 0.13.0 版本发布后,你也可以在 apache official repository[3] 下载到支持 Flink 1.13 的 iceberg-flink-runtime jar 包。


1.2 准备数据

进入 MySQL 容器中:

docker-compose exec mysql mysql -uroot -p123456

创建数据和表,并填充数据。

创建两个不同的数据库,并在每个数据库中创建两个表,作为 user 表分库分表下拆分出的表。

CREATE DATABASE db_1;USE db_1;CREATE TABLE user_1 (id INTEGER NOT NULL PRIMARY KEY,name VARCHAR(255) NOT NULL DEFAULT 'flink',address VARCHAR(1024),phone_number VARCHAR(512),email VARCHAR(255));INSERT INTO user_1 VALUES (110,"user_110","Shanghai","123567891234","user_110@foo.com");CREATE TABLE user_2 (id INTEGER NOT NULL PRIMARY KEY,name VARCHAR(255) NOT NULL DEFAULT 'flink',address VARCHAR(1024),phone_number VARCHAR(512),email VARCHAR(255));
INSERT INTO user_2 VALUES (120,"user_120","Shanghai","123567891234","user_120@foo.com");
CREATE DATABASE db_2;
USE db_2;
CREATE TABLE user_1 (id INTEGER NOT NULL PRIMARY KEY,name VARCHAR(255) NOT NULL DEFAULT 'flink',address VARCHAR(1024),phone_number VARCHAR(512),email VARCHAR(255)
);
INSERT INTO user_1 VALUES (110,"user_110","Shanghai","123567891234", NULL);CREATE TABLE user_2 (id INTEGER NOT NULL PRIMARY KEY,name VARCHAR(255) NOT NULL DEFAULT 'flink',address VARCHAR(1024),phone_number VARCHAR(512),email VARCHAR(255)
);
INSERT INTO user_2 VALUES (220,"user_220","Shanghai","123567891234","user_220@foo.com");

二、在 Flink SQL CLI 中

使用 Flink DDL 创建表

首先,使用如下的命令进入 Flink SQL CLI 容器中:

docker-compose exec sql-client ./sql-client

我们可以看到如下界面:

然后,进行如下步骤:

1. 开启 checkpoint

Checkpoint 默认是不开启的,我们需要开启 Checkpoint 来让 Iceberg 可以提交事务。并且,mysql-cdc 在 binlog 读取阶段开始前,需要等待一个完整的 checkpoint 来避免 binlog 记录乱序的情况。

-- Flink SQL
-- 每隔 3 秒做一次 checkpoint
Flink SQL> SET execution.checkpointing.interval = 3s;

2. 创建 MySQL 分库分表 source 表

创建 source 表 user_source 来捕获MySQL中所有 user 表的数据,在表的配置项 database-nametable-name 使用正则表达式来匹配这些表。并且,user_source 表也定义了 metadata 列来区分数据是来自哪个数据库和表。

-- Flink SQL
Flink SQL> CREATE TABLE user_source (database_name STRING METADATA VIRTUAL,table_name STRING METADATA VIRTUAL,`id` DECIMAL(20, 0) NOT NULL,name STRING,address STRING,phone_number STRING,email STRING,PRIMARY KEY (`id`) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = 'mysql','port' = '3306','username' = 'root','password' = '123456','database-name' = 'db_[0-9]+','table-name' = 'user_[0-9]+');

3. 创建 Iceberg sink 表

创建 sink 表 all_users_sink,用来将数据加载至 Iceberg 中。在这个 sink 表,考虑到不同的 MySQL 数据库表的 id 字段的值可能相同,我们定义了复合主键 (database_name, table_name, id)。

-- Flink SQL
Flink SQL> CREATE TABLE all_users_sink (database_name STRING,table_name    STRING,`id`          DECIMAL(20, 0) NOT NULL,name          STRING,address       STRING,phone_number  STRING,email         STRING,PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED) WITH ('connector'='iceberg','catalog-name'='iceberg_catalog','catalog-type'='hadoop',  'warehouse'='file:///tmp/iceberg/warehouse','format-version'='2');

三、流式写入 Iceberg  

1. 使用下面的 Flink SQL 语句将数据从 MySQL 写入 Iceberg 中:

-- Flink SQL
Flink SQL> INSERT INTO all_users_sink select * from user_source;

上述命令将会启动一个流式作业,源源不断将 MySQL 数据库中的全量和增量数据同步到 Iceberg 中。在 Flink UI (http://localhost:8081/#/job/running)上可以看到这个运行的作业:

然后我们就可以使用如下的命令看到 Iceberg 中的写入的文件:

docker-compose exec sql-client tree /tmp/iceberg/warehouse/default_database/

如下所示:

在你的运行环境中,实际的文件可能与上面的截图不相同,但是整体的目录结构应该相似。

2. 使用下面的 Flink SQL 语句查询表 all_users_sink 中的数据:

-- Flink SQL
Flink SQL> SELECT * FROM all_users_sink;

在 Flink SQL CLI 中我们可以看到如下查询结果:

修改 MySQL 中表的数据,Iceberg 中的表 all_users_sink 中的数据也将实时更新:

(3.1) 在 db_1.user_1 表中插入新的一行

--- db_1
INSERT INTO db_1.user_1 VALUES
(111,"user_111","Shanghai","123567891234","user_111@foo.com");

(3.2) 更新 db_1.user_2 表的数据

--- db_1
UPDATE db_1.user_2 SET address='Beijing' WHERE id=120;

(3.3) 在 db_2.user_2 表中删除一行

--- db_2
DELETE FROM db_2.user_2 WHERE id=220;

每执行一步,我们就可以在 Flink Client CLI 中使用 SELECT * FROM all_users_sink 查询表 all_users_sink 来看到数据的变化。

最后的查询结果如下所示:

从 Iceberg 的最新结果中可以看到新增了 (db_1, user_1, 111) 的记录,(db_1, user_2, 120) 的地址更新成了Beijing,且 (db_2, user_2, 220) 的记录被删除了,与我们在 MySQL 做的数据更新完全一致。

四、环境清理

本教程结束后,在 docker-compose.yml 文件所在的目录下执行如下命令停止所有容器:

docker-compose down

五、总结

在本文中,我们展示了如何使用 Flink CDC 同步 MySQL 分库分表的数据,快速构建 Icberg 实时数据湖。用户也可以同步其他数据库(Postgres/Oracle)的数据到 Hudi 等数据湖中。最后希望通过本文,能够帮助读者快速上手 Flink CDC 。

更多 Flink CDC 相关技术问题,可扫码加入社区钉钉交流群~

注释:

[1] https://iceberg.apache.org/

[2] https://github.com/luoyuxia/flink-cdc-tutorial/tree/main/flink-cdc-iceberg-demo/sql-client

[3] https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime/


相关文章

  • Flink CDC 系列 - 实时抽取 Oracle 数据,排雷和调优实践

  • Flink CDC 系列 - 实现 MySQL 数据实时写入 Apache Doris

  • Flink CDC 系列 - 构建 MySQL 和 Postgres 上的 Streaming ETL

Flink Forward Asia 2021 

2022 年 1 月 8-9 日,FFA 2021 重磅开启,全球 40+ 多行业一线厂商,80+ 干货议题,带来专属于开发者的技术盛宴。

大会官网:

https://flink-forward.org.cn

大会线上观看地址 (记得预约哦):

https://developer.aliyun.com/special/ffa2021/live


更多 Flink 相关技术问题,可扫码加入社区钉钉交流群~

  戳我,查看更多技术干货~

Flink CDC 系列 - 同步 MySQL 分库分表,构建 Iceberg 实时数据湖相关推荐

  1. Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once精准接入

    导读:本篇文档将演示如何使用 Apache Doris Flink Connector 结合 Flink CDC 以及 Doris Stream Load 的两阶段提交,实现 MySQL 数据库分库分 ...

  2. 技术解析|Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once精准接入

    本篇文档将演示如何使用 Apache Doris Flink Connector 结合 Flink CDC 以及 Doris Stream Load 的两阶段提交,实现 MySQL 数据库分库分表实时 ...

  3. Mysql系列七:分库分表技术难题之分布式全局唯一id解决方案

    Mysql系列七:分库分表技术难题之分布式全局唯一id解决方案 参考文章: (1)Mysql系列七:分库分表技术难题之分布式全局唯一id解决方案 (2)https://www.cnblogs.com/ ...

  4. 高可用Mysql架构_Mysql主从复制、Mysql双主热备、Mysql双主双从、Mysql读写分离(Mycat中间件)、Mysql分库分表架构(Mycat中间件)的演变...

    [Mysql主从复制] 解决的问题 数据分布:比如一共150台机器,分别往电信.网通.移动各放50台,这样无论在哪个网络访问都很快.其次按照地域,比如国内国外,北方南方,这样地域性访问解决了. 负载均 ...

  5. 如何彻底解决烦人的 MySQL 分库分表问题?写一个更好的数据库!

    作者 | 黄东旭 责编 | 郭   芮 我还清楚记得,五年前的这个时候,当时还在豌豆荚,午后与刘奇和崔秋的闲聊关于未来数据库的想象,就像一粒种子一样,到了今天看起来也竟枝繁叶茂郁郁葱葱,有点感慨.按照 ...

  6. MySQL第六讲 MySQL分库分表方案

    分库分表概念        分库分表就是业务系统将数据写请求分发到master节点,而读请求分发到slave 节点的一种方案,可以大大提高整个数据库集群的性能.但是要注意,分库分表的 一整套逻辑全部是 ...

  7. 【MySQL】MySQL分库分表详解

    目录 一.前言 1.1 数据量 1.2 磁盘 1.3 数据库连接 二.垂直拆分 or 水平拆分? 三.垂直拆分 3.1 垂直分库 3.2 垂直分表 3.3 垂直拆分的优缺点 四.水平拆分 4.1 水平 ...

  8. MySQL分库分表面试知识总结

    场景分析 Web开发工作,亦或是海量数据开发工作,学习分库.分表.分区等知识都是很有必要的 . 面试的时候,也有可能也会被问到.不过作为一个有经验的Coder,不熟悉分库.分表技术确实有些 low. ...

  9. mysql 分库分表策略_【数据库】分库分表策略

    关系型数据库本身比较容易成为系统瓶颈,单机存储容量.连接数.处理能力都有限.当单表的数据量达到1000W或100G以后,由于查询维度较多,即使添加从库.优化索引,做很多操作时性能仍下降严重.此时就要考 ...

  10. MySQL分库分表会带来哪些问题?分库分表问题

    MySQL分库分表会带来哪些问题? 分库分表能有效的环节单机和单库带来的性能瓶颈和压力,突破网络IO.硬件资源.连接数的瓶颈,同时也带来了一些问题.下面将描述这些技术挑战以及对应的解决思路. 分库分表 ...

最新文章

  1. 26.2. Web UI
  2. apache 启动故障(httpd: apr_sockaddr_info_get() failed fo)
  3. Gif(1)-加载视图-交替圆效果
  4. [2016-04-19 15:46:03 - IceHoloReader1.0] Installation error: INSTALL_FAILED_CONFLICTING_PROVIDER [20
  5. VMware vSphere学习笔记二
  6. 位运算(按位与、按位或、异或、取反)以及原码、反码、补码
  7. C语言按下列公式计算 求A20的值,2011年全国计算机二级C语言模拟试题及答案(10)...
  8. 【MySql】MySql存储,游标,循环的简单使用
  9. 喵哈哈村的魔法考试 Round #1 (Div.2) C 喵哈哈村的魔法石(II) 背包dp
  10. 最常用的网络应用工具之寻线仪
  11. excel删除奇数行或者偶数行
  12. 全国IT标准化技术委员会教育技术分会CETSC介绍 (公号回复“CETSC”下载PDF资料,欢迎转发、赞赏支持)
  13. KY-RTI分布仿真技术:第二章 系统安装
  14. sql语句中count(*),count(1),count(id)区别详解
  15. 响应式极简新闻发布系统源码
  16. MySQL 8.0 初学与基础项目实践
  17. 上位机开发——数据库系列问题一网打尽
  18. 快速了解做一款App所用到的技术,及实现流程
  19. 视频教程-SpringBoot实战教程:SpringBoot入门及前后端分离项目开发-Java
  20. 读南师《金刚经说什么》有感

热门文章

  1. jQuery特效:实现瀑布流
  2. Java 13新特性:switch表达式,文本块
  3. 算法笔记_面试_0.刷leetcode攻略
  4. Axis生成wsdl的三种方法以及注意事项
  5. 最新开源:BundleTrack - 无需任何CAD模型的6D物体姿态跟踪算法(谷歌X实习生)...
  6. 「平衡树splay」学习笔记#1
  7. CF280C Game on tree(期望dp)
  8. 【转】HEIF图片存储格式探秘
  9. asp.net ajax客户端框架如何调用Page Method
  10. flask前端优化:css/js/html压缩