墨墨导读:本文来自墨天轮用户 肖杰 的投稿,介绍用OGG实现Oracle到Kafka到Greenplum的增量数据同步的全过程。

墨天轮主页:https://www.modb.pro/u/6722

背景

在大数据库时代,数据经常需要在不同的数据库之间流动、整合,并要求具有一定的实时性,传统的通过脚本定时,批量同步的方式根本无法满足需求。

本文基于Oracle OGG,Kafka消息队列实现Oracle到Greenplum之间的准实时同步(实测延时在ms级别)。

一、环境准备

版本 OGG版本 IP
源端 oracle 12.2.0.1 123012_fbo_ggs_Linux_x64_shiphome.zip 192.168.11.151
目标端 kafka 2.12 OGG_BigData_Linux_x64_12.3.2.1.1.zip 192.168.11.165/4/3

二、ORACLE,GREENPLUM,OGG安装

此过程所用方法比较通用,此处不再赘述。

三、源库配置

源库需要开启归档,配置ogg用户,表空间及相关参数等,此处不再详细赘述

四、源端OGG配置

1. 进程配置
mgr.prm

PORT 7809

extn.prm

extract extn
userid C##GGADMIN@ora12c,password GGadmin_123
DISCARDFILE ./dirrpt/extn.dsc, APPEND, MEGABYTES 1024
EXTTRAIL  ./dirdat/na
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
DYNAMICRESOLUTION
--源库是一个CDB,本次测试抽取pdb_test下面的两张表
table pdb_test.test.ccps_traderecord;
table pdb_test.test.ccps_unnormal_traderecord;

dpen.prm

extract dpen
RMTHOST 192.168.11.165, MGRPORT 7809
RMTTRAIL ./dirdat/na
DYNAMICRESOLUTION
TABLE pdb_test.test.*;

2. 创建进程

add extract extn, TRANLOG, BEGIN NOW
add exttrail ./dirdat/na, EXTRACT extn, MEGABYTES 200ADD EXTRACT dpen, EXTTRAILSOURCE ./dirdat/na
ADD RMTTRAIL ./dirdat/na, EXTRACT dpen, MEGABYTES 200

五、目标端OGG配置

1. 配置JDBC Hander参数

示例目录下面有配置模板,复制过来即可
cp /home/oracle/ogg/AdapterExamples/big-data/kafka/* /home/oracle/ogg/dirprm

custom_kafka_producer.properties:

bootstrap.servers=localhost:9092
acks=1
reconnect.backoff.ms=1000value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=16384
linger.ms=0
max.request.size = 5024000
send.buffer.bytes = 5024000

kafka.props

gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate=oggtopic
#The following selects the message key using the concatenated primary keys
#gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=txgoldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUEgg.log=log4j
gg.log.level=INFOgg.report.time=30sec#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/home/oracle/kafka/libs/*
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*javawriter.bootoptions=-Xmx512m -Xms512m -Djava.class.path=ggjava/ggjava.jar

2. 进程配置

mgr.prm

port 7809

repn.prm

replicat repn
getenv(JAVA_HOME)
getenv(CLASSPATH)
getenv(PATH)
getenv(LD_LIBRARY_PATH)
SETENV (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
TARGETDB LIBFILE libggjava.so SET property=./dirprm/kafka.props
SOURCEDEFS /home/oracle/ogg/dirdef/source.def
REPORTCOUNT EVERY 1 MINUTES, RATE
--GROUPTRANSOPS 10000
MAP pdb_test.test.ccps_traderecord, TARGET ccps.ccps_traderecord;
MAP pdb_test.test.ccps_unnormal_traderecord, TARGET ccps.ccps_unnormal_traderecord;

3. 创建进程

add replicat repn, exttrail ./dirdat/na

六、目标端kafka安装配置

1. 安装

解压即可(此版本kafka本身已经集成zookeeper,不需要单独安装。)

2. 配置

因为是测试环境,此处列出的都是一些关键性参数,至于其他性能相关参数根据自己需要调整即可。

consumer.properties

bootstrap.servers=localhost:9092

server.properties

listeners=PLAINTEXT://localhost:9092
zookeeper.connect=localhost:2181

zookeeper.properties

clientPort=2181

3. 启动zookeeper

[oracle@gpmaster kafka]$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
[oracle@gpmaster kafka]$ jps
18001 Jps
2505 QuorumPeerMain

4. 启动kafka

[oracle@gpmaster kafka]$ bin/kafka-server-start.sh config/server.properties
[oracle@gpmaster kafka]$ jps
18016 Kafka
2505 QuorumPeerMain
18458 Jps

5. kafka常用命令

显示所有topic(其中oggtopic就是用于此次测试的topic,kafka.props中配置的名字):

[oracle@gpmaster bin]$ kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
mySchemaTopic
oggtest
oggtopic
test

生产消息:

[oracle@gpmaster bin]$ kafka-console-producer.sh --broker-list localhost:9092 --topic test
>test kafka!!!!
>

消费消息:

[oracle@gpmaster bin]$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
test kafka!!!!

七、编写代码将数据从kafka加载到GP

from kafka import KafkaConsumer
import json
import psycopg2
import time
import redbconn = psycopg2.connect(dbname = "devin",user = "ccps",password = "xjlove1224",host = "192.168.11.165",port = "5432")consumer = KafkaConsumer('oggtopic',group_id = "oggtopic",auto_offset_reset='earliest',bootstrap_servers=['localhost:9092'])
for msg in consumer:message = msg.valuemesg = message.decode('utf-8')msglist = re.findall(r'(.*?}})',mesg)  --切分

代码细节略,需要注意NULL,各种特殊字符以及异常的处理。

八、测试

测试环境,目标端没有做初始化,运行结果如下:

抽取到kafka的数据,json格式:

 {"table":"CCPS.CCPS_UNNORMAL_TRADERECORD","op_type":"I","op_ts":"2020-08-18 18:15:48.048933","current_ts":"2020-08-18T18:15:54.146000","pos":"00000000000026312368","after":{"UTR_ID":923628,"UTR_NO":"200818181648559171322","UTR_MER_NO":191713,"UTR_GW_NO":19171301,"UTR_MER_ORDERNO":"1597745803378","UTR_CURRENCY":"USD","UTR_AMOUNT":"0.01","UTR_BANK_CODE":null,"UTR_CHA_CODE":null,"UTR_RETURNURL":null,"UTR_WEBSITE":"http://192.168.11.98:7105/Test/jsp/testPayTwo.jsp","UTR_SUBMITURL":"http://192.168.11.98:7105/PaymentGateway/gateway/directservice/pay","UTR_PAYSTARTTIME":"2020-08-18 18:16:48.233971000","UTR_PAYTIME":null,"UTR_PAYENDTIME":null,"UTR_ERRORCODE":"50000","UTR_ERRORREASON_OUT":"Unpaid","UTR_ERRORREASON_IN":"Unpaid","UTR_MER_REMARK":"2fang","UTR_RISKINFO":null,"UTR_SETSCORE":null,"UTR_TOTALSCORE":null,"UTR_PASSRISKINFO":null,"UTR_PM_ID":1,"UTR_CARDNO_ENCRYPT":null,"UTR_NOTICEURL":null,"UTR_BILL_ADDRESS_WHITE":null,"UTR_BILL_ADDRESS_EXTRA1_WHITE":null,"UTR_LOGO_URL":null,"UTR_LANGUAGE":null,"UTR_PAY_EXPIRATIONTIME":null,"UTR_MCC":null,"UTR_PM_NAME":"Credit Card","UTR_TRADE_BANK_NAME":null,"UTR_EXPIRATION_TIME":null}}{"table":"CCPS.CCPS_UNNORMAL_TRADERECORD","op_type":"U","op_ts":"2020-08-18 18:15:48.048933","current_ts":"2020-08-18T18:15:54.167000","pos":"00000000000026316287","before":{},"after":{"UTR_NO":"200818181648559171322","UTR_CARDNO_ENCRYPT":"GuL6A5JN4rwAaHb8fPSAMdhxRA9CL1RVIjWwP81YujHEvXuz0YnmBssWqslPmsAgsuPeKBdQarDb4B23oy0C6wMJYner9LKmRVbuNfuGummiWnRmHAM9Dn2dJKcujEPProxYeBWUMbEhk9V54Q018cML1bcf3fvJjKL/vt4xBbk="}}{"table":"CCPS.CCPS_UNNORMAL_TRADERECORD","op_type":"D","op_ts":"2020-08-18 18:15:48.048933","current_ts":"2020-08-18T18:15:54.167001","pos":"00000000000026316959","before":{"UTR_ID":923628,"UTR_NO":"200818181648559171322","UTR_MER_NO":191713,"UTR_GW_NO":19171301,"UTR_MER_ORDERNO":"1597745803378","UTR_CURRENCY":"USD","UTR_AMOUNT":"0.01","UTR_BANK_CODE":null,"UTR_CHA_CODE":null,"UTR_RETURNURL":null,"UTR_WEBSITE":"http://192.168.11.98:7105/Test/jsp/testPayTwo.jsp","UTR_SUBMITURL":"http://192.168.11.98:7105/PaymentGateway/gateway/directservice/pay","UTR_PAYSTARTTIME":"2020-08-18 18:16:48.233971000","UTR_PAYTIME":null,"UTR_PAYENDTIME":null,"UTR_ERRORCODE":"50000","UTR_ERRORREASON_OUT":"Unpaid","UTR_ERRORREASON_IN":"Unpaid","UTR_MER_REMARK":"2fang","UTR_RISKINFO":null,"UTR_SETSCORE":null,"UTR_TOTALSCORE":null,"UTR_PASSRISKINFO":null,"UTR_PM_ID":1,"UTR_CARDNO_ENCRYPT":"GuL6A5JN4rwAaHb8fPSAMdhxRA9CL1RVIjWwP81YujHEvXuz0YnmBssWqslPmsAgsuPeKBdQarDb4B23oy0C6wMJYner9LKmRVbuNfuGummiWnRmHAM9Dn2dJKcujEPProxYeBWUMbEhk9V54Q018cML1bcf3fvJjKL/vt4xBbk=","UTR_NOTICEURL":null,"UTR_BILL_ADDRESS_WHITE":null,"UTR_BILL_ADDRESS_EXTRA1_WHITE":null,"UTR_LOGO_URL":null,"UTR_LANGUAGE":null,"UTR_PAY_EXPIRATIONTIME":null,"UTR_MCC":null,"UTR_PM_NAME":"Credit Card","UTR_TRADE_BANK_NAME":null,"UTR_EXPIRATION_TIME":null}}{"table":"CCPS.CCPS_TRADERECORD","op_type":"I","op_ts":"2020-08-18 18:15:48.048933","current_ts":"2020-08-18T18:15:54.168000","pos":"00000000000026318029","after":{"TR_ID":787160,"TR_NO":"200818181648559171322","TR_MER_ORDERNO":"1597745803378","TR_MER_NO":191713,"TR_GW_NO":19171301,"TR_CURRENCY":"USD","TR_AMOUNT":0.01,"TR_STATUS":-1,"TR_TRADE_RATE":0.0100,"TR_SPP_CURRENCY":"USD","TR_SPP":0,"TR_AGENT_NO":0,"TR_AGENT_RATE":null,"TR_AGENT_SPP_CURRENCY":null,"TR_AGENT_SPP":null,"TR_RESEVER_RATE":0.1000,"TR_RATE_VALUE":1,"TR_BANKCURRENCY":"USD","TR_BANKAMOUT":0.01,"TR_BANK_CODE":"OPBank","TR_CHA_CODE":121,"TR_ISDELAY":0,"TR_CHA_RATE":0.0100,"TR_BANK_SPP_CURRENCY":"USD","TR_BANK_SPP":1.00,"TR_CHA_SETT_BANK":null,"TR_BANKORDERNO":null,"TR_BANKRETURNCODE":null,"TR_BANKINFO":null,"TR_PAYSTARTTIME":"2020-08-18 18:16:48.139000000","TR_PAYENDTIME":null,"TR_DATETIME":"2020-08-18 18:16:48.346618000","TR_BANKDATETIME":null,"TR_RETURNURL":null,"TR_WEBSITE":"http://192.168.11.98:7105/Test/jsp/testPayTwo.jsp","TR_SUBMITURL":"http://192.168.11.98:7105/PaymentGateway/gateway/directservice/pay","TR_CHECKED":0,"TR_CHECKDATETIME":null,"TR_REFUNDMENT":0,"TR_PROTEST":0,"TR_CONGEAL":0,"TR_DELIVERY":null,"TR_REFUNDMENT_AMOUNT":0,"TR_PROTEST_AMOUNT":0,"TR_EXP_AMOUNT":0,"TR_CONGEAL_AMOUNT":0,"TR_UNSETT_AMOUNT":null,"TR_TS_ID":0,"TR_TS_STATUS":0,"TR_RS_ID":0,"TR_RS_STATUS":0,"TR_AS_ID":0,"TR_AS_STATUS":0,"TR_QUERYNO":null,"TR_AUTHORIZELD":null,"TR_BATCHNO":null,"TR_TERMINALNO":null,"TR_ISDCC":0,"TR_TS_BATCH":0,"TR_ISTSCHECK":0,"TR_RS_BATCH":0,"TR_ISRSCHECK":0,"TR_ISDAYCHECK":0,"TR_ISEXPLAIN":-2,"TR_ISFRAUD":0,"TR_ISCOMPLAIN":0,"TR_INF_TYPE":2,"TR_ISLOCK":0,"TR_FEE_FAIL_MER":0,"TR_FEE_SUCCESS_MER":0,"TR_FEE_SUCCESS_AFTER_MER":0,"TR_IS_BACK_MER":0,"TR_IS_BACK_AFTER_MER":0,"TR_FEE_FAIL_AGENT":0,"TR_FEE_SUCCESS_AGENT":0,"TR_FEE_SUCCESS_AFTER_AGENT":0,"TR_IS_BACK_AGENT":0,"TR_IS_BACK_AFTER_AGENT":0,"TR_FEE_FAIL_CHA":1,"TR_FEE_SUCCESS_CHA":1,"TR_FEE_SUCCESS_AFTER_CHA":1,"TR_IS_BACK_CHA":1,"TR_IS_BACK_AFTER_CHA":1,"TR_REMARK":"2fang","TR_BANKTRADETIME":null,"TR_CARDTYPE":1,"TR_REFERENCE":"200818181648559171322","TR_IS_REPAY":0,"TR_RISKINFO":null,"TR_SETSCORE":0,"TR_TOTALSCORE":98.00,"TR_PASSRISKINFO":null,"TR_PM_ID":1,"TR_AUTH_TYPE":0,"TR_SEND_CR_FLAG":null,"TR_SEND_MER_FLAG":null,"TR_REFCHAPRO_FLAG":null,"TR_CLEAR_AMOUNT":null,"TR_CLEAR_TIME":null,"TR_CLEAR_STATUS":null,"TR_BANK_CHARGES":null,"TR_CLEAR_CURRENCY":null,"TR_AUTH_OP_TIME":null,"TR_DEBITTIME":"2020-08-18 18:16:48.346618000","TR_SETT_RATE":1,"TR_SETT_AMOUNT":0.01,"TR_SETT_CURRENCY":"USD","TR_TRANRETURNCODE":null,"TR_TRANINFO":null,"TR_WHETHER_3D":"0","TR_ECI_VALUE":null,"TR_ISAUTH":0,"TR_SETTLEMENT":0,"TR_NOTICEURL":null,"TR_NOTICESTATUS":null,"TR_CHARESERVERATE":0.0100,"TR_REFUSEAMOUNT":0,"TR_REFOUNDAMOUNT":0,"TR_BAILAMOUNT":null,"TR_IS_MER_REFOUND":0,"TR_IS_MER_REFUSE":0,"TR_SETT_SPP":0,"TR_BILL_ADDRESS":"OPBANK 账单地址","TR_PAY_SOLUTIONS":null,"TR_PAY_INSTALLCOUNT":null,"TR_PAY_INSTALLRATE":null,"TR_SPP_RATE":1,"TR_RATE_INSTALLRATE":0,"TR_MER_LOGOURL":null,"TR_CLEAR_BATCHNO":null,"TR_CLEAR_REMARK":null,"TR_IS_MER_REFOUND_PART":0,"TR_BILL_ADDRESS_WHITE":null,"TR_BILL_ADDRESS_EXTRA1_WHITE":null,"TR_PRIOR_CHANNEL":0,"TR_SETTLEMENT_BANK":null,"TR_SUB_BATCHNO_BANK":null,"TR_SUB_BANK_NO":null,"TR_BANK_REMARK":null,"TR_CONTRACTTYPE":302,"TR_SETTLEMENTBODY":8,"TR_CHANNELBODY":2,"TR_SETTLEMENTAMONT":null,"TR_TSINGLEFEE":null,"TR_TREFUNDFEE":null,"TR_TRATE":null,"TR_TRATERETURN":null,"TR_TAXATION":null,"TR_MIN_TRADE_FEE":0,"TR_BANK_RATE_HKD":7.75081,"TR_SETT_RATE_HKD":7.75081,"TR_RETURN_AVS":null,"TR_RETURN_CVV":null,"TR_TYPE":1,"TR_SETT_STATUS":null,"TR_REFUND_LOCK":null,"TR_MCC":5722,"TR_UPDATE_ORDER_STATUS":0,"TR_MANUAL_LIQUIDATION_REMARK":null,"TR_OP_ENTITY":1,"TR_TRADE_BANK_NAME":null,"TR_CLEAR_RATE":null,"TR_INTERCHANGE_FEE":null,"TR_SCHEME_FEE":null,"TR_ACQUIRER_FEE":null,"TR_CLEAR_TRADE_RATE":null,"TR_CROSS_BORDER_FEE":null,"TR_AVS_OUT_RESULT":null,"TR_CAVV":null,"TR_BANK_PID":null,"TR_EXPIRATION_TIME":null,"TR_TCHARGEBACK_FEE":null,"TR_TCONGEAL_FEE":null,"TR_FXAJD_FEE":null,"TR_TOTHER_FEE":null}}{"table":"CCPS.CCPS_TRADERECORD","op_type":"U","op_ts":"2020-08-18 18:15:48.048933","current_ts":"2020-08-18T18:15:54.168001","pos":"00000000000026320941","before":{},"after":{"TR_NO":"200818181648559171322","TR_STATUS":1,"TR_BANKORDERNO":null,"TR_BANKRETURNCODE":"80000","TR_BANKINFO":"Transaction Approved1","TR_BANKDATETIME":"2020-08-18 18:16:48.360706000","TR_CHECKED":0,"TR_CHECKDATETIME":"2020-08-18 18:16:48.000000000","TR_QUERYNO":null,"TR_AUTHORIZELD":null,"TR_BATCHNO":null,"TR_TERMINALNO":null,"TR_AUTH_TYPE":0,"TR_DEBITTIME":"2020-08-18 18:16:48.360706000","TR_TRANRETURNCODE":"10000","TR_TRANINFO":"Transaction Approved","TR_WHETHER_3D":"0","TR_ECI_VALUE":null,"TR_NOTICESTATUS":2,"TR_PAY_SOLUTIONS":"Transaction Approved3","TR_BANK_REMARK":";;","TR_RETURN_AVS":null,"TR_RETURN_CVV":null,"TR_UPDATE_ORDER_STATUS":0}}
{"table":"CCPS.CCPS_TRADERECORD","op_type":"U","op_ts":"2020-08-18 18:16:01.042146","current_ts":"2020-08-18T18:16:06.549000","pos":"00000000000026321695","before":{},"after":{"TR_NO":"160627142027150120836","TR_RS_ID":1000779,"TR_RS_STATUS":2,"TR_UPDATE_ORDER_STATUS":0}}

python解析后的SQL:

insert into CCPS_UNNORMAL_TRADERECORD(UTR_ID, UTR_NO, UTR_MER_NO, UTR_GW_NO, UTR_MER_ORDERNO, UTR_CURRENCY, UTR_AMOUNT, UTR_BANK_CODE, UTR_CHA_CODE, UTR_RETURNURL, UTR_WEBSITE, UTR_SUBMITURL, UTR_PAYSTARTTIME, UTR_PAYTIME, UTR_PAYENDTIME, UTR_ERRORCODE, UTR_ERRORREASON_OUT, UTR_ERRORREASON_IN, UTR_MER_REMARK, UTR_RISKINFO, UTR_SETSCORE, UTR_TOTALSCORE, UTR_PASSRISKINFO, UTR_PM_ID, UTR_CARDNO_ENCRYPT, UTR_NOTICEURL, UTR_BILL_ADDRESS_WHITE, UTR_BILL_ADDRESS_EXTRA1_WHITE, UTR_LOGO_URL, UTR_LANGUAGE, UTR_PAY_EXPIRATIONTIME, UTR_MCC, UTR_PM_NAME, UTR_TRADE_BANK_NAME, UTR_EXPIRATION_TIME) values (923628, '200818181648559171322', 191713, 19171301, '1597745803378', 'USD', '0.01', NULL, NULL, NULL, 'http://192.168.11.98:7105/Test/jsp/testPayTwo.jsp', 'http://192.168.11.98:7105/PaymentGateway/gateway/directservice/pay', '2020-08-18 18:16:48.233971000', NULL, NULL, '50000', 'Unpaid', 'Unpaid', '2fang', NULL, NULL, NULL, NULL, 1, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'Credit Card', NULL, NULL);
update CCPS_UNNORMAL_TRADERECORD set UTR_CARDNO_ENCRYPT = 'GuL6A5JN4rwAaHb8fPSAMdhxRA9CL1RVIjWwP81YujHEvXuz0YnmBssWqslPmsAgsuPeKBdQarDb4B23oy0C6wMJYner9LKmRVbuNfuGummiWnRmHAM9Dn2dJKcujEPProxYeBWUMbEhk9V54Q018cML1bcf3fvJjKL/vt4xBbk=' where UTR_NO = '200818181648559171322'
delete from CCPS_UNNORMAL_TRADERECORD where UTR_ID = 923628
insert into CCPS_TRADERECORD(TR_ID, TR_NO, TR_MER_ORDERNO, TR_MER_NO, TR_GW_NO, TR_CURRENCY, TR_AMOUNT, TR_STATUS, TR_TRADE_RATE, TR_SPP_CURRENCY, TR_SPP, TR_AGENT_NO, TR_AGENT_RATE, TR_AGENT_SPP_CURRENCY, TR_AGENT_SPP, TR_RESEVER_RATE, TR_RATE_VALUE, TR_BANKCURRENCY, TR_BANKAMOUT, TR_BANK_CODE, TR_CHA_CODE, TR_ISDELAY, TR_CHA_RATE, TR_BANK_SPP_CURRENCY, TR_BANK_SPP, TR_CHA_SETT_BANK, TR_BANKORDERNO, TR_BANKRETURNCODE, TR_BANKINFO, TR_PAYSTARTTIME, TR_PAYENDTIME, TR_DATETIME, TR_BANKDATETIME, TR_RETURNURL, TR_WEBSITE, TR_SUBMITURL, TR_CHECKED, TR_CHECKDATETIME, TR_REFUNDMENT, TR_PROTEST, TR_CONGEAL, TR_DELIVERY, TR_REFUNDMENT_AMOUNT, TR_PROTEST_AMOUNT, TR_EXP_AMOUNT, TR_CONGEAL_AMOUNT, TR_UNSETT_AMOUNT, TR_TS_ID, TR_TS_STATUS, TR_RS_ID, TR_RS_STATUS, TR_AS_ID, TR_AS_STATUS, TR_QUERYNO, TR_AUTHORIZELD, TR_BATCHNO, TR_TERMINALNO, TR_ISDCC, TR_TS_BATCH, TR_ISTSCHECK, TR_RS_BATCH, TR_ISRSCHECK, TR_ISDAYCHECK, TR_ISEXPLAIN, TR_ISFRAUD, TR_ISCOMPLAIN, TR_INF_TYPE, TR_ISLOCK, TR_FEE_FAIL_MER, TR_FEE_SUCCESS_MER, TR_FEE_SUCCESS_AFTER_MER, TR_IS_BACK_MER, TR_IS_BACK_AFTER_MER, TR_FEE_FAIL_AGENT, TR_FEE_SUCCESS_AGENT, TR_FEE_SUCCESS_AFTER_AGENT, TR_IS_BACK_AGENT, TR_IS_BACK_AFTER_AGENT, TR_FEE_FAIL_CHA, TR_FEE_SUCCESS_CHA, TR_FEE_SUCCESS_AFTER_CHA, TR_IS_BACK_CHA, TR_IS_BACK_AFTER_CHA, TR_REMARK, TR_BANKTRADETIME, TR_CARDTYPE, TR_REFERENCE, TR_IS_REPAY, TR_RISKINFO, TR_SETSCORE, TR_TOTALSCORE, TR_PASSRISKINFO, TR_PM_ID, TR_AUTH_TYPE, TR_SEND_CR_FLAG, TR_SEND_MER_FLAG, TR_REFCHAPRO_FLAG, TR_CLEAR_AMOUNT, TR_CLEAR_TIME, TR_CLEAR_STATUS, TR_BANK_CHARGES, TR_CLEAR_CURRENCY, TR_AUTH_OP_TIME, TR_DEBITTIME, TR_SETT_RATE, TR_SETT_AMOUNT, TR_SETT_CURRENCY, TR_TRANRETURNCODE, TR_TRANINFO, TR_WHETHER_3D, TR_ECI_VALUE, TR_ISAUTH, TR_SETTLEMENT, TR_NOTICEURL, TR_NOTICESTATUS, TR_CHARESERVERATE, TR_REFUSEAMOUNT, TR_REFOUNDAMOUNT, TR_BAILAMOUNT, TR_IS_MER_REFOUND, TR_IS_MER_REFUSE, TR_SETT_SPP, TR_BILL_ADDRESS, TR_PAY_SOLUTIONS, TR_PAY_INSTALLCOUNT, TR_PAY_INSTALLRATE, TR_SPP_RATE, TR_RATE_INSTALLRATE, TR_MER_LOGOURL, TR_CLEAR_BATCHNO, TR_CLEAR_REMARK, TR_IS_MER_REFOUND_PART, TR_BILL_ADDRESS_WHITE, TR_BILL_ADDRESS_EXTRA1_WHITE, TR_PRIOR_CHANNEL, TR_SETTLEMENT_BANK, TR_SUB_BATCHNO_BANK, TR_SUB_BANK_NO, TR_BANK_REMARK, TR_CONTRACTTYPE, TR_SETTLEMENTBODY, TR_CHANNELBODY, TR_SETTLEMENTAMONT, TR_TSINGLEFEE, TR_TREFUNDFEE, TR_TRATE, TR_TRATERETURN, TR_TAXATION, TR_MIN_TRADE_FEE, TR_BANK_RATE_HKD, TR_SETT_RATE_HKD, TR_RETURN_AVS, TR_RETURN_CVV, TR_TYPE, TR_SETT_STATUS, TR_REFUND_LOCK, TR_MCC, TR_UPDATE_ORDER_STATUS, TR_MANUAL_LIQUIDATION_REMARK, TR_OP_ENTITY, TR_TRADE_BANK_NAME, TR_CLEAR_RATE, TR_INTERCHANGE_FEE, TR_SCHEME_FEE, TR_ACQUIRER_FEE, TR_CLEAR_TRADE_RATE, TR_CROSS_BORDER_FEE, TR_AVS_OUT_RESULT, TR_CAVV, TR_BANK_PID, TR_EXPIRATION_TIME, TR_TCHARGEBACK_FEE, TR_TCONGEAL_FEE, TR_FXAJD_FEE, TR_TOTHER_FEE) values (787160, '200818181648559171322', '1597745803378', 191713, 19171301, 'USD', 0.01, -1, 0.01, 'USD', 0, 0, NULL, NULL, NULL, 0.1, 1, 'USD', 0.01, 'OPBank', 121, 0, 0.01, 'USD', 1.0, NULL, NULL, NULL, NULL, '2020-08-18 18:16:48.139000000', NULL, '2020-08-18 18:16:48.346618000', NULL, NULL, 'http://192.168.11.98:7105/Test/jsp/testPayTwo.jsp', 'http://192.168.11.98:7105/PaymentGateway/gateway/directservice/pay', 0, NULL, 0, 0, 0, NULL, 0, 0, 0, 0, NULL, 0, 0, 0, 0, 0, 0, NULL, NULL, NULL, NULL, 0, 0, 0, 0, 0, 0, -2, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, '2fang', NULL, 1, '200818181648559171322', 0, NULL, 0, 98.0, NULL, 1, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, '2020-08-18 18:16:48.346618000', 1, 0.01, 'USD', NULL, NULL, '0', NULL, 0, 0, NULL, NULL, 0.01, 0, 0, NULL, 0, 0, 0, 'OPBANK 账单地址', NULL, NULL, NULL, 1, 0, NULL, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, NULL, NULL, 302, 8, 2, NULL, NULL, NULL, NULL, NULL, NULL, 0, 7.75081, 7.75081, NULL, NULL, 1, NULL, NULL, 5722, 0, NULL, 1, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
update CCPS_TRADERECORD set TR_STATUS = 1,TR_BANKORDERNO = NULL,TR_BANKRETURNCODE = '80000',TR_BANKINFO = 'Transaction Approved1',TR_BANKDATETIME = '2020-08-18 18:16:48.360706000',TR_CHECKED = 0,TR_CHECKDATETIME = '2020-08-18 18:16:48.000000000',TR_QUERYNO = NULL,TR_AUTHORIZELD = NULL,TR_BATCHNO = NULL,TR_TERMINALNO = NULL,TR_AUTH_TYPE = 0,TR_DEBITTIME = '2020-08-18 18:16:48.360706000',TR_TRANRETURNCODE = '10000',TR_TRANINFO = 'Transaction Approved',TR_WHETHER_3D = '0',TR_ECI_VALUE = NULL,TR_NOTICESTATUS = 2,TR_PAY_SOLUTIONS = 'Transaction Approved3',TR_BANK_REMARK = ';;',TR_RETURN_AVS = NULL,TR_RETURN_CVV = NULL,TR_UPDATE_ORDER_STATUS = 0 where TR_NO = '200818181648559171322'
update CCPS_TRADERECORD set TR_RS_ID = 1000779,TR_RS_STATUS = 2,TR_UPDATE_ORDER_STATUS = 0 where TR_NO = '160627142027150120836'

在GP数据库验证结果(测试数据及DML较多,随机抽了一条验证):

devin=> select TR_STATUS,TR_BANKRETURNCODE,TR_BANKDATETIME,TR_CHECKDATETIME,TR_TRANINFO,TR_BANK_REMARK from CCPS_TRADERECORD where TR_NO = '200818181648559171322';tr_status | tr_bankreturncode |      tr_bankdatetime       |  tr_checkdatetime   |     tr_traninfo      | tr_bank_remark
-----------+-------------------+----------------------------+---------------------+----------------------+----------------1 | 80000             | 2020-08-18 18:16:48.360706 | 2020-08-18 18:16:48 | Transaction Approved | ;;
(1 row)

至此,数据同步成功。

作者

肖杰:云和恩墨Oracle技术顾问,长期服务于银行,金融,能源等行业的数据中心,8年数据库运维经验,擅长Oracle,PostgreSQL等数据库高可用设计和运维故障处理,异常恢复,升级迁移,性能优化。

墨天轮原文链接:https://www.modb.pro/db/29348(复制到浏览器中打开或者点击“阅读原文”立即查看)。

推荐阅读:267页!2020年度数据库技术年刊

推荐下载:2020数据技术嘉年华PPT下载

2020数据技术嘉年华近50个PPT下载、视频回放已上传墨天轮平台,可在“数据和云”公众号回复关键词“2020DTC”获得!

视频号,新的分享时代,关注我们,看看有什么新发现?

数据和云

ID:OraNews

如有收获,请划至底部,点击“在看”,谢谢!

点击下图查看更多 ↓

云和恩墨大讲堂 | 一个分享交流的地方

长按,识别二维码,加入万人交流社群

请备注:云和恩墨大讲堂

  点个“在看”

你的喜欢会被看到❤

利用OGG实现Oracle到Kafka到Greenplum的增量数据同步相关推荐

  1. ogg oracle 测试kafka_利用ogg实现oracle到kafka的增量数据实时同步

    前言 ogg即Oracle GoldenGate是Oracle的同步工具,本文讲如何配置ogg以实现Oracle数据库增量数据实时同步到kafka中,其中同步消息格式为json. 下面是我的源端和目标 ...

  2. oracle oci.dll无法加载_基于OGG 实现Oracle到Kafka增量数据实时同步

    背景 在大数据时代,存在大量基于数据的业务.数据需要在不同的系统之间流动.整合.通常,核心业务系统的数据存在OLTP数据库系统中,其它业务系统需要获取OLTP系统中的数据.传统的数仓通过批量数据同步的 ...

  3. 使用ogg实现oracle到kafka的增量数据实时同步

    Oracle Golden Gate软件是一种基于日志的结构化数据复制备份软件,它通过解析源数据库在线日志或归档日志获得数据的增量变化,再将这些变化应用到目标数据库,从而实现源数据库与目标数据库同步. ...

  4. canal+Kafka实现mysql与redis数据同步

    前言 上篇文章简单介绍canal概念,本文结合常见的缓存业务去讲解canal使用.在实际开发过程中,通常都会把数据往redis缓存中保存一份,做下简单的查询优化.如果这时候数据库数据发生变更操作,就不 ...

  5. 基于 Kafka 与 Debezium 构建实时数据同步

    起源 在进行架构转型与分库分表之前,我们一直采用非常典型的单体应用架构:主服务是一个 Java WebApp,使用 Nginx 并选择 Session Sticky 分发策略做负载均衡和会话保持:背后 ...

  6. oracle同一个示例两个用户的数据同步_分享两个shell脚本--一键统计Oracle数据库用户信息...

    概述 今天主要分享一下两个shell脚本,主要是为了查看所有数据库用户及其表空间,统计某个指定用户的明细,下面一起来看看吧~ 数据库连接脚本 use script settdb.sh for DB l ...

  7. 利用ogg微服务版将oracle同步到kafka

    ogg微服务版可以再界面上配置抽取.复制进程,不必进入到shell中进行配置,并且图形化界面可以看到更多信息. 系统架构 源端安装ogg for oracle 19C , 目标端安装ogg for b ...

  8. 基于OGG Datahub插件将Oracle数据同步上云

    摘要:随着数据规模的不断扩大,传统的RDBMS难以满足OLAP的需求,本文将介绍如何将Oracle的数据实时同步到阿里云的大数据处理平台当中,并利用大数据工具对数据进行分析. 一.背景介绍 随着数据规 ...

  9. oracle rac 主备 轮训,oracle集群(RAC)和主备数据同步(DataGuard)思路

    在线QQ客服:1922638 专业的SQL Server.MySQL数据库同步软件 Oracle主备库数据同步使用DataGuard.Data Guard 是Oracle的远程复制技术,它有物理和逻辑 ...

最新文章

  1. 数据挖掘公司D square nv 完成500万欧元B轮融资
  2. 写给新手:2021版调参上分手册!
  3. mysql平均分 面试_经典mysql面试题 - 学生成绩
  4. Ural(Timus) 1146. Maximum Sum
  5. java.net.URISyntaxException的解决办法
  6. 以DES的方式实现对称加密,并提供密钥
  7. Linux策略路由--原理、配置和应用
  8. js控制css 加载,CSS样式表的加载对于DOM解析,渲染,JS执行的阻塞问题
  9. 逸管家:别只共享单车,互联网时代还可以共享人才
  10. [Leetcode] Pascal's Triangle II
  11. JAVA中获取工程路径的方法
  12. 过计算机管理共享文件夹,局域网中怎么查看自己共享过的文件
  13. java 栈的输出顺序,栈在Java类库中的顺序表示及实现——Stack
  14. ibm笔记本修复计算机开机按,联想thinkpad重装系统按什么键_联想thinkpad电脑重装系统按哪个键-win7之家...
  15. xadmin里面的设置
  16. Failed to open \EFI\BOOT\mmx64.efi问题解决
  17. 谋杀之谜 良辰吉日 人物关系图
  18. euler和鸿蒙,euleros 鸿蒙
  19. cmgr linux命令,linux下利用GPRS模块发短信、打电话
  20. 比较一下 Spring Security 和 Shiro 各自的优缺点

热门文章

  1. 敏捷 橄榄球运动_为什么我为大学橄榄球博客选择Wordpress
  2. 作死 | 程序猿的花式作死,因为你的幽默,你的前途有了!
  3. 牛X | 一款比传统数据库快100-1000倍的数据库,认识一下
  4. React 组件性能优化之 PureComponent 的使用
  5. es6 属性的可枚举性和遍历
  6. linux手机用什么购物支付,商城系统的微信支付应该如何配置?
  7. CAN笔记(18) 对象字典
  8. TensorFlow笔记(5) 多元线性回归
  9. 北京理工大学计算机学院研究生培养方案,北京理工大学2018版学术型研究生培养方案.PDF...
  10. 转载:jsonp详解