Flink CDC 实现数据实时同步

1.什么是Flink_CDC

CDC 全称是 Change Data Capture(变化数据获取) ,它是一个比较广义的概念,只要能捕获变更的数据,我们都可以称为 CDC 。业界主要有基于查询的 CDC 和基于日志的 CDC ,可以从下面表格对比他们功能和差异点。

2.Flink_CDC应用场景

1.数据同步:用于备份,容灾
2.数据分发:一个数据源分发给多个下游系统
3.数据采集:面向数据仓库/数据湖的ETL数据集成,是非常重要的数据源
3.传统实时数据获取与FlinkCDC数据实时获取
传统实时数据获取:

FlinkCDC实时数据获取:

对比:
Flink 1.11 引入了 Flink CDC,flink-cdc解决了普通的CDC必须通过kafka的问题,简化了流程,第一个图是普通的cdc的流程,通过cdc的工具将mysql的数据采集到kafka,在通过flink、sparkStreaming等流式计算写入到hbase、es,大数据湖等。流程相对复杂,flink-cdc做的就是可以省去普通cdc到kafka的过程。将采集、计算都在flink中完成

Flink_CDC优势:
1.Flink的操作者和SQL模块都比较成熟且易于使用
2.Flink的作业可以通过调整运算器的并行度来完成,易于扩展处理能力
3.Flink支持先进的状态后端(State Backends),允许访问大量的状态数据
4.Flink提供更多的Source和Sink等
5.Flink拥有更大的用户群和活跃的支持社区,问题更容易解决
6.Flink开源协议允许云厂商进行全托管深度定制,而Kafka Streams则只能由其自己部署和运营
7.和Flink Table/SQL模块集成了数据库表和变化记录流(例如CDC的数据流)。作为同一事物的两面,结果是Upsert Message结构(+I表示新增、-U表示记录更新前的值、+U表示记录的更新值、-D表示删除)

3.Flink CDC两种实现方式

1.FlinkDataStream_CDC实现:
利用Flink_CDC自带的连接资源,如MySQLSource通过设置hostname、port、username、password、database、table、deserializer、startupOptions等参数配置
实现获取CRUD数据变化日志

2.FlinkSQL_CDC实现:
通过FlinkSQL创建虚拟表获取关键字段的变化情况并且配置hostname、port、username、password、database、table等参数可以看到具体表数据的变化过程

注意:FlinkSQL_CDC2.0仅支持Flink1.13之后的版本

4.两种方式对比:

1.FlinkDataStream_CDC支持多库多表的操作(优点)
2.FlinkFlinkDataStream_CDC需要自定义序列化器(缺点)
3.FlinkSQL_CDC只能单表操作(缺点)
4.FlinkSQL_CDC自动序列化(优点)

5.FlinkCDC配置

·1.以DBA身份连接到数据库

su - oracle
cd /opt/oracle
sqlplus / as sysdba

·2.启用日志归档

alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;

注意:'/opt/oracle/oradata/recovery_area’对应自己的目录

·3.检查是否启用了日志归档

archive log list;

·4.为捕获日志的库或表启用日志归档

为表开启:inventory(库名).customers(表名)对应自己表ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;为库开启:
-- Enable supplemental logging for database
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

5.创建表空间

sqlplus / AS SYSDBA;
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;

注意:logminer_tbs用于下面
'/opt/oracle/oradata/SID/logminer_tbs.dbf’路径应该对应自己的

6.创建用户

CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;

7.为用户授权

CREATE TABLESPACE logminer_tbs DATAFILE '/data/dg/datafile/logminer_tbs.dbf' SIZE 1000M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;CREATE USER flinkcdc IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO flinkcdc;
GRANT SET CONTAINER TO flinkcdc;
GRANT SELECT ON V_$DATABASE to flinkcdc;
GRANT FLASHBACK ANY TABLE TO flinkcdc;
GRANT SELECT ANY TABLE TO flinkcdc;
GRANT SELECT_CATALOG_ROLE TO flinkcdc;
GRANT EXECUTE_CATALOG_ROLE TO flinkcdc;
GRANT SELECT ANY TRANSACTION TO flinkcdc;
GRANT LOGMINING TO flinkcdc;
GRANT CREATE TABLE TO flinkcdc;
GRANT LOCK ANY TABLE TO flinkcdc;
GRANT ALTER ANY TABLE TO flinkcdc;
GRANT CREATE SEQUENCE TO flinkcdc;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkcdc;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkcdc;
GRANT SELECT ON V_$LOG TO flinkcdc;
GRANT SELECT ON V_$LOG_HISTORY TO flinkcdc;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkcdc;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkcdc;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkcdc;
GRANT SELECT ON V_$LOGFILE TO flinkcdc;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkcdc;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkcdc;#其中有两条未执行成功,替换为下面两条
GRANT EXECUTE ON SYS.DBMS_LOGMNR TO flinkcdc
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkcdc
exit;

