一文进入Flink CDC 的世界
文章目录
- 前言
- 为什么要学FlinkCDC
- 什么是CDC
- CDC 的种类
- Flink CDC
- Flink CDC 案例实操
- 依赖准备
- 编码(DataStream)
- 本地测试
- 配置MySQL中的binlog
- 创建数据库,建表,添加一个语句
- 执行代码查看
- 集群测试
- 其他参数测试
- 前言
- 依赖准备
- 编码(FlinkSQL)
- 自定义反序列化器
- 总结
前言
CDC有两种方式,一种是离线的,一种是实时的,也就是一种是基于查询的,一种是Binlog的这种方式。
为什么要学FlinkCDC
我们用传统的CDC工具检测到一个数据后,我们要对数据进行计算,常规的方案是采用CDC工具将数据采集到消息队列中,用Spark 或者 Flink 进行计算,加工, 有了Flink CDC 我们可以读取数据和加工数据用Flink一起完成,这就是学习Flink CDC 的一个根本原因。
什么是CDC
CDC 是 Change Data Capture(变更数据获取)
的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。
CDC 的种类
CDC 主要分为基于查询
和基于 Binlog
两种方式,
Flink CDC
Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据
和增量变更
数据的 source 组件。
开源地址:https://github.com/ververica/flink-cdc-connectors
Flink CDC 案例实操
依赖准备
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.49</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
编码(DataStream)
package com.bigdata.cdc;import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkStreamCDC {public static void main(String[] args) throws Exception {//创建流处理执行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度env.setParallelism( 1 );//创建 Flink-MySQL-CDC 的 SourceDebeziumSourceFunction<String> mysqlSource = MySqlSource.<String>builder().hostname( "hadoop102" ) //主机名.port( 3306 ) //端口号.username( "root" ) //用户名.password( "000000" ) //密码.databaseList( "cdc_test" ) //数据库名.tableList( "cdc_test.user_info" ) //表名.deserializer( new StringDebeziumDeserializationSchema() ) // 反序列化.startupOptions( StartupOptions.initial() ) //初始化 (拍快照).build();//添加数据源DataStreamSource<String> dataStream = env.addSource( mysqlSource );//做一个打印dataStream.print();//执行env.execute( "Flink_CDD" );}
}
本地测试
本地测试的前提得有环境,刚刚在编码中测试的数据库是 cdc_test 表名是user_info,我们现在得把它们创建出来,但是前提是,一定要在MySQL 配置文件中,添加binlog 这样才能有效的进行测试。
配置MySQL中的binlog
进入MySQL配置文件 my.cnf
[root@hadoop102 ~]# vim /etc/my.cnf
将如下内容添加 [mysqld] 下中
#binlog日志名称前缀
log-bin=mysql-bin
##默认值未0,如果使用默认值则不能和从节点通信,这个值的区间是:1到(2^32)-1
server-id=1
binlog-do-db=cdc_test
重启MySQL服务
[root@hadoop102 ~]# systemctl restart mysqld
创建数据库,建表,添加一个语句
create database cdc_test;use cdc_test;create table user_info(id int,name varchar(200),age varchar(200)
);insert into user_info values(1,"张三","24");
执行代码查看
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-v8MfxJcH-1661149264947)(C:\Users\栾昊\AppData\Roaming\Typora\typora-user-images\image-20220302183745037.png)]
下面添加一个数据
insert into user_info values(2,"李四","24");
下面修改数据,将 年龄 24 改成 26
update user_info set age = "26" where id = 2;
添加王五 并删除
insert into user_info values(3,"王五","23");delete from user_info where id = 3;
假如说,我们这个挂掉了,再次开启的时候,它会不会读取之前的状态呢,就是说,当前我们有张三和和李四,那么会不会将它们读取到呢?
其实还是能读取到的,只是以前的操作看不到了,op 也变成了 r ,还是能将之前的数据展示出来
集群测试
代码中添加如下配置
/*** Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点续传,* 需要从 Checkpoint 或者 Savepoint 启动程序*///开启 Checkpoint,每隔 5 秒钟做一次 CKenv.enableCheckpointing(5000L);//指定 CK 的一致性语义env.getCheckpointConfig().setCheckpointingMode( CheckpointingMode.EXACTLY_ONCE);//设置CK 的超时时间 10秒钟env.getCheckpointConfig().setCheckpointTimeout( 10000L );//并发检查点尝试的最大次数。env.getCheckpointConfig().setMaxConcurrentCheckpoints( 1 );//设置状态后端env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc_test/ck"));
进行打包,将打包后的jar 上传到Linux 下的 Flink 目录中
启动集群测试之前,先启动Hadoop集群和Flink集群
Flink 集群
start-cluster.sh
输入如下命令
bin/flink run -m hadoop102:8081 -c com.bigdata.cdc.FlinkDreamCDC_Cluster ./Flink_CDC-1.0-SNAPSHOT-jar-with-dependencies.jar
数据库中的数据
Flink 集群页面如下,读取两组数据
开始尝试添加一组数据
此时Flink 集群中读取的数据 如下:
断点续传
做数据的恢复,输入如下命令
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vPDNOcmF-1661149264965)(C:\Users\栾昊\AppData\Roaming\Typora\typora-user-images\image-20220302222248369.png)]
flink savepoint 8876a5be5f6e0a754aa662bff41fc03d hdfs://hadoop102:8020/cdc_test/savepoint
此时我杀掉Flink当年前的任务,并且做一下 增加 ,修改以及删除的操作。
添加人员张栋,修改欢欢年龄20,删除姓名为李四的人
开始再次启动Flink 查看,断电传续功能
bin/flink run -m hadoop102:8081 -s hdfs://hadoop102:8020/cdc_test/savepoint/savepoint-8876a5-aa9882ca253c -c com.bigdata.cdc.FlinkDreamCDC_Cluster ./Flink_CDC-1.0-SNAPSHOT-jar-with-dependencies.jar
注意:-s 后面加入的是上述断点传续命令中如下图片中圈起来的模块。
结果如下:
前6行是原有的功能,从第8行开始,操作的数据都是在Flink 挂掉之后操作的,添加了 张呆呆,将欢欢年龄改成了20,又删除了李四,这个就是断点续传功能
。
其他参数测试
//创建 Flink-MySQL-CDC 的 SourceDebeziumSourceFunction<String> mysqlSource = MySqlSource.<String>builder().hostname( "hadoop102" ) //主机名.port( 3306 ) //端口号.username( "root" ) //用户名.password( "000000" ) //密码.databaseList( "cdc_test" ) //数据库名.tableList( "cdc_test.user_info" ) //表名.deserializer( new StringDebeziumDeserializationSchema() ) // 反序列化.startupOptions( StartupOptions.initial() ) //初始化 (拍快照).build();
1,当我们把表名注销掉之后,运行时,依然也能看到信息,说明,只要有数据库名称,哪怕是不写上表的名字,依然也能监控表中的信息
2,如果 参数是 startupOptions(StartupOptions.latest());
那么不会输出历史信息,只会讲再次操作的信息展示出来。
前言
FlinkCDC 用的是2.0版本的,但是2.0版本不支持1.12版本的FlinkSQL , 只支持1.13版本的FlinkSQL,如果要使用FlinkSQL 编程 CDC 的话,需要引入1.13版本的Flink
依赖准备
<properties><flink-version>1.13.0</flink-version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>${flink-version}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
scan.startup.mode 这个参数只有两个值 "initial" and "latest-offset".
(初始化),initial 代表 初始化拍快照,lateset-offset 代表不拍快照
官网是有的,想配置相应的参数参考官网:
https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html
编码(FlinkSQL)
package com.bigdata.cdc;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;public class FlinkSQLCDC {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism( 1 );//创建表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create( env );//用FlinkSQL 构建CDCtableEnv.executeSql( "CREATE TABLE user_info(" +"id INT," +"name STRING," +"age STRING" +") WITH (" +"'connector' = 'mysql-cdc'," +"'scan.startup.mode' = 'latest-offset'," +"'hostname' = 'hadoop102'," +"'port' = '3306'," +"'username' = 'root'," +"'password' = '000000'," +"'database-name' = 'cdc_test'," +"'table-name' = 'user_info'," +"'scan.incremental.snapshot.enabled' = 'false'" +")");//查询输出并转换流输出Table table = tableEnv.sqlQuery( "select * from user_info" );DataStream<Tuple2<Boolean, Row>> rowDataStream = tableEnv.toRetractStream( table, Row.class );rowDataStream.print();//执行env.execute( "Job" );}
}
注意,在flink1.13版本支持根据mysql主键多并发读取数据功能,如果mysql没有设置主键,with里面要加’scan.incremental.snapshot.enabled’ = 'false’否则会报错:
[ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'
自定义反序列化器
1,实现DebeziumDeserializationSchema接口
2,重写deserialize,getProducedType两个方法
3,进入StringDebeziumDeserializationSchema类中将getProducedType返回值写进刚刚重写的getProducedType 方法中
如果用官方的序列化的话,打印的数据是如下这样子的。
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1646306260, file=mysql-bin.000002,pos=443,snapshot=true}
}
ConnectRecord{topic='mysql_binlog_source.cdc_test.user_info', kafkaPartition=null,key=null, keySchema=null, value=Struct{after=Struct{id=3,name=欢欢,age=20},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1646306260302,snapshot=true,db=cdc_test,table=user_info,server_id=0,file=mysql-bin.000002,pos=443,row=0},op=r,ts_ms=1646306260302}, valueSchema=Schema{mysql_binlog_source.cdc_test.user_info.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)
}
使用自定义反序列化器,打印出如下格式
{
“db”:" ",
“tableName”:" ",
“before”:{“id”:“1”,“name”:“…”…},
“after”:{“id”:“1”,“name”:“…”…},
“op”:" "
}
代码展示:同目录下function目录两组文件
自定义序列化之后,展示的数据如下:
{"op":"READ","before":{},"after":{"name":"欢欢","id":3,"age":"20"},"db":"cdc_test","tableName":"user_info"
}
总结
1,FlinkDream 做 CDC 在1.12版本和1.13版本中都能用 ,而FlinkSQL 只能在1.13版本使用
2,FlinkDream 可以监控多表,而FlinkSQL 只能监控单表
- FlinkDream 的缺点就是,自带的反序列化的数据,太复杂了,想简单化需要自己编写,而FlinkSQL已经是最好的Row对象展示,很方便,如果一定要使用FlinkDream 做CDC 的话,需要自定义一个反序列化器,要考虑到这一点。
- FlinkSQL的缺点就是只能监控单表,其他的比较完美,后续等官方更新,看是否支持监控多表
一文进入Flink CDC 的世界相关推荐
- Flink CDC 2.0 正式发布,详解核心改进
简介:Flink CDC 2.0.0 版本于 8 月 10 日正式发布,点击了解详情- 本文由社区志愿者陈政羽整理,内容来源自阿里巴巴高级开发工程师徐榜江 (雪尽) 7 月 10 日在北京站 Flin ...
- 37 手游基于 Flink CDC + Hudi 湖仓一体方案实践
简介: 介绍了 37 手游为何选择 Flink 作为计算引擎,并如何基于 Flink CDC + Hudi 构建新的湖仓一体方案. 本文作者是 37 手游大数据开发徐润柏,介绍了 37 手游为何选择 ...
- Flink CDC 将MySQL的数据写入Hudi实践
Flink CDC + Hudi实践 一.依赖关系 1.Maven依赖 2.SQL客户端JAR 二.设置MySQL服务器 1.创建MySQL用户: 2.向用户授予所需的权限: 3.最终确定用户的权限: ...
- Flink cdc +doris生产遇到的问题汇总-持续更新
问题: 我有个表主键是字符串类型 然后cdc去读取的时候 自己split了很久 checkpoint一直显示执行中,我看日志打印是info : checkpoint一直卡在那里 程序一直等待中: 原因 ...
- Flink CDC MongoDB Connector 的实现原理和使用实践
本文整理自 XTransfer 资深 Java 开发工程师.Flink CDC Maintainer 孙家宝在 Flink CDC Meetup 的演讲.主要内容包括: MongoDB Change ...
- Flink CDC 系列 - Flink CDC 如何简化实时数据入湖入仓
摘要:本文整理自伍翀 (云邪).徐榜江 (雪尽) 在 Flink Forward Asia 2021 的分享,该分享以 5 个章节详细介绍如何使用 Flink CDC 来简化实时数据的入湖入仓, 文章 ...
- Flink CDC 2.x 让一切变得美好
本文基于阿里巴巴高级开发工程师徐榜江 (雪尽) 7 月 10 日在北京站 Flink Meetup 分享的<详解 Flink-CDC>整理. <详解 Flink-CDC>深入讲 ...
- flink cdc笔记(一):flink cdc简介
1,什么是cdc CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC .目前通常描述的 CDC 技术主要面向数据库的变更, ...
- flink 开发平台Dinky 构建 Flink CDC 整库入仓入湖
原文:http://www.senlt.cn/article/866753893.html 摘要:本文介绍了如何使用 Dinky 实时计算平台构建 Flink CDC 整库入仓入湖.内容包括: 背景 ...
最新文章
- --noinput loaddata
- 系统架构的演变 -----自 罗文浩
- [react] create-react-app创建新运用怎么解决卡的问题?
- 博客更新内容简单介绍
- R语言编程基础(2)
- python 多线程笔记(2)-- 锁
- 公司 MyEclipse设置和SVN安装设置
- Drool学习记录(一) 概念、Helloworld
- IBM人工智能项目Watson旧金山开设新总部
- xrd精修教程_XRD精修教程.pdf
- php根据两点经纬度计算距离
- 【定位技术】:常见的定位技术
- 从辅助运动到让人开口说话,脑机接口:“你的福气还在后头!”
- 2020.12.28用isprime函数判断m是否为素数
- 设置gmail邮箱代收的方法
- JAVA购物商城系统毕业设计 开题报告
- loadrunner 注册破解
- 数据库一条insert插入多条记录
- 史上最详细唇语识别数据集综述
- 小红书账号分析丨小红书kol速成干货分享