1. 背景

我司目前数据库之间的数据同步都是oracle goldengate(ogg)方案,该方案的特点:
优点:

  • 基于数据库的变更日志同步(oracle redo\mysql binlog),速度很快,对数据库性能影响很小,适合大量数据同步的场景

缺点:

  • 同步表变更字段、新增表,需要修改数据库服务器上的很多配置文件,比较繁琐,在exact、pump、replicate进程很多的情况下,易误操作;
  • 如果某个表同步失败,重建配置比较复杂;
  • 需要在每个数据库服务器上安装软件;
  • 无界面,不直观,配置分散……

为了解决ogg的上述缺点,研究了新的同步方案:kettle
kettle是通过sql,基于主键、时间戳增量同步数据,不需在数据库服务器上做任何配置,只需在kettle服务器上创建配置JOB即可,有简单直观的CS平台。
今后将以kettle为主,同步数据量很大的表(如单表日同步100万记录以上),会考虑ogg。

2. 安装

版本:pdi8.3
OS:因为linux的界面失真,用起来实在蓝瘦香菇,而且每次启动spoon都很费时,因此用windows平台。
安装过程很简单:

  • 安装jdk1.8;
  • 解压pdi.zip;
  • 下载驱动oracle(ojdbc14.jar)、mysql(mysql-connector-java-5.1.9.jar、mysql-connector-java-6.0.6.jar)至目录pdi\data-integration\lib:

3. 配置

kettle的几个关键组件说明

  • SQL:顾名思义,就是执行一段SQL,可以包含多条
  • 转换:包含表输入、设置变量、排序、合并、switch、表输出、更新、插入/更新等功能,是kettle的精华所在,实现关键逻辑
  • 作业:一个同步的完整逻辑,包含SQL、转换等,用“作业定时调度”和“成功”分别开始和结束作业
  • DB连接:数据库连接,可以在转换或作业下创建,默认在该转换或作业下生效;支持“共享”,将该连接在全局范围生效

对表的要求

  1. 表有主键;
  2. 应用不能物理删除数据,只能逻辑删除数据,设置字段(delete_flag tinyint/number(1):0:未删除,1,已删除),可通过定时任务在源和目标端物理删除delete_flag=1的数据;
  3. 统一时间戳字段(update_time oracle:date/mysql datetime),所有数据变更(包含delete_flag)必须同时修改update_time;
  4. Oracle number类型同步到mysql bigint时,支持最大长度18位,不能用默认number(默认38位);
  5. 手动维护数据时需要源和目标端同时处理(truncate,delete,update without update_time);

原理说明

  1. 创建表sync_timestamp(table_name,time_stamp),记录每张表完成同步的时间戳,第一次同步前,手工设置为源表中update_time最小值;
  2. 每次同步时,先设置环境变量TIME_STAMP = sync_timestamp中的time_stamp;
  3. 将源表大于等于TIME_STAMP的数据对目标表插入/更新;
  4. 用目标表update_time字段的最大值,更新sync_timestamp中的time_stamp字段;

作业:job_sync_t1(oracle->mysql)

转换:trans_get_timestamp

表输入:

select DATE_FORMAT(time_stamp,'%Y-%m-%d %H:%i:%S') time_stamp
from sync_timestamp
where table_name='sync_t1'

设置变量:

转换:trans_sync_data

表输入:

select *
from sync_t1
where update_time >= to_date('${TIME_STAMP}','yyyy-mm-dd hh24:mi:ss')

插入/更新:

4. 调度

kettle job通过kitchen命令执行;每次执行JOB时,kitchen要初始化几百M的内存,耗时10秒以上,一个kitchen进程启动后只能执行1次JOB;如果大量kitchen同时启动,会消耗大量内存,对OS内存配置要求很高。

为了合理化利用资源,将JOB调度按调度频率划分到不同文件,每个文件根据具体同步数据的压力情况安排3-5个job:

1分钟执行1次的JOB:

file1: job_1min_1.bat,包含job1-job3

file2: job_1min_2.bat,包含job4-job6

file3: job_1min_3.bat,包含job7-job9

……

job_1min_1.bat的内容:

e:
cd e:\pdi\data-integration
kitchen /file:e:\mykettle\job1\job.kjb /level:Error
kitchen /file:e:\mykettle\job2\job.kjb /level:Error
kitchen /file:e:\mykettle\job3\job.kjb /level:Error

通过kettle服务器的任务调度定期执行 job_1min_1.bat文件即可。

备注:kettle自身没有防止JOB并发执行的机制(如某JOB执行频率每分钟1次,但一次执行耗时超过1分钟,就会存在并发执行的情况);并发执行时,插入重复数据报错,可以监控到JOB错误,从而优化JOB或调度。

5. 监控

JOB配置日志表,记录JOB执行情况;

创建日志表

CREATE TABLE `sync_log` (`ID_JOB` bigint(20) NOT NULL,`JOBNAME` varchar(100) COLLATE utf8mb4_bin DEFAULT NULL,`STATUS` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL,`ERRORS` bigint(20) DEFAULT NULL,`LOGDATE` datetime DEFAULT NULL,`LOG_FIELD` blob,`LINES_READ` bigint(20) DEFAULT NULL,`LINES_WRITTEN` bigint(20) DEFAULT NULL,`LINES_UPDATED` bigint(20) DEFAULT NULL,`LINES_INPUT` bigint(20) DEFAULT NULL,`LINES_OUTPUT` bigint(20) DEFAULT NULL,`LINES_REJECTED` bigint(20) DEFAULT NULL,`STARTDATE` datetime DEFAULT NULL,`DEPDATE` datetime DEFAULT NULL,`REPLAYDATE` datetime DEFAULT NULL,`ENDDATE` datetime DEFAULT NULL,PRIMARY KEY (`ID_JOB`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin

job指定日志表:

sync_log

sync_timestamp

通过上述2个表的关联,确认JOB是否正常执行,配置到zabbix监控项。

判断标准:sync_timestamp表记录的table_name,在sync_log中都有最新的成功执行的日志(status=end,error=0,logdate<now-interval_seconds)

select count(*)-(select count(*) from dbcopy.sync_timestamp) errorjob
from dbcopy.sync_log a, dbcopy.sync_timestamp b
where (a.jobname,a.logdate) in  (select jobname,max(logdate) from dbcopy.sync_log where errors=0 and status='end' group by jobname)
and a.jobname=b.table_name
and a.logdate > DATE_SUB(now(),INTERVAL (b.interval_seconds + 60) second)

以上SQL返回值>0即说明JOB执行有异常。

over ~

kettle实现数据增量同步方案相关推荐

  1. hive数据增量同步方案

    目录 1-每天全量同步 2-每天增量同步 3-不变的数据增量同步 1-每天全量同步 如人员表.订单表一类的会发生变化的数据,根据数据仓库的4个特点里的反映历史变化的这个特点的要求,我们建议每天对数据进 ...

  2. SpringBoot + xxl-job 多数据源异构数据增量同步

    SpringBoot + xxl-job 多数据源异构数据增量同步 文章目录 SpringBoot + xxl-job 多数据源异构数据增量同步 一.概述 二.实现步骤 2.1 项目搭建 2.2 接口 ...

  3. 实战!Spring Boot 整合 阿里开源中间件 Canal 实现数据增量同步!

    数据同步一直是一个令人头疼的问题.在业务量小,场景不多,数据量不大的情况下我们可能会选择在项目中直接写一些定时任务手动处理数据,例如从多个表将数据查出来,再汇总处理,再插入到相应的地方. 但是随着业务 ...

  4. Spring Boot 整合 阿里开源中间件 Canal 实现数据增量同步!

    数据同步一直是一个令人头疼的问题.在业务量小,场景不多,数据量不大的情况下我们可能会选择在项目中直接写一些定时任务手动处理数据,例如从多个表将数据查出来,再汇总处理,再插入到相应的地方. 但是随着业务 ...

  5. Canal 实现数据增量同步

    数据同步一直是一个令人头疼的问题.在业务量小,场景不多,数据量不大的情况下我们可能会选择在项目中直接写一些定时任务手动处理数据,例如从多个表将数据查出来,再汇总处理,再插入到相应的地方. 但是随着业务 ...

  6. mysql增量同步_在两个MySQL数据库之间实现数据增量同步

    在线QQ客服:1922638 专业的SQL Server.MySQL数据库同步软件 在两个数据库中实现数据增量同步,令数据库之间的数据能够同步更新. Oracle数据库IP:192.168.0.1(源 ...

  7. 基于数据库数据增量同步_基于 Flink SQL CDC 的实时数据同步方案

    简介:Flink 1.11 引入了 Flink SQL CDC,CDC 能给我们数据和业务间能带来什么变化?本文由 Apache Flink PMC,阿里巴巴技术专家伍翀 (云邪)分享,内容将从传统的 ...

  8. ODPS 数据全量/增量同步方案

    随着业务量增加,原采用的mysql 对大量业务数据的处理效率降低,公司采购的ODPS(MaxCompute) 阿里数据处理平台,进行数据的处理. 一.源数据库 -> ODPS 全量同步,直接同步 ...

  9. canal mysql从库_canal中间件|数据增量同步解决方案

    上一文中提到延时双删等策略实现数据一致性的时候,可能存在删除缓存失败的情况,就会出现缓存和数据库不一致的问题.为了应对删除缓存失败而导致数据不一致的问题,可以通过回溯数据库日志文件,提供一个保障的重试 ...

最新文章

  1. 大数据【四】MapReduce(单词计数;二次排序;计数器;join;分布式缓存)
  2. 互联网协议 — TCP — 性能问题解析
  3. 数据结构(严蔚敏)之五——循环队列(c语言实现)
  4. javascript动态创建可拖动、最大化、最小化的层
  5. SAP UI的加载动画效果和幽灵设计(Ghost Design)
  6. Linux/unix 查看端口占用
  7. Angular使用Console.log()打印出来的数据没问题,点击详情后数据变了
  8. matlab fig生成exe,MATLAB GUI多个m文件和fig如何生成exe文件
  9. 混凝土地坪机器人_地面整平机器人:精准又高效,轻松摆“平”混凝土
  10. MAT分析android内存泄漏
  11. python 字符串%和format_Python必懂知识点,格式化字符串,到底用.format还是%
  12. socket结构和几个IP地址转换函数
  13. 【MySQL】MySQL 中的函数
  14. 多表关联更新,UPDATE FROM用法
  15. CentOS-6.5-x86_64 最小化安装,已安装包的总数,这些包?
  16. ACL 2021 | 火山翻译成绩斐然
  17. linux代码运行流程,Linux中程序执行的流程分析工具——strace
  18. Linux svn服务器自身回退版本
  19. gc cr block lost
  20. ai人工智能让女神_人工智能可能只会让你兴奋不已

热门文章

  1. 如何让自己时刻冷静的方法_高三怎么让自己心静下来
  2. 网客 网络真正的守护神!!!!!
  3. JS实现漂亮的窗口拖拽效果(可改变大小、最大化、最小化、关闭)
  4. Python-opencv学习第二课:图像色彩
  5. GitHub集成Circle CI(附 Circle CI 配置示例文件)
  6. Android仿微信图片浏览
  7. 233网校题库数据提取分析
  8. FUJINON镜头伺服器EPD-21A-A02拆机记录
  9. 七夕活动主题html邮件,网易邮箱发起七夕活动 这些情书让人动容
  10. 玩客云 Linux系统emmc直刷恢复