mysql增量同步kafka_MySQL数据实时增量同步到Kafka - Flume
写在前面的话
需求,将MySQL里的数据实时增量同步到Kafka。接到活儿的时候,第一个想法就是通过读取MySQL的binlog日志,将数据写到Kafka。不过对比了一些工具,例如:Canel,Databus,Puma等,这些都是需要部署server和client的。其中server端是由这些工具实现,配置了就可以读binlog,而client端是需要我们动手编写程序的,远没有达到我即插即用的期望和懒人的标准。
同步的格式
原作者的插件flume-ng-sql-source只支持csv的格式,如果开始同步之后,数据库表需要增减字段,则会给开发者造成很大的困扰。所以我添加了一个分支版本,用来将数据以JSON的格式,同步到kafka,字段语义更加清晰。
将此jar包下载之后,和相应的数据库驱动包,一起放到flume的lib目录之下即可。
处理机制
flume-ng-sql-source在【status.file.name】文件中记录读取数据库表的偏移量,进程重启后,可以接着上次的进度,继续增量读表。
启动说明
说明:启动命令里的【YYYYMM=201711】,会传入到flume.properties里面,替换${YYYYMM}
[test@localhost ~]$ YYYYMM=201711 bin/flume-ng agent -c conf -f conf/flume.properties -n sync &
-c:表示配置文件的目录,在此我们配置了flume-env.sh,也在conf目录下;
-f:指定配置文件,这个配置文件必须在全局选项的--conf参数定义的目录下,就是说这个配置文件要在前面配置的conf目录下面;
-n:表示要启动的agent的名称,也就是我们flume.properties配置文件里面,配置项的前缀,这里我们配的前缀是【sync】;
flume的配置说明
flume-env.sh
# 配置JVM堆内存和java运行参数,配置-DpropertiesImplementation参数是为了在flume.properties配置文件中使用环境变量
export JAVA_OPTS="-Xms512m -Xmx512m -Dcom.sun.management.jmxremote -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties"
flume.properties
# 数据来源
sync.sources = s-1
# 数据通道
sync.channels = c-1
# 数据去处,这里配置了failover,根据下面的优先级配置,会先启用k-1,k-1挂了后再启用k-2
sync.sinks = k-1 k-2
#这个是配置failover的关键,需要有一个sink group
sync.sinkgroups = g-1
sync.sinkgroups.g-1.sinks = k-1 k-2
#处理的类型是failover
sync.sinkgroups.g-1.processor.type = failover
#优先级,数字越大优先级越高,每个sink的优先级必须不相同
sync.sinkgroups.g-1.processor.priority.k-1 = 5
sync.sinkgroups.g-1.processor.priority.k-2 = 10
#设置为10秒,当然可以根据你的实际状况更改成更快或者很慢
sync.sinkgroups.g-1.processor.maxpenalty = 10000
########## 数据通道的定义
# 数据量不大,直接放内存。其实还可以放在JDBC,kafka或者磁盘文件等
sync.channels.c-1.type = memory
# 通道队列的最大长度
sync.channels.c-1.capacity = 100000
# putList和takeList队列的最大长度,sink从capacity中抓取batchsize个event,放到这个队列。所以此参数最好比capacity小,比sink的batchsize大。
# 官方定义:The maximum number of events the channel will take from a source or give to a sink per transaction.
sync.channels.c-1.transactionCapacity = 1000
sync.channels.c-1.byteCapacityBufferPercentage = 20
### 默认值的默认值等于JVM可用的最大内存的80%,可以不配置
# sync.channels.c-1.byteCapacity = 800000
#########sql source#################
# source s-1用到的通道,和sink的通道要保持一致,否则就GG了
sync.sources.s-1.channels=c-1
######### For each one of the sources, the type is defined
sync.sources.s-1.type = org.keedio.flume.source.SQLSource
sync.sources.s-1.hibernate.connection.url = jdbc:mysql://192.168.1.10/testdb?useSSL=false
######### Hibernate Database connection properties
sync.sources.s-1.hibernate.connection.user = test
sync.sources.s-1.hibernate.connection.password = 123456
sync.sources.s-1.hibernate.connection.autocommit = true
sync.sources.s-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
sync.sources.s-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
sync.sources.s-1.run.query.delay=10000
sync.sources.s-1.status.file.path = /home/test/apache-flume-1.8.0-bin/status
# 用上${YYYYMM}环境变量,是因为我用的测试表示一个月表,每个月的数据会放到相应的表里。使用方式见上面的启动说明
sync.sources.s-1.status.file.name = test_${YYYYMM}.status
######## Custom query
sync.sources.s-1.start.from = 0
sync.sources.s-1.custom.query = select * from t_test_${YYYYMM} where id > $@$ order by id asc
sync.sources.s-1.batch.size = 100
sync.sources.s-1.max.rows = 100
sync.sources.s-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
sync.sources.s-1.hibernate.c3p0.min_size=5
sync.sources.s-1.hibernate.c3p0.max_size=20
######### sinks 1
# sink k-1用到的通道,和source的通道要保持一致,否则取不到数据
sync.sinks.k-1.channel = c-1
sync.sinks.k-1.type = org.apache.flume.sink.kafka.KafkaSink
sync.sinks.k-1.kafka.topic = sync-test
sync.sinks.k-1.kafka.bootstrap.servers = localhost:9092
sync.sinks.k-1.kafka.producer.acks = 1
# 每批次处理的event数量
sync.sinks.k-1.kafka.flumeBatchSize = 100
######### sinks 2
# sink k-2用到的通道,和source的通道要保持一致,否则取不到数据
sync.sinks.k-2.channel = c-1
sync.sinks.k-2.type = org.apache.flume.sink.kafka.KafkaSink
sync.sinks.k-2.kafka.topic = sync-test
sync.sinks.k-2.kafka.bootstrap.servers = localhost:9092
sync.sinks.k-2.kafka.producer.acks = 1
sync.sinks.k-2.kafka.flumeBatchSize = 100
flume各部分参数含义
batchData的大小见参数:batchSize
PutList和TakeList的大小见参数:transactionCapactiy
Channel总容量大小见参数:capacity
问题记录
异常:Exception in thread "PollableSourceRunner-SQLSource-src-1" java.lang.AbstractMethodError: org.keedio.flume.source.SQLSource.getMaxBackOffSleepInterval()J
分析:由于我用的是flume1.8,而flume-ng-sql-1.4.3插件对应的flume-ng-core版本是1.5.2,1.8版本里的PollableSource接口多了两个方法 getBackOffSleepIncrement(); getMaxBackOffSleepInterval();在失败补偿暂停线程处理时,需要用到这个方法。
解决方法:更新flume-ng-sql-1.4.3里依赖的flume-ng-core版本为1.8.0,并在源代码【SQLSource.java】里添加这两个方法即可。
@Override
public long getBackOffSleepIncrement() {
return 1000;
}
@Override
public long getMaxBackOffSleepInterval() {
return 5000;
}
mysql增量同步kafka_MySQL数据实时增量同步到Kafka - Flume相关推荐
- mysql同步mongodb_MySQL数据实时增量同步到MongoDB
一.go-mysql-transfer go-mysql-transfer是使用Go语言实现的MySQL数据库实时增量同步工具.能够实时监听MySQL二进制日志(binlog)的变动,将变更内容形成指 ...
- 简单实现MySQL数据实时增量同步到Kafka————Maxwell
任务需求:将MySQL里的数据实时增量同步到Kafka 1.准备工作 1.1.MySQL方面:开启BinLog 1.1.1.修改my.cnf文件 vi /etc/my.cnf [mysqld] ser ...
- flink实时同步mysql_基于Canal与Flink实现数据实时增量同步(一)
点击上方蓝色字体,关注我 canal是阿里巴巴旗下的一款开源项目,纯Java开发.基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB). 准备 配置 ...
- 数据实时增量同步之CDC工具—Canal、mysql_stream、go-mysql-transfer、Maxwell
数据实时增量同步之CDC工具-Canal.mysql_stream.go-mysql-transfer.Maxwell 什么是CDC? CDC工具对比 实现原理: Mysql binlog 讲解: m ...
- kettle spoon判断增量更新_使用Kettle实现数据实时增量同步--时间戳增量回滚同步...
使用Kettle实现数据实时增量同步 0. 前言 本文介绍了使用Kettle对一张业务表数据(500万条数据以上)进行实时(10秒)同步,采用了时间戳增量回滚同步的方法.关于ETL和Kettle的入门 ...
- kettle 插入更新 数据增量_使用Kettle实现数据实时增量同步
2018-09-28: 示例job已上传至github,地址见文末 0. 前言 本文介绍了使用Kettle对一张业务表数据(500万条数据以上)进行实时(10秒)同步,采用了时间戳增量回滚同步的方法. ...
- linux-windows主动推送文件同步目录数据 linux-windows数据目录同步
linux->windows主动推送文件同步目录数据 linux-windows数据目录同步 1 .windows下安装openssh for windows工具,下载地址 https://ww ...
- elasticsearch date_MySQL数据实时增量同步到Elasticsearch
Mysql到Elasticsearch的数据同步,一般用ETL来实现,但性能并不理想,目前大部分的ETL是定时查询Mysql数据库有没有新增数据或者修改数据,如果数据量小影响不大,但如果几百万上千万的 ...
- Centos7.0系统下Rsync+sersync实现多文件数据实时增量同步
前言: 一.为什么要用Rsync+sersync架构? 1.sersync是基于Inotify开发的,类似于Inotify-tools的工具 2.sersync可以记录下被监听目录中发生变化的(包括增 ...
最新文章
- 【Qt】Qt样式表总结(四):CSS盒子模型
- 关系型数据库 和 非关系型数据对比 以及 MySQL与Oracle对比
- AS插件-Android Drawable Importer
- fft快速傅利叶变的C实现
- wpfdiagram 学习 教学_开启双自主学习模式 助力学生生命成长——长清湖实验学校开展“双自主合作学习”教学模式...
- access数据库文件导入mysql数据库文件怎么打开,Access数据库从外部数据导入文本文件和VF数据库-dbf文件怎么打开...
- Java之final详解
- JS 立即执行的函数表达式(function)写法
- 机器人图规划算法研究现状简述
- 读书书签-《高等应用数学问题的MATLAB求解》第三版,薛定宇 陈阳泉著
- [管理员手册](五)Ubuntu desktop 20.04系统安装显卡驱动NVIDIA cuda pytorch向日葵sunlogin安装
- ubuntu全版本安装 NVIDIA显卡驱动、以及重装、卸载
- 沈阳工业大学计算机专业排名,中国的大学计算机专业排名。
- android activity是什么呢
- 公众号点击图片变成另一张_朋友圈也能发九宫格图片,再也不犯选择困难症
- vmware-vmx.exe无法结束进程, 关闭Hyper-v虚拟服务
- Cesium 填挖方分析
- 图片转PDF有哪些软件?这几款软件建议收藏
- 颓废的人怎样振奋精神_5个令人振奋的行业,从事数据科学家在科技行业以外的工作...
- Spring框架的简介
热门文章
- 用ESP32怎么实现离线语音
- linux下编写时钟代码,Linux时间子系统之一:clock source(时钟源)【转】(示例代码)...
- linux rs232触摸屏驱动程序,Linux下的触摸屏驱动
- fileset java_Java FileSet.iterator方法代码示例
- 全球首个知识增强千亿大模型鹏城-百度·文心发布
- Python接口测试-模块引用与映射
- 温故之 “快速排序”
- springboot 中根据数据库表生成所有表的model,mapper和xml文件
- 了解mysqlpump工具
- Shell 脚本编程 基本语法: