文章目录

  • 前文
    • (1)什么是CDC
    • (2)Flink-CDC是什么
    • (3)Flink-CDC 特性
  • CDC与Flink毕业版本
  • Springboot项目整合Flink-CDC
    • (1)说明
    • (2)引入依赖
    • (3)接入springboot项目
      • 创建监听类 实现 ApplicationRunner
      • 自定义数据读取解析器
      • 变更对象
      • 自定义sink 交由spring管理

前文

(1)什么是CDC

CDC:全称是 Change Data Capture,即数据变更捕获技术,具体的含义是 通过识别和捕获对数据库中的数据所做的更改(包括数据或数据表的插入、更新、删除;数据库结构的变更调整等),然后将这些更改按发生的顺序完整记录下来,并实时通过中间技术桥梁(消息中间件、TCP等等)将变更顺序消息传送到下游流程或系统的过程。

(2)Flink-CDC是什么

CDC Connectors for Apache Flink ®是一组用于Apache Flink ®的源连接器,使用变更数据捕获 (CDC) 从不同数据库获取变更。用于 Apache Flink ®的 CDC 连接器将 Debezium 集成为捕获数据更改的引擎。所以它可以充分发挥 Debezium 的能力。

白话的意思是,Flink-CDC 一个成型的cdc技术实现(Debezium)的包装,我前面也使用过Debezium,并编写了一个简略的博客,感兴趣的可以戳下方连接去看一下

springboot+debezium捕获数据库变更(mysql、sql-server、mongodb、oracle…)

(3)Flink-CDC 特性

  1. 支持读取数据库快照,即使发生故障也能继续读取binlog,一次处理。

  2. DataStream API 的 CDC 连接器,用户可以在单个作业中使用多个数据库和表的更改,而无需部署 Debezium 和 Kafka。

  3. Table/SQL API 的 CDC 连接器,用户可以使用 SQL DDL 创建 CDC 源来监控单个表的更改。

CDC与Flink毕业版本

下表显示了 Flink® CDC 连接器和 Flink® 之间的版本映射:

Flink ® CDC 版本 Flink®版本_
1.0.0 1.11.*
1.1.0 1.11.*
1.2.0 1.12.*
1.3.0 1.12.*
1.4.0 1.13.*
2.0.* 1.13.*
2.1.* 1.13.*
2.2.* 1.13.* , 1.14.*

Springboot项目整合Flink-CDC

(1)说明

按常理来说,一个正常的flink-job 最终我们并不会集成到springboot项目中,我们会直接编写一个maven项目,在发布时使用flink程序来启动任务

比如官网示例:

本文即要使用flink-cdc进行数据变更捕获 (可以视作为一个flink-job),但又要契合我们的springboot项目,使用spring的特性,因此,我们需要转换一下思路,转换成什么样子呢?就是不要将这个flink-cdc作为一个job 使用flink程序进行发布提交,我们就当它在我们开发时一样,作为一个本地项目,main方法启动

(2)引入依赖

flink客户端版本使用 1.13.6 cdc 版本使用 2.0.0

    <properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.13.6</flink.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><!--mysql -cdc--><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.18</version></dependency></dependencies>

(3)接入springboot项目

无法简单的使用main方法来启动cdc 作业,因为如果这样的话,我们就无法与spring完美的契合

因此我们可以利用springboot的特性, 实现 ApplicationRunner 将flink-cdc 作为一个项目启动时需要运行的分支子任务即可

创建监听类 实现 ApplicationRunner

package com.leilei.mysql;import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;/*** @author lei* @create 2022-08-25 13:42* @desc mysql变更监听**/
@Component
public class MysqlEventListener implements ApplicationRunner {private final DataChangeSink dataChangeSink;public MysqlEventListener(DataChangeSink dataChangeSink) {this.dataChangeSink = dataChangeSink;}@Overridepublic void run(ApplicationArguments args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource = buildDataChangeSource();DataStream<DataChangeInfo> streamSource = env.addSource(dataChangeInfoMySqlSource, "mysql-source").setParallelism(1);streamSource.addSink(dataChangeSink);env.execute("mysql-stream-cdc");}/*** 构造变更数据源** @param* @return DebeziumSourceFunction<DataChangeInfo>* @author lei* @date 2022-08-25 15:29:38*/private DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() {return MySqlSource.<DataChangeInfo>builder().hostname("10.50.40.145").port(3306).databaseList("paas_common_db").tableList("paas_common_db.base_business_driver_score_*").username("root").password("cdwk-3g-145")/**initial初始化快照,即全量导入后增量导入(检测更新数据写入)* latest:只进行增量导入(不读取历史变化)* timestamp:指定时间戳进行数据导入(大于等于指定时间错读取数据)*/.startupOptions(StartupOptions.latest()).deserializer(new MysqlDeserialization()).serverTimeZone("GMT+8").build();}
}

