基于dataX的数据同步平台搭建
前言
基于Java和DataX工具实现数据同步的后台管理,包括数据同步任务的生成,任务的管理,查看任务的执行日志,解析任务的执行结果等功能。
内含一些技术实现方案、心得体会和填坑经验等干货。
阅读本文之前,需要提前了解一下DataX的含义、使用场景和基本使用方法。
1.项目描述
公司要做一个数据中心的项目,包括数据标准平台,数据集成平台,数据监控平台,数据共享平台。主要从两大技术方向去实现:
- 消息队列 – 选型为
Kafka
。负责微服务之间实时的消息共享 - 数据同步 – 选型为
DataX
。数据同步负责大数据量的全量/增量同步,同时也是对消息队列的一种补救措施,即使消息共享失败,在下一次数据同步时,也能够确保全部数据的一致性。
2.思路
DataX的介绍此文不做说明,如需了解,请查阅相关资料。本文仅记录相关使用心得。Datax开源地址
Datax实现数据同步
任务的方式是执行一个python命令
操作一个配置了同步信息的json文件:
python [datax.py的路径] [同步任务的json文件路径]
python datax.py D:\Software\install\Environment\DataX\datax\job\mysql2mysql.json
Datax的数据同步,落到实处就是执行上述的python命令。但是对于一个数据中心项目来说,我们需要的不仅仅只是完成这个数据同步的操作。我们需要实现的是全流程的可配置,可视化,可监控,可定时
。
全流程描述:
- 前端传入生成json文件所需要的数据、
cron表达式
。 - 后端解析前端入参,在linux服务器上生成一个
json文件
,一个包含上述python命令
和输出日志路径
的shell脚本
,一个linux定时任务
。通过定时任务执行shell脚本,即可完成定时执行数据同步任务的操作。(所有文件的命名
都与本次的任务ID
相关)。 - 后端在数据库中记录本次的
数据同步任务信息
,包括任务ID,所有文件的存放路径,cron表达式等信息 - 前端需要查看数据同步任务的
执行结果和执行日志
。每次查看的时候,后端根据数据库中记录的当前任务的日志存放路径,读取该路径下的所有文件,解析并记录其中的内容,输出执行成功与否和全部的日志信息,并将解析结果存入数据库中,方便下次查阅。 - 每个数据同步任务,都可进行
暂停、开始和删除
操作。通过删除和新增linux的定时任务实现任务的暂停和开始。同步任务只有在暂停状态下可进行删除操作,包括删除数据库记录,删除linux存放的日志文件等。
3.DataX数据同步任务的生成
3.1 环境准备
在 linux服务器上的准备:
- jdk1.6及以上
- python2.6及以上
- datax3.0压缩包下载,解压后需要进行授权:
chmod –R 755 {你的目录}
3.2 json生成
创建实体类接收前端参数并生成datax需要的标准json数据。
分享一个在线JSON转实体类的网址
{"job":{"content":[{"reader":{"parameter":{"password":"YourPassword","column":["id","school_code","link_id" ],"connection":[{"jdbcUrl":["YourUrl"],"table":["c_pass_dormitory_1"]}],"where":"","splitPk":"","username":"YourUserName"},"name":"mysqlreader"},"writer":{"parameter":{"postSql":[],"password":"YourPassWord","column":["id","school_code","link_id"],"connection":[{"jdbcUrl":"YourUrl","table":["c_pass_dormitory"]}],"writeMode":"insert","batchSize":"1024","preSql":[],"username":"YourUserName"},"name":"mysqlwriter"}}],"setting":{"errorLimit":{"percentage":0.15,"record":0},"speed":{"record":100,"channel":1,"byte":0}}}
注意:在根据json生成实体类的时候,speed设置中的byte参数转为实体类会有点小问题
转为实体类就是:
private int byte;
这是不合法的,这里可以重命名一下:
@Data
public class Speed {private int channel;private int record=100;private int sByte;
}
但是要在生成json文件的时候,将sByte替换为byte:
- 将json文件存放在linux指定的目录下
java代码:
//生成json文件
private String json(String jsonString) throws Exception {// 拼接文件完整路径String fullPath = dataxJsonAddress + Tools.getId() + ".json";// 生成json格式文件// 保证创建一个新文件File file = createFile(fullPath);// 格式化json字符串jsonString = JsonFormatTool.formatJson(jsonString);// 将格式化后的字符串写入文件java.io.Writer write = new OutputStreamWriter(new FileOutputStream(file), "UTF-8");write.write(jsonString);write.flush();write.close();return fullPath;
}
linux存放:
- 记录json文件的路径和json数据存表:
3.3 shell脚本生成
Datax是通过python命令执行Datax任务,因此所生成的shell脚本中 要包括 datax.py、json文件和log的路径信息。
生成shell脚本的代码:
//创建.sh文件private String createShell(String jsonString, String logAddress) throws Exception {//拼接shell脚本描述String sellString = "#!/bin/bash\n" +"source /etc/profile\n" +"# 截至时间设置为当前时间戳\n" +"end_time=$(date +%s)\n" +"# 开始时间设置为60s前时间戳\n" +"create_time=$(($end_time - 60))\n" +"[dataxPy] [dataxJson] >>[dataxLog].`date +%Y%m%d%H%M` 2>&1 &";sellString = sellString.replace("[dataxPy]", dataxPyAddress).replace("[dataxJson]", jsonString).replace("[dataxLog]", logAddress);// 拼接文件完整路径String fullPath = dataxShellAddress + Tools.getId() + ".sh";File file = createFile(fullPath);// 将格式化后的字符串写入文件java.io.Writer write = new OutputStreamWriter(new FileOutputStream(file), "UTF-8");write.write(sellString);write.flush();write.close();return fullPath;}
服务器上生成的shell脚本:
有了shell脚本后,就可以根据前台传递的cron表达式
,创建linux的定时任务
,定时执行shell脚本,实现数据同步。因此必须要给新生成的shell脚本添加 可执行权限
,Java代码如下:
//给shell文件添加 X 权限,shellAddress:shell脚本的全路径
new ProcessBuilder("/bin/chmod", "755", shellAddress).start();
最终的shell脚本内容:
#!/bin/bash
source /etc/profile
# 截至时间设置为当前时间戳
end_time=$(date +%s)
# 开始时间设置为60s前时间戳
create_time=$(($end_time - 60))
/root/datax/bin/datax.py /root/datax/datax_json/fce1411e47ef45649c5ff8f8930db5a0.json >>/root/datax/datax_log/d32f460e1339480cb84c286af8223370.`date +%Y%m%d%H%M` 2>&1 &
内容说明:
# datax的python可执行文件路径
/root/datax/bin/datax.py
# json文件的路径
/root/datax/datax_json/fce1411e47ef45649c5ff8f8930db5a0.json
# 本次任务的输出日志路径
>>/root/datax/datax_log/d32f460e1339480cb84c286af8223370.`date +%Y%m%d%H%M` 2>&1 &
>>/root/datax/datax_log/d32f460e1339480cb84c286af8223370.`date +%Y%m%d%H%M` 2>&1 &日志文件的命名采用 任务ID.时间 方式,这样就可以根据任务ID获取该任务下的所有执行日志,用于数据监控。
3.4 定时任务生成
Java代码:
//配置 linux上的定时任务private void createCron(String cron,String shellAddress) throws Exception{//执行操作指令 (echo "*/1 * * * * /root/datax/datax_sh/text.sh >>/root/crontab 2>&1 &";crontab -l )| crontabString cmd = "(echo \"" + cron + " " + shellAddress + " >>/root/crontab 2>&1 &\";crontab -l )| crontab";System.out.println("cmd命令" + cmd);Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd});}
部分定时任务信息:
3.5 定时任务执行
定时任务的执行脚本里,已经配置了执行日志的输出路径
,每次执行后都会在指定目录下生成指定命名规则
的日志文件。
部分日志文件如图:
上文也描述了,日志命名的规范,这样的命名方式是为后期数据监控
做准备。可以解析出每次数据同步任务的执行状态
和执行日志。
3.6 任务信息存入任务表
任务表的字段信息:
CREATE TABLE `c_info_datax_task` (`id` varchar(36) NOT NULL DEFAULT '' COMMENT '主键',`school_code` varchar(20) DEFAULT NULL COMMENT '学校编码',`task_name` varchar(50) DEFAULT '' COMMENT '任务名',`task_log_address` varchar(300) DEFAULT '' COMMENT '日志地址',`task_json_address` varchar(300) DEFAULT '' COMMENT 'json地址',`task_json_data` varchar(2000) DEFAULT NULL COMMENT 'json字符串信息',`task_shell_address` varchar(300) DEFAULT '' COMMENT 'shell地址',`task_cron` varchar(300) DEFAULT '' COMMENT 'cron表达式',`status` varchar(10) DEFAULT '1' COMMENT '状态',`deleted` varchar(1) DEFAULT '0' COMMENT '1:已删除,0:正常',`remarks` varchar(255) DEFAULT '' COMMENT '备注',`create_user` varchar(36) DEFAULT NULL COMMENT '创建者',`create_time` datetime DEFAULT NULL COMMENT '创建时间',`update_user` varchar(36) DEFAULT NULL COMMENT '更新人',`update_time` datetime DEFAULT NULL COMMENT '更新时间',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT COMMENT='数据同步-任务表';
部分数据信息:
数据同步任务信息查看:
总结
至此,一个基于DataX的数据同步任务就配置好了,大体步骤:
- 生成json文件
- 配置shell脚本
- 设置linux定时任务
- 所需信息入库,生成一条任务信息。
基于dataX的数据同步平台搭建相关推荐
- DataX离线数据同步工具/平台
DataX离线数据同步工具/平台 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL.SQL Server.Oracle.PostgreSQL.HDFS.Hive.O ...
- 基于debezium实时数据同步(Oracle为主)
基于debezium实时数据同步 全部需要下载的内容链接 1.下载zookeeper-3.4.10 2.下载kafka_2.13-2.8.0 3.下载Kafka Connector:建议使用1.6以上 ...
- 去哪儿网数据同步平台技术演进与实践
作者介绍 井显生,2019年加入去哪儿,现负责国内机票出票.退款.改签核心业务.在领域驱动设计(DDD).高并发有大量实践经验. 一.前言 去哪儿网国内机票售后是为用户提供退票.改签.航班变动.行程服 ...
- SAP工具箱 数据同步平台(九 与PO整合)
点击蓝字 关注我们 一 前言 数据同步平台是在ABAP中开发的一个数据同步工具,类似于LTRC,通过配置实现任意两个数据库的数据同步(ABAP需要配置相关的外部数据库连接). 数据同步平台的底层通过调 ...
- 马云:未必每个企业都要转型,但每个企业都要升级 | 杭州启用全国首个基于人工智能的数据资源平台
每一个企业级的人 都置顶了 中国软件网 中国软件网 为你带来最新鲜的行业干货 小编点评 衣服今天是新的 明天就旧了 企业改革面临风险 不改就要被淘汰 不管做什么都很有挑战的样子 趋势洞察 马云:未必每 ...
- DataLink 数据同步平台
文章目录 一.数据同步平台 概述 核心能力 工作原理 详细流程 二.快速接入 部署中间件 程序配置 创建数据库表 启动应用 注意事项 三.扩展:四种 CDC 方案比较优劣 一.数据同步平台 在项目开发 ...
- 异地多活数据同步平台
摘自<大型企业微服务架构实践与运营> 薛浩 编著 异地多活数据同步平台 1. 异地多活架构 接入层应用可通过同步调用或异步消息实现相互的调用.通过相关的服务注册和发现机制保障寻址.路由.熔 ...
- PPT报告直接领,这份51页「大数据决策分析平台搭建方案」真的很值
回复「大数据方案」,直接领取51页<大数据决策分析平台搭建方案>PPT. 对于国内企业而言,企业数据的常见问题至少包含以下几点: 数据分散,多套系统并行,数据孤岛问题严重 数据体量大,存储 ...
- 使用 DataX 实现数据同步(高效的同步工具)
DataX 使用介绍 前言 一.DataX 简介 1.DataX3.0 框架设计 2.DataX3.0 核心架构 二.使用 DataX 实现数据同步 1.Linux 上安装 DataX 软件 2.Da ...
最新文章
- Java数据类型及变量作业_day02、Java变量与数据类型
- c语言文件可用代码存放,C语言 文件(示例代码)
- QT学习:网络应用开发练习(文件下载)
- Windows7下注册OCX的注意事项
- Android 学习Kotlin吗?
- NLP、炼丹技巧和基础理论文章索引
- 快手上的cosplay大师有多野?
- 阶段3 2.Spring_02.程序间耦合_6 工厂模式解耦
- java定时每小时_java 定时任务,每日运行和每小时运行。
- UI设计师必备技能——点击进入的网页设计全攻略
- 华为2022硬件工程师招聘全程经验
- 实例详解ISA防火墙策略元素:ISA2006系列之五
- 数显之家快讯:【SHIO世硕心语】董明珠北大演讲:十大掌声雷动的精彩看点!
- 评论:Dremel 3D打印机和HP Sprout的初步印象
- 最快最有效的锻炼出腹肌来
- 解决H5 audio自动播放无效问题(应用于一切环境的一切浏览器)
- 能上QQ却打不开网页的原因及解决办法
- 视觉-惯性SLAM入门与实践教程(基于VINS-Fusion)
- JAVA PHP 按位异或运算_对php位运算^(按位异或)的理解
- JAVA珠宝首饰进销存管理系统计算机毕业设计Mybatis+系统+数据库+调试部署