整合flink-cdc实现实时读postgrasql
整合flink-cdc实现实时读postgrasql
什么是wal日志
wal日志即write ahead log预写式日志,简称wal日志。wal日志可以说是PostgreSQL中十分重要的部分,相当于oracle中的redo日志。
当数据库中数据发生变更时:
change发生时:先要将变更后内容计入wal buffer中,再将变更后的数据写入data buffer;
commit发生时:wal buffer中数据刷新到磁盘;
checkpoint发生时:将所有data buffer刷新的磁盘。
可以想象,如果没有wal日志,那么数据库中将会发生什么?
首先,当我们在数据库中更新数据时,如果没有wal日志,那么每次更新都会将数据刷到磁盘上,并且这个动作是随机i/o,性能可想而知。并且没有wal日志,关系型数据库中事务的ACID如何保证呢?
因此wal日志重要性可想而知。其中心思想就是:先写入日志文件,再写入数据
什么是复制槽
制槽(replication slot)在postgresql9.4版本中被引入,引入之初是为了防止备库需要的xlog日志在主库被删除,主库会会根据备库返回的信息确认哪些xlog已不再需要,,才能进行清理。同时主库不会移除那些导致恢复冲突的行,关于恢复冲突,前面有一篇文章讲到过可以通过设置hot_standby_feedback、max_standby_streaming_delay等参数进行预防,但是这些参数只有在主备关系正常时才能起到作用,而replication slot能够确保在主备断连后主库的xlog仍不被清理。
复制槽分为物理复制槽physical replication slot和逻辑复制槽logic replication slot。物理复制槽一般结合流复制一起使用,能够很好的保证备库需要的日志不会在主库删除,本文重点讨论逻辑复制槽。
Logic replication slots一般被用于逻辑异步复制,一个很好的应用就是用于异构数据库之间的逻辑复制。大致原理是将源端xlog进行解码,解析成具体sql,然后到目标端进行回放。支持逻辑解码需要将wal_level设置为logic,logic会在replica的基础上增加一些信息以支持逻辑解码,该模式会增大wal日志的数量,尤其是大量的update,delete操作的库。
需要关注的问题
对于逻辑复制槽,有下面几点需要注意:
①一个逻辑复制槽只能解码一个database,但是一个database可以有多个复制槽进行解码,同一个复制槽可能同时有多个接收端进行订阅。
②复制槽的消息只发送一次,同时它不关心接收端的状态,如果接收端执行失败,那么复制槽不会向前推进,接收端成功后继续从上次失败的位点继续进行消费。
③不支持DDL、列存、压缩表的解码,DDL一般需要需要创建额外的触发器来进行处理,但可以做到表级订阅。
④逻辑复制不能保证数据不丢失,不能用作数据容灾,但是可以用于数据迁移,在主库有大事务的情况下延迟较大。
⑤不使用的复制槽一定要及时删除。
注意
通过复制槽,从库订阅主库,可以保证从库在没有收到主库的日志之前,主库不会删除从库未读的部分。也因此不用的槽要即时删除,否则会导致日志积压
***flink-cdc***的就是通过创建复制槽订阅PG来实现实时监控数据库变化的。
flink-cdc配置以及使用
1.maven依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.flink</groupId> <artifactId>database</artifactId> <version>1.0-SNAPSHOT</version> <properties> <flink.version>1.13.0</flink.version> <scala.binary.version>2.11</scala.binary.version> <kafka.version>2.2.0</kafka.version> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <!--maven properties --> <maven.test.skip>false</maven.test.skip> <maven.javadoc.skip>false</maven.javadoc.skip> <!-- compiler settings properties --> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <commons-lang.version>2.5</commons-lang.version> <scala.binary.version>2.11</scala.binary.version> <spotless.version>2.4.2</spotless.version> <jaxb-api.version>2.3.1</jaxb-api.version> </properties><dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.11</artifactId> <version>1.13.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>1.13.0</version> </dependency> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>flink-connector-postgres-cdc</artifactId> <version>1.4.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.11</artifactId> <version>${flink.version}</version> <!--<scope>provided</scope></dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-rabbitmq_2.11</artifactId> <version>1.13.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.1.5</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!--log4j日志包 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.75</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>5.2.8.RELEASE</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>PgsqlToMysqlTest</mainClass> </transformer> </transformers> <createDependencyReducedPom>false</createDependencyReducedPom> </configuration> </execution> </executions> </plugin> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>${maven.compiler.source}</source> <target>${maven.compiler.target}</target> <compilerVersion>${maven.compiler.source}</compilerVersion> <showDeprecation>true</showDeprecation> <showWarnings>true</showWarnings> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>2.12.4</version> <configuration> <skipTests>${maven.test.skip}</skipTests> </configuration> </plugin> <plugin> <groupId>org.apache.rat</groupId> <artifactId>apache-rat-plugin</artifactId> <version>0.12</version> <configuration> <excludes> <exclude>.gitignore</exclude> <exclude>.travis.yml</exclude> <exclude>.asf.yaml</exclude> <exclude>README.md</exclude> </excludes> </configuration> </plugin> <plugin> <groupId>org.jacoco</groupId> <artifactId>jacoco-maven-plugin</artifactId> <version>0.8.7</version> <executions> <execution> <id>prepare-agent</id> <goals> <goal>prepare-agent</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.eluder.coveralls</groupId> <artifactId>coveralls-maven-plugin</artifactId> <version>4.3.0</version> <dependencies> <dependency> <groupId>javax.xml.bind</groupId> <artifactId>jaxb-api</artifactId> <version>${jaxb-api.version}</version> </dependency> </dependencies> </plugin> <plugin> <artifactId>maven-checkstyle-plugin</artifactId> <version>2.17</version> <executions> <execution> <id>verify</id> <phase>verify</phase> <configuration> <configLocation>style/rmq_checkstyle.xml</configLocation> <encoding>UTF-8</encoding> <consoleOutput>true</consoleOutput> <failsOnError>true</failsOnError> <includeTestSourceDirectory>false</includeTestSourceDirectory> <includeTestResources>false</includeTestResources> </configuration> <goals> <goal>check</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-javadoc-plugin</artifactId> <version>2.10.4</version> <configuration> <aggregate>true</aggregate> <reportOutputDirectory>javadocs</reportOutputDirectory> <locale>en</locale> </configuration> </plugin> <!-- Due to the Flink build setup, "mvn spotless:apply" and "mvn spotless:check" don't work. You have to use the fully qualified name, i.e. "mvn com.diffplug.spotless:spotless-maven-plugin:apply" --> </plugins> </build></project>
再看看代码怎么写
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; public class PgsqlToMysqlTest { public static void main(String[] args) throws Exception { // 设置flink表环境变量 EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); // 获取flink流环境变量 StreamExecutionEnvironment exeEnv = StreamExecutionEnvironment.getExecutionEnvironment(); exeEnv.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE); exeEnv.getCheckpointConfig().setCheckpointTimeout(60000); // make sure 500 ms of progress happen between checkpoints exeEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // allow only one checkpoint to be in progress at the same time exeEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // enable externalized checkpoints which are retained after job cancellation exeEnv.getCheckpointConfig() .enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); FsStateBackend stateBackend = new FsStateBackend("file:\\\\D:\\fsdata"); // // stateBackend.resolveCheckpoint("D:\\fsdata\\dda9ae98c2b864ba8448d2c5eee2e5c3\\chk-6"); // 固定延迟重启(最多尝试3次,每次间隔10s) // exeEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 10000L)); // 失败率重启(在10分钟内最多尝试3次,每次至少间隔1分钟) // exeEnv.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(10), // Time.minutes(1))); // exeEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000)); exeEnv.setStateBackend(stateBackend); // exeEnv.setDefaultSavepointDirectory(); exeEnv.setParallelism(2); // 表执行环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(exeEnv, fsSettings); // 拼接souceDLL String sourceDDL = "CREATE TABLE pgsql_source (\n" + " id int,\n" + " name STRING,\n" + " py_code STRING,\n" + " seq_no int,\n" + " description STRING\n" + ") WITH (\n" + " 'connector' = 'postgres-cdc',\n" + " 'hostname' = '192.168.159.100',\n" + " 'port' = '5431',\n" + " 'username' = 'postgres',\n" + " 'password' = '123',\n" + " 'database-name' = 'postgres',\n" + " 'schema-name' = 'public',\n" + " 'debezium.snapshot.mode' = 'initial',\n" + " 'decoding.plugin.name' = 'pgoutput',\n" + " 'debezium.slot.name' = 'pgsql_source_li2',\n" + " 'table-name' = 'test_copy2_copy1'\n" + ")"; // 执行source表ddl tableEnv.executeSql(sourceDDL); String transformSQL = "select * from pgsql_source"; Table tableResult = tableEnv.sqlQuery(transformSQL); DataStream<Tuple2<Boolean, Row>> dataStream = tableEnv.toRetractStream(tableResult, Row.class); dataStream.print(); // StreamGraph graph = new StreamGraph() exeEnv.execute("jobname"); // 执行sink表ddl // 执行逻辑sql语句 // TableResult tableResult = tableEnv.executeSql(transformSQL); // tableEnv.execute(""); // 控制塔输出 // tableResult.print(); }
}
说明两个配置
debezium.snapshot.mode = 'initial'
initial :默认设置,第一次启动创建数据库快照,后面根据记录偏移量继续读
never:从不建立快照,如果本地无偏移量,从最后的log开始读
always:每次启动都建立快照
exporter: 功能和inintial相同,不同之处在于其不会对表上锁,使用SET TRANSACTION ISOLATION LEVEL REPEATABLE READ,可重复读的隔离级别
实现类io.debezium.connector.postgresql.snapshot.ExportedSnapshotter
public Optional<String> snapshotTableLockingStatement(Duration lockTimeout, Set<TableId> tableIds) { return Optional.empty();
} public String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo) { if (newSlotInfo != null) { String snapSet = String.format("SET TRANSACTION SNAPSHOT '%s';", newSlotInfo.snapshotName()); return "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; \n" + snapSet; } else { return super.snapshotTransactionIsolationLevelStatement(newSlotInfo); }
}
custom :用户自定义 快照,配合debezium.snapshot.custom.class使用
什么是快照?
之前说过wal日志实际上会删除,因此单纯读wal日志,并不能读到全数据库的数据
,因此在第一次启动flink程序时,需要对数据库相应表做一个快照,将全表的数据拿到flink处理对应源码位置io.debezium.connector.postgresql.spi.Snapshotter
default String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo) { return "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE;";
} default Optional<String> snapshotTableLockingStatement(Duration lockTimeout, Set<TableId> tableIds) { String lineSeparator = System.lineSeparator(); StringBuilder statements = new StringBuilder(); statements.append("SET lock_timeout = ").append(lockTimeout.toMillis()).append(";").append(lineSeparator); tableIds.forEach((tableId) -> { statements.append("LOCK TABLE ").append(tableId.toDoubleQuotedString()).append(" IN ACCESS SHARE MODE;").append(lineSeparator); }); return Optional.of(statements.toString());
}
可以看出快照需要锁表(exporter除外),IN ACCESS SHARE MODE说明锁表不影响插入读写,但是如果有全表更新操作, 会被阻塞。
开启串行,只读的事务。Snapshotter子类有这几种配置的实现,有兴趣的可以看看。
debezium.slot.name = 'pgsql_source_li2'
这就是flink-cdc创建的逻辑复制槽。
使用flink-cdc碰到的一些问题
1 能不能保证EXACTLY_ONCE一致性要求
假设在默认snapshot.mode=initial下在第一次启动程序时,会对数据路进行快照读,读取当前全量数据,后面根据记录的偏移量继续读取数据,这样既不丢失数据,也不重复读,是保证了EXACTLY_ONCE一致性的。即使flink程序重启,在启动的时候指定savePoint Path也可以继续之前的偏移量,读取到未接收的数据。
这里分享一个技巧,flink在本地Idea运行没有提供设置savepPoint的方法。
***org.apache.flink.client.deployment.executors.LocalExecutor#execute***方法中断点设置
2 在快照时数据一股脑读进flink,事件时间语义下,数据会不会乱序
如果我们对数据开窗计算,那么乱序可能导致窗口提前关闭导致数据丢失,而在对表做快照时,会将全表数据全部拿到flink处理,就很可能导致乱序数据产生,那么flink-cdc有没有解决这个问题呢,我们知道waterMark时周期成的(源码位置org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator#onProcessingTime),
一种解决思路时,在waterMark还没生成之前,将全部快照数据处理掉,那么就不会丢失数据。
对于单一slot是单线程处理任务的,如果突然来了一批数据,那么生成waterMark的任务必须等到这批数据全部处理完毕才能继续。因此在批数据未处理完毕时,尽管批数据乱序,但是不存在窗口关闭,丢失数据问题
如果有多并行度多槽生成watermrk呢?
多并行度情况下,数据被分散到多个槽,并且不再是一次处理一批数据,处理数据和waterMark会一起生成,如果一次读一批,就可能会有丢数据的风险
因此从读数据源到设置waterMark设置单并行度1,那么就可以避免数据乱序导致的丢失数据问题
另一种思路也很简单,在数据库做快照时,对数据库进行排序。我们来看看flink-cdc有没有提供类似的接口。
看下io.debezium.connector.postgresql.spi.Snapshotter#buildSnapshotQuery快照查询的sql
public Optional<String> buildSnapshotQuery(TableId tableId) { StringBuilder q = new StringBuilder(); q.append("SELECT * FROM "); q.append(tableId.toDoubleQuotedString()); return Optional.of(q.toString());
}
很遗憾,并未提供排序的接口。
但是就没办法了吗?
还记得之前的自定义快照custom吗。
继承InitialSnapshotter自定义快照做简单排序
import io.debezium.connector.postgresql.snapshot.InitialSnapshotter;
import io.debezium.relational.TableId;
import jdk.nashorn.internal.runtime.options.Option; import java.util.Optional; public class OrderSnapShoter extends InitialSnapshotter { @Override public Optional<String> buildSnapshotQuery(TableId tableId) { Optional<String> sql = super.buildSnapshotQuery(tableId); return Optional.of(sql.get() + "order by id"); }
}
配置改一下
'debezium.snapshot.mode' = 'custom''debezium.snapshot.custom.class' = 'xxx.OrderSnapShoter'
整合flink-cdc实现实时读postgrasql相关推荐
- Apache Doris 整合 FLINK CDC + Iceberg 构建实时湖仓一体的联邦查询
1.概览 这篇教程将展示如何使用 Flink CDC + Iceberg + Doris 构建实时湖仓一体的联邦查询分析,Doris 1.1版本提供了Iceberg的支持,本文主要展示Doris和Ic ...
- 4.3.2 Flink-流处理框架-Flink CDC数据实时数据同步-何为Flink CDC?
目录 1.写在前面 2.Flink CDC出现的动机 3.基于传统的CDC的ETL分析 4.基于Flink CDC的ETL分析 5.支持的版本和连接器 1.写在前面 CDC是一种可以捕获数据库变更的技 ...
- Flink CDC 系列 - Flink CDC 如何简化实时数据入湖入仓
摘要:本文整理自伍翀 (云邪).徐榜江 (雪尽) 在 Flink Forward Asia 2021 的分享,该分享以 5 个章节详细介绍如何使用 Flink CDC 来简化实时数据的入湖入仓, 文章 ...
- StarRocks X Flink CDC,打造端到端实时链路
实时数仓建设背景 实时数仓需求 随着互联网行业的飞速发展,企业业务种类变得越来越多,数据量也变得越来越大.以 Apache Hadoop 生态为核心的数据看板业务一般只能实现离线的业务.在部分领域,数 ...
- 技术解析|Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once精准接入
本篇文档将演示如何使用 Apache Doris Flink Connector 结合 Flink CDC 以及 Doris Stream Load 的两阶段提交,实现 MySQL 数据库分库分表实时 ...
- Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once精准接入
导读:本篇文档将演示如何使用 Apache Doris Flink Connector 结合 Flink CDC 以及 Doris Stream Load 的两阶段提交,实现 MySQL 数据库分库分 ...
- XTransfer技术专家亮相Flink CDC Meetup
背景信息:Flink CDC 是实时数据集成框架的开源代表,具有全增量一体化.无锁读取.并发读取.分布式架构等技术优势,在开源社区中非常受欢迎. 为促进 Flink CDC 技术的交流和发展,社区于 ...
- Flink原理解析50篇(四)-基于 Flink CDC 打通数据实时入湖
在构建实时数仓的过程中,如何快速.正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎Flink和数据湖Apache Iceberg两种技术,来解决业务数据实时入湖相关的问题. 0 ...
- 基于Flink CDC打通数据实时入湖
作者 | 数据社 责编 | 欧阳姝黎 在构建实时数仓的过程中,如何快速.正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎 Flink 和数据湖 Apache Ice ...
最新文章
- python pickle模块
- Java中实现多线程的两种方式之间的区别
- flash写保护原理_为什么固态会掉盘?著名的30分钟大法修复是什么原理?
- arcgis 分区 属性值_ArcGIS制图之Maplex自动点抽稀
- 240多个jQuey插件
- 似乎是发现了asp.net ajaxToolkit中TAB控件的一个BUG
- 如何解决Mac无法写入U盘的问题
- 果蔬连连看java源代码_《基于Qt的连连看游戏的设计》-毕业论文.doc
- gridview绑定数据mysql_【gridview控件】怎么绑定数据库并显示?
- iOS开发之UIAlertController的使用
- 百度宣布服务器硬盘,百度秘密拆空服务器硬盘改用SSD 谷歌与网易有道回应
- 【U+】友加畅捷U+通用财务清理操作员密码
- window cmd 命令大全 (order)
- 随机数种子(seed)
- GPT4论文翻译 by GPT4 and Human
- Latex数学公式方程格式总结
- 3500年里,印度被11个文明征服
- CUDA学习:Windows下的CUDA环境配置
- 基于u3d_FPS_Demo
- Oracle 权限详解(grant,revoke)
热门文章
- English语法_指示代词
- cee怎么把大图片放进小盒子_PS的实际应用:怎么给盒子制作包装图片
- 基于 Apache Druid 的实时分析平台在爱奇艺的实践
- 计算机主板i3 i5区别,装机那些事儿之挑选适合自己的CPU;i3/i5/i7的区别
- 基础SQL Server 操作问题——仅当使用了列表并且IDENTITY_INSERT为ON时,才能为表中的标识列制定显示值
- 一文梳理2020年大热的对比学习模型
- html5 加速球 效果,css 渐隐渐现、echarts 圆环图、百度地图覆盖物、echarts水球图(360加速球效果)...
- 开发日记 2017-02-06
- 这款小程序 能让你和孙悟空一样 可以七十二变
- SparkSQL之UDF、UDAF、UDTF