阿里canal简介及canal部署、原理和使用介绍

canal入门

什么是canal

阿里巴巴B2B公司,因为业务的特性,卖家主要集中在国内,买家主要集中在国外,所以衍生出了杭州和美国异地机房的需求,从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。

canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。(数据库同步需要阿里的otter中间件,基于canal)。

这里我们可以简单地把canal理解为一个用来同步增量数据的一个工具:

canal通过binlog同步拿到变更数据,再发送到存储目的地,比如MySQL,Kafka,Elastic Search等多源同步。

canal使用场景

场景1:原始场景, 阿里otter中间件的一部分

场景2:更新缓存

场景3:抓取业务数据新增变化表,用于制作拉链表。( 拉链表:记录每条信息的生命周期,一旦一条记录的生命周期结束,就要重新开始一条新的记录,并把当前日期放入生效的开始日期 )

场景4:抓取业务表的新增变化数据,用于制作实时统计。

canal运行原理

复制过程分成三步:

  1. Master主库将改变记录,写到二进制日志(binary log)中

  2. Slave从库向mysql master发送dump协议,将master主库的binary log events拷贝到它的中继日志(relay log);

  3. Slave从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。

canal的工作原理很简单,就是把自己伪装成slave,假装从master复制数据。

MySQL的binlog介绍

什么是binlog

MySQL的二进制日志可以说是MySQL最重要的日志了,它记录了所有的DDL和DML( 除了数据查询语句 )语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。

一般来说开启二进制日志大概会有1%的性能损耗。二进制有两个最重要的使用场景:

其一:MySQL Replication在Master端开启binlog,Mster把它的二进制日志传递给slaves来达到master-slave数据一致的目的。

其二:通过使用mysqlbinlog工具来使恢复数据。

二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的DDL和DML(除了数据查询语句)语句事件。

开启MySQL的binlog

在mysql的配置文件中开启并重启MySQL生效,一般Linux系统下的MySQL配置文件路径基本都在 /etc/my.cnf ;log-bin=mysql-bin

这个表示binlog日志的前缀是mysql-bin ,以后生成的日志文件就是 mysql-bin.123456 的文件后面的数字按顺序生成。 每次mysql重启或者到达单个文件大小的阈值时,新生一个文件,按顺序编号。

binlog的分类设置

MySQL的binlog的格式有三种,分别是STATEMENT、MIXED、ROW。在配置文件中可以配置选项指定:binlog_format=

statement [ 语句级 ]

​ 语句级,binlog会记录每次一执行写操作的语句。

​ 相对row模式节省空间,但是可能产生不一致性,例如:update table_name set create_date=now();

​ 如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同 ( master落库数据时create_date为2021-08-08 11:10:30 ,但binlog从库落库执行语句时create_date的时间可能就变为2021-08-08 11:11:23 ,主要是语句执行时间为异步)

​ 优点: 节省空间

​ 缺点: 有可能造成数据不一致

row [ 行级 ]

​ 行级, binlog会记录每次操作后每行记录的变化。

​ 优点:保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,他只记录执行后的效果。

​ 缺点:占用较大空间。

mixed [ 综合语句级和行级 ]

​ statement的升级版,一定程度上解决了因一些情况而造成的statement模式不一致问题

​ 在某些情况下譬如:

​ ○ 当函数中包含 UUID() 时;

​ ○ 包含 AUTO_INCREMENT 字段的表被更新时;

​ ○ 执行 INSERT DELAYED 语句时;

​ ○ 用 UDF 时;

​ 会按照 ROW的方式进行处理

​ 优点:节省空间,同时兼顾了一定的一致性。

​ 缺点:还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对binlog的监控的情况都不方便。

环境准备

机器规划

我这里用了4台机器:

机器规划:ops01、ops02、ops03 用于安装kafka + zookeeper + canal 集群 ;ops04用于部署MySQL服务,测试时可以将MySQL部署在3台集群中的一台

11.8.37.50 ops01

11.8.36.63 ops02

11.8.36.76 ops03

11.8.36.86 ops04

4台机器都在/etc/hosts中配置主机名hosts解析

安装配置MySQL

新建数据库和表用于业务模拟,这里不介绍安装步骤了,如未安装过MySQL,可自行查阅之前的文章《Hive简介及hive部署、原理和使用介绍》中有MySQL详细的安装步骤;

安装完MySQL后,做基本的设置配置

# 登录mysql
root@ops04:/root #mysql -uroot -p123456
mysql: [Warning] Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 442523
Server version: 5.7.29 MySQL Community Server (GPL)Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
# 增加canal用户并配置权限
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
Query OK, 0 rows affected, 1 warning (0.00 sec)mysql> quit;
Bye
# 修改MySQL配置文件,增添binlog相关配置项
root@ops04:/root #vim /etc/my.cnf
# binlog
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall

新建一个gmall库,库其实都可以,只要和上方配置文件中对应即可

重启MySQL:

root@ops04:/root #mysql -V
mysql  Ver 14.14 Distrib 5.7.29, for Linux (x86_64) using  EditLine wrapper
root@ops04:/root #systemctl status mysqld
● mysqld.service - MySQL ServerLoaded: loaded (/usr/lib/systemd/system/mysqld.service; enabled; vendor preset: disabled)Active: active (running) since Wed 2021-05-26 09:30:25 CST; 2 months 22 days agoDocs: man:mysqld(8)http://dev.mysql.com/doc/refman/en/using-systemd.htmlMain PID: 32911 (mysqld)Memory: 530.6MCGroup: /system.slice/mysqld.service└─32911 /usr/sbin/mysqld --daemonize --pid-file=/var/run/mysqld/mysqld.pidMay 26 09:30:18 ops04 systemd[1]: Starting MySQL Server...
May 26 09:30:25 ops04 systemd[1]: Started MySQL Server.
root@ops04:/root #
root@ops04:/root #systemctl restart mysqld
root@ops04:/root #

【注意】:在新增了binlog配置后,重启MySQL服务后,在存储目录下会有相关的binlog文件,格式如下

root@ops04:/var/lib/mysql #ll | grep mysql-bin
-rw-r----- 1 mysql mysql     1741 Aug 17 14:27 mysql-bin.000001
-rw-r----- 1 mysql mysql       19 Aug 17 11:18 mysql-bin.index

验证canal用户登录:

root@ops04:/root #mysql -ucanal -pcanal -e "show databases"
mysql: [Warning] Using a password on the command line interface can be insecure.
+--------------------+
| Database           |
+--------------------+
| information_schema |
| gmall              |
| mysql              |
| performance_schema |
| sys                |
+--------------------+
root@ops04:/root #

在gmall库中新建表,并插入一些样例数据做测试:

CREATE TABLE `canal_test` (`体温` varchar(255) DEFAULT NULL,`身高` varchar(255) DEFAULT NULL,`体重` varchar(255) DEFAULT NULL,`文章` varchar(255) DEFAULT NULL,`日期` date DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;INSERT INTO `canal_test`(`体温`, `身高`, `体重`, `文章`, `日期`) VALUES ('36.5', '1.70', '180', '4', '2021-06-01');
INSERT INTO `canal_test`(`体温`, `身高`, `体重`, `文章`, `日期`) VALUES ('36.4', '1.70', '160', '8', '2021-06-02');
INSERT INTO `canal_test`(`体温`, `身高`, `体重`, `文章`, `日期`) VALUES ('36.1', '1.90', '134', '1', '2021-06-03');
INSERT INTO `canal_test`(`体温`, `身高`, `体重`, `文章`, `日期`) VALUES ('37.3', '1.70', '110', '14', '2021-06-04');
INSERT INTO `canal_test`(`体温`, `身高`, `体重`, `文章`, `日期`) VALUES ('35.7', '1.70', '133', '0', '2021-06-05');
INSERT INTO `canal_test`(`体温`, `身高`, `体重`, `文章`, `日期`) VALUES ('36.8', '1.90', '200', '6', '2021-06-06');
INSERT INTO `canal_test`(`体温`, `身高`, `体重`, `文章`, `日期`) VALUES ('37.5', '1.70', '132', '25', '2021-06-07');
INSERT INTO `canal_test`(`体温`, `身高`, `体重`, `文章`, `日期`) VALUES ('35.7', '1.70', '160', '2', '2021-06-08');
INSERT INTO `canal_test`(`体温`, `身高`, `体重`, `文章`, `日期`) VALUES ('36.3', '1.80', '131.4', '9', '2021-06-09');
INSERT INTO `canal_test`(`体温`, `身高`, `体重`, `文章`, `日期`) VALUES ('37.3', '1.70', '98.8', '4', '2021-06-10');

安装kafka + zookeeper

为了实现canal的高可用,这里也不介绍具体安装步骤,减少篇幅,可自行查阅之前的文章《Kafka简介及Kafka部署、原理和使用介绍》中有kafka的详细安装步骤;

查询kafka和zookeeper各端口集群运行状态:

wangting@ops03:/opt/module >ssh ops01 'sudo netstat -tnlpu| grep -E "9092|2181"'
tcp6       0      0 :::9092                 :::*                    LISTEN      42305/java
tcp6       0      0 :::2181                 :::*                    LISTEN      41773/java
wangting@ops03:/opt/module >ssh ops02 'sudo netstat -tnlpu| grep -E "9092|2181"'
tcp6       0      0 :::9092                 :::*                    LISTEN      33518/java
tcp6       0      0 :::2181                 :::*                    LISTEN      33012/java
wangting@ops03:/opt/module >ssh ops03 'sudo netstat -tnlpu| grep -E "9092|2181"'
tcp6       0      0 :::9092                 :::*                    LISTEN      102886/java
tcp6       0      0 :::2181                 :::*                    LISTEN      102422/java

安装部署canal

阿里的canal项目地址为:https://github.com/alibaba/canal,下载链接可以在github页面点击右边的release查看各版本下载,建议有精力可以多查阅阿里首页的热门项目,有很多项目越来越受欢迎。

下载安装包

# 下载安装包
wangting@ops03:/opt/software >wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
wangting@ops03:/opt/software >ll | grep canal
-rw-r--r-- 1 wangting wangting  60205298 Aug 17 11:23 canal.deployer-1.1.5.tar.gz

解压安装

# 新建canal解压目录【注意】: 官方项目解压出来没有顶级canal目录,所以新建个目录用于解压组件
wangting@ops03:/opt/software >mkdir -p /opt/module/canal
wangting@ops03:/opt/software >tar -xf canal.deployer-1.1.5.tar.gz -C /opt/module/canal/

修改canal主配置

# 修改canal主配置文件
wangting@ops03:/opt/module/canal >cd conf/
wangting@ops03:/opt/module/canal/conf >ll
total 28
-rwxrwxr-x 1 wangting wangting  319 Apr 19 15:48 canal_local.properties
-rwxrwxr-x 1 wangting wangting 6277 Apr 19 15:48 canal.properties
drwxrwxr-x 2 wangting wangting 4096 Aug 17 13:49 example
-rwxrwxr-x 1 wangting wangting 3437 Apr 19 15:48 logback.xml
drwxrwxr-x 2 wangting wangting 4096 Aug 17 13:49 metrics
drwxrwxr-x 3 wangting wangting 4096 Aug 17 13:49 spring
# 改动如下相关配置: zk | 同步策略目标方式 | kafka
wangting@ops03:/opt/module/canal/conf >vim canal.properties
canal.zkServers =ops01:2181,ops02:2181,ops03:2181
canal.serverMode = kafka
kafka.bootstrap.servers = ops01:9092,ops02:9092,ops03:9092

修改canal的实例配置- (mysql to kafka)

# 配置实例相关配置:canal可以启多实例,一个实例对应一个目录配置,例如把example目录复制成xxx,把xxx目录下的配置改动启动,就是一个新实例
wangting@ops03:/opt/module/canal/conf >cd example/
wangting@ops03:/opt/module/canal/conf/example >ll
total 4
-rwxrwxr-x 1 wangting wangting 2106 Apr 19 15:48 instance.properties
# 注意11.8.38.86:3306需要改成自己环境的mysql地址和端口,其次用户名密码改成自己环境的,topic自定义一个
wangting@ops03:/opt/module/canal/conf/example >vim instance.properties
canal.instance.master.address=11.8.38.86:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.mq.topic=wangting_test_canal
canal.mq.partitionsNum=12

分发安装目录

# 将修改后的canal目录分发至另外2台服务器:
wangting@ops03:/opt/module >scp -r /opt/module/canal ops01:/opt/module/
wangting@ops03:/opt/module >scp -r /opt/module/canal ops02:/opt/module/

启动canal集群

# 各服务器依次启动集群canal
wangting@ops03:/opt/module >cd /opt/module/canal/bin/
wangting@ops03:/opt/module/canal/bin >./startup.sh wangting@ops02:/home/wangting >cd /opt/module/canal/bin/
wangting@ops02:/opt/module/canal/bin >./startup.sh wangting@ops01:/home/wangting >cd /opt/module/canal/bin/
wangting@ops01:/opt/module/canal/bin >./startup.sh

验证结果

# 在一台服务器上监测kafka
wangting@ops03:/opt/module/canal/bin >kafka-console-consumer.sh --bootstrap-server ops01:9092,ops02:9092,ops03:9092 --topic wangting_test_canal
[2021-08-17 14:21:29,924] WARN [Consumer clientId=consumer-console-consumer-17754-1, groupId=console-consumer-17754] Error while fetching metadata with correlation id 2 : {wangting_test_canal=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

按照预期,如果现在成功的监测了ops04上MySQL中gmall库,那么在gmall库中的表如有数据改动,那么控制台会有信息输出实时同步更新到前台

当前表内数据:

改动表中的数据观察控制台输出:

1.将2021-06-10 -> 2021-08-17

2.新增一条数据

3.将一个数值改动 1 -> 1111

wangting@ops03:/opt/module/canal/bin >kafka-console-consumer.sh --bootstrap-server ops01:9092,ops02:9092,ops03:9092 --topic wangting_test_canal
[2021-08-17 14:21:29,924] WARN [Consumer clientId=consumer-console-consumer-17754-1, groupId=console-consumer-17754] Error while fetching metadata with correlation id 2 : {wangting_test_canal=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient){"data":[{"体温":"37.3","身高":"1.70","体重":"98.8","文章":"4","日期":"2021-08-17"}],"database":"gmall","es":1629185045000,"id":6,"isDdl":false,"mysqlType":{"体温":"varchar(255)","身高":"varchar(255)","体重":"varchar(255)","文章":"varchar(255)","日期":"date"},"old":[{"日期":"2021-06-10"}],"pkNames":null,"sql":"","sqlType":{"体温":12,"身高":12,"体重":12,"文章":12,"日期":91},"table":"canal_test","ts":1629185063194,"type":"UPDATE"}{"data":[{"体温":"35.55","身高":"1.999","体重":"99.99","文章":"999","日期":"2021-08-17"}],"database":"gmall","es":1629185086000,"id":7,"isDdl":false,"mysqlType":{"体温":"varchar(255)","身高":"varchar(255)","体重":"varchar(255)","文章":"varchar(255)","日期":"date"},"old":null,"pkNames":null,"sql":"","sqlType":{"体温":12,"身高":12,"体重":12,"文章":12,"日期":91},"table":"canal_test","ts":1629185104967,"type":"INSERT"}{"data":[{"体温":"36.1","身高":"1.90","体重":"134","文章":"1111","日期":"2021-06-03"}],"database":"gmall","es":1629185104000,"id":8,"isDdl":false,"mysqlType":{"体温":"varchar(255)","身高":"varchar(255)","体重":"varchar(255)","文章":"varchar(255)","日期":"date"},"old":[{"文章":"1"}],"pkNames":null,"sql":"","sqlType":{"体温":12,"身高":12,"体重":12,"文章":12,"日期":91},"table":"canal_test","ts":1629185122499,"type":"UPDATE"}

能明显看到每次改动都能在记录中呈现出,old数据和当前数据都能一一对应,截止到这里canal整个流程链算是全通了,canal同步到不同的存储介质方法基本都大同小异。

扩展:

可以在zookeeper命令行中查看canal信息:

wangting@ops01:/opt/module/canal/bin >zkCli.sh
Connecting to localhost:2181
[zk: localhost:2181(CONNECTED) 0] ls -w /
[hbase, kafka, otter, wangting, zookeeper]
[zk: localhost:2181(CONNECTED) 1] ls -w /otter
[canal]
[zk: localhost:2181(CONNECTED) 2] ls -w /otter/canal
[cluster, destinations]

canal简介及canal部署、原理和使用介绍相关推荐

  1. canal简介及canal部署、TCP原理和使用介绍

    canal简介及canal部署.原理和使用介绍 canal入门 什么是canal canal使用场景 canal运行原理 MySQL的binlog介绍 什么是binlog 开启MySQL的binlog ...

  2. canal部署、原理和使用介绍

    canal简介及canal部署.原理和使用介绍 canal入门 什么是canal 阿里巴巴B2B公司,因为业务的特性,卖家主要集中在国内,买家主要集中在国外,所以衍生出了杭州和美国异地机房的需求,从2 ...

  3. canal.deployer-1.0.24部署

    1.准备: 下载地址:http://download.csdn.net/detail/qbw2010/9854441 2.canal概述: canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房 ...

  4. Maxwell简介、部署、原理和使用介绍

    Maxwell简介.部署.原理和使用介绍 1.Maxwell概述简介 1-1.Maxwell简介 ​ Maxwell是由美国Zendesk公司开源,使用Java编写的MySQL变更数据抓取软件.他会实 ...

  5. ElasticSearch简介及ElasticSearch部署、原理和使用介绍

    ElasticSearch简介及ElasticSearch部署.原理和使用介绍 第一章:elasticsearch简介 ElasticSearch是一个基于Lucene的搜索服务器.它提供了一个分布式 ...

  6. Flume简介及Flume部署、原理和使用介绍

    Flume简介及Flume部署.原理和使用介绍 Flume概述 ​ Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集.聚合和传输的系统.Flume基于流式架构,灵活简单. ...

  7. 【Canal】canal简介

    1.canal简介 canal是一个增量解析MySQL binlog日志解析,提供增量数据订阅和消费的组件. 官方github https://github.com/alibaba/canal 2.c ...

  8. azkaban简介及azkaban部署、原理和使用介绍

    azkaban简介及azkaban部署.原理和使用介绍 azkaban简介 ​ Azkaban是一套简单的任务调度服务,整体包括三部分webserver.dbserver.executorserver ...

  9. Pulsar简介及Pulsar部署、原理和使用介绍

    Pulsar简介及Pulsar部署.原理和使用介绍 Pulsar简介 诞生背景 Apache Pulsar 是一个企业级的分布式消息系统,最初由 Yahoo 开发,在 2016 年开源,并于2018年 ...

最新文章

  1. 05 Java程序员面试宝典视频课程之Jquery
  2. 不是内部或外部命令 windows10 执行 linux命令
  3. 给FlvDownloader加了一个视频预览的功能
  4. mysql enum和set_mysql enum和set字段类型的使用
  5. 选择日期保存后日期变成前一天(avue date)
  6. 如何跨域来同步不同网站之间的Cookie
  7. java test 用法,pytest基本用法简介
  8. 二叉搜索树(二叉排序树)
  9. Python中参数iterable的意义
  10. 思岚S2激光雷达3—ROS node and test application for RPLIDAR
  11. 服务器cpu型号E5,超强悍服务器CPU:Intel 18核心至强E5
  12. 电脑显示器屏幕看不清灰色,灰色部分都几乎呈现白色状态的解决办法。
  13. 鲁大师2022牛角尖颁奖盛典落幕,年度最强产品揭晓!
  14. python抠图教程_简单几行Python代码实现8秒抠图的AI神器,根本无需PS(附教程)...
  15. 短视频怎么获得高流量?简单小技巧,让你的短视频被更多人看到
  16. 【学习笔记之Linux】工具之gcc/g++
  17. easyexcel读取合并单元格
  18. Oracle 快速入门 PL/SQL游标
  19. 首次不依赖生成模型,一句话让AI修图!
  20. w3c JS测试

热门文章

  1. Servlet_W3School
  2. linux suse git,SUSE 搭建GIT服务
  3. [小说]魔王冢(4)疑窦初生
  4. 精益画布和数据分析框架
  5. lg 禁用android系统更新,LG 手机Android 11更新:部分最晚四季度才更新
  6. MySQL8.0 日志
  7. 无线认证计费系统应用
  8. CAN 口转以太网、CAN 以太网、CAN IP的功能特点
  9. C/C++基础----多态
  10. 烤仔TVの尚书房 | IXO:项目方与投资人之间的话语权争夺战