本篇博客说说DataX如何进行全量和增量数据同步,虽然用演示oracle同步到mysql,但其他数据库之间的同步都差不多

1.DataX介绍

DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

Github主页地址:https://github.com/alibaba/DataX
DataX工具下载地址:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

DataX采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

下面是DataX3.0支持的插件,也就是能在这些数据源直接根据json脚本配置互相同步数据

类型 数据源 Reader(读) Writer(写) 文档
RDBMS 关系型数据库 MySQL 读 、写
Oracle 读 、写
OceanBase 读 、写
SQLServer 读 、写
PostgreSQL 读 、写
DRDS 读 、写
达梦 读 、写
通用RDBMS(支持所有关系型数据库) 读 、写
阿里云数仓数据存储 ODPS 读 、写
ADS
OSS 读 、写
OCS 读 、写
NoSQL数据存储 OTS 读 、写
Hbase0.94 读 、写
Hbase1.1 读 、写
MongoDB 读 、写
Hive 读 、写
无结构化数据存储 TxtFile 读 、写
FTP 读 、写
HDFS 读 、写
Elasticsearch

2.DataX实战

2.1.DataX基础环境搭建
  • 1.把下载好的 datax.tar.gz 上传到Linux

  • 2.解压 tar -xzvf datax.tar.gz ,会有/datax 目录,进入cd datax 目录

  • 3.先删除datax目录中的所有隐藏文件,否则执行脚本会失败

 find ./ -name '._*' -print0 |xargs -0 rm -rf
  • 4.执行测试脚本:./bin/datax.py job/job.json ,看到下图的效果,说明环境正常
2.2.DataX 全量同步数据,oracle 到 mysql

从上面的介绍中可以知道,datax是通过不同的插件去同步数据的,每个插件都有 reader 和 writer ,要从oracle同步数据到mysql,执行: ./bin/datax.py -r oraclereader -w mysqlwriter ,获取示例json配置,然后去修改里面的参数

#在job 目录中创建 vi oracle_to_mysql.json,这是改完后能同步的参数配置
{"job": {"content": [{"reader": {"name": "oraclereader", "parameter": {"column": ["INVESTOR_ID","INVESTOR_NAME","ID_TYPE","ID_NO","CREATE_TIME"], "splitPk": "INVESTOR_ID","where" : "INVESTOR_ID is not null","connection": [{"jdbcUrl": ["jdbc:oracle:thin:@172.17.112.177:1521:helowin"], "table": ["CXX.CUSTOMER"]}], "password": "123456", "username": "admin"}}, "writer": {"name": "mysqlwriter", "parameter": {"column": [ "customer_no","customer_name","id_type","id_no","create_time"], "connection": [{"jdbcUrl": "jdbc:mysql://172.17.112.176:3306/customer_db?useUnicode=true&characterEncoding=UTF-8", "table": ["customer_datax"]}], "username": "admin", "password": "123456", "preSql": [], "session": ["set session sql_mode='ANSI'"], "writeMode": "update"}}}], "setting": {"speed": {"channel": "3"}}}
}

执行全量同步:./bin/datax.py job/oracle_to_mysql.json ,可以看到有 1045条记录被同步到mysql了

2.3.DataX 增量同步数据,oracle 到 mysql

增量同步需要Linux的crontab定时任务配合,再通过shell脚本计算时间,并传递到json脚本的Where条件中,"where" : "CREATE_TIME > unix_to_oracle(${create_time}) and CREATE_TIME <= unix_to_oracle(${end_time})"

${create_time} 和 ${end_time}由shell脚本计算
vi /home/datax/servers/datax/job/oracle_to_mysql.json

{"job": {"content": [{"reader": {"name": "oraclereader", "parameter": {"column": ["INVESTOR_ID","INVESTOR_NAME","ID_TYPE","ID_NO","CREATE_TIME"], "splitPk": "INVESTOR_ID","where" : "CREATE_TIME > unix_to_oracle(${create_time}) and CREATE_TIME <= unix_to_oracle(${end_time})","connection": [{"jdbcUrl": ["jdbc:oracle:thin:@172.17.112.177:1521:helowin"], "table": ["CXX.CUSTOMER"]}], "password": "123456", "username": "admin"}}, "writer": {"name": "mysqlwriter", "parameter": {"column": [ "customer_no","customer_name","id_type","id_no","create_time"], "connection": [{"jdbcUrl": "jdbc:mysql://172.17.112.176:3306/customer_db?useUnicode=true&characterEncoding=UTF-8", "table": ["customer_datax"]}], "username": "admin", "password": "123456", "preSql": [], "session": ["set session sql_mode='ANSI'"], "writeMode": "update"}}}], "setting": {"speed": {"channel": "3"}}}
}

