1.需要环境
zookeeper,小编安装环境为zookeeper-3.4.10
kakfa,小编安装环境为kafka_2.13-2.8.0
kafka-connect-oracle,此为kafka-connect的oracle实时同步开源工程,源码地址:

https://github.com/erdemcer/kafka-connect-oracle

confluent,小编安装环境为confluent-5.3.1,下载链接:

https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc

2.搭建说明
多种方式可以实现实时监听数据库DDL操作,小编选择通过如上三个组件,zookeeper、kafka、confluent搭建,另也可通过debezium的方式去搭建环境,感兴趣的可以再详细了解,confluent操作如下。

*2.1、*必须开启oracle归档日志。

*2.2、*基于oracle logminer的解析方式,对源库有一定影响,影响在5%以内。

*2.3、*上传jar包到/xxx/xx/xx/confluent/share/java/kafka-connect-jdbc:kafka-connect-oracle-1.0.jar,ojdbc7.jar,jsqlparser-1.2.jar,其中kafka-connect-oracle-1.0.jar为第一步kafka-connect-oracle源码jar包

*2.4、*cd /xxxx/xxxxx/confluent/etc/kafka-connect-jdbc 增加OracleSourceConnector.properties,内容如下:

name=oracle-logminer-connector
connector.class=com.ecer.kafka.connect.oracle.OracleSourceConnector
db.name.alias=mscdw
tasks.max=1
topic=oracletokafka
db.name=orcl
db.hostname=ip
db.port=port
db.user=user
db.user.password=password
db.fetch.size=1
table.whitelist=MSCDW.*,MSCDW.CONFIG
parse.dml.data=true
reset.offset=false
start.scn=
multitenant=false
table.blacklist=

*2.5、*cd /xxxx/xxxxx/confluent/etc/schema-registry, 修改schema-registry下connect-avro-standalone.properties文件,内容如下

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

*2.6、*启动,按顺序执行

启动zookeeper
sh zkServer.sh start

停止zookeeper
sh zkServer.sh stop

启动kafka
/home/kafka/kafka_2.13-2.8.0/bin/kafka-server-start.sh /home/kafka/kafka_2.13-2.8.0/config/server.properties &

关闭kafka
/home/kafka/kafka_2.13-2.8.0/bin/kafka-server-stop.sh /home/kafka/kafka_2.13-2.8.0/config/server.properties &

启动zookeeper和kafka后,进入confluent下,通过如下命令监听oracle数据ddl
./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-jdbc/OracleSourceConnector.properties

通过如下命令查看topic为oracletokafka的数据变化状态
./kafka-console-consumer.sh --bootstrap-server 172.16.50.22:9092 --topic oracletokafka --from-beginning

3.结果演示
如图所示,可以捕捉到DML操作类型OPERATION:insert、update、delete,对于update而言,data为修改后数据,before为修改前数据。
4.引言
当拿到kafka监听oralce的DML语句时,可以搭配flink实现数据的sink,将DML语句解析实时同步计算到任意数据库,如果是同数据源之间的数据同步,小编建议直接做主从,如果是不同数据源的同步,那通过以上方式再搭配flink确实很高效。

5.flink的sink
网盘链接,有对应的组件环境以及个人手册记录:

https://pan.baidu.com/s/15rM84nK0bRcHKYO28KorBg

提取码:gaq0
5.1、首先,小编flink版本使用1.13.1
5.2、其次,贴出一些需要用到的jar