到此CDC配置结束

7.DataStream模式代码演示


import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import scala.Tuple7;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;/*** @since JDK 1.8*/
public class FlinkToOracle_2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Tuple7<Integer, String, String, Integer, String, String, Integer>> stream = env.addSource(new source());stream.addSink(new SinkOracle());stream.print();env.execute();}public static class SinkOracle extends RichSinkFunction<Tuple7<Integer,String,String,Integer,String,String,Integer>>  {private Connection conn;private PreparedStatement ps;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);Class.forName("oracle.jdbc.driver.OracleDriver");conn= DriverManager.getConnection("jdbc:oracle:thin:@10.158.5.111:1521/orcl", "ods", "ods");ps = conn.prepareStatement("insert into TEST_OGG.WORKER(ID,NAME,SEX,AGE,DEPT,WORK,SALARY) values(?,?,?,?,?,?,?)");}@Overridepublic void invoke(Tuple7<Integer, String, String, Integer, String, String, Integer> value, Context context) throws Exception {ps.setInt(1, value._1());ps.setString(2, value._2());ps.setString(3, value._3());ps.setInt(4, value._4());ps.setString(5, value._5());ps.setString(6, value._6());ps.setInt(7, value._7());ps.execute();

8.Flink SQL 模式代码演示


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import scala.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;/*** @since JDK 1.8*/
public class FlinkSQL_CDC {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);TableResult tableResult = tableEnv.executeSql("CREATE TABLE WORK(" +"ID INT," +"NAME STRING," +"SEX STRING," +"AGE INT," +"DEPT STRING," +"WORK STRING," +"SALARY INT" +") WITH (" +"'connector'='oracle-cdc'," +"'hostname'='10.158.5.88'," +"'port'='1521'," +"'username'='ods'," +"'passwordd'='ods'," +"'database-name'='orcl'," +"'schema-name'='TEST_OGG'," +"'table-name'='WORKER'," +"'debezium.log.mining.continuous.mine'='true'," +"'debezium.log.mining.strategy'='online_catalog'," +"'debezium.database.tablename.case.insensitive'='false'," +"'scan.startup.mode'='latest-offset')" +"");Table resultTable = tableEnv.sqlQuery("select * from WORK");DataStream<Tuple2<Boolean, Row>> table2Datstream = tableEnv.toRetractStream(resultTable, Row.class);SingleOutputStreamOperator<Tuple7<String, String, String, String, String, String, String>> outputStream = table2Datstream.map(new MapFunction<Tuple2<Boolean, Row>, Tuple7<String, String, String, String, String, String, String>>() {@Overridepublic Tuple7<String, String, String, String, String, String, String> map(Tuple2<Boolean, Row> s) throws Exception {String s1 = s.f1.getField(0).toString();String s2 = s.f1.getField(1).toString();String s3 = s.f1.getField(2).toString();String s4 = s.f1.getField(3).toString();String s5 = s.f1.getField(4).toString();String s6 = s.f1.getField(5).toString();String s7 = s.f1.getField(6).toString();return new Tuple7<String, String, String, String, String, String, String>(s1,s2,s3,s4,s5,s6,s7);}});outputStream.addSink(new MySinkOracle220());outputStream.print();env.execute();}public static class MySinkOracle220 extends RichSinkFunction<Tuple7<String,String,String,String,String,String,String>>  {private Connection conn;private PreparedStatement ps;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);Class.forName("oracle.jdbc.driver.OracleDriver");conn= DriverManager.getConnection("jdbc:oracle:thin:@10.158.5.222:1521/dbm", "ods", "123456");ps = conn.prepareStatement("insert into OGG.WORKER(ID,NAME,SEX,AGE,DEPT,WORK,SALARY) values(?,?,?,?,?,?,?)");}@Overridepublic void invoke(scala.Tuple7<String, String, String, String, String, String, String> value, Context context) throws Exception {ps.setString(1, value._1());ps.setString(2, value._2());ps.setString(3, value._3());ps.setString(4, value._4());ps.setString(5, value._5());ps.setString(6, value._6());ps.setString(7, value._7());ps.execute();}@Overridepublic void close() throws Exception {super.close();if (ps != null)ps.close();if (conn != null)conn.close();}}}

