数据同步-从MySQL到Tablestore

DataX是阿里集团广泛使用的离线数据导出工具, 本文将详细介绍如何从MySQL导出全量数据到Tablestore(OTS)中。

一、导出步骤

DataX工具目前已经在github上开源,可以从github上拉到源代码进行本地编译,也可以直接下载编译好的压缩包进行解压直接使用,这里选择本地编译方式。

1.下载源代码或压缩包

本机装好git工具后,直接执行以下操作:

git clone https://github.com/alibaba/DataX.git

如果要选择下载压缩包的方式,可以从DataX的github地址上获得下载链接:

https://github.com/alibaba/DataX

或者直接下载: DataX下载地址

下载完压缩包之后请直接解压缩,并直接进入步骤3。

2. Maven打包

cd到下载的源码目录,然后执行:

mvn -U clean package assembly:assembly -Dmaven.test.skip=true

编译完成后, 在/target/datax/datax目录下会观察到如下几个目录:

bin    conf     job     lib    log    log_perf    plugin    script    tmp

bin目录中存放着可执行的datax.py文件,是整个DataX工具的入口

plugin目录中是支持各种类型数据源的reader和writer

conf中主要是存放core.json文件,文件中定义了一些缺省参数值如channle流控、buffer大小等参数,一般不随意修改。

注意:此步骤会在本地编译各种数据源的writer和reader,会花费比较长的时间,需要耐心等待。

3. 准备全量导出的json文件

{

"job": {

"content": [

{

"reader": {

"name": "mysqlreader", #指定使用mysqlreader读取

"parameter": {

"username": "username",#mysql用户名

"password": "password",#mysql密码

"connection": [

{

"querySql": [ #指定执行的SQL语句

"select bucket_name, delta , timestamp ,cdn_in, cdn_out ,total_request from vip_quota where bucket_name='xxx' "

],

"jdbcUrl": ["jdbc:mysql://10.10.0.8:3306/db1?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true" #jdbc连接串

]

}

]

}

},

"writer": {

"name": "otswriter",#指定使用otswriter进行写入

"parameter": {#数据源配置

"endpoint":"https://smoke-test.xxxx.ots.aliyuncs.com",#ots实例的endpoint

"accessId":"xxxx",

"accessKey":"xxxx",

"instanceName":"smoke-test",#实例名

"table":"vip_quota",#写入目标的table名称

"primaryKey":[#主键名称和类型

{"name":"bucket_name", "type":"string"},

{"name":"delta", "type":"int"}

{"name":"timestamp", "type":"int"}

],

"column":[#其它column的名称和类型

{"name":"cdn_in","type":"int"},

{"name":"cdn_out","type":"int"},

{"name":"total_request","type":"int"},

],

"writeMode":"UpdateRow"#写入模式

}

}

}

]

}

}

以上为querySql模式导出。

或者,也可以配置成如下的table模式导出:

{

"job": {

"setting": {

"speed": {

"channel": 3 #指定channel个数,这个参数跟并发数密切相关

},

"errorLimit": {#容错限制

"record": 0,

"percentage": 0.02

}

},

"content": [

{

"reader": {

"name": "mysqlreader",#指定使用mysqlreader读取

"parameter": {

"username": "username",#mysql用户名

"password": "password",#mysql密码

"column": [ #table模式下可以指定需要查询哪些列

"bucket_name",

"timestamp" ,

"delta" ,

"cdn_in",

"cdn_out" ,

"total_request"

],

"splitPk": "timestamp",#指定split字段

"connection": [

{

"table": [#导出的表名

"vip_quota"

],

"jdbcUrl": ["jdbc:mysql://10.10.1.7:3306/db1"#jdbc连接串

]

}

]

}

},

"writer": {

"name": "otswriter",#指定使用otswriter进行写入

"parameter": {#数据源配置

"endpoint":"https://smoke-test.xxxx.ots.aliyuncs.com",#ots实例的endpoint

"accessId":"xxx",

"accessKey":"xxx",

"instanceName":"smoke-test",#实例名

"table":"vip_quota",#写入目标的table名称

"primaryKey":[#主键名称和类型

{"name":"bucket_name", "type":"string"},

{"name":"delta", "type":"int"}

{"name":"timestamp", "type":"int"}

],

"column":[#其它column的名称和类型

{"name":"haha","type":"int"},

{"name":"hahah","type":"int"},

{"name":"kengdie","type":"int"},

],

"writeMode":"UpdateRow"#写入模式

}

}

}

]

}

}

