文章目录

  • 一、Flink简介与数据同步需求
    • 1、Flink简介
    • 2、数据同步需求
      • 1)支持多种部署模式
      • 2)分布式资源调度能力
      • 3)支持断点续传
    • 3、常见异构数据同步工具对比
  • 二、Flinkx框架实现与原理
    • 1、实现概览
    • 2、任务自动组装
    • 3、并发同步实现分析
      • 1)配置文件
      • 2)并发读取分析
    • 4、断点续传
      • 1) 概念解读
      • 2) Flink checkpoint机制
      • 3) MySQL同步时的断点续传
      • 4) 断点续传不是万能的
  • 三、开发一个Flinkx插件
    • 1、基础功能实现
      • 1) 命名约束
      • 2) 配置获取
      • 3) 构建数据输出
      • 4) row处理
    • 2、batch功能实现
      • 1) setBatchInterval
      • 2) rows处理
  • 四、Flinkx on yarn部署
    • 1、环境依赖
    • 2、部署细则
  • 五、其他
    • 1、补充
    • 2、参考

一、Flink简介与数据同步需求

1、Flink简介

Flink是新型的计算框架,具有分布式、低延迟、高吞吐和高可靠的特性。其支持多种部署方式:local(单机)、standalone模式,也可以基于yarn,mesos或者k8s做资源调度。Flink提供了比较高级的API,我们能比较方便地扩展现有的API来满足一些特殊需求,此外Flink提供了完整的状态管理体系(checkpoint),可以基于这个机制实现断点续传。

2、数据同步需求

这里主要说明的是离线数据同步。实时数据同步相对而言没有周期性的资源调度问题,原生的Flink框架结合其丰富的connector即可满足大部分需求。

1)支持多种部署模式

开发测试时可以单机部署,在生产环境支持分布式部署。

2)分布式资源调度能力

如果可以基于yarn,mesos或者k8s等做资源调度,可以极大提高资源利用率、提升运营效率!

3)支持断点续传

在大数据量的传输场景下,当网络出现抖动\DB抖动等情况时任务可能会失败。那这个时候重跑任务耗时耗力(血与泪)。此时就需要能从失败的点继续跑,也就是断点续传。

3、常见异构数据同步工具对比

Flinkx是袋鼠云开源的一款基于Flink的分布式数据同步工具(框架)。

工具 flinkx datax logkit sqoop
架构 分布式 单机(分布式版本未开源) 单机 分布式
同步速度控制 支持 支持 支持 不支持
脏数据管理 支持 支持 不支持 不支持
插件化 支持 支持 支持 不支持
断点续传 支持 不支持 不支持 不支持
配置方式 json json web界面 脚本

经过对比不难发现:少数据量、简单数据源下,阿里开源的datax等已经可以满足需求。但在我们复杂环境下Flinkx凭借其分布式架构与断点续传特点,具有明显优势,比较符合我们的需求。

二、Flinkx框架实现与原理

1、实现概览

FlinkX采用了一种插件式的架构来实现多种异构数据源之间的数据同步:

  • 不同的源数据库被抽象成不同的Reader插件;
  • 不同的目标数据库被抽象成不同的Writer插件;

理论上,FlinkX框架可以支持任意数据源类型的数据同步工作。作为一套生态系统,每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

2、任务自动组装

Template模块根据同步任务的配置信息加载源数据库和目的数据库对应的Reader插件和Writer插件;

  • Reader插件实现了InputFormat接口,从源数据库中获取DataStream对象;
  • Writer插件实现了OutputFormat接口,将目的数据库与DataStream对象相关联;

Template模块通过DataStream对象将Reader和Writer串接在一起,组装成一个Flink任务,并提交到Flink集群上执行。

3、并发同步实现分析

这里以MySQL为例:

1)配置文件

配置文件中mysql reader部分如下:

"reader": {"name": "mysqlreader","parameter": {"username": "user","password": "password","connection": [{"table": ["data_test"],"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test?useCursorFetch=true"]}],"splitPk": "id"}
}

配置全文见:

https://github.com/DTStack/flinkx/blob/1.8.2/docs/rdbreader.md

2)并发读取分析

