文章目录

  • 前言
  • 为什么要学FlinkCDC
  • 什么是CDC
  • CDC 的种类
  • Flink CDC
  • Flink CDC 案例实操
    • 依赖准备
    • 编码(DataStream)
    • 本地测试
      • 配置MySQL中的binlog
      • 创建数据库,建表,添加一个语句
      • 执行代码查看
    • 集群测试
    • 其他参数测试
    • 前言
      • 依赖准备
      • 编码(FlinkSQL)
  • 自定义反序列化器
  • 总结

前言

CDC有两种方式,一种是离线的,一种是实时的,也就是一种是基于查询的,一种是Binlog的这种方式。

为什么要学FlinkCDC

我们用传统的CDC工具检测到一个数据后,我们要对数据进行计算,常规的方案是采用CDC工具将数据采集到消息队列中,用Spark 或者 Flink 进行计算,加工, 有了Flink CDC 我们可以读取数据和加工数据用Flink一起完成,这就是学习Flink CDC 的一个根本原因。

什么是CDC

CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

CDC 的种类

CDC 主要分为基于查询和基于 Binlog 两种方式,

Flink CDC

Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据增量变更数据的 source 组件。

开源地址:https://github.com/ververica/flink-cdc-connectors

Flink CDC 案例实操

依赖准备

<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.49</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>

编码(DataStream)

package com.bigdata.cdc;import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkStreamCDC {public static void main(String[] args) throws Exception {//创建流处理执行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度env.setParallelism( 1 );//创建 Flink-MySQL-CDC 的 SourceDebeziumSourceFunction<String> mysqlSource = MySqlSource.<String>builder().hostname( "hadoop102" ) //主机名.port( 3306 ) //端口号.username( "root" ) //用户名.password( "000000" ) //密码.databaseList( "cdc_test" ) //数据库名.tableList( "cdc_test.user_info" ) //表名.deserializer( new StringDebeziumDeserializationSchema() ) // 反序列化.startupOptions( StartupOptions.initial() )  //初始化 (拍快照).build();//添加数据源DataStreamSource<String> dataStream = env.addSource( mysqlSource );//做一个打印dataStream.print();//执行env.execute( "Flink_CDD" );}
}

本地测试

本地测试的前提得有环境,刚刚在编码中测试的数据库是 cdc_test 表名是user_info,我们现在得把它们创建出来,但是前提是,一定要在MySQL 配置文件中,添加binlog 这样才能有效的进行测试。

配置MySQL中的binlog

进入MySQL配置文件 my.cnf

[root@hadoop102 ~]# vim /etc/my.cnf

将如下内容添加 [mysqld] 下中

#binlog日志名称前缀
log-bin=mysql-bin
##默认值未0,如果使用默认值则不能和从节点通信,这个值的区间是:1到(2^32)-1
server-id=1
binlog-do-db=cdc_test

重启MySQL服务

[root@hadoop102 ~]# systemctl restart mysqld

创建数据库,建表,添加一个语句

create database cdc_test;use cdc_test;create table user_info(id int,name varchar(200),age varchar(200)
);insert into user_info values(1,"张三","24");

执行代码查看

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-v8MfxJcH-1661149264947)(C:\Users\栾昊\AppData\Roaming\Typora\typora-user-images\image-20220302183745037.png)]

下面添加一个数据

insert into user_info values(2,"李四","24");

下面修改数据,将 年龄 24 改成 26

 update user_info set age = "26" where id = 2;

添加王五 并删除

insert into user_info values(3,"王五","23");delete from user_info where id = 3;

假如说,我们这个挂掉了,再次开启的时候,它会不会读取之前的状态呢,就是说,当前我们有张三和和李四,那么会不会将它们读取到呢?

其实还是能读取到的,只是以前的操作看不到了,op 也变成了 r ,还是能将之前的数据展示出来

集群测试

代码中添加如下配置

/*** Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点续传,* 需要从 Checkpoint 或者 Savepoint 启动程序*///开启 Checkpoint,每隔 5 秒钟做一次 CKenv.enableCheckpointing(5000L);//指定 CK 的一致性语义env.getCheckpointConfig().setCheckpointingMode( CheckpointingMode.EXACTLY_ONCE);//设置CK 的超时时间 10秒钟env.getCheckpointConfig().setCheckpointTimeout( 10000L );//并发检查点尝试的最大次数。env.getCheckpointConfig().setMaxConcurrentCheckpoints( 1 );//设置状态后端env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc_test/ck"));

进行打包,将打包后的jar 上传到Linux 下的 Flink 目录中

启动集群测试之前,先启动Hadoop集群和Flink集群

Flink 集群

start-cluster.sh

输入如下命令

bin/flink run -m hadoop102:8081 -c com.bigdata.cdc.FlinkDreamCDC_Cluster ./Flink_CDC-1.0-SNAPSHOT-jar-with-dependencies.jar

数据库中的数据

Flink 集群页面如下,读取两组数据

开始尝试添加一组数据

此时Flink 集群中读取的数据 如下:

