使用Streamsets将Oracle数据实时同步到MySQL中
相关环境:
Oracle 11g:11.2.0.1.0
MySQL:8.0.22
前期准备:
1、打开Oracle的logminer
a.在SQL Shell中,以具有DBA的用户身份登录数据库:
sqlplus /nolog;conn / as sysdba;
b.检查数据库日志记录模式:
select log_mode from v$database;
如果查询结果是ARCHIVELOG,则以下操作都不用执行,如果命令返回NOARCHIVELOG,请继续执行以下步骤:
c.关闭数据库:
shutdown immediate;
d.启动并挂载数据库:
startup mount;
f.启用存档,打开数据库,并使其可写:
alter database archivelog;
alter database open read write;
这里要注意,如果是Linux中通过docker容器安装的Oracle要先更换为oracle用户,再进入容器中进行修改,外部直接sqlplus连接的话会报错。
docker exec -it oracle11g bash
2、启用补充日志
a.要验证是否为数据库启用了补充日志记录,请在SQL Shell中运行以下命令:
SELECT supplemental_log_data_min, supplemental_log_data_pk, supplemental_log_data_all FROM v$database;
b.状态都为no则执行以下指令:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
ALTER SYSTEM SWITCH LOGFILE;
3、创建新Oracle用户并授权
开启补充日志之后为创建新的用户帐户用于Oracle CDC客户端源,并为用户 streamsets 授权,
CREATE USER streamsets IDENTIFIED BY streamsets;
GRANT create session, alter session, execute_catalog_role, select any dictionary, select any transaction, select any table to streamsets;
GRANT select on GV_$DATABASE to streamsets;
GRANT select on V_$ARCHIVED_LOG to streamsets;
GRANT select on V_$LOGMNR_CONTENTS to streamsets;
GRANT select on v_$logmnr_parameters to streamsets;
后面测试创建表后,插入数据报错:
ORA-01950: no privileges on tablespace 'USERS'
因为没有Resource权限,所以需要执行:
GRANT Resource to streamsets;GRANT select on <db>.<table> TO <user name>;
Streamsets管道配置:
1、在Streamsets中创建新的管道
a.通过 192.168.XXX.XXX:18630 打开Streamsets
默认账号密码为admin - admin
b.登录后在左边创建新的管道
2、编辑新建的管道
因为业务是将Oracle的数据实时同步到MySQL中,所以在Origins中选择Oracle CDC Client,在目的地Destinations中选择JDBC Producer。然后将两者相连:
3、导入所需的jdbc驱动包
在StreamSets界面中,点击右上角的Package Manager,然后右边的框拉到最下面选择External Libraries导入外部的jar包:
如果导入报错时,解决步骤可以看这里:导入外部库
4、配置 Oracle CDC Client
注意:
A.这里的要注意一下 Oracle CDC Client支持处理以下数据库中的数据:
Oracle 11g,12c,18c和19c
Oracle Real Application Clusters(RAC)12c,18c和19c
B.需要在Oracle终端启动LogMiner,总结数据库活动,使用这些日志来生成记录。LogMiner要求数据库是打开的,可写的,并且在启用归档的情况下处于ARCHIVELOG模式。
a.首先在Oracle 11g中创建一个学生表
命名为student:
然后插入一些测试数据:
b.编辑Oracle CDC中的内容
这里图中的Table Name Pattern是错误的。最后章节错误二有遇到的问题与解决方案。
Tables 配置为ORACLE DATABASE中的定义的测试表STREAMSETS.student,注意ORACEL大小写敏感。
Initial Change配置默认是 From Latest Change,实时数据同步只需要捕获变化数据。
Operation配置为Hbase对应支持的CRUD操作类型,匹配ORACLE Database中实时变化数据的增删改操作事务操作。
这里改一下时区。
c.编辑JDBC配置内容
JDBC Connection String中输入jdbc的连接信息:
jdbc:oracle:thin:@192.168.105.77:helowinXDB
这里图中的JDBC连接信息是错误的。最后章节错误一有遇到的问题与解决方案。
d.编辑Credentials
填写数据库的用户名与密码:
5、配置 JDBC Producer 组件
a.编辑General Property
Required Fields:必填字段,是必须存在于记录中以允许其进入处理阶段的字段。当记录不包含所有必填字段时,将根据为管道配置的错误处理对其进行处理。您可以为任何处理器,执行程序和大多数目标阶段定义必需的字段。
Preconditions:必须评估为TRUE以允许记录进入处理阶段的条件。单击"+"以创建其他前提条件。根据为阶段配置的错误处理,处理不满足所有前提条件的记录。
On Record Error:阶段的错误记录处理:
- Discard - 丢弃记录。
- Send to Error - 将记录发送到管道以进行错误处理。
- Stop Pipeline- 停止管道。
b.编辑JDBC
JDBC Connection String: 用于连接数据库的连接字符串,各数据库的连接配置使用以下格式
- Mysql: jdbc:mysql://xxxxxxxxxx:3306/数据库名
- Oracle: jdbc:oracle:thin:@xxxxxxxxxx:1521:服务名
- PostgreSQL- jdbc:postgresql://<host>:<port>/<database_name>
- SQL server: jdbc:sqlserver://xxxxxxxxxx:1433;databaseName=xxx
Schema Name :要使用的可选数据库或模式名称。在数据库需要完全限定的表名时使用。提示: 默认情况下,Oracle会对模式,表和列名称使用全部大写。仅当名称周围使用引号创建架构,表或列时,名称可以是低位或大小写。
Table Name :要使用的数据库表名称。使用数据库所需的表名格式。提示: 默认情况下,Oracle会对模式,表和列名称使用全部大写。仅当名称周围使用引号创建架构,表或列时,名称可以是低位或大小写。
Field to Column Mapping:用于覆盖默认字段到列映射。默认情况下,字段将写入同名的列。覆盖映射时,可以定义参数化值,以便在将字符值写入列之前将SQL函数应用于字段值。例如,要将字段值转换为整数,请为参数化值输入以下内容:
CAST(? AS INTEGER)
Enclose Object Names:写入数据库时,将数据库或模式名称,表名称和列名称括在引号中。允许使用区分大小写的名称或带有特殊字符的名称。未启用时,目标使用的JDBC驱动程序将确定名称的提交方式。Oracle JDBC驱动程序默认将名称提交为全部大写。此外,Oracle默认使用模式,表和列名称的全部大写。仅当名称周围使用引号创建架构,表或列时,名称可以是低位或大小写。
Change Log Format:变更捕获数据的格式。处理更改捕获数据时使用。
Default Operation:如果未设置sdc.operation.type记录标头属性,则执行的默认增删改操作。
Unsupported Operation Handling :不支持在sdc.operation.type记录标头属性中定义的CRUD操作类型时要采取的操作:
- Discard - 丢弃记录。
- Send to Error - 将记录发送到管道以进行错误处理。
- Use Default Operation - 使用默认操作将记录写入目标系统。
Use Multi-Row Operation:确定阶段如何处理记录。选择以一次启用插入和删除多个记录。在启用此选项之前,请验证数据库是否支持阶段使用的多行SQL语句。默认情况下,该阶段执行单行操作。
Max Cache Size Per Batch (Entries):定义多行插入的预准备语句中允许的参数数量。使用-1可禁用参数限制。默认值为-1。
Rollback Batch On Error:当批次中发生错误时,回滚整个批次。
Additional JDBC Configuration Propertie:要使用的其他JDBC配置属性。要添加属性,请单击“ + ”并定义JDBC属性名称和值,使用JDBC所期望的属性名称和值。
c.编辑Credentials
填写数据库的用户名与密码:
d.如果jdbc版本低于4.0,则需要配置Legacy Drivers
JDBC Driver Class Name:JDBC驱动程序的类名。早于版本4.0的JDBC版本必须填写。
Connection Health Test Query :可选查询,用于测试连接的运行状况。仅当JDBC版本低于4.0时建议使用。
e.编辑Advanced
Maximum Pool Size:要创建的最大连接数,默认值为1,建议值为1。
Minimum Idle Connections:要创建和维护的最小连接数。要定义固定连接池,请将其设置为与“Maximum Pool Size”相同的值,默认值为1。
Connection Timeout (Seconds):等待连接的最长时间。在表达式中使用时间常量来定义时间增量,默认值为30秒。
Idle Timeout (Seconds):允许连接空闲的最长时间。在表达式中使用时间常量来定义时间增量,使用0可以避免删除任何空闲连接,默认值为10分钟.
Max Connection Lifetime (Seconds):连接的最长寿命。在表达式中使用时间常量来定义时间增量,使用0设置无最大生命周期,默认值为30分钟
Transaction Isolation:用于连接数据库的事务隔离级别。默认值是为数据库设置的默认事务隔离级别。您可以通过将级别设置为以下任何一项来覆盖数据库默认值:
- 读取已提交
- 读取未提交
- 可重复阅读
序列化
Init Query:在该组件第一次连接到数据库之后立即执行的SQL查询
6、启动管道流:
问题解决:
报错一:
启动时候报错了 OraclORA-12505, TNS:listener does not currently know of SID given in connect descriptor
jdbc连接数据库的时候,需要使用数据库的sid_name,而不是数据库的services_name.而使用plsql连接数据库的时候,只需要数据库的services_name即可,所以修改连接字符串中的services_name 为sid_name.
查询sid_name的语句:
select INSTANCE_NAME from v$instance;
在JDBC中修改即可:
报错二:
我Oracle数据修改以后,并没有在MySQL数据库中同步到数据,然后在错误中可以看到错误信息:
具体的报错内容如下:
JDBC_405 - Error while generating records: java.util.concurrent.ExecutionException: com.streamsets.pipeline.api.StageException: JDBC_54 - Column: 'XM' does not exist in table: 'Schema = 'STREAMSETS', Table = 'STUDENT''. This is likely due to a DDL being performed on this table
java.util.concurrent.ExecutionException: com.streamsets.pipeline.api.StageException: JDBC_54 - Column: 'XM' does not exist in table: 'Schema = 'STREAMSETS', Table = 'STUDENT''. This is likely due to a DDL being performed on this table
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.addRecordsToQueue(OracleCDCSource.java:1151)
at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.generateRecords(OracleCDCSource.java:742)
at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.lambda$startGeneratorThread$5(OracleCDCSource.java:463)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.streamsets.pipeline.api.StageException: JDBC_54 - Column: 'XM' does not exist in table: 'Schema = 'STREAMSETS', Table = 'STUDENT''. This is likely due to a DDL being performed on this table
at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.objectToField(OracleCDCSource.java:1926)
at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.generateRecord(OracleCDCSource.java:930)
at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.lambda$addRecordsToQueue$11(OracleCDCSource.java:1135)
... 4 more
将Oracle与MySQL的数据表中的字段都改为大写,则成功执行:
在Oracle中新增一条数据后:
MySQL中实时新增一条数据:
使用Streamsets将Oracle数据实时同步到MySQL中相关推荐
- SqlServer数据实时同步到mysql
SqlServer数据实时同步到mysql 一.首先要装一个ODBC的驱动用来建立mysql的连接.http://www.mysql.com/products/connector/ 注意:一定要用32 ...
- python豆瓣mysql_Python3.5爬取豆瓣电视剧数据并且同步到mysql中
#!/usr/local/bin/python # -*- coding: utf-8 -*- # Python: 3.5 # Author: zhenghai.zhang@xxx.com # Pro ...
- python爬豆瓣电视剧_Python3.5爬取豆瓣电视剧数据并且同步到mysql中
#!/usr/local/bin/python#-*- coding: utf-8 -*-#Python: 3.5#Author: zhenghai.zhang@xxx.com#Program: 爬取 ...
- 爬取豆瓣评论连接mysql_Python3.5爬取豆瓣电视剧数据并且同步到mysql中
#!/usr/local/bin/python#-*- coding: utf-8 -*-#Python: 3.5#Author: zhenghai.zhang@xxx.com#Program: 爬取 ...
- Oracle 数据怎么实时同步到 MySQL | 亲测干货分享建议收藏
摘要: 这段时间负责一个老项目开发的数据库管理工作,这个项目中开发库与测试数据库分离,其中有些系统表数据与基础资料数据经常需要进行同步,相信很多 DBA 同学经常会遇到要从一个数据库实时同步到另一 ...
- ogg oracle 测试kafka_利用ogg实现oracle到kafka的增量数据实时同步
前言 ogg即Oracle GoldenGate是Oracle的同步工具,本文讲如何配置ogg以实现Oracle数据库增量数据实时同步到kafka中,其中同步消息格式为json. 下面是我的源端和目标 ...
- Docker安装部署MySQL+Canal+Kafka+Camus+HIVE数据实时同步
因为公司业务需求要将mysql的数据实时同步到hive中,在网上找到一套可用的方案,即MySQL+Canal+Kafka+Camus+HIVE的数据流通方式,因为是首次搭建,所以暂时使用伪分布式的搭建 ...
- oracle和mysql数据实时同步_异构数据源的CDC实时同步系统——最终选型实战
引言: <异构数据源的CDC实时同步系统> 系列第一篇 (已完成) <零编码打造异构数据实时同步系统--异构数据源CDC之2> 系列第二篇(已完成) <零编码打造异构数据 ...
- oracle oci.dll无法加载_基于OGG 实现Oracle到Kafka增量数据实时同步
背景 在大数据时代,存在大量基于数据的业务.数据需要在不同的系统之间流动.整合.通常,核心业务系统的数据存在OLTP数据库系统中,其它业务系统需要获取OLTP系统中的数据.传统的数仓通过批量数据同步的 ...
最新文章
- spring security CSRF 问题 Invalid CSRF Token 'null' was found on ......
- hibernate系列之四
- MySQL(6)--- 创建数据库
- “跨界养猪”这件事,华为正式回应了
- Tree树结构java实现
- 【USACO】回文平方数
- python __file__ is not defined 解决方法
- python: 使用正则表达式的时候,传递参数的方法:
- Oracle P6培训系列:13分配限制条件
- TOGAF ADM 架构开发方法概述以及各个阶段的目的和交付物
- android 拼音搜索
- 计数器java代码_计数器的java代码
- 电脑版微信公众号文章加载不出来,空白的可能解决办法
- YY0709|听觉报警信号要求及测试方法
- Genlovy_Hoo大神的杰作
- 搜索引擎模块设计与实现——相关度搜索算法模块
- Log4j的基本应用
- 9.DNS和DNSmasq服务
- 外贸客户如何选择供应商:5R原则
- 矩阵/向量/标量间相互求导
热门文章
- Python 链表反转
- android 获取properties文件路径,读取local.properties文件
- 怎样修改MacOS默认输入法?
- 五款免费的网络管理工具
- linux拷贝目录并修改名字,linux复制文件夹、重命名文件夹、删除文件夹
- HITSZ智能证券投资报告(一)
- LRU算法,以及Apache LRUMap源码解析
- 【算法3】---滑动窗口(python)
- 一篇搞懂微信小程序以及和其他对比
- 服务器主机改装温7系统蓝屏,win7系统改装xp系统蓝屏怎么回事|win7换xp系统蓝屏解决方法...