FlinkCdc从Mysql指定的binlog日志offsetPos位置开始读取数据
flinkCdc1.4.0版本有specificOffset方式指定binlog日志的位置开始读数据, 新版本测试还未支持该功能。
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Properties;public class MySqlSourceExample {public static void main(String[] args) throws Exception {//show binary logs;//show master status;String offsetFile = "mysql-bin.000008";//show binlog events IN 'mysql-bin.000008' FROM 154 ;int offsetPos = 219; //154 219 504Properties prop = new Properties();prop.setProperty("snapshot.locking.mode", "none");SourceFunction<String> sourceFunction = MySQLSource.<String>builder().hostname("127.0.0.1").port(3307).databaseList("cdc_test") // monitor all tables under inventory database.tableList("cdc_test.test1") // set captured table.username("root").password("mysql").serverTimeZone("UTC") //时区//设置读取位置 initial全量, latest增量, specificOffset(binlog指定位置开始读,该功能新版本暂未支持).startupOptions(StartupOptions.specificOffset(offsetFile, Integer.valueOf(offsetPos)))
// .startupOptions(StartupOptions.initial())
// .startupOptions(StartupOptions.latest()).debeziumProperties(prop).deserializer(new StringDebeziumDeserializationSchema()).build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(sourceFunction).print("====>").setParallelism(1);env.execute();}
}
不指定位置默认全量读,可以读取全部5条数据,
向cdc_test表插入两条数据后,
查看binlog日志的开始位置分别为219, 504
测试设置binlog日志位置为504,只能读取到一条记录(id=7的那条)
参考:
Flink CDC2.0快速上手demo示例(Jar对接,非SQL)_北鹤M的代码手账-CSDN博客重要! Flink CDC 这个名字带着Flink,但是注意!!!我们本地搭建简易demo的时候不需要下载flink环境就可以本地跑起来。 FlinkCDC本质就是一个jar包,引入后写个main方法即可展现它的简单功能。(因为需要Canal切FlinkCDC,小白的我把FlinkCDC当成了类似于ik分词的插件,居然先去下载了Flink...)准备工作您需要会使用maven≤FlinkCDC 1.4.0版本可以用MySQL5.6+,FlinkCDC必须5.7+ F...https://blog.csdn.net/qq_20051535/article/details/121071915Flink CDC 如何从指定位置拉取消息_北鹤M的代码手账-CSDN博客场景已知binlog的文件名和pow编号,希望从指定的位置拉消息代码在连接的时候加上这句.startupOptions(StartupOptions.specificOffset("mysql-bin.000013", 1260))https://blog.csdn.net/qq_20051535/article/details/121072733
FlinkCdc从Mysql指定的binlog日志offsetPos位置开始读取数据相关推荐
- mysql正确清理binlog日志的方法
MySQL中的binlog日志记录了数据库中数据的变动,便于对数据的基于时间点和基于位置的恢复,但是binlog也会日渐增大,占用很大的磁盘空间,因此,要对binlog使用正确安全的方法清理掉一部分没 ...
- MYSQL专题-使用Binlog日志恢复MySQL数据
大家有没有碰到过由于误操作把测试数据库的一张表给删除了,导致测试的数据都被删除了,然后手足无措,测试把你一定数落,顿时感觉自己要死了?今天就教你即使误删了也可以将删除的数据恢复,以后误删再也不用惊吓了 ...
- MySQL中的binlog日志
binlog是mysql的日志工具,binlog日志可以记录insert.update.delete的sql和操作时间.因为log数据是二进制格式的,所以称为binary log,即binlog. 文 ...
- Mysql 如何通过binlog日志恢复数据
一.起因: 由于误删数据,造成服务报错,经排查发现误删了一个表,造成数据不一致.但由于时间较近,范围不太确定,所以采用mysqlbinlog日志进行时间恢复. 二.恢复: 1.将前一天的mysql-b ...
- mysql日志备份的脚本_脚本备份MySQL数据库和binlog日志
用Mysqldump实现全库备份+binlog的数据还原 首先是为mysql做指定库文件的全库备份 vim mysqlbak.sh #!/bin/bash #定义数据库目录,要能找到mysqldump ...
- MySQL主动清理binlog日志
在使用时候发现binlog日志过大,又不想修改配置重启情况下使用这种方式可以快速清理日志. 主动清理一天前日志 PURGE MASTER LOGS BEFORE DATE_SUB( NOW( ), I ...
- MySQL:偏移量为 0x000000000ae000 的位置执行 读取 期间,操作系统已经向 SQL Server 返回了错误 21(设备未就绪。)。
环境:SQL Server 问题是:在文件 'D:\Test\db.mdf' 中.偏移量为 0x000000000ae000 的位置执行 读取 期间,操作系统已经向 SQL Server 返回了错误 ...
- Mysql是否开启binlog日志开启方法
运行sql show variables like 'log_bin'; 如果Value 为 OFF 则为开启日志文件 如何开启mysql日志? 找到my,cnf 中 [mysqld] 添加如下 ...
- mysql binlog purge_正确清理mysql binlog日志方法
MySQL中的binlog日志记录了数据库中数据的变动,便于对数据的基于时间点和基于位置的恢复,但是binlog也会日渐增大,占用很大的磁盘空间,因此,要对binlog使用正确安全的方法清理掉一部分没 ...
最新文章
- 数字经济时代,险企如何构建数字化经营体系实现突围?
- STL - bitset
- 【声入人心:音频新体验】
- ACM及各类程序竞赛专业术语
- sql怎么撤回update_零基础快速自学SQL,2天足矣!
- 要来了!国内安卓统一推送标准将于3月开启测试
- 想学IT的必看!深度解析跳槽从开始到结束完整流程万字长文!
- c ++ 打印二进制_C / C ++中的二进制搜索树
- 网站暴库原理与方法剖析
- vue实现轮播图(每隔两秒自动翻页、翻到末页自动返回首页、点击按钮左右翻页)
- 在win7和win10上通过INF文件安装64位WDM驱动
- nRF51822 SPI 驱动 ADXL362
- 5python 体脂率计算(优化版)
- Java就业方向有哪些?
- 混合现实门户SteamVR环境下
- 关于公共IP地址(公网ip):您需要知道的一切
- nodejs使用addon调用c/c++
- 圆角装饰条_护角条是圆角好还是直角好
- 算法:扑克牌顺序问题
- Java基础看这篇就足够用了(基础知识汇总)