kafka对接mysql_【Canal】利用canal实现mysql实时增量备份并对接kafka
简介
canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:
kafka: https://github.com/apache/kafka
RocketMQ : https://github.com/apache/rocketmq
本文中默认已经安装了kafka环境,仅做对接的演示;(若没有安装则需要提前安装kafka)
演示环境如下:
bigdata111
bigdata112
bigdata113
centos7.0
centos7.0
centos7.0
jdk1.8
jdk1.8
jdk1.8
zookeeper3.4
zookeeper3.4
zookeeper3.4
mysql5.7
canal-server
canal-server
canal-server
canal-admin
kafka2.11
kafka2.11
kafka2.11
1.修改canal.properties文件(三台机器)
[root@bigdata111 canal-server]# cd conf/
[root@bigdata111 conf]# ll
总用量 20
-rwxr-xr-x. 1 root root 291 3月 16 04:43 canal_local.properties
-rwxr-xr-x. 1 root root 5182 3月 16 04:54 canal.properties
drwxrwxrwx. 2 root root 47 3月 16 05:02 example
-rwxr-xr-x. 1 root root 3119 3月 16 04:43 logback.xml
drwxrwxrwx. 2 root root 38 3月 16 04:43 metrics
drwxrwxrwx. 3 root root 4096 3月 16 04:43 spring
[root@bigdata111 conf]# vi canal.properties
# 修改服务模式为kafka
canal.serverMode = kafka
# 修改kafka的对应服务器
canal.mq.servers = bigdata111:9092,bigdata112:9092,bigdata113:9092
2.修改instance.properties文件(三台机器)
[root@bigdata111 conf]# cd example/
[root@bigdata111 example]# ll
总用量 196
-rw-r--r--. 1 root root 196608 3月 16 04:43 h2.mv.db
-rwxr-xr-x. 1 root root 2037 3月 16 05:02 instance.properties
[root@bigdata111 example]# vi instance.properties
# 将数据发送到指定的topic
canal.mq.topic=test
3.启动zookeeper和Canal(三台机器)
启动zookeeper、查看zk的状态
[root@bigdata111 canal-server]# zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@bigdata111 canal-server]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: follower
[root@bigdata112 canal-server]# zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@bigdata112 canal-server]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: follower
[root@bigdata113 canal-server]# zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@bigdata113 canal-server]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: leader
启动canal
[root@bigdata111 canal-server]# bin/startup.sh
[root@bigdata112 canal-server]# bin/startup.sh
[root@bigdata113 canal-server]# bin/startup.sh
4.启动kafka服务(三台机器)
[root@bigdata111 kafka-2.11]# kafka-server-start.sh /opt/module/kafka-2.11/config/server.properties &
[root@bigdata112 kafka-2.11]# kafka-server-start.sh /opt/module/kafka-2.11/config/server.properties &
[root@bigdata113 kafka-2.11]# kafka-server-start.sh /opt/module/kafka-2.11/config/server.properties &
5.查看所有进程(三台机器)
[root@bigdata111 canal-server]# jps
5360 Kafka
4963 QuorumPeerMain
5699 Jps
5044 CanalLauncher
5.启动kafka消费者(bigdata113)
在bigdata113上启动消费者
kafka版本大于0.9以上,使用以下命令启动消费者:
[root@bigdata113 canal-server]# kafka-console-consumer.sh --bootstrap-server bigdata113:9092 --from-beginning --topic test
kafka版本小于0.9版本,使用以下命令启动消费者:
[root@bigdata113 canal-server]# kafka-console-consumer.sh --zookeeper bigdata113:2181 --from-beginning --topic test
如果在启动过程中报错:
zookeeper is not a recognized option
则是因为版本不同造成的,对应修改如上方式即可;
6.mysql操作数据
比如数据库中有一个stu表,我们向里面增加,更新,删除一个数据,看kafka消费者中的数据变化;
mysql> select * from stu limit 10;
+----+--------+------+---------------------+
| id | name | sex | stime |
+----+--------+------+---------------------+
| 1 | 张三 | 男 | 2019-09-23 17:25:07 |
| 2 | 李四 | 女 | 2019-09-23 17:25:13 |
| 3 | 李楠 | 男 | 2019-09-23 17:25:21 |
| 4 | 张畅 | 女 | 2019-09-23 17:25:30 |
| 5 | 李想 | 男 | 2019-09-23 17:25:38 |
| 6 | 赵街 | 男 | 2019-09-23 17:25:50 |
| 7 | 林安 | 男 | 2019-09-23 17:26:00 |
| 8 | 秦王 | 男 | 2019-09-23 17:45:47 |
| 9 | 纣王 | 男 | 2019-09-23 17:45:47 |
| 10 | 张楠 | 男 | 2019-09-23 17:45:47 |
+----+--------+------+---------------------+
10 rows in set (0.00 sec)
mysql> insert into stu(id,name,sex)values(99332,‘test111‘,‘男‘);
Query OK, 1 row affected (0.03 sec)
mysql> update stu set name=‘test222‘ where id = 99332;
Query OK, 1 row affected (0.07 sec)
Rows matched: 1 Changed: 1 Warnings: 0
mysql> delete from stu where id =99332;
Query OK, 1 row affected (0.03 sec)
kafka消费者变化:
[root@bigdata113 kafka-2.11]# kafka-console-consumer.sh --bootstrap-server bigdata113:9092 --from-beginning --topic test
{"data":[{"id":"9999","name":"张三22222","sex":"男","stime":"2020-03-16 06:24:53"}],"database":"student","es":1584311093000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","sex":"varchar(255)","stime":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"sex":12,"stime":93},"table":"stu","ts":1584313187762,"type":"INSERT"}
[2020-03-16 07:19:38,216] INFO [GroupMetadataManager brokerId=113] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
{"data":[{"id":"99332","name":"test111","sex":"男","stime":"2020-03-16 07:22:02"}],"database":"student","es":1584314522000,"id":3,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","sex":"varchar(255)","stime":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"sex":12,"stime":93},"table":"stu","ts":1584314522967,"type":"INSERT"}
{"data":[{"id":"99332","name":"test222","sex":"男","stime":"2020-03-16 07:23:55"}],"database":"student","es":1584314635000,"id":4,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","sex":"varchar(255)","stime":"timestamp"},"old":[{"name":"test111","stime":"2020-03-16 07:22:02"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"sex":12,"stime":93},"table":"stu","ts":1584314635674,"type":"UPDATE"}
{"data":[{"id":"99332","name":"test222","sex":"男","stime":"2020-03-16 07:23:55"}],"database":"student","es":1584314658000,"id":5,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","sex":"varchar(255)","stime":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"sex":12,"stime":93},"table":"stu","ts":1584314658748,"type":"DELETE"}
【Canal】利用canal实现mysql实时增量备份并对接kafka
标签:null option inf name 服务器 tin zookeeper timestamp server
本条技术文章来源于互联网,如果无意侵犯您的权益请点击此处反馈版权投诉
本文系统来源:https://www.cnblogs.com/ShadowFiend/p/12522976.html
kafka对接mysql_【Canal】利用canal实现mysql实时增量备份并对接kafka相关推荐
- mysql实时增量备份
启用binlog日志实现对数据的增量备份: 日志存储位置: /var/lib/mysql/ 日志名称:主机名-bin.000001 或mysqld-bin.000001 binlog日志概述:二进制日 ...
- flink实时同步mysql_基于Canal与Flink实现数据实时增量同步(一)
点击上方蓝色字体,关注我 canal是阿里巴巴旗下的一款开源项目,纯Java开发.基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB). 准备 配置 ...
- linux mysql定时增量备份_Mysql 日常备份和增量备份脚本(Linux)
适合对象 本文是在linux下,mysql 4.1.14版本下测试的,经过适当修改可能适合mysql 4.0,5.0及其其他版本. 本文适合于没有启动复制功能的mysql,如果启动了复制,可能不需要采 ...
- mysql定时增量备份_Mysql日常自动备份和增量备份脚本
序 你是否在寻找一个MySQL备份脚本? 适合对象 本文是在Linux下,mysql 4.1.14版本下测试的,经过适当修改可能适合mysql 4.0,5.0及其其他版本. 本文适合于没有启动复制功能 ...
- 实验——MySQL数据库增量备份恢复
目录 一.MySQL数据库增量备份恢复 1.1 物理冷备份,开启服务 1.2 开启二进制日志文件 1.3 创建库和表,进行完全备份和增量备份 1.4 进行正常操作和误操作,进行增量备份 1.5 查看增 ...
- MySQL 数据增量备份
目录 MySQL 数据增量备份 binlog日志 日志概述 启用日志 自定义日志存储目录和日志文件名 手动生成新的日志文件 清理日志(删除已有的日志文件) 查看日志文件内容 使用binlog日志恢复数 ...
- 实战-MySQL定时增量备份(2)
阅读本文大约需要 9 分钟 实战-MySQL定时全量备份(1) 实战-MySQL定时增量备份(2) 实战-将MySQL备份上传到私有云(3) 概要 引言 增量备份 恢复增量备份 定时备份 引言 在产品 ...
- mysql增量_mysql实时增量备份
采用binlog日志的好处 掌控所有更改操作,必要时可用于恢复数据 数据库主从复制的必要条件 [linyouyi@localhost~]# vim /etc/my.cnf [mysqld] .. .. ...
- python比较数据库表今天跟前一天数据增量,Python 生产环境Mysql数据库增量备份脚本...
Mysql数据库常用的办法是通过mysqldump导出sql进行备份,但是不适合数据量很大的数据库,速度,锁表是两个严重的问题.前面写了一遍blog介绍xtrabackup的热备工具.下面的脚本是基于 ...
最新文章
- PHP根据时间戳返回星期几
- 嘿,开发者,你的坑,我来填!
- 客户机-服务器系统,什么是客户机/服务器计算
- java电脑类的接口_java 一个类实现两个接口的案例
- catia怎么将特征参数化_CATIA参数化建模及关系式的创建和使用 | 坐倚北风
- 如何保持最佳 MacBook 温度?
- 【Tensorflow/keras】KeyError: ‘loss‘
- 最新版华为HG255D硬件定义
- gif生成工具(免费)
- ssm教培管理系统毕业设计源码230932
- Python 硬核分析我国 14 亿人口,发现三大危机!
- 如何发送国际短信更便宜、更稳定?
- 我的RHCE认证考试经历
- 如何使用虚拟机运行“小HomeKit”智汀家庭云
- docker docker安装app
- Linux 安装qq农场小游戏
- 【opencv机器学习】基于SVM和神经网络的车牌识别
- Unity 碰撞检测
- tomcat配置静态资源访问
- 马云说聪明的人都离开了阿里,剩下的成了富翁
热门文章
- css之文字在图片上居中显示
- 计算机博士一年看多少篇文献,科学网—博士生真的需要一天看20篇文献吗? - 喻海良的博文...
- Exchange Server 2013 日常管理经典案例:统计邮箱使用情况
- 黑马程序员--飞行棋体会
- xp系统打不开征途服务器列表,win10系统与xp双系统安装打不开xp的解决方案
- 计算机财务管理第六章答案,2018年中级会计《财务管理》第六章课后精练-5
- ceph提示: non-power-of-two pg_num解决办法
- 阿里发布鲁班智能设计平台,将大规模赋能新商业
- oracle 备份 几种,Oracle数据库四种备份方法优缺点
- 深度技术 GHOSTXPSP3 快速装机专业版 V2011.04