Canal用途很广,并且上手非常简单,小伙伴们在平时完成公司的需求时,很有可能会用到。本篇介绍一下数据库中间件Canal的使用。
很多时候为了缩短调用延时,我们会对部分接口数据加入了缓存。一旦这些数据在数据库中进行了更新操作,缓存就成了旧数据,必须及时删除。删除缓存的代码「理所当然可以写在更新数据的业务代码里」,但有时候写操作是在别的项目代码里,你可能无权修改,亦或者别人不愿你在他代码里写这种业务之外的代码。(毕竟多人协作中间会产生各种配合问题)。又或者就是单纯的删除缓存的操作失败了,缓存依然是旧数据。这个时候,我们可以将缓存更新操作完全独立出来,形成一套单独的系统。

在上一篇我们提到过,Canal能帮我们实现像下图这样的系统来进行数据的处理:

接下让我们一起来看看Canal到底是什么,以及用它如何实现上面我们我们提到的系统。

Canal概述

阿里是国内比较早地大量使用MySQL的互联网企业(去IOE化:去掉IBM的小型机、Oracle数据库、EMC存储设备,代之以自己在开源软件基础上开发的系统),并且基于阿里巴巴/淘宝的业务,从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。Canal应运而生,它通过伪装成数据库的从库,读取主库发来的binlog,用来实现数据库增量订阅和消费业务需求。我们可以使用Canal实现以下用途:

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 缓存刷新
  • 带业务逻辑的增量数据处理

开源项目地址:https://github.com/alibaba/canal,大家有需要可以下载看看。这里有几点重点给大家提出来说一下:

  • canal 使用 client-server 模式,数据传输协议使用 protobuf 3.0(很多RPC框架也在使用例如gRPC)
  • 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
  • canal 作为 MySQL binlog 增量获取和解析工具,可将变更记录投递到 MQ 系统中,比如 Kafka/RocketMQ。

Canal实际是将自己伪装成数据库的从库,来读取Binlog。这里我们先讲一下MySQL数据库主从数据库的知识,这样就能更快的理解Canal。

数据库相关知识

数据库的读写分离

为了应对高并发场景,MySQL支持把一台数据库主机分为单独的一台写主库(主要负责写操作),而把读的数据库压力分配给读的从库,而且读从库可以变为多台,这就是读写分离的典型场景。

数据库主从同步

实现数据库的读写分离,是通过数据库主从同步,让从数据库监听主数据库Binlog实现的。

大体流程如下图:

  1. MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  2. MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  3. MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

可以看到,这种架构下会有一个问题,数据库主从同步会存在延迟,那么就会有短暂的时间,主从数据库的数据是不一致的。这种不一致大多数情况下非常短暂,很多时候我们可以忽略他。但一旦要求数据一致,就会引申出如何解决这个问题的思考。

数据库主从同步一致性问题

我们通常使用MySQL主从复制来解决MySQL的单点故障问题,其通过逻辑复制的方式把主库的变更同步到从库,主备之间无法保证严格一致的模式,于是,MySQL的主从复制带来了主从“数据一致性”的问题。MySQL的复制分为:异步复制、半同步复制、全同步复制。

异步复制

概念:MySQL默认的复制即是异步复制,主库在执行完客户端提交的事务后会立即将结果返给给客户端,并不关心从库是否已经接收并处理;
缺点:主库将事务 Binlog 事件写入到 Binlog 文件中,此时主库只会通知一下 Dump 线程发送这些新的 Binlog,然后主库就会继续处理提交操作,而此时不会保证这些 Binlog 传到任何一个从库节点上。主如果crash掉了,此时主上已经提交的事务可能并没有传到从库上,如果此时,强行将从提升为主,可能导致新主上的数据不完整。

全同步复制

概念:当主库提交事务之后,所有的从库节点必须收到、APPLY并且提交这些事务,然后主库线程才能继续做后续操作
缺点:需要等待所有从库执行完该事务才能返回,全同步复制的性能必然会收到严重的影响

