引言:

《异构数据源的CDC实时同步系统》 系列第一篇 (已完成)

《零编码打造异构数据实时同步系统——异构数据源CDC之2》 系列第二篇(已完成)

《零编码打造异构数据实时同步系统——异构数据源CDC之3》 系列第三篇(已完成)

《异构数据源的CDC实时同步系统——最终选型实战》 系列第四篇(已完成)

7.debezium

debezium是由redhat支持的开源分布式CDC系统,支持多端数据源,如mysql、mongodb、postgresql、oracle、sql server和Cassandra,社区非常活跃,很多的新功能和新数据源都在快速发展中,源码地址:https://github.com/debezium/debezium

我们使用debezium主要是看中它支持多数据源,同时与kafka的整合,在CDC领域不能忽略的一个商用产品是kafka conflent,在它的产品中,连接源端的组件就是debezium,我们一度就想使用这个商用

组件,但是试用版本仅支持一个broker,无法在真正的的生产环境使用,它的优势在于配置的可视化,后来我们使用kafka eagle来进行kafka的管理后,才彻底下定决心自己使用开源版本搞一套。我们最终采用的整体方案是debezium+kafka+kafka-connect-jdbc,管理端使用的kafka eagle.

关于confluent的资料,网上很多,我们在实际配置的过程中也参考了很多它的建议。

注意事项:

1)debezium需要设置的mysql权限:GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
2)采用阿里云的rds的mysql数据源非常坑,默认是不开启SHOW DATABASES权限的,需要在debezium中单独配置属性database.history.store.only.monitored.tables.ddl:true
3)debezium配合kafaka启动使用properties方式,也就是说第一个源需要配置为文本模式,后续可采用动态增加源的方式动态增加,但是文件模式需要为json

8.kafka-connect-jdbc

开源地址:https://github.com/confluentinc/kafka-connect-jdbc,它是confluent开源的兼容jdbc的数据库同步数据的kafka connect连接器。

这个组件支持的目的端的源非常多,理论上有java客户端的基本都支持,所以基本上可以涵盖你能用到的绝大多数数据源,它的延迟非常好,比之前的bireme好太多了,毕竟是国外大厂支持的组件,是国内小公司开源组件所不能比拟的。

9.最终选型方案

上图为我们最终确定的方案,在实际生产中,除了直接DB层级的数据实时同步外,我们还有一套pulsar的比较灵活的数据接口方案,不在此次讨论范围之内,也就是说我们最终实现了基于DB和业务层级的实时数据同步方案。

业界其他公司的CDC方案:

=======.实际生产配置过程:==========

1.kafka安装配置,以standalone为例

需要单独说明的是:因为gpdb6目前还不支持upsert模式,debezium的新增和更新均会导致一条新增加的完整数据到kafka,默认kafka按批提交的模式会造成gpdb6的主键冲突,需要修改模式为逐条应用,同时配合自己单独写的check程序进行offset错误的自动修正

#1)安装kafka,注意2.30有个bugtar -zxvf kafka_2.12-2.4.0.tgzcd kafka_2.12-2.4.0Vim config/server.properties    #单机版只需要消息存放路径即可log.dirs=/opt/kafka_2.12-2.4.0/kafka-logs#增加可以删除topicdelete.topic.enable=true#保留日志大小:1GB,不设置的话会日志撑爆log.retention.bytes=1073741824mkdir -p /opt/kafka_2.12-2.4.0/kafka-logs#修改kafka的connect-standalone.properties设置为逐条应用consumer.max.poll.records=1#2)修改内置zk的配置vim config/zookeeper.properties#制定zk元数据存放路径dataDir=/opt/kafka_2.12-2.4.0/zdatamkdir -p /opt/kafka_2.12-2.4.0/zdata#3)启动服务,先启动zk再启动kafkacd /opt/kafka_2.12-2.4.0/nohup bin/zookeeper-server-start.sh config/zookeeper.properties  &nohup bin/kafka-server-start.sh config/server.properties —加守护进程启动bin/zookeeper-server-start.sh -daemon config/zookeeper.propertiesbin/kafka-server-start.sh -daemon config/server.properties

2.kafka基本命令

#4)查看服务是够启动    jps#5)创建一个测试用的topic:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test#查询topic列表:bin/kafka-topics.sh --list --zookeeper localhost:2181#查看topic信息:bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test                          #删除topic(只会删除元数据):配置上面的delete.topic.enable=true后可生效bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test#手动删除文件:bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic test./kafka-topics.sh --zookeeper 192.168.6.42:2181 --describe --topic itslawnode1./kafka-consumer-groups.sh  --describe --group test-consumer-group --zookeeper localhost:2181 #查看offset信息bin/kafka-consumer-groups.sh --bootstrap-server 192.168.6.42:9092 --listbin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group 0#查看和删除群组:bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --listbin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group connect-sink-judge-up#从开始的消费信息: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test#6)创建控制台生产者生产数据bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test#7)新开一个进程创建消费者数据bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

3.debezium安装配置

#下载debezium-connector-mysql,将文件中的jar包copy到kafka的libs目录cd /opt/kafka_2.12-2.4.0/tableconfig  #tableconfig是新建目录,存放配置文件#######第一个启动的properties文件格式############name=authorization-mysql-connector-new-01connector.class=io.debezium.connector.mysql.MySqlConnectordatabase.hostname=mysql源IPdatabase.port=3306database.user=账号database.password=密码database.server.id=1database.server.name=debeziumdatabase.whitelist=platform_authorizationdatabase.serverTimezone=UTCtable.whitelist=platform_authorization.lawyer_authorization,platform_authorization.lawyer_authorization_recorddatabase.history.kafka.bootstrap.servers=localhost:9092database.history.kafka.topic=auth.platform_authorizationinclude.schema.changes=false#使用table名作为topic名字,因为machine.db.table默认topictransforms=routetransforms.route.type=org.apache.kafka.connect.transforms.RegexRoutertransforms.route.regex=([^.]+).([^.]+).([^.]+)transforms.route.replacement=$3#不进行初始化,只获取当前的schema,初始化采用rds_dbsync比较方便,实际测试比init方式快几十倍,因为此处是逐条应用的snapshot.mode=schema_only##########json格式的文件#########{"name":"hanukkah-mysql-connector","config": {"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"mysql主机名","database.port":"3306","database.user":"用户名","database.password":"密码","database.server.id":"1","database.server.name":"debezium","database.whitelist":"hanukkah","database.serverTimezone":"UTC","table.whitelist":"hanukkah.cooperation_lawyer","database.history.kafka.bootstrap.servers":"localhost:9092","database.history.kafka.topic":"mysql1.hanukkah","include.schema.changes":"false","transforms":"route","transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter","transforms.route.regex":"([^.]+).([^.]+).([^.]+)","transforms.route.replacement":"$3","snapshot.mode":"schema_only"}}                                            

4.sink配置

#首先下载kafka-connect-jdbc-5.3.1.jar并防止到kafka的libs目录即可{    "name": "sink-cooperation_lawyer-ins",    "config": {        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",        "tasks.max": "1",        "topics": "cooperation_lawyer",        "connection.url": "jdbc:postgresql://目的IP:5432/目的DB?user=用户&password=密码&stringtype=unspecified¤tSchema=当前schema名",        "transforms": "unwrap",        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",        "transforms.unwrap.drop.tombstones": "false",        "auto.create": "true",        "insert.mode": "insert",        "delete.enabled": "true","table.name.format": "platform.cooperation_lawyer",        "pk.fields": "id",        "pk.mode": "record_key"    }}

需要额外说明的是:在目的是greenplum数仓环境下:
1)如果mysql源端字段类型是timestamp,则需要在gpdb端配置字段类型为timestamptz后无需额外配置sink项
2)如果mysql源端字段类型是datetime,则目的端字段类型需要配置为timestamp,同时需要sink文件中增补TimestampConverter配置项,有几个datetime字段配置几个配置项
3)如果mysql源端datetime配置了精度,需要debezium配置增加time.precision.mode=connect
4) "auto.evolve": "true" 则源端表结构变更后会自动在目的端创建对应数据结构 "auto.create": "true" 则源端新增表后会自动同步到目的端

{    "name": "sink-pa_course-ins",    "config": {        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",        "tasks.max": "1",        "topics": "pa_course",        "connection.url": "jdbc:postgresql://目的IP:5432/目的DB?user=用户&password=密码&stringtype=unspecified¤tSchema=当前schema名",        "transforms": "unwrap,TimestampConverter",        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",        "transforms.unwrap.drop.tombstones": "false",        "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",        "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",        "transforms.TimestampConverter.field": "create_time",        "transforms.TimestampConverter.target.type": "string",        "auto.create": "true",        "auto.evolve": "true",        "insert.mode": "insert",        "delete.enabled": "true",        "pk.fields": "id",        "pk.mode": "record_key"    }}

5.启动服务

#启动kafka,进程多了Kafka和QuorumPeerMainbin/zookeeper-server-start.sh -daemon config/zookeeper.propertiesbin/kafka-server-start.sh -daemon config/server.properties#启动第一个sourcebin/connect-standalone.sh config/connect-standalone.properties tableconfig/paod-source.properties 1>connector-logs/connector.log 2>&1 增补其他sourcecurl -X POST -H  "Content-Type:application/json" -d @tableconfig/authorization-source.json http://localhost:8083/connectors/#启动sinkcurl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @tableconfig/paod-base-ins.properties ...#查看所有的connectors:     curl -X GET  http://127.0.0.1:8083/connectors 

6.使用eagle进行topic状态查看和管理

关于kafka eagle的下载安装,特别简单,就不单独说明了,这里仅贴出效果图

从上图非常清楚的能看到哪些topic是有问题的(红色),绝大多数问题在于offset的错误导致的,在实际使用中我们通过一个简单python守护进程的代码进行了管理

import requestsimport loggingimport psycopg2import jsonimport reimport time# 获取数仓连接def get_gp_conn():    conn = psycopg2.connect(host="192.168.2.175", port=5432, user="name", password="password",dbname='datawarehouse')    return conn'''删除目的端的主键ID'''def del_dup_id(tablefullname,dup_id):    db = get_gp_conn()    cursor = db.cursor()    sql = "delete from "+ tablefullname +" where id='" + dup_id+"'"    cursor.execute(sql)    db.commit()    cursor.close()    db.close()'''重启sink'''def restart_sink(sinkname,configname):    '''delurl = 'http://127.0.0.1:8083/connectors/'+ sinkname    del_res = requests.delete(delurl)    print("del resp:",del_res)    url = 'http://127.0.0.1:8083/connectors/'    headers = 'Content-Type:application/json,Accept:application/json'    datas = 'tableconfig/' + configname    start_res = requests.post(url,data=datas,headers=headers)    print("start resp:",start_res)    #checkurl = 'http://127.0.0.1:8083/connectors/'+ sinkname +'/tasks/0/status'    '''    url = 'http://127.0.0.1:8083/connectors/'+ sinkname +'/tasks/0/restart'    requests.post(url)'''检测任务状态'''def check_sink_status(sinkname,tablefullname,configname):    sinkurl = 'http://127.0.0.1:8083/connectors/'+ sinkname +'/tasks/0/status'    print(sinkurl)    resp = requests.get(sinkurl)    status = json.loads(resp.text)    state = status['state']    if state == 'FAILED':        trace = status['trace']        pattern = re.compile(r'Key (id)=((.+)) already exists')        search = re.search(pattern, trace)        #print(search)        if search:            del_id = search.group(1)            print('duplicate key is {}, now to del this record of target database'.format(del_id))            del_dup_id(tablefullname,del_id)            restart_sink(sinkname,configname)'''获取任务列表'''def get_sink_list():    conn = get_gp_conn()    cur = conn.cursor()    cur.execute("select taskname,tableschema,tablename,configname from platform.tasklist where tablename is not null")    print("current time is:",time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time())))    rows = cur.fetchall()    for row in rows:        taskname = row[0]        schema = row[1]        tablename =row[2]        configname = row[3]        tablefullname = schema +'.'+tablename        check_sink_status(taskname,tablefullname,configname)    cur.close()    conn.close()if __name__ == '__main__':    get_sink_list()

同时为了避免standalone进程的异常终止,我们用shell的守护进行进行了监控

#! /bin/bashfunction check(){      count=`ps -ef |grep $1 |grep -v "grep" |wc -l`      #echo $count      if [ 0 == $count ];then        time=$(date "+%Y-%m-%d %H:%M:%S")        echo "standalone restart at:${time}"        cd /opt/kafka_2.12-2.4.0/        bin/connect-standalone.sh config/connect-standalone.properties tableconfig/paod-source.properties 1>>connector-logs/connector.log 2>&1 &        sleep 60scurl -X POST -H  "Content-Type:application/json" -d @tableconfig/platform-source.json http://localhost:8083/connectors/    ......        sleep 10s        curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @tableconfig/paod-base-ins.properties     .....      fi    }    check ConnectStandalone

另外,在实际运行过程中会出现offset错误的情况,极其特殊情况下使用上面的方法无法快速解决问题,建议使用kafkacat查看详细信息,人为跳过offset,具体细节不再赘述。

如喜欢此专题,请关注并提问,技术人驱动自身的是永不停歇的渴望。

oracle和mysql数据实时同步_异构数据源的CDC实时同步系统——最终选型实战相关推荐

  1. 基于数据库数据增量同步_基于 Flink SQL CDC 的实时数据同步方案

    简介:Flink 1.11 引入了 Flink SQL CDC,CDC 能给我们数据和业务间能带来什么变化?本文由 Apache Flink PMC,阿里巴巴技术专家伍翀 (云邪)分享,内容将从传统的 ...

  2. mysql数据对比同步_跨数据库mysql语句同步数据和对比运算

    首先,A数据库[需要同步的数据库]A_product数据表[产品基本信息]product_id产品唯一IDprice产品价格A_product_option_value数据表[产品选项]product ...

  3. 易语言mysql数据同步程序_易语言mssql和mysql数据自动同步源码

    易语言mssql和mysql数据自动同步源码 易语言mssql和mysql数据自动同步源码 系统结构:RefreshTask,ComputeEndTime,ComputeOneTime,Compute ...

  4. oracle tns 代理配置_OGG实现Oracle到MySQL数据平滑迁移

    技术分享 | 使用OGG实现Oracle到MySQL数据平滑迁移 - 爱可生开源社区​opensource.actionsky.com 一.OGG概述 OGG全称为Oracle GoldenGate, ...

  5. 使用OGG/Kettle实现Oracle到MySQL数据平滑迁移

    本文目录: 一.OGG概述 (一)OGG逻辑架构 二.迁移方案 (一)环境信息 (二)表结构迁移 (三)数据迁移 1.源端OGG配置 (1)Oracle数据库配置 (2)Oracle数据库OGG用户创 ...

  6. Linux MySQL主主复制(Replication)(MySQL数据双向同步)配置

    http://www.centos.bz/2011/07/linux-mysql-replication-two-way-sync/#配置当前从服务器 Linux MySQL主主复制(Replicat ...

  7. mysql数据迁移 脚本_PHP将数据从Oracle向Mysql数据迁移实例

    为什么要迁移? 首先从运营成本考虑,用Mysql可以节约不少的费用.另一方面,Mysql的稳定性及功能不断地提高与增强,基本上可以满足客户的需求,如支持多 节点部署,数据分区等.还有就是Mysql使用 ...

  8. oracle 向mysql数据迁移

      为什么要迁移? 首先从运营成本考虑,用Mysql可以节约不少的费用.另一方面,Mysql的稳定性及功能不断地提高与增强,基本上可以满足客户的需求,如支持多 节点部署,数据分区等.还有就是Mysql ...

  9. oracle和mysql数据实时同步_FileYee文件实时同步备份软件,再不怕数据丢失

    日常工作中你是如何保存您的数据文件?简单的保存在电脑或者硬盘吗,其实数据如果单纯的放在一个终端存储是一件危险系数非常高的事情,尤其是企业的重要数据.今天小编给大家带来一款国产良心文件实时同步备份软件- ...

最新文章

  1. 30+博士、100+硕士整理的超全深度强化学习资源清单
  2. sound.js # pixi辅助插件 — 中文翻译教程
  3. python列表怎么写文件_python中以字典为元素的列表怎么写入文本文件
  4. 了解mysql的三种不同安装方式的区别
  5. 12-继承与多态(下)
  6. JVM 类加载机制:编译器常量与初始化
  7. Jsp 页面添加动态水印
  8. python使用长ping命令_在Python中调用Ping命令,批量IP的方法
  9. js 日期操作 (转载:http://blog.sina.com.cn/s/blog_699d2e170101q6iz.html)
  10. win10 python ffmpeg推流到b站
  11. FPGA——sdram控制器1
  12. linux 安装mysql(rpm文件安装)
  13. 图解react设计理论基础
  14. python求组合数c(m、n)编程题_c语言编程问题,计算出从n 个不同元素中取出m 个元素(m≤n)的组合数。编写程序...
  15. 全世android手机,全世界最安全手机:黑莓第二款Android手机DTEK 50
  16. MySql定期备份数据到历史表的解决方案
  17. 算法习题---5-3卡牌游戏(UVa10935)
  18. 旅游社交APP开发解决方案
  19. clickhouse连接Tableau
  20. Mysql 索引 与 多表查询性能优化

热门文章

  1. python带参数装饰器 函数名_python 全栈开发,Day11(函数名应用,闭包,装饰器初识,带参数以及带返回值的装饰器)...
  2. 通用计算机系统的工作方式,通用计算机操作系统典型体系结构综述
  3. gem for onenote安装教程
  4. 【渝粤教育】国家开放大学2018年春季 0149-21T现代汉语 参考试题
  5. 【渝粤教育】 国家开放大学2020年春季 1001中国法制史 参考试题
  6. 【渝粤教育】电大中专公共基础课程_1作业 题库
  7. java 注解scheduler_使用spring的@Scheduled注解执行定时任务,启动项目不输出警告
  8. jquery开关灯案例_全屋开关插座布局讲解,自己规划怕遗漏,手把手教你,很详细...
  9. mysql 大于号 优化_SQL优化 MySQL版 - 避免索引失效原则(二)
  10. 丙烯怎么做成流体丙烯_韧性好强度高的聚丙烯复合材料怎么做?让人工智能来帮忙...