DataX离线数据同步
目录
- 1 DataX
- 2 ODPS同步数据到HDFS
- HA 配置
- Kerberos 配置
- 域外访问配置
- 3 HDFS同步数据到另一个HDFS
- 4 MongoDB同步数据到HDFS
- 5 带 Kerberos 的域外数据传输
1. DataX
- 执行一次数据同步作业称为一个Job,DataX接受到这个Job之后,开始启动一个进程来完成这个Job的同步过程;
- 这个进程会根据不同的源端切分策略,将Job进行切分为多个小的Task,Task就是DataX作业的最小单元,每个Task都负责一部分数据的同步工作,这些Task是并发执行的;
- 切分完数据之后,DataX Job会调用Scheduler模块,根据配置的并发数量,将拆分的Task重新组合,组装为TaskGroup,每一个TaskGroup负责以一定的并发执行Task,默认单个TaskGroup的并发数为5;
- Task启动后会固定启动Reader -> Channel -> Writer的线程来完成任务同步工作;
- Task运行时,Job开始监控并等待各个TaskGroup模块任务完成,等所有TaskGroup成功完成后Job进程才退出,否则异常退出。
例子:比如现在有1个100张分表,需要将数据从MySQL同步到odps,在json中设置了20个并发,DataX的调度决策如下:
- DataX Job将表拆分为100个Task;
- 由20个并发,可以计算出需要4个TaskGroup(一个TaskGroup运行5个Task);
- 将100个任务平均分配个4个TaskGroup,也就是说每个TaskGroup会负责5个并发运行25个Task。
2. ODPS同步数据到HDFS
ODPS现在已经更名为MaxCompute,它是一种大数据计算服务,一个快速、完全托管的TB/PB级数据仓库解决方案,具体可以查看官方文档MaxCompute。
{"job": {"setting": {"speed": {"channel": "3"}},"content": [{"reader": {"name": "odpsreader","parameter": {"accessId": "3oL**********BDZ","accessKey": "Myk********************WY9UTly","project": "targetProjectName","table": "tableName","column": ["*"],"partition": ["pt=20141010000000,year=2014"],"odpsServer": "http://xxx/api","tunnelServer": "http://xxx","splitMode": "record"}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://cdh1:8020","fileType": "TEXT","path": "/user/hive/warehouse/kudu_test.db/dataxtype_test","fileName": "xxxx","writeMode": "append","fieldDelimiter": "?","column": [{"name": "f1","type": "BIGINT"},{"name": "f2","type": "STRING"},{"name": "f3","type": "BOOLEAN"},{"name": "f4","type": "DOUBLE"},{"name": "f5","type": "FLOAT"},{"name": " f6","type": "TIMESTAMP"}]}}}]}
}
在
writer
中的column
一定要对读取的每个字段定义字段名和字段类型。关于分隔符,可以使用一些非常特殊的单字符,比如输入法中提供的特殊字符(最好测试一下,有些虽然是单字符但也不支持),如果想使用多字符作为分隔符,可以查看我的另一篇blog Hive中的自定义分隔符。在writer中会定义path
和fileName
,这里可能有些人会有个疑问,就是为什么定义fileName
,这个文件名并不是严格的文件名,只是文件名的一个前缀标识,多个并行时会生成fileName__随机字符串
的文件。如果 HDFS 开启了 HA ,当 NameNode 发生切换是会造成 读取 HDFS 或者写入 HDFS 数据失败,此时可以配置上 Hadoop HA 相关配置,DataX 本身也是支持这样配置的,在 json 的 reader 或者 writer 下的
parameter
中添加如下配置:"hadoopConfig":{"dfs.nameservices": "cluster2","dfs.ha.namenodes.cluster2": "nn1,nn2","dfs.namenode.rpc-address.cluster2.nn1": "x.xx.xxx.101:8020","dfs.namenode.rpc-address.cluster2.nn2": "x.xx.xxx.102:8020","dfs.client.failover.proxy.provider.cluster2": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" }
如果开启 Kerberos 认证,则需要在 json 的 reader 或者 writer 下的
parameter
中添加如下配置,更新见下文 5. 带 Kerberos 的域外数据传输"haveKerberos": true, "kerberosKeytabFilePath": "/xxx/xxxx/xxx.keytab", "kerberosPrincipal": "xxx/xxxx@XXX.COM",
如果是在外域访问,直接读取或写入 DataNode 时原集群解析的内网 IP 可能无法直接访问,则需要在 json 的 reader 或者 writer 下的
parameter
中添加如下配置:"hadoopConfig":{"dfs.http.policy": "HTTPS_ONLY","dfs.data.transfer.protection": "integrity","dfs.client.use.datanode.hostname": true,"dfs.datanode.address": "x.x.x.x:1004" }
3. HDFS同步数据到另一个HDFS
{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "hdfsreader","parameter": {"path": "/user/hive/warehouse/hive_test.db/dataxtype_test/*","defaultFS": "hdfs://one-hdfs:8020","fileType": "text","encoding": "UTF-8","fieldDelimiter": "|","column": [{"type": "Long","index": "0"},{"type": "String","index": "1"},{"type": "Boolean","index": "2"},{"type": "Double","index": "3"},{"type": "Double","index": "4"},{"type": "Date","index": "5"}]}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://cdh1:8020","fileType": "TEXT","path": "/user/hive/warehouse/kudu_test.db/dataxtype_test","fileName": "xxxx","writeMode": "append","fieldDelimiter": "?","column": [{"name": "f1","type": "BIGINT"},{"name": "f2","type": "STRING"},{"name": "f3","type": "BOOLEAN"},{"name": "f4","type": "DOUBLE"},{"name": "f5","type": "FLOAT"},{"name": " f6","type": "TIMESTAMP"}]}}}]}
}
4. MongoDB同步数据到HDFS
{"job": {"setting": {"speed": {"channel": "3"}},"content": [{"reader": {"name": "mongodbreader","parameter": {"address": ["xx.xxx.x.xx:27017"],"userName": "","userPassword": "","dbName": "tag_per_data","collectionName": "tag_data12","column": [{"name": "_id","type": "string"},{"name": "r_cert_no","type": "string"},{"name": "r_name","type": "string"},{"name": "b_cert_no","type": "string"},{"name": "b_name","type": "string"},{"name": "r_flag","type": "string"},{"name": "body_md5","type": "Array","spliter": ""}]}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "file:///","fieldDelimiter": "╬","fileName": "m_cpws_info_w3","fileType": "orc","path": "/root/mongo_data/m_cpws_info_w3","writeMode": "append","column": [{"name": "_id","type": "string"},{"name": "r_cert_no","type": "string"},{"name": "r_name","type": "string"},{"name": "b_cert_no","type": "string"},{"name": "b_name","type": "string"},{"name": "r_flag","type": "string"},{"name": "body_md5","type": "string"}]}}}]}
}
# 登陆远程MongoDB
$MONGO_HOME/bin/mongo --host xx.xxx.x.xx --port 27017 -u "用户名" -p "密码" --authenticationDatabase "admin"
# 切换到给定库下
use 库名
# 查看表字段信息,不显示 details 字段值
db.getCollection("表名").find({},{details: 0}).limit(1)
然后用MongoDB自带的数据导出工具mongoexport导出数据为csv文件
$MONGO_HOME/bin/mongoexport --host xx.xxx.x.xx --port 27017 \
-d 库名 -c 表名 -u "用户名" -p "密码" --authenticationDatabase "admin" \
-o ./data/表名.csv --type csv \
-f "_id,field_01,field_02,field_03,……"
也许这个数据文件非常大,可以对这个文件进行切分,使用Linux的sed
命令。
#每100万条数据切分为一个文件
sed -n '1,1000000'p axd_tao_ord.csv >> axd_tao_ord_0000001-1000000.csv
sed -n '1000001,2000000'p axd_tao_ord.csv >> axd_tao_ord_1000001-2000000.csv
sed -n '2000001,3000000'p axd_tao_ord.csv >> axd_tao_ord_2000001-3000000.csv
……# 这里也可以使用split命令
split -l 1000000 axd_tao_ord.csv axd_tao_ord_
5. 带 Kerberos 的域外数据传输
<property><name>dfs.client.use.datanode.hostname</name><value>true</value><description>only cofig in clients</description>
</property>
对于DataX 只需要在 hadoopConfig
中添加配置参数 "dfs.client.use.datanode.hostname": true
,完整的配置如下:
{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "odpsreader","parameter": {"accessId": "3o************DZ","accessKey": "Myk************************Tly","odpsServer": "http://xx.x.xxx.xxx/api","tunnelServer": "http://xx.x.xxx.xxx","project": "targetProjectName","table": "tableName","column": ["customer_id","nickname"]}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://xx.x.xxx.xxx:8020","path": "/user/hive/warehouse/xxx.db/xxx_tb","fileType": "orc","compress": "NONE","fieldDelimiter": "|","fileName": "xxx","column": [{"name": "col1","type": "BIGINT"},{"name": "col2","type": "STRING"},{"name": "col3","type": "TIMESTAMP"},{"name": "col4","type": "date"}],"haveKerberos": true,"kerberosKeytabFilePath": "/xxx/xxxx/xxx.keytab","kerberosPrincipal": "xxx/xxxx@XXX.COM","hadoopConfig": {"dfs.http.policy": "HTTPS_ONLY","dfs.data.transfer.protection": "integrity","dfs.client.use.datanode.hostname": true,"dfs.datanode.address": "x.x.x.x:1004"},"writeMode": "append"}}}]}
}
DataX离线数据同步相关推荐
- DataX离线数据同步工具/平台
DataX离线数据同步工具/平台 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL.SQL Server.Oracle.PostgreSQL.HDFS.Hive.O ...
- 离线数据同步神器:DataX,支持几乎所有异构数据源的离线同步到MaxCompute
2019独角兽企业重金招聘Python工程师标准>>> 摘要: 概述 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL.Oracle.SqlSer ...
- 离线数据同步平台datax+报表可视化平台metabase
datax DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL.Oracle.SqlServer.Postgre.HDFS.Hive.ADS.HBase.TableS ...
- datax 持续数据同步_DataX数据同步
因为选择了ElasticSearch做全文检索,需要把数据从数据库同步到es,此处选择开源方案DataX作为数据同步工具. 官方文档很详细,可以直接参考. DataXDataX 是一个异构数据源离线同 ...
- 使用 DataX 实现数据同步(高效的同步工具)
DataX 使用介绍 前言 一.DataX 简介 1.DataX3.0 框架设计 2.DataX3.0 核心架构 二.使用 DataX 实现数据同步 1.Linux 上安装 DataX 软件 2.Da ...
- 基于dataX的数据同步平台搭建
前言 基于Java和DataX工具实现数据同步的后台管理,包括数据同步任务的生成,任务的管理,查看任务的执行日志,解析任务的执行结果等功能. 内含一些技术实现方案.心得体会和填坑经验等干货. 阅读本文 ...
- datax 持续数据同步_采用DataX实现多表增量数据同步
这两天验证了一下阿里的DataX数据同步工具,觉得DataX可以用来做管理数据的多级数据同步.DataX用来做批量数据迁移很适合,能够保证数据的一致性,性能也很好,结合时间戳字段,用来实现数据定时增量 ...
- 【Android】【功能设计】离线数据同步方案
这里考虑的是本地数据不会被其它用户修改的情况,类似于微信,离线任务都是自己要发出的消息,不会和网络数据产生冲突 对于多个离线用户,同时修改同一份数据的情况,不适合使用此方案,大多时候也不允许离线使用 ...
- 基于文件的离线数据同步方案
产品此前的数据备份方案,存在不少问题,所以需要设计一个新的方案.本文总结一下新旧方案的优劣 首先APP是一个支持离线的应用.本地数据保存在sqlite,在离线环境下,在本地数据库里读写记录,在有网络的 ...
最新文章
- 使用python画CDF
- Linux下的shell编程入门
- 四种高性能数据类型,Python collections助你优化代码、简洁任务
- MVC4项目中验证用户登录一个特性就搞定
- MyBatis与Hibernate比较
- Spring Boot和Hibernate:打印查询和变量
- 基于TCP的在线聊天程序
- PHP连接sql seaver数据库
- 【初学线段树,看这篇文章准没错】线段树(单点修改and区间修改)acm寒假集训日记22/1/10
- 测试面试题,自动化测试与性能测试篇(附答案)
- LoudMiner:伪装在VST软件中的跨平台挖矿软件
- VS实现格式化代码及代码缩进
- python医学科研中能做什么-科研画图都用什么软件?
- QGC调试px4固件飞控
- 外卖小程序邀请入口获取推广路径
- viterbi 中文分词-超简单版
- 简单五步看懂伦敦的银标
- STM32H743开发板移植micropython并外扩32M的SQPI flash和32M的SDRAM
- TFN D550S 100G SDH综合测试分析仪性能如何
- 任务诱发的瞳孔反应(The Task-evoked Pupillary Response)