半同步复制

概念:介于异步复制和全同步复制之间,主库在执行完客户端提交的事务后不是立刻返回给客户端,而是等待至少一个从库接收到并写到relay log中才返回给客户端。相对于异步复制,半同步复制提高了数据的安全性;
缺点:造成了一定程度的延迟,这个延迟最少是一个TCP/IP往返的时间。所以,半同步复制最好在低延时的网络中使用。

当半同步复制发生超时时(由rpl_semi_sync_master_timeout参数控制,单位是毫秒,默认为10000,即10s),会暂时关闭半同步复制,转而使用异步复制。当master dump线程发送完一个事务的所有事件之后,如果在rpl_semi_sync_master_timeout内,收到了从库的响应,则主从又重新恢复为半同步复制。

Canal工作原理

回顾了数据库从库的数据同步原理,理解Canal十分简单,直接引用官网原文:

  1. canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  2. MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  3. canal 解析 binary log 对象(原始为 byte 流)

Canal实战

因为canal主要是监听mysql的binlog日志,所以需要先保证mysql的binlog是开启的状态,怎么操作大家可以看这篇MYSQL专题-使用Binlog日志恢复MySQL数据,这里不再赘述。然后看一下我们mysql的用户都有哪些:

创建Canal账号

我们为Cannal创建一个单独的账号并为其授权,依次执行以下语句:

CREATE USER canal IDENTIFIED BY 'xxxx';  (填写密码)
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

最后通过show grants for 'canal’看一下账号信息,已经创建成功。

配置Canal服务

去Github下载最近的Canal稳定版本包:点此进入下载页面,然后依据自己想要的版本进行下载即可。

我这里使用的是1.1.5的版本。下载后查看文件:

将相应信息更改为你对应的数据库地址以及相应的数据库账号和密码。
进入bin目录点击即可启动:


出现如下界面即位启动成功:

Canal操作

配置好canal以后,我们接下来用代码连接进行操作。在之前的代码中增加模块miaosha-job,然后写一个连接canal的程序:

