一、背景

二话不说,先上图

上图来自于官网(https://github.com/alibaba/canal),基本上涵盖了目前生产环境使用场景了,众所周知,Canal做数据同步已经是行业内标杆了。我们生产环境也用Canal监听binlog数据变更,然后解析成对应数据发送到MQ(RocketMQ)。一些非主流程业务,异步场景消费MQ处理即可。

但是我这篇文章,主要想聊一聊在做系统重构时,新老系统数据双向同步时Canal的使用场景。

注:关于系统重构的介绍我这里就不叙述了,大家可以看我之前写的系列文章:浅谈系统重构

二、关于双向同步

  1. 什么是双向同步?

所谓双向同步,就是老系统数据库数据往新系统数据库同步,新系统数据库同时也往老系统数据库同步。从而保证新系统,老系统数据库数据完全一致。系统重构时如果上线出现问题,随时能切回原来老系统,这也为灰度方案提供了底层保障。

  1. 一般同步如何做?各自优缺点是什么?

方案一: Dao层拦截方案

方案说明: 在Dao层打洞拦截所有写请求(insert,update,delete), 然后写入MQ队列,再通过消费MQ队列写入对应数据库。

优点: 这种方案实现比较简单。

缺点: 对于老系统数据库,可能有很多个服务在写入,如果从Dao层拦截,可能要修改很多地方,改动较大。

方案二: 利用Canal订阅解析Binlog

方案说明: 利用Canal订阅Binlog,解析成数据,再写入到对应数据库(这里可以直接写入,也可以先写入MQ,再消费MQ写入,推荐后者)。

优点:能够解决系统多处写入问题。

缺点:引入新的组件Canal,复杂度增加。

下面,我们就来实战操作一下方案二。

三、环境准备(Centos系统为例)

1. 安装Mysql

wget https://dev.mysql.com/get/mysql80-community-release-el8-1.noarch.rpm
yum install  mysql80-community-release-el8-1.noarch.rpm#禁用centos自带的mysql
yum module disable mysql -y
#安装
yum install mysql-community-server -y
#启动
systemctl start mysqld
#查看启动状态 提升 Active: active (running) 表示成功
systemctl status mysqld
#查看初始密码
grep 'temporary password' /var/log/mysqld.log
#初始密码登录
mysql -uroot -p'AXXXXX'  -hlocalhost -P3306
#修改ROOT密码
ALTER USER 'root'@'localhost' IDENTIFIED BY 'BXXXXX';

2、 环境部署

1)、查看当前mysql是否开启了binlog模式, 如果log_bin的值为OFF是未开启,为ON是已开启 。

SHOW VARIABLES LIKE '%log_bin%'

2)、若未开启需要修改/ect/my.cnf 开启binlog模式

[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

修改完之后重启mysql服务

3)、创建用户并且授权

create user canal@'%' IDENTIFIED by 'XXXX';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

3、 Canal服务端安装

1)、canal下载地址

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

2)、解压到指定目录

mkdir  canal-server-1.1.4
tar -zxf canal.deployer-1.1.4.tar.gz -C canal-server-1.1.4/

3)、修改配置文件 查看主库 binlog position

mysql> show master status;
+---------------+----------+--------------+------------------+-------------------+
| File          | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+---------------+----------+--------------+------------------+-------------------+
| binlog.000002 |     4526 |              |                  |                   |
+---------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)

修改配置文件 conf/example/instance.properties

# position info
canal.instance.master.address={IP}:3306
# 这里对应上面的File
canal.instance.master.journal.name=binlog.000002
# 这里对应上面的Position
canal.instance.master.position=4526# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=XXXX

4)、启动 canal-server

./bin/startup.sh
# 查看日志
tail -f logs/example/example.log

以上就完成了Canal-Server的单实例版本实现,生成环境集群环境一般是运维搭建,我们测试就用单实例版本。

关于Canal的HA机制设计下面简单介绍下,生产环境推荐使用。

canal的HA分为两部分,canal server和canal client分别有对应的HA实现


canal server:

为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.

canal client:

为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。整个HA机制的控制主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点(和session生命周期绑定),这里就不展开介绍了,有兴趣的同学可以看下官方wiki。

四、演示环节

由于canal组件封装的代码太多,我花了几个晚上业余时间写的(请点个赞吧),代码已经开源至gitee,有需要的同学可以clone下来看。

gitee地址: https://gitee.com/bytearch/fast-cloud

目前已支持simple直连模式和zookeeper集群模式

