目录

Transporter 安装

Go安装

Git安装

Clone transporter源码

Build transporter源码

准备MongoDB 数据

建立elasticsearch索引

配置Transporter

1. 初始化基本的pipeline

2. 设置pipeline的source,transform,sink

3.测试连接

4. 迁移数据

参考文档


Transporter 安装

transporter 是通过go语言编写的,所以先要安装Go。以下通过windows进行举例。

Go安装

  1. 去https://golang.org/dl/下载Go的安装包, 如go1.10.3.windows-amd64.zip。
  2. 解压下载的安装包go1.10.3.windows-amd64.zip。
  3. 设置Go的系统环境变量,GO_HOME=E:\softwares\go, 具体路径根据安装路径而定。
  4. 在安装路径下新建pkg,bin, src目录, 一般解压之后就已经存在了。
  5. 在src目录下新建目录:Go安装路径\src\github.com\compose。如E:\softwares\go\src\github.com\compose
C:\Users\flypig>go version
go version go1.10.3 windows/amd64

执行go version 命令查看是否设置正确,出现上图则Go安装成功。

Git安装

若机器上已经安装git,则忽略;没安装则安装git。git下载:https://git-for-windows.github.io/

Clone transporter源码

进入刚刚建立的路径:E:\softwares\go\src\github.com\compose 拉取transporter源码。

git clone https://github.com/compose/transporter

Build transporter源码

transporter源码拉下来之后进入 transporter目录,E:\softwares\go\src\github.com\compose\transporter

go get -a ./...
go build ./cmd/transporter

build之后,在当前目录将出现transporter.exe文件。

到此,transporter就安装好了。

准备MongoDB 数据

在MongoDB中, 创建一个测试collection: testcol。

插入几条数据至testcol, 如:

/* 1 */
{"_id" : ObjectId("5b47061b7228335bdf97ba88"),"firstName" : "Robert","lastName" : "Baratheon"
}/* 2 */
{"_id" : ObjectId("5b4706327228335bdf97ba89"),"firstName" : "John","lastName" : "Snow"
}/* 3 */
{"_id" : ObjectId("5b4745d97228335bdf97ba8a"),"firstName" : "张三","lastName" : "李四"
}

接下来,将要通过transporter 将testcol数据迁移至elasticsearch。

建立elasticsearch索引

transporter在迁移时会自动为testcol建立默认的索引,一般不建议这么做,所以先在elasticsearch自定义为testcol创建索引。put http://localhost:9200/index_gltestcol_v1/, aliases用于设置索引index_gltestcol_v1的别名为index_testcol。设置了3个分片,每个分片1个副本。mapping为testcol设置索引映射,其中,为firstName设置了ik_max_word中文分词器,若elasticsearch没有安装ik插件,则改成你想要的即可。此处的索引可根据实际情况而定,testcol只是个例子。

{"aliases": {"index_testcol": {}},"settings": {"index": {"refresh_interval": "30s","number_of_shards": "3","number_of_replicas": "1"}},"mappings": {"testcol": {"properties": {"id": {"type": "string"},"firstName": {"type": "string","store": "true","analyzer": "ik_max_word"},"lastName": {"type": "string","index": "not_analyzed","store": true}}}}
}

索引建立好之后,可通过http://localhost:9200/index_gltestcol_v1?pretty进行查看。

配置Transporter

1. 初始化基本的pipeline

进入E:\softwares\go\src\github.com\compose\transporter目录,执行transporter init mongodb elasticsearch,执行之后,当前目录出现pipeline.js配置文件。

transporter init mongodb elasticsearch 

pipeline.js:

var source = mongodb({"uri": "${MONGODB_URI}"// "timeout": "30s",// "tail": false,// "ssl": false,// "cacerts": ["/path/to/cert.pem"],// "wc": 1,// "fsync": false,// "bulk": false,// "collection_filters": "{}",// "read_preference": "Primary"
})var sink = elasticsearch({"uri": "${ELASTICSEARCH_URI}"// "timeout": "10s", // defaults to 30s// "aws_access_key": "ABCDEF", // used for signing requests to AWS Elasticsearch service// "aws_access_secret": "ABCDEF" // used for signing requests to AWS Elasticsearch service// "parent_id": "elastic_parent" // defaults to "elastic_parent" parent identifier for Elasticsearch
})t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")

2. 设置pipeline的source,transform,sink

