flinkCdc1.4.0版本有specificOffset方式指定binlog日志的位置开始读数据, 新版本测试还未支持该功能。

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Properties;public class MySqlSourceExample {public static void main(String[] args) throws Exception {//show binary logs;//show master status;String offsetFile = "mysql-bin.000008";//show binlog events  IN 'mysql-bin.000008' FROM 154 ;int offsetPos = 219;  //154 219 504Properties prop = new Properties();prop.setProperty("snapshot.locking.mode", "none");SourceFunction<String> sourceFunction = MySQLSource.<String>builder().hostname("127.0.0.1").port(3307).databaseList("cdc_test") // monitor all tables under inventory database.tableList("cdc_test.test1") // set captured table.username("root").password("mysql").serverTimeZone("UTC")  //时区//设置读取位置 initial全量, latest增量,  specificOffset(binlog指定位置开始读,该功能新版本暂未支持).startupOptions(StartupOptions.specificOffset(offsetFile, Integer.valueOf(offsetPos)))
//                .startupOptions(StartupOptions.initial())
//                .startupOptions(StartupOptions.latest()).debeziumProperties(prop).deserializer(new StringDebeziumDeserializationSchema()).build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(sourceFunction).print("====>").setParallelism(1);env.execute();}
}

不指定位置默认全量读,可以读取全部5条数据,

向cdc_test表插入两条数据后,

查看binlog日志的开始位置分别为219, 504

测试设置binlog日志位置为504,只能读取到一条记录(id=7的那条)

参考:

Flink CDC2.0快速上手demo示例(Jar对接,非SQL)_北鹤M的代码手账-CSDN博客重要! Flink CDC 这个名字带着Flink,但是注意!!!我们本地搭建简易demo的时候不需要下载flink环境就可以本地跑起来。 FlinkCDC本质就是一个jar包,引入后写个main方法即可展现它的简单功能。(因为需要Canal切FlinkCDC,小白的我把FlinkCDC当成了类似于ik分词的插件,居然先去下载了Flink...)准备工作您需要会使用maven≤FlinkCDC 1.4.0版本可以用MySQL5.6+,FlinkCDC必须5.7+ F...https://blog.csdn.net/qq_20051535/article/details/121071915Flink CDC 如何从指定位置拉取消息_北鹤M的代码手账-CSDN博客场景已知binlog的文件名和pow编号,希望从指定的位置拉消息代码在连接的时候加上这句.startupOptions(StartupOptions.specificOffset("mysql-bin.000013", 1260))https://blog.csdn.net/qq_20051535/article/details/121072733

FlinkCdc从Mysql指定的binlog日志offsetPos位置开始读取数据相关推荐

  1. mysql正确清理binlog日志的方法

    MySQL中的binlog日志记录了数据库中数据的变动,便于对数据的基于时间点和基于位置的恢复,但是binlog也会日渐增大,占用很大的磁盘空间,因此,要对binlog使用正确安全的方法清理掉一部分没 ...

  2. MYSQL专题-使用Binlog日志恢复MySQL数据

    大家有没有碰到过由于误操作把测试数据库的一张表给删除了,导致测试的数据都被删除了,然后手足无措,测试把你一定数落,顿时感觉自己要死了?今天就教你即使误删了也可以将删除的数据恢复,以后误删再也不用惊吓了 ...

  3. MySQL中的binlog日志

    binlog是mysql的日志工具,binlog日志可以记录insert.update.delete的sql和操作时间.因为log数据是二进制格式的,所以称为binary log,即binlog. 文 ...

  4. Mysql 如何通过binlog日志恢复数据

    一.起因: 由于误删数据,造成服务报错,经排查发现误删了一个表,造成数据不一致.但由于时间较近,范围不太确定,所以采用mysqlbinlog日志进行时间恢复. 二.恢复: 1.将前一天的mysql-b ...

  5. mysql日志备份的脚本_脚本备份MySQL数据库和binlog日志

    用Mysqldump实现全库备份+binlog的数据还原 首先是为mysql做指定库文件的全库备份 vim mysqlbak.sh #!/bin/bash #定义数据库目录,要能找到mysqldump ...

  6. MySQL主动清理binlog日志

    在使用时候发现binlog日志过大,又不想修改配置重启情况下使用这种方式可以快速清理日志. 主动清理一天前日志 PURGE MASTER LOGS BEFORE DATE_SUB( NOW( ), I ...

  7. MySQL:偏移量为 0x000000000ae000 的位置执行 读取 期间,操作系统已经向 SQL Server 返回了错误 21(设备未就绪。)。

    环境:SQL Server 问题是:在文件 'D:\Test\db.mdf' 中.偏移量为 0x000000000ae000 的位置执行 读取 期间,操作系统已经向 SQL Server 返回了错误 ...

  8. Mysql是否开启binlog日志开启方法

    运行sql   show variables like 'log_bin'; 如果Value 为 OFF 则为开启日志文件 如何开启mysql日志? 找到my,cnf 中 [mysqld]  添加如下 ...

  9. mysql binlog purge_正确清理mysql binlog日志方法

    MySQL中的binlog日志记录了数据库中数据的变动,便于对数据的基于时间点和基于位置的恢复,但是binlog也会日渐增大,占用很大的磁盘空间,因此,要对binlog使用正确安全的方法清理掉一部分没 ...

最新文章

  1. 数字经济时代,险企如何构建数字化经营体系实现突围?
  2. STL - bitset
  3. 【声入人心:音频新体验】
  4. ACM及各类程序竞赛专业术语
  5. sql怎么撤回update_零基础快速自学SQL,2天足矣!
  6. 要来了!国内安卓统一推送标准将于3月开启测试
  7. 想学IT的必看!深度解析跳槽从开始到结束完整流程万字长文!
  8. c ++ 打印二进制_C / C ++中的二进制搜索树
  9. 网站暴库原理与方法剖析
  10. vue实现轮播图(每隔两秒自动翻页、翻到末页自动返回首页、点击按钮左右翻页)
  11. 在win7和win10上通过INF文件安装64位WDM驱动
  12. nRF51822 SPI 驱动 ADXL362
  13. 5python 体脂率计算(优化版)
  14. Java就业方向有哪些?
  15. 混合现实门户SteamVR环境下
  16. 关于公共IP地址(公网ip):您需要知道的一切
  17. nodejs使用addon调用c/c++
  18. 圆角装饰条_护角条是圆角好还是直角好
  19. 算法:扑克牌顺序问题
  20. Java基础看这篇就足够用了(基础知识汇总)

热门文章

  1. 【白皮书分享】创新中国2030:释放技术红利,解锁增长动能-埃森哲.pdf(附下载链接)...
  2. LSTM训练过程与参数解读
  3. Python实战从入门到精通第十三讲——返回多个值的函数
  4. leetcode力扣75. 颜色分类
  5. 浙大 PAT 甲级1009
  6. 卷起来了,写了一套Tensorflow和Pytorch的学习笔记(20G/代码/PPT/视频)
  7. 外贸人不知道的Facebook广告营销技巧和营销工具
  8. 关于运营Tiktok账号的问题?
  9. wms智能仓储系统不可缺少?
  10. 吴恩达机器学习之单变量线性回归实现部分