1、环境介绍
操作系统:centos 7.9
jdk版本:11.0.12
kafka版本:2.8.0
Debezium版本:1.6(debezium-connector-oracle-1.6.1.Final-plugin.tar.gz)
oracle版本:19c

kafka connect 分布式部署
https://blog.csdn.net/zyj81092211/article/details/119647591
kafka connector 配置 Debezium
https://blog.csdn.net/zyj81092211/article/details/119840744

2、安装oracle
https://blog.csdn.net/zyj81092211/article/details/120082828

3、设置oracle
(1)创建目录

mkdir /u01/app/oracle/oradata/recovery_area

(2)连接oracle

sqlplus / as sysdba

(3)进行设置

alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/u01/app/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
archive log list;

(4)打开pdb

alter pluggable database orders open;


(5)、连接至pdb ORDERS

alter session set container=ORDERS;

(6)、创建用户

create user debezium identified by Smtgbk_123;
grant dba to debezium;

(7)、重新链接数据库

sqlplus debezium/Smtgbk_123@localhost/orders

(8)、创建测试表

CREATE TABLE customers (id NUMBER(9) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY,first_name VARCHAR2(255) NOT NULL,last_name VARCHAR2(255) NOT NULL,email VARCHAR2(255) NOT NULL UNIQUE
);

(9)、开启补充日志
开启表级补充日志

ALTER TABLE customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

开启数据库级别日志补充,在CDB中,执行以下命令

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

(10)、为连接器创建用户
a、使用管理员重新连接数据库

sqlplus / as sysdba

b、创建根容器表空间

CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/CDB19C/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

c、创建PDB表空间
切换至PDB

alter session set container=ORDERS;

CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/CDB19C/ORDERS/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

d、创建连接器的 LogMiner 用户

 sqlplus / as sysdba

CREATE USER c##dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL;
GRANT LOGMINING TO c##dbzuser CONTAINER=ALL;
GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL;
GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT ALTER ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL;

4、获取 Oracle JDBC 驱动程序
下载地址
https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html

将软件包中ojdbc8.jar的上传到kafka connector集群所有节点libs文件夹内


杀死kafka 进程重启

kafka-server-start.sh -daemon /data/kafka-connect/config/server.properties
connect-distributed.sh -daemon /data/kafka-connect/config/connect-distributed.properties

5、Debezium Oracle 连接器配置
官方示例:

curl -H "Content-Type: application/json" -X POST -d  '{"name": "source-oracle202", "config": {"connector.class" : "io.debezium.connector.oracle.OracleConnector","tasks.max" : "1","database.server.name" : "oracle202","database.hostname" : "10.99.99.202","database.port" : "1521","database.user" : "c##dbzuser","database.password" : "dbz","database.dbname" : "CDB19C","database.pdb.name" : "ORDERS","database.history.kafka.bootstrap.servers" : "kafkac01.wtown.com:9092,kafkac02.wtown.com:9092,kafkac03.wtown.com:9092","database.history.kafka.topic": "schema-changes.orders"}
}' http://kafkac01.wtown.com:8083/connectors/

查看状态正常

查看topic

6、测试
插入数据

INSERT INTO "DEBEZIUM"."CUSTOMERS" ("ID", "FIRST_NAME", "LAST_NAME", "EMAIL") VALUES ('1', 'zhang', 'san', 'zhagnsan@163.com');

查看topic中多了一个oracle202.DEBEZIUM.CUSTOMERS

消费oracle202.DEBEZIUM.CUSTOMERS

kafka-console-consumer.sh --bootstrap-server kafkac01.wtown.com:9092,kafkac02.wtown.com:9092,kafkac03.wtown.com:9092 --topic  oracle202.DEBEZIUM.CUSTOMERS --from-beginning

刚才插入的数据已经抓取到

注:创建non cdb数据库参考官方文档
官方文档:
https://debezium.io/documentation/reference/1.6/connectors/oracle.html

Debezium 抽取oracle数据相关推荐

  1. 关于flink cdc 抽取oracle数据 oracle表名大小写的问题

    使用flink cdc 抽取oracle数据 报错ALTER TABLE XXXXX ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; 我使用的是flink cdc的 ...

  2. orcle抽数据到mysql_抽取oracle数据到mysql数据库的实现过程

    在oracle数据库迁移至mysql数据库,除了oracle数据库模型移到mysql外,还一个重点环节就是要将oracle数据库的数据移到mysql数据库,本人尝试用过多个数据迁移程序,性能都不是很好 ...

  3. Sqoop增量抽取Oracle数据,最近8个小时的数据未被抽取

    在使用sqoop增量抽取数据时,在不指定m的情况下,导出日志中可以看到添加了截止时间,但这个时间并不是系统时间,经过多次比较发现,这个时间比当前系统时间早8个小时.因此怀疑是时区问题. 检查Oracl ...

  4. oracle 数据抽取 java_oracle数据抽取步骤

    oracle数据抽取步骤 Database links: 1.      在本地计算机上,新建一个连接远程数据库的连接,并记住这个连接的服务名(例如:jzfx_remote): 2.        返 ...

  5. sqlinesdata教程_如何将Oracle数据导入MySQL

    Manager进程:需要源端跟目标端同时运行,主要作用是监控管理其它进程,报告错误,分配及清理数据存储空间,发布阈值报告等 Extract进程:运行在数据库源端,主要用于捕获数据的变化,负责全量.增量 ...

  6. 基于OGG Datahub插件将Oracle数据同步上云

    摘要:随着数据规模的不断扩大,传统的RDBMS难以满足OLAP的需求,本文将介绍如何将Oracle的数据实时同步到阿里云的大数据处理平台当中,并利用大数据工具对数据进行分析. 一.背景介绍 随着数据规 ...

  7. Ogg For Bigdata 同步Oracle数据到KAFKA(包括初始化历史数据)

    OGG同步Oracle数据到KAFKA:OGG初始化进程初始化历史数据 在前面曾写过几篇关于OGG同步Oracle等库数据到kafka的文章: OGG实时同步Oracle数据到Kafka实施文档(供f ...

  8. Oracle数据迁移MySQL

    前言: 现今,Oracle数据迁移MySQL的需求已经越来越普遍,主要的迁移场景大致可以分为三类,第一类是涉及小表以及少量表的一次性迁移,无需进行增量同步,第二类是涉及大表以及多表的一次性迁移,第三类 ...

  9. 『Oracle数据复制容灾案例系列』中银国际证券Oracle容灾案例

    『Oracle数据复制容灾案例系列』   中银国际证券Oracle数据库容灾案例 使用产品:DDS 上线时间:2007 系统环境:Oracle 10g RAC 惠普 安腾 IA 64 HPUX 11. ...

最新文章

  1. 博弈最高位POJ 1704(Georgia and Bob-Nim博弈)
  2. mysql的innodb数据库引擎详解
  3. linux 快速删除大量/大文件
  4. 小巧优美的ORM框架-doodads
  5. 计算机语言import,python中import指的是什么意思
  6. 基于HTML5手机上下滑动翻页特效
  7. rw1601可以用C语言写程序吗,用8051+1601LCD设计的整型计算器讲解.doc
  8. Science | 从结构生物学的角度理解人类mRNA剪接体分支位点的识别
  9. 最大子段和动态规划_动态规划解最大子段和问题
  10. 19.看板方法---变异性的根源
  11. 阶段5 3.微服务项目【学成在线】_day04 页面静态化_12-页面静态化-页面静态化流程...
  12. 命令端口C++检测本地网络端口占用
  13. ecg 幅度_用ECG和PPG测血压靠谱吗?有什么比较好的算法?
  14. C语言不支持函数重载的原因
  15. 今天修改包名时出现 java.lang.ClassNotFoundException: com.myandroid.qqlogin1.MainActivity错误
  16. 文件MD5查看linuxwindows
  17. mysql 字符集测试_关于字符集的测试报告_MySQL
  18. 如何编写firefox插件
  19. C++学习笔记day3
  20. 8086系列(22):中断响铃

热门文章

  1. 微信小程序 - 商城项目 - 图片详情预览
  2. 手机从哈林子弹节省人力
  3. ZigBee TI ZStack CC2530 4.1 三种网络设备类型
  4. 微信公众号接入微软小冰
  5. attempted an unsupported operation 、an unsupported operation was attempted [尝试不支持的操作]错误的三种解决方案
  6. 27种迹象你该考虑辞掉程序员的工作
  7. ZOJ 3789 Gears
  8. 网站标题设计与搜索引擎
  9. 等我装完B 我再收拾你 Wating me to fuck you
  10. 智慧水务大数据平台-智慧水务建设方案