断点续传做数据的恢复,输入如下命令

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vPDNOcmF-1661149264965)(C:\Users\栾昊\AppData\Roaming\Typora\typora-user-images\image-20220302222248369.png)]

flink savepoint 8876a5be5f6e0a754aa662bff41fc03d hdfs://hadoop102:8020/cdc_test/savepoint

此时我杀掉Flink当年前的任务,并且做一下 增加 ,修改以及删除的操作。

添加人员张栋,修改欢欢年龄20,删除姓名为李四的人

开始再次启动Flink 查看,断电传续功能

bin/flink run -m hadoop102:8081 -s hdfs://hadoop102:8020/cdc_test/savepoint/savepoint-8876a5-aa9882ca253c -c com.bigdata.cdc.FlinkDreamCDC_Cluster ./Flink_CDC-1.0-SNAPSHOT-jar-with-dependencies.jar

注意:-s 后面加入的是上述断点传续命令中如下图片中圈起来的模块。

结果如下:

前6行是原有的功能,从第8行开始,操作的数据都是在Flink 挂掉之后操作的,添加了 张呆呆,将欢欢年龄改成了20,又删除了李四,这个就是断点续传功能

其他参数测试

//创建 Flink-MySQL-CDC 的 SourceDebeziumSourceFunction<String> mysqlSource = MySqlSource.<String>builder().hostname( "hadoop102" ) //主机名.port( 3306 ) //端口号.username( "root" ) //用户名.password( "000000" ) //密码.databaseList( "cdc_test" ) //数据库名.tableList( "cdc_test.user_info" ) //表名.deserializer( new StringDebeziumDeserializationSchema() ) // 反序列化.startupOptions( StartupOptions.initial() )  //初始化 (拍快照).build();

1,当我们把表名注销掉之后,运行时,依然也能看到信息,说明,只要有数据库名称,哪怕是不写上表的名字,依然也能监控表中的信息

2,如果 参数是 startupOptions(StartupOptions.latest()); 那么不会输出历史信息,只会讲再次操作的信息展示出来。

前言

FlinkCDC 用的是2.0版本的,但是2.0版本不支持1.12版本的FlinkSQL , 只支持1.13版本的FlinkSQL,如果要使用FlinkSQL 编程 CDC 的话,需要引入1.13版本的Flink

依赖准备

<properties><flink-version>1.13.0</flink-version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>${flink-version}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>

scan.startup.mode 这个参数只有两个值 "initial" and "latest-offset". (初始化),initial 代表 初始化拍快照,lateset-offset 代表不拍快照

官网是有的,想配置相应的参数参考官网:

https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html

编码(FlinkSQL)

package com.bigdata.cdc;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;public class FlinkSQLCDC {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =              StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism( 1 );//创建表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create( env );//用FlinkSQL 构建CDCtableEnv.executeSql( "CREATE TABLE user_info(" +"id INT," +"name STRING," +"age STRING" +") WITH (" +"'connector' = 'mysql-cdc'," +"'scan.startup.mode' = 'latest-offset'," +"'hostname' = 'hadoop102'," +"'port' = '3306'," +"'username' = 'root'," +"'password' = '000000'," +"'database-name' = 'cdc_test'," +"'table-name' = 'user_info'," +"'scan.incremental.snapshot.enabled' = 'false'" +")");//查询输出并转换流输出Table table = tableEnv.sqlQuery( "select * from user_info" );DataStream<Tuple2<Boolean, Row>> rowDataStream = tableEnv.toRetractStream( table, Row.class );rowDataStream.print();//执行env.execute( "Job" );}
}

注意,在flink1.13版本支持根据mysql主键多并发读取数据功能,如果mysql没有设置主键,with里面要加’scan.incremental.snapshot.enabled’ = 'false’否则会报错:

[ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'

自定义反序列化器

1,实现DebeziumDeserializationSchema接口

2,重写deserialize,getProducedType两个方法

3,进入StringDebeziumDeserializationSchema类中将getProducedType返回值写进刚刚重写的getProducedType 方法中

如果用官方的序列化的话,打印的数据是如下这样子的。

SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1646306260, file=mysql-bin.000002,pos=443,snapshot=true}
}
ConnectRecord{topic='mysql_binlog_source.cdc_test.user_info', kafkaPartition=null,key=null,              keySchema=null, value=Struct{after=Struct{id=3,name=欢欢,age=20},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1646306260302,snapshot=true,db=cdc_test,table=user_info,server_id=0,file=mysql-bin.000002,pos=443,row=0},op=r,ts_ms=1646306260302}, valueSchema=Schema{mysql_binlog_source.cdc_test.user_info.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)
}

使用自定义反序列化器,打印出如下格式

{

​ “db”:" ",

​ “tableName”:" ",

​ “before”:{“id”:“1”,“name”:“…”…},

​ “after”:{“id”:“1”,“name”:“…”…},

​ “op”:" "

}

代码展示:同目录下function目录两组文件

自定义序列化之后,展示的数据如下:

{"op":"READ","before":{},"after":{"name":"欢欢","id":3,"age":"20"},"db":"cdc_test","tableName":"user_info"
}

