写在前面的话

需求,将MySQL里的数据实时增量同步到Kafka。接到活儿的时候,第一个想法就是通过读取MySQL的binlog日志,将数据写到Kafka。不过对比了一些工具,例如:Canel,Databus,Puma等,这些都是需要部署server和client的。其中server端是由这些工具实现,配置了就可以读binlog,而client端是需要我们动手编写程序的,远没有达到我即插即用的期望和懒人的标准。

同步的格式

原作者的插件flume-ng-sql-source只支持csv的格式,如果开始同步之后,数据库表需要增减字段,则会给开发者造成很大的困扰。所以我添加了一个分支版本,用来将数据以JSON的格式,同步到kafka,字段语义更加清晰。

将此jar包下载之后,和相应的数据库驱动包,一起放到flume的lib目录之下即可。

处理机制

flume-ng-sql-source在【status.file.name】文件中记录读取数据库表的偏移量,进程重启后,可以接着上次的进度,继续增量读表。

启动说明

说明:启动命令里的【YYYYMM=201711】,会传入到flume.properties里面,替换${YYYYMM}

[test@localhost ~]$ YYYYMM=201711 bin/flume-ng agent -c conf -f conf/flume.properties -n sync &

-c:表示配置文件的目录,在此我们配置了flume-env.sh,也在conf目录下;

-f:指定配置文件,这个配置文件必须在全局选项的--conf参数定义的目录下,就是说这个配置文件要在前面配置的conf目录下面;

-n:表示要启动的agent的名称,也就是我们flume.properties配置文件里面,配置项的前缀,这里我们配的前缀是【sync】;

flume的配置说明

flume-env.sh

# 配置JVM堆内存和java运行参数,配置-DpropertiesImplementation参数是为了在flume.properties配置文件中使用环境变量

export JAVA_OPTS="-Xms512m -Xmx512m -Dcom.sun.management.jmxremote -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties"

flume.properties

# 数据来源

sync.sources = s-1

# 数据通道

sync.channels = c-1

# 数据去处,这里配置了failover,根据下面的优先级配置,会先启用k-1,k-1挂了后再启用k-2

sync.sinks = k-1 k-2

#这个是配置failover的关键,需要有一个sink group

sync.sinkgroups = g-1

sync.sinkgroups.g-1.sinks = k-1 k-2

#处理的类型是failover

sync.sinkgroups.g-1.processor.type = failover

#优先级,数字越大优先级越高,每个sink的优先级必须不相同

sync.sinkgroups.g-1.processor.priority.k-1 = 5

sync.sinkgroups.g-1.processor.priority.k-2 = 10

#设置为10秒,当然可以根据你的实际状况更改成更快或者很慢

sync.sinkgroups.g-1.processor.maxpenalty = 10000

########## 数据通道的定义

# 数据量不大,直接放内存。其实还可以放在JDBC,kafka或者磁盘文件等

sync.channels.c-1.type = memory

# 通道队列的最大长度

sync.channels.c-1.capacity = 100000

# putList和takeList队列的最大长度,sink从capacity中抓取batchsize个event,放到这个队列。所以此参数最好比capacity小,比sink的batchsize大。

# 官方定义:The maximum number of events the channel will take from a source or give to a sink per transaction.

sync.channels.c-1.transactionCapacity = 1000

sync.channels.c-1.byteCapacityBufferPercentage = 20

### 默认值的默认值等于JVM可用的最大内存的80%,可以不配置

# sync.channels.c-1.byteCapacity = 800000

#########sql source#################

# source s-1用到的通道,和sink的通道要保持一致,否则就GG了

sync.sources.s-1.channels=c-1

######### For each one of the sources, the type is defined

sync.sources.s-1.type = org.keedio.flume.source.SQLSource

sync.sources.s-1.hibernate.connection.url = jdbc:mysql://192.168.1.10/testdb?useSSL=false

######### Hibernate Database connection properties

sync.sources.s-1.hibernate.connection.user = test

sync.sources.s-1.hibernate.connection.password = 123456

