简介

canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:

kafka: https://github.com/apache/kafka

RocketMQ : https://github.com/apache/rocketmq

本文中默认已经安装了kafka环境,仅做对接的演示;(若没有安装则需要提前安装kafka)

演示环境如下:

bigdata111

bigdata112

bigdata113

centos7.0

centos7.0

centos7.0

jdk1.8

jdk1.8

jdk1.8

zookeeper3.4

zookeeper3.4

zookeeper3.4

mysql5.7

canal-server

canal-server

canal-server

canal-admin

kafka2.11

kafka2.11

kafka2.11

1.修改canal.properties文件(三台机器)

[root@bigdata111 canal-server]# cd conf/

[root@bigdata111 conf]# ll

总用量 20

-rwxr-xr-x. 1 root root 291 3月 16 04:43 canal_local.properties

-rwxr-xr-x. 1 root root 5182 3月 16 04:54 canal.properties

drwxrwxrwx. 2 root root 47 3月 16 05:02 example

-rwxr-xr-x. 1 root root 3119 3月 16 04:43 logback.xml

drwxrwxrwx. 2 root root 38 3月 16 04:43 metrics

drwxrwxrwx. 3 root root 4096 3月 16 04:43 spring

[root@bigdata111 conf]# vi canal.properties

# 修改服务模式为kafka

canal.serverMode = kafka

# 修改kafka的对应服务器

canal.mq.servers = bigdata111:9092,bigdata112:9092,bigdata113:9092

2.修改instance.properties文件(三台机器)

[root@bigdata111 conf]# cd example/

[root@bigdata111 example]# ll

总用量 196

-rw-r--r--. 1 root root 196608 3月 16 04:43 h2.mv.db

-rwxr-xr-x. 1 root root 2037 3月 16 05:02 instance.properties

[root@bigdata111 example]# vi instance.properties

# 将数据发送到指定的topic

canal.mq.topic=test

3.启动zookeeper和Canal(三台机器)

启动zookeeper、查看zk的状态

[root@bigdata111 canal-server]# zkServer.sh start

ZooKeeper JMX enabled by default

Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

[root@bigdata111 canal-server]# zkServer.sh status

ZooKeeper JMX enabled by default

Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg

Mode: follower

[root@bigdata112 canal-server]# zkServer.sh start

ZooKeeper JMX enabled by default

Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

[root@bigdata112 canal-server]# zkServer.sh status

ZooKeeper JMX enabled by default

Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg

Mode: follower

[root@bigdata113 canal-server]# zkServer.sh start

ZooKeeper JMX enabled by default

Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

[root@bigdata113 canal-server]# zkServer.sh status

ZooKeeper JMX enabled by default

Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg

Mode: leader

启动canal

[root@bigdata111 canal-server]# bin/startup.sh

[root@bigdata112 canal-server]# bin/startup.sh

[root@bigdata113 canal-server]# bin/startup.sh

4.启动kafka服务(三台机器)

[root@bigdata111 kafka-2.11]# kafka-server-start.sh /opt/module/kafka-2.11/config/server.properties &

[root@bigdata112 kafka-2.11]# kafka-server-start.sh /opt/module/kafka-2.11/config/server.properties &

[root@bigdata113 kafka-2.11]# kafka-server-start.sh /opt/module/kafka-2.11/config/server.properties &

5.查看所有进程(三台机器)

[root@bigdata111 canal-server]# jps

5360 Kafka

4963 QuorumPeerMain

5699 Jps

5044 CanalLauncher

5.启动kafka消费者(bigdata113)

在bigdata113上启动消费者

kafka版本大于0.9以上,使用以下命令启动消费者:

[root@bigdata113 canal-server]# kafka-console-consumer.sh --bootstrap-server bigdata113:9092 --from-beginning --topic test

kafka版本小于0.9版本,使用以下命令启动消费者:

[root@bigdata113 canal-server]# kafka-console-consumer.sh --zookeeper bigdata113:2181 --from-beginning --topic test

如果在启动过程中报错:

zookeeper is not a recognized option

则是因为版本不同造成的,对应修改如上方式即可;

