解决方案:数据同步Canal


关键词

  • 伪装为mysql slave(发dump协议,接受binary log)
  • eventParser(模拟slave协议) ,eventSink(过滤加工),eventStore (存储)
  • @CanalEventListener—>adUpdate方法

一、概述

canal是阿里巴巴旗下的一款开源项目,纯Java开发。canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。

  • 基于数据库增量日志解析
  • 提供增量数据订阅&消费
  • 目前主要支持了MySQL(也支持mariaDB)

早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。

二、原理

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysqlmaster发送dump协议
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

三、架构

说明:

  • server代表一个canal运行实例,对应于一个jvm
  • instance对应于一个数据队列 (1个server对应1…n个instance)

instance模块:

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • eventStore (数据存储)
  • metaManager (增量订阅&消费信息管理器)

EventParser

大致过程

整个parser过程大致可分为几步:

  1. Connection获取上一次解析成功的位置 (如果第一次启动,则获取初始指定的位置或者是当前数据库的binlog位点)
  2. Connection建立链接,发送BINLOG_DUMP指令
    // 0. write command number
    // 1. write 4 bytes bin-log position to start at
    // 2. write 2 bytes bin-log flags
    // 3. write 4 bytes server id of the slave
    // 4. write bin-log file name
  3. Mysql开始推送Binaly Log
  4. 接收到的Binaly Log的通过Binlog parser进行协议解析,补充一些特定信息
    补充字段名字,字段类型,主键信息,unsigned类型处理
  5. 传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功
  6. 存储成功后,定时记录Binaly Log位置。

EventSink

说明:

  • 数据过滤:支持通配符的过滤模式,表名,字段内容等
  • 数据路由/分发:解决1:n (1个parser对应多个store的模式)
  • 数据归并:解决n:1 (多个parser对应1个store)
  • 数据加工:在进入store之前进行额外的处理,比如join

HA机制设计

canal的ha分为两部分,canal server和canal client分别有对应的ha实现

  • canal server: 为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.
  • canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。

整个HA机制的控制主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点(和session生命周期绑定)。


