kafka跨库同步mysql表_canal实时同步mysql表数据到Kafka
准备
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
安装canal.server
1 下载压缩包
到官网地址(release)下载最新压缩包,请下载 canal.deployer-latest.tar.gz
2 将canal.deployer 复制到固定目录并解压
mkdir -p /usr/local/canal
cp canal.deployer-1.1.1.tar.gz /usr/local/canal
tar -zxvf canal.deployer-1.1.1.tar.gz
3 配置修改参数
a. 修改instance 配置文件,修改单个数据源的配置 vi conf/example/instance.properties
# 按需修改成自己的数据库信息
#################################################
...
#需要同步的mysql地址,端口
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = root
canal.instance.dbPassword = 123456
#需要同步的具体表,也可以使用正则表达式监听多张表或者库
canal.instance.filter.regex=realtime.tpp_vehicle_through_points_history
...
# mq config
# topic的名称
canal.mq.topic=topic_start
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
# 库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################
b. 修改canal 配置文件,修改 Canal 全局设置 vi ./conf/canal.properties
# ...
# 可选项: tcp(默认), kafka, RocketMQ
# 更改模式,直接把数据扔进 Kafka
canal.serverMode = kafka
# ...
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
## Kafka 的地址
canal.mq.servers = 10.255.xx.xx:9092,10.255.xx.xx:9092,10.255.xx.xx:9092
canal.mq.retries = 0
# flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# flatMessage模式下请将该值改大, 建议50-200
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为flat json格式对象
#canal.mq.flatMessage = false
canal.mq.flatMessage = true# 使用文本格式(JSON)进行传输,否则 Kafka 里扔进去的是二进制数据,虽然不影响,但是看起来不方便
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投递是否使用事务
canal.mq.transaction = false
开启 Canal bin/startup.sh
查看日志是否有异常 vim logs/canal/canal.log vim logs/test/test.log
测试,连上数据库尝试执行更改 SQL
启动kafka的消费监控:kafka-console-consumer --bootstrap-server app02:9092 --from-beginning --topic topic_start
看得到同步的表名称和同步的数据类型:UPDATE,INSERT,DELETE
如果出现一些意外导致数据错误,数据不能采集,删除/opt/zp/canal/conf/example/h2.mv.db和meta.dat。重启canal
kafka跨库同步mysql表_canal实时同步mysql表数据到Kafka相关推荐
- MySQL到Elasticsearch实时同步构建数据检索服务的选型与思考
前言 本文具体探讨 MySQL 数据实时同步到 Elasticsearch (以下简称 ES ) 技术方案和思考,同时使用一定篇幅介绍一些前置知识,从理论到实践,让读者更好的理解这块内容和相关问题.包 ...
- es mysql 同步插件_[es和数据库怎么同步]mysql与elasticsearch实时同步常用插件及优缺点对比(ES与关系型数据库同步)...
目前mysql与elasticsearch常用的同步机制大多是基于插件实现的,常用的插件包括:elasticsearch-jdbc,elasticsearch-river-MySQL,go-mysql ...
- [生产库实战] 如何使用触发器对生产库上亿级大表进行实时同步
触发器迁移数据和Oracle 物化视图(MV)的原理相同,通过在源表创建Trigger,记录源表的DML操作日志,然后通过存储过程同步DML受影响的记录,达到目标表和源表数据一致的效果.此方法只是对P ...
- mysql数据库双向实时同步
1.需求描述 近期一个项目需要实现两个数据库间的数据双向实时同步. 2.解决思路 打算采用双主的架构实现这个需求,但是通常双主架构是为了备份,不是为了多点写入,所以实际项目中多个系统只访问双主中 的一 ...
- pg 同步到mysql_MySQL准实时同步到PostgreSQL, Greenplum的方案之一 - rds_dbsync
PostgreSQL , Greenplum , rds_dbsync , binlog rds_dbsync是阿里云数据库内核组开源的一个数据实时同步工具. 可以解析MySQL的binlog,或者P ...
- redis mysql原理_Canal(redis与mysql数据一致性)
canal实现原理 1.canal server端会伪装成mysql从节点,去读取mysql主节点binlog文件,实现增量同步 2.canal server端将数据以json格式同步到客户端,MQ消 ...
- linux 文件双向同步,Linux文件双向实时同步rsync
在线QQ客服:1922638 专业的SQL Server.MySQL数据库同步软件 echo"/usr/sersync/sersync2-d-o/usr/sersync/confxml.xm ...
- Linux平台上文件同步——rsync+inotify之实时同步
1 前言 1.1 概述 本文介绍使用rsync和 inotify-tools,实现linux 上的本地实时同步和本地定时同步的方法. 1.2 实验环境 服务器两台 操作系统: CentOS-7.4 软 ...
- 运维之道 | Linux rsync 文件同步、Inotify远程实时同步
Linux rsync 文件同步服务器 与传统的cp.scp.tar备份方式相比,rsync具有安全性高.备份迅速.支持增量备份等优点,通过rsync可以解决对实时性要求不高的数据备份需求,例如定期的 ...
最新文章
- android 8.0 ,9.0 静态广播不显示问题处理
- chrome 适配调试_移动端适配
- 洛谷P1456 Monkey King
- python生词本的生词_【Anki小工具】有道生词本转Anki 1.0
- JAVA a --; 与 -- a;
- (9)<textarea>标签在mac环境下的问题
- deepin 安装cuda 编译 ffmpeg
- 在C#中对列表/数组进行碎片整理——关闭所有空白
- matepad和鸿蒙,上手华为新 MatePad Pro :搭载鸿蒙系统后,生产力有何不同?
- 编码技术新突破:字节跳动AVG让视频缩小13%
- Spark kyro Serialization配置运行案例
- 如何const定义一个不可变数组
- 如何给REED3阅读器都opml文件
- java md5加密 32位 小写
- TCPMP-interface相关文件函数解析-Mediainfo.c
- Java使用EasyExcel下载xls、xlsx 出现文件格式与扩展名不匹配
- 谷歌卫星地图上的奇特景象
- Spring中常用注解及其作用(二)
- JavaScript奇淫技巧:按键精灵
- 【UCOSii源码解析】任务间通讯与同步