6.mysql操作数据

比如数据库中有一个stu表,我们向里面增加,更新,删除一个数据,看kafka消费者中的数据变化;

mysql> select * from stu limit 10;

+----+--------+------+---------------------+

| id | name | sex | stime |

+----+--------+------+---------------------+

| 1 | 张三 | 男 | 2019-09-23 17:25:07 |

| 2 | 李四 | 女 | 2019-09-23 17:25:13 |

| 3 | 李楠 | 男 | 2019-09-23 17:25:21 |

| 4 | 张畅 | 女 | 2019-09-23 17:25:30 |

| 5 | 李想 | 男 | 2019-09-23 17:25:38 |

| 6 | 赵街 | 男 | 2019-09-23 17:25:50 |

| 7 | 林安 | 男 | 2019-09-23 17:26:00 |

| 8 | 秦王 | 男 | 2019-09-23 17:45:47 |

| 9 | 纣王 | 男 | 2019-09-23 17:45:47 |

| 10 | 张楠 | 男 | 2019-09-23 17:45:47 |

+----+--------+------+---------------------+

10 rows in set (0.00 sec)

mysql> insert into stu(id,name,sex)values(99332,‘test111‘,‘男‘);

Query OK, 1 row affected (0.03 sec)

mysql> update stu set name=‘test222‘ where id = 99332;

Query OK, 1 row affected (0.07 sec)

Rows matched: 1 Changed: 1 Warnings: 0

mysql> delete from stu where id =99332;

Query OK, 1 row affected (0.03 sec)

kafka消费者变化:

[root@bigdata113 kafka-2.11]# kafka-console-consumer.sh --bootstrap-server bigdata113:9092 --from-beginning --topic test

