什么是Transporter?

transporter 是一款简单而又强大的数据迁移工具。它通过一种的agnostic message format数据形式轻松的将不同数据来源不同格式的数据进行转换。

transporter 可以在不同数据库之间进行数据转换迁移,同时也可以将text文件迁移至其他数据库。transporter连接不同数据源的媒介称为Adaptor. Adaptor可以配置为读数据的Source端也可以配置为作为写目标的Sink端。典型的Transporter包含一个Source和一个Sink,通过数据管道pipeline进行转换传输。transporter包含一系列本地或者JavaScript函数形式的转换器(Transformers),通过转换器可以将源数据格式进行过滤、转换以便正确的写入Sink目标数据源。

Transporter 命令

transporter 包含以下命令:

  • init - Configure Transporter 生产配置文件pipeline.js
  • about - List available adaptors
  • run - Run Transporter 运行transporter
  • test - Test Transporter configuration
  • xlog - Manage the commit log
  • offset - Manage the offsets for sinks (see xlog)

init

transporter init [source adaptor name] [sink adaptor name]

运行init命令,在当前目录下生产基本的pipeline.js配置文件,如:

$ transporter init mongodb elasticsearch
Writing pipeline.js...
$ cat pipeline.js
var source = mongodb({"uri": "${MONGODB_URI}"// "timeout": "30s",// "tail": false,// "ssl": false,// "cacerts": ["/path/to/cert.pem"],// "wc": 1,// "fsync": false,// "bulk": false,// "collection_filters": "{}"
})var sink = elasticsearch({"uri": "${ELASTICSEARCH_URI}"// "timeout": "10s", // defaults to 30s// "aws_access_key": "ABCDEF", // used for signing requests to AWS Elasticsearch service// "aws_access_secret": "ABCDEF" // used for signing requests to AWS Elasticsearch service
})t.Source(source).Save(sink)
// t.Source("source", source).Save("sink", sink)
// t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")

编辑pipeline.js文件 配置相应的source数据源和sink目标数据源。

about

transporter about [adaptor name]

列出所有可用的adaptor, 通过adaptor name指定adaptor可以获得配置详情, 如:

$ transporter about
elasticsearch - an elasticsearch sink adaptor
file - an adaptor that reads / writes files
mongodb - a mongodb adaptor that functions as both a source and a sink
postgres - a postgres adaptor that functions as both a source and a sink
rabbitmq - an adaptor that handles publish/subscribe messaging with RabbitMQ
rethinkdb - a rethinkdb adaptor that functions as both a source and a sink$ transporter about rethinkdb
rethinkdb - a rethinkdb adaptor that functions as both a source and a sinkSample configuration:
{"uri": "${RETHINKDB_URI}"// "timeout": "30s",// "tail": false,// "ssl": false,// "cacerts": ["/path/to/cert.pem"]
}
$

run

transporter run [-log.level "info"] [<application.js>]

运行pipeline脚本,若未指定,则默认参数名为 pipelin.js 。

test

transporter test [-log.level "info"] [<application.js>]

test 一般用于调试,执行后只建立和source及sink数据源的连接,并不执行真正的数据迁移。

xlog

transporter xlog --log_dir=/path/to/log oldest|current|show [OFFSET]

offset

transporter offset --log_dir=/path/to/log list|show|mark|delete [SINK] [OFFSET]

switches

-log.level "info" - 设置日志级别,默认为info,可设置为debug或者error。

Adaptor

adaptor的作用就是从source读取数据或者写入数据至sink。Transporter 使用adaptor作为输入和输出的媒介。 可以通过 transporter about列出可用的adaptor列表。

elasticsearch

elasticsearch adaptor 仅作为sink,写入数据至elasticsearch。以json格式写入数据至elasticsearch索引。

MongoDB

MongoDB adaptor 既可以作为source又可以作为sink。写入时, MongoDB adaptor可通过bulk模式批量执行更新或者创建请求;

file