sync.sources.s-1.hibernate.connection.autocommit = true

sync.sources.s-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect

sync.sources.s-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver

sync.sources.s-1.run.query.delay=10000

sync.sources.s-1.status.file.path = /home/test/apache-flume-1.8.0-bin/status

# 用上${YYYYMM}环境变量,是因为我用的测试表示一个月表,每个月的数据会放到相应的表里。使用方式见上面的启动说明

sync.sources.s-1.status.file.name = test_${YYYYMM}.status

######## Custom query

sync.sources.s-1.start.from = 0

sync.sources.s-1.custom.query = select * from t_test_${YYYYMM} where id > $@$ order by id asc

sync.sources.s-1.batch.size = 100

sync.sources.s-1.max.rows = 100

sync.sources.s-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider

sync.sources.s-1.hibernate.c3p0.min_size=5

sync.sources.s-1.hibernate.c3p0.max_size=20

######### sinks 1

# sink k-1用到的通道,和source的通道要保持一致,否则取不到数据

sync.sinks.k-1.channel = c-1

sync.sinks.k-1.type = org.apache.flume.sink.kafka.KafkaSink

sync.sinks.k-1.kafka.topic = sync-test

sync.sinks.k-1.kafka.bootstrap.servers = localhost:9092

sync.sinks.k-1.kafka.producer.acks = 1

# 每批次处理的event数量

sync.sinks.k-1.kafka.flumeBatchSize  = 100

######### sinks 2

# sink k-2用到的通道,和source的通道要保持一致,否则取不到数据

sync.sinks.k-2.channel = c-1

sync.sinks.k-2.type = org.apache.flume.sink.kafka.KafkaSink

sync.sinks.k-2.kafka.topic = sync-test

sync.sinks.k-2.kafka.bootstrap.servers = localhost:9092

sync.sinks.k-2.kafka.producer.acks = 1

sync.sinks.k-2.kafka.flumeBatchSize  = 100

flume各部分参数含义

batchData的大小见参数:batchSize

PutList和TakeList的大小见参数:transactionCapactiy

Channel总容量大小见参数:capacity

问题记录

异常:Exception in thread "PollableSourceRunner-SQLSource-src-1" java.lang.AbstractMethodError: org.keedio.flume.source.SQLSource.getMaxBackOffSleepInterval()J

分析:由于我用的是flume1.8,而flume-ng-sql-1.4.3插件对应的flume-ng-core版本是1.5.2,1.8版本里的PollableSource接口多了两个方法 getBackOffSleepIncrement(); getMaxBackOffSleepInterval();在失败补偿暂停线程处理时,需要用到这个方法。

解决方法:更新flume-ng-sql-1.4.3里依赖的flume-ng-core版本为1.8.0,并在源代码【SQLSource.java】里添加这两个方法即可。

@Override

public long getBackOffSleepIncrement() {

return 1000;

}

@Override

public long getMaxBackOffSleepInterval() {

return 5000;

}

mysql增量同步kafka_MySQL数据实时增量同步到Kafka - Flume相关推荐

  1. mysql同步mongodb_MySQL数据实时增量同步到MongoDB

    一.go-mysql-transfer go-mysql-transfer是使用Go语言实现的MySQL数据库实时增量同步工具.能够实时监听MySQL二进制日志(binlog)的变动,将变更内容形成指 ...

  2. 简单实现MySQL数据实时增量同步到Kafka————Maxwell

    任务需求:将MySQL里的数据实时增量同步到Kafka 1.准备工作 1.1.MySQL方面:开启BinLog 1.1.1.修改my.cnf文件 vi /etc/my.cnf [mysqld] ser ...

  3. flink实时同步mysql_基于Canal与Flink实现数据实时增量同步(一)

    点击上方蓝色字体,关注我 canal是阿里巴巴旗下的一款开源项目,纯Java开发.基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB). 准备 配置 ...

  4. 数据实时增量同步之CDC工具—Canal、mysql_stream、go-mysql-transfer、Maxwell

    数据实时增量同步之CDC工具-Canal.mysql_stream.go-mysql-transfer.Maxwell 什么是CDC? CDC工具对比 实现原理: Mysql binlog 讲解: m ...

  5. kettle spoon判断增量更新_使用Kettle实现数据实时增量同步--时间戳增量回滚同步...

    使用Kettle实现数据实时增量同步 0. 前言 本文介绍了使用Kettle对一张业务表数据(500万条数据以上)进行实时(10秒)同步,采用了时间戳增量回滚同步的方法.关于ETL和Kettle的入门 ...

  6. kettle 插入更新 数据增量_使用Kettle实现数据实时增量同步

    2018-09-28: 示例job已上传至github,地址见文末 0. 前言 本文介绍了使用Kettle对一张业务表数据(500万条数据以上)进行实时(10秒)同步,采用了时间戳增量回滚同步的方法. ...

  7. linux-windows主动推送文件同步目录数据 linux-windows数据目录同步

    linux->windows主动推送文件同步目录数据 linux-windows数据目录同步 1 .windows下安装openssh for windows工具,下载地址 https://ww ...

  8. elasticsearch date_MySQL数据实时增量同步到Elasticsearch

    Mysql到Elasticsearch的数据同步,一般用ETL来实现,但性能并不理想,目前大部分的ETL是定时查询Mysql数据库有没有新增数据或者修改数据,如果数据量小影响不大,但如果几百万上千万的 ...

  9. Centos7.0系统下Rsync+sersync实现多文件数据实时增量同步

    前言: 一.为什么要用Rsync+sersync架构? 1.sersync是基于Inotify开发的,类似于Inotify-tools的工具 2.sersync可以记录下被监听目录中发生变化的(包括增 ...

最新文章

  1. 【Qt】Qt样式表总结(四):CSS盒子模型
  2. 关系型数据库 和 非关系型数据对比 以及 MySQL与Oracle对比
  3. AS插件-Android Drawable Importer
  4. fft快速傅利叶变的C实现
  5. wpfdiagram 学习 教学_开启双自主学习模式 助力学生生命成长——长清湖实验学校开展“双自主合作学习”教学模式...
  6. access数据库文件导入mysql数据库文件怎么打开,Access数据库从外部数据导入文本文件和VF数据库-dbf文件怎么打开...
  7. Java之final详解
  8. JS 立即执行的函数表达式(function)写法
  9. 机器人图规划算法研究现状简述
  10. 读书书签-《高等应用数学问题的MATLAB求解》第三版,薛定宇 陈阳泉著
  11. [管理员手册](五)Ubuntu desktop 20.04系统安装显卡驱动NVIDIA cuda pytorch向日葵sunlogin安装
  12. ubuntu全版本安装 NVIDIA显卡驱动、以及重装、卸载
  13. 沈阳工业大学计算机专业排名,中国的大学计算机专业排名。
  14. android activity是什么呢
  15. 公众号点击图片变成另一张_朋友圈也能发九宫格图片,再也不犯选择困难症
  16. vmware-vmx.exe无法结束进程, 关闭Hyper-v虚拟服务
  17. Cesium 填挖方分析
  18. 图片转PDF有哪些软件?这几款软件建议收藏
  19. 颓废的人怎样振奋精神_5个令人振奋的行业,从事数据科学家在科技行业以外的工作...
  20. Spring框架的简介

热门文章

  1. 用ESP32怎么实现离线语音
  2. linux下编写时钟代码,Linux时间子系统之一:clock source(时钟源)【转】(示例代码)...
  3. linux rs232触摸屏驱动程序,Linux下的触摸屏驱动
  4. fileset java_Java FileSet.iterator方法代码示例
  5. 全球首个知识增强千亿大模型鹏城-百度·文心发布
  6. Python接口测试-模块引用与映射
  7. 温故之 “快速排序”
  8. springboot 中根据数据库表生成所有表的model,mapper和xml文件
  9. 了解mysqlpump工具
  10. Shell 脚本编程 基本语法: