Oracle Golden Gate软件是一种基于日志的结构化数据复制备份软件,它通过解析源数据库在线日志或归档日志获得数据的增量变化,再将这些变化应用到目标数据库,从而实现源数据库与目标数据库同步。

0、本篇中源端和目标端的一些配置信息:

- 版本 OGG版本 id地址
源端 Oracle11gR2 Oracle GoldenGate 11.2.1.0.1 for Oracle on Linux x86-64 Carlota3
目标端 kafka_2.12-2.5.0 Linux x86-64上的Oracle GoldenGate for Big Data 19.1.0.0.1 Carlota2

源端和目标端的文件不一样,目标端需要下载Oracle GoldenGate for Big Data,源端需要下载Oracle GoldenGate for Oracle!

PS:源端是安装好了Oracle的机器,目标端是安装好了Kafka的机器,二者环境变量之前都配置好了。

1、源端OGG安装

  • 先建立ogg目录

    mkdir -p /opt/ogg
    
  • 解压zip文件

    unzip ogg112101_fbo_ggs_Linux_x64_ora11g_64bit.zip
    
  • 解压后得到一个tar包,再解压这个tar

    tar xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /opt/ogg
    
  • 使oracle用户有ogg的权限,后面有些需要在oracle用户下执行才能成功

    chown -R oracle:oinstall /data/ogg
    
  • 配置OGG环境变量

    vim /etc/profile
    

    export OGG_HOME=/opt/ogg
    export LD_LIBRARY_PATH=ORACLEHOME/lib:/usr/libexportPATH=ORACLE_HOME/lib:/usr/lib export PATH=ORACLEH​OME/lib:/usr/libexportPATH=OGG_HOME:$PATH

  • source /etc/profile
    

2、目标端OGG安装

  • 先建立ogg目录

    mkdir -p /data/ogg
    
  • 解压zip文件

    unzip OGG_BigData_Linux_x64_19.1.0.0.1.zip
    
  • 解压后得到一个tar包,再解压这个tar

    tar xf OGG_BigData_Linux_x64_19.1.0.0.1.tar
    
  • 使oracle用户有ogg的权限,后面有些需要在oracle用户下执行才能成功

    chown -R oracle:oinstall /data/ogg
    
  • 配置OGG环境变量

    vim /etc/profile
    

    export OGG_HOME=/opt/ogg
    export LD_LIBRARY_PATH=JAVAHOME/jre/lib/amd64:JAVA_HOME/jre/lib/amd64:JAVAH​OME/jre/lib/amd64:JAVA_HOME/jre/lib/amd64/server:JAVAHOME/jre/lib/amd64/libjsig.so:JAVA_HOME/jre/lib/amd64/libjsig.so:JAVAH​OME/jre/lib/amd64/libjsig.so:JAVA_HOME/jre/lib/amd64/server/libjvm.so:OGGHOME/libexportPATH=OGG_HOME/lib export PATH=OGGH​OME/libexportPATH=OGG_HOME:$PATH

  • source /etc/profile
    
  • ggsci
    
  • create subdirs
    

3、源端Oracle归档模式设置

  • 登陆Oracle用户

    su - oracle
    
  • 登陆Oracle

    sqlplus / as sysdba
    
  • 查看当前是否为归档模式(若为Disabled,则需手动打开)

    archive log list
    

    Database log mode

    No Archive Mode Automatic archival

    Disabled Archive destination

    USE_DB_RECOVERY_FILE_DEST Oldest online log sequence 12

    Current log sequence 14

  • 立即关闭数据库

    shutdown immediate
    
  • 启动实例并加载数据库,但不打开

    startup mount
    
  • 更改数据库为归档模式

    alter database archivelog;
    
  • 打开数据库

    alter database open;
    
  • 启用自动归档

    alter system archive log start;
    
  • 再次查看当前是否为归档模式(看到为Enabled,则成功打开归档模式。)

    archive log list
    

    Database log mode Archive Mode
    Automatic archival Enabled
    Archive destination USE_DB_RECOVERY_FILE_DEST
    Oldest online log sequence 12
    Next log sequence to archive 14
    Current log sequence 14

  • 查看辅助日志状态(若为NO,则需要通过命令修改)

    select force_logging, supplemental_log_data_min from v$database;
    

    FORCE_ SUPPLEMENTAL_LOG


    NO NO

  • alter database force logging;
    
  • alter database add supplemental log data;
    
  • 再次查看辅助日志状态(为YES即可)

    select force_logging, supplemental_log_data_min from v$database;
    

    FORCE_ SUPPLEMENTAL_LOG


    YES YES

4、源端oracle创建复制用户

  • root用户建立相关文件夹,并赋予权限

    mkdir -p /data/oracle/oggdata/orcl
    
    chown -R oracle:oinstall /data/oracle/oggdata/orcl
    
  • 执行下面sql

    SQL> create tablespace oggtbs datafile '/data/oracle/oggdata/orcl/oggtbs01.dbf' size 1000M autoextend on;Tablespace created.SQL>  create user ogg identified by ogg default tablespace oggtbs;User created.SQL> grant dba to ogg;Grant succeeded.
  • Oracle创建测试表

    create user test_ogg  identified by test_ogg default tablespace users;
    grant dba to test_ogg;
    conn test_ogg/test_ogg;
    create table test_ogg(id int ,name varchar(20),primary key(id));
    

5、OGG源端配置

  • ggsci
    
  • create subdirs
    
  • dblogin userid ogg password ogg
    
  • edit param ./globals
    

    oggschema ogg

  • 配置管理器mgr

    edit param mgr
    

    PORT 7809

    DYNAMICPORTLIST 7810-7909

    AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 *

    PURGEOLDEXTRACTS ./dirdat/,usecheckpoints, minkeepdays 3

  • 添加复制表

    add trandata test_ogg.test_ogginfo trandata test_ogg.test_ogg
    
  • 配置extract进程(ORACLE_SID与Orcale中的相同)

    edit param extkafka
    

    extract extkafka

    dynamicresolution

    SETENV (ORACLE_SID = “orcl11g”)

    SETENV (NLS_LANG = “american_america.AL32UTF8”)

    userid ogg,password ogg

    exttrail /da ta/ogg/dirdat/to

    table test_ogg.test_ogg;

    add extract extkafka,tranlog,begin now
    

    若报错

    ERROR: Could not create checkpoint file /opt/ogg/dirchk/EXTKAFKA.cpe (error 2, No such file or directory).

    执行下面的命令再重新添加即可。

    create subdirs
    
    add exttrail /data/ogg/dirdat/to,extract extkafka
    
  • 配置pump进程

    edit param pukafka
    

    extract pukafka

    passthru

    dynamicresolution

    userid ogg,password ogg

    rmthost Carlota2 mgrport 7809

    rmttrail /data/ogg/dirdat/to

    table test_ogg.test_ogg;

    add extract pukafka,exttrailsource /data/ogg/dirdat/to
    
    add rmttrail /data/ogg/dirdat/to,extract pukafka
    
  • 配置define文件(Oracle与MySQL,Hadoop集群(HDFS,Hive,kafka等)等之间数据传输可以定义为异构数据类型的传输,故需要定义表之间的关系映射,)

    edit param test_ogg
    

    defsfile /data/ogg/dirdef/test_ogg.test_ogg

    userid ogg,password ogg

    table test_ogg.test_ogg;

  • 返回终端执行

    ./defgen paramfile dirprm/test_ogg.prm
    
  • 将生成的/data/ogg/dirdef/test_ogg.test_ogg发送的目标端ogg目录下的dirdef里:

    scp -r /data/ogg/dirdef/test_ogg.test_ogg root@Carlota2:/opt/ogg/dirdef/
    

