文章目录

  • 概述
    • 1.1 背景
    • 1.2 工作原理
    • 1.4 HA机制设计
    • 1.5 docker上安装canal
    • 1.6 简单使用

概述

1.1 背景

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

数据库镜像
数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务 cache 刷新
带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

1.2 工作原理

MySQL主从复制原理

MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events)

MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)

MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理

canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)

架构

server 代表一个canal服务,管理多个instance
instance 伪装成一个slave,从mysql dump数据
instance模块:

eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
eventStore (数据存储)
metaManager (增量订阅&消费信息管理器)

1.4 HA机制设计

canal的高可用HA(High Availability)

为了减少对mysql dump的请求,要求同一时间只能有一个处于running,其他的处于standby状态
如下图所示:

大致步骤:

canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断 (实现:创建短暂的节点,谁创建成功就允许谁启动)
创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于备用状态
一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance.
canal client每次进行连接时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect

1.5 docker上安装canal

创建mysql容器

docker run -id --name canal_mysql -v /mnt/canal_mysql:/var/lib/mysql
-p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 mysql:5.7

安装vim
需要在MySQL容器中修改配置文件,但是容器中默认没有vim命令,需要进行安装。

直接执行命令安装vim速度会很慢,因为使用的是国外的源,需要更新Debian源以提高速度。

#在宿主机创建sources.list配置文件
vi sources.list
#内容为:

deb http://mirrors.tuna.tsinghua.edu.cn/debian/ buster main
deb http://mirrors.tuna.tsinghua.edu.cn/debian-security buster/updates main
deb http://mirrors.tuna.tsinghua.edu.cn/debian/ buster-updates main


#复制宿主机的配置到MySQL容器中

docker cp sources.list canal_mysql:/etc/apt/


#进入MySQL容器

docker exec -it canal_mysql /bin/bash


#执行安装命令

apt-get update && apt-get install vim -y

修改MySQL配置
需要让canal伪装成salve并正确获取mysql中的binary log,首先要开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,命令如下:

#修改MySQL配置文件

vim /etc/mysql/mysql.conf.d/mysqld.cnf

#添加的内容如下:

log-bin=mysql-bin
binlog-format=ROW
server_id=1

#开启binlog 选择ROW模式
#server_id不要和canal的slaveId重复
重启MySQL

docker restart canal_mysql

远程登录MySQL,查看配置状态,执行以下sql:

show variables like 'log_bin';
show variables like 'binlog_format';
show master status;

创建Canal账号
创建连接MySQL的账号canal并授予作为 MySQL slave 的权限,执行以下sql:

创建账号

CREATE USER canal IDENTIFIED BY 'canal';

授予权限

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

刷新并应用

FLUSH PRIVILEGES

创建canal-server容器

docker run -d --name canal-server -p 11111:11111 canal/canal-server:v1.1.4

配置canal-server
#进入canal-server容器

docker exec -it canal-server /bin/bash

#编辑canal-server的配置

vi canal-server/conf/example/instance.properties

重启canal-server

修改完成后重启canal-server,并查看日志:

#按ctrl+D退出容器,并重启容器

docker restart canal-server

可以看到现在已经有两个镜像在启动:

#重启成功后进入容器

docker exec -it canal-server /bin/bash

#查看日志

tail -100f canal-server/logs/example/example.log

1.6 简单使用

在数据库服务中创建数据库canal_test并创建表:

CREATE TABLE `student` (`id` varchar(20) NOT NULL,`name` varchar(50) DEFAULT NULL,`age` int(11) DEFAULT NULL,`sex` varchar(5) DEFAULT NULL,`city` varchar(20) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

创建Maven工程canal-demo,在pom.xml中添加依赖:

<dependencies><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency>
</dependencies>

编写代码获取canal数据:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;public class CanalTest {public static void main(String[] args) {String ip = "自己ip地址";String destination = "example";//创建连接对象CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, 11111), destination, "", "");//进行连接canalConnector.connect();//进行订阅canalConnector.subscribe();int batchSize = 5 * 1024;//使用死循环不断的获取canal信息while (true) {//获取Message对象Message message = canalConnector.getWithoutAck(batchSize);long id = message.getId();int size = message.getEntries().size();System.out.println("当前监控到的binLog消息数量是:" + size);//判断是否有数据if (id == -1 || size == 0) {//如果没有数据,等待1秒try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}} else {//如果有数据,进行数据解析List<Entry> entries = message.getEntries();//遍历获取到的Entry集合for (Entry entry : entries) {System.out.println("----------------------------------------");System.out.println("当前的二进制日志的条目(entry)类型是:" + entry.getEntryType());//如果属于原始数据ROWDATA,进行打印内容if (entry.getEntryType() == EntryType.ROWDATA) {try {//获取存储的内容RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());//打印事件的类型,增删改查哪种 eventTypeSystem.out.println("事件类型是:" + rowChange.getEventType());//打印改变的内容(增量数据)for (RowData rowData : rowChange.getRowDatasList()) {System.out.println("改变前的数据:" + rowData.getBeforeColumnsList());System.out.println("改变后的数据:" + rowData.getAfterColumnsList());}} catch (Exception e) {e.printStackTrace();}}}//消息确认已经处理了canalConnector.ack(id);}}}
}

canal介绍和使用docker安装canal相关推荐

  1. docker安装canal

    #数据库开启binlog 查看我之前的教程 docker 创建MySQL docker cp mysql:/etc/mysql/mysql.conf.d/mysqld.cnf / vim /mysql ...

  2. Day15(Js入门、jquery入门、ajax入门、前后端分离开发跨域问题、linux环境准备、jdk_tomcat环境搭建、docker介绍及应用(docker安装、基本命令、安装tomcat))

    js入门 js代码辅助 window–>preferences–>javaScript–>Content Assist .abcdefghijklmnopqrstuvwxyz alt ...

  3. Redis第一话 -- Redis介绍以及基于Docker安装Redis

    随着缓存技术的发展,Redis已经成为了工作中使用的最多的缓存技术了. 那么对Redis的了解肯定是必不可少的. 1.Redis介绍 Redis是基于C语言开发的纯内存存储KV结构的一个数据库.获取数 ...

  4. Docker安装部署MySQL+Canal+Kafka+Camus+HIVE数据实时同步

    因为公司业务需求要将mysql的数据实时同步到hive中,在网上找到一套可用的方案,即MySQL+Canal+Kafka+Camus+HIVE的数据流通方式,因为是首次搭建,所以暂时使用伪分布式的搭建 ...

  5. Docker部署Canal

    Canal:Canal是阿里开源的一款基于Mysql数据库binlog的增量订阅和消费组件,通过它可以订阅数据库的binlog日志,然后进行一些数据消费,如数据镜像.数据异构.数据索引.缓存更新等.相 ...

  6. saiku docker配置部署_【安装教程】01 Gitea Docker 安装部署 - 【SkywenCode】技术团队基...

    在2016年-2019年,SkywenCode技术团队使用码云Gitee 作为线上代码库管理,基于敏捷开发和持续构建的思路,我们整体基础建设以Drone / Jenkins + Docker 的方式构 ...

  7. Docker安装gbase8s数据库教程

    本章教程介绍如何利用Docker安装gbase8s数据库. 目录 一. 搜索镜像 二.拉取镜像 三.启动容器 四.数据库连接 一. 搜索镜像 docker search gbase8s 二.拉取镜像 ...

  8. Docker安装OpenResty教程

    本文主要介绍如何通过Docker安装OpenResty. 目录 一. 搜索镜像 二.拉取镜像 三.启动容器 四.访问测试 一. 搜索镜像 docker search openresty 二.拉取镜像 ...

  9. Docker安装Python3教程

    本文主要介绍如何通过Docker安装Python3. 目录 一.拉取镜像 二.挂载目录 三.创建容器 四.进入容器 五.运行脚本 (1)编写脚本 (2)上传脚本 (3)运行脚本 一.拉取镜像 本次教程 ...

最新文章

  1. pytorch 实现线性回归
  2. eBay Notification介绍
  3. 同一路由带参刷新,以及params和query两种方式传参的异同
  4. vsftp 一键安装包
  5. python正则表达式(1)--特殊字符
  6. VIIRS 学习资料分享
  7. jenkins+docker+nginx服务并访问vue项目
  8. socket数据的发送和接收
  9. 在Linux中使用7zip/7zz
  10. VB6 Chr码值对应列表大全
  11. 计算机硬件系统的主要性能指标
  12. ios 应用跳转商店
  13. 百度地图根据经纬度获取实际地理位置Api接口
  14. C语言:成绩等级划分!
  15. 荣耀v40轻奢版和华为nova8的区别 哪个好
  16. 网络分流器|网络分流器|网络分流采集器的应用领域
  17. 数学建模国赛2022C解题分享
  18. ssm南工二手书交易平台 毕业设计-附源码172334
  19. 博士申请 | 香港中文大学(深圳)濮实老师招收分布式优化与机器学习全奖博士生...
  20. 逐梦旅程(著:毛星云)---学习笔记第三章

热门文章

  1. 网页全文搜索字符和全局搜索文件名【Edge和谷歌浏览器均适用】
  2. 使用Spring进行不同开发所需要用到的包
  3. 2 了解MyBatis配置文件
  4. c 读取mysql 时间字段_MySQL中的时间字段的几种数据类型比较
  5. linux字符串转大写_在 Linux 命令行中转换大小写
  6. 【小白学习C++ 教程】十一、C++类中访问修饰符
  7. 四十八、面试前,必须搞懂Java中的线程池(下篇)
  8. sklearn模型的训练(下)
  9. 杭州/北京内推 | 蚂蚁集团智能决策团队招聘运筹优化算法工程师/实习生
  10. 无关于目标or特定于目标:简单且有效的零样本立场检测对比学习方法