var source = mongodb({"uri": "mongodb://user:pasword$@host:30011/mytestdb"// "timeout": "30s",// "tail": false,// "ssl": false,// "cacerts": ["/path/to/cert.pem"],// "wc": 1,// "fsync": false,// "bulk": false,"collection_filters": "{\"testcol\":{\"firstName\":\"Robert\"}}",// "read_preference": "Primary"
})var sink = elasticsearch({"uri": "http://localhost:9200/index_testcol"// "timeout": "10s", // defaults to 30s// "aws_access_key": "ABCDEF", // used for signing requests to AWS Elasticsearch service// "aws_access_secret": "ABCDEF" // used for signing requests to AWS Elasticsearch service
})//t.Source(source).Save(sink)
// t.Source("source", source).Save("sink", sink)
//source的MongoDB collection 名要和 索引的type名称一样, 而且是将gl_topic*匹配的表都导进去了,没法指定精确的某个表迁移至ESt.Source("source", source, "/^testcol$/").Transform(goja({"filename":"mytransform/addfullname.js"})).Save("sink", sink, "/^testcol$/")

collection_filters 是个过滤器,如只迁移firstName=“Robert”的,可设置为"collection_filters": "{\"testcol\":{\"firstName\":\"Robert\"}}"。MongoDB作为source,elasticsearch作为sink。index_testcol此处采用别名,方便变更elasticsearch索引指向新的索引。Transform(goja({"filename":"mytransform/addfullname.js"})) 为transformer配置,通过native transformer goja执行JavaScript脚本addfullname.js。addfullname.js如下:

function transform(doc) {//doc._id = doc._id['$oid']; doc["data"]["fullName"] = doc["data"]["firstName"] + " " + doc["data"]["lastName"];return doc;
}

新增fullName字段,其值为firstName 和lastName通过空格连接。

3.测试连接

执行transporter test pipeline-test.js 测试连接是否正常。

e:\softwares\go\src\github.com\compose\transporter>transporter test pipeline-test.js
Transporter:- Source:         source                                   mongodb         ^testcol$- Sink:          sink                                     elasticsearch   ^testcol$

4. 迁移数据

执行transporter run pipeline-test.js启动数据迁移。

e:\softwares\go\src\github.com\compose\transporter>transporter run pipeline-test.js
[36mINFO[0m[0000] adaptor Listening...                          [36mname[0m=sink [36mpath[0m=source/sink [36mtype[0m=elasticsearch
[36mINFO[0m[0000] starting with metadata map[]                  [36mname[0m=source [36mpath[0m=source [36mtype[0m=mongodb
[36mINFO[0m[0000] boot map[sink:elasticsearch source:mongodb]   [36mts[0m=1533038590485455300
[36mINFO[0m[0000] adaptor Starting...                           [36mname[0m=source [36mpath[0m=source [36mtype[0m=mongodb...[36mINFO[0m[0000] metrics source records: 1                     [36mpath[0m=source [36mts[0m=1533038591316015900
[36mINFO[0m[0000] metrics source/sink records: 1                [36mpath[0m=source/sink [36mts[0m=1533038591316015900
[36mINFO[0m[0000] exit map[source:mongodb sink:elasticsearch]   [36mts[0m=1533038591316015900

records:1 表明命中一条记录,并执行迁移。

执行没有报错,则可以去elasticsearch查看是否成功迁移。

其中 fullName为新增字段。当然除了增加字段,transformer还可以执行很多事情,如omit,pick,skip等,可以参考前一篇博客关于transporter的介绍https://blog.csdn.net/zhujq_icode/article/details/81297388。

参考文档

1. https://blog.csdn.net/zhujq_icode/article/details/81297388

2. https://github.com/compose/transporter/blob/master/READMEWINDOWS.md

3. https://www.digitalocean.com/community/tutorials/how-to-sync-transformed-data-from-mongodb-to-elasticsearch-with-transporter-on-ubuntu-16-04

通过Transporter迁移MongoDB 数据至elasticsearch相关推荐

  1. ES迁mysql_使用kafka连接器迁移mysql数据到ElasticSearch

    概述 把 mysql 的数据迁移到 es 有很多方式,比如直接用 es 官方推荐的 logstash 工具,或者监听 mysql 的 binlog 进行同步,可以结合一些开源的工具比如阿里的 cana ...

  2. 完美数据迁移-MongoDB Stream的应用

    目录 一.背景介绍 二.常见方案 1. 停机迁移 2. 业务双写 3. 增量迁移 三.Change Stream 介绍 监听的目标 变更事件 四.实现增量迁移 五.后续优化 小结 附参考文档 一.背景 ...

  3. mongodb 导出到sqlserver_迁移sqlserver数据到MongoDb的方法

    迁移sqlserver数据到MongoDb的方法 前言 随着数据量的日积月累,数据库总有一天会不堪重负的,除了通过添加索引.分库分表,其实还可以考虑一下换个数据库.我强烈推荐使用MongoDb,我举例 ...

  4. python脚本迁移数据库_Python迁移MySQL数据到MongoDB脚本

    MongoDB是一个文档数据库,在存储小文件方面存在天然优势.随着业务求的变化,需要将线上MySQL数据库中的行记录,导入到MongoDB中文档记录. 一.场景:线上MySQL数据库某表迁移到Mong ...

  5. docker版mongodb数据同步到elasticsearch

    MongoDB  和 Elasticsearch 同步方式有两种,一种为搭建集群,另一种为配置MongoDB的副本集,本案在docker上部署,所以选择配置副本集方式. 说明:为了能够使得 Mongo ...

  6. mongodb数据同步到elasticsearch的中间件,支持全量,增量,实时同步等多种同步情景。(syncs MongoDB to Elasticsearch in realtime) (Mong

    GitHub - levonmo/mongo-sync-elasticsearch: mongodb数据同步到elasticsearch的中间件,支持全量,增量(新增修改删除),实时同步等多种同步情景 ...

  7. MongoDB数据迁移之迁移工具Kettle

    MongoDB数据迁移之迁移工具Kettle ETL:简介   ETL(Extract-Transform-Load的缩写,即数据抽取.转换.装载的过程),对于企业或行业应用来说,我们经常会遇到各种数 ...

  8. 获取mongodb数据变更_MongoDB Stream是如何实现完美数据增量迁移的?

    原标题:MongoDB Stream是如何实现完美数据增量迁移的? 作者介绍唐卓章(zale),华为技术专家,多年互联网研发/架设经验,关注NoSQL中间件高可用及弹性扩展,在分布式系统架构性能优化方 ...

  9. mongodb数据合并设计_「时间序列数据」和MongoDB(二)-模式设计最佳实践

    在上一篇博客文章时间序列数据与MongoDB:第一部分-简介中,我们介绍了时间序列数据的概念,然后介绍了一些可以用于帮助收集时间序列应用程序需求的发现问题.对这些问题的回答有助于指导支持大容量生产应用 ...

最新文章

  1. 使用AsyncTask实现图片加载
  2. android log system
  3. 8皇后以及N皇后算法探究,回溯算法的JAVA实现,非递归,数据结构“栈”实现
  4. 哈夫曼字符串编码c语言实现,基于哈夫曼(haffuman)算法的文件压缩的实现(C语言)(原创)...
  5. Mouse Event (Java AWT)
  6. 机器学习8/100天-Logistic回归原理与实现
  7. DHCP服务、NFS、vsftp服务的简单搭建
  8. 使用MediaCodec硬解码h.265视频及音频进行播放
  9. CentOS设置工作区个数
  10. codevs 4189 字典
  11. 文件夹复制命令 linux,linux拷贝文件夹命令
  12. 淘宝天猫返利查券机器人搭建
  13. Spark综合学习笔记(五)SparkStreaming介绍
  14. win10电脑风扇一直转解决方法
  15. 计算机专业课顺序,计算机专业课程安排顺序 计算机专业课程安排
  16. 人脸面部情绪识别(一)
  17. 2021-10-30 字典
  18. IBM X 3650 M3服务器RAID0设置
  19. 痞子衡嵌入式:我被邀请做科锐国际旗下数科同道主办的技术沙龙嘉宾
  20. Action Recognition(行为识别)

热门文章

  1. 电子商务系统的测试(十四)
  2. 《C++ Primer》第15章 15.2节习题答案
  3. Qt 如何将QPushButton弹起
  4. win7系统做网站服务器,win7系统做网站服务器
  5. GPUImage.h简单说明
  6. usb的device模式hid配置错误点
  7. 万代南梦宫(中国)旗下数字娱乐、玩具娱乐业务整合;IBM简化混合云上关键任务应用的现代化进程 | 全球TMT...
  8. 急!急!急!如何申请公网ip
  9. 【C/C++】从API学习STL algorithm 001(for_each、find、find_if、find_end、find_first_of 快到碗里来(◕ᴗ◕✿)
  10. 修改新网域名DNS服务器到DNSPOD解析服务