canal同步mysql数据到rocketmq集群
rockermq多主多从异步复制部署参考
canal github
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
工作原理
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
部署canal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
tar -xvf canal.deployer-1.1.6.tar.gz -C canal
1.配置canal基本属性
vim conf/canal.properties
canal.ip = 10.12.13.228
canal.port = 11111
canal.serverMode = rocketMQ
rocketmq.producer.group = canal_mq_es_group #生产者分组, 和在代码中指定的 group 的含义一致
rocketmq.namesrv.addr = 10.12.13:9876;10.12.13.224:9876;10.12.13.227:9876;10.12.13.228:9876;#mq 的 nameSvr 的ip地址
rocketmq.tag = binlog_es_tag#生产者生产消息时, 指定的 tag
2.配置canal的mysql
vim conf/example/instance.properties
canal.instance.mysql.slaveId=228#伪装为slave
canal.instance.master.address=10.12.13.97:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.filter.regex=.*\\..*#要监听的数据库表, 正则表达式匹配模式, 没配置就是全部监听
canal.mq.topic=binlog_topic#生产者的发送消息时, 指定的 topic, 和 java代码的含义一致
3.启动canal
sh bin/startup.sh tail -fn 200 logs/canal/canal.log
2022-08-27 01:23:24.897 [Thread-9] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## canal server is down.
2022-08-27 01:23:26.702 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2022-08-27 01:23:26.712 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2022-08-27 01:23:26.761 [main] INFO c.a.o.c.c.rocketmq.producer.CanalRocketMQProducer - ##Start RocketMQ producer##
2022-08-27 01:23:26.849 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2022-08-27 01:23:26.877 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.12.13.228(10.12.13.228):11111]tail -fn 200 logs/example/example.log
查看 rocketmq_client.log 日志, 发现 canal 一直在像 mq 发送心跳检测, 并输出了 mq 的 group, 实例id信息
2022-08-27 02:43:27.845 [MQClientFactoryScheduledThread] INFO RocketmqClient - HeartbeatData [clientID=172.17.0.1@51284, producerDataSet=[ProducerData [groupName=CLIENT_INNER_PRODUCER], ProducerData [groupName=canal_mq_es_group]], consumerDataSet=[]]
2022-08-27 02:53:27.844 [MQClientFactoryScheduledThread] INFO RocketmqClient - send heart beat to broker[broker-b 0 192.168.216.224:10911] success
2022-08-27 02:53:27.845 [MQClientFactoryScheduledThread] INFO RocketmqClient - HeartbeatData [clientID=172.17.0.1@51284, producerDataSet=[ProducerData [groupName=canal_mq_es_group], ProducerData [groupName=CLIENT_INNER_PRODUCER]], consumerDataSet=[]]
2022-08-27 02:53:27.846 [MQClientFactoryScheduledThread] INFO RocketmqClient - send heart beat to broker[broker-a 0 192.168.216.223:10911] success
2022-08-27 02:53:27.846 [MQClientFactoryScheduledThread] INFO RocketmqClient - HeartbeatData [clientID=172.17.0.1@51284, producerDataSet=[ProducerData [groupName=canal_mq_es_group], ProducerData [groupName=CLIENT_INNER_PRODUCER]], consumerDataSet=[]]
登录 rocketMQ 可视化控制台, 可以看到 canal 注册到 mq 中的生产者实例, 以及 topic 信息, 这些信息和我们之前在 canal 的配置文件中配置的一致
4.修改myql数据发生到rockermq
mysql开启binlog
root账号新建一个名为canal的用户和名为canal的库并授权给canal:
use mysql;
-- 新建 canal 用户, 密码为 canal
CREATE USER canal IDENTIFIED BY 'canal';
-- 新建 canal 数据库并给 canal 用户授予权限
CREATE DATABASE canal CHARACTER SET utf8mb4;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
GRANT ALL PRIVILEGES ON canal.* TO 'canal'@'%';
FLUSH PRIVILEGES;
创建一个user表并添加几条数据用于后续测试:
CREATE TABLE `user` (`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '用户 id',`username` varchar(50) DEFAULT NULL COMMENT '用户名',`password` varchar(50) DEFAULT NULL COMMENT '密码',`email` varchar(45) DEFAULT NULL COMMENT '邮箱',`phone` varchar(15) DEFAULT NULL COMMENT '手机号码',PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4;INSERT INTO `canal`.`user` (`id`, `username`, `password`, `email`, `phone`) VALUES (1, 'xx', 'k0VP$l@ru', 't.wazmcs@qxfvsstyo.uy', '18145206808'),(2, 'xx', '8pig73*dW', 'h.wsecj@wmlp.li', '19832458514'),(3, 'aa', '5G)c@7RyV', 'c.afkrfcr@rnhewu.org.cn', '18656022523'),(4, 'cc', 'KjvLG*BP', 'r.tbnmdyh@pzzuo.jo', '18674498531'),(5, 'dd', '%fqmhybp3', 'o.hnlu@hhyvqxbv.eg', '18192674843');
修改表中数据,然后查看mq控制台消息
FAQ:
1.canal启动后11111端口未打开,只打开了11112端口,但是启动日志正常
答:11111端口是给tcp模式(netty)时候用的,你应该是用了kafka或者rocketmq吧,就不会去起这个端口了。可以看下 CanalController那一块初始化的逻辑(在构造函数里)
2.关闭canal后重新启动要删除
rm -rf conf/example/meta.dat
3.本人亲自对比使用mysql5和mysql8 ,发现mysql8会报,但不影响使用。myql5不会。
2022-08-27 05:49:13.920 [MultiStageCoprocessor-other-example-0] WARN com.taobao.tddl.dbsync.binlog.LogDecoder - Skipping unrecognized binlog event Unknown type:41 from: binlog.000002:4528
canal同步mysql数据到rocketmq集群相关推荐
- Elasticsearch7.9集群部署,head插件,canal同步mysql数据到es,亲自测试,无坑
Elasticsearch集群部署 1.服务器规划 10.4.7.11 node1 10.4.7.12 node2 10.4.7.13 node3 1. 集群相关 一个运行中的 Elastics ...
- 实践练习四(必选):迁移 MySQL 数据到 OceanBase 集群
OceanBase Docker安装体验:https://www.xmmup.com/oceanbase-dockeranzhuangtiyan.html 手动部署 OceanBase 单副本集群:h ...
- Flink 最佳实践之使用 Canal 同步 MySQL 数据至 TiDB
摘要:本文将介绍如何将 MySQL 中的数据,通过 Binlog + Canal 的形式导入到 Kafka 中,继而被 Flink 消费的案例.内容包括: 背景介绍 环境介绍 部署 TiDB Clus ...
- 使用canal同步MySQL数据到Elasticsearch(ES)
目录 1.功能及使用场景 1.1.功能介绍 1.2.使用场景 2.需求引入 3.canal文件下载及准备 3.1 下载文件 3.2 准备文件 4.deployer安装及效果测试 4.1.deploye ...
- canal同步mysql数据
canal简介 canal官网文档 canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 早期阿里巴巴因为杭州和美国双机房部署, ...
- mysql keepalived主主同步_KEEPALIVED+MYSQL主主同步=MYSQL高可用(HA)集群
1.这个环境最大的缺陷在于主机写入速度极慢,主键重复. 2.mysql最好采用5.6以上集群版本,5.5以下单线程版本不大适合.博客中的mysql为5.5,实际测试挺差的,换成5.6之后十分完美,请各 ...
- 实践练习四:迁移 MySQL 数据到 OceanBase 集群
练习目的 本次练习目的掌握从 MySQL 向 OceanBase 迁移数据的基本方法:mysqldump.datax .canal 等. 练习内容 请记录并分享下列内容: (必选)使用 mysqldu ...
- canal同步mysql到kafka_使用Canal同步MySQL数据到Kafka 得到的数据中sql字段无值-问答-阿里云开发者社区-阿里云...
这个应该跟你的binlog记录模式有关系,binlog有3中模式,ROW(行模式), Statement(语句模式), Mixed(混合模式)三种模式的用法如下: ROW(行模式):记录那条数据修改了 ...
- canal实现mysql数据同步
前言 canal是实现mysql数据备份,异地灾备,异地数据同步等重要的中间件,在实际的业务场景中有着广泛的使用,本文基于小编所在项目中一个异地数据同步的场景为例,通过案例演示下利用canal实现my ...
最新文章
- python argparse模块_Python argparse模块应用实例解析
- pandas编写自定义函数计算多个数据列的加和(sum)、使用groupby函数和apply函数聚合计算分组内多个数据列的加和
- Python初识面向对象
- Delphi控制Excel输出上标示例
- python importlib_metadata_Python 动态导入对象,importlib.import_module()的使用方法
- WebUploader 设置单个文件上传
- xpath in biztalk
- [SPS2010] 使用心得 7 - ebook for Installation
- Verilog HDL中模块参数传递的方法
- python 三维绘图库_Python第三方库matplotlib(2D绘图库)入门与进阶
- treegrid.bootstrap使用说明
- java jxl 写 excel文件_Java使用jxl写入Excel文件
- 安装apache 后,找不到服务,解决办法
- 20154319 实验九web安全基础实践
- Codeforces Round #565 (div. 3)
- mysql ip 远程连接不上_【技术贴】解决MySql连接不上 ip远程连接Host is not allowed to conn-阿里云开发者社区...
- 安装cdr2019卡在正在下载_cdr2019最新版下载-coreldraw2019安装包中文版 - 极光下载站...
- bootice添加黑苹果引导_Clover Configurator黑苹果 Clover 引导配置工具
- vc语言c1083错误,VC Fatal Error C1083的几种解决方案
- NC15979 小q的数列