上述配置文件中,可以看到,该json文件定义了一次数据导出导入的数据源信息和少量系统配置。

配置主要分两部分:

setting部分: 主要是speed(跟速率、并发相关)和errorLimit(容错限制)

content部分:主要是数据源信息, 包含reader和writer两部分

同时,配置中的MySQL应该确保执行DataX任务的机器能够正常访问;目标Tablestore表,可以通过控制台或则SDK提前建好。本例中Tablestore的表名为vip_quota, 定义了由3个column组成的PrimaryKey。

4. 执行同步命令

python datax.py -j"-Xms4g -Xmx4g" mysql_to_ots.json

-j"-Xms4g -Xmx4g"  可以限制jvm占用内存的大小,如果不指定,将会使用conf/core.json中的配置,默认是1G。

二、原理介绍

DataX进行数据同步的过程主要包括三部分:

数据源读取

DataX中的数据交换

数据目标端写入

在MySQL导出到Tablestore的场景中,对于MySQL数据源来说,DataX通过MySQL驱动使用reader中的MySQL连接串配置,直接发送SQL语句获取到查询数据,这些数据会缓存在本地jvm中,而后由writer线程将这些数据写入到Tablestore表中。

在DataX中,mysqlreader配置有两种模式,一种是table模式,另外一种是querySql模式,两种模式使用起来略有差别。

table模式

table模式的json配置文件请观察导出步骤的第3部分。

table模式下,用户不再需要自己写select语句,而是由DataX根据json中的的column、table、spliPk配置项,自行拼接SQL语句,观察执行日志如下:

在table模式下, channel个数决定了reader和writer的个数上限,假设为m个:

如果指定了splitPk字段,DataX会将mysql表中数据按照splitPk切分成n段,n大致为5倍的channel个数(有兴趣的同学可以去阅读一下DataX的源码)。splitPk的字段限制了必需是整型或者字符串类型。由于DataX的实现方式是按照spliPk字段分段查询数据库表,那么spliPk字段的选取应该尽可能的选择分布均匀且有索引的字段,比如主键id、唯一键等字段。DataX会启动m个reader线程,消费DataX切分好的n个查询sql语句(task), 对应的会有m个writer线程将查询出来的数据写入OTS表中,并行度为m(也就是配置的channel个数)。

如果不指定splitPk字段,DataX将不会进行数据的切分,并行度直接退化成1。

需要指出的是,table模式下,如果用户指定了spliPk将数据切分成了n段,由于这些task不是在同一个事务下进行select,那么最终取出的全量数据很有可能是不一致的。为了拿到一致性数据,要么不要配置spliPk使用单线程,要么确保mysql中要导出的数据不会再发生变化。

querySql模式

querySql模式一般用于有条件的数据导出。准备步骤中的第一个json文件就是一个典型的querySql模式配置。

在此模式下,DataX不会再按照指定的column、table参数进行sql的拼接,而是会直接略过这些配置(如果有),直接执行querySql语句,task数量总是1,因此在此模式下channel的配置不再有多线程的效果。

三、性能调优

有人肯定会有疑问,有什么办法可以尽可能加速数据的导出呢?

一般来说,大家首先想到的是提高并发度。在DataX中channel的个数决定了并发数量,但是要使channel参数生效,并不是简单配一下channel就完事了。在MySQL导入Tablestore表的场景下,channel生效仅在能够split出多个SQL语句的场景下,也就是table模式+spliPk下有用。

前面提到,DataX的数据同步涉及三部分:

1.数据读取

2.数据交换

3.数据写入

对于以上三个环节,都有不同的优化方式,分析如下。

1.数据读取

对于数据源读取,导出的两种模式:table模式和sqlQuery模式前面做了阐述,这里不再重复。

2. 数据交换

对于数据交换,前面提到,发送给MySQL数据库SQL语句后会得到查询的数据集,缓存在DataX的buffer中;除此之外,每个channel也维护了自己的record队列,如果存在并发,channel的个数越多,也会需要更多的内存。因此首先需要考虑的是jvm的内存大小参数,在导出步骤这一节中, -j参数可以用来指定jvm的内存大小。

除此之外,有几个控制channel的关键参数:

以上配置位于conf/core.json中:

capacity限制了channel中队列的大小(也就是最多缓存record的个数)

byteCapacity限制了record占用的内存大小,core.json中的默认配置是64MB,若不指定将会被配置为8MB

这两个参数决定了每个channel能buffer的记录数量和内存占用情况,如果有需要调整,用户应该按照DataX实际的运行环境予以配置。例如MySQL中每个record都比较大,那么可以考虑适当调高byteCapacity,当然调整这个参数还要考虑机器的内存情况。

一般情况下,channel队列本身配置的调整并不会很常见,但是对于另外几个流控参数,在使用DataX的时候应该注意。有两个常用的流控参数:

a. byte  限制通道的默认传输速率, -1表示不限制

b. record 限制通道的传输记录数,-1表示不限制

这两个参数都是在flowControlInterval间隔里采样后根据采样值来决定是否流控的。

{

"core": { #定义了全局的系统参数,不指定会使用默认值

"transport": {

"channel": {

"speed": {

"record": 5000,

"byte": 102400

}

}

}

},

"job": {

"setting": {

"speed": { #定义了单个channel的控制参数

"record": 10000,

},

"errorLimit": {

"record": 0,

"percentage": 0.02

}

},

"content": [

{

"reader": {

.....#省略

},

"writer": {

.....#省略

}

}

]

}

}

3.数据写入

对于数据写入,Tablestore是基于LSM设计的高性能高吞吐的分布式数据库产品,每一张表,都会被切分成很多的数据分区,分布在不同的服务器上,吞吐能力十分强悍。如果写入能够打散在所有的服务器上面,就能够利用所有服务器的服务能力,更高速地写入,也就是说表分区数量和吞吐能力是正相关的。正常情况下,新建的表默认分区数量都是1,这个数目会随着表的不断写入自动分裂不断增长,但是自动分裂的周期较长,对于新建表马上进行数据导入的情况,单分区很可能不够用导致导入不够顺畅。 推荐的做法,一般是在建表的时候,对表进行预分区,这样可以在一开始导入的时候就获得极好的性能,而不用等自动分裂。

另外适当的提高批量写入的批次大小(batchWriteCount),也可以有效地提高吞吐率。相关关键配置如下:

{

"job": {

"setting": {

....#省略

},

"content": [

{

"reader": {

.....#省略

},

"writer": {

"name": "otswriter",

"parameter": {

.......

"writeMode":"UpdateRow",

"batchWriteCount":100

}

}

}

]

}

}

4.总结

综合以上叙述,调优可以从以下几个方面着手:

1.在可能的情况下,无论是table模式还是sqlQuery模式,选好spliPk,写好where条件, 保证SQL的高效执行

2.jvm的内存大小要考虑进来,尤其在多channel生效的情况下,内存分配太小会严重限制DataX的吞吐

3.为了保证安全,可以综合考量channel的个数和流控参数,保证理论峰值不会对服务器产生过高的压力;

4.为了提升效率,可以适当提高channel的个数从而提高并发数,调高每个channel的byte和record限制,从而提高DataX的吞吐

5.对目标端Tablestore的表进行预分区,充分利用分布式存储的特点,将写入压力分散到多台机器上,提高写入速度;提高写入batch的大小也可以明显提高吞吐。

四、注意事项

reader和writer的字段映射关系是通过字段位置一一对应的,而非字段名

writer中的parameter中primaryKey的描述必须和Tablestore中定义的字段名、类型一一匹配,事实上DataX在启动的时候,也会去目标数据源中拉取表定义信息,如果对应不上,会直接抛异常

writer中的parameter中column中字端的数量和类型应该和querySql中select的字段一致,字段名可以不一样;如果没有指定querySql,而是通过table名表示全表导出,writer中的column也应该和table表中的字段对应

Table阿里云mysql_数据同步-从MySQL到Tablestore-阿里云开发者社区相关推荐

  1. 专业GIS制图、CAD制图、照片管理和数据同步等强大功能的地理信息云产品

    水经微图是在基于<水经注万能地图下载器>所有功能的基础之上,新增了专业GIS制图.CAD制图.照片管理和数据同步等强大功能的地理信息云产品,软件集在线地图大图拼接.地图重投影.瓦片重切片. ...

  2. 袋鼠云数据库数据同步之flinkx1.10版入门-搭配flink1.11

    1.前提介绍 公司最近有个项目要做数据库之间的数据离线同步,经过调研在git上发现了袋鼠云的flinkx插件,感觉很好用,我们可以理解flnkx就是封装了同步操作的jar包,调用起来也很方便,我们只需 ...

  3. 数据同步:MySQL到Elasticsearch

    目录 背景 1.基于应用程序多写 2.基于binlog订阅 2.1:canal 简介 工作原理 2.2.Databus 2.3.Maxwell 2.4.Flink CDC 2.5.DTS(阿里云) 2 ...

  4. Oracle数据库数据同步到mysql数据库(Oracle数据库备份dmp如何同步到mysql)

    Oracle数据库dmp转mysql格式 一.背景 一项目客户是oracle数据库,需要导一个表数据到自研系统mysql数据库,甲方已给出oracle格式dmp文件! 处理思路: ①.安装Oracle ...

  5. Kafka-connect将Kafka数据同步到Mysql

    一.背景信息 Kafka Connect主要用于将数据流输入和输出消息队列Kafka版.Kafka Connect主要通过各种Source Connector的实现,将数据从第三方系统输入到Kafka ...

  6. linux云安装mysql_1.Linux系统安装MySql数据库(阿里云)

    引言:很多新手同学不会用linux安装MySql,看文档就是做不出来. 如何在阿里云linux CentOS7快速安装Mysql?教你用简单的方式搞定! 安装MySQL5.7(注意,MySQL和系统务 ...

  7. 数据生态mysql_数据生态:MySQL复制技术与生产实践

    基础篇 第1章 的概述 1.1 适用场景 1.2 数据同步方法 1.3 数据同步类型 1.4 格式 第2章 的基本原理 2.1 概述 2.2 细节 第3章 格式详解 3.1 格式概述 3.2 格式明细 ...

  8. 腾讯云服务器数据盘买多了,腾讯云Windows云服务器数据盘分区和格式化

    Windows2008_64位系统手动格式化小于2TB数据盘操作指引 新购买的Windows云服务器,数据盘未做分区和格式化,无法使用. 请根据以下步骤手动对数据盘进行分区以及格式化.暂不支持对Win ...

  9. mongodb转mysql思路_脚本 将阿里云 mongodb数据转成mysql

    思路:mongodb是阿里云上,有定时备份,用python脚本去下载备份文件,恢复到我的环境 中,用脚本查出数据转换成sql插入mysql. 我的环境python是2.7.5.注:mysql端要预先建 ...

最新文章

  1. linux 子进程exit6,Linux内核之do_exit
  2. jQuery find 和 filter 方法的区别
  3. MongoDB实战指南(二):索引与查询优化
  4. php5 数据库框架,数据库 · FastAdmin - 基于ThinkPHP5的极速后台开发框架文档 · 看云...
  5. 盐城大数据产业园人才公寓_盐城市大数据产业园获评大众创业万众创新示范基地...
  6. 【GitHub】知识蒸馏从入门到精通
  7. 华为 “OSPF” 多区域配置
  8. 网络工程职业规划【转载】
  9. 基于AT89C51单片机的电子万年历PROTEUS仿真设计
  10. 抖音直播带货数据统计,抖音直播带货复盘必看的4个数据
  11. 用C语言和JS分别实现“个税年度汇算清缴”计算
  12. 岭南师范学院计算机考试考场,广东专插本考场安排在哪?附:2018年考场详细安排表~...
  13. 心跳之旅——iOS用手机摄像头检测心率(PPG)
  14. 如何部署搭建app服务端运行环境(java)?
  15. python脚本之批量查询网站权重2.0
  16. 关于DistroWatch
  17. 顺序队列求解迷宫问题
  18. linux分区如何4k对齐,Linux如何进行无损修复4K对齐?
  19. 软件设计师考试笔记-(10)
  20. Linux第一章:3.VMTools设置共享文件夹

热门文章

  1. 深入理解Java虚拟机(第三版)-13.Java内存模型与线程
  2. 怎么让前端项目运行起来_如何立即使您的前端项目看起来更好
  3. 使用React,TypeScript和Socket.io构建聊天应用
  4. 易于使用的人工智能_通过AI使网络更易于访问
  5. win8关机快捷键_关机这么简单的电脑操作,大家了解吗?
  6. PAT:组个最小数(C++)
  7. Python高级——赋值、浅拷贝与深拷贝
  8. 机器视觉与Tesseract介绍
  9. GridView CSS的样式表
  10. extjs设计器破解程序及开发调试工具