目录

  • 1 DataX
  • 2 ODPS同步数据到HDFS
    • HA 配置
    • Kerberos 配置
    • 域外访问配置
  • 3 HDFS同步数据到另一个HDFS
  • 4 MongoDB同步数据到HDFS
  • 5 带 Kerberos 的域外数据传输

1. DataX

数据同步的工具有很多,比如Hadoop和结构化数据存储之间高效批量数据传输的工具Apache Sqoop,借助于 Hadoop集群可以并行的高效传输数据,但是这种方式往往需要依赖于一个Hadoop环境,在生产环境有时我们并没有直接的权限操作这个环境,而是只提供一个HDFS的端口,这时用Sqoop就不是很方便。Oracle数据和MySQL数据的同步工具会想到alibaba / yugong,基于MySQL binlog增量订阅和消费中间件使用起来非常方便,同时也执行消息队里模式(Apache Kafka、Apache RockMQ),但这个主要针对于特定场景和业务下。

DataX是阿里巴巴集团内被广泛使用的离线数据同步工具or平台,它支持MySQL、Oracle、SQLServer、PostgreSQL、HDFS、Hive、HBase、MaxCompute(ODPS)、MongoDB、Elasticsearch等各种异构数据源之间的高效的数据同步。Reader和Writer支持的列表可以访问文档Support Data Channels

使用起来也是非常方便,只需要下载datax.tar.gz压缩包,解压之后就可以使用,不需要设置任何依赖,放置到任何一个开通了数据源和目标端端口权限的节点,然后定义一个JSON文件,在JSON文件中定义Reader和Writer,就可以在本地以多线程并发高效的进行数据之间的同步功能。

例子:比如现在有1个100张分表,需要将数据从MySQL同步到odps,在json中设置了20个并发,DataX的调度决策如下:

2. ODPS同步数据到HDFS

ODPS现在已经更名为MaxCompute,它是一种大数据计算服务,一个快速、完全托管的TB/PB级数据仓库解决方案,具体可以查看官方文档MaxCompute。

在content中的reader处定义一个odpsreader,writer定义为hdfswriter。这里需要注意的是HdfsWriter插件目前只支持ORC和TEXT,而 HdfsReader插件目前支持ORC、 TEXT、CSV、SEQUENCE、RC

如果我们在写入HDFS时文件类型可以选择ORC、 TEXT、CSV、SEQUENCE、RC 格式,其中 ORC 格式是一种不错的选择,它既能对数据进行一定程度的压缩(一个原本165MB的文件,压缩后约为20MB),在数据量较大时能很有效的节省空间,此外如果使用 ORC 格式可以字段分割符可以支持的更广泛,可以有效解决 TEXT/CSV 类型的表只能使用 ASCII 中的单字符作为表分割符造成的列匹配错乱的常见问题,在Hive建表时指定文件存储格式STORED AS orc即可,但是比较尴尬的是如果使用的是CDH平台上的Impala来操作Hive的数据,在Cloudera Enterprise 6.0.x是不支持ORC格式的,不过在随后的Cloudera Enterprise 6.1.x支持了ORC格式,有这方面业务的需要考虑下这种特殊情况。

注意:如果reader是ODPS,字段信息的定义顺序没有限制,但尽量和writer中定义的字段信息的顺序一致,或者writer中定义字段信息时要和reader处的顺序保持一致。字段名不要包含空字符,否则DataX验证类型时会报错。

