DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。

例子:

全量从MYSQL 同步到MYSQL

{
  "job": {
  "content":[
  {
    "reader":{
    "name":"mysqlreader",
    "parameter":{
    "connection":[
      {
        "querySql":["SELECT * FROM TABEL"],
        "jdbcUrl":["jdbc:mysql://127.0.0.1:3306/dsjglpt"],
      }
    ],

    "username":"root",
    "password":"123456",
    "mandatoryEncoding":"UTF-8"
    }
    },
    "writer":{
      "name":"mysqlwriter",
      "parameter":{
      "column": ["*"],
      "connection":[
        {
          "jdbcUrl":"jdbc:mysql://127.0.0.1:3306/dsjglpt_tmp",
          "table":["TABEL"],
        }
      ],

    "username":"root",
    "password":"123456",
    "mandatoryEncoding":"UTF-8",
    "preSql": ["truncate table TABEL"],
    "writeMode": "insert"
    }
    }
   }
  ],
  "setting":{
  "errorLimit":{
  "record":0
  },
  "speed":{
  "channel":"1"
  }
  }
  }
}

从 MYSQL 同步到 PostgreSQL , 部分配置如下:

"name": "postgresqlwriter"

"jdbcUrl": "jdbc:postgresql://[target_server]:5432/[target_db]",

编写    ty_commit_datax_sjyc.sh   (并发任务数)

if [ $# -lt 1 ] ; then
echo "请输入正确的参数:并发任务数 "
exit 1;
fi
processNum=$1
##基础路径
base=/home/admin
##新版本的datax
dataxHome=/home/admin/datax/datax3
##配置文件目录
confBase=`ls ${base}/sjyc/*.json`
##全量json目录
qlJsonPath="${base}/sjyc"
##如果统计表记录数文件不存在,需先执行统计表记录数脚本
#if [ ! -f ${shellPath}/log/count/${syscode}_CountTable.conf ] ;then
# echo "表记录数文件${syscode}_CountTable.conf不存在,请先执行统计记录数脚本!"
# exit 1;
#fi
startTime=`date '+%Y%m%d%H%M%S'`
##输出日志路径
outpath="${base}/log/commit/sjyc/${startTime}"
if [ -d ${outpath} ] ;then
rm -rf ${outpath}
fi
mkdir -p ${outpath}
touch ${outpath}/commitJson.log
touch ${outpath}/succJson.log
touch ${outpath}/failJson.log
mkdir -p ${outpath}/succLog
mkdir -p ${outpath}/failLog
jsonNum=`ls ${qlJsonPath}/*.json | wc -l`
if [ ${processNum} -gt ${jsonNum} ] ;then
echo "并发任务数必须小于等于json文件数:${jsonNum}"
exit 1;
fi

##以当前进程号命名
fifoName="/tmp/$$.fifo"
mkfifo ${fifoName}
##定义文件描述符(fd是一个整数,可理解为索引或指针),以读写的方式绑定到文件描述符3中
exec 3<>"${fifoName}"
##此时可以删除管道文件,保留fd即可
rm -rf ${fifoName}
##定义进程数量,向管道文件中写入进程数量个空行
for ((i=1;i<=${processNum};i++)) do
echo >&3
done

##组装dataX 动态参数

##根据配置文件ty_createJson.conf中的表匹配json目录下的json文件,如果一个表匹配到多个json文件(分区表的情况),则依次提交
for file in `ls ${qlJsonPath}/*.json` ;do
##读入一个空行
read -u3
{
jsonName="${file##*/}"
fileName="${jsonName%%.*}"
echo ${jsonName}
echo "${file}" >> ${outpath}/commitJson.log
python ${dataxHome}/bin/datax.py ${file} 2>&1 >> ${outpath}/${fileName}.log
result=$?
if [ ${result} -eq 0 ] ;then
echo "${fileName}" >> ${outpath}/succJson.log
mv -f ${outpath}/${fileName}.log ${outpath}/succLog
else
echo "${fileName}" >> ${outpath}/failJson.log
mv -f ${outpath}/${fileName}.log ${outpath}/failLog
##记录失败的表配置
#echo "${line}" >> ${confBase}/ty_createJson_bsj.conf
fi
#sleep 1
##最后向管道文件中写入一个空行
echo >&3
}&
done
done
wait
echo "开始时间:${startTime}" >> ${outpath}/commitJson.log
echo "开始时间:${startTime}"
endTime=`date '+%Y%m%d%H%M%S'`
echo "结束时间:${endTime}" >> ${outpath}/commitJson.log
echo "结束时间:${endTime}"
#关闭文件描述符的读
exec 3<&-
#关闭文件描述符的写
exec 3>&-
exit 0

编写CRONTAB 脚本(每天6点半执行 每次执行10个JSON文件)

crontab-e

30 6 * * * /home/admin/ty_commit_datax_sjyc.sh 10

------------------------------------------------------------------

要实现增量更新, 首先要 PostgresqlReader 从目标数据库读取最大日期, 并用 TextFileWriter 写入到一个 csv 文件, 这一步我的配置如下所示:

{
"job": {
"content": [
{
"reader": {
"name": "postgresqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": [
"jdbc:postgresql://[target_server]:5432/[target_db]"
],
"querySql": [
"SELECT max(data_time) FROM public.minute_data"
]
}
],
"password": "...",
"username": "..."
}
},
"writer": {
"name": "txtfilewriter",
"parameter": {
"dateFormat": "yyyy-MM-dd HH:mm:ss",
"fileName": "minute_data_max_time_result",
"fileFormat": "csv",
"path": "/scripts/",
"writeMode": "truncate"
}
}
}
],
"setting": { }
}
}

