一、DataX部署

  1. 上传DataX压缩文件至/opt/software/目录下,并解压文件至/opt/module/下。

  2. 自测检查DataX,出现如下截图内容,说明安装成功

    二、DataX使用

  3. DataX使用概述
    DataX使用还是十分简单的,用户只需要根据自己同步数据的数据源和目的地来选择相应的Reader和Writer,并将Reader和Writer的信息配置在一个json文件中,然后执行下述命令进行提交数据同步任务即可。

[yili@hadoop102 datax]$ python bin/datax.py job/job.json
  1. DataX配置文件格式
    可以通过下述命令进行查看DataX配置文件模板
[yili@hadoop102 datax]$ python bin/datax.py -d mysqlreader -w hdfswriter

https://gitee.com/mirrors/DataX/blob/master/README.md

  1. 同步MySQL数据到HDFS案例
    (1)案例要求:同步gmall数据库中base_province表数据到HDFS的/base_province目录
    (2)需求分析:要实现该功能,需选用MySQLReader和HDFSWriter,MySQLReader具有两种模式分别是TableMode和QuerySQLMode,前者使用table,column,where等属性声明需要同步的数据;后者使用一条SQL查询语句声明需要同步的数据。
    下面分别使用两种模式进行演示。
    1)MySQLReader之TableMode
    Json配置文件内容:
{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": ["id","name","region_id","area_code","iso_code","iso_3166_2"],"where": "id>=3","connection": [{"jdbcUrl": ["jdbc:mysql://hadoop102:3306/gmall"],"table": ["base_province"]}],"password": "123456","splitPk": "","username": "root"}},"writer": {"name": "hdfswriter","parameter": {"column": [{"name": "id","type": "bigint"},{"name": "name","type": "string"},{"name": "region_id","type": "string"},{"name": "area_code","type": "string"},{"name": "iso_code","type": "string"},{"name": "iso_3166_2","type": "string"}],"compress": "gzip","defaultFS": "hdfs://hadoop102:8020","fieldDelimiter": "\t","fileName": "base_province","fileType": "text","path": "/user/yili/gmall/base_province","writeMode": "append"}}}],"setting": {"speed": {"channel": 1}}}
}

2)MySQLReader之QuerySQLMode
Json配置文件内容:

{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"connection": [{"jdbcUrl": ["jdbc:mysql://hadoop102:3306/gmall"],"querySql": ["select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"]}],"password": "123456","username": "root"}},"writer": {"name": "hdfswriter","parameter": {"column": [{"name": "id","type": "bigint"},{"name": "name","type": "string"},{"name": "region_id","type": "string"},{"name": "area_code","type": "string"},{"name": "iso_code","type": "string"},{"name": "iso_3166_2","type": "string"}],"compress": "gzip","defaultFS": "hdfs://hadoop102:8020","fieldDelimiter": "\t","fileName": "base_province","fileType": "text","path": "/base_province","writeMode": "append"}}}],"setting": {"speed": {"channel": 1}}}
}
  1. DataX传参
    【说明】通常情况下,离线数据同步任务需要每日定时重复执行,故HDFS上的目标路径通常会包含一层日期,以对每日同步的数据加以区分,也就是说每日同步数据的目标路径不是固定不变的,因此DataX配置文件中HDFS Writer的path参数的值应该是动态的。为实现这一效果,就需要使用DataX传参的功能。
    DataX传参的用法如下,在JSON配置文件中使用${param}引用参数,在提交任务时使用-p"-Dparam=value"传入参数值。
    1)具体示例如下:
[yili@hadoop102 datax]$ hadoop fs -mkdir /user/yili/gmall/base_province/2020-07-08
{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"connection": [{"jdbcUrl": ["jdbc:mysql://hadoop102:3306/gmall"],"querySql": ["select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"]}],"password": "123456","username": "root"}},"writer": {"name": "hdfswriter","parameter": {"column": [{"name": "id","type": "bigint"},{"name": "name","type": "string"},{"name": "region_id","type": "string"},{"name": "area_code","type": "string"},{"name": "iso_code","type": "string"},{"name": "iso_3166_2","type": "string"}],"compress": "gzip","defaultFS": "hdfs://hadoop102:8020","fieldDelimiter": "\t","fileName": "base_province","fileType": "text","path": "/user/yili/gmall/base_province/${dt}","writeMode": "append"}}}],"setting": {"speed": {"channel": 1}}}
}

2)执行脚本

[yili@hadoop102 datax]$ python bin/datax.py -p"-Ddt=2020-06-14" job/base_province.json