下面演示canal-client-demo

  1. 新建库order_center ,并且创建表order_info

    CREATE TABLE `order_info` (`order_id` bigint(20) unsigned NOT NULL,`user_id` int(11) DEFAULT '0' COMMENT '用户id',`status` int(11) DEFAULT '0' COMMENT '订单状态',`booking_date` datetime DEFAULT NULL,`create_time` datetime DEFAULT NULL,`update_time` datetime DEFAULT NULL,PRIMARY KEY (`order_id`),KEY `idx_user_id` (`user_id`),KEY `idx_bdate` (`booking_date`),KEY `idx_ctime` (`create_time`),KEY `idx_utime` (`update_time`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  2. 添加处理器handler

    @CanalHandler(value = "orderInfoHandler", destination = "example", schema = {"order_center"}, table = {"order_info"}, eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT,CanalEntry.EventType.DELETE})
    public class OrderHandler implements Handler<CanalEntryBO> {@Overridepublic boolean beforeHandle(CanalEntryBO canalEntryBO) {if (canalEntryBO == null) {return false;}return true;}@Overridepublic void handle(CanalEntryBO canalEntryBO) {//1. 更新后数据解析OrderInfoDTO orderInfoDTO = CanalAnalysisUti.analysis(OrderInfoDTO.class, canalEntryBO.getRowData().getAfterColumnsList());System.out.println("event:" + canalEntryBO.getEventType());System.out.println(orderInfoDTO);//2. 后续操作 TODO}
    }
    1. 添加配置

      canal:clients:simpleInstance:enable: truemode: simpleservers: XXXXX:11111batchSize: 1000destination: examplegetMessageTimeOutMS: 500#zkInstance:#   enable: true#   mode: zookeeper#   servers: 172.30.1.6:2181,172.30.1.7:2181,172.30.1.8:2181#   batchSize: 1000#   #filter: order_center.order_info#   destination: example#   getMessageTimeOutMS: 500

      配置说明:

      public class CanalProperties {/*** 是否开启 默认不开启*/private boolean enable = false;/*** 模式* zookeeper: zk集群模式* simple: 简单直连模式*/private String mode = "simple";/*** canal-server地址 多个地址逗号隔开*/private String servers;/*** canal-server 的destination*/private String destination;private String username = "";private String password = "";private int batchSize = 5 * 1024;private String filter = StringUtils.EMPTY;/*** getMessage & handleMessage 的重试次数, 最后一次重试会ack, 之前的重试会rollback*/private int retries = 3;/*** getMessage & handleMessage 的重试间隔ms* canal-client内部代码 的重试间隔ms*/private int retryInterval = 3000;private long getMessageTimeOutMS = 1000;
  1. 测试insert和update操作

    mysql> insert into order_info(order_id,user_id,status,booking_date,create_time,update_time) values(6666666,6,10,"2022-02-19 00:00:00","2022-02-19 00:00:00", "2022-02-19 00:00:00");
    Query OK, 1 row affected (0.00 sec)mysql> update order_info set status=20 where order_id=66666;
    Query OK, 0 rows affected (0.00 sec)
    Rows matched: 0  Changed: 0  Warnings: 0mysql>
  2. 测试结果

    2022-02-18 19:29:52.399  INFO 47706 --- [ lc-work-thread] c.b.s.canal.cycle.SimpleCanalLifeCycle   :
    ****************************************************
    * Batch Id: [11] ,count : [3] , memsize : [189] , Time : 2022-02-18 19:29:52.399
    * Start : [binlog.000003:18893:1645183792000(2022-02-18 19:29:52.000)]
    * End : [binlog.000003:19123:1645183792000(2022-02-18 19:29:52.000)]
    ****************************************************2022-02-18 19:29:52.405  INFO 47706 --- [ lc-work-thread] c.b.s.canal.cycle.SimpleCanalLifeCycle   :
    ----------------> binlog[binlog.000003:19056] , name[order_center,order_info] , eventType : INSERT ,tableName : order_info, executeTime : 1645183792000 , delay : 400msevent:INSERT
    OrderInfoDTO{orderId=6666666, userId=6, status=10, bookingDate=2022-02-19 00:00:00, createTime=2022-02-19 00:00:00, updateTime=2022-02-19 00:00:00}

    大功告成,到这一步就顺利完成了Canal订阅解析binlog步骤。

五、数据同步注意事项

抛下两个问题大家可以思考下

  1. 数据双向同步时,如何解决数据回环问题?

    例如新系统产生的数据,同步到老系统,不能又回流到新系统,如何解决?

  2. 数据顺序问题,如果写入到MQ,是否要保证顺序消费?如何实现?

  3. 当同步并发比较大,如何提高同步速度。

温馨提示: 此专题未完,以上问题我将在下一篇文章《系统重构数据同步利器之Canal实战篇-续》实现,大家可以提前思考一下。

六、 号外

欢迎大家关注”浅谈架构“ 公众号,不定期分享原创文章

有任何问题,欢迎私信我交流。


系统重构数据同步利器之Canal实战篇相关推荐

  1. 数据同步利器之Tapdata Cloud

    一.数据同步概述 数据同步,顾名思义就是对不同系统之间的数据进行同步处理.开发过程中遇到数据同步需求有很多,比如数据冗余备份.缓存实时更新.不同系统间基础数据实时同步等.同步方案的选择,需要根据具体的 ...

  2. 接口推送数据原理_数据同步组件(Canal)在珍爱网的应用与实践

    本文作者:珍爱网技术团队 二爷 随着公司业务的不断发展,公司对于实时报表的需求越来越旺盛,原则上来说,实时报表最好的实现方式的通过Spark,storm这类的技术去支撑,由于人手原因,并不能很好的支撑 ...

  3. 数据同步的解决方案Canal

    Canal实现数据同步的原理: 1.是根据模拟mysql slave的主从交互协议,伪装自己是mysql slave,向mysql master发送dump请求. 2.mysql master收到du ...

  4. 中控考勤与海威达C6考勤系统考勤数据同步

    公司的老考勤机坏了,换成中控的S30考勤机了,考勤数据需要合并在一起,特写此存储过程,加入到每天的作业中让其自动执行.考勤统计仍然使用C6的,所以将考勤数据同步到C6的考勤里面. ---------- ...

  5. 企业系统之间数据同步处理

    引用内容: 数据同步一般是指一个数据源的数据发生改变时,其他相关的数据源的数据也发生相应变化.数据同步可以有五种实现方案,根据具体需求不同,可以采取不同方案. 1. 触发器:在源数据库建立增.删.改触 ...

  6. redis 备份导出rdb_Redis数据迁移利器之redisshake

    " 当需要进行Redis实例或集群数据迁移时,我们可以采用导出/导入的方式进行数据迁移,但当需要做数据异地灾备或双活时,再使用传统的方式就不合适了,我们需要借助工具(如redis-port/ ...

  7. 从青铜到王者的路线,java不同系统间数据同步

    深耕技术,啃下22个技术点 互联网行业更新换代非常快,行业常态便是不断学习,因此这些主流技术你一个都不能落下! ①并发编程 Java并发编程是整个Java开发体系中最难以理解,但也是最重要的知识点之一 ...

  8. 数据分析利器之Excel功能篇

    数据说·实操季 先相信自己,然后别人才会相信你.   --罗曼·罗兰 导读:今天我们要介绍的关于Excel功能的系列内容,在数据分析行业里面的地位是举足轻重的.从使用范围来看,微软办公软件Office ...

  9. windows系统下oracle数据库rman备份记录(实战篇)

    在windows 2003系统中装了oracle10G数据库,同时建了多个实例,这个备份应该怎么做呢? 在网上查了N多的资料,终于了解了一些: 冷备:把数据库已执行关闭后,对数据库做全备:需要停机,一 ...

最新文章

  1. 《深度探索C++对象模型》--5 构造析构拷贝 6 执行期语意学
  2. 【Yoshua Bengio 亲自解答】机器学习 81 个问题及答案(最全收录)
  3. 【MATLAB统计分析与应用100例】案例014:matlab读取Excel数据,调用stepwise函数作交互式逐步回归分析
  4. string::size_type
  5. 在matlab中求协方差,matlab里面的求协方差函数
  6. aop的实现原理_非Spring管理Bean如何添加AOP呢?
  7. 微商如何打印电子面单
  8. 纽约首次尝试在大桥上识别车内司机面孔 失败得很彻底
  9. Maven 私服 Version policy mismatch, cannot upload SNAPSHOT content to RELEASE repositories for file‘0’
  10. Echarts数据可视化series-scatter散点图,开发全解+完美注释
  11. php自定义函数表格,自定义函数table()
  12. WPF 中依赖属性的继承(Inherits)
  13. 数据分析师的前世今生
  14. JDK6中synchronized优化之锁升级
  15. 怎么用谷歌或百度搜自己在csdn内的文章呢?
  16. 执行安装操作的时候,出现丢失MSVCR120.dll的解决方法
  17. 机器学习-多元分类/回归决策树模型(tree包)
  18. 如何让ARM板开机启动Qt
  19. 轻流入选|国际权威研究机构「2021年低代码平台中国市场现状分析报告」发布
  20. 怒赞!7个下载UI组件包的顶级网站

热门文章

  1. 图解Word2vec
  2. java 格式化 浮点数_如何在javascript中格式化浮点数?
  3. JavaScript中Set的使用
  4. 第一单元 用python学习微积分(五) 隐函数微分法和逆函数导数(上)- 隐函数微分
  5. vue——实现组织架构图(vue-org-tree)——技能提升
  6. 【基于人脸特征的心率检测研究】非接触式光电容积图和红外人脸视频瞬时心率估计
  7. AdGuard推荐设置
  8. 树莓派连接WiFi连不上
  9. 兼容exe的linux系统,国产操作系统下想运行EXE?你何不虚拟个电脑系统出来
  10. HashMap灵魂26问