最后编写增量同步的 shell 文件, 内容如下:

#!/bin/bash
### every exit != 0 fails the script
set -e

# 获取目标数据库最大数据时间,并写入一个 csv 文件
docker run --interactive --tty --rm --network compose --volume $(pwd):/scripts \
beginor/datax:3.0 \
/scripts/minute_data_max_time.json
if [ $? -ne 0 ]; then
echo "minute_data_sync.sh error, can not get max_time from target db!"
exit 1
fi
# 找到 DataX 写入的文本文件,并将内容读取到一个变量中
RESULT_FILE=`ls minute_data_max_time_result_*`
MAX_TIME=`cat $RESULT_FILE`
# 如果最大时间不为 null 的话, 修改全部同步的配置,进行增量更新;
if [ "$MAX_TIME" != "null" ]; then
# 设置增量更新过滤条件
WHERE="DataTime > '$MAX_TIME'"
sed "s/1=1/$WHERE/g" minute_data.json > minute_data_tmp.json
# 将第 45 行的 truncate 语句删除;
sed '45d' minute_data_tmp.json > minute_data_inc.json
# 增量更新
docker run --interactive --tty --rm --network compose --volume $(pwd):/scripts \
beginor/datax:3.0 \
/scripts/minute_data_inc.json
# 删除临时文件
rm ./minute_data_tmp.json ./minute_data_inc.json
else
# 全部更新
docker run --interactive --tty --rm --network compose --volume $(pwd):/scripts \
beginor/datax:3.0 \
/scripts/minute_data.json
fi

转载于:https://www.cnblogs.com/misterzhaoyan/p/9366055.html

ETL工具 DataX数据同步,LINUX CRONTAB 定时调度相关推荐

  1. etl数据抽取工具_数据同步工具ETL、ELT傻傻分不清楚?3分钟看懂两者区别

    什么是数据同步工具(ETL.ELT) 数据同步工具ETL或者ELT的作用是将业务系统的数据经过抽取.清洗转换之后加载到数据仓库的过程,目的是将企业中的分散.零乱.标准不统一的数据整合到一起,为企业的决 ...

  2. java etl工具_一文带你入门ETL工具-datax的简单使用

    什么是ETL? ETL负责将分布的.异构数据源中的数据如关系数据.平面数据文件等抽取到临时中间层后进行清洗.转换.集成,最后加载到数据仓库或数据集市中,成为联机分析处理.数据挖掘的基础. ETL是数据 ...

  3. 阿里出品的ETL工具dataX初体验

    我的毕设选择了大数据方向的题目.大数据的第一步就是要拿到足够的数据源.现实情况中我们需要的数据源分布在不同的业务系统中,而这些系统往往是异构的,而且我们的分析过程不能影响原有业务系统的运行.为了把不同 ...

  4. datax数据同步问题(mysql2hive)汇总

    文章摘要: 1.代码 2.搭建spark 3.使用datax 4.常见问题 5.指正补充 前言: git代码,有需要的可以参考 ![GitHub contributors](https://img.s ...

  5. Linux Crontab定时执行脚本出错,但手动执行脚本正常原因及解决方案

    Linux Crontab定时执行脚本出错,但手动执行脚本正常原因及解决方案 实际开发场景 需要开发一个Flink监控程序,初步使用shell脚本进行监控,如果发现失败了,则自动重新运行Flink命令 ...

  6. springboot项目集成dolphinscheduler调度器 实现datax数据同步任务

    Datax安装及基本使用请查看上一篇文章: 文章目录 Datax概述 1.概述 2.功能清单 3.==说明==:本项目只支持mysql及hbase之间的数据同步 代码模块 配置文件 pom.xml D ...

  7. DataX数据同步工具使用

    1.DataX 简介 DataX 是阿里云 DataWorks 数据集成 的开源版本,主要就是用于实现数据间的离线同步. DataX 致力于实现包括关系型数据库(MySQL.Oracle 等).HDF ...

  8. 数据同步结合Crontab

    需求:定时同步mysql服务器上的表到另外个数据库内 解决方法:结合linux的contable和mysqldump命令完成该功能 Mysql&Shell #!/bin/bash PORT=& ...

  9. 阿里开源ETL工具——dataX简单上手

    一.概述 1.是什么? DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL.Oracle.SqlServer.Postgre.HDFS.Hive.ADS.HBase. ...

  10. mysql binlog查看工具_数据同步工具otter(一)谈谈binlog和canal

    之前因为懒,没有针对otter做更多的解释和说明,在使用过程中,也发现了一些问题,此次补上一个完整的文档,方便大家使用. Otter是基于cannal开源的,canal又是基于mysql binlog ...

最新文章

  1. 页面显示 amp html6,浅谈HTML5 amp;amp; CSS3的新交互特性
  2. struts.xml中class路径错误报错的问题
  3. JMetro版本11.5.11和8.5.11发布
  4. Go语言-基本的http请求操作
  5. MySQL 索引类别与索引使用指南
  6. 多图详解Spring框架的设计理念与设计模式
  7. 稀疏矩阵相乘-Python版
  8. 在文件上传的基础上设计一个学生基本信息表
  9. 使用NetworkInterface类获得网络接口信息
  10. webform如何接收前端的ajax数据,HttpWebResponse Post 前端控件数据,后台如何接收?...
  11. ps无法启动,ps暂存盘满了
  12. audio音频使用天坑
  13. 使用openjtag和openocd操作IXP425的flash
  14. [每周心学]示弟立志说(附译文)
  15. ssrs报表服务器数据库配置文件,ReportingServicesService 配置文件
  16. 献给小白的笔记day6
  17. android如何编程红外遥控,全志A20[android教程]-红外遥控器调试
  18. 数字图像处理作业文档整合
  19. Matlab,solve函数出错,问题的解决
  20. 迄今为止最好的 Windows,更新四月版 17134 系统

热门文章

  1. Oracle11gR1中细粒度访问网络服务(转)
  2. WordPress的wp-cumulus插件------------标签云插件
  3. 【从C到C++学习笔记】bool类型/const限定符/#define//结构体对齐
  4. Cuda-convnet配置指南 on Windows8.1+CUDA6.5+VS2013
  5. 黑暗森林:知识图谱的前世今生
  6. 卡方分布、T分布和F分布
  7. Python类中的__init__,__del__和__call__方法
  8. python解析GF1卫星数据.xml文件
  9. Flutter之Stepper源码浅析
  10. linux将passwd文件拷贝到,Linux命令