public class CanalClient {private static final Logger LOGGER = LoggerFactory.getLogger(CanalClient.class);public static void main(String[] args) {// 第一步:与canal进行连接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),"example", "", "");connector.connect();// 第二步:开启订阅connector.subscribe();// 第三步:循环订阅while (true) {try {// 每次读取 1000 条Message message = connector.getWithoutAck(1000);long batchID = message.getId();int size = message.getEntries().size();if (batchID == -1 || size == 0) {LOGGER.info("当前暂时没有数据,休眠1秒");Thread.sleep(1000);} else {LOGGER.info("-------------------------- 有数据啦 -----------------------");printEntry(message.getEntries());}connector.ack(batchID);} catch (Exception e) {LOGGER.error("处理出错");} finally {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}}/*** 获取每条打印的记录*/public static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {// 第一步:拆解entry 实体Header header = entry.getHeader();EntryType entryType = entry.getEntryType();// 第二步: 如果当前是RowData,那就是我需要的数据if (entryType == EntryType.ROWDATA) {String tableName = header.getTableName();String schemaName = header.getSchemaName();RowChange rowChange = null;try {rowChange = RowChange.parseFrom(entry.getStoreValue());} catch (InvalidProtocolBufferException e) {e.printStackTrace();}EventType eventType = rowChange.getEventType();LOGGER.info(String.format("当前正在操作表 %s.%s, 执行操作= %s", schemaName, tableName, eventType));// 如果是‘查询’ 或者 是 ‘DDL’ 操作,那么sql直接打出来if (eventType == EventType.QUERY || rowChange.getIsDdl()) {LOGGER.info("执行了查询语句:[{}]", rowChange.getSql());return;}// 第三步:追踪到 columns 级别rowChange.getRowDatasList().forEach((rowData) -> {// 获取更新之前的column情况List<Column> beforeColumns = rowData.getBeforeColumnsList();// 获取更新之后的 column 情况List<Column> afterColumns = rowData.getAfterColumnsList();// 当前执行的是 删除操作if (eventType == EventType.DELETE) {printColumn(beforeColumns);}// 当前执行的是 插入操作if (eventType == EventType.INSERT) {printColumn(afterColumns);}// 当前执行的是 更新操作if (eventType == EventType.UPDATE) {printColumn(afterColumns);// 进行删除缓存操作deleteCache(afterColumns, tableName, schemaName);}});}}}/*** 每个row上面的每一个column 的更改情况* @param columns*/public static void printColumn(List<Column> columns) {columns.forEach((column) -> {String columnName = column.getName();String columnValue = column.getValue();String columnType = column.getMysqlType();// 判断 该字段是否更新boolean isUpdated = column.getUpdated();LOGGER.info(String.format("数据列:columnName=%s, columnValue=%s, columnType=%s, isUpdated=%s", columnName, columnValue, columnType, isUpdated));});}/*** 秒杀下单接口删除库存缓存*/public static void deleteCache(List<Column> columns, String tableName, String schemaName) {if ("stock".equals(tableName) && "test_my_db".equals(schemaName)) {AtomicInteger id = new AtomicInteger();columns.forEach((column) -> {String columnName = column.getName();String columnValue = column.getValue();if ("id".equals(columnName)) {id.set(Integer.parseInt(columnValue));}});// TODO: 删除缓存LOGGER.info("Canal删除stock表id:[{}] 的库存缓存", id);}}
}

方法的功能以及注解在里面写的很详细,我们直接跑程序进行测试,启动程序,当我们没有进行任何操作时,一直会处于等待的状态:

我们在数据库中进行更改UPDATE操作,把用户王二改成张三,然后再改回王二,Canal成功收到了两条更新操作,见下图:

我们再模拟一个删除Cache缓存的业务,在代码中有秒杀下单接口删除库存缓存的接口,更新操作后,我们刷新库存缓存。效果如下:

也可以成功监听到数据。简单的Canal使用就介绍到这里,剩下的发挥空间留给各位读者们。

猜你感兴趣
教你从0到1搭建秒杀系统-防超卖
教你从0到1搭建秒杀系统-限流
教你从0到1搭建秒杀系统-抢购接口隐藏与单用户限制频率
教你从0到1搭建秒杀系统-缓存与数据库双写一致
教你从0到1搭建秒杀系统-Canal快速入门(番外篇)
教你从0到1搭建秒杀系统-订单异步处理

更多文章请点击:更多…

参考文章:
https://blog.csdn.net/l1028386804/article/details/81208362
https://github.com/alibaba/canal/wiki/QuickStart
https://youzhixueyuan.com/database-master-slave-synchronization.html
https://www.jianshu.com/p/790a158d9eb3
https://blog.csdn.net/xihuanyuye/article/details/81220524
https://www.cnblogs.com/ivictor/p/5735580.html

教你从0到1搭建秒杀系统-Canal快速入门(番外篇)相关推荐

  1. 教你从0到1搭建秒杀系统-订单异步处理

    前面几篇我们从限流角度,缓存角度来优化了用户下单的速度,减少了服务器和数据库的压力.这些处理对于一个秒杀系统都是非常重要的,并且效果立竿见影,那还有什么操作也能有立竿见影的效果呢?答案是下单的异步处理 ...

  2. 教你从0到1搭建秒杀系统-缓存与数据库双写一致

    本文是秒杀系统的第四篇,我们来讨论秒杀系统中缓存热点数据的问题,进一步延伸到数据库和缓存的双写一致性问题. 在秒杀实际的业务中,一定有很多需要做缓存的场景,比如售卖的商品,包括名称,详情等.访问量很大 ...

  3. 教你从0到1搭建秒杀系统-抢购接口隐藏与单用户限制频率