实际读取数据时首先要构造数据分片,构造数据分片就是根据通道索引和checkpoint记录的位置构造查询sql,sql模板如下:

select * from data_test
where id mod ${channel_num}=${channel_index}
and id > ${offset}

在没有出现下一节所说的断点续传问题时,多个通道对应的任务即为:

-- 通道一
select * from data_test
where id mod 2=0;-- 通道二
select * from data_test
where id mod 2=1;

通过对splitPk字段取模,就可以生成多个SQL并发地从db里面拉取数据,实现并发读取。

4、断点续传

1) 概念解读

断点续传是指数据同步任务在运行过程中因各种原因导致任务失败,不需要重头同步数据,只需要从上次失败的位置继续同步即可(类似于下载文件时因网络原因失败,不需要重新下载文件,只需要继续下载就行),可以大大节省时间和计算资源。

2) Flink checkpoint机制

Flinkx框架的断点续传是基于Flink的checkpoint机制实现,所以我们首先了解一下Flink的checkpoint机制实现:

Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态中断。

Checkpoint触发时,会向多个分布式的Stream Source中插入一个Barrier标记,这些Barrier会随着Stream中的数据记录一起流向下游的各个Operator。当一个Operator接收到一个Barrier时,它会暂停处理Steam中新接收到的数据记录。因为一个Operator可能存在多个输入的Stream,而每个Stream中都会存在对应的Barrier,该Operator要等到所有的输入Stream中的Barrier都到达。

当所有Stream中的Barrier都已经到达该Operator,这时所有的Barrier在时间上看来是同一个时刻点(表示已经对齐),在等待所有Barrier到达的过程中,Operator的Buffer中可能已经缓存了一些比Barrier早到达Operator的数据记录(Outgoing Records),这时该Operator会将数据记录(Outgoing Records)发射(Emit)出去,作为下游Operator的输入,最后将Barrier对应Snapshot发射(Emit)出去作为此次Checkpoint的结果数据。

3) MySQL同步时的断点续传

checkpoint触发后,两个reader先生成Snapshot记录读取状态,通道0的状态为 id=12,通道1的状态为 id=11。Snapshot生成之后向数据流里面插入barrier,barrier随数据流向Writer。以Writer_0为例,Writer_0接收Reader_0和Reader_1发来的数据,假设先收到了Reader_0的barrier,这个时候Writer_0停止写出数据,将接收到的数据先放到 InputBuffer里面,一直等待Reader_1的barrier到达之后再将Buffer里的数据全部写出,然后生成Writer的Snapshot,整个checkpoint结束后,记录的任务状态为:

  • Reader_0:id=12
  • Reader_1:id=11

任务如果异常结束恢复后,任务会给把各个通道记录的状态赋值给offset,再次读取数据时构造的sql为:

-- 第一个通道
select * from data_test
where id mod 2=0
and id > 12;-- 第二个通道
select * from data_test
where id mod 2=1
and id > 11;

这样就可以从上一次失败的位置继续读取数据了。

4) 断点续传不是万能的

  • 数据源(这里特指关系数据库)中必须包含一个升序的字段,比如主键或者日期类型的字段。同步过程中会使用checkpoint机制记录这个字段的值,任务恢复运行时使用这个字段构造查询条件过滤已经同步过的数据;
  • 数据源必须支持数据过滤。如果不支持的话,任务就无法从断点处恢复运行,会导致数据重复;
  • 目标数据源必须支持事务,比如关系数据库,文件类型的数据源也可以通过临时文件的方式支持。

三、开发一个Flinkx插件

这里我们以改造后的stream writer插件为例(框架自带的stream插件不支持自定义分隔符和batch写入)

1、基础功能实现

1) 命名约束

flinkx框架本身对命名有一些约束:

  • module命名: <module>flinkx-stream</module>
  • package命名:com.dtstack.flinkx.stream.writercom.dtstack.flinkx.classloader.PluginUtil getPluginClassName 限制
  • class命名:├── StreamOutputFormat.java
    ├── StreamOutputFormatBuilder.java
    └── StreamWriter.java当命名不符合框架的约束时,会出现找不到对应class的异常。

2) 配置获取

首先StreamOutputFormatBuilder需要定义相关的set函数,如:

public void setWriteDelimiter(String writeDelimiter) {format.writeDelimiter = writeDelimiter;}

然后StreamWriter获取json里的配置,并由builder实例调用set函数,如:

writeDelimiter = config.getJob().getContent().get(0).getWriter().getParameter().getStringVal("writeDelimiter", "|");
builder.setWriteDelimiter(writeDelimiter);

3) 构建数据输出

StreamOutputFormat继承自RichOutputFormat,主要需要自定义以下逻辑:

  • openInternal:进行一些初始化的处理逻辑
  • writeSingleRecordInternal:定义单条数据处理逻辑
  • writeMultipleRecordsInternal:定义批量数据处理逻辑(见下一小节)
  • closeInternal:进行一些关闭的处理逻辑

4) row处理

writeSingleRecordInternal数据处理的对象是row,row是flink原生的结构org.apache.flink.types.Row,本质上是一个Arrays,主要使用如下方法:

  • getArity:row的长度
  • getField:获取指定位置的值
  • setField:修改指定位置的值
  • toString:将数组内的值以","分隔转成String

2、batch功能实现

在一些场景下,我们还需要进行batch处理以提升传输的性能,此时我们开发插件时需要启动writeMultipleRecordsInternal部分。

1) setBatchInterval

builder实例需要setBatchInterval

builder.setBatchInterval(batchInterval);

当batchInterval>1,才会调用writeMultipleRecordsInternal

2) rows处理

rows是前文提到的ArrayList<Row>结构,进行简单遍历即可处理。

注意:不同数据源batchInterval设置值是不同,并且有些数据源不支持batch操作。

四、Flinkx on yarn部署

1、环境依赖

我们需要一个Flink on yarn的集群:

  • flink:1.8.x
  • hadoop:2.8.5

在实际测试中,我们hadoop版本过低时会出现各种异常,建议使用 2.8.5版本。

2、部署细则

  1. node manager 所有节点需要拷贝Flink相关文件到yarn客户机相同目录,包括:bin、lib、plugins
  2. 启动一个session

yarnPer模式下可以不用手动启动session

  1. 提交flinkx任务bin/flinkx -mode yarn -job mysql_2_xx.json -pluginRoot /data/home/xx/flinkx/plugins -flinkconf /data/clusterserver/flink-1.8.1/conf/ -yarnconf /data/clusterserver/hadoop/etc/hadoop/ -confProp ‘{“flink.checkpoint.interval”:60000,“flink.checkpoint.stateBackend”:"/tmp/checkpoint_dir"}’ -s /tmp/checkpoint_dir/xx/chk-*
  2. 去flink ui和各自数据系统上验收相关数据了。

五、其他

1、补充

  • Flinkx目前官方文档较少,大部分时候需要阅读其源码才能解决问题。
  • 小数据场景下,Flinkx优势不是很大,毕竟集群启动任务调度等均需要时间。不同场景下的详细分析报告,敬请期待!

2、参考

flink官网:https://flink.apache.org/

flinkx官网:https://github.com/DTStack/flinkx

logkit:https://github.com/qiniu/logkit/blob/master/READMECN.md


文章首发: https://cloud.tencent.com/developer/article/1550695

【FlinkX】数据同步工具的研究与实践相关推荐

  1. 数据同步工具的研究(实时)

    数据同步工具的研究(实时同步): FlinkCDC.Canal.Maxwell.Debezium --2023年01月17日 --Yahui Di 1. 常用CDC方案比较 2. FlinkCDC F ...

  2. 【ATF】林伟:大数据计算平台的研究与实践

      2016 ATF阿里技术论坛于4月15日在清华大学举办,主旨是阐述阿里对世界创新做出的贡献.阿里巴巴集团技术委员会主席王坚,阿里巴巴集团首席技术官(CTO)张建锋(花名:行癫),阿里巴巴集团首席风 ...

  3. 【基础】:Rsync数据同步工具

    第二十一节 Rsync数据同步工具 1.1 Rsync介绍 1.1.1 什么是Rsync? 1.1.2 Rsync简介 1.3 Rsync的特性 1.1.4 Rsync的企业工作场景说明 1.2 Rs ...

  4. Linux实战教学笔记21:Rsync数据同步工具

    原文地址:https://www.cnblogs.com/chensiqiqi/p/6514315.html 目录 第二十一节 Rsync数据同步工具 1.1 Rsync介绍 1.1.1 什么是Rsy ...

  5. 【硬刚大数据】大数据同步工具之FlinkCDC/Canal/Debezium对比

    欢迎关注博客主页:微信搜:import_bigdata,大数据领域硬核原创作者_王知无(import_bigdata)_CSDN博客 欢迎点赞.收藏.留言 ,欢迎留言交流! 本文由[王知无]原创,首发 ...

  6. Linux的rsync远程数据同步工具

    Rsync(remote synchronize) 是一个远程数据同步工具,可以使用"Rsync算法"同步本地和远程主机之间的文件. rsync的好处是只同步两个文件不同的部分,相 ...

  7. ETL的数据同步工具调研(持续更新中)

    扯白了,数据同步工具就是"导数据 "的 名称 社区响应 国内使用情况(以前程无忧为参考) SQOOP 更新缓慢,对于hbase2.x以上版本使用时需要老版本的jar包 9页 Dat ...

  8. etl数据抽取工具_数据同步工具ETL、ELT傻傻分不清楚?3分钟看懂两者区别

    什么是数据同步工具(ETL.ELT) 数据同步工具ETL或者ELT的作用是将业务系统的数据经过抽取.清洗转换之后加载到数据仓库的过程,目的是将企业中的分散.零乱.标准不统一的数据整合到一起,为企业的决 ...

  9. fox pro删除单条数据_Mac文件夹数据同步工具——Sync Folders Pro

    Mac版同步文件夹Pro(文件夹数据同步工具)分享给大家!Mac版同步文件夹Pro是一种功能强大的文件夹数据同步工具,可帮助您同步两一个文件夹的内容,包括任何子文件夹.使用文件夹同步软件,允许您在任一 ...

  10. DataX离线数据同步工具/平台

    DataX离线数据同步工具/平台 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL.SQL Server.Oracle.PostgreSQL.HDFS.Hive.O ...

最新文章

  1. rbac 一个用户对应多个账号_电商后台系统:管理后台之账号管理(一)
  2. rocketmq存储结构_RocketMQ消息存储
  3. 【Pytorch神经网络理论篇】 40 Transformers中的词表工具Tokenizer
  4. 阅读《经济学人》,学会这样查词典,从此英语学习不求人
  5. element手机验证格式_vue+element表单验证(身份证(校验),手机号码,导游证号码)...
  6. 计算机主机组件图,电脑主机结构图大全
  7. Bandicam安装
  8. 用氦气(He)、氖气(Ne)、氩气(Ar)、118号元素(气奥)(Og,放射性,人造元素)组成的激光器
  9. netd模块工作流程
  10. C语言编程练习 7.13个人围成一圈,从第1个人开始顺序报号1、2、3,凡报到3的人退出圈子。
  11. IntelliJ IDEA 激活 破解补丁
  12. pgpool-Ⅱ一主两从同步流读写分离高可用方案测试
  13. 菜鸟教程C#高级教程
  14. 高德地图 gps坐标偏移到火星坐标系
  15. K8S 配置域名访问 Ingress【Traefik】
  16. 137.菜单悬停放大特效
  17. Win10 更新又爆 BUG:删除音频驱动;苹果拒绝法院送达禁令裁定
  18. java/php/net/python会员健身系统管理设计
  19. 移动市场平台MTK和android谁会笑到最后
  20. 天很蓝,应该不会下雨

热门文章

  1. Ubuntu18.04安装NVIDIA显卡驱动
  2. 【ubuntu如何录制gif图】
  3. 正则表达式提取身份证号码
  4. java rxtx 64_win7 64为下使用rxtx串口通信
  5. springboot 配置programe arguments 不生效
  6. testbed与 c++test 的几点区别
  7. matlab单回路和串级控制回路,单回路和串级控制系统仿真研究
  8. 似乎可以破解软件代码
  9. 自己开店用什么收银系统好-纳客收银系统
  10. 关于urule决策引擎客户端服务器配置的一些细节