ETL工具 DataX数据同步,LINUX CRONTAB 定时调度
DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。
例子:
全量从MYSQL 同步到MYSQL
{
"job": {
"content":[
{
"reader":{
"name":"mysqlreader",
"parameter":{
"connection":[
{
"querySql":["SELECT * FROM TABEL"],
"jdbcUrl":["jdbc:mysql://127.0.0.1:3306/dsjglpt"],
}
],
"username":"root",
"password":"123456",
"mandatoryEncoding":"UTF-8"
}
},
"writer":{
"name":"mysqlwriter",
"parameter":{
"column": ["*"],
"connection":[
{
"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/dsjglpt_tmp",
"table":["TABEL"],
}
],
"username":"root",
"password":"123456",
"mandatoryEncoding":"UTF-8",
"preSql": ["truncate table TABEL"],
"writeMode": "insert"
}
}
}
],
"setting":{
"errorLimit":{
"record":0
},
"speed":{
"channel":"1"
}
}
}
}
从 MYSQL 同步到 PostgreSQL , 部分配置如下:
"name": "postgresqlwriter"
"jdbcUrl": "jdbc:postgresql://[target_server]:5432/[target_db]",
编写 ty_commit_datax_sjyc.sh (并发任务数)
if [ $# -lt 1 ] ; then
echo "请输入正确的参数:并发任务数 "
exit 1;
fi
processNum=$1
##基础路径
base=/home/admin
##新版本的datax
dataxHome=/home/admin/datax/datax3
##配置文件目录
confBase=`ls ${base}/sjyc/*.json`
##全量json目录
qlJsonPath="${base}/sjyc"
##如果统计表记录数文件不存在,需先执行统计表记录数脚本
#if [ ! -f ${shellPath}/log/count/${syscode}_CountTable.conf ] ;then
# echo "表记录数文件${syscode}_CountTable.conf不存在,请先执行统计记录数脚本!"
# exit 1;
#fi
startTime=`date '+%Y%m%d%H%M%S'`
##输出日志路径
outpath="${base}/log/commit/sjyc/${startTime}"
if [ -d ${outpath} ] ;then
rm -rf ${outpath}
fi
mkdir -p ${outpath}
touch ${outpath}/commitJson.log
touch ${outpath}/succJson.log
touch ${outpath}/failJson.log
mkdir -p ${outpath}/succLog
mkdir -p ${outpath}/failLog
jsonNum=`ls ${qlJsonPath}/*.json | wc -l`
if [ ${processNum} -gt ${jsonNum} ] ;then
echo "并发任务数必须小于等于json文件数:${jsonNum}"
exit 1;
fi
##以当前进程号命名
fifoName="/tmp/$$.fifo"
mkfifo ${fifoName}
##定义文件描述符(fd是一个整数,可理解为索引或指针),以读写的方式绑定到文件描述符3中
exec 3<>"${fifoName}"
##此时可以删除管道文件,保留fd即可
rm -rf ${fifoName}
##定义进程数量,向管道文件中写入进程数量个空行
for ((i=1;i<=${processNum};i++)) do
echo >&3
done
##组装dataX 动态参数
##根据配置文件ty_createJson.conf中的表匹配json目录下的json文件,如果一个表匹配到多个json文件(分区表的情况),则依次提交
for file in `ls ${qlJsonPath}/*.json` ;do
##读入一个空行
read -u3
{
jsonName="${file##*/}"
fileName="${jsonName%%.*}"
echo ${jsonName}
echo "${file}" >> ${outpath}/commitJson.log
python ${dataxHome}/bin/datax.py ${file} 2>&1 >> ${outpath}/${fileName}.log
result=$?
if [ ${result} -eq 0 ] ;then
echo "${fileName}" >> ${outpath}/succJson.log
mv -f ${outpath}/${fileName}.log ${outpath}/succLog
else
echo "${fileName}" >> ${outpath}/failJson.log
mv -f ${outpath}/${fileName}.log ${outpath}/failLog
##记录失败的表配置
#echo "${line}" >> ${confBase}/ty_createJson_bsj.conf
fi
#sleep 1
##最后向管道文件中写入一个空行
echo >&3
}&
done
done
wait
echo "开始时间:${startTime}" >> ${outpath}/commitJson.log
echo "开始时间:${startTime}"
endTime=`date '+%Y%m%d%H%M%S'`
echo "结束时间:${endTime}" >> ${outpath}/commitJson.log
echo "结束时间:${endTime}"
#关闭文件描述符的读
exec 3<&-
#关闭文件描述符的写
exec 3>&-
exit 0
编写CRONTAB 脚本(每天6点半执行 每次执行10个JSON文件)
crontab-e
30 6 * * * /home/admin/ty_commit_datax_sjyc.sh 10
------------------------------------------------------------------
要实现增量更新, 首先要 PostgresqlReader 从目标数据库读取最大日期, 并用 TextFileWriter 写入到一个 csv 文件, 这一步我的配置如下所示:
{
"job": {
"content": [
{
"reader": {
"name": "postgresqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": [
"jdbc:postgresql://[target_server]:5432/[target_db]"
],
"querySql": [
"SELECT max(data_time) FROM public.minute_data"
]
}
],
"password": "...",
"username": "..."
}
},
"writer": {
"name": "txtfilewriter",
"parameter": {
"dateFormat": "yyyy-MM-dd HH:mm:ss",
"fileName": "minute_data_max_time_result",
"fileFormat": "csv",
"path": "/scripts/",
"writeMode": "truncate"
}
}
}
],
"setting": { }
}
}
最后编写增量同步的 shell 文件, 内容如下:
#!/bin/bash
### every exit != 0 fails the script
set -e
# 获取目标数据库最大数据时间,并写入一个 csv 文件
docker run --interactive --tty --rm --network compose --volume $(pwd):/scripts \
beginor/datax:3.0 \
/scripts/minute_data_max_time.json
if [ $? -ne 0 ]; then
echo "minute_data_sync.sh error, can not get max_time from target db!"
exit 1
fi
# 找到 DataX 写入的文本文件,并将内容读取到一个变量中
RESULT_FILE=`ls minute_data_max_time_result_*`
MAX_TIME=`cat $RESULT_FILE`
# 如果最大时间不为 null 的话, 修改全部同步的配置,进行增量更新;
if [ "$MAX_TIME" != "null" ]; then
# 设置增量更新过滤条件
WHERE="DataTime > '$MAX_TIME'"
sed "s/1=1/$WHERE/g" minute_data.json > minute_data_tmp.json
# 将第 45 行的 truncate 语句删除;
sed '45d' minute_data_tmp.json > minute_data_inc.json
# 增量更新
docker run --interactive --tty --rm --network compose --volume $(pwd):/scripts \
beginor/datax:3.0 \
/scripts/minute_data_inc.json
# 删除临时文件
rm ./minute_data_tmp.json ./minute_data_inc.json
else
# 全部更新
docker run --interactive --tty --rm --network compose --volume $(pwd):/scripts \
beginor/datax:3.0 \
/scripts/minute_data.json
fi
转载于:https://www.cnblogs.com/misterzhaoyan/p/9366055.html
ETL工具 DataX数据同步,LINUX CRONTAB 定时调度相关推荐
- etl数据抽取工具_数据同步工具ETL、ELT傻傻分不清楚?3分钟看懂两者区别
什么是数据同步工具(ETL.ELT) 数据同步工具ETL或者ELT的作用是将业务系统的数据经过抽取.清洗转换之后加载到数据仓库的过程,目的是将企业中的分散.零乱.标准不统一的数据整合到一起,为企业的决 ...
- java etl工具_一文带你入门ETL工具-datax的简单使用
什么是ETL? ETL负责将分布的.异构数据源中的数据如关系数据.平面数据文件等抽取到临时中间层后进行清洗.转换.集成,最后加载到数据仓库或数据集市中,成为联机分析处理.数据挖掘的基础. ETL是数据 ...
- 阿里出品的ETL工具dataX初体验
我的毕设选择了大数据方向的题目.大数据的第一步就是要拿到足够的数据源.现实情况中我们需要的数据源分布在不同的业务系统中,而这些系统往往是异构的,而且我们的分析过程不能影响原有业务系统的运行.为了把不同 ...
- datax数据同步问题(mysql2hive)汇总
文章摘要: 1.代码 2.搭建spark 3.使用datax 4.常见问题 5.指正补充 前言: git代码,有需要的可以参考 ![GitHub contributors](https://img.s ...
- Linux Crontab定时执行脚本出错,但手动执行脚本正常原因及解决方案
Linux Crontab定时执行脚本出错,但手动执行脚本正常原因及解决方案 实际开发场景 需要开发一个Flink监控程序,初步使用shell脚本进行监控,如果发现失败了,则自动重新运行Flink命令 ...
- springboot项目集成dolphinscheduler调度器 实现datax数据同步任务
Datax安装及基本使用请查看上一篇文章: 文章目录 Datax概述 1.概述 2.功能清单 3.==说明==:本项目只支持mysql及hbase之间的数据同步 代码模块 配置文件 pom.xml D ...
- DataX数据同步工具使用
1.DataX 简介 DataX 是阿里云 DataWorks 数据集成 的开源版本,主要就是用于实现数据间的离线同步. DataX 致力于实现包括关系型数据库(MySQL.Oracle 等).HDF ...
- 数据同步结合Crontab
需求:定时同步mysql服务器上的表到另外个数据库内 解决方法:结合linux的contable和mysqldump命令完成该功能 Mysql&Shell #!/bin/bash PORT=& ...
- 阿里开源ETL工具——dataX简单上手
一.概述 1.是什么? DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL.Oracle.SqlServer.Postgre.HDFS.Hive.ADS.HBase. ...
- mysql binlog查看工具_数据同步工具otter(一)谈谈binlog和canal
之前因为懒,没有针对otter做更多的解释和说明,在使用过程中,也发现了一些问题,此次补上一个完整的文档,方便大家使用. Otter是基于cannal开源的,canal又是基于mysql binlog ...
最新文章
- 页面显示 amp html6,浅谈HTML5 amp;amp; CSS3的新交互特性
- struts.xml中class路径错误报错的问题
- JMetro版本11.5.11和8.5.11发布
- Go语言-基本的http请求操作
- MySQL 索引类别与索引使用指南
- 多图详解Spring框架的设计理念与设计模式
- 稀疏矩阵相乘-Python版
- 在文件上传的基础上设计一个学生基本信息表
- 使用NetworkInterface类获得网络接口信息
- webform如何接收前端的ajax数据,HttpWebResponse Post 前端控件数据,后台如何接收?...
- ps无法启动,ps暂存盘满了
- audio音频使用天坑
- 使用openjtag和openocd操作IXP425的flash
- [每周心学]示弟立志说(附译文)
- ssrs报表服务器数据库配置文件,ReportingServicesService 配置文件
- 献给小白的笔记day6
- android如何编程红外遥控,全志A20[android教程]-红外遥控器调试
- 数字图像处理作业文档整合
- Matlab,solve函数出错,问题的解决
- 迄今为止最好的 Windows,更新四月版 17134 系统
热门文章
- Oracle11gR1中细粒度访问网络服务(转)
- WordPress的wp-cumulus插件------------标签云插件
- 【从C到C++学习笔记】bool类型/const限定符/#define//结构体对齐
- Cuda-convnet配置指南 on Windows8.1+CUDA6.5+VS2013
- 黑暗森林:知识图谱的前世今生
- 卡方分布、T分布和F分布
- Python类中的__init__,__del__和__call__方法
- python解析GF1卫星数据.xml文件
- Flutter之Stepper源码浅析
- linux将passwd文件拷贝到,Linux命令