<properties><java.version>1.8</java.version><fastjson.version>1.2.75</fastjson.version><druid.version>1.2.5</druid.version><flink.version>1.13.1</flink.version><scala.binary.version>2.12</scala.binary.version><HikariCP.version>3.2.0</HikariCP.version><kafka.version>2.8.0</kafka.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.flink.json</groupId><artifactId>flink-json</artifactId><version>1.9.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.apache.flink</groupId><artifactId>flink-sql-connector-kafka</artifactId><version>2.11-1.9.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime_2.11</artifactId><version>${flink.version}**</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>${flink.version}**</version></dependency><dependency><groupId>com.zaxxer</groupId><artifactId>HikariCP</artifactId><version>${HikariCP.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId><version>${kafka.version}</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.2</version></dependency></dependencies>

5.3、贴出小编使用的测试表
oracle:mysql:
5.4、贴出flink-sql:
sink:

CREATE TABLE sinkMysqlConfig
(ID  VARCHAR,CRON VARCHAR
) WITH ('connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://xxx:xxx/xxx', 'connector.table' = 'xxx','connector.username' = 'xxx','connector.password' = 'xxx', 'connector.write.flush.max-rows' = '1'
);

source:

CREATE TABLE sourceOracleConfig (payload ROW(SCN string,SEG_OWNER string,TABLE_NAME string,data ROW(ID string,CRON string))
) WITH ('connector.type' = 'kafka','connector.version' = 'universal',      'connector.topic' = 'xxx',          'connector.startup-mode' = 'earliest-offset',       'connector.properties.group.id' = 'xxx','connector.properties.zookeeper.connect' = 'xxx:2181','connector.properties.bootstrap.servers' = 'xxx:9092','format.type' = 'json','format.json-schema' =      --json format'{"type": "object","properties": {"payload":{type: "object","properties" : {"SCN"       : {type:"string"},"SEG_OWNER"  : {type:"string"},"TABLE_NAME" : {type:"string"},"data": {type : "object", "properties": {"ID"   : {type : "string"},"CRON" : {type : "string"}}}}}}}'
);

5.5、flinksqlclient演示,可跳过此步骤

5.6、贴出代码流程

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment  tableEnv = StreamTableEnvironment.create(env,settings);String sourceKafkaTable = String.format("CREATE TABLE sourceOracleConfig (\n" +"    payload ROW(SCN string,SEG_OWNER string,TABLE_NAME string,data ROW(ID string,CRON string))\n" +") WITH (\n" +"    'connector.type' = 'kafka',\n" +"    'connector.version' = 'universal',      \n" +"    'connector.topic' = 'xxx',          \n" +"    'connector.startup-mode' = 'earliest-offset',       \n" +"    'connector.properties.group.id' = 'xxx',\n" +"    'connector.properties.zookeeper.connect' = 'ip:2181',\n" +"    'connector.properties.bootstrap.servers' = 'ip:9092',\n" +"    'format.type' = 'json',\n" +"    'format.json-schema' =      --json format\n" +"    '{\n" +"        \"type\": \"object\",\n" +"        \"properties\": \n" +"        {\n" +"           \"payload\":\n" +"           {type: \"object\",\n" +"                   \"properties\" : \n" +"                   {\n" +"\t\t\t\t\t\t\"SCN\" \t\t : {type:\"string\"},\n" +"\t\t\t\t\t\t\"SEG_OWNER\"  : {type:\"string\"},\n" +"\t\t\t\t\t\t\"TABLE_NAME\" : {type:\"string\"},\n" +"\t\t\t\t\t\t\"data\": \n" +"\t\t\t\t\t\t{type : \"object\", \n" +"                               \"properties\": \n" +"                               {\n" +"                               \t\"ID\"   : {type : \"string\"},\n" +"                                \"CRON\" : {type : \"string\"}\n" +"                               }\n" +"                   \t\t}\n" +"           \t\t   }\n" +"           }\n" +"        }\n" +"    }'\n" +")");String sinkMysqlTable = String.format("CREATE TABLE sinkMysqlConfig \n" +"(\n" +"    ID  VARCHAR,\n" +"    CRON VARCHAR\n" +") WITH (\n" +"    'connector.type' = 'jdbc', \n" +"    'connector.url' = 'jdbc:mysql://ip:port/xxx', \n" +"    'connector.table' = 'xxx',\n" +"    'connector.username' = 'xxx',\n" +"    'connector.password' = 'xxx', \n" +"    'connector.write.flush.max-rows' = '1' \n" +")");System.out.println(sourceKafkaTable+"\n"+sinkMysqlTable);tableEnv.executeSql(sourceKafkaTable);tableEnv.executeSql(sinkMysqlTable);String sql = "insert into sinkMysqlConfig select payload.data.ID,payload.data.CRON from sourceOracleConfig";tableEnv.executeSql(sql);env.execute("FlinkSourceOracleSyncKafkaDDLSinkMysqlJob");

5.7、结果演示
当在oracle中新增数据后,发现mysql中对应表数据同步过来,自此oracle-mysql的数据同步测试demo验证完毕。最后打包将任务提交在flink web中。

flink实时消费kafka中oracle的DML数据写入mysql相关推荐

  1. Flink系列之:基于scala语言实现flink实时消费Kafka Topic中的数据

    Flink系列之:基于scala语言实现flink实时消费Kafka Topic中的数据 一.引入flink相关依赖 二.properties保存连接kafka的配置 三.构建flink实时消费环境 ...

  2. 使用idea编写SparkStreaming消费kafka中的数据,并对数据进行某一指标的处理【小案例】(五)

    接    使用idea编写SparkStreaming消费kafka中的数据[小案例](四) https://georgedage.blog.csdn.net/article/details/1035 ...

  3. 使用Flink时从Kafka中读取Array[Byte]类型的Schema

    使用Flink时,如果从Kafka中读取输入流,默认提供的是String类型的Schema: val myConsumer = new FlinkKafkaConsumer08[String](&qu ...

  4. flink实时流遇到的问题排查——部分数据未落库redis问题

    flink实时流遇到的问题排查 1.技术和环境 2.问题表述 3.简化的代码 4.问题排查思路 5.结论 6.后续补充 1.技术和环境 技术:kafka.zookeeper.DataStream.re ...

  5. Flink CDC mongoDB 使用及Flink sql解析monggo中复杂嵌套JSON数据实现

    需要实时采集MongoDB中的数据,所以考虑使用flink cdc mongodb,在flink cdc2.1版本后也支持了MongoDB的数据采集,是通过oplog. MongoDB中的存储数据的文 ...

  6. java 写oracle clob_Java将数据写入Oracle的Clob字段中

    ---------------------将大量数据写入Clob字段中------------- 一.得到连接 def dbDriver = "oracle.jdbc.driver.Orac ...

  7. flink批处理访问mysql_Flink 异步IO访问外部数据(mysql篇)

    最近看了大佬的博客,突然想起Async I/O方式是Blink 推给社区的一大重要功能,可以使用异步的方式获取外部数据,想着自己实现以下,项目上用的时候,可以不用现去找了. 最开始想用scala 实现 ...

  8. icc校色文件使用教程_Flink教程-flink 1.11使用sql将流式数据写入文件系统

    滚动策略 分区提交 分区提交触发器 分区时间的抽取 分区提交策略 完整示例 定义实体类 自定义source 写入file flink提供了一个file system connector,可以使用DDL ...

  9. php怎么将往txt文档中,python如何将数据写入txt

    版权所有:https://wWw.pxcodeS.compython将数据写入txt文本文件:首先打开txt文件:然后向文件写入数据,代码为[file_handle.write('hello word ...

最新文章

  1. deepin系统引导_国产 Linux 发行版 深度操作系统 20 正式版发布
  2. 注册界面翻译_B站UP主自制的开源OCR翻译器走红Github,用一次就粉了
  3. nyoj - 947(Max Xor)字典树
  4. $ npm install时候报出:npm ERR! Unexpected token in JSON at position 10841
  5. 如何使用实时计算 Flink 搞定实时数据处理难题?
  6. MyBatis 源码解读-typeAliasesElement()
  7. linux配置4g网络命令_树莓派移动网络连接(配置4G网卡)
  8. Linux 多核下绑定硬件中断到不同 CPU(IRQ Affinity)
  9. dbcc dbreindex server sql_SQL Server性能的提高,可通过DBCC DBREINDEX重建索引
  10. 系统架构设计师-软件水平考试(高级)-理论-项目管理
  11. Eclipse如何使用Git完成代码比对并提交操作
  12. 一张图看懂Resnet50与Resnet101算法
  13. 关于SketchUp 2017版本安装之后一打开就会发送错误报告的问题
  14. JUCE 中的音频编解码
  15. 计算机网络教室验收标准,计算机网络教室建设和配备要求
  16. 睡眠是锁定计算机怎么设置密码,电脑休眠锁屏怎么设置
  17. IDEA编译输出/控制台改为英文,运行信息报错信息改为英文
  18. 易语言注册机接码平台对接
  19. 4.9 数值分析: 牛顿下山法
  20. 9 个美观大气的后台管理系统

热门文章

  1. uni-app PDA激光扫条形码功能开发
  2. Noesis.Javascript中执行arttempllate读取运行错误
  3. 数据库系列之Oracle总结一
  4. 20230306xgs
  5. 社会各界对996的态度
  6. 机器学习的划分:监督学习、非监督学习、强化学习、进化学习 概述 (二)
  7. java计算机毕业设计足球赛会管理系统源码+数据库+lw文档+系统+部署
  8. linux硬盘4k对齐,linux查看硬盘4K对齐方法
  9. vue项目seo优化-预渲染prerender-spa-plugin配置
  10. 基础笔记:计算机组成原理(第二版)蒋本珊 清华大学出版社(未完成)