前言

基于Java和DataX工具实现数据同步的后台管理,包括数据同步任务的生成,任务的管理,查看任务的执行日志,解析任务的执行结果等功能。
内含一些技术实现方案、心得体会和填坑经验等干货。
阅读本文之前,需要提前了解一下DataX的含义、使用场景和基本使用方法。

1.项目描述

公司要做一个数据中心的项目,包括数据标准平台,数据集成平台,数据监控平台,数据共享平台。主要从两大技术方向去实现:

  • 消息队列 – 选型为 Kafka。负责微服务之间实时的消息共享
  • 数据同步 – 选型为DataX。数据同步负责大数据量的全量/增量同步,同时也是对消息队列的一种补救措施,即使消息共享失败,在下一次数据同步时,也能够确保全部数据的一致性。

2.思路

DataX的介绍此文不做说明,如需了解,请查阅相关资料。本文仅记录相关使用心得。Datax开源地址

Datax实现数据同步任务的方式是执行一个python命令 操作一个配置了同步信息的json文件:

python [datax.py的路径]  [同步任务的json文件路径]
python datax.py D:\Software\install\Environment\DataX\datax\job\mysql2mysql.json

Datax的数据同步,落到实处就是执行上述的python命令。但是对于一个数据中心项目来说,我们需要的不仅仅只是完成这个数据同步的操作。我们需要实现的是全流程的可配置,可视化,可监控,可定时

全流程描述:

  • 前端传入生成json文件所需要的数据、cron表达式
  • 后端解析前端入参,在linux服务器上生成一个json文件,一个包含上述python命令输出日志路径shell脚本,一个linux定时任务。通过定时任务执行shell脚本,即可完成定时执行数据同步任务的操作。(所有文件的命名都与本次的任务ID相关)。
  • 后端在数据库中记录本次的数据同步任务信息,包括任务ID,所有文件的存放路径,cron表达式等信息
  • 前端需要查看数据同步任务的执行结果和执行日志。每次查看的时候,后端根据数据库中记录的当前任务的日志存放路径,读取该路径下的所有文件,解析并记录其中的内容,输出执行成功与否和全部的日志信息,并将解析结果存入数据库中,方便下次查阅。
  • 每个数据同步任务,都可进行暂停、开始和删除操作。通过删除和新增linux的定时任务实现任务的暂停和开始。同步任务只有在暂停状态下可进行删除操作,包括删除数据库记录,删除linux存放的日志文件等。

3.DataX数据同步任务的生成

3.1 环境准备

在 linux服务器上的准备:

  • jdk1.6及以上
  • python2.6及以上
  • datax3.0压缩包下载,解压后需要进行授权:chmod –R 755 {你的目录}

3.2 json生成

  • 创建实体类接收前端参数并生成datax需要的标准json数据。

    分享一个在线JSON转实体类的网址