自定义数据读取解析器

我这里解析为一个数据变更对象

package com.leilei.mysql;import com.alibaba.fastjson.JSON;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import com.alibaba.fastjson.JSONObject;import java.util.List;
import java.util.Optional;/*** @author lei* @create 2022-08-25 13:43* @desc mysql消息读取自定义序列化**/
public class MysqlDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> {public static final String TS_MS = "ts_ms";public static final String BIN_FILE = "file";public static final String POS = "pos";public static final String CREATE = "CREATE";public static final String BEFORE = "before";public static final String AFTER = "after";public static final String SOURCE = "source";public static final String UPDATE = "UPDATE";/**** 反序列化数据,转为变更JSON对象* @param sourceRecord* @param collector* @return void* @author lei* @date 2022-08-25 14:44:31*/@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {String topic = sourceRecord.topic();String[] fields = topic.split("\\.");String database = fields[1];String tableName = fields[2];Struct struct = (Struct) sourceRecord.value();final Struct source = struct.getStruct(SOURCE);DataChangeInfo dataChangeInfo = new DataChangeInfo();dataChangeInfo.setBeforeData( getJsonObject(struct, BEFORE).toJSONString());dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());//5.获取操作类型  CREATE UPDATE DELETEEnvelope.Operation operation = Envelope.operationFor(sourceRecord);String type = operation.toString().toUpperCase();int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3;dataChangeInfo.setEventType(eventType);dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x->Integer.parseInt(x.toString())).orElse(0));dataChangeInfo.setDatabase(database);dataChangeInfo.setTableName(tableName);dataChangeInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));//7.输出数据collector.collect(dataChangeInfo);}/**** 从袁术数据获取出变更之前或之后的数据* @param value* @param fieldElement* @return JSONObject* @author lei* @date 2022-08-25 14:48:13*/private JSONObject getJsonObject(Struct value, String fieldElement) {Struct element = value.getStruct(fieldElement);JSONObject jsonObject = new JSONObject();if (element != null) {Schema afterSchema = element.schema();List<Field> fieldList = afterSchema.fields();for (Field field : fieldList) {Object afterValue = element.get(field);jsonObject.put(field.name(), afterValue);}}return jsonObject;}@Overridepublic TypeInformation<DataChangeInfo> getProducedType() {return TypeInformation.of(DataChangeInfo.class);}
}

变更对象

import lombok.Data;/*** @author lei* @create 2022-08-25 14:33* @desc 数据变更对象**/
@Data
public class DataChangeInfo {/*** 变更前数据*/private String beforeData;/*** 变更后数据*/private String afterData;/*** 变更类型 1新增 2修改 3删除*/private Integer eventType;/*** binlog文件名*/private String fileName;/*** binlog当前读取点位*/private Integer filePos;/*** 数据库名*/private String database;/*** 表名*/private String tableName;/*** 变更时间*/private Long changeTime;}

自定义sink 交由spring管理

package com.leilei.mysql;import lombok.extern.log4j.Log4j2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.springframework.stereotype.Component;/*** @author lei* @create 2022-08-25 14:01* @desc**/
@Component
@Log4j2
public class DataChangeSink implements SinkFunction<DataChangeInfo> {@Overridepublic void invoke(DataChangeInfo value, Context context) {log.info("收到变更原始数据:{}", value);// todo 数据处理;因为此sink也是交由了spring管理,您想进行任何操作都非常简单}
}

当然,以上仅仅只是整合思路,如果你想使用flink-cdc 进行数据同步或日志记录等,结合您自身的需求进行调整接口,以上内容,大的架子是没问题的

如果遇到问题,可以先从官网QA寻找:https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)

项目源码:springboot-flink-cdc