{"job": {"setting": {"speed": {"channel": "3"}},"content": [{"reader": {"name": "odpsreader","parameter": {"accessId": "3oL**********BDZ","accessKey": "Myk********************WY9UTly","project": "targetProjectName","table": "tableName","column": ["*"],"partition": ["pt=20141010000000,year=2014"],"odpsServer": "http://xxx/api","tunnelServer": "http://xxx","splitMode": "record"}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://cdh1:8020","fileType": "TEXT","path": "/user/hive/warehouse/kudu_test.db/dataxtype_test","fileName": "xxxx","writeMode": "append","fieldDelimiter": "?","column": [{"name": "f1","type": "BIGINT"},{"name": "f2","type": "STRING"},{"name": "f3","type": "BOOLEAN"},{"name": "f4","type": "DOUBLE"},{"name": "f5","type": "FLOAT"},{"name": " f6","type": "TIMESTAMP"}]}}}]}
}

注意

  1. writer中的column一定要对读取的每个字段定义字段名和字段类型。关于分隔符,可以使用一些非常特殊的单字符,比如输入法中提供的特殊字符(最好测试一下,有些虽然是单字符但也不支持),如果想使用多字符作为分隔符,可以查看我的另一篇blog Hive中的自定义分隔符。在writer中会定义pathfileName,这里可能有些人会有个疑问,就是为什么定义fileName,这个文件名并不是严格的文件名,只是文件名的一个前缀标识,多个并行时会生成fileName__随机字符串的文件。

  2. 如果 HDFS 开启了 HA ,当 NameNode 发生切换是会造成 读取 HDFS 或者写入 HDFS 数据失败,此时可以配置上 Hadoop HA 相关配置,DataX 本身也是支持这样配置的,在 json 的 reader 或者 writer 下的 parameter 中添加如下配置:

    "hadoopConfig":{"dfs.nameservices": "cluster2","dfs.ha.namenodes.cluster2": "nn1,nn2","dfs.namenode.rpc-address.cluster2.nn1": "x.xx.xxx.101:8020","dfs.namenode.rpc-address.cluster2.nn2": "x.xx.xxx.102:8020","dfs.client.failover.proxy.provider.cluster2": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    }
    
  3. 如果开启 Kerberos 认证,则需要在 json 的 reader 或者 writer 下的 parameter 中添加如下配置,更新见下文 5. 带 Kerberos 的域外数据传输

    "haveKerberos": true,
    "kerberosKeytabFilePath": "/xxx/xxxx/xxx.keytab",
    "kerberosPrincipal": "xxx/xxxx@XXX.COM",
    
  4. 如果是在外域访问,直接读取或写入 DataNode 时原集群解析的内网 IP 可能无法直接访问,则需要在 json 的 reader 或者 writer 下的 parameter 中添加如下配置:

    "hadoopConfig":{"dfs.http.policy": "HTTPS_ONLY","dfs.data.transfer.protection": "integrity","dfs.client.use.datanode.hostname": true,"dfs.datanode.address": "x.x.x.x:1004"
    }
    

3. HDFS同步数据到另一个HDFS

将下面json保存为 xxx.json,然后执行 $DATAX_HOME/bin/datax.py xxx.json
注意:reader定义字段类型时一定要和HDFS上的文件的列的顺序一致(也就是建表时字段的顺序),writer时和reader列顺序一致。

{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "hdfsreader","parameter": {"path": "/user/hive/warehouse/hive_test.db/dataxtype_test/*","defaultFS": "hdfs://one-hdfs:8020","fileType": "text","encoding": "UTF-8","fieldDelimiter": "|","column": [{"type": "Long","index": "0"},{"type": "String","index": "1"},{"type": "Boolean","index": "2"},{"type": "Double","index": "3"},{"type": "Double","index": "4"},{"type": "Date","index": "5"}]}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://cdh1:8020","fileType": "TEXT","path": "/user/hive/warehouse/kudu_test.db/dataxtype_test","fileName": "xxxx","writeMode": "append","fieldDelimiter": "?","column": [{"name": "f1","type": "BIGINT"},{"name": "f2","type": "STRING"},{"name": "f3","type": "BOOLEAN"},{"name": "f4","type": "DOUBLE"},{"name": "f5","type": "FLOAT"},{"name": " f6","type": "TIMESTAMP"}]}}}]}
}

4. MongoDB同步数据到HDFS

将下面json保存为 xxx.json,然后执行 $DATAX_HOME/bin/datax.py xxx.json。如果测试数据可以先导到本地"defaultFS": "file:///",如果格式没问题再改为"defaultFS": "hdfs://bigdata001:8020",。或者因为端口问题只能通过某个服务上传,可以直接将远程连接读入MongoDB的数据通过FTP方式发送到指定服务上,这里只需将hdfswriter改为ftpwriter。

{"job": {"setting": {"speed": {"channel": "3"}},"content": [{"reader": {"name": "mongodbreader","parameter": {"address": ["xx.xxx.x.xx:27017"],"userName": "","userPassword": "","dbName": "tag_per_data","collectionName": "tag_data12","column": [{"name": "_id","type": "string"},{"name": "r_cert_no","type": "string"},{"name": "r_name","type": "string"},{"name": "b_cert_no","type": "string"},{"name": "b_name","type": "string"},{"name": "r_flag","type": "string"},{"name": "body_md5","type": "Array","spliter": ""}]}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "file:///","fieldDelimiter": "╬","fileName": "m_cpws_info_w3","fileType": "orc","path": "/root/mongo_data/m_cpws_info_w3","writeMode": "append","column": [{"name": "_id","type": "string"},{"name": "r_cert_no","type": "string"},{"name": "r_name","type": "string"},{"name": "b_cert_no","type": "string"},{"name": "b_name","type": "string"},{"name": "r_flag","type": "string"},{"name": "body_md5","type": "string"}]}}}]}
}

Info:但在MongoDB右密码时,用但在MongoDB右密码时,用$MONGO_HOME/bin/mongo --host xx.xxx.x.xx --port 27017 -u "用户名" -p "密码" --authenticationDatabase "admin"可以正常访问给定的库,但是在上述mongodbreader中配置上用户名和免密依然没有权限访问。

这是可以临时这样迁移数据:

# 登陆远程MongoDB
$MONGO_HOME/bin/mongo --host xx.xxx.x.xx --port 27017 -u "用户名" -p "密码"  --authenticationDatabase "admin"
# 切换到给定库下
use 库名
# 查看表字段信息,不显示 details 字段值
db.getCollection("表名").find({},{details: 0}).limit(1)

然后用MongoDB自带的数据导出工具mongoexport导出数据为csv文件

$MONGO_HOME/bin/mongoexport --host xx.xxx.x.xx --port 27017 \
-d 库名 -c 表名 -u "用户名" -p "密码"  --authenticationDatabase "admin"  \
-o ./data/表名.csv --type csv \
-f "_id,field_01,field_02,field_03,……"

也许这个数据文件非常大,可以对这个文件进行切分,使用Linux的sed命令。

#每100万条数据切分为一个文件
sed -n '1,1000000'p axd_tao_ord.csv >> axd_tao_ord_0000001-1000000.csv
sed -n '1000001,2000000'p axd_tao_ord.csv >> axd_tao_ord_1000001-2000000.csv
sed -n '2000001,3000000'p axd_tao_ord.csv >> axd_tao_ord_2000001-3000000.csv
……# 这里也可以使用split命令
split -l 1000000  axd_tao_ord.csv axd_tao_ord_

5. 带 Kerberos 的域外数据传输

加入现在数据源为 ODPS ,需要定时将数据写入 HDFS ,但是 HDFS 是在另外一个集群且网络是隔离的,此时 ODPS 相对于 HDFS 就类似与域外的访问。在 ODPS 所处的环境下有一个调度系统,调度系统的某个 Worker 申请了权限可以成功对接访问 HDFS,HDFS 集群开启了SASL和 Kerberos访问认证。

对于这种情况 DataX 也可以成功实现数据的传输离线同步,需要配置上 Kerberos 的参数,这里重点是对于这种域外访问,直接写入数据时,数据会写入到远程的 DataNode 节点,但是这个节点的 ip 是一个集群内网的ip,这样在连接访问DataNode 时就会超时异常,如下图,写入数据时会报6000 mills timeout while waiting for channel to be ready for connect. ch:java.nio.channels.SocketChannel[connection-pending remote=xxx.xxx.xxx.xxx:1004] 而remote连接的那个ip正式 Hadoop 内网集群的ip,对于这种需要指定如下的参数(在hdfs-site.xml),如果是程序中,可以通过这种方式设定 Configuration conf=new Configuration(); conf.set("dfs.client.use.datanode.hostname", "true");

<property><name>dfs.client.use.datanode.hostname</name><value>true</value><description>only cofig in clients</description>
</property>

对于DataX 只需要在 hadoopConfig 中添加配置参数 "dfs.client.use.datanode.hostname": true,完整的配置如下:

{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "odpsreader","parameter": {"accessId": "3o************DZ","accessKey": "Myk************************Tly","odpsServer": "http://xx.x.xxx.xxx/api","tunnelServer": "http://xx.x.xxx.xxx","project": "targetProjectName","table": "tableName","column": ["customer_id","nickname"]}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://xx.x.xxx.xxx:8020","path": "/user/hive/warehouse/xxx.db/xxx_tb","fileType": "orc","compress": "NONE","fieldDelimiter": "|","fileName": "xxx","column": [{"name": "col1","type": "BIGINT"},{"name": "col2","type": "STRING"},{"name": "col3","type": "TIMESTAMP"},{"name": "col4","type": "date"}],"haveKerberos": true,"kerberosKeytabFilePath": "/xxx/xxxx/xxx.keytab","kerberosPrincipal": "xxx/xxxx@XXX.COM","hadoopConfig": {"dfs.http.policy": "HTTPS_ONLY","dfs.data.transfer.protection": "integrity","dfs.client.use.datanode.hostname": true,"dfs.datanode.address": "x.x.x.x:1004"},"writeMode": "append"}}}]}
}

DataX离线数据同步相关推荐

  1. DataX离线数据同步工具/平台

    DataX离线数据同步工具/平台 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL.SQL Server.Oracle.PostgreSQL.HDFS.Hive.O ...

  2. 离线数据同步神器:DataX,支持几乎所有异构数据源的离线同步到MaxCompute

    2019独角兽企业重金招聘Python工程师标准>>> 摘要: 概述 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL.Oracle.SqlSer ...

  3. 离线数据同步平台datax+报表可视化平台metabase

    datax DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL.Oracle.SqlServer.Postgre.HDFS.Hive.ADS.HBase.TableS ...

  4. datax 持续数据同步_DataX数据同步

    因为选择了ElasticSearch做全文检索,需要把数据从数据库同步到es,此处选择开源方案DataX作为数据同步工具. 官方文档很详细,可以直接参考. DataXDataX 是一个异构数据源离线同 ...

  5. 使用 DataX 实现数据同步(高效的同步工具)

    DataX 使用介绍 前言 一.DataX 简介 1.DataX3.0 框架设计 2.DataX3.0 核心架构 二.使用 DataX 实现数据同步 1.Linux 上安装 DataX 软件 2.Da ...

  6. 基于dataX的数据同步平台搭建

    前言 基于Java和DataX工具实现数据同步的后台管理,包括数据同步任务的生成,任务的管理,查看任务的执行日志,解析任务的执行结果等功能. 内含一些技术实现方案.心得体会和填坑经验等干货. 阅读本文 ...

  7. datax 持续数据同步_采用DataX实现多表增量数据同步

    这两天验证了一下阿里的DataX数据同步工具,觉得DataX可以用来做管理数据的多级数据同步.DataX用来做批量数据迁移很适合,能够保证数据的一致性,性能也很好,结合时间戳字段,用来实现数据定时增量 ...

  8. 【Android】【功能设计】离线数据同步方案

    这里考虑的是本地数据不会被其它用户修改的情况,类似于微信,离线任务都是自己要发出的消息,不会和网络数据产生冲突 对于多个离线用户,同时修改同一份数据的情况,不适合使用此方案,大多时候也不允许离线使用 ...

  9. 基于文件的离线数据同步方案

    产品此前的数据备份方案,存在不少问题,所以需要设计一个新的方案.本文总结一下新旧方案的优劣 首先APP是一个支持离线的应用.本地数据保存在sqlite,在离线环境下,在本地数据库里读写记录,在有网络的 ...

最新文章

  1. 使用python画CDF
  2. Linux下的shell编程入门
  3. 四种高性能数据类型,Python collections助你优化代码、简洁任务
  4. MVC4项目中验证用户登录一个特性就搞定
  5. MyBatis与Hibernate比较
  6. Spring Boot和Hibernate:打印查询和变量
  7. 基于TCP的在线聊天程序
  8. PHP连接sql seaver数据库
  9. 【初学线段树,看这篇文章准没错】线段树(单点修改and区间修改)acm寒假集训日记22/1/10
  10. 测试面试题,自动化测试与性能测试篇(附答案)
  11. LoudMiner:伪装在VST软件中的跨平台挖矿软件
  12. VS实现格式化代码及代码缩进
  13. python医学科研中能做什么-科研画图都用什么软件?
  14. QGC调试px4固件飞控
  15. 外卖小程序邀请入口获取推广路径
  16. viterbi 中文分词-超简单版
  17. 简单五步看懂伦敦的银标
  18. STM32H743开发板移植micropython并外扩32M的SQPI flash和32M的SDRAM
  19. TFN D550S 100G SDH综合测试分析仪性能如何
  20. 任务诱发的瞳孔反应(The Task-evoked Pupillary Response)

热门文章

  1. 台式计算机cpu是什么,台式电脑CPU是几核怎么查看
  2. 梯度下降法的三种形式BGD、SGD、MBGD及python实现
  3. 我的世界java笔刷指令_《我的世界》基本笔刷教程
  4. 开源词典,开源阅读器
  5. 关闭windows键盘
  6. 中国玩具及少儿用品市场趋势报告、技术动态创新及市场预测
  7. ROS 创建工作空间流程
  8. Python 机器人学 —— 机械臂工作空间分析
  9. Thunderbird使用记
  10. TAE V3.0 — 全新的通用自动化测试软件