file adaptor 既可以作为source 读取硬盘文件又可以作为sink写入硬盘文件。写入时, 将transptorter adaptor 内部数据结构Message转为json string写入文件;读取时,adaptor默认磁盘文件每一行为一个json string, 解析并转为transporter 内部数据结构(Message)。

除此之外, transporter 的adaptor还支持postgres, rethinkdb以及rabbitmq。

Messages 数据结构

Messages 作为source到sink的中间过渡的数据结构。完成source到sink的数据转换(因为不同数据源之间无法直接转换)。 messages 是一个JavaScript的对象, 由4个字段组成。

{"ns":"message.namespace","ts":12345, // time represented in milliseconds since epoch"op":"insert","data": {"id": "abcdef","name": "hello world"}
}

data

包含source读取的文档数据,以key/value的json格式数据。

ns

ns(namespace), 为pipeline.js中配置的namespace参数。

This field contains the namespace string which is matched with the namespace parameters in the Transporter pipeline.

op

对sink执行的操作。

The operation that this data should be used to reflect when being written by a Sink. The op field is determined by the Source when being read and can be insert, update, delete, command or noop.

ts

ts(timestamp)时间戳。

The timestamp for the message. This is a Unix epoch time which reflects when the message was created.

Transformers--转换器

有两种形式的transformer:native 及 JavaScript function。 native transformer是go语言编写,内置于transporter中。

JavaScript transformer 由用户通过编写JavaScript function组成。

基本的无transformer的pipeline:

t.Source(source).Save(sink);

通过.Transform()函数添加一个transformer:

t.Source(source).Transform(transformer({"param":"value"})).Save(sink);

如,添加一个omit transformer,用于忽略“internalid”字段:

t.Source(source).Transform(omit({"fields":["internalid"]})).Save(sink);

Native transformer

omit

omit() - 移除messages顶层字段, 把不需要迁移的字段过滤掉。

omit({"fields": ["name"]})

例如:

输入:

{"_id": 0,"name": "transporter","type": "function"
}

omit过滤 “type” 字段:

omit({"fields":["type"]})

输出:

{"_id": 0,"name": "transporter"
}

pick

选择需要迁移的字段,和omit相反。

pick({"fields": ["name"]})

例如:

输入:

{"_id": 0,"name": "transporter","type": "function"
}

pick 选择 _id 和name字段:

pick({"fields":["_id", "name"]})

输出:

{"_id": 0,"name": "transporter"
}

rename

rename() - 对输入字段重命名。

rename({"field_map": {"test":"renamed"}})

例如,输入:

{"_id": 0,"name": "transporter","type": "function","count": 10
}

rename重命名count字段为total字段:

rename({"field_map": {"count":"total"}})

输出:

{"_id": 0,"name": "transporter","type": "function","total": 10
}

skip

skip() - 跳过不满足条件的source记录,不进行迁移。

skip() will evalute the data based on the criteria configured and determine whether the message should continue down the pipeline or be skipped. When evaluating the data, true will result in the message being sent down the pipeline and false will result in the message being skipped.

skip({"field": "test", "operator": "==", "match": 10})

例如,输入:

{"_id": 0,"name": "transporter","type": "function","count": 10
}

skip选择count==10的source输入记录,置入pipeline中:

skip({"field": "count", "operator": "==", "match": 10})

输出:

{"_id": 0,"name": "transporter","type": "function","count": 10
}

skip选择count>20的source输入记录,置入pipeline中:

skip({"field": "count", "operator": ">", "match": 20})

上面ciount=10的记录将被跳过。

pretty

pretty 美化输出的json格式数据。

JavaScript transformer

JavaScript transformer 调用JavaScript 脚本处理message。transformer存在两种格式JavaScript 引擎:otto和goja。

There are two JavaScript engines currently in Transformer; the older otto and newer goja. The js() transformer function is actually an alias to the Goja JavaScript engine. The only external difference between the two is how functions should be created.

goja style

function transform(msg) {return msg
}

例如,存在如下数据:

{"ns":"message.namespace","ts":12345, // time represented in milliseconds since epoch"op":"insert","data": {"id": "abcdef","name": "hello world"}
}

NOTE when working with data from MongoDB, the _id field will be represented in the following fashion:

{"ns":"message.namespace","ts":12345, // time represented in milliseconds since epoch"op":"insert","data": {"_id": {"$oid": "54a4420502a14b9641000001"},"name": "hello world"}
}

goja调用js脚本:

goja({"filename": "/path/to/transform.js"})

如输入:

{"_id": 0,"name": "transporter","type": "function"
}

goja调用自定义的transform.js:

goja({"filename":"transform.js"})

transform.js 如下:

function transform(doc) {doc["data"]["name_type"] = doc["data"]["name"] + " " + doc["data"]["type"];return doc
}

输出:

{"_id": 0,"name": "transporter","type": "function","name_type": "transporter function"
}

可参考:https://github.com/compose/transporter/blob/master/function/gojajs/README.md

Otto style

module.exports = function(msg) {return msg;
};

可参考:https://github.com/compose/transporter/blob/master/function/ottojs/README.md

Transporter 配置

执行transporter init mongodb elasticsearch 命令初始化生成基本的pipeline.js配置文件(从MongoDB迁移至elasticsearch),或者直接编辑已存在的pipeline.js(名字随意).

pipeline.js:

var source = mongodb({"uri": "${MONGODB_URI}"// "timeout": "30s",// "tail": false,// "ssl": false,// "cacerts": ["/path/to/cert.pem"],// "wc": 1,// "fsync": false,// "bulk": false,// "collection_filters": "{}"
})var sink = elasticsearch({"uri": "${ELASTICSEARCH_URI}"// "timeout": "10s", // defaults to 30s// "aws_access_key": "ABCDEF", // used for signing requests to AWS Elasticsearch service// "aws_access_secret": "ABCDEF" // used for signing requests to AWS Elasticsearch service
})t.Source(source).Save(sink)
// t.Source("source", source).Save("sink", sink)
// t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")

pipelines

var source = mongodb({"uri": "${MONGODB_URI}"
})var sink = elasticsearch({"uri": "${ELASTICSEARCH_URI}"
})t.Source("source", source, "/.*/").Save("sink", sink, "/.*/") //默认的pipeline

t 代表 transporter对象,通过t调用三个JavaScript函数Source()、Transform()、Save()创建pipeline。

每个函数携带三个参数:name,adaptor,namespace。 如上,transporter配置了名为“source”的source数据源,指定变量source作为adaptor, 其命名空间为“/.*/”, ".*"为正则表达式,表示匹配所有MongoDB表。默认的namespace为“/.*/”。

namespace

namespace配置为指定的正则表达式。以MongoDB为例,transporter将迁移任何匹配namespace正则表达式的集合数据。若要精确指定某一MongoDB集合进行迁移,如指定迁移table集合,则namespace可以指定为“/^table$/”,即以table开头(^)以table结尾($)。若要将robo开头的MongoDB结合迁移,则source()的namespace参数可以设为“/^robo.*/”, 如果MongoDB存在robocop, robosaurusrobbietherobot集合,则robocop和robosaurus将会被迁移。

对于Save()和Transform(),namespace作为过滤器,只有满足过滤条件的message进入sink或者transform处理。

For Save() sinks and Transform() transformers, the namespace setting then acts as a filter on which messages those sink or transformers are applied to. The namespace of an incoming message has to match the sink's namespace setting to be processed by the sink or the transformer before being handed to them. If not specified, then the sinks and transformers will accept any message.

参考:https://github.com/compose/transporter/wiki/Pipelines

Running with Docker

参考:https://github.com/compose/transporter/wiki/Running-with-Docker

参考文档

https://github.com/compose/transporter

ElasticSearch数据迁移工具Transporter相关推荐

  1. Elasticsearch数据迁移工具elasticdump工具

    1.工具安装 #wget https://nodejs.org/dist/v8.11.2/node-v8.11.2-linux-x64.tar.xz#tar xf node-v8.11.2-linux ...

  2. elasticsearch 数据类型_基于 MySQL Binlog 的 Elasticsearch 数据同步实践

    来源;马蜂窝 一.背景 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品.订单等数据的多维度检索. 使用 Elasticsearch 存 ...

  3. Elasticsearch数据备份与恢复(基于HDFS)

    Elasticsearch数据备份与恢复(基于HDFS) 1.(所有机子上)安装es hdfs仓库插件repository-hdfs # repository-hdfs一定要和es版本匹配 # 在线 ...

  4. 达梦数据迁移工具的使用

    作为国产数据库,达梦是比较典型的一种,开发工作中会用到其他数据库表迁移到达梦数据库的问题,在此记录一下迁移方法也为他人提供方便. 该工具仅用于其他数据库表及文本文件迁移达梦,或达梦数据库表迁移到其他数 ...

  5. 基于 MySQL Binlog 的 Elasticsearch 数据同步实践

    一.为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品.订单等数据的多维度检索. 使用 Elasticsearch 存储业务数 ...

  6. Sqoop数据迁移工具的使用

    文章作者:foochane 原文链接:https://foochane.cn/article/2019063001.html Sqoop数据迁移工具的使用 sqoop简单介绍 sqoop数据到HDFS ...

  7. 用于Elasticsearch数据可视化和分析的强大工具

    The goal is to turn data into information, and information into insight. 目标是将数据转化为信息,并将信息转化为洞察力. ―Ca ...

  8. 【Elasticsearch】如何设计可扩展的 Elasticsearch 数据存储的架构

    1.概述 转载:如何设计可扩展的 Elasticsearch 数据存储的架构 Elasticsearch 允许您存储.搜索和分析大量的结构化或非结构化数据.因为在速度.可扩展性和灵活性方面拥有优势,E ...

  9. 数据迁移工具 - Flyway

    对于数据迁移的概念,相信大家已经都比较熟悉.那么,什么是数据迁移?为什么需要数据迁移?在这里就不再做相关分享啦~.接下来主要分享一下数据迁移工具 Flyway 使用. Flyway is the Ap ...

最新文章

  1. Can't create handler inside thread Thread that has not called Looper.prepare()
  2. mysql5.7 too many_Mysql 错误too many connections解决方案
  3. 快速理解编码,unicode与utf-8
  4. 文巾解题 100. 相同的树
  5. 成功解决 \tensorflow\…\datasets\mnist.py:290: DataSet.__init__ (from tensorflow.contrib.learn.python.lea
  6. python有向图_Python 中的垃圾回收机制
  7. Alamofire源码导读二:发起请求及内部加锁的逻辑
  8. win8学习--------File
  9. 从无到有整合SpringMVC-MyBatis项目(3):整合SpringMVC+Mybatis
  10. flash player for linux 64,64-bit linux下装什么adobe flash player
  11. Oracle中Number类型字段使用.netTiers和CodeSmith问题的解决方案
  12. 数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 路由(二)之分库分表路由...
  13. CruiseControl服务器安装配置
  14. CAD中通过用户交互来选择对象
  15. 万能的林萧说:我来告诉你,一个草根程序员如何进入BAT。
  16. K8S调用GPU资源配置指南
  17. 区别:KL散度,JS散度,Wasserstein距离(EMD)
  18. (转)活出生命的成就
  19. 手把手学爬虫第六弹——数据可视化
  20. 数学图形(1.22) 蔓叶线

热门文章

  1. 基础——IOT(物联网)的七大通信协议
  2. 轻松实现在windows平台搭建Nexus私服
  3. 美团杯2020:查查查乐乐(dp)
  4. 专业清洁工 八款系统垃圾清理工具横向测评
  5. java实现文件压缩下载----压缩下载zip
  6. 「蓝玫瑰不会安眠」读后感
  7. 怎么恢复格式化的u盘文件
  8. AutoCAD中导入Inventor模型
  9. Computer:互联网开放平台项目知识补充之开发-运维-网络-网关等术语(DMZ、负载均衡、F5、Nginx、容器)的简介、使用方法之详细攻略
  10. 最全的android图片加密