6、OGG目标端配置

  • 开启kafka服务

    zkServer.sh startkafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
    
  • ggsic
    
  • 配置管理器mgr

    edit param mgr
    

    PORT 7809

    DYNAMICPORTLIST 7810-7909

    AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 *

    PURGEOLDEXTRACTS ./dirdat/,usecheckpoints, minkeepdays 3

  • 配置checkpoint

    edit  param  ./GLOBALS
    

    CHECKPOINTTABLE test_ogg.checkpoint

  • 配置replicate进程

    edit param rekafka
    

    REPLICAT rekafka

    sourcedefs /data/ogg/dirdef/test_ogg.test_ogg

    TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props

    REPORTCOUNT EVERY 1 MINUTES, RATE

    GROUPTRANSOPS 10000

    MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg;

  • 配置kafka.props(去掉注释)

    cd /opt/ogg/dirprm/
    vim kafka.props
    

    gg.handlerlist=kafkahandler //handler类型
    gg.handler.kafkahandler.type=kafka
    gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //kafka相关配置
    gg.handler.kafkahandler.topicMappingTemplate=test_ogg //kafka的topic名称,无需手动创建
    gg.handler.kafkahandler.format=json //传输文件的格式,支持json,xml等
    gg.handler.kafkahandler.mode=op //OGG for Big Data中传输模式,即op为一次SQL传输一次,tx为一次事务传输一次
    gg.classpath=dirprm/:/usr/local/apps/kafka_2.12-2.5.0/libs/:/opt/ogg/:/opt/ogg/lib/

    vim custom_kafka_producer.properties
    

    bootstrap.servers=192.168.44.129:9092 //kafkabroker的地址
    acks=1
    compression.type=gzip //压缩类型
    reconnect.backoff.ms=1000 //重连延时
    value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    batch.size=102400
    linger.ms=10000

  • 添加trail文件到replicate进程

    add replicat rekafka exttrail /data/ogg/dirdat/to,checkpointtable test_ogg.checkpoint
    

7、测试

在源端和目标端的OGG命令行下使用start [进程名]的形式启动所有进程。
启动顺序按照源mgr——目标mgr——源extract——源pump——目标replicate来完成。
全部需要在ogg目录下执行ggsci目录进入ogg命令行。
源端依次是

start mgr
start extkafka
start pukafka

目标端

start mgr
start rekafka

GGSCI (Carlota2) 1> info all

Program Status Group Lag at Chkpt Time Since Chkpt

MANAGER RUNNING

REPLICAT RUNNING REKAFKA 00:00:00 00:00:08

GGSCI (Carlota3) 1> info all

Program Status Group Lag at Chkpt Time Since Chkpt

MANAGER RUNNING

EXTRACT RUNNING EXTKAFKA 00:00:00 00:00:00

EXTRACT RUNNING PUKAFKA 00:00:00 00:00:10

现在源端执行sql语句

conn test_ogg/test_ogg
insert into test_ogg values(1,'test');
commit;
update test_ogg set name='zhangsan' where id=1;
commit;
delete test_ogg where id=1;
commit;

查看源端trail文件状态

ls -l /data/ogg/dirdat/to*

查看目标端trail文件状态

ls -l /data/ogg/dirdat/to*

查看kafka是否自动建立对应的主题

kafka-topics.sh --list --zookeeper localhost:2181

在列表中显示有test_ogg则表示没问题
通过消费者看是否有同步消息

kafka-console-consumer.sh --bootstrap-server Carlota2:9092 --topic test_ogg --from-beginning

{“table”:“TEST_OGG.TEST_OGG”,“op_type”:“I”,“op_ts”:“2020-07-31 13:42:33.072327”,“current_ts”:“2020-07-31T13:42:38.928000”,“pos”:“00000000000000001066”,“after”:{“ID”:1,“NAME”:“test”}}

{“table”:“TEST_OGG.TEST_OGG”,“op_type”:“U”,“op_ts”:“2020-07-31 13:42:46.005763”,“current_ts”:“2020-07-31T13:42:52.201000”,“pos”:“00000000000000001204”,“before”:{},“after”:{“ID”:1,“NAME”:“zhangsan”}}

{“table”:“TEST_OGG.TEST_OGG”,“op_type”:“D”,“op_ts”:“2020-07-31 13:42:57.079268”,“current_ts”:“2020-07-31T13:43:02.231000”,“pos”:“00000000000000001347”,“before”:{“ID”:1}}

使用ogg实现oracle到kafka的增量数据实时同步相关推荐

  1. ogg oracle 测试kafka_利用ogg实现oracle到kafka的增量数据实时同步

    前言 ogg即Oracle GoldenGate是Oracle的同步工具,本文讲如何配置ogg以实现Oracle数据库增量数据实时同步到kafka中,其中同步消息格式为json. 下面是我的源端和目标 ...

  2. oracle oci.dll无法加载_基于OGG 实现Oracle到Kafka增量数据实时同步

    背景 在大数据时代,存在大量基于数据的业务.数据需要在不同的系统之间流动.整合.通常,核心业务系统的数据存在OLTP数据库系统中,其它业务系统需要获取OLTP系统中的数据.传统的数仓通过批量数据同步的 ...

  3. Docker安装部署MySQL+Canal+Kafka+Camus+HIVE数据实时同步

    因为公司业务需求要将mysql的数据实时同步到hive中,在网上找到一套可用的方案,即MySQL+Canal+Kafka+Camus+HIVE的数据流通方式,因为是首次搭建,所以暂时使用伪分布式的搭建 ...

  4. 基于OGG的Oracle与Hadoop集群准实时同步介绍

    版权声明:本文由王亮原创文章,转载请注明出处:  文章原文链接:https://www.qcloud.com/community/article/220 来源:腾云阁 https://www.qclo ...

  5. 使用RestCloud ETL实现增量数据实时同步

    用ETL工具做数据库增量同步方式总共有如下几种: 通过时间戳实时增量同步,每次读取数据时用上一次读的时间戳进行最新数据的查询,有缺点也有优点,这种方式增量我们后面再介绍用ETL怎么做 使用触发器实现增 ...

  6. 利用OGG实现Oracle到Kafka到Greenplum的增量数据同步

    墨墨导读:本文来自墨天轮用户 肖杰 的投稿,介绍用OGG实现Oracle到Kafka到Greenplum的增量数据同步的全过程. 墨天轮主页:https://www.modb.pro/u/6722 背 ...

  7. oracle和mysql数据实时同步_异构数据源的CDC实时同步系统——最终选型实战

    引言: <异构数据源的CDC实时同步系统> 系列第一篇 (已完成) <零编码打造异构数据实时同步系统--异构数据源CDC之2> 系列第二篇(已完成) <零编码打造异构数据 ...

  8. 使用Streamsets将Oracle数据实时同步到MySQL中

    相关环境: Oracle 11g:11.2.0.1.0  MySQL:8.0.22 前期准备: 1.打开Oracle的logminer a.在SQL Shell中,以具有DBA的用户身份登录数据库: ...

  9. oracle 11gogg,【OGG】Oracle GoldenGate 11g (二) GoldenGate 11g 单向同步配置 上

    Oracle GoldenGate 11g (二) GoldenGate 11g 单向同步配置 上 Item Source System Target System Platform RHEL6.4 ...

最新文章

  1. 使用Nginx为Leanote配置Https
  2. MATLAB 获取某个文件夹下所有文件夹或者文件的名字
  3. 【实操】Y7000P 2020款安装黑苹果完整步骤记录
  4. 爬虫 - HDU题目信息
  5. BufferedOutputStream_字节缓冲输出流
  6. 怎样让电脑速度变快_硬盘在电脑中起什么作用?
  7. 自己改造 VSPaste 插件
  8. Vue组件间的传值五大场景,你造吗?
  9. 一致性哈希算法 应用场景
  10. 使用trace_event跟踪进程的一生
  11. 【8.0、9.0c】树形列表 列标题 不对齐的问题及解决方案
  12. 强悍的 vim —— 处理大小写转换
  13. OpenCV笔记(十八)——使用霍夫变换检测圆圈
  14. 屏蔽五项网络功能 让XP系统极速狂飙
  15. 庖丁解牛之-Android平台RTSP|RTMP播放器设计
  16. js 获取ip和浏览器信息
  17. Python: 鲁卡斯队列
  18. GANs奇思妙想TOP10榜单
  19. [ansible系列③]Ansible Inventory配置及详解
  20. 前端三件套之css笔记

热门文章

  1. html 页面重复度高,html – CSS背景渐变重复问题
  2. svm回归matlab工具箱很慢,PSO优化SVM参数进行回归预测,结果很不理想
  3. qt动态添加窗口到垂直布局
  4. 成员变量(全局变量)和局部变量区别
  5. 包机制、阿里巴巴开发手册
  6. CTF-MISC杂项题2
  7. python动态_python --动态类型
  8. java基数排序 数组_万字长文带你掌握Java数组与排序,代码实现原理都帮你搞明白!...
  9. php开发视频播放顺序,请问关于php代码运行顺序问题
  10. 计算机网络把许多什么连接在一起,计算机网络技术基础知识汇总习题