总结

1,FlinkDream 做 CDC 在1.12版本和1.13版本中都能用 ,而FlinkSQL 只能在1.13版本使用

2,FlinkDream 可以监控多表,而FlinkSQL 只能监控单表

  • FlinkDream 的缺点就是,自带的反序列化的数据,太复杂了,想简单化需要自己编写,而FlinkSQL已经是最好的Row对象展示,很方便,如果一定要使用FlinkDream 做CDC 的话,需要自定义一个反序列化器,要考虑到这一点。
  • FlinkSQL的缺点就是只能监控单表,其他的比较完美,后续等官方更新,看是否支持监控多表

一文进入Flink CDC 的世界相关推荐

  1. Flink CDC 2.0 正式发布,详解核心改进

    简介:Flink CDC 2.0.0 版本于 8 月 10 日正式发布,点击了解详情- 本文由社区志愿者陈政羽整理,内容来源自阿里巴巴高级开发工程师徐榜江 (雪尽) 7 月 10 日在北京站 Flin ...

  2. 37 手游基于 Flink CDC + Hudi 湖仓一体方案实践

    简介: 介绍了 37 手游为何选择 Flink 作为计算引擎,并如何基于 Flink CDC + Hudi 构建新的湖仓一体方案. 本文作者是 37 手游大数据开发徐润柏,介绍了 37 手游为何选择 ...

  3. Flink CDC 将MySQL的数据写入Hudi实践

    Flink CDC + Hudi实践 一.依赖关系 1.Maven依赖 2.SQL客户端JAR 二.设置MySQL服务器 1.创建MySQL用户: 2.向用户授予所需的权限: 3.最终确定用户的权限: ...

  4. Flink cdc +doris生产遇到的问题汇总-持续更新

    问题: 我有个表主键是字符串类型 然后cdc去读取的时候 自己split了很久 checkpoint一直显示执行中,我看日志打印是info : checkpoint一直卡在那里 程序一直等待中: 原因 ...

  5. Flink CDC MongoDB Connector 的实现原理和使用实践

    本文整理自 XTransfer 资深 Java 开发工程师.Flink CDC Maintainer 孙家宝在 Flink CDC Meetup 的演讲.主要内容包括: MongoDB Change ...

  6. Flink CDC 系列 - Flink CDC 如何简化实时数据入湖入仓

    摘要:本文整理自伍翀 (云邪).徐榜江 (雪尽) 在 Flink Forward Asia 2021 的分享,该分享以 5 个章节详细介绍如何使用 Flink CDC 来简化实时数据的入湖入仓, 文章 ...

  7. Flink CDC 2.x 让一切变得美好

    本文基于阿里巴巴高级开发工程师徐榜江 (雪尽) 7 月 10 日在北京站 Flink Meetup 分享的<详解 Flink-CDC>整理. <详解 Flink-CDC>深入讲 ...

  8. flink cdc笔记(一):flink cdc简介

    1,什么是cdc CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC .目前通常描述的 CDC 技术主要面向数据库的变更, ...

  9. flink 开发平台Dinky 构建 Flink CDC 整库入仓入湖

    原文:http://www.senlt.cn/article/866753893.html 摘要:本文介绍了如何使用 Dinky 实时计算平台构建 Flink CDC 整库入仓入湖.内容包括: 背景 ...

最新文章

  1. --noinput loaddata
  2. 系统架构的演变 -----自 罗文浩
  3. [react] create-react-app创建新运用怎么解决卡的问题?
  4. 博客更新内容简单介绍
  5. R语言编程基础(2)
  6. python 多线程笔记(2)-- 锁
  7. 公司 MyEclipse设置和SVN安装设置
  8. Drool学习记录(一) 概念、Helloworld
  9. IBM人工智能项目Watson旧金山开设新总部
  10. xrd精修教程_XRD精修教程.pdf
  11. php根据两点经纬度计算距离
  12. 【定位技术】:常见的定位技术
  13. 从辅助运动到让人开口说话,脑机接口:“你的福气还在后头!”
  14. 2020.12.28用isprime函数判断m是否为素数
  15. 设置gmail邮箱代收的方法
  16. JAVA购物商城系统毕业设计 开题报告
  17. loadrunner 注册破解
  18. 数据库一条insert插入多条记录
  19. 史上最详细唇语识别数据集综述
  20. 小红书账号分析丨小红书kol速成干货分享

热门文章

  1. 二维点云ICP原理推导
  2. 迈瑞CL6000i全自动化学发光免疫分析仪【双向通讯】
  3. 经典运动估计算法之全搜索、三步搜索、四步搜索、菱形搜索
  4. Dell 330显卡驱动在2003server安装问题
  5. 嵌入式编程技巧(持续更新)
  6. 通过修改EFI引导文件配置WIN10与UBUNTU20.04双系统
  7. [RK3568 Android11] 开发之内置默认中文输入法(谷歌输入法)
  8. 自学Java-day01(初学Java)
  9. LeakCanary源码分析
  10. 玩家离开游戏10个理由