测试使用的是flinkCdc2.1.1版本(无锁同步,1.x版本有同步锁)目前支持mysql5.7及以上版本;

要在mysql.cnf中配置开启开启mysql的bin-log日志,

log_bin=mysql-bin
binlog_format=ROW

并且只支持ROW格式,其他mixed和statement会报错 如果想配置控制binlog日志的范围,在数据库范围可以使用mysql的binlog-do-db和binlog-ignore-db,表级别可以使用replicate_wild_do_table, 不过不建议配置,因为在一些特殊情况下有可能造成同步的数据不完整,参考mysql中replicate_wild_do_table和replicate_do_db区别 - mofy - 博客园。

如果不配置binlog-do-db相当于所有的库都可以同步, 一旦配置了binlog-do-db则除了配置了binlog-do-db的库能同步binlog日志,其他的库都不能同步了。

修改mysql.cnf之后要重启数据库生效,重启之后可以查询binlog日志是否已经开启以及格式等信息;

-- 创建mysql用户
CREATE USER 'cdc'@'%' IDENTIFIED BY 'cdc';
-- 赋权
GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, SHUTDOWN, PROCESS, FILE, REFERENCES, INDEX, ALTER, SHOW DATABASES, SUPER, CREATE TEMPORARY TABLES, LOCK TABLES, EXECUTE, REPLICATION SLAVE, REPLICATION CLIENT, CREATE VIEW, SHOW VIEW, CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER ON *.* TO 'cdc'@'%';-- # 是否启用binlog日志
show variables like 'log_bin';
-- # 查看 binlog 内容
show binlog events;/*# 事件查询命令
# IN 'log_name' :指定要查询的binlog文件名(不指定就是第一个binlog文件)
# FROM pos :指定从哪个pos起始点开始查起(不指定就是从整个文件首个pos点开始算)
# LIMIT [offset,] :偏移量(不指定就是0)
# row_count :查询总条数(不指定就是所有行) */
show binlog events [IN 'log_name'] [FROM pos] [LIMIT [offset,] row_count];-- # 设置binlog文件保存事件,过期删除,单位天
set global expire_logs_days=31; # mysqlbinlog查看binlog
mysqlbinlog --no-defaults --database=etl_test_1210 --base64-output=decode-rows -v --start-datetime='2021-12-03 09:00:00' --stop-datetime='2021-12-03 10:27:00' mysql-bin.000101

另外注意flinkcdc使用的mysql用户要有reload权限,否则读取不到存量数据,只能获取增量。

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc' IDENTIFIED BY 'cdc';

java代码


import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class MySqlCdcPrint {public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("127.0.0.1").port(3307).databaseList("cdc_test") // monitor all tables under inventory database.tableList("cdc_test.test1") // set captured table.username("root").password("123456").deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// enable checkpointenv.enableCheckpointing(60000);  //checkpoint需要什么条件?com/ververica/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.snapshotState()env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")// set 4 parallel source tasks.setParallelism(1).print("最终数据===>").setParallelism(1); // use parallelism 1 for sink to keep message orderingenv.execute("MySqlCdcPrint");}
}

打印了表中的数据,并且新增insert数据后也会打印出来

flinkCdc的mysql配置及java测试代码_shy_snow的专栏-CSDN博客

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flinkCdc</artifactId><version>1.0-SNAPSHOT</version><properties><flink.version>1.13.5</flink.version><debezium.version>1.5.4.Final</debezium.version><geometry.version>2.2.0</geometry.version><java.version>8</java.version><scala.binary.version>2.12</scala.binary.version><maven.compiler.source>${java.version}</maven.compiler.source><maven.compiler.target>${java.version}</maven.compiler.target><slf4j.version>1.7.25</slf4j.version><log4j.version>2.16.0</log4j.version><spotless.version>2.4.2</spotless.version><!-- Enforce single fork execution due to heavy mini cluster use in the tests --><flink.forkCount>1</flink.forkCount><flink.reuseForks>true</flink.reuseForks><log4j.configuration>log4j2-test.properties</log4j.configuration><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>com.ververica</groupId><!-- add the dependency matching your database --><artifactId>flink-connector-mysql-cdc</artifactId><!-- the dependency is available only for stable releases. --><version>2.1.1</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version><type>test-jar</type></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>${slf4j.version}</version><scope>compile</scope></dependency></dependencies>
</project>