{"job":{"content":[{"reader":{"parameter":{"password":"YourPassword","column":["id","school_code","link_id"              ],"connection":[{"jdbcUrl":["YourUrl"],"table":["c_pass_dormitory_1"]}],"where":"","splitPk":"","username":"YourUserName"},"name":"mysqlreader"},"writer":{"parameter":{"postSql":[],"password":"YourPassWord","column":["id","school_code","link_id"],"connection":[{"jdbcUrl":"YourUrl","table":["c_pass_dormitory"]}],"writeMode":"insert","batchSize":"1024","preSql":[],"username":"YourUserName"},"name":"mysqlwriter"}}],"setting":{"errorLimit":{"percentage":0.15,"record":0},"speed":{"record":100,"channel":1,"byte":0}}}
注意:在根据json生成实体类的时候,speed设置中的byte参数转为实体类会有点小问题


转为实体类就是:

private int byte;

这是不合法的,这里可以重命名一下:

@Data
public class Speed {private int channel;private int record=100;private int sByte;
}

但是要在生成json文件的时候,将sByte替换为byte:

  • 将json文件存放在linux指定的目录下

java代码:

//生成json文件
private String json(String jsonString) throws Exception {// 拼接文件完整路径String fullPath = dataxJsonAddress + Tools.getId() + ".json";// 生成json格式文件// 保证创建一个新文件File file = createFile(fullPath);// 格式化json字符串jsonString = JsonFormatTool.formatJson(jsonString);// 将格式化后的字符串写入文件java.io.Writer write = new OutputStreamWriter(new FileOutputStream(file), "UTF-8");write.write(jsonString);write.flush();write.close();return fullPath;
}

linux存放:

  • 记录json文件的路径和json数据存表:

3.3 shell脚本生成

Datax是通过python命令执行Datax任务,因此所生成的shell脚本中 要包括 datax.py、json文件和log的路径信息。

生成shell脚本的代码:

 //创建.sh文件private String createShell(String jsonString, String logAddress) throws Exception {//拼接shell脚本描述String sellString = "#!/bin/bash\n" +"source /etc/profile\n" +"# 截至时间设置为当前时间戳\n" +"end_time=$(date +%s)\n" +"# 开始时间设置为60s前时间戳\n" +"create_time=$(($end_time - 60))\n" +"[dataxPy] [dataxJson]  >>[dataxLog].`date +%Y%m%d%H%M`  2>&1 &";sellString = sellString.replace("[dataxPy]", dataxPyAddress).replace("[dataxJson]", jsonString).replace("[dataxLog]", logAddress);// 拼接文件完整路径String fullPath = dataxShellAddress + Tools.getId() + ".sh";File file = createFile(fullPath);// 将格式化后的字符串写入文件java.io.Writer write = new OutputStreamWriter(new FileOutputStream(file), "UTF-8");write.write(sellString);write.flush();write.close();return fullPath;}

服务器上生成的shell脚本:

有了shell脚本后,就可以根据前台传递的cron表达式,创建linux的定时任务,定时执行shell脚本,实现数据同步。因此必须要给新生成的shell脚本添加 可执行权限,Java代码如下:

//给shell文件添加 X 权限,shellAddress:shell脚本的全路径
new ProcessBuilder("/bin/chmod", "755", shellAddress).start();

最终的shell脚本内容:

#!/bin/bash
source /etc/profile
# 截至时间设置为当前时间戳
end_time=$(date +%s)
# 开始时间设置为60s前时间戳
create_time=$(($end_time - 60))
/root/datax/bin/datax.py /root/datax/datax_json/fce1411e47ef45649c5ff8f8930db5a0.json  >>/root/datax/datax_log/d32f460e1339480cb84c286af8223370.`date +%Y%m%d%H%M`  2>&1 &

内容说明:

# datax的python可执行文件路径
/root/datax/bin/datax.py
# json文件的路径
/root/datax/datax_json/fce1411e47ef45649c5ff8f8930db5a0.json
# 本次任务的输出日志路径
>>/root/datax/datax_log/d32f460e1339480cb84c286af8223370.`date +%Y%m%d%H%M`  2>&1 &
>>/root/datax/datax_log/d32f460e1339480cb84c286af8223370.`date +%Y%m%d%H%M`  2>&1 &日志文件的命名采用 任务ID.时间 方式,这样就可以根据任务ID获取该任务下的所有执行日志,用于数据监控。

3.4 定时任务生成

Java代码:

 //配置 linux上的定时任务private void createCron(String cron,String shellAddress) throws Exception{//执行操作指令 (echo "*/1 * * * * /root/datax/datax_sh/text.sh >>/root/crontab 2>&1 &";crontab -l )| crontabString cmd = "(echo \"" + cron + " " + shellAddress + " >>/root/crontab 2>&1 &\";crontab -l )| crontab";System.out.println("cmd命令" + cmd);Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd});}

部分定时任务信息:

3.5 定时任务执行

定时任务的执行脚本里,已经配置了执行日志的输出路径,每次执行后都会在指定目录下生成指定命名规则的日志文件。
部分日志文件如图:

上文也描述了,日志命名的规范,这样的命名方式是为后期数据监控做准备。可以解析出每次数据同步任务的执行状态和执行日志。

3.6 任务信息存入任务表

任务表的字段信息:

CREATE TABLE `c_info_datax_task` (`id` varchar(36) NOT NULL DEFAULT '' COMMENT '主键',`school_code` varchar(20) DEFAULT NULL COMMENT '学校编码',`task_name` varchar(50) DEFAULT '' COMMENT '任务名',`task_log_address` varchar(300) DEFAULT '' COMMENT '日志地址',`task_json_address` varchar(300) DEFAULT '' COMMENT 'json地址',`task_json_data` varchar(2000) DEFAULT NULL COMMENT 'json字符串信息',`task_shell_address` varchar(300) DEFAULT '' COMMENT 'shell地址',`task_cron` varchar(300) DEFAULT '' COMMENT 'cron表达式',`status` varchar(10) DEFAULT '1' COMMENT '状态',`deleted` varchar(1) DEFAULT '0' COMMENT '1:已删除,0:正常',`remarks` varchar(255) DEFAULT '' COMMENT '备注',`create_user` varchar(36) DEFAULT NULL COMMENT '创建者',`create_time` datetime DEFAULT NULL COMMENT '创建时间',`update_user` varchar(36) DEFAULT NULL COMMENT '更新人',`update_time` datetime DEFAULT NULL COMMENT '更新时间',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT COMMENT='数据同步-任务表';

部分数据信息:

数据同步任务信息查看:

总结

至此,一个基于DataX的数据同步任务就配置好了,大体步骤:

  • 生成json文件
  • 配置shell脚本
  • 设置linux定时任务
  • 所需信息入库,生成一条任务信息。

基于dataX的数据同步平台搭建相关推荐

  1. DataX离线数据同步工具/平台

    DataX离线数据同步工具/平台 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL.SQL Server.Oracle.PostgreSQL.HDFS.Hive.O ...

  2. 基于debezium实时数据同步(Oracle为主)

    基于debezium实时数据同步 全部需要下载的内容链接 1.下载zookeeper-3.4.10 2.下载kafka_2.13-2.8.0 3.下载Kafka Connector:建议使用1.6以上 ...

  3. 去哪儿网数据同步平台技术演进与实践

    作者介绍 井显生,2019年加入去哪儿,现负责国内机票出票.退款.改签核心业务.在领域驱动设计(DDD).高并发有大量实践经验. 一.前言 去哪儿网国内机票售后是为用户提供退票.改签.航班变动.行程服 ...

  4. SAP工具箱 数据同步平台(九 与PO整合)

    点击蓝字 关注我们 一 前言 数据同步平台是在ABAP中开发的一个数据同步工具,类似于LTRC,通过配置实现任意两个数据库的数据同步(ABAP需要配置相关的外部数据库连接). 数据同步平台的底层通过调 ...

  5. 马云:未必每个企业都要转型,但每个企业都要升级 | 杭州启用全国首个基于人工智能的数据资源平台

    每一个企业级的人 都置顶了 中国软件网 中国软件网 为你带来最新鲜的行业干货 小编点评 衣服今天是新的 明天就旧了 企业改革面临风险 不改就要被淘汰 不管做什么都很有挑战的样子 趋势洞察 马云:未必每 ...

  6. DataLink 数据同步平台

    文章目录 一.数据同步平台 概述 核心能力 工作原理 详细流程 二.快速接入 部署中间件 程序配置 创建数据库表 启动应用 注意事项 三.扩展:四种 CDC 方案比较优劣 一.数据同步平台 在项目开发 ...

  7. 异地多活数据同步平台

    摘自<大型企业微服务架构实践与运营> 薛浩 编著 异地多活数据同步平台 1. 异地多活架构 接入层应用可通过同步调用或异步消息实现相互的调用.通过相关的服务注册和发现机制保障寻址.路由.熔 ...

  8. PPT报告直接领,这份51页「大数据决策分析平台搭建方案」真的很值

    回复「大数据方案」,直接领取51页<大数据决策分析平台搭建方案>PPT. 对于国内企业而言,企业数据的常见问题至少包含以下几点: 数据分散,多套系统并行,数据孤岛问题严重 数据体量大,存储 ...

  9. 使用 DataX 实现数据同步(高效的同步工具)

    DataX 使用介绍 前言 一.DataX 简介 1.DataX3.0 框架设计 2.DataX3.0 核心架构 二.使用 DataX 实现数据同步 1.Linux 上安装 DataX 软件 2.Da ...

最新文章

  1. Java数据类型及变量作业_day02、Java变量与数据类型
  2. c语言文件可用代码存放,C语言 文件(示例代码)
  3. QT学习:网络应用开发练习(文件下载)
  4. Windows7下注册OCX的注意事项
  5. Android 学习Kotlin吗?
  6. NLP、炼丹技巧和基础理论文章索引
  7. 快手上的cosplay大师有多野?
  8. 阶段3 2.Spring_02.程序间耦合_6 工厂模式解耦
  9. java定时每小时_java 定时任务,每日运行和每小时运行。
  10. UI设计师必备技能——点击进入的网页设计全攻略
  11. 华为2022硬件工程师招聘全程经验
  12. 实例详解ISA防火墙策略元素:ISA2006系列之五
  13. 数显之家快讯:【SHIO世硕心语】董明珠北大演讲:十大掌声雷动的精彩看点!
  14. 评论:Dremel 3D打印机和HP Sprout的初步印象
  15. 最快最有效的锻炼出腹肌来
  16. 解决H5 audio自动播放无效问题(应用于一切环境的一切浏览器)
  17. 能上QQ却打不开网页的原因及解决办法
  18. 视觉-惯性SLAM入门与实践教程(基于VINS-Fusion)
  19. JAVA PHP 按位异或运算_对php位运算^(按位异或)的理解
  20. JAVA珠宝首饰进销存管理系统计算机毕业设计Mybatis+系统+数据库+调试部署

热门文章

  1. 为什么阿里巴巴天天招人,但又很难进,真的缺人吗?
  2. 80道前端面试经典选择题
  3. JQuery与Ajax(上)
  4. SpringBoot_Web开发基础内容
  5. robocode 机器人编码
  6. go单元测试踩坑记录
  7. Zotero使用指南06:Markdown笔记
  8. vscode中嵌入cppcheck进行静态检查,包含插件使用方法
  9. 2014年教育领域五大事件立业公信
  10. 2019高考江苏卷语文作文