Flink CDC (Mysql为例)
背景
业务中经常出现一些千万乃至亿级别的大表,此时可能考虑分库分表(Sharding-JDBC、MyCat等方案),也常同步数据进入ES中;同步数据这一业务场景中,Flink CDC是一个很不错的解决方案。
方案
如mysql、postgresql、sqlserver等,flink cdc通过读取binlog日志(注意:请先开启binlog日志),进行数据同步,实时性较好。
对数据的解析和消费进行了二次封装,使用者只需增加简单的配置,实现FlinkConsumerListener接口,关注编写业务代码即可。
代码
show coding
flink: flink cdc 暂时支持mysql
测试demo
创建一个springboot项目
依赖引入(引入上述工程打包后的依赖)
<dependency><groupId>com.kwin</groupId><artifactId>flink</artifactId><version>0.0.1-SNAPSHOT</version></dependency>
配置文件
flink:pipeline-name: flinkCDCTestmysqlDataSource:- port: 3306hostname: 127.0.0.1databaseList:- flinktesttableList:- flinktest.studentusername: rootpassword: 123456
如上,针对flinktest数据库的student表进行binlog监听。
flinktest.student的消费者
student实体
import lombok.Data;/*** @author kwin* @Date 2022/7/25 18:27**/
@Data
public class Student {private Long id;private String name;private Integer age;private Integer maxInx;
}
消费者
import com.kwin.demo.server.module.flink.test.entity.Student;
import com.kwin.flink.sink.FlinkConsumerListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** @author kwin* @Date 2022/7/25 18:29**/
@Slf4j
@Component
public class StudentConsumerListener implements FlinkConsumerListener<Student> {@Overridepublic String getDBName() {return "flinktest";}@Overridepublic String getTable() {return "student";}@Overridepublic void insert(Student data) {System.out.println("insert: " + data);}@Overridepublic void update(Student srcData, Student destData) {System.out.println("update: \nsrc:" + srcData + "\ndest:" + destData);}@Overridepublic void delete(Student data) {System.out.println("delete:"+data);}
}
启动项目
flinktest.student修改数据时:
flinktest.student插入数据时:
flinktest.student删除数据时:
如上,使用者只需实现FlinkConsumerListener接口,即可对指定表的数据进行消费和业务逻辑操作。
Flink CDC (Mysql为例)相关推荐
- Flink CDC 系列(3)—— Flink CDC MySQL Connector 与 Flink SQL 的结合使用案例Demo
Flink CDC 系列文章: <Flink CDC 系列(1)-- 什么是 Flink CDC> <Flink CDC 系列(2)-- Flink CDC 源码编译> < ...
- Flink CDC 将MySQL的数据写入Hudi实践
Flink CDC + Hudi实践 一.依赖关系 1.Maven依赖 2.SQL客户端JAR 二.设置MySQL服务器 1.创建MySQL用户: 2.向用户授予所需的权限: 3.最终确定用户的权限: ...
- Flink CDC 系列(7)—— 从 MySQL 到 ElasticSearch
Flink CDC 系列文章: <Flink CDC 系列(1)-- 什么是 Flink CDC> <Flink CDC 系列(2)-- Flink CDC 源码编译> < ...
- Hudi 0.11.0 + Flink1.14.4 + Hive + Flink CDC + Kafka 集成
Hudi 0.11.0 + Flink1.14.4 + Hive + Flink CDC + Kafka 集成 一.环境准备 1.1 软件版本 Flink 1.14.4Scala 2.11CDH 6. ...
- Flink CDC 系列(1)—— 什么是 Flink CDC
Flink CDC 系列文章: <Flink CDC 系列(1)-- 什么是 Flink CDC> <Flink CDC 系列(2)-- Flink CDC 源码编译> < ...
- Flink CDC 实时同步mysql
前言 在实际开发中,需要做数据同步的场景是非常多的,比如不同的应用之间不想直接通过RPC的方式进行数据交互,或者说下游应用需要检测来自上游应用的某些业务指标数据的变化时,这些都可以考虑使用数据同步的方 ...
- 技术解析|Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once精准接入
本篇文档将演示如何使用 Apache Doris Flink Connector 结合 Flink CDC 以及 Doris Stream Load 的两阶段提交,实现 MySQL 数据库分库分表实时 ...
- Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once精准接入
导读:本篇文档将演示如何使用 Apache Doris Flink Connector 结合 Flink CDC 以及 Doris Stream Load 的两阶段提交,实现 MySQL 数据库分库分 ...
- flink cdc 2.2.1 mysql connector
报错 java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent ...
最新文章
- 天翼云从业认证(4.7)天翼云安全基础实践
- PHPStorm无法保存个人设置 ctrl左键无法找到类
- C# 获取FormData数据
- iPhone之横竖屏与自动旋转
- python编程求n的阶乘_使用Python编程的阶乘
- 基于留一法的快速KNN代码
- 线性代数--向量的内积,正交,正交矩阵,规范正交,施密特正交化
- gin mysql_golang+gin+mysql构建RESTful API
- win10 C盘优化清理
- CEI Harpoon v1.3-ISO 1CD
- iPhone模拟器部分操作
- Mockplus 3.2前瞻,五大特色功能让你惊喜!
- 获得代理ippython_Python自动获取代理IP
- “鲨鱼仿生学”iGame主板即将迎来新形象
- Python_Dataframe_去除重复数据
- python数据收集整理教案_数据收集整理教学设计
- Android-UI 开源控件
- 求一个数组的非空子集
- 2021-05-23:pandas 新增sheet,不覆盖原来已经保存的sheet
- 企业信息化投入中咨询服务_咨询服务企业如何实施项目核算信息化建设
热门文章
- 字符串的冒泡排序c语言,c语言单链表冒泡排序_c语言字符串排序冒泡法_c语言链表排序...
- Android 图片异步加载
- AI医药论文解读:Modeling Polypharmacy Side Effects with Graph Convolutional Networks
- GridLayout、FlowLayout和BorderLayout布局
- java面向对象--超市购物程序
- android 仿手机助手下载进度条效果
- Linux文件信息中的rwx表示什么?Linux权限理解
- uni-app 实现 fullpage 组件(适用于微信小程序,h5等)
- 去掉iPhone safari下手机号码默认的下划线
- 百度快速收录服务(新网站长期不收录都可以优化)