目录

前言

准备工作

MySQL开启Binlog

安装Canal Admin WEB UI

安装Canal Server

监控MySQL并导入Kafka

尾巴


前言

CDC(Change Data Capture)是一种捕获数据修改的技术方案,常常应用于异构数据源之间的数据同步。通常有两种解决方案:批式定时根据查询条件采集变更数据、监控数据源的操作日志。对于第一种批式的方案好处是门槛低成本低但坏处是可能会造成数据丢失以及数据有延迟等等,第二种日志方案好处很明显,比如对数据源侵入性低,数据时效性更高,不会有数据丢失的风险等等。在目前的关系型数据库中如MySQL、Oracle、SQLServer、Postgre等都提供了用于同步的日志解决方案,如MySQL基于binlog在主备之间进行同步。

本篇我们介绍的Canal由阿里开源,它将自己伪装成一个MySQL的Slave节点,通过拉取MySQL的binlog来达到实时采集数据库的操作日志的目的,在后面我们就可以将采集到的数据变更操作通过SparkStreaming、FlinkSQL、KafkaConnector等工具进行数据处理和导出到其他异构数据源。

准备工作

需要提前准备好下面几个组件的安装:

  1. JDK1.8:略
  2. MySQL5或8都行:Mysql 8.0.12解压版安装
  3. Kafka:Kafka系列(一)、2.6.0版本kafka集群搭建
  4. zookeeper:Zookeeper系列(三)、zk集群安装部署
  5. 下载Canal Admin 1.1.4版本:https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
  6. 下载Canal Deployer 1.1.4版本:https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

MySQL开启Binlog

上面也提到,基于日志的CDC方案在MySQL中就是通过binlog来记录的,因此第一步我们先将准备好的MySQL开启binlog,开启binlog的操作也很简单,只需要在my.cnf内加入下面的内容即可:

# 修改mysql配置文件
vim /etc/my.cnf-----------------------
[mysqld]
#开启binlog
log-bin=mysql-bin #日志记录方式,建议使用ROW模式
binlog-format=ROW #给当前mysql一个server id,之后的CDC工具里配置的不能跟这个重复
server_id=1 

开启binlog之后重启mysql服务:

#重复mysql服务
service mysqld restart#查看mysql服务状态
service mysqld status

进入mysql命令行,使用下面的命令查看binlog开启的情况:

mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.01 sec)mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.00 sec)

由于canal伪装成MySQL的从节点,因此需要在被监控的MySQL上给canal创建一个拥有复制权限的用户:

#创建用户canal,密码canal_pwd
CREATE USER canal IDENTIFIED BY 'canal_pwd';    #如果创建用户时报错,密码不满足复杂度,可以使用下面的命令修改密码复杂度条件
#查看密码策略
SHOW VARIABLES LIKE 'validate_password%';
#设置密码策略为LOW
set global validate_password_policy=LOW;
#设置密码最短位数
set global validate_password_length=4;#授予canal用户复制binlog的权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;#刷新权限
FLUSH PRIVILEGES; #查看canal用户的权限
show grants for 'canal' 

安装Canal Admin WEB UI

Canal Admin是一个是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的Web UI操作界面,方便用户快速和安全的操作,因此我们要先部署Canal admin,首先就要先给该应用一个管理数据库默认为canal_manager,这里我们就在被监控的MySQL实例上创建另一个用户canal_admin以及canal_manager库来作为canal admin的管理数据库(在生产环境中,Canal Admin的管理库和被监控binlog的MySQL一定是不在同一个MySQL实例上)。

使用root账户进入MySQL命令行执行下面的命令:

#创建用户canal_admin
CREATE USER canal_admin IDENTIFIED BY 'canal_admin_pwd';
GRANT ALL PRIVILEGES ON *.* TO 'canal_admin'@'%' ;
FLUSH PRIVILEGES; 

将下载到的CanalAdmin解压并配置yml文件:

#解压canal admin的tar包
tar -zxvf canal.admin-1.1.4.tar.gz -C /opt/app/canal_admin/#修改yml配置文件,主要修改canal admin的管理库DBhost和DBusername,DBpassword,最下面的canal admin的用户名和密码并不是canal admin web ui的用户,而是后门canal server和canal admin进行通信时的用户
vim /opt/app/canal_admin/conf/application.yml
-------------------------------------------------
server:port: 8089
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8spring.datasource:address: 192.168.142.91:3306database: canal_managerusername: canal_adminpassword: canal_admin_pwddriver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=falsehikari:maximum-pool-size: 30minimum-idle: 1canal:adminUser: adminadminPasswd: admin

使用canal_admin用户进入MySQL命令行,执行/opt/app/canal_admin/conf/canal_manager.sql 脚本:

# shell 进入mysql命令行
[root@wyk01 ~]# mysql -ucanal_admin -pcanal_admin_pwd# mysql 命令行 执行canal_manager.sql脚本
mysql> source /opt/app/canal_admin/conf/canal_manager.sql
Query OK, 1 row affected (0.05 sec)
Database changed
Query OK, 0 rows affected (0.00 sec)
...
...
Query OK, 0 rows affected (0.00 sec)

此时我们就准备好了canal admin服务,使用下面的命令启动:

/opt/app/canal_admin/bin/startup.sh 

在浏览器中输入ip:8089进入canal admin的web管理界面,默认管理员账号为admin/123456:

接下来我们通过canal admin管理界面来配置部署canal单机/集群。

点击操作-->主配置,配置该canal集群的主配置即对应之前canal版本中的canal.properties文件

点击载入模板

主要修改下面这些参数,其他参数默认即可:

# canal admin config
# 这里的密码就是前面在application.yml最后的参数提到的admin/admin,不过这里需要使用mysql加密后的密码,可以在mysql内通过命令 select password('canal') 获取加密串(去掉星号)
canal.admin.manager = 10.1.174.10:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441canal.zkServers = wyk01:2181,wyk02:2181,wyk03:2181
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml#监控到的binlog输出到kafka
canal.serverMode = kafka
canal.mq.servers = wyk01:9092,wyk02:9092,wyk03:9092

安装Canal Server

接下来安装canal server,在1.1.4之前的版本中没有canal admin管理界面,我们只能通过修改canal.properties文件来给canal配置,有了canal admin之后,每个canal server作为一个分布式节点同享刚才我们创建集群时的主配置,因此在canal server这里只需要配置连接到canal admin的配置即可。

解压下载到的canal deployer,然后修改配置文件:

#解压canal deployer的tar包
tar -zxvf canal.deployer-1.1.4.tar.gz -C /opt/app/canal_server/#修改配置文件
vim /opt/app/canal_server/conf/canal_local.properties-------------------------------------------
# register ip
# 每台canal server的ip不一样
canal.register.ip = wyk01#下面的配置在每台canal server上保持一致
# canal admin config
# 这里的密码就是前面在application.yml最后的参数提到的admin/admin,不过这里需要使用mysql加密后的密码,可以在mysql内通过命令 select password('canal') 获取加密串(去掉星号)
canal.admin.manager = wyk01:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
# 指向我们创建的mycanal集群
canal.admin.register.auto = true
canal.admin.register.cluster = mycanal
-------------------------------------------

使用下面的命令启动canal server,会自动添加到canal admin的server管理页面:

/opt/app/canal_server/bin/startup.sh local

到这一步,该canal集群里只有一台canal服务,也可以提供服务,不过为了保证HA,我们这里再添加两个canal server节点,将canal_server目录分发到另外两台节点,只需要修改canal_local.properties内的 canal.register.ip参数即可。

#一、wyk01: 将canal server 目录分发到wyk02,wyk03节点
scp -r /opt/app/canal_server root@wyk02:/opt/app/
scp -r /opt/app/canal_server root@wyk03:/opt/app/#二、wyk02 & wyk03: 修改canal_local.properties配置文件中下面的参数
vim /opt/app/canal_server/conf/canal_local.properties-------------------------------------------
# wyk02
# register ip
# 每台canal server的ip不一样
canal.register.ip = wyk02-------------------------------------------
# wyk03
# register ip
# 每台canal server的ip不一样
canal.register.ip = wyk03#三、wyk02 & wyk03: 重启动canal server
/opt/app/canal_server/bin/stop.sh
/opt/app/canal_server/bin/startup.sh local

重新启动wyk02,wyk03节点的canal server之后,在canal admin的server管理页面点击刷新列表即可看到三个canal server都在同一个canal cluster内了:

已注册到canal集群的canal server也可以在zookeeper的/otter/canal/cluster节点下看到:

[zk: localhost:2181(CONNECTED) 9] ls /otter/canal/cluster
[wyk01:11111, wyk02:11111, wyk03:11111]

监控MySQL并导入Kafka

我们已经部署好了canal集群,下面终于可以监控mysql的数据修改动作并将该动作导入kafka了,在之前1.1.4版本之前我们要配置instance.properties文件,通过canal admin,我们只需要在界面上载入模板修改对应参数即可应用到每个canal server:

本次测试我们来监控test_csdn这个库下的所有操作,只需要修改下面的参数即可,其他参数可以保持默认:

详细参数请参考官方文档:https://github.com/alibaba/canal/wiki/AdminGuide

# position info
canal.instance.master.address=wyk01:3306# username/password
# 这里的用户必须是刚才授权该库的binlog权限的用户
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal_pwd# table regex
# 配置监控test_csdn库下的所有表,这里的正则可以定制匹配规则,具体可参考官方文档:https://github.com/alibaba/canal/wiki/AdminGuide
canal.instance.filter.regex=test_csdn\\..*# mq config
# 指定输出到kafka的topic为:wyk_csdn_canal
canal.mq.topic=wyk_csdn_canal

保存然后点击启动instance即可:

我们开启一个kafka的consuer命令行,然后在test_csdn库下做增删改查,验证kafka中有没有接收到CDC数据:

#开启kafka consumer
bin/kafka-console-consumer.sh --bootstrap-server wyk01:9092,wyk02:9092,wyk03:9092 --topic wyk_csdn_canal
-- 在mysql中造数据
create database test_csdn;
use test_csdn;-- 建表
create table wyk_csdn(id int,name varchar(20),ins_ts timestamp);-- 插入2条数据
insert into wyk_csdn values(1,'wyk1',current_timestamp());
insert into wyk_csdn values(2,'wyk2',current_timestamp());-- 更新id=2的数据
update wyk_csdn set name='wyk2_new' where id=2;-- 删除id=1的数据
delete from wyk_csdn where id=1;-- 清空表
truncate table wyk_csdn;-- 删除表
drop table wyk_csdn;-- 修改表结构
create table wyk_csdn2(id int,name varchar(20),ins_ts timestamp);
alter table wyk_csdn2 add column(add_c text);-- 修改表名
rename table wyk_csdn2 to wyk_csdn2_new;

Canal json数据格式如下:

{"data": null,"database": "test_csdn","es": 1600756599000,"id": 2,"isDdl": true,"mysqlType": null,"old": null,"pkNames": null,"sql": "create table wyk_csdn(id int,name varchar(20),ins_ts timestamp)","sqlType": null,"table": "wyk_csdn","ts": 1600756600261,"type": "CREATE"
}
{"data": [{"id": "1","name": "wyk1","ins_ts": "2020-09-22 14:37:19"}],"database": "test_csdn","es": 1600756639000,"id": 3,"isDdl": false,"mysqlType": {"id": "int","name": "varchar(20)","ins_ts": "timestamp"},"old": null,"pkNames": null,"sql": "","sqlType": {"id": 4,"name": 12,"ins_ts": 93},"table": "wyk_csdn","ts": 1600756639895,"type": "INSERT"
}
{"data": [{"id": "2","name": "wyk2","ins_ts": "2020-09-22 14:37:19"}],"database": "test_csdn","es": 1600756639000,"id": 3,"isDdl": false,"mysqlType": {"id": "int","name": "varchar(20)","ins_ts": "timestamp"},"old": null,"pkNames": null,"sql": "","sqlType": {"id": 4,"name": 12,"ins_ts": 93},"table": "wyk_csdn","ts": 1600756639895,"type": "INSERT"
}
{"data": [{"id": "2","name": "wyk2_new","ins_ts": "2020-09-22 14:37:48"}],"database": "test_csdn","es": 1600756668000,"id": 4,"isDdl": false,"mysqlType": {"id": "int","name": "varchar(20)","ins_ts": "timestamp"},"old": [{"name": "wyk2","ins_ts": "2020-09-22 14:37:19"}],"pkNames": null,"sql": "","sqlType": {"id": 4,"name": 12,"ins_ts": 93},"table": "wyk_csdn","ts": 1600756668632,"type": "UPDATE"
}
{"data": [{"id": "1","name": "wyk1","ins_ts": "2020-09-22 14:37:19"}],"database": "test_csdn","es": 1600756682000,"id": 5,"isDdl": false,"mysqlType": {"id": "int","name": "varchar(20)","ins_ts": "timestamp"},"old": null,"pkNames": null,"sql": "","sqlType": {"id": 4,"name": 12,"ins_ts": 93},"table": "wyk_csdn","ts": 1600756682711,"type": "DELETE"
}
{"data": null,"database": "test_csdn","es": 1600756722000,"id": 6,"isDdl": true,"mysqlType": null,"old": null,"pkNames": null,"sql": "truncate table wyk_csdn","sqlType": null,"table": "wyk_csdn","ts": 1600756722959,"type": "TRUNCATE"
}
{"data": null,"database": "test_csdn","es": 1600756727000,"id": 7,"isDdl": true,"mysqlType": null,"old": null,"pkNames": null,"sql": "DROP TABLE `wyk_csdn` /* generated by server */","sqlType": null,"table": "wyk_csdn","ts": 1600756728388,"type": "ERASE"
}
{"data": null,"database": "test_csdn","es": 1600757190000,"id": 8,"isDdl": true,"mysqlType": null,"old": null,"pkNames": null,"sql": "create table wyk_csdn2(id int,name varchar(20),ins_ts timestamp)","sqlType": null,"table": "wyk_csdn2","ts": 1600757192968,"type": "CREATE"
}
{"data": null,"database": "test_csdn","es": 1600757196000,"id": 9,"isDdl": true,"mysqlType": null,"old": null,"pkNames": null,"sql": "alter table wyk_csdn2 add column(add_c text)","sqlType": null,"table": "wyk_csdn2","ts": 1600757201221,"type": "ALTER"
}
{"data": null,"database": "test_csdn","es": 1600758458000,"id": 10,"isDdl": true,"mysqlType": null,"old": null,"pkNames": null,"sql": "rename table wyk_csdn2 to wyk_csdn2_new","sqlType": null,"table": "wyk_csdn2_new","ts": 1600758460224,"type": "RENAME"
}

尾巴

可以看到,Canal能够监控到数据库的alter table&view /create table&view/insert/update/delete/truncate/drop/rename等动作,已经非常的全面,甚至可以监控truncate动作,CanalAdmin为我们提供了可视化的配置,也给Canal提供了HA解决了单点故障的问题。后面介绍其他CDC工具如Maxwell、Debezium也可以实现类似的功能,区别其实都不大,都是通过监控binlog日志来进行数据变更的捕获,其次就是发出来的json格式不一样,获取这些数据变更动作之后,我们就可以有针对性的对它们进行处理同步到各种异构目标数据源了。

在后面的文章中我会介绍这三种主要的CDC工具的对比以及结合Flink 的使用。

希望本文对你有帮助,请点个赞鼓励一下作者吧~ 谢谢!

CDC系列(一)、Canal 集群部署及使用(带WebUI)相关推荐

  1. es系列:es集群部署

    下载 https://www.elastic.co/cn/downloads/elasticsearch 下载你需要的版本 准备三台Linux系统 192.168.28.129 192.168.28. ...

  2. mysql 集群 运维_【MySQL运维】Canal集群模式与多数据库同步部署

    一.Canal工作原理 Canal基于数据库的增量日志进行解析,然后提供增量数据的订阅和消费.Canal会将自己伪装成MySQL的 Slave去向主库进行同步请求,然后将获取到的binlog解析成特定 ...

  3. 从架构设计理念到集群部署,全面认识KubeEdge

    摘要:本篇文章将从KubeEdge架构设计理念.KubeEdge代码目录概览.KubeEdge集群部署三方面带大家认识KubeEdge. KubeEdge即Kube+Edge,顾名思义就是依托K8s的 ...

  4. Kubernetes(k8s)集群部署(k8s企业级Docker容器集群管理)系列目录

    0.目录 整体架构目录:ASP.NET Core分布式项目实战-目录 k8s架构目录:Kubernetes(k8s)集群部署(k8s企业级Docker容器集群管理)系列目录 一.感谢 在此感谢.net ...

  5. 架构系列三:使用Keepalived+Nginx+tomcat实现集群部署

    架构系列三:使用Keepalived+Nginx+tomcat实现集群部署 介绍了通过Nginx配置Tomct集群,当其中一个Tomcat服务停止后,Nginx可自动识别并选择另一个服务器响应用户请求 ...

  6. 架构系列二:使用Nginx+tomcat实现集群部署

    架构系列二:使用Nginx+tomcat实现集群部署 一.环境介绍  VM1:Ubuntu-S100 IP:192.168.130.128 部署Tomcat应用及Nginx  VM2:Ubuntu-S ...

  7. Elasticsearch7.9集群部署,head插件,canal同步mysql数据到es,亲自测试,无坑

    Elasticsearch集群部署 1.服务器规划 10.4.7.11 node1 10.4.7.12 node2 10.4.7.13 node3 1. 集群相关    一个运行中的 Elastics ...

  8. Kafka系列(七)、Kafka套件 Confluent Platform 单机/集群部署

    目录 简介 单机部署 集群部署 尾巴 Kafka系列: kafka 2.4.1单机版部署及使用 kafka监控系统kafka eagle安装使用 滴滴开源的kafka-manager编译及部署使用 k ...

  9. 【运维】K8S集群部署系列之ETCD集群搭建(四)

    ETCD集群扩容和缩容 本文将介绍生产环境下如何对ETCD集群进行扩容和缩容. 文章目录 ETCD集群扩容和缩容 新节点环境准备(node3) 下载安装包并初始化环境 网络准备 生成`node3`对等 ...

最新文章

  1. 三篇ISME讨论‘1%的微生物可培养’
  2. mac下使用git的冲突的解决方案
  3. 阿里云服务器如何选配?
  4. Python会干掉Java, 一统天下?
  5. 解决XCode安装插件后插件不能使用的问题(转载)
  6. ARM联合创始人:若被英伟达收购 将是一场灾难
  7. 一文了解下对小微风控策略的优雅调整
  8. L1-041 寻找250-PAT团体程序设计天梯赛GPLT
  9. 【项目实战一】基于人工神经网络ANN的车牌识别
  10. 用Google地图获取地图上某点的经纬度坐标
  11. 什么是web安全测试
  12. 几种国内芯片测序格式和 Illumina Omni 位点集格式的对比
  13. 红楼梦人物关系图,一代大师成绝响,下回分解待何人,kindle读书摘要
  14. omnet++ 第一个工程的创建
  15. 学习ELMo从文本中提取特征的分步NLP指南
  16. 黑苹果驱动板载intel蓝牙
  17. C语言文件操作之----文件的读写
  18. 开机 提示Reboot and select proper boot device or Insert Boot Media in selected Boot device and press a
  19. oracle 所有外键约束,如何在oracle中找到所有外键约束?
  20. Rose Data Modeler与数据库的建模

热门文章

  1. 七星在线农牧饲渔、电力等行业大涨
  2. 2017南京大学计算机考研答案,全新栏目!【真题解锁】第1期 | 实验设计:南京大学2017年学硕考题...
  3. R5F10268 FDL CS+ for CC 设置与实测数据
  4. 单元测试的框架Hamcrest
  5. 关于数学建模论文写作技巧的分析
  6. kubectl 创建pvc_k8s数据持久化之statefulset的数据持久化,并自动创建PV与PVC
  7. 群聊私聊天建群社交即时通讯H5系统开发
  8. 【 uC/OS II 】uC/OS II 源代码阅读(os_task.c)任务管理
  9. 因为计算机中丢失mfc100ud.dll,mfc100ud.dll
  10. WiFi常见业务下的大小包情况分析