vi /home/datax/servers/datax/job/increment_sync.sh

#!/bin/bash
source /etc/profile
# 截至时间设置为当前时间戳
end_time=$(date +%s)
# 开始时间设置为300s前时间戳
create_time=$(($end_time - 300))
# 执行datax脚本,传入时间范围
/home/datax/servers/datax/bin/datax.py /home/datax/servers/datax/job/oracle_to_mysql.json -p "-Dcreate_time=$create_time -Dend_time=$end_time" &

并给increment_sync.sh赋可执行权限:chmod -R 777 increment_sync.sh

然后设置 crontab 定时任务,每5分钟执行一次,和上面脚本中的300s对应

crontab -e*/5 * * * * /home/datax/servers/datax/job/increment_sync.sh >/dev/null 2>&1

tip:oralce是没有 unix_to_oracle 函数的,需要自行在oracle中创建

 create or replace function unix_to_oracle(in_number NUMBER) return date isbeginreturn(TO_DATE('19700101','yyyymmdd') + in_number/86400 +TO_NUMBER(SUBSTR(TZ_OFFSET(sessiontimezone),1,3))/24);end unix_to_oracle;

好了,此时就完成了增量同步

3.DataX同步流程:

  • 1.第一次部署datax时,手动执行全量同步脚本,同步已有客户数据
  • 2.再进行增量同步,用Linux的crontab和脚本配合,能按时间进行增量同步
  • 3.oracle同步mysql时,有几种同步模式,建议"writeMode"设置为"update":
    • 3.1.mysql的"writeMode"设置为"insert",在有重复数据记录时,不会同步,直接跳过,就算oracle中该条数据已经修改了,也不会同步
    • 3.2.mysql的"writeMode"设置为"replace",在有重复数据记录时,会先删除mysql中的记录,再把oracle中的记录新增进去
    • 3.3.mysql的"writeMode"设置为"update",在有重复数据记录时,会把oracle中的列,覆盖mysql中的列,未配置同步的列,不会覆盖
  • 4.需要在oracle库创建uninx_to_date() 函数

4.增量同步方式优化

上面需要在oracle库创建一个uninx_to_date()函数,下面用shell脚本把uninx时间戳转为yyyy-MM-dd hh:mm:ss 类型,然后传到oracle_to_mysql.json配置中,就不用创建这个uninx_to_date()函数了

increment_sync.sh 脚本获取字符串类型时间:

#!/bin/bash
source /etc/profile
#当前时间戳
cur_time=$(date +%s)
#结束时间
end_time="'$(date -d @$cur_time +"%Y-%m-%d %H:%M:%S")'"
#开始时间,为当前时间的前300s
create_time="'$(date -d @$(($cur_time-120)) +"%Y-%m-%d %H:%M:%S")'"# 执行datax脚本,传入时间范围
/home/datax/servers/datax/bin/datax.py /home/datax/servers/datax/job/oracle_to_mysql.json -p "-Dcreate_time=$create_time -Dend_time=$end_time" &

修改oracle_to_mysql.json的where参数,去掉uninx_to_date()函数

{"job": {"content": [{"reader": {"name": "oraclereader", "parameter": {"column": ["INVESTOR_ID","INVESTOR_NAME","ID_TYPE","ID_NO","CREATE_TIME"], "splitPk": "INVESTOR_ID","where" : "CREATE_TIME >to_date('${create_time}','yyyy-mm-dd hh24:mi:ss')  and CREATE_TIME <= to_date('${end_time}','yyyy-mm-dd hh24:mi:ss')","connection": [{"jdbcUrl": ["jdbc:oracle:thin:@172.17.112.177:1521:helowin"], "table": ["CXX.CUSTOMER"]}], "password": "123456", "username": "admin"}}, "writer": {"name": "mysqlwriter", "parameter": {"column": [ "customer_no","customer_name","id_type","id_no","create_time"], "connection": [{"jdbcUrl": "jdbc:mysql://172.17.112.176:3306/customer_db?useUnicode=true&characterEncoding=UTF-8", "table": ["customer_datax"]}], "username": "admin", "password": "123456", "preSql": [], "session": ["set session sql_mode='ANSI'"], "writeMode": "update"}}}], "setting": {"speed": {"channel": "3"}}}
}

Windows操作系统,Python3使用datax需要下载 Python3版的datax.py,点我下载

DataX oracle同步mysql(全量和增量)相关推荐

  1. ETL异构数据源Datax_Oracle同步MySQL(全量)_04

    文章目录 一.Oracle同步Mysql 1. 构建json 2. 执行数据同步 3. 查看同步数据 4. 同步数据正确性和准确性 5. 同步日志分析 一.Oracle同步Mysql 1. 构建jso ...

  2. mysql全量和增量备份脚本

    全量: [root@master leo]# cat DBfullBak.sh #!/bin/bash #use mysqldump to fully backup mysql dataBakDir= ...

  3. 理论+实验 详解MySQL全量,增量备份与恢复

    目录 一 数据库备份的分类 1.1造成数据丢失的原因 1.2 数据库备份的分类 1.3 常见的备份方法 1.3.1 物理备份 1.3.2 专用备份工具mydump或者mysqlhotcopy 1.3. ...

  4. python 读取文件夹 增量文件_Python实现目录文件的全量和增量备份

    目标: 1.传入3个参数:源文件路径,目标文件路径,md5文件 2.每周一实现全量备份,其余时间增量备份 1.通过传入的路径,获取该路径下面的所有目录和文件(递归) 方法一:使用os.listdir ...

  5. 关于全量与增量 的思考

    一.数据同步:全量与增量 1.背景 数据如果保留多份,就会存在一致性问题,就需要同步,同步分为两大类:全量和增量 2. 概述 数据如果要保留副本,要么同时写(就是多写),或者进行复制:异步写(即从主数 ...

  6. MySQL数据以全量和增量方式,向ES搜索引擎同步流程

    本文源码:GitHub·点这里 || GitEE·点这里 一.配置详解 场景描述:MySQL数据表以全量和增量的方式向ElasticSearch搜索引擎同步. 1.下载内容 elasticsearch ...

  7. liunx系统mysql全量备份和增量备份

    前提 ​ 在互联网项目中最终还是读数据进行操作,都离不开曾删改查,那么数据是重中之重,数据库的备份就显得格外重要. ​ 但是每次都直接导出整个数据库的sql文件,显然是不现实的.对数据库的性能影响比较 ...

  8. mysql全量备份、增量备份实现方法

    mysql全量备份.增量备份.开启mysql的logbin日志功能.在/etc/my.cnf文件中加入以下代码: ? 1 2 3 4 5 6 7 [mysqld] log-bin = "/h ...

  9. mysql 数据增量备份_MySQL数据库之mysql全量备份、增量备份实现方法

    本文主要向大家介绍了MySQL数据库之mysql全量备份.增量备份实现方法 ,通过具体的内容向大家展现,希望对大家学习MySQL数据库有所帮助. mysql全量备份.增量备份.开启mysql的logb ...

最新文章

  1. dbcp2和dbcp 1.4在API层面的差异
  2. python编程100例画图-Python练习实例56 | 画图,学用circle画圆形
  3. Hibernate基础学习2
  4. 我马上会重新利用这个博客的
  5. 深度学习100例 - 常见错误 及 处理办法
  6. 安装(python 版)
  7. 【高德地图API】从零开始学高德JS API(四)搜索服务——POI搜索|自动完成|输入提示|行政区域|交叉路口|自有数据检索...
  8. LCD1602液晶显示屏驱动文件
  9. 全球新冠疫情可视化图表制作
  10. 一个cv大师的摆烂之旅
  11. DSSD : Deconvolutional Single Shot Detector
  12. 两个电脑主机共用一个显示器
  13. Ubuntu16.04安装graph-tool采坑指南
  14. Edraw Max教程】如何有趣的创建梦幻般的思维导图
  15. 各大券商提供的量化终端怎么样?
  16. 虚拟计算机可以开机吗,如何让虚拟机在电脑开机时也自动启动
  17. 比Smallpdf好用的在线转换工具有没有呢?
  18. C++ 单元测试工具 Catch
  19. 视频教程-79节PS入门基础视频教程-Photoshop
  20. ESP8266-Arduino编程实例-MQ-7一氧化碳传感器驱动

热门文章

  1. 攻防世界_难度8_happy_puzzle
  2. “网约护士”进行时:有疑惑和担忧,在观察中前进
  3. 中国联通再次下调国际漫游资费 最高降幅达90.42%
  4. mallet java_Mallet:自然语言处理工具包
  5. 给理工男女的一个神奇网站!
  6. VMWare workstation 在打开虚拟机时出现 Unable to find the VXM binary
  7. java 打码_java基础(二)
  8. android apk 减小apk的大小
  9. py2neo的neo4j数据库增删改查节点node、关系relationship、属性property操作
  10. HDU 1560 sequence