3)查看结果

[yili@hadoop102 datax]$ hadoop fs -ls  /user/yili/gmall/base_province/
Found 2 items
drwxr-xr-x   - atguigu supergroup          0 2022-07-08 21:41 /user/yili/gmall/base_province/2020-07-08
  1. 同步HDFS数据到MySQL案例实操
    (1)案例要求:同步HDFS上的/user/yili/gmall/base_province目录下的数据到MySQL gmall 数据库下的test_province表。
    (2)需求分析:要实现该功能,需选用HDFSReader和MySQLWriter。
    配置文件如下所示:
{"job": {"content": [{"reader": {"name": "hdfsreader","parameter": {"defaultFS": "hdfs://hadoop102:8020","path": "/base_province","column": ["*"],"fileType": "text","compress": "gzip","encoding": "UTF-8","nullFormat": "\\N","fieldDelimiter": "\t",}},"writer": {"name": "mysqlwriter","parameter": {"username": "root","password": "123456","connection": [{"table": ["test_province"],"jdbcUrl": "jdbc:mysql://hadoop102:3306/gmall?useUnicode=true&characterEncoding=utf-8"}],"column": ["id","name","region_id","area_code","iso_code","iso_3166_2"],"writeMode": "replace"}}}],"setting": {"speed": {"channel": 1}}}
}

(3)查看Mysql目标表输出

  1. DataX配置文件生成脚本(改进:对json串输出增加格式化操作 -> 增加程序文件内容可读性)
    (1)[yili@hadoop102 datax]$ vi /home/yili/bin/gen_import_config.py
# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb#MySQL相关配置,需根据实际情况作出修改
mysql_host = "hadoop102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "Wkz57Heu(h"#HDFS NameNode相关配置,需根据实际情况作出修改
hdfs_nn_host = "hadoop102"
hdfs_nn_port = "8020"#生成配置文件的目标路径,可根据实际情况作出修改
output_path = "/opt/module/datax/job/import"#获取mysql连接
def get_connection():return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)#获取表格的元数据  包含列名和数据类型
def get_mysql_meta(database, table):connection = get_connection()cursor = connection.cursor()sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"cursor.execute(sql, [database, table])fetchall = cursor.fetchall()cursor.close()connection.close()return fetchall#获取mysql表的列名
def get_mysql_columns(database, table):return map(lambda x: x[0], get_mysql_meta(database, table))#将获取的元数据中mysql的数据类型转换为hive的数据类型  写入到hdfswriter中
def get_hive_columns(database, table):def type_mapping(mysql_type):mappings = {"bigint": "bigint","int": "bigint","smallint": "bigint","tinyint": "bigint","decimal": "string","double": "double","float": "float","binary": "string","char": "string","varchar": "string","datetime": "string","time": "string","timestamp": "string","date": "string","text": "string"}return mappings[mysql_type]meta = get_mysql_meta(database, table)return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)#生成json文件
def generate_json(source_database, source_table):job = {"job": {"setting": {"speed": {"channel": 3},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": mysql_user,"password": mysql_passwd,"column": get_mysql_columns(source_database, source_table),"splitPk": "","connection": [{"table": [source_table],"jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]}]}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,"fileType": "text","fileName": source_table,"column": get_hive_columns(source_database, source_table),"writeMode": "append","fieldDelimiter": "\t","compress": "gzip","path": "${targetdir}"}}}]}}if not os.path.exists(output_path):os.makedirs(output_path)with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:json.dump(job, f, sort_keys=True, indent=4, separators=(',', ':'))def main(args):source_database = ""source_table = ""options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl='])for opt_name, opt_value in options:if opt_name in ('-d', '--sourcedb'):source_database = opt_valueif opt_name in ('-t', '--sourcetbl'):source_table = opt_valuegenerate_json(source_database, source_table)if __name__ == '__main__':main(sys.argv[1:])

(2)执行脚本

[yili@hadoop102 bin]$ python gen_import_config.py -d gmall -t activity_info

(3)生成json结果文件

[yili@hadoop102 datax]$ python bin/datax.py job/import/gmall.activity_info.json -p "-Dtargetdir=/user/yili/gmall/db/activity_info_full/2022-07-06"

(4)执行日志显示成功

(5)HDFS上生成文件

数据同步工具之DataX实操相关推荐

  1. DataX离线数据同步工具/平台

    DataX离线数据同步工具/平台 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL.SQL Server.Oracle.PostgreSQL.HDFS.Hive.O ...

  2. 异构数据库数据同步工具DataX教程,安装、数据同步、java执行

    前言 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL.Oracle.SqlServer.Postgre.HDFS.Hive.ADS.HBase.TableStor ...

  3. 高效数据同步工具DataX的使用

    一.DataX 简介 DataX 是阿里云 DataWorks 数据集成 的开源版本,主要就是用于实现数据间的离线同步. DataX 致力于实现包括关系型数据库(MySQL.Oracle 等).HDF ...

  4. 数据同步工具—DataX 初识

    DataX 初识 DataX 是阿里云 DataWorks数据集成的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台.DataX 实现了包括 MySQL.Oracle.OceanBase. ...

  5. 开源oracle同步图形工具,阿里开源数据同步工具--DataX

    阿里开源数据同步工具--DataX 是啥?: 是异构数据源离线同步工具 能干啥?: 能够将MySQL sqlServer Oracle Hive HBase  FTP 之间进行稳定高效的数据同步. 设 ...

  6. 这款 MySQL、Oracle、HDFS 数据同步工具,有点牛逼!

    点击上方"Java基基",选择"设为星标" 做积极的人,而不是积极废人! 每天 14:00 更新文章,每天掉亿点点头发... 源码精品专栏 原创 | Java ...

  7. 【硬刚大数据】大数据同步工具之FlinkCDC/Canal/Debezium对比

    欢迎关注博客主页:微信搜:import_bigdata,大数据领域硬核原创作者_王知无(import_bigdata)_CSDN博客 欢迎点赞.收藏.留言 ,欢迎留言交流! 本文由[王知无]原创,首发 ...

  8. 离线数据同步神器:DataX,支持几乎所有异构数据源的离线同步到MaxCompute

    2019独角兽企业重金招聘Python工程师标准>>> 摘要: 概述 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL.Oracle.SqlSer ...

  9. ETL的数据同步工具调研(持续更新中)

    扯白了,数据同步工具就是"导数据 "的 名称 社区响应 国内使用情况(以前程无忧为参考) SQOOP 更新缓慢,对于hbase2.x以上版本使用时需要老版本的jar包 9页 Dat ...

最新文章

  1. java鼠标样式设置,设置Echarts鼠标悬浮样式
  2. 一文掌握 C 智能指针的使用
  3. 软件测试——语句覆盖
  4. activiti 生命周期_一文让你读懂什么是Activiti工作流
  5. python单元测试断言_Python单元测试框架之pytest -- 断言-阿里云开发者社区
  6. [******] 链表问题:将单向链表按某值划分成左边小、中间相等、右边大的形式...
  7. html分页器的实现原理,Django之分页器
  8. html 多页面合并,让多个HTML页面 使用 同一段HTML代码
  9. Bailian4120 硬币【0-1背包】
  10. 使用 ADO.NET 访问 Oracle 9i 存储过程
  11. 经典数据结构视频教程下载 (清华大学 严蔚敏)
  12. charles抓包踩过的坑之一 ——手机开了代理,不能访问网络
  13. [视频教程]macOS运行MAME
  14. oracle几何体数据类型,4.2 SDO_GEOMETRY类型、属性和值
  15. Bandwagon centos 6 32位配置SVN,apache,mysql和php
  16. 毛利率逼近茅台,巨子生物赴港IPO市场会买单吗?
  17. 未能打开这台计算机的策略组对象,Windows7 组策略错误:“未能打开这台计算机上的组策略对象。您可能没有合适的权限。”...
  18. postfix中间件--Amavisd-new--反垃圾和反病毒配置
  19. 绝对收获满满的干货回放 | 百度超级链学院首场线下沙龙到底讲了什么?
  20. refresh方法_Android Shake to Refresh教程

热门文章

  1. HY68 RGB DM蓝牙5.2双模热插拔PCB
  2. 计算机没有音频驱动程序,电脑有驱动却没有高清晰音频管理器华硕主板应该装什么声卡驱动...
  3. 资金流向┃权证行情┃股票黑马┃股票IIQ┃股票分析┃股票市场分析/股票
  4. 企业做CMMI3级认证,不满足条件可以做吗?
  5. 2022最新最全的pytest配置文件pytest.ini
  6. Java多线程模拟运动比赛
  7. 2021年危险化学品经营单位主要负责人最新解析及危险化学品经营单位主要负责人新版试题
  8. 16 Python总结之舆情时间序列可视化
  9. 2020年全国计算机二级考试大纲改变,2020年3月计算机二级考试大纲内容
  10. Mysql数据库备份和还原