9 ·删除日志命令

su - oracle
rman target /
delete archivelog u ntil time 'sysdate' ;
crosscheck archivelog all;
delete noprompt expired archivelog all;

10.FlinkCDC问题总结

10.1CDC捕获日志问题

·initial()模式即获取创建表有史以来的日志,但是遇见布置CDC后的日志就报错
·latest()模式即获取最新的日志,但运行就报错
以上两个错误都是以下显示

错误说明:提示没有为该表设置日志归档

错误原因:cdc底层自动将配置的表名转为小写,而oracle日志的表名是大写,导致cdc无法找到配置表的日志,所以就报没有为该表配置日志归档,但这一步确实已经做过了

解决办法:
1.加配置文件
a)Stream模式:“database.tablename.case.insensitive”,“false”
b)SQL模式:‘debezium.database.tablename.case.insensitive’=‘false’
2.修改jar包底层源码,经测试失败,可能jar包依赖重,修改不到位,修改后整个项目都报错
3.升级oracle版本,据说12c版本不会出现该异常

10.2捕获数据延迟

OracleCDC的归档日志增长很快,且读取log慢,导致捕捉数据变化延迟较大
解决办法:
Stream模式:“log.mining.strategy”,“online_catalog”
“log.mining.continuous.mine” ,“true”
SQL模式: ‘debezium.log.mining.strategy’ = ‘online_catalog’
‘debezium.log.mining.continuous.mine’ = ‘true’、

10.3 日志乱码问题

如:{“scale”:0,“value”:“F3A=”},原值为6000
解决办法:
配置文件添加如下
properties.setProperty(“decimal.handling.mode”,“string”);

10.4 日志存储问题

配置时给定归档日志存储空间为10G,经测试,10G的内存很快就存满了,同时FlinkCDC官网也提示了归档日志会占用大量磁盘空间,建议定期清理过期的日志。是给予更大的内存还是定期删除日志?无论给予多大的磁盘,如果不定期清理,磁盘也会很快占满

总结

FlinkCDC有两种模式实现,FlinkDataStream模式相比FlinkSQL模式好处是可以监听多库和多表的组合,而FlinkSQL模式只能对单独一张表可进行监听;FlinkDataStream需要自己序列化,而FlinkSQL模式可以具体到关心变化的字段,不需要自己序列化。从这来看,两种模式均有优劣,从业务库的表较多来看,肯定FlinkDataStream模式更适合我们使用,直接对库进行设置日志归档,不用对每个表都设置归档日志,也不用对每个表都进行单独的监控。