flinkCdc的mysql配置及java测试代码相关推荐

  1. [Java] 使用Android Studio编写Java测试代码

    目录 一.创建"Java or Kotlin Library" 模组. 1.1 新建Android Project. 1.2 新建 "Java or Kotlin Lib ...

  2. apachejmeter_java源码_自定义编写jmeter的Java测试代码

    我们在做性能测试时,有时需要自己编写测试脚本,很多测试工具都支持自定义编写测试脚本,比如LoadRunner就有很多自定义脚本的协议,比如"C Vuser","JavaV ...

  3. jmeter之java代码性能测试_松勤软件性能测试-自定义编写的Java测试代码在Jmeter中如何使用...

    原标题:松勤软件性能测试-自定义编写的Java测试代码在Jmeter中如何使用 我们在做性能测试时,有时需要自己编写测试脚本,很多测试工具都支持自定义编写测试脚本,比如LoadRunner就有很多自定 ...

  4. MySQL万字总结(含测试代码)

    本文大部分笔记来源于:[狂神说Java]MySQL最新教程通俗易懂 1.MySQL 什么是数据库 数据库(DataBase,简称DB) 概念:长期存放在计算机内,有组织,可共享的大量数据的集合,是一个 ...

  5. Ubuntu14.04下配置OpenGL及测试代码

    ubuntu14.04 64位下,默认是没有安装OpenGL相关依赖库的,若安装,则依次执行如下几条命令即可: $ sudo apt-get update $ sudo apt-get install ...

  6. 科大讯飞语音转文字以及中文分词的Java测试代码

    我录了一段音存储在这个test.m4a文件里,语音内容为"测试一下Netweaver对于并发请求的响应性能". 使用如下Java代码进行测试: package com.iflyte ...

  7. MongoDB之Java测试代码(DAO层)

    MongoInit.java是数据库初始化及连接类 MongoUtils.java是对mongodb的各种操作方法 MongoInit.java package com.wlwcloud.datate ...

  8. sqlserver AlwaysOn实现读写分离配置及java/net代码实现

    1.用读写分离的原因: O.读写量很大,为了提升数据库读写性能,将读写进行分离: O.如果多机房下写少读多,同时基于数据一致性考虑,只有一个主库存入所有的数据写入,本地再做从库提供读取,减少多机房间直 ...

  9. java 测试代码效率_JAVA程序的性能测试方法

    JAVA程序的性能测试方法 Java 1.5以上都在虚拟机里内建了程序性能跟踪的功能,并提供了Java Profiling API,简称JPA,你可以搜索'java profiling'.Java也提 ...

最新文章

  1. 在MAC上搭建eclipse+android开发环境
  2. Redis存储总用String?你大概错过了更优的使用方法
  3. centos mysql root_CentOS下MySQL忘记root密码解决方法
  4. (2.19)备份与还原--备份的原理与总结
  5. 【Linux】一步一步学Linux——perl命令(264)
  6. 单点登录系统和CAS的简介
  7. STM32F103系统滴答计时器
  8. Java Calendar 基本用法
  9. 极简代码(五)—— 斐波那契数列
  10. Bootstrap框架---Uploadify插件----多张图片上传交互方式一
  11. 使用phpStudy搭建74cms(详)
  12. 多线段几何图形—— 简单几何图形(多边形三角形化)
  13. Android系统开发——WiFi Hotspot限速2M每秒
  14. 一些AUTOSAR会议论坛的介绍(附资料下载)
  15. 布局区块链数据中心的互联在线,积极筹备精选层
  16. 爬虫漫游指南:浏览器指纹
  17. 酒店市场消费趋势洞察报告
  18. 《拆解 XLNet 模型设计,回顾语言表征学习的思想演进》
  19. 学术期刊划分(SCI、EI、SSCI、IEEE、CSSCI等)
  20. 二分网络上的电影推荐

热门文章

  1. 微软新闻推荐:深度学习与 NLP 的融合, 让 Windows 11“新闻与兴趣”知你所需...
  2. 【报告分享】2021新青年国货消费研究报告:文化觉醒时代,宝藏国货迎复兴机遇.pdf(附下载链接)...
  3. 【报告分享】2020大中华区人工智能成熟度调研:解码2020,展望数字未来.pdf(附下载链接)...
  4. ICML 2019 | 强化学习用于推荐系统,蚂蚁金服提出生成对抗用户模型(附论文下载链接)...
  5. 【推荐实践】推荐系统中模型训练及使用流程的标准化
  6. 全球首发!惯性导航导论(剑桥大学)第四部分
  7. BST:Transformer在推荐领域的应用
  8. cydia源_Cydia错误提示解决大全
  9. 日常踩坑:pip安装包速度过慢,更换国内镜像源(亲测有效)
  10. CSS2中文文档下载