{"data":[{"id":"9999","name":"张三22222","sex":"男","stime":"2020-03-16 06:24:53"}],"database":"student","es":1584311093000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","sex":"varchar(255)","stime":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"sex":12,"stime":93},"table":"stu","ts":1584313187762,"type":"INSERT"}

[2020-03-16 07:19:38,216] INFO [GroupMetadataManager brokerId=113] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

{"data":[{"id":"99332","name":"test111","sex":"男","stime":"2020-03-16 07:22:02"}],"database":"student","es":1584314522000,"id":3,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","sex":"varchar(255)","stime":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"sex":12,"stime":93},"table":"stu","ts":1584314522967,"type":"INSERT"}

{"data":[{"id":"99332","name":"test222","sex":"男","stime":"2020-03-16 07:23:55"}],"database":"student","es":1584314635000,"id":4,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","sex":"varchar(255)","stime":"timestamp"},"old":[{"name":"test111","stime":"2020-03-16 07:22:02"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"sex":12,"stime":93},"table":"stu","ts":1584314635674,"type":"UPDATE"}

{"data":[{"id":"99332","name":"test222","sex":"男","stime":"2020-03-16 07:23:55"}],"database":"student","es":1584314658000,"id":5,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","sex":"varchar(255)","stime":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"sex":12,"stime":93},"table":"stu","ts":1584314658748,"type":"DELETE"}

【Canal】利用canal实现mysql实时增量备份并对接kafka

标签:null   option   inf   name   服务器   tin   zookeeper   timestamp   server

本条技术文章来源于互联网,如果无意侵犯您的权益请点击此处反馈版权投诉

本文系统来源:https://www.cnblogs.com/ShadowFiend/p/12522976.html

kafka对接mysql_【Canal】利用canal实现mysql实时增量备份并对接kafka相关推荐

  1. mysql实时增量备份

    启用binlog日志实现对数据的增量备份: 日志存储位置: /var/lib/mysql/ 日志名称:主机名-bin.000001 或mysqld-bin.000001 binlog日志概述:二进制日 ...

  2. flink实时同步mysql_基于Canal与Flink实现数据实时增量同步(一)

    点击上方蓝色字体,关注我 canal是阿里巴巴旗下的一款开源项目,纯Java开发.基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB). 准备 配置 ...

  3. linux mysql定时增量备份_Mysql 日常备份和增量备份脚本(Linux)

    适合对象 本文是在linux下,mysql 4.1.14版本下测试的,经过适当修改可能适合mysql 4.0,5.0及其其他版本. 本文适合于没有启动复制功能的mysql,如果启动了复制,可能不需要采 ...

  4. mysql定时增量备份_Mysql日常自动备份和增量备份脚本

    序 你是否在寻找一个MySQL备份脚本? 适合对象 本文是在Linux下,mysql 4.1.14版本下测试的,经过适当修改可能适合mysql 4.0,5.0及其其他版本. 本文适合于没有启动复制功能 ...

  5. 实验——MySQL数据库增量备份恢复

    目录 一.MySQL数据库增量备份恢复 1.1 物理冷备份,开启服务 1.2 开启二进制日志文件 1.3 创建库和表,进行完全备份和增量备份 1.4 进行正常操作和误操作,进行增量备份 1.5 查看增 ...

  6. MySQL 数据增量备份

    目录 MySQL 数据增量备份 binlog日志 日志概述 启用日志 自定义日志存储目录和日志文件名 手动生成新的日志文件 清理日志(删除已有的日志文件) 查看日志文件内容 使用binlog日志恢复数 ...

  7. 实战-MySQL定时增量备份(2)

    阅读本文大约需要 9 分钟 实战-MySQL定时全量备份(1) 实战-MySQL定时增量备份(2) 实战-将MySQL备份上传到私有云(3) 概要 引言 增量备份 恢复增量备份 定时备份 引言 在产品 ...

  8. mysql增量_mysql实时增量备份

    采用binlog日志的好处 掌控所有更改操作,必要时可用于恢复数据 数据库主从复制的必要条件 [linyouyi@localhost~]# vim /etc/my.cnf [mysqld] .. .. ...

  9. python比较数据库表今天跟前一天数据增量,Python 生产环境Mysql数据库增量备份脚本...

    Mysql数据库常用的办法是通过mysqldump导出sql进行备份,但是不适合数据量很大的数据库,速度,锁表是两个严重的问题.前面写了一遍blog介绍xtrabackup的热备工具.下面的脚本是基于 ...

最新文章

  1. PHP根据时间戳返回星期几
  2. 嘿,开发者,你的坑,我来填!
  3. 客户机-服务器系统,什么是客户机/服务器计算
  4. java电脑类的接口_java 一个类实现两个接口的案例
  5. catia怎么将特征参数化_CATIA参数化建模及关系式的创建和使用 | 坐倚北风
  6. 如何保持最佳 MacBook 温度?
  7. 【Tensorflow/keras】KeyError: ‘loss‘
  8. 最新版华为HG255D硬件定义
  9. gif生成工具(免费)
  10. ssm教培管理系统毕业设计源码230932
  11. Python 硬核分析我国 14 亿人口,发现三大危机!
  12. 如何发送国际短信更便宜、更稳定?
  13. 我的RHCE认证考试经历
  14. 如何使用虚拟机运行“小HomeKit”智汀家庭云
  15. docker docker安装app
  16. Linux 安装qq农场小游戏
  17. 【opencv机器学习】基于SVM和神经网络的车牌识别
  18. Unity 碰撞检测
  19. tomcat配置静态资源访问
  20. 马云说聪明的人都离开了阿里,剩下的成了富翁

热门文章

  1. css之文字在图片上居中显示
  2. 计算机博士一年看多少篇文献,科学网—博士生真的需要一天看20篇文献吗? - 喻海良的博文...
  3. Exchange Server 2013 日常管理经典案例:统计邮箱使用情况
  4. 黑马程序员--飞行棋体会
  5. xp系统打不开征途服务器列表,win10系统与xp双系统安装打不开xp的解决方案
  6. 计算机财务管理第六章答案,2018年中级会计《财务管理》第六章课后精练-5
  7. ceph提示: non-power-of-two pg_num解决办法
  8. 阿里发布鲁班智能设计平台,将大规模赋能新商业
  9. oracle 备份 几种,Oracle数据库四种备份方法优缺点
  10. 深度技术 GHOSTXPSP3 快速装机专业版 V2011.04