Flink-----Flink CDC 实现数据实时同步相关推荐

  1. Flink原理解析50篇(四)-基于 Flink CDC 打通数据实时入湖

    在构建实时数仓的过程中,如何快速.正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎Flink和数据湖Apache Iceberg两种技术,来解决业务数据实时入湖相关的问题. 0 ...

  2. 基于Flink CDC打通数据实时入湖

    作者 | 数据社       责编 | 欧阳姝黎 在构建实时数仓的过程中,如何快速.正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎 Flink 和数据湖 Apache Ice ...

  3. oracle和mysql数据实时同步_异构数据源的CDC实时同步系统——最终选型实战

    引言: <异构数据源的CDC实时同步系统> 系列第一篇 (已完成) <零编码打造异构数据实时同步系统--异构数据源CDC之2> 系列第二篇(已完成) <零编码打造异构数据 ...

  4. binlog流程 mysql_小米 MySQL 数据实时同步到大数据数仓的架构与实践

    背景MySQL由于自身简单.高效.可靠的特点,成为小米内部使用最广泛的数据库,但是当数据量达到千万/亿级别的时候,MySQL的相关操作会变的非常迟缓:如果这时还有实时BI展示的需求,对于mysql来说 ...

  5. 小米 MySQL 数据实时同步到大数据数仓的架构与实践

    背景 MySQL由于自身简单.高效.可靠的特点,成为小米内部使用最广泛的数据库,但是当数据量达到千万/亿级别的时候,MySQL的相关操作会变的非常迟缓:如果这时还有实时BI展示的需求,对于mysql来 ...

  6. DB与ES混合应用之数据实时同步

    一.技术背景 DB与ES本质上是属于不同应用领域的数据库产品,混合应用在一起主要面临2个问题 : 同步实时性,数据在DB更新之后,需要多久才能更新到Elasticsearch,多久的时间是应用系统可以 ...

  7. 小米技术分享:Mysql数据实时同步实践

    背景 MySQL由于自身简单.高效.可靠的特点,成为小米内部使用最广泛的数据库,但是当数据量达到千万/亿级别的时候,MySQL的相关操作会变的非常迟缓:如果这时还有实时BI展示的需求,对于mysql来 ...

  8. Mysql数据实时同步实践

    关于小米内部使用的数据库你知道多少?(文末有福利) 往期文章回顾:Flink流式计算在节省资源方面的简单分析 背景 MySQL由于自身简单.高效.可靠的特点,成为小米内部使用最广泛的数据库,但是当数据 ...

  9. 使用Streamsets将Oracle数据实时同步到MySQL中

    相关环境: Oracle 11g:11.2.0.1.0  MySQL:8.0.22 前期准备: 1.打开Oracle的logminer a.在SQL Shell中,以具有DBA的用户身份登录数据库: ...

最新文章

  1. 近期活动盘点:统计学概论和医疗临床大数据分析讲座、24小时创新挑战:数字时代的人类健康与福祉...
  2. 如何高效的通过BP算法来训练CNN
  3. request.servervariables参数
  4. 终于有人把赌徒谬误讲明白了
  5. 医保费用监控指标体系建立(九)其他专项分析
  6. zabbix监控的配置
  7. 文件比较与同步工具——FreeFileSync
  8. FPGA学习笔记—UART,RS485串口通信(verilog)
  9. HTML css把图片变圆,HTML+CSS:圆形和圆角图片格式
  10. Springboot项目启动异常 org.springframework.beans.factory.UnsatisfiedDependencyException
  11. DNA甲基化可实现转座因子驱动的基因组扩增
  12. 华硕无双+2022款笔记本重装系统笔记
  13. Java——腐烂的橘子
  14. python 连接mysql_Python 连接MySQL
  15. Android系统10 RK3399 init进程启动(十八) isLoggable日志级别输出控制
  16. 解决can not access a member of class xxx with modifiers ““问题
  17. 3G无线上网分析、优惠以及推荐选择
  18. ECSHOP安装流程
  19. python爬虫抓包工具_「docker实战篇」python的docker爬虫技术-fiddler抓包软件详细配置(七)...
  20. 智慧农业,现代农业,数字农业-大数据应用,智慧农业方案,智慧农业-智能畜牧,智慧农业设施农业(大棚) 树形结构图,高标准农田(大田)树形结构图分为4层架构;

热门文章

  1. 万能数据库查询分析器使用技巧之(十六)
  2. Rejecting mapping update to [XXx] as the final mapping would have more than 1 type: 报错
  3. 解决:coursera视频加载不出来无法观看
  4. Spring各种注解 @PersistenceContext和@Resource @GetMapping、@PostMapping、@PutMapping、@DeleteMapping
  5. 没有处理程序要使用以下任何注释:javax.persistence.PersistenceContext
  6. html5网页流行色,Pantone 2018流行色:紫外光色(附紫色的UI设计作品)
  7. LightOJ1012-Guilty Prince
  8. 中北大学算法分析与设计实验报告一(BF算法)
  9. java json parser_Java JSONParser.parse方法代碼示例
  10. cmd命令创建文件文件夹