一 Datax概览

DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

二 Datax框架设计

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

·Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。

·Writer:Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。

·Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

三 Datax插件体系

经过几年积累,DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入。DataX目前支持数据如下:

类型

数据源

Reader

Writer

文档

RDBMS 关系型数据库

MySQL

读 、写

Oracle

读 、写

SQLServer

读 、写

PostgreSQL

读 、写

DRDS

读 、写

达梦

读 、写

通用RDBMS(支持所有关系型数据库)

读 、写

阿里云数仓数据存储

ODPS

读 、写

ADS

OSS

读 、写

OCS

读 、写

NoSQL数据存储

OTS

读 、写

Hbase0.94

读 、写

Hbase1.1

读 、写

MongoDB

读 、写

Hive

读 、写

无结构化数据存储

TxtFile

读 、写

FTP

读 、写

HDFS

读 、写

Elasticsearch

DataX Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源

四 Datax核心架构

DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。

核心模块介绍:

1.DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

2.DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。

3.切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。

4.每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。

5.DataX作业运行起来之后,Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

Datax调度流程

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。DataX的调度决策思路是:

1.DataXJob根据分库分表切分成了100个Task。

2.根据20个并发,DataX计算共需要分配4个TaskGroup。

3.4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

五 Datax核心优势

l可靠的数据质量监控

完美解决数据传输个别类型失真问题

DataX旧版对于部分数据类型(比如时间戳)传输一直存在毫秒阶段等数据失真情况,新版本DataX3.0已经做到支持所有的强数据类型,每一种插件都有自己的数据类型转换策略,让数据可以完整无损的传输到目的端。

提供作业全链路的流量、数据量运行时监控

DataX3.0运行过程中可以将作业本身状态、数据流量、数据速度、执行进度等信息进行全面的展示,让用户可以实时了解作业状态。并可在作业执行过程中智能判断源端和目的端的速度对比情况,给予用户更多性能排查信息。

提供脏数据探测

在大量数据的传输过程中,必定会由于各种原因导致很多数据传输报错(比如类型转换错误),这种数据DataX认为就是脏数据。DataX目前可以实现脏数据精确过滤、识别、采集、展示,为用户提供多种的脏数据处理模式,让用户准确把控数据质量大关!

l丰富的数据转换功能

DataX作为一个服务于大数据的ETL工具,除了提供数据快照搬迁功能之外,还提供了丰富数据转换的功能,让数据在传输过程中可以轻松完成数据脱敏,补全,过滤等数据转换功能,另外还提供了自动groovy函数,让用户自定义转换函数。

l精准的速度控制

新版本DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制作业速度,让作业在库可以承受的范围内达到最佳的同步速度。

"speed": {"channel": 5,"byte": 1048576,"record": 10000}

l强劲的同步性能

DataX3.0每一种读插件都有一种或多种切分策略,都能将作业合理切分成多个Task并行执行,单机多线程执行模型可以让DataX速度随并发成线性增长。在源端和目的端性能都足够的情况下,单个作业一定可以打满网卡。

l健壮的容错机制

DataX作业是极易受外部因素的干扰,网络闪断、数据源不稳定等因素很容易让同步到一半的作业报错停止。因此稳定性是DataX的基本要求,在DataX 3.0的设计中,重点完善了框架和插件的稳定性。目前DataX3.0可以做到线程级别、进程级别(暂时未开放)、作业级别多层次局部/全局的重试,保证用户的作业稳定运行。

线程内部重试

不同的网络交互方式都有不同的重试策略。

线程级别重试

目前DataX已经可以实现TaskFailover,针对于中间失败的Task,DataX框架可以做到整个Task级别的重新调度。

l极简的使用体验

易用

下载即可用,支持linux和windows,只需要短短几步骤就可以完成数据的传输。

详细

DataX在运行日志中打印了大量信息,其中包括传输速度,Reader、Writer性能,进程CPU,JVM和GC情况等等。

传输过程中打印传输速度、进度等

传输过程中会打印进程相关的CPU、JVM等

在任务结束之后,打印总体运行情况

六 样例

实际工作中,比较常用的是

MysqlReaderMysqlWriterHdfsReaderHdfsWriter

下面以datax抽取mysql数据写入hdfs为例:

{"job": {"setting": {"speed": {"channel": 3},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "root","column": [ 'id','name'],"where": "gmt_created>='$bizdate' and gmt_created

6.1 参数说明

6.1.1 reader参数说明

jdbcUrl

描述:描述的是到对端数据库的JDBC连接信息,使用JSON的数组描述,并支持一个库填写多个连接地址。之所以使用JSON数组描述连接信息,是因为阿里集团内部支持多个IP探测,如果配置了多个,MysqlReader可以依次探测ip的可连接性,直到选择一个合法的IP。如果全部连接失败,MysqlReader报错。注意,jdbcUrl必须包含在connection配置单元中。对于阿里集团外部使用情况,JSON数组填写一个JDBC连接即可。

必选:是

默认值:无

username

描述:数据源的用户名

必选:是

默认值:无

password

描述:数据源指定用户名的密码

必选:是

默认值:无

table

描述:所选取的需要同步的表。使用JSON的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一schema结构,MysqlReader不予检查表是否同一逻辑表。注意,table必须包含在connection配置单元中。

必选:是

默认值:无

column

描述:所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用*代表默认使用所有列配置,例如['*']。

支持列裁剪,即列可以挑选部分列进行导出。

支持列换序,即列可以不按照表schema信息进行导出。

支持常量配置,用户需要按照Mysql SQL语法格式: ["id", "`table`", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"] id为普通列名,`table`为包含保留在的列名,1为整形数字常量,'bazhen.csy'为字符串常量,null为空指针,to_char(a + 1)为表达式,2.3为浮点数,true为布尔值。

必选:是

默认值:无

splitPk

描述:MysqlReader进行数据抽取时,如果指定splitPk,表示用户希望使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能。

推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。

目前splitPk仅支持整形数据切分,不支持浮点、字符串、日期等其他类型。如果用户指定其他非支持类型,MysqlReader将报错!

如果splitPk不填写,包括不提供splitPk或者splitPk值为空,DataX视作使用单通道同步该表数据。

必选:否

默认值:空

where

描述:筛选条件,MysqlReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。在实际业务场景中,往往会选择当天的数据进行同步,可以将where条件指定为gmt_create > $bizdate。注意:不可以将where条件指定为limit 10,limit不是SQL的合法where子句。

where条件可以有效地进行业务增量同步。如果不填写where语句,包括不提供where的key或者value,DataX均视作同步全量数据。

必选:否

默认值:无

querySql

描述:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户 可以通过该配置型来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略table,column这些配置型,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id

当用户配置querySql时,MysqlReader直接忽略table、column、where条件的配置,querySql优先级大于table、column、where选项。

必选:否

默认值:无

6.1.2 writer参数说明

defaultFS

描述:Hadoop hdfs文件系统namenode节点地址。格式:hdfs://ip:端口;例如:hdfs://127.0.0.1:9000

必选:是

默认值:无

fileType

描述:文件的类型,目前只支持用户配置为"text"或"orc"。

text表示textfile文件格式

orc表示orcfile文件格式

必选:是

默认值:无

path

描述:存储到Hadoop hdfs文件系统的路径信息,HdfsWriter会根据并发配置在Path目录下写入多个文件。为与hive表关联,请填写hive表在hdfs上的存储路径。例:Hive上设置的数据仓库的存储路径为:/user/hive/warehouse/,已建立数据库:test,表:hello;则对应的存储路径为:/user/hive/warehouse/test.db/hello

必选:是

默认值:无

fileName

描述:HdfsWriter写入时的文件名,实际执行时会在该文件名后添加随机的后缀作为每个线程写入实际文件名。

必选:是

默认值:无

column

描述:写入数据的字段,不支持对部分列写入。为与hive中表关联,需要指定表中所有字段名和字段类型,其中:name指定字段名,type指定字段类型。

用户可以指定Column字段信息,配置如下:

"column":[{"name": "userName","type": "string"},{"name": "age","type": "long"}]

必选:是

默认值:无

writeMode

描述:hdfswriter写入前数据清理处理模式:

§append,写入前不做任何处理,DataX hdfswriter直接使用filename写入,并保证文件名不冲突。

§nonConflict,如果目录下有fileName前缀的文件,直接报错。

必选:是

默认值:无

fieldDelimiter

描述:hdfswriter写入时的字段分隔符,需要用户保证与创建的Hive表的字段分隔符一致,否则无法在Hive表中查到数据

必选:是

默认值:无

compress

描述:hdfs文件压缩类型,默认不填写意味着没有压缩。其中:text类型文件支持压缩类型有gzip、bzip2;orc类型文件支持的压缩类型有NONE、SNAPPY(需要用户安装SnappyCodec)。

必选:否

默认值:无压缩

hadoopConfig

描述:hadoopConfig里可以配置与Hadoop相关的一些高级参数,比如HA的配置。

"hadoopConfig":{"dfs.nameservices": "testDfs","dfs.ha.namenodes.testDfs": "namenode1,namenode2","dfs.namenode.rpc-address.aliDfs.namenode1": "","dfs.namenode.rpc-address.aliDfs.namenode2": "","dfs.client.failover.proxy.provider.testDfs": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"}

必选:否

默认值:无

encoding

描述:写文件的编码配置。

必选:否

默认值:utf-8,慎重修改

haveKerberos

描述:是否有Kerberos认证,默认false

例如如果用户配置true,则配置项kerberosKeytabFilePath,kerberosPrincipal为必填。

必选:haveKerberos 为true必选

默认值:false

kerberosKeytabFilePath

描述:Kerberos认证keytab文件路径,绝对路径

必选:否

默认值:无

kerberosPrincipal

描述:Kerberos认证Principal名,如xxxx/hadoopclient@xxx.xxx

必选:haveKerberos 为true必选

默认值:无

6.2 类型转换

6.2.1 reader类型转换

目前MysqlReader支持大部分Mysql类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。

下面列出MysqlReader针对Mysql类型转换列表:

DataX 内部类型

Mysql 数据类型

Long

int, tinyint, smallint, mediumint, int, bigint

Double

float, double, decimal

String

varchar, char, tinytext, text, mediumtext, longtext, year

Date

date, datetime, timestamp, time

Boolean

bit, bool

Bytes

tinyblob, mediumblob, blob, longblob, varbinary

请注意:

·除上述罗列字段类型外,其他类型均不支持。

·tinyint(1) DataX视作为整形。

·year DataX视作为字符串类型

·bit DataX属于未定义行为。

6.2.2 writer类型转换

目前 HdfsWriter 支持大部分Hive类型,请注意检查你的类型。

下面列出 HdfsWriter 针对Hive数据类型转换列表:

DataX 内部类型

HIVE 数据类型

Long

TINYINT,SMALLINT,INT,BIGINT

Double

FLOAT,DOUBLE

String

STRING,VARCHAR,CHAR

Boolean

BOOLEAN

Date

DATE,TIMESTAMP

7 源码修改,功能优化

7.1 解决datax抽取mysql数据到hdfs后null值变成‘’(空字符串)的问题

原理:在hdfs中,null值存的是\N

7.2解决datax抽hdfs数据到mysql之null值变成\N或者 转换错误 的问题

修改datax源码plugin-unstructured-storage-util下的UnstructuredStorageReaderUtil.class

加上一个判断,因为在hdfs中,null值存储的是\N ,所以需要把它转换成null存储到Mysql中

7.3解决datax抽取hdfs文件有空文件时报错

// add by liuzcif (f.getLen() == 0) {String message = String.format("code add by liuzc _文件[%s]长度为0,将会跳过不作处理!", f.getPath().toString());LOG.warn(message);}else {addSourceFileByType(f.getPath().toString());}

7.4datax抽取mongoDB只需要访问隐藏节点源码修改

背景:由于在mongoDB的业务库数据量较大,且使用方比较多,搜索,业务方和数仓,使得主节点和其他从节点压力很大,为了不影响正常的业务,DBA新增了一台隐藏节点作为搜索和数仓抽取数据用,用datax抽取mongo的数据默认是主节点优先,所以需要修改datax-mongodbreader的源码,使得抽数据时只访问隐藏节点

package com.alibaba.datax.plugin.reader.mongodbreader;import java.net.UnknownHostException;import java.util.ArrayList;import java.util.Arrays;import java.util.Date;import java.util.Iterator;import java.util.List;import com.alibaba.datax.common.element.BoolColumn;import com.alibaba.datax.common.element.DateColumn;import com.alibaba.datax.common.element.DoubleColumn;import com.alibaba.datax.common.element.LongColumn;import com.alibaba.datax.common.element.Record;import com.alibaba.datax.common.element.StringColumn;import com.alibaba.datax.common.exception.DataXException;import com.alibaba.datax.common.plugin.RecordSender;import com.alibaba.datax.common.spi.Reader;import com.alibaba.datax.common.util.Configuration;import com.alibaba.datax.plugin.reader.mongodbreader.util.CollectionSplitUtil;import com.alibaba.datax.plugin.reader.mongodbreader.util.MongoUtil;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONArray;import com.alibaba.fastjson.JSONObject;import com.google.common.base.Joiner;import com.google.common.base.Strings;import com.mongodb.*;import com.mongodb.client.MongoCollection;import com.mongodb.client.MongoCursor;import com.mongodb.client.MongoDatabase;import org.bson.Document;import org.bson.types.ObjectId;/*** Created by jianying.wcj on 2015/3/19 0019.* Modified by mingyan.zc on 2016/6/13.* Modified by mingyan.zc on 2017/7/5.*/public class MongoDBReader extends Reader {public static class Job extends Reader.Job {private Configuration originalConfig = null;private MongoClient mongoClient;private String userName = null;private String password = null;@Overridepublic List split(int adviceNumber) {return CollectionSplitUtil.doSplit(originalConfig,adviceNumber,mongoClient);}@Overridepublic void init() {this.originalConfig = super.getPluginJobConf();//add by liuzcList addressList = originalConfig.getList(KeyConstant.MONGO_ADDRESS);String host = "";int port = 0;try {List serverAddressesList = MongoUtil.parseServerAddress(addressList);if (serverAddressesList.size() > 0){ServerAddress serverAddress = serverAddressesList.get(0);host = serverAddress.getHost();port = serverAddress.getPort();}} catch (UnknownHostException e) {throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS,"lzc---不合法的地址");}this.userName = originalConfig.getString(KeyConstant.MONGO_USER_NAME, originalConfig.getString(KeyConstant.MONGO_USERNAME));this.password = originalConfig.getString(KeyConstant.MONGO_USER_PASSWORD, originalConfig.getString(KeyConstant.MONGO_PASSWORD));String database = originalConfig.getString(KeyConstant.MONGO_DB_NAME, originalConfig.getString(KeyConstant.MONGO_DATABASE));String authDb = originalConfig.getString(KeyConstant.MONGO_AUTHDB, database);if(!Strings.isNullOrEmpty(this.userName) && !Strings.isNullOrEmpty(this.password)) {//this.mongoClient = MongoUtil.initCredentialMongoClient(originalConfig,userName,password,authDb);//update by liuzcMongoClientURI mongoClientURI = new MongoClientURI("mongodb://"+userName+":"+password+"@"+host+":"+port+"/"+database+"?readPreference=secondaryPreferred");mongoClient = new MongoClient(mongoClientURI);} else {this.mongoClient = MongoUtil.initMongoClient(originalConfig);}}@Overridepublic void destroy() {}}public static class Task extends Reader.Task {private Configuration readerSliceConfig;private MongoClient mongoClient;private String userName = null;private String password = null;private String authDb = null;private String database = null;private String collection = null;private String query = null;private JSONArray mongodbColumnMeta = null;private Object lowerBound = null;private Object upperBound = null;private boolean isObjectId = true;@Overridepublic void startRead(RecordSender recordSender) {if(lowerBound== null || upperBound == null ||mongoClient == null || database == null ||collection == null || mongodbColumnMeta == null) {throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());}MongoDatabase db = mongoClient.getDatabase(database);MongoCollection col = db.getCollection(this.collection);MongoCursor dbCursor = null;Document filter = new Document();if (lowerBound.equals("min")) {if (!upperBound.equals("max")) {filter.append(KeyConstant.MONGO_PRIMARY_ID, new Document("$lt", isObjectId ? new ObjectId(upperBound.toString()) : upperBound));}} else if (upperBound.equals("max")) {filter.append(KeyConstant.MONGO_PRIMARY_ID, new Document("$gte", isObjectId ? new ObjectId(lowerBound.toString()) : lowerBound));} else {filter.append(KeyConstant.MONGO_PRIMARY_ID, new Document("$gte", isObjectId ? new ObjectId(lowerBound.toString()) : lowerBound).append("$lt", isObjectId ? new ObjectId(upperBound.toString()) : upperBound));}if(!Strings.isNullOrEmpty(query)) {Document queryFilter = Document.parse(query);filter = new Document("$and", Arrays.asList(filter, queryFilter));}dbCursor = col.find(filter).iterator();while (dbCursor.hasNext()) {Document item = dbCursor.next();Record record = recordSender.createRecord();Iterator columnItera = mongodbColumnMeta.iterator();while (columnItera.hasNext()) {JSONObject column = (JSONObject)columnItera.next();Object tempCol = item.get(column.getString(KeyConstant.COLUMN_NAME));if (tempCol == null) {if (KeyConstant.isDocumentType(column.getString(KeyConstant.COLUMN_TYPE))) {String[] name = column.getString(KeyConstant.COLUMN_NAME).split("\\.");if (name.length > 1) {Object obj;Document nestedDocument = item;for (String str : name) {obj = nestedDocument.get(str);if (obj instanceof Document) {nestedDocument = (Document) obj;}}if (null != nestedDocument) {Document doc = nestedDocument;tempCol = doc.get(name[name.length - 1]);}}}}if (tempCol == null) {//continue; 这个不能直接continue会导致record到目的端错位record.addColumn(new StringColumn(null));}else if (tempCol instanceof Double) {//TODO deal with Double.isNaN()record.addColumn(new DoubleColumn((Double) tempCol));} else if (tempCol instanceof Boolean) {record.addColumn(new BoolColumn((Boolean) tempCol));} else if (tempCol instanceof Date) {record.addColumn(new DateColumn((Date) tempCol));} else if (tempCol instanceof Integer) {record.addColumn(new LongColumn((Integer) tempCol));}else if (tempCol instanceof Long) {record.addColumn(new LongColumn((Long) tempCol));} else {if(KeyConstant.isArrayType(column.getString(KeyConstant.COLUMN_TYPE))) {String splitter = column.getString(KeyConstant.COLUMN_SPLITTER);if(Strings.isNullOrEmpty(splitter)) {throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());} else {ArrayList array = (ArrayList)tempCol;String tempArrayStr = Joiner.on(splitter).join(array);record.addColumn(new StringColumn(tempArrayStr));}} else {record.addColumn(new StringColumn(tempCol.toString()));}}}recordSender.sendToWriter(record);}}@Overridepublic void init() {//add by liuzcthis.readerSliceConfig = super.getPluginJobConf();List addressList = readerSliceConfig.getList(KeyConstant.MONGO_ADDRESS);String host = "";int port = 0;try {List serverAddressesList = MongoUtil.parseServerAddress(addressList);if (serverAddressesList.size() > 0){ServerAddress serverAddress = serverAddressesList.get(0);host = serverAddress.getHost();port = serverAddress.getPort();}} catch (UnknownHostException e) {throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS,"lzc---不合法的地址");} catch (Exception e) {throw DataXException.asDataXException(MongoDBReaderErrorCode.UNEXCEPT_EXCEPTION,"lzc---未知异常");}this.userName = readerSliceConfig.getString(KeyConstant.MONGO_USER_NAME, readerSliceConfig.getString(KeyConstant.MONGO_USERNAME));this.password = readerSliceConfig.getString(KeyConstant.MONGO_USER_PASSWORD, readerSliceConfig.getString(KeyConstant.MONGO_PASSWORD));this.database = readerSliceConfig.getString(KeyConstant.MONGO_DB_NAME, readerSliceConfig.getString(KeyConstant.MONGO_DATABASE));this.authDb = readerSliceConfig.getString(KeyConstant.MONGO_AUTHDB, this.database);if(!Strings.isNullOrEmpty(userName) && !Strings.isNullOrEmpty(password)) {//mongoClient = MongoUtil.initCredentialMongoClient(readerSliceConfig,userName,password,authDb);//update by liuzcMongoClientURI mongoClientURI = new MongoClientURI("mongodb://"+userName+":"+password+"@"+host+":"+port+"/"+database+"?readPreference=secondaryPreferred");mongoClient = new MongoClient(mongoClientURI);} else {mongoClient = MongoUtil.initMongoClient(readerSliceConfig);}this.collection = readerSliceConfig.getString(KeyConstant.MONGO_COLLECTION_NAME);this.query = readerSliceConfig.getString(KeyConstant.MONGO_QUERY);this.mongodbColumnMeta = JSON.parseArray(readerSliceConfig.getString(KeyConstant.MONGO_COLUMN));this.lowerBound = readerSliceConfig.get(KeyConstant.LOWER_BOUND);this.upperBound = readerSliceConfig.get(KeyConstant.UPPER_BOUND);this.isObjectId = readerSliceConfig.getBool(KeyConstant.IS_OBJECTID);}@Overridepublic void destroy() {}}}

datax 高级_Datax-数据抽取同步利器相关推荐

  1. datax 高级_DATAx上海峰会-百格活动

    DATAx上海峰会由英国领先的会议主办方Innovation Enterprise倾力打造.前身为大数据与分析创新峰会,该峰会已在欧美的多个国家和地区成功举办近10年.今年,通过重新对品牌的思考和定义 ...

  2. datax 高级_datax日志

    {"moduleinfo":{"card_count":[{"count_phone":1,"count":1}],&q ...

  3. 通过零代码ETLCloud实现马帮ERP数据自动化同步

    马帮ERP介绍 马帮ERP是一款云端跨境电商管理软件.与传统的ERP系统不同,马帮ERP专注于跨境电商领域,为电商企业提供一站式管理解决方案,包括财务管理.采购管理.进销存管理.订单管理等功能模块.该 ...

  4. datax参数设置_DataX Web数据增量同步配置说明

    一.根据日期进行增量数据抽取 1.页面任务配置 打开菜单任务管理页面,选择添加任务 按下图中5个步骤进行配置 1.任务类型选DataX任务 2.辅助参数选择时间自增 3.增量开始时间选择,即sql中查 ...

  5. 数据抽取工具比对:Kettle、Datax、Sqoop、StreamSets

    数据抽取工具比对:Kettle.Datax.Sqoop.StreamSets 工具比对: Kettle 特性:纯Java编写 优点:可在Windows.linux.Unix上执行:数据抽取高效稳定:子 ...

  6. 大数据常用同步工具(DataX/Sqoop/Nifi/Canal等)

    一.离线数据同步 DataX 阿里的Datax是比较优秀的产品,基于python,提供各种数据村塾的读写插件,多线程执行,使用起来也很简单,操作简单通常只需要两步: 创建作业的配置文件(json格式配 ...

  7. etl数据抽取工具_数据同步工具ETL、ELT傻傻分不清楚?3分钟看懂两者区别

    什么是数据同步工具(ETL.ELT) 数据同步工具ETL或者ELT的作用是将业务系统的数据经过抽取.清洗转换之后加载到数据仓库的过程,目的是将企业中的分散.零乱.标准不统一的数据整合到一起,为企业的决 ...

  8. datax实现mysql数据同步

    前言 DataX 是阿里内部广泛使用的离线数据同步工具/平台,可以实现包括 MySQL.Oracle.HDFS.Hive.OceanBase.HBase.OTS.ODPS 等各种异构数据源之间高效的数 ...

  9. 数据实时同步或抽取上收的技术分析(转)

    1 实现数据集中的技术手段分析比较 根据业界提供数据同步或抽取的解决方案来看,主要包括以下几大类: l 存储复制技术 l 数据库复制技术 l ETL抽取技术 1.1 存储复制技术 实现原理 存储复制技 ...

最新文章

  1. JavaFX项目jar使用javafxpackager生成exe
  2. cacti config.php,cacti监控系统的安装配置
  3. javascript --- repeat的用处
  4. 用户模块开发 分类模块 商品模块 购物车模块
  5. String类、StringBuffer类、StringBuilder类的区别
  6. 华为交换机S3700端口基本配置
  7. 还在用ViT的16x16 Patch分割方法吗?中科院自动化所提出Deformable Patch-based方法,涨点显著!...
  8. php的setinc方法,thinkphp3.2.0 setInc方法 源码全面解析
  9. CCNA之单臂路由实验
  10. UVA11877 The Coco-Cola Store【模拟】
  11. 一题多解(六)—— 一个数二进制形式 1 的个数
  12. [图:知识竞赛题库PPT制作] 为上海棒约翰餐饮管理有限公司定制的的知识竞赛题目及展示界面-PPT格式-双屏展示。
  13. 和平精英有电脑版吗_和平精英电脑版灵敏度调多少 CP端灵敏度设置攻略
  14. 一次破解TP-Link WAR308路由器的经历(2)
  15. word去掉万恶的域代码
  16. CF1428F-Fruit Sequences
  17. 三极管PNP NPN 的判别
  18. 【WAF技巧拓展】————8、我的WafBypass之道(Misc篇)
  19. 怎么看越努力,越幸运?
  20. 活动安排问题(贪心算法)

热门文章

  1. java.io.IOException: exception unwrapping private key - java.security.InvalidKeyException
  2. 【Milvus的以文搜图】
  3. Power Query M语言所有Number函数,分类汇总掌握
  4. c语言笔试题大全,C语言面试题汇总(持续更)
  5. 初学nodejs——实现一个简易论坛(完成简单的登录、注册、发帖、评论功能)
  6. 完美通行证身份证号格式_如何渲染3D足球通行证网络
  7. 【Elsevier投稿】解决投稿系统latex编译不出来的问题!!!
  8. 医疗器械与服务行业:医美经济时代,国货崛起正当时(20210103).PDF
  9. Artistic Style详解
  10. C语言基础选择题100道(附答案)03