背景

在大数据时代,存在大量基于数据的业务。数据需要在不同的系统之间流动、整合。通常,核心业务系统的数据存在OLTP数据库系统中,其它业务系统需要获取OLTP系统中的数据。传统的数仓通过批量数据同步的方式,定期从OLTP系统中抽取数据。但是随着业务需求的升级,批量同步无论从实时性,还是对在线OLTP系统的抽取压力,都无法满足要求。需要实时从OLTP系统中获取数据变更,实时同步到下游业务系统。

本文基于Oracle OGG,介绍一种将Oracle数据库的数据实时同步到Kafka消息队列的方法。

Kafka是一种高效的消息队列实现,通过订阅kafka的消息队列,下游系统可以实时获取在线Oracle系统的数据变更情况,实现业务系统。

环境介绍

组件版本

整体架构图

名词解释

1.OGG Manager

OGG Manager用于配置和管理其它OGG组件,配置数据抽取、数据推送、数据复制,启动和停止相关组件,查看相关组件的运行情况。

2.数据抽取(Extract)

抽取源端数据库的变更(DML, DDL)。数据抽取主要分如下几种类型:

本地抽取
从本地数据库捕获增量变更数据,写入到本地Trail文件

数据推送(Data Pump)
从本地Trail文件读取数据,推送到目标端。

初始数据抽取
从数据库表中导出全量数据,用于初次数据加载

3.数据推送(Data Pump)

Data Pump是一种特殊的数据抽取(Extract)类型,从本地Trail文件中读取数据,并通过网络将数据发送到目标端OGG

4.Trail文件

数据抽取从源端数据库抓取到的事物变更信息会写入到Trail文件。

5.数据接收(Collector)

数据接收程序运行在目标端机器,用于接收Data Pump发送过来的Trail日志,并将数据写入到本地Trail文件。

6.数据复制(Replicat)

数据复制运行在目标端机器,从Trail文件读取数据变更,并将变更数据应用到目标端数据存储系统。本案例中,数据复制将数据推送到kafka消息队列。

7.检查点(Checkpoint)

检查点用于记录数据库事物变更。

操作步骤

源端Oracle配置

1.检查归档

使用OGG,需要在源端开启归档日志

SQL> archive log list;     Database log mode              Archive Mode     Automatic archival             Enabled     Archive destination            /u01/app/oracle/product/12.2.0/db_1/dbs/arch     Oldest online log sequence     2576     Next log sequence to archive   2577     Current log sequence           2577

2.检查数据库配置

SQL> select force_logging, supplemental_log_data_min from v$database; FORCE_LOGG SUPPLEMENTAL_LOG_DATA_MI ---------- ------------------------ YES        YES

如果没有开启辅助日志,需要开启:

SQL> alter database force logging; SQL> alter database add supplemental log data;

3.开启goldengate复制参数

SQL> alter system set enable_goldengate_replication = true;

4.创建源端Oracle账号

SQL> create tablespace tbs_ogg datafile '/oradata/dtstack/tbs_ogg.dbf' size 1024M autoextend on; SQL> create user ggsadmin identified by oracle default tablespace tbs_ogg; SQL> grant dba to ggsadmin;

5.创建测试表

SQL> create table baiyang.ora_to_kfk as select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id < 500; SQL> alter table baiyang.ora_to_kfk add constraint pk_kfk_obj primary key(object_id); SQL> select count(*) from baiyang.ora_to_kfk;    COUNT(*) ----------         436

源端OGG配置

1.检查源端OGG环境

cd /oradata/oggorcl/ogg ./ggsci GGSCI (dtproxy) 1> info all Program     Status      Group       Lag at Chkpt  Time Since ChkptMANAGER     STOPPED

2.创建相关文件夹

GGSCI (dtproxy) 2> create subdirs     Creating subdirectories under current directory /oradata/oggorcl/ogg        Parameter file                 /oradata/oggorcl/ogg/dirprm: created.     Report file                    /oradata/oggorcl/ogg/dirrpt: created.     Checkpoint file                /oradata/oggorcl/ogg/dirchk: created.     Process status files           /oradata/oggorcl/ogg/dirpcs: created.     SQL script files               /oradata/oggorcl/ogg/dirsql: created.     Database definitions files     /oradata/oggorcl/ogg/dirdef: created.     Extract data files             /oradata/oggorcl/ogg/dirdat: created.     Temporary files                /oradata/oggorcl/ogg/dirtmp: created.     Credential store files         /oradata/oggorcl/ogg/dircrd: created.     Masterkey wallet files         /oradata/oggorcl/ogg/dirwlt: created.     Dump files                     /oradata/oggorcl/ogg/dirdmp: created

3.配置源端Manager

GGSCI (dtproxy) 4> dblogin userid ggsadmin password oracle     Successfully logged into database. GGSCI (dtproxy as ggsadmin@dtstack) 5> edit param ./globals

添加

oggschema ggsadmin GGSCI (dtproxy as ggsadmin@dtstack) 6> edit param mgr

添加

PORT 7810 --默认监听端口 DYNAMICPORTLIST  7811-7820 --动态端口列表 AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 --进程有问题,每3分钟重启一次,一共重启五次 PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7  --*/ LAGREPORTHOURS 1 --每隔一小时检查一次传输延迟情况 LAGINFOMINUTES 30 --传输延时超过30分钟将写入错误日志 LAGCRITICALMINUTES 45 --传输延时超过45分钟将写入警告日志 PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7 --定期清理trail文件 ACCESSRULE, PROG *, IPADDR 172.*.*.*, ALLOW --设定172网段可连接

添加同步的表

GGSCI (dtproxy as ggsadmin@dtstack) 9> add trandata baiyang.ora_to_kfk Oracle Goldengate marked following column as key columns on table BAIYANG.ORA_TO_KFK: OBJECT_ID. GGSCI (dtproxy as ggsadmin@dtstack) 10> info trandata baiyang.ora_to_kfk Prepared CSN for table BAIYANG.ORA_TO_KFK: 192881239

目标端OGG配置
1.目标端检查环境

GGSCI (172-16-101-242) 1> info all     Program     Status      Group       Lag at Chkpt  Time Since Chkpt     MANAGER     STOPPED 

2.创建目录

GGSCI (172-16-101-242) 2> create subdirs     Creating subdirectories under current directory /app/ogg     Parameter file                 /app/ogg/dirprm: created.     Report file                    /app/ogg/dirrpt: created.     Checkpoint file                /app/ogg/dirchk: created.     Process status files           /app/ogg/dirpcs: created.     SQL script files               /app/ogg/dirsql: created.     Database definitions files     /app/ogg/dirdef: created.     Extract data files             /app/ogg/dirdat: created.     Temporary files                /app/ogg/dirtmp: created.     Credential store files         /app/ogg/dircrd: created.     Masterkey wallet files         /app/ogg/dirwlt: created. Dump files                     /app/ogg/dirdmp: created.

3.目标端Manager配置

GGSCI (172-16-101-242) 3> edit params mgr

添加

PORT 7810     DYNAMICPORTLIST 7811-7820     AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3     PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3        GGSCI (172-16-101-242) 4> edit  param  ./GLOBALSCHECKPOINTTABLE ggsadmin.checkpoint

全量数据同步

1.配置源端数据初始化

配置源端初始化进程

GGSCI (dtproxy as ggsadmin@dtstack) 15> add extract initkfk,sourceistable 

配置源端初始化参数

GGSCI (dtproxy as ggsadmin@dtstack) 16> edit params initkfk

添加

EXTRACT initkfk    SETENV (NLS_LANG=AMERICAN_AMERICA.AL32UTF8)    USERID ggsadmin,PASSWORD oracle    RMTHOST 172.16.101.242, MGRPORT 7810    RMTFILE ./dirdat/ekfk,maxfiles 999, megabytes 500table baiyang.ora_to_kfk;

2.源端生成表结构define文件

GGSCI (dtproxy as ggsadmin@dtstack) 17> edit param define_kfk

添加

defsfile /oradata/oggorcl/ogg/dirdef/define_kfk.txt     userid ggsadmin,password oracle     table baiyang.ora_to_kfk;

执行