大致步骤:

  1. canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断 (实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
  2. 创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态
  3. 一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance.
  4. canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect.

Canal Client的方式和Canal Server方式类似,也是利用zookeeper的抢占EPHEMERAL节点的方式进行控制.

四、环境部署

1. mysql开启binlog模式

(1)查看当前mysql是否开启binlog模式

SHOW VARIABLES LIKE '%log_bin%'

如果log_bin的值为OFF是未开启,为ON是已开启

(2)修改/etc/my.cnf 需要开启binlog模式

[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

修改完成之后,重启mysqld的服务

(3)进入mysql

mysql -h localhost -u root -p

(4)创建账号 用于测试使用

使用root账号创建用户并授予权限

create user canal@'%' IDENTIFIED by 'canal';GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';FLUSH PRIVILEGES;
2. canal服务端安装配置

(1)下载地址canal

https://github.com/alibaba/canal/releases/tag/canal-1.0.24

(2)下载之后 上传到linux系统中,解压缩到指定的目录/usr/local/canal

解压缩之后的目录结构如下:

(3)修改 exmaple下的实例配置

修改如图所示的几个参数

一定要注释掉下面这个参数,这样就会扫描全库

#canal.instance.defaultDatabaseName =

(3)启动服务:

[root@localhost canal]# ./bin/startup.sh

(4)查看日志:

cat /usr/local/canal/logs/canal/canal.log


启动成功

五、通过canal监控数据库中表数据的变更

当用户执行数据库的操作的时候,binlog 日志会被canal捕获到,并解析出数据。我们就可以将解析出来的数据进行相应的逻辑处理。

我们这里使用的一个开源的项目,它实现了springboot与canal的集成。比原生的canal更加优雅。

https://github.com/chenqian56131/spring-boot-starter-canal

使用前需要将starter-canal安装到本地仓库(把提供的依赖拷贝到本地仓库即可)

我们可以参照它提供的canal-test,进行代码实现。

(1)创建工程模块dabing_canal,pom引入依赖

<dependency><groupId>com.xpand</groupId><artifactId>starter-canal</artifactId><version>0.0.1-SNAPSHOT</version>
</dependency>

(2)创建包com.dabing.canal ,包下创建启动类

/*** 数据监控微服务*/
@SpringBootApplication
//开启canalClient
@EnableCanalClient
public class CanalApplication {public static void main(String[] args) {SpringApplication.run(CanalApplication.class,args);}}

(3)添加配置文件application.properties

canal:client:instances:example:host: 192.168.253.128port: 11111

(4)创建com.lagou.canal.listener包,包下创建类

/*** 监控Business库下的表*/
@CanalEventListener
public class BusinessListener {/*** 设置监控点,监控的目标,对应库下的具体表* @param entryType* @param rowData*/@ListenPoint(schema = "dabing_business",table = {"tb_ad"})public void adUpdate(CanalEntry.EntryType entryType, CanalEntry.RowData rowData){System.out.println("tb_ad表中的数据发生变化");System.out.println("变化前的数据");rowData.getBeforeColumnsList().forEach((c)-> System.out.println(c.getName()+":"+c.getValue()));System.out.println("变化后的数据");rowData.getAfterColumnsList().forEach((c)-> System.out.println(c.getName()+":"+c.getValue()));}
}

测试: 启动数据监控微服务,修改dabing_business的tb_ad表,观察控制台输出。

解决方案:数据同步Canal相关推荐

  1. Canal监控MySQL数据库实现数据同步

    canal是阿里巴巴旗下的一款开源项目,纯Java开发.基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL. canal简介 需求:将MySQL中某些表的数据实时的同步到 ...

  2. Redis和Mysql数据同步的两种方案

    ** 利用数据库本身进行手动同步 ** 具体操作 在MySQL中对要操作的数据设置触发器Trigger,监听操作 客户端(NodeServer)向MySQL中写入数据时,触发器会被触发,触发之后调用M ...

  3. 数据同步的终极解决方案,阿里巴巴开源的Canal框架当之无愧!!

    写在前面 在当今互联网行业,尤其是现在分布式.微服务开发环境下,为了提高搜索效率,以及搜索的精准度,会大量使用Redis.Memcached等NoSQL数据库,也会使用大量的Solr.Elastics ...

  4. springboot实现增量备份_SpringBoot canal数据同步解决方案

    SpringBoot canal数据同步解决方案 一.需求 微服务多数据库情况下可以使用canal替代触发器,canal是应阿里巴巴跨机房同步的业务需求而提出的,canal基于数据库的日志解析,获取变 ...

  5. 【Canal】互联网背景下有哪些数据同步需求和解决方案?看完我知道了!!

    点击上方蓝色"冰河技术",关注并选择"设为星标" 持之以恒,贵在坚持,每天进步一点点! 作者个人研发的在高并发场景下,提供的简单.稳定.可扩展的延迟消息队列框架 ...

  6. canal mysql多节点_数据同步的终极解决方案,阿里巴巴开源的Canal框架当之无愧!!...

    写在前面 在当今互联网行业,尤其是现在分布式.微服务开发环境下,为了提高搜索效率,以及搜索的精准度,会大量使用Redis.Memcached等NoSQL数据库,也会使用大量的Solr.Elastics ...

  7. 数据同步的解决方案Canal

    Canal实现数据同步的原理: 1.是根据模拟mysql slave的主从交互协议,伪装自己是mysql slave,向mysql master发送dump请求. 2.mysql master收到du ...

  8. 数据同步解决方案-canal

    1.canal简介 canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据. canal是应对阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的. 阿里系公司开始逐 ...

  9. 基于Canal的MySQL=>ES数据同步方案

    文章目录 1.MySQL和ES的主要区别? 1.1 功能性 1.2 性能指标 1.3 在搜索业务上的区别 1.3.1 查询 1.3.2 检索 2.为什么要做数据同步 2.1 检索性能 2.2 写入性能 ...

最新文章

  1. 智能车竞赛技术报告 | 单车拉力组 - 哈尔滨工业大学 - 紫丁香
  2. 使用Reactor进行反应式编程最全教程
  3. PhotoSwipe 图片浏览插件使用方法
  4. 繁体简体转换器 v 1.0
  5. 深度学习(四十五)条件对抗网络
  6. 使用寄存器点亮LED——编程实战
  7. DataGrid实现单选功能,将DataGrid绑定的单选钮放在一个组里
  8. 数字孪生将这样改变我们的工作与生活
  9. 如果IE浏览器是IE11以下版本跳转到升级页面
  10. html 图标制作,icon小图标制作
  11. JSP从入门到精通_课堂实战视频教程
  12. win10系统服务器不能创建对象,win10系统中activex部件不能创建对象怎么修复
  13. matlab chi2gof,chi2gof函数里的检验值P为什么总等于NaN呢
  14. 网易邮箱与GMAIL
  15. (一)MQTT+阿里云实现设备>云,云>设备之间的通信。
  16. 如何扩充C盘空间,不需要删除其余盘的任何东西。
  17. mysqlbinlog 加-v -vv 的区别
  18. 好用的AndroidStudio插件推荐
  19. python 处理EXCEL 追加写
  20. 全志JAVA_全志R11处理器参数详细说明

热门文章

  1. 首届CSS开发者大会|七牛助力前端开发
  2. 北大青鸟ASP.NET之总结篇
  3. Silverlight控件应用系列索引
  4. 【请教】服务器上出现的两个问题!
  5. [译] 关于 HTTP/3 的一些心得
  6. python笔记-标准库unittest
  7. Linux网络——一种强制门户技术
  8. Your password has expired. To log in you must change it using a client that supports expired pass...
  9. Cacti设置流量阀值实现邮件报警
  10. 1.3、Bootstrap V4自学之路------起步---浏览器支持