Debezium 抽取oracle数据
1、环境介绍
操作系统:centos 7.9
jdk版本:11.0.12
kafka版本:2.8.0
Debezium版本:1.6(debezium-connector-oracle-1.6.1.Final-plugin.tar.gz)
oracle版本:19c
kafka connect 分布式部署
https://blog.csdn.net/zyj81092211/article/details/119647591
kafka connector 配置 Debezium
https://blog.csdn.net/zyj81092211/article/details/119840744
2、安装oracle
https://blog.csdn.net/zyj81092211/article/details/120082828
3、设置oracle
(1)创建目录
mkdir /u01/app/oracle/oradata/recovery_area
(2)连接oracle
sqlplus / as sysdba
(3)进行设置
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/u01/app/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
archive log list;
(4)打开pdb
alter pluggable database orders open;
(5)、连接至pdb ORDERS
alter session set container=ORDERS;
(6)、创建用户
create user debezium identified by Smtgbk_123;
grant dba to debezium;
(7)、重新链接数据库
sqlplus debezium/Smtgbk_123@localhost/orders
(8)、创建测试表
CREATE TABLE customers (id NUMBER(9) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY,first_name VARCHAR2(255) NOT NULL,last_name VARCHAR2(255) NOT NULL,email VARCHAR2(255) NOT NULL UNIQUE
);
(9)、开启补充日志
开启表级补充日志
ALTER TABLE customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
开启数据库级别日志补充,在CDB中,执行以下命令
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
(10)、为连接器创建用户
a、使用管理员重新连接数据库
sqlplus / as sysdba
b、创建根容器表空间
CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/CDB19C/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
c、创建PDB表空间
切换至PDB
alter session set container=ORDERS;
CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/CDB19C/ORDERS/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
d、创建连接器的 LogMiner 用户
sqlplus / as sysdba
CREATE USER c##dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL;
GRANT LOGMINING TO c##dbzuser CONTAINER=ALL;
GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL;
GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT ALTER ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL;
4、获取 Oracle JDBC 驱动程序
下载地址
https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html
将软件包中ojdbc8.jar的上传到kafka connector集群所有节点libs文件夹内
杀死kafka 进程重启
kafka-server-start.sh -daemon /data/kafka-connect/config/server.properties
connect-distributed.sh -daemon /data/kafka-connect/config/connect-distributed.properties
5、Debezium Oracle 连接器配置
官方示例:
curl -H "Content-Type: application/json" -X POST -d '{"name": "source-oracle202", "config": {"connector.class" : "io.debezium.connector.oracle.OracleConnector","tasks.max" : "1","database.server.name" : "oracle202","database.hostname" : "10.99.99.202","database.port" : "1521","database.user" : "c##dbzuser","database.password" : "dbz","database.dbname" : "CDB19C","database.pdb.name" : "ORDERS","database.history.kafka.bootstrap.servers" : "kafkac01.wtown.com:9092,kafkac02.wtown.com:9092,kafkac03.wtown.com:9092","database.history.kafka.topic": "schema-changes.orders"}
}' http://kafkac01.wtown.com:8083/connectors/
查看状态正常
查看topic
6、测试
插入数据
INSERT INTO "DEBEZIUM"."CUSTOMERS" ("ID", "FIRST_NAME", "LAST_NAME", "EMAIL") VALUES ('1', 'zhang', 'san', 'zhagnsan@163.com');
查看topic中多了一个oracle202.DEBEZIUM.CUSTOMERS
消费oracle202.DEBEZIUM.CUSTOMERS
kafka-console-consumer.sh --bootstrap-server kafkac01.wtown.com:9092,kafkac02.wtown.com:9092,kafkac03.wtown.com:9092 --topic oracle202.DEBEZIUM.CUSTOMERS --from-beginning
刚才插入的数据已经抓取到
注:创建non cdb数据库参考官方文档
官方文档:
https://debezium.io/documentation/reference/1.6/connectors/oracle.html
Debezium 抽取oracle数据相关推荐
- 关于flink cdc 抽取oracle数据 oracle表名大小写的问题
使用flink cdc 抽取oracle数据 报错ALTER TABLE XXXXX ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; 我使用的是flink cdc的 ...
- orcle抽数据到mysql_抽取oracle数据到mysql数据库的实现过程
在oracle数据库迁移至mysql数据库,除了oracle数据库模型移到mysql外,还一个重点环节就是要将oracle数据库的数据移到mysql数据库,本人尝试用过多个数据迁移程序,性能都不是很好 ...
- Sqoop增量抽取Oracle数据,最近8个小时的数据未被抽取
在使用sqoop增量抽取数据时,在不指定m的情况下,导出日志中可以看到添加了截止时间,但这个时间并不是系统时间,经过多次比较发现,这个时间比当前系统时间早8个小时.因此怀疑是时区问题. 检查Oracl ...
- oracle 数据抽取 java_oracle数据抽取步骤
oracle数据抽取步骤 Database links: 1. 在本地计算机上,新建一个连接远程数据库的连接,并记住这个连接的服务名(例如:jzfx_remote): 2. 返 ...
- sqlinesdata教程_如何将Oracle数据导入MySQL
Manager进程:需要源端跟目标端同时运行,主要作用是监控管理其它进程,报告错误,分配及清理数据存储空间,发布阈值报告等 Extract进程:运行在数据库源端,主要用于捕获数据的变化,负责全量.增量 ...
- 基于OGG Datahub插件将Oracle数据同步上云
摘要:随着数据规模的不断扩大,传统的RDBMS难以满足OLAP的需求,本文将介绍如何将Oracle的数据实时同步到阿里云的大数据处理平台当中,并利用大数据工具对数据进行分析. 一.背景介绍 随着数据规 ...
- Ogg For Bigdata 同步Oracle数据到KAFKA(包括初始化历史数据)
OGG同步Oracle数据到KAFKA:OGG初始化进程初始化历史数据 在前面曾写过几篇关于OGG同步Oracle等库数据到kafka的文章: OGG实时同步Oracle数据到Kafka实施文档(供f ...
- Oracle数据迁移MySQL
前言: 现今,Oracle数据迁移MySQL的需求已经越来越普遍,主要的迁移场景大致可以分为三类,第一类是涉及小表以及少量表的一次性迁移,无需进行增量同步,第二类是涉及大表以及多表的一次性迁移,第三类 ...
- 『Oracle数据复制容灾案例系列』中银国际证券Oracle容灾案例
『Oracle数据复制容灾案例系列』 中银国际证券Oracle数据库容灾案例 使用产品:DDS 上线时间:2007 系统环境:Oracle 10g RAC 惠普 安腾 IA 64 HPUX 11. ...
最新文章
- 博弈最高位POJ 1704(Georgia and Bob-Nim博弈)
- mysql的innodb数据库引擎详解
- linux 快速删除大量/大文件
- 小巧优美的ORM框架-doodads
- 计算机语言import,python中import指的是什么意思
- 基于HTML5手机上下滑动翻页特效
- rw1601可以用C语言写程序吗,用8051+1601LCD设计的整型计算器讲解.doc
- Science | 从结构生物学的角度理解人类mRNA剪接体分支位点的识别
- 最大子段和动态规划_动态规划解最大子段和问题
- 19.看板方法---变异性的根源
- 阶段5 3.微服务项目【学成在线】_day04 页面静态化_12-页面静态化-页面静态化流程...
- 命令端口C++检测本地网络端口占用
- ecg 幅度_用ECG和PPG测血压靠谱吗?有什么比较好的算法?
- C语言不支持函数重载的原因
- 今天修改包名时出现 java.lang.ClassNotFoundException: com.myandroid.qqlogin1.MainActivity错误
- 文件MD5查看linuxwindows
- mysql 字符集测试_关于字符集的测试报告_MySQL
- 如何编写firefox插件
- C++学习笔记day3
- 8086系列(22):中断响铃