$./defgen paramfile dirprm/define_kfk.prm -- Definitions generated for 1 table in /oradata/oggorcl/ogg/dirdef/define_kfk.txt

将此文件传输到目标段dirdef文件夹

scp /oradata/oggorcl/ogg/dirdef/define_kfk.txt 172.16.101.242:/app/ogg/dirdef/define_kfk.txt

3.配置目标端数据初始化进程

配置目标端初始化进程

GGSCI (172-16-101-242) 3> ADD replicat initkfk,specialrun GGSCI (172-16-101-242) 6> edit params initkfk

添加

SPECIALRUN     end runtime     setenv(NLS_LANG="AMERICAN_AMERICA.AL32UTF8")     targetdb libfile libggjava.so set property=./dirprm/kafka.props     SOURCEDEFS ./dirdef/define_kfk.txt     EXTFILE ./dirdat/ekfk000000     reportcount every 1 minutes, rate     grouptransops 10000 map baiyang.ora_to_kfk,target baiyang.ora_to_kfk;

4.配置kafka相关参数
vi ./dirprm/kafka.props

添加

gg.handlerlist=kafkahandlergg.handler.kafkahandler.type=kafkagg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.propertiesgg.handler.kafkahandler.topicMappingTemplate=test_ogggg.handler.kafkahandler.format=jsongg.handler.kafkahandler.mode=opgg.classpath=dirprm/:/data/kafka_2.12-2.2.0/libs/*:/app/ogg/:/app/ogg/lib/*  --*/

vi custom_kafka_producer.properties

添加

bootstrap.servers=172.16.101.242:9092 acks=1 compression.type=gzip reconnect.backoff.ms=1000 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer batch.size=102400 linger.ms=10000

5.源端开启全量数据抽取
源端

GGSCI (dtproxy) 20>  start mgr GGSCI (dtproxy) 21>  start initkfk

6.目标端全量数据应用

GGSCI (172-16-101-242) 13> start mgr ./replicat paramfile ./dirprm/initkfk.prm reportfile ./dirrpt/init01.rpt -p INITIALDATALOAD

7.kafka数据验证

使用kafka客户端工具查看topic的数据

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_ogg --from-beginning {"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:55.946000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"C_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":2,"DATA_OBJECT_ID":2,"OBJECT_TYPE":"CLUSTER"}} {"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:56.289000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"I_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":3,"DATA_OBJECT_ID":3,"OBJECT_TYPE":"INDEX"}}

全量数据已经同步到目标kafka topic
增量数据同步

1.源端抽取进程配置

GGSCI (dtproxy) 9> edit param extkfk

添加

dynamicresolution SETENV (ORACLE_SID = "dtstack") SETENV (NLS_LANG = "american_america.AL32UTF8") userid ggsadmin,password oracle exttrail ./dirdat/to table baiyang.ora_to_kfk;

添加extract进程

GGSCI (dtproxy) 10> add extract extkfk,tranlog,begin now

添加trail文件的定义与extract进程绑定

GGSCI (dtproxy) 11> add exttrail ./dirdat/to,extract extkfk

2.源端数据推送进程配置

配置源端推送进程

GGSCI (dtproxy) 12> edit param pupkfk

添加

extract pupkfk passthru dynamicresolution userid ggsadmin,password oracle rmthost 172.16.101.242 mgrport 7810 rmttrail ./dirdat/to table baiyang.ora_to_kfk;

添加extract进程

GGSCI (dtproxy) 13>  add extract pupkfk,exttrailsource /oradata/oggorcl/ogg/dirdat/to

添加trail文件的定义与extract进程绑定

GGSCI (dtproxy) 14>  add rmttrail ./dirdat/to,extract pupkfk

3.配置目标端恢复进程

配置目标端恢复进程

edit param repkfk

添加

REPLICAT repkfk SOURCEDEFS ./dirdef/define_kfk.txt targetdb libfile libggjava.so set property=./dirprm/kafka.props REPORTCOUNT EVERY 1 MINUTES, RATE GROUPTRANSOPS 10000 MAP baiyang.ora_to_kfk, TARGET baiyang.ora_to_kfk;

添加trail文件到replicate进程

add replicat repkfk exttrail ./dirdat/to,checkpointtable ggsadmin.checkpoint

4.源端开启实时数据抓取

./ggsci GGSCI (dtproxy) 5> start extkfk Sending START request to MANAGER ... EXTRACT EXTKFK startingGGSCI (dtproxy) 6> start pupkfk Sending START request to MANAGER ... EXTRACT PUPKFK starting  GGSCI (dtproxy) 7> status all Program     Status      Group       Lag at Chkpt  Time Since Chkpt MANAGER     RUNNING EXTRACT     RUNNING     EXTKFK      00:00:00      00:00:10 EXTRACT     RUNNING     PUPKFK      00:00:00      00:00:00

5.目标端开启实时数据同步

./ggsci GGSCI (172-16-101-242) 7> start replicat repkfk Sending START request to MANAGER ... REPLICAT REPKFK starting  GGSCI (172-16-101-242) 8> info all Program     Status      Group       Lag at Chkpt  Time Since Chkpt  MANAGER     RUNNING REPLICAT    RUNNING     REPKFK      00:00:00      00:00:00

6.测试增量数据同步

Oracle插入增量数据

SQL> insert into baiyang.ora_to_kfk  select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id >500 and  object_id < 1000; SQL> commit; SQL> select count(*) from baiyang.ora_to_kfk;COUNT(*) ----------     905

查看Kafka消息队列消费数据

{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042000","pos":"00000000000000075298","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS","SUBOBJECT_NAME":null,"OBJECT_ID":998,"DATA_OBJECT_ID":998,"OBJECT_TYPE":"TABLE"}} {"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042001","pos":"00000000000000075459","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS_I","SUBOBJECT_NAME":null,"OBJECT_ID":999,"DATA_OBJECT_ID":999,"OBJECT_TYPE":"INDEX"}}

源端Oracle删除数据

SQL> delete from baiyang.ora_to_kfk ; 906 rows deleted. SQL> commit;

查看kafka消息队列消费数据

{"table":"BAIYANG.ORA_TO_KFK","op_type":"D","op_ts":"2019-11-11 21:13:11.166184","current_ts":"2019-11-11T21:13:17.449007","pos":"00000000000000216645","before":{"OWNER":"x1","OBJECT_NAME":"SSSSS","SUBOBJECT_NAME":"z1","OBJECT_ID":111000,"DATA_OBJECT_ID":2000,"OBJECT_TYPE":"x1"}}

源端插入数据

SQL> insert into  baiyang.ora_to_kfk values('汉字', 'y1', 'z1', 111000,2000,'x1'); 1 row created. SQL> commit;

查看kafka消息队列消费数据

{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:14:21.167454","current_ts":"2019-11-11T21:14:26.497000","pos":"00000000000000216794","after":{"OWNER":"汉字","OBJECT_NAME":"y1","SUBOBJECT_NAME":"z1","OBJECT_ID":111000,"DATA_OBJECT_ID":2000,"OBJECT_TYPE":"x1"}}

总结

使用OGG可以方便地将Oracle的数据变更情况实时同步到Kafka消息队列。下游业务系统通过订阅kafka的消息队列,能方便地实现各类实时数据的应用。

oracle oci.dll无法加载_基于OGG 实现Oracle到Kafka增量数据实时同步相关推荐

  1. ogg oracle 测试kafka_利用ogg实现oracle到kafka的增量数据实时同步

    前言 ogg即Oracle GoldenGate是Oracle的同步工具,本文讲如何配置ogg以实现Oracle数据库增量数据实时同步到kafka中,其中同步消息格式为json. 下面是我的源端和目标 ...

  2. sqlserver2008基于发布/订阅功能实现主从数据库数据实时同步

    网上关于sqlserver基于发布/订阅实现数据同步的文章很多,大多介绍不详细,各种copy.为实现发布服务器.订阅服务器数据库实时同步,近期花了几天时间认真研究了一下,并实践验证通过,希望本文能帮助 ...

  3. python 按需加载_基于python的opcode优化和模块按需加载机制研究(学习与个人思路)(原创)...

    基于python的opcode优化和模块按需加载机制研究(学习与思考) 姓名:XXX 学校信息:XXX 主用编程语言:python3.5 文档转换为PDF有些图片无法完全显示,请移步我的博客查看 完成 ...

  4. dll侧加载_动态载入DLL所需要的三个函数详解(LoadLibrary,GetProcAddress,FreeLibrary)...

    动态载入 DLL 动态载入方式是指在编译之前并不知道将会调用哪些 DLL 函数, 完全是在运行过程中根据需要决定应调用哪些函数. 方法是:用 LoadLibrary 函数加载动态链接库到内存,用 Ge ...

  5. 使用ogg实现oracle到kafka的增量数据实时同步

    Oracle Golden Gate软件是一种基于日志的结构化数据复制备份软件,它通过解析源数据库在线日志或归档日志获得数据的增量变化,再将这些变化应用到目标数据库,从而实现源数据库与目标数据库同步. ...

  6. vue-ueditor 后端配置项没有正常加载_当运营商遇上极简流畅的产商品配置体验

    "销售品配置复杂!" "销售品展示的信息偏IT,不直观!业务部门不能理解!" "销售品配置到上架时间漫长!" "对一线营销支撑能力 ...

  7. WSAStartup()函数以及DLL的加载

    本节讲解 Windows 下 DLL 的加载,学习 Linux Socket 的读者可以跳过. WinSock(Windows Socket)编程依赖于系统提供的动态链接库(DLL),有两个版本: 较 ...

  8. mkl_def.dll文件加载失败

    mkl_def.dll文件加载失败 下载 mkl_def.dll文件的下载地址如下:Fix mkl_def.dll related errors in Windows 7, 8 or 10 | DLL ...

  9. 像加载DLL一样加载EXE

    介绍 你可能已经被警告过,不要用LoadLibrary()加载可执行文件,你可能尝试这么做过,然后程序就崩溃了,所以你可能会认为这是不可能的. 但实际上这是可行的,本文就将介绍具体的方法. 声明 这好 ...

最新文章

  1. 有助于建立使用者对套件的信任 GitHub释出管理服务
  2. 面向对象PHP之静态延迟绑定
  3. 核心频率个加速频率_流处理器、核心频率、 位宽……这些显卡参数你知道吗?—— 电脑硬件科普篇(八)...
  4. (JAVA)CollectionDemo1
  5. mysql socket tcp udp_TCP、UDP、HTTP、SOCKET之间的区别
  6. 解决办法:/usr/bin/ld: 找不到 -lstdc++
  7. Hibernate批量处理数据、HQL连接查询
  8. win64 oracle下载,oracle 11g 64位下载
  9. 运用系统分析方法,分析校园二手交易平台的可行性
  10. python tests in xxx问题
  11. 计算机主机忘了密码怎么办,如果我忘记了笔记本计算机的开机密码怎么办
  12. 【C语言】童年的扫雷游戏(递归展开)你也可以做出来,将他发给你的网瘾室友玩吧 ——含详细注释及解析
  13. print函数的高级用法(输出到文件,自定义间隔符,强制刷新)
  14. HDU 4507 吉哥系列故事——恨7不成妻(数位DP)
  15. python使用Tesseract,pytesseract图片处理识别(1)
  16. 【vsftpd】Ubuntu下搭建FTP服务器
  17. 使用MAT的命令行工具分析hprof文件
  18. 【C语言】初识二级指针
  19. pfc计算机仿真在矿山发展趋势,PFC电路的计算机仿真模拟.pdf
  20. 天正索引号是什么lisp_天正电气CAD教程之符号篇(内附往期秘籍)

热门文章

  1. NET START MSSQLSERVER 服务名无效解决办法
  2. Java String类和常量池
  3. Netty实现简单的多人聊天
  4. Opensim教程3-缩放,逆运动学,逆动力学
  5. 珠三角节前返乡客流持续走高
  6. Winzip的创始人Philip Katz之死,老程序要细读一下。
  7. HybridNets:多任务端到端感知网络 目标检测+可行驶区域+车道线检测
  8. 2019 前端面试题 (Vue)
  9. What's wrong baby?
  10. jquery仿作qq音乐笔记