springboot集成flink-cdc相关推荐

  1. Flink CDC + OceanBase 全增量一体化数据集成方案

    本文整理自 OceanBase 技术专家王赫(川粉)在 5 月 21 日 Flink CDC Meetup 的演讲.主要内容包括: OceanBase 介绍 Flink CDC OceanBase C ...

  2. Flink CDC 新一代数据集成框架

    前言: 主要讲解了技术原理,入门与生产实践,主要功能:全增量一体化数据集成.实时数据入库入仓.最详细的教程.Flink CDC 是Apache Flink的一个重要组件,主要使用了CDC技术从各种数据 ...

  3. Hudi 0.11.0 + Flink1.14.4 + Hive + Flink CDC + Kafka 集成

    Hudi 0.11.0 + Flink1.14.4 + Hive + Flink CDC + Kafka 集成 一.环境准备 1.1 软件版本 Flink 1.14.4Scala 2.11CDH 6. ...

  4. Flink CDC 2.0 正式发布,详解核心改进

    简介:Flink CDC 2.0.0 版本于 8 月 10 日正式发布,点击了解详情- 本文由社区志愿者陈政羽整理,内容来源自阿里巴巴高级开发工程师徐榜江 (雪尽) 7 月 10 日在北京站 Flin ...

  5. Flink原理解析50篇(四)-基于 Flink CDC 打通数据实时入湖

    在构建实时数仓的过程中,如何快速.正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎Flink和数据湖Apache Iceberg两种技术,来解决业务数据实时入湖相关的问题. 0 ...

  6. 基于Flink CDC打通数据实时入湖

    作者 | 数据社       责编 | 欧阳姝黎 在构建实时数仓的过程中,如何快速.正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎 Flink 和数据湖 Apache Ice ...

  7. 数据湖架构Hudi(五)Hudi集成Flink案例详解

    五.Hudi集成Flink案例详解 5.1 hudi集成flink flink的下载地址: https://archive.apache.org/dist/flink/ Hudi Supported ...

  8. XTransfer技术专家亮相Flink CDC Meetup

    背景信息:Flink CDC 是实时数据集成框架的开源代表,具有全增量一体化.无锁读取.并发读取.分布式架构等技术优势,在开源社区中非常受欢迎. 为促进 Flink CDC 技术的交流和发展,社区于 ...

  9. Flink CDC MongoDB Connector 的实现原理和使用实践

    本文整理自 XTransfer 资深 Java 开发工程师.Flink CDC Maintainer 孙家宝在 Flink CDC Meetup 的演讲.主要内容包括: MongoDB Change ...

最新文章

  1. linux 瞬间文件数没了,关于linux:如何快速汇总文件中的所有数字?
  2. 聊一聊跨域,Vue向Django请求数据的一些问题
  3. 简述Linux和Windows下Python搭建步骤
  4. 什么是B/S模式?什么是C/S模式?
  5. 第十五届全国大学生智能车全国总决赛获奖信息-东北赛区
  6. cpp 怎么连接mysql_C++连接mysql数据库的两种方法
  7. Linux学习笔记(一):常用命令(2)
  8. 输出等边三角形php,php打印三角星星方法实列
  9. jvm破坏双亲委派_破坏JVM
  10. php session和cookie区别,php中session和cookie的区别是什么?
  11. 2022年APP软件游戏应用网站Pbootcms模板源码+支持WAP
  12. ETCD数据库源码分析——etcdserver bootstrap初始化存储
  13. OC block的回环引用
  14. 别翻了,Lambda 表达式入门,看这篇就够了
  15. vue实现登录时的图片验证码(纯前端)
  16. HTML5中的data-id与id
  17. 基于MATLAB的神经网络进行手写体数字识别(含鼠绘GUI / 数据集:MNIST)
  18. 计算机中文期刊abc类,计算机类国际期刊列表:
  19. matlab abel变换图片处理
  20. icp备案是什么意思?什么是ICP备案?

热门文章

  1. 7-12 个位数字统计 (15 分)
  2. 深圳市威视爱普手术示教系统
  3. 【教程】手机远程连接另一台手机的termux进行拍照
  4. Ac-RADADADARADADADA-NH2
  5. 关于台电X16 plus (Tpad)安装win10系统
  6. Linux下cp和scp的详细说明及其他们的区别
  7. 联想第一季度业绩超预期,增长势头强劲
  8. 一篇文章带你玩转C语言基础语法5:条件判断 if else 语句与分支 。(千字总结)
  9. A pseudo attribute name is expected.解决方法
  10. stm32f4内部flash讲解