    在前两篇文章的介绍下,我们完成了防止超卖商品和抢购接口的限流,已经能够防止大流量把我们的服务器直接搞炸,这篇文章中,我们要开始关心一些细节问题.对于稍微懂点电脑的,点击F12打开浏览器的控制台,就能在 ...

  4. 教你从0到1搭建秒杀系统-限流

    本文是秒杀系统的第二篇,主要讲解接口限流措施.接口限流其实定义也非常广,接口限流本身也是系统安全防护的一种措施,在面临高并发的请购请求时,我们如果不对接口进行限流,可能会对后台系统造成极大的压力,尤其 ...

  5. 教你从0到1搭建秒杀系统-防超卖

    各位读者好,最近笔者学了很多东西,其实都想跟大家进行分享,奈何需要将所学习的知识整理出来需要耗费大量的时间,包括总结,或各种图形以及写代码示例,所以可能更新的速度会比较慢.但大家放心,只要有时间我就会 ...

  6. 如何从0到1搭建物联网系统?

    如何从0到1搭建物联网系统? 2019年是一个好的开端,在互联网行业混迹3年,充分理解互联网行业关于用户思维.平台思维的诠释后,以及对于敏捷研发.协同工作等新思想新管理模式实践后,我准备回身物联网产业 ...

  7. yolov5使用2080ti显卡训练是一种什么样的体验我通过vscode搭建linux服务器对python-yolov5-4.0项目进行训练,零基础小白都能看得懂的教程。>>>>>>>>>第二章番外篇

    第二章番外篇:yolov5通过vscode搭建linux服务器对python-yolov5-4.0项目进行训练,零基础小白都能看得懂的教程.YOLOv5搭建的最快搭建方式,踩坑经历详谈 前期准备: 2 ...

  8. Debian 7.1.0 安装教程图解(——Debian系统轻量级快速安装法)

    Debian 7.1.0 安装教程图解(--Debian系统轻量级快速安装法) 目录 一.Debian 7.1.0最新安装光盘二.安装系统教程图解 三.DebianLNMP更多相关使用系列文章 一.D ...

  9. 【梅哥的Ring0湿润插入教程】【番外篇二】秒杀网游Lanucher直接开客户端

    [梅哥的Ring0湿润插入教程] Email:mlkui@163.com 转载请注明出处,谢绝喷子记者等,如引起各类不适请自觉滚J8蛋! 番外篇二:秒杀网游Lanucher直接开客户端 [湿润前言] ...

最新文章

  1. Flink1.7.2 sql 批处理示例
  2. .Net winform中嵌入Flash
  3. linux内核常用函数或宏
  4. selenium无界面chromedriver
  5. ARM GIC(六) GIC V3 电源/功耗管理 分析笔记。
  6. Linux挂载OneDrive
  7. EKF SLAM学习笔记02
  8. 短视频SDK测试tips
  9. 开发一个发送手机短信的计算机软件
  10. HDU 2565 放大的X【图形】
  11. range 小程序picker_微信小程序picker滚动选择器使用详解
  12. 【DONET学习笔记】C#与VB.NET除法运算的区别
  13. Python语言零基础入门教程(一)
  14. PTA L1-039 古风排版
  15. 现代金融体系的构成与课程的逻辑框架
  16. mac如何定时执行任务
  17. RadioButton+Fragment和XlistView及多条目加载
  18. Salesforce开发教程(上下)
  19. 故障管理:故障定级和定责
  20. 追溯世界第一个物联网系统:可乐机“Only”

热门文章

  1. [How TO]-git/gerrit配置方法
  2. [BSidesSF2019]slashslash
  3. Mysql数据库五大常用数据引擎
  4. SQLite 数据库注入总结
  5. aliyun服务器安装nc工具
  6. 【Python】快速设置 pip 源
  7. 【HTTPS】Let's Encrypt certbot renew
  8. 二叉树的四种遍历方法:前序、中序、后序、层次
  9. 【前端模块】HTML5标签
  10. 你值得拥有!一个基于 Spring Boot 的API、RESTful API 的项目