相关环境:

Oracle 11g:11.2.0.1.0 

MySQL:8.0.22

前期准备:

1、打开Oracle的logminer

a.在SQL Shell中,以具有DBA的用户身份登录数据库:

sqlplus /nolog;conn / as sysdba;

b.检查数据库日志记录模式:

select log_mode from v$database;

如果查询结果是ARCHIVELOG,则以下操作都不用执行,如果命令返回NOARCHIVELOG,请继续执行以下步骤:

c.关闭数据库:

shutdown immediate;

d.启动并挂载数据库:

startup mount;

f.启用存档,打开数据库,并使其可写:

alter database archivelog;
alter database open read write;

这里要注意,如果是Linux中通过docker容器安装的Oracle要先更换为oracle用户,再进入容器中进行修改,外部直接sqlplus连接的话会报错。

docker exec -it oracle11g bash

2、启用补充日志

a.要验证是否为数据库启用了补充日志记录,请在SQL Shell中运行以下命令:

SELECT supplemental_log_data_min, supplemental_log_data_pk, supplemental_log_data_all FROM v$database;

b.状态都为no则执行以下指令:

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
ALTER SYSTEM SWITCH LOGFILE;

3、创建新Oracle用户并授权

开启补充日志之后为创建新的用户帐户用于Oracle CDC客户端源,并为用户 streamsets 授权,

CREATE USER streamsets IDENTIFIED BY streamsets;
GRANT create session, alter session, execute_catalog_role, select any dictionary, select any transaction, select any table to streamsets;
GRANT select on GV_$DATABASE to streamsets;
GRANT select on V_$ARCHIVED_LOG to streamsets;
GRANT select on V_$LOGMNR_CONTENTS to streamsets;
GRANT select on v_$logmnr_parameters to streamsets;

后面测试创建表后,插入数据报错:

ORA-01950: no privileges on tablespace 'USERS'

因为没有Resource权限,所以需要执行:

GRANT Resource to streamsets;GRANT select on <db>.<table> TO <user name>;

Streamsets管道配置:

1、在Streamsets中创建新的管道

a.通过 192.168.XXX.XXX:18630 打开Streamsets

默认账号密码为admin - admin

b.登录后在左边创建新的管道

2、编辑新建的管道

因为业务是将Oracle的数据实时同步到MySQL中,所以在Origins中选择Oracle CDC Client,在目的地Destinations中选择JDBC Producer。然后将两者相连:

3、导入所需的jdbc驱动包

在StreamSets界面中,点击右上角的Package Manager,然后右边的框拉到最下面选择External Libraries导入外部的jar包:

如果导入报错时,解决步骤可以看这里:导入外部库

4、配置 Oracle CDC Client

注意:
A.这里的要注意一下 Oracle CDC Client支持处理以下数据库中的数据:
    Oracle 11g,12c,18c和19c
    Oracle Real Application Clusters(RAC)12c,18c和19c
B.需要在Oracle终端启动LogMiner,总结数据库活动,使用这些日志来生成记录。LogMiner要求数据库是打开的,可写的,并且在启用归档的情况下处于ARCHIVELOG模式。

a.首先在Oracle 11g中创建一个学生表

命名为student:

然后插入一些测试数据:

b.编辑Oracle CDC中的内容

这里图中的Table Name Pattern是错误的。最后章节错误二有遇到的问题与解决方案。

Tables 配置为ORACLE DATABASE中的定义的测试表STREAMSETS.student,注意ORACEL大小写敏感。

Initial Change配置默认是 From Latest Change,实时数据同步只需要捕获变化数据。

Operation配置为Hbase对应支持的CRUD操作类型,匹配ORACLE Database中实时变化数据的增删改操作事务操作。

这里改一下时区。

c.编辑JDBC配置内容

JDBC Connection String中输入jdbc的连接信息:

jdbc:oracle:thin:@192.168.105.77:helowinXDB

这里图中的JDBC连接信息是错误的。最后章节错误一有遇到的问题与解决方案。

d.编辑Credentials

填写数据库的用户名与密码:

5、配置 JDBC Producer 组件

a.编辑General Property

Required Fields:必填字段,是必须存在于记录中以允许其进入处理阶段的字段。当记录不包含所有必填字段时,将根据为管道配置的错误处理对其进行处理。您可以为任何处理器,执行程序和大多数目标阶段定义必需的字段。

Preconditions:必须评估为TRUE以允许记录进入处理阶段的条件。单击"+"以创建其他前提条件。根据为阶段配置的错误处理,处理不满足所有前提条件的记录。

On Record Error:阶段的错误记录处理:

  • Discard - 丢弃记录。
  • Send to Error - 将记录发送到管道以进行错误处理。
  • Stop Pipeline- 停止管道。

b.编辑JDBC

JDBC Connection String: 用于连接数据库的连接字符串,各数据库的连接配置使用以下格式

  • Mysql: jdbc:mysql://xxxxxxxxxx:3306/数据库名
  • Oracle: jdbc:oracle:thin:@xxxxxxxxxx:1521:服务名
  • PostgreSQL- jdbc:postgresql://<host>:<port>/<database_name>
  • SQL server: jdbc:sqlserver://xxxxxxxxxx:1433;databaseName=xxx

Schema Name :要使用的可选数据库或模式名称。在数据库需要完全限定的表名时使用。提示: 默认情况下,Oracle会对模式,表和列名称使用全部大写。仅当名称周围使用引号创建架构,表或列时,名称可以是低位或大小写。

Table Name :要使用的数据库表名称。使用数据库所需的表名格式。提示: 默认情况下,Oracle会对模式,表和列名称使用全部大写。仅当名称周围使用引号创建架构,表或列时,名称可以是低位或大小写。

Field to Column Mapping:用于覆盖默认字段到列映射。默认情况下,字段将写入同名的列。覆盖映射时,可以定义参数化值,以便在将字符值写入列之前将SQL函数应用于字段值。例如,要将字段值转换为整数,请为参数化值输入以下内容:

CAST(? AS INTEGER)

Enclose Object Names:写入数据库时​​,将数据库或模式名称,表名称和列名称括在引号中。允许使用区分大小写的名称或带有特殊字符的名称。未启用时,目标使用的JDBC驱动程序将确定名称的提交方式。Oracle JDBC驱动程序默认将名称提交为全部大写。此外,Oracle默认使用模式,表和列名称的全部大写。仅当名称周围使用引号创建架构,表或列时,名称可以是低位或大小写。

Change Log Format:变更捕获数据的格式。处理更改捕获数据时使用。

Default Operation:如果未设置sdc.operation.type记录标头属性,则执行的默认增删改操作。

Unsupported Operation Handling :不支持在sdc.operation.type记录标头属性中定义的CRUD操作类型时要采取的操作:

  • Discard - 丢弃记录。
  • Send to Error - 将记录发送到管道以进行错误处理。
  • Use Default Operation - 使用默认操作将记录写入目标系统。

Use Multi-Row Operation:确定阶段如何处理记录。选择以一次启用插入和删除多个记录。在启用此选项之前,请验证数据库是否支持阶段使用的多行SQL语句。默认情况下,该阶段执行单行操作。

Max Cache Size Per Batch (Entries):定义多行插入的预准备语句中允许的参数数量。使用-1可禁用参数限制。默认值为-1。

Rollback Batch On Error:当批次中发生错误时,回滚整个批次。

Additional JDBC Configuration Propertie:要使用的其他JDBC配置属性。要添加属性,请单击“ + ”并定义JDBC属性名称和值,使用JDBC所期望的属性名称和值。

c.编辑Credentials

填写数据库的用户名与密码:


d.如果jdbc版本低于4.0,则需要配置Legacy Drivers

JDBC Driver Class Name:JDBC驱动程序的类名。早于版本4.0的JDBC版本必须填写。
Connection Health Test Query :可选查询,用于测试连接的运行状况。仅当JDBC版本低于4.0时建议使用。

e.编辑Advanced

Maximum Pool Size:要创建的最大连接数,默认值为1,建议值为1。

Minimum Idle Connections:要创建和维护的最小连接数。要定义固定连接池,请将其设置为与“Maximum Pool Size”相同的值,默认值为1。

Connection Timeout (Seconds):等待连接的最长时间。在表达式中使用时间常量来定义时间增量,默认值为30秒。

Idle Timeout (Seconds):允许连接空闲的最长时间。在表达式中使用时间常量来定义时间增量,使用0可以避免删除任何空闲连接,默认值为10分钟.

Max Connection Lifetime (Seconds):连接的最长寿命。在表达式中使用时间常量来定义时间增量,使用0设置无最大生命周期,默认值为30分钟

Transaction Isolation:用于连接数据库的事务隔离级别。默认值是为数据库设置的默认事务隔离级别。您可以通过将级别设置为以下任何一项来覆盖数据库默认值:

  • 读取已提交
  • 读取未提交
  • 可重复阅读

序列化
Init Query:在该组件第一次连接到数据库之后立即执行的SQL查询

6、启动管道流:

问题解决:

报错一:

启动时候报错了 OraclORA-12505, TNS:listener does not currently know of SID given in connect descriptor

jdbc连接数据库的时候,需要使用数据库的sid_name,而不是数据库的services_name.而使用plsql连接数据库的时候,只需要数据库的services_name即可,所以修改连接字符串中的services_name 为sid_name.
查询sid_name的语句:

select INSTANCE_NAME from v$instance;

在JDBC中修改即可:

报错二:

我Oracle数据修改以后,并没有在MySQL数据库中同步到数据,然后在错误中可以看到错误信息:

具体的报错内容如下:

JDBC_405 - Error while generating records: java.util.concurrent.ExecutionException: com.streamsets.pipeline.api.StageException: JDBC_54 - Column: 'XM' does not exist in table: 'Schema = 'STREAMSETS', Table = 'STUDENT''. This is likely due to a DDL being performed on this table
java.util.concurrent.ExecutionException: com.streamsets.pipeline.api.StageException: JDBC_54 - Column: 'XM' does not exist in table: 'Schema = 'STREAMSETS', Table = 'STUDENT''. This is likely due to a DDL being performed on this table
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.addRecordsToQueue(OracleCDCSource.java:1151)
    at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.generateRecords(OracleCDCSource.java:742)
    at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.lambda$startGeneratorThread$5(OracleCDCSource.java:463)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.streamsets.pipeline.api.StageException: JDBC_54 - Column: 'XM' does not exist in table: 'Schema = 'STREAMSETS', Table = 'STUDENT''. This is likely due to a DDL being performed on this table
    at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.objectToField(OracleCDCSource.java:1926)
    at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.generateRecord(OracleCDCSource.java:930)
    at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.lambda$addRecordsToQueue$11(OracleCDCSource.java:1135)
    ... 4 more

将Oracle与MySQL的数据表中的字段都改为大写,则成功执行:

在Oracle中新增一条数据后:

MySQL中实时新增一条数据:

使用Streamsets将Oracle数据实时同步到MySQL中相关推荐

  1. SqlServer数据实时同步到mysql

    SqlServer数据实时同步到mysql 一.首先要装一个ODBC的驱动用来建立mysql的连接.http://www.mysql.com/products/connector/ 注意:一定要用32 ...

  2. python豆瓣mysql_Python3.5爬取豆瓣电视剧数据并且同步到mysql中

    #!/usr/local/bin/python # -*- coding: utf-8 -*- # Python: 3.5 # Author: zhenghai.zhang@xxx.com # Pro ...

  3. python爬豆瓣电视剧_Python3.5爬取豆瓣电视剧数据并且同步到mysql中

    #!/usr/local/bin/python#-*- coding: utf-8 -*-#Python: 3.5#Author: zhenghai.zhang@xxx.com#Program: 爬取 ...

  4. 爬取豆瓣评论连接mysql_Python3.5爬取豆瓣电视剧数据并且同步到mysql中

    #!/usr/local/bin/python#-*- coding: utf-8 -*-#Python: 3.5#Author: zhenghai.zhang@xxx.com#Program: 爬取 ...

  5. Oracle 数据怎么实时同步到 MySQL | 亲测干货分享建议收藏

      摘要: 这段时间负责一个老项目开发的数据库管理工作,这个项目中开发库与测试数据库分离,其中有些系统表数据与基础资料数据经常需要进行同步,相信很多 DBA 同学经常会遇到要从一个数据库实时同步到另一 ...

  6. ogg oracle 测试kafka_利用ogg实现oracle到kafka的增量数据实时同步

    前言 ogg即Oracle GoldenGate是Oracle的同步工具,本文讲如何配置ogg以实现Oracle数据库增量数据实时同步到kafka中,其中同步消息格式为json. 下面是我的源端和目标 ...

  7. Docker安装部署MySQL+Canal+Kafka+Camus+HIVE数据实时同步

    因为公司业务需求要将mysql的数据实时同步到hive中,在网上找到一套可用的方案,即MySQL+Canal+Kafka+Camus+HIVE的数据流通方式,因为是首次搭建,所以暂时使用伪分布式的搭建 ...

  8. oracle和mysql数据实时同步_异构数据源的CDC实时同步系统——最终选型实战

    引言: <异构数据源的CDC实时同步系统> 系列第一篇 (已完成) <零编码打造异构数据实时同步系统--异构数据源CDC之2> 系列第二篇(已完成) <零编码打造异构数据 ...

  9. oracle oci.dll无法加载_基于OGG 实现Oracle到Kafka增量数据实时同步

    背景 在大数据时代,存在大量基于数据的业务.数据需要在不同的系统之间流动.整合.通常,核心业务系统的数据存在OLTP数据库系统中,其它业务系统需要获取OLTP系统中的数据.传统的数仓通过批量数据同步的 ...

最新文章

  1. spring security CSRF 问题 Invalid CSRF Token 'null' was found on ......
  2. hibernate系列之四
  3. MySQL(6)--- 创建数据库
  4. “跨界养猪”这件事,华为正式回应了
  5. Tree树结构java实现
  6. 【USACO】回文平方数
  7. python __file__ is not defined 解决方法
  8. python: 使用正则表达式的时候,传递参数的方法:
  9. Oracle P6培训系列:13分配限制条件
  10. TOGAF ADM 架构开发方法概述以及各个阶段的目的和交付物
  11. android 拼音搜索
  12. 计数器java代码_计数器的java代码
  13. 电脑版微信公众号文章加载不出来,空白的可能解决办法
  14. YY0709|听觉报警信号要求及测试方法
  15. Genlovy_Hoo大神的杰作
  16. 搜索引擎模块设计与实现——相关度搜索算法模块
  17. Log4j的基本应用
  18. 9.DNS和DNSmasq服务
  19. 外贸客户如何选择供应商:5R原则
  20. 矩阵/向量/标量间相互求导

热门文章

  1. Python 链表反转
  2. android 获取properties文件路径,读取local.properties文件
  3. 怎样修改MacOS默认输入法?
  4. 五款免费的网络管理工具
  5. linux拷贝目录并修改名字,linux复制文件夹、重命名文件夹、删除文件夹
  6. HITSZ智能证券投资报告(一)
  7. LRU算法,以及Apache LRUMap源码解析
  8. 【算法3】---滑动窗口(python)
  9. 一篇搞懂微信小程序以及和其他对比
  10. 服务器主机改装温7系统蓝屏,win7系统改装xp系统蓝屏怎么回事|win7换xp系统蓝屏解决方法...