前言

上篇文章简单介绍canal概念,本文结合常见的缓存业务去讲解canal使用。在实际开发过程中,通常都会把数据往redis缓存中保存一份,做下简单的查询优化。如果这时候数据库数据发生变更操作,就不得不在业务代码中写一段同步更新redis的代码,但是这种 数据同步的代码和业务代码糅合在一起 看起来不是很优雅,而且还会出现数据不一致问题。那能不能把这部分同步代码从中抽离出来,形成独立模块呢?答案是肯定的,下面通过canal结合Kafka来实现mysql与redis之间的数据同步。

架构设计

canal是一个伪装成slave订阅mysql的binlog,实现数据同步的中间件。上一篇文章 canal入门 中简单介绍了使用方式,即tcp模式;其实canal也是支持直接发送到MQ中,比如:Kafka、RocketMQ、RabbitMQ。本文采用Kafka讲解,实现mysql与redis之间的数据同步。

通过上述结构设计图可以很清晰的知道用到的组件:MySQL、Canal、Kafka、ZooKeeper、Redis。

Kafka&Zookeeper搭建

首先在官网下载Kafka:

下载后解压文件夹,可以看到以下几个文件:

Kafka内部自带了zookeeper,所以暂不需要去下载搭建zookeeper集群,本文就使用Kafka自带zookeeper来实现。

通过上述zookeeper启动命令以及Kafka启动命令把服务启动,可以通过以下简单实现下是否成功:

# 命令常见一个canaltopic 队列kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic canaltopic

Canal搭建

canal搭建具体可以参考上文,这里只讲解具体的参数配置:

找到/conf目录下的canal.properties配置文件:

# tcp, kafka, RocketMQ 这里选择kafka模式canal.serverMode = kafka# 解析器的线程数,打开此配置,不打开则会出现阻塞或者不进行解析的情况canal.instance.parser.parallelThreadSize = 16# 配置MQ的服务地址,这里配置的是kafka对应的地址和端口canal.mq.servers = 127.0.0.1:9092# 配置instance,在conf目录下要有example同名的目录,可以配置多个canal.destinations = example

然后配置instance,找到/conf/example/instance.properties配置文件:​​​​​​​​​​​​​​

## mysql serverId , v1.0.26+ will autoGen(自动生成,不需配置)# canal.instance.mysql.slaveId=0
# position infocanal.instance.master.address=127.0.0.1:3306# 在Mysql执行 SHOW MASTER STATUS;查看当前数据库的binlogcanal.instance.master.journal.name=mysql-bin.000006canal.instance.master.position=4596# 账号密码canal.instance.dbUsername=canalcanal.instance.dbPassword=Canal@****canal.instance.connectionCharset = UTF-8#MQ队列名称canal.mq.topic=canaltopic#单队列模式的分区下标canal.mq.partition=0

经过上述配置后,就可以启动canal了。

测试

环境搭建完成后,就可以编写代码进行测试。

1、引入pom依赖​​​​​​​

<dependency>    <groupId>org.springframework.kafka</groupId>    <artifactId>spring-kafka</artifactId></dependency><dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-data-redis</artifactId></dependency>

2、封装Redis工具类

在application.yml文件增加以下配置:

spring:    redis:    host: 127.0.0.1    port: 6379    database: 0    password: 123456

封装一个操作Redis的工具类:

​​​​​​​

@Componentpublic class RedisClient {
    /**     * 获取redis模版     */    @Resource    private StringRedisTemplate stringRedisTemplate;
    /**     * 设置redis的key-value     */    public void setString(String key, String value) {        setString(key, value, null);    }
    /**     * 设置redis的key-value,带过期时间     */    public void setString(String key, String value, Long timeOut) {        stringRedisTemplate.opsForValue().set(key, value);        if (timeOut != null) {            stringRedisTemplate.expire(key, timeOut, TimeUnit.SECONDS);        }    }
    /**     * 获取redis中key对应的值     */    public String getString(String key) {        return stringRedisTemplate.opsForValue().get(key);    }
    /**     * 删除redis中key对应的值     */    public Boolean deleteKey(String key) {        return stringRedisTemplate.delete(key);    }}

3、创建MQ消费者进行同步

在application.yml配置文件加上kafka的配置信息:spring:  kafka:      # Kafka服务地址    bootstrap-servers: 127.0.0.1:9092    consumer:      # 指定一个默认的组名      group-id: consumer-group1      #序列化反序列化      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer    producer:      key-serializer: org.apache.kafka.common.serialization.StringDeserializer      value-serializer: org.apache.kafka.common.serialization.StringDeserializer      # 批量抓取      batch-size: 65536      # 缓存容量      buffer-memory: 524288

创建一个CanalBean对象进行接收:

public class CanalBean {    //数据    private List<TbCommodityInfo> data;    //数据库名称    private String database;    private long es;    //递增,从1开始    private int id;    //是否是DDL语句    private boolean isDdl;    //表结构的字段类型    private MysqlType mysqlType;    //UPDATE语句,旧数据    private String old;    //主键名称    private List<String> pkNames;    //sql语句    private String sql;    private SqlType sqlType;    //表名    private String table;    private long ts;    //(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等    private String type;    //getter、setter方法}
public class MysqlType {    private String id;    private String commodity_name;    private String commodity_price;    private String number;    private String description;    //getter、setter方法}
public class SqlType {    private int id;    private int commodity_name;    private int commodity_price;    private int number;    private int description;}

最后就可以创建一个消费者CanalConsumer进行消费:​​​​​​​

@Slf4j@Componentpublic class CanalConsumer {@Resource    private RedisClient redisClient;@KafkaListener(topics = "canaltopic")    public void receive(ConsumerRecord<?, ?> consumer) {        String value = (String) consumer.value();        log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(),                consumer.partition(), consumer.offset(), value);        //转换为javaBean        CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);        //获取是否是DDL语句        boolean isDdl = canalBean.hasDdl();        //获取类型        String type = canalBean.getType();        //不是DDL语句        if (!isDdl) {            List<TbCommodityInfo> tbCommodityInfos = canalBean.getData();            //过期时间            long TIME_OUT = 600L;            if ("INSERT".equals(type)) {                //新增语句                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {                    String id = tbCommodityInfo.getId();                    log.info("新增数据到redis, id: {}, data: {}", id, JSONObject.toJSONString(tbCommodityInfo));                    //新增到redis中,过期时间是10分钟                    redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);                    log.info("从redis获取数据 result: {}", JSONObject.toJSONString(redisClient.getString(id)));                }            } else if ("UPDATE".equals(type)) {                //更新语句                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {                    String id = tbCommodityInfo.getId();                    log.info("修改数据到redis, id: {}, data: {}", id, JSONObject.toJSONString(tbCommodityInfo));                    //更新到redis中,过期时间是10分钟                    redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);                }            } else {                //删除语句                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {                    String id = tbCommodityInfo.getId();                    log.info("删除数据从redis, id: {}", id);                    //从redis中删除                    redisClient.deleteKey(id);                }            }        }    }}

测试Mysql与Redis同步

mysql对应的表结构如下:

CREATE TABLE `tb_commodity_info` (  `id` varchar(32) NOT NULL,  `commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称',  `commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格',  `number` int(10) DEFAULT '0' COMMENT '商品数量',  `description` varchar(2048) DEFAULT '' COMMENT '商品描述',  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';

启动项目后,新增一条数据:

INSERT INTO `canaldb`.`tb_commodity_info` (`id`, `commodity_name`, `commodity_price`, `number`, `description`) VALUES ('3e71a81fd80711eaaed600163e046cc3', '叉烧包', '3.99', '3', '又大又香的叉烧包,老人小孩都喜欢');

可以在控制台看到以下输出:​​​​​​​

2022-01-02 18:12:51.317  INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer   : 新增数据到redis, id: 3e71a81fd80711eaaed600163e046cc3, data: {"commodity_name":"叉烧包","commodity_price":"3.99","description":"又大又香的叉烧包,老人小孩都喜欢","id":"3e71a81fd80711eaaed600163e046cc3","number":"3"}2022-01-02 18:12:51.320  INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer   : 从redis获取数据 result: "{\"commodity_name\":\"叉烧包\",\"commodity_price\":\"3.99\",\"description\":\"又大又香的叉烧包,老人小孩都喜欢\",\"id\":\"3e71a81fd80711eaaed600163e046cc3\",\"number\":\"3\"}"

如果更新呢?试一下Update语句:

UPDATE `canaldb`.`tb_commodity_info` SET `commodity_name`='青菜包',`description`='很便宜的青菜包呀,不买也开看看了喂' WHERE `id`='3e71a81fd80711eaaed600163e046cc3';

同样可以在控制台看到以下输出:​​​​​​​

2022-01-02 18:14:44.613  INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer   : topic名称:canaltopic,key:null,分区位置:0,下标:6,value:{"data":[{"id":"3e71a81fd80711eaaed600163e046cc3","commodity_name":"青菜包","commodity_price":"3.99","number":"3","description":"很便宜的青菜包呀,不买也开看看了喂"}],"database":"study","es":1641118484000,"id":7,"isDdl":false,"mysqlType":{"id":"varchar(32)","commodity_name":"varchar(512)","commodity_price":"varchar(36)","number":"int(10)","description":"varchar(2048)"},"old":[{"commodity_name":"叉烧包","description":"又大又香的叉烧包,老人小孩都喜欢"}],"pkNames":["id"],"sql":"","sqlType":{"id":12,"commodity_name":12,"commodity_price":12,"number":4,"description":12},"table":"tb_commodity_info","ts":1641118484602,"type":"UPDATE"}2022-01-02 18:14:44.616  INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer   : 修改数据到redis, id: 3e71a81fd80711eaaed600163e046cc3, data: {"commodity_name":"青菜包","commodity_price":"3.99","description":"很便宜的青菜包呀,不买也开看看了喂","id":"3e71a81fd80711eaaed600163e046cc3","number":"3"}

经过测试完全么有问题。

总结

既然canal这么强大,难道就没缺点嘛?答案当然是存在的啦,比如:canal只能同步增量数据、不是实时同步而是准实时同步、MQ顺序问题等;尽管有一些缺点,毕竟没有一样技术或者产品是完美的,最重要是合适。比如公司目前有个视图服务提供宽表搜索查询功能就是通过 同步Mysql数据到Es采用Canal+Kafka的方式来实现的。

如果你觉得这篇文章对你有用,点个赞吧~ 你的点赞是我创作的最大动力~想第一时间看到我更新的文章,可以微信搜索公众号「CodingCode」。

canal+Kafka实现mysql与redis数据同步相关推荐

  1. Canal+Kafka实现mysql与redis数据准实时同步

    思维导图 文章已收录Github精选,欢迎Star:https://github.com/yehongzhi/learningSummary 前言 在很多业务情况下,我们都会在系统中加入redis缓存 ...

  2. 使用canal解决Mysql和Redis数据同步问题

    前言 千呼万唤始出来,停了好个月,终于又开始动手写文章了,今天带给大家的是阿里的一个工具Canal,这个工具是企业做数据同步使用的比较多的方案,希望对你有所帮助,喜欢的话请给个好评 工作原理分析 我们 ...

  3. mysql与redis数据同步(c/c++)(写mysql同步到redis,并且以json格式保存)

    系统开发中时常会需要缓存来提升并发读的能力,这时可以通过mysql的UDF和hiredis来进行同步 原理: 通过mysql自动同步redis 在服务端开发过程中,一般会使用MySQL等关系型数据库作 ...

  4. Mysql和Redis数据同步策略

    为什么对缓存只删除不更新 不更新缓存是防止并发更新导致的数据不一致. 所以为了降低数据不一致的概率,不应该更新缓存,而是直接将其删除, 然后等待下次发生cache miss时再把数据库中的数据同步到缓 ...

  5. Mysql和Redis数据同步该怎么做

    前言 算法血拼:Google+百度+Alibaba+字节+Tencent+网易+360+拼夕夕+美团 不知不觉双11就来了,轰轰烈烈的秋招也完美结束了,不知算法与数据结构成为了多少小伙伴进击大厂的绊脚 ...

  6. 灵魂拷问!Mysql和Redis数据同步该怎么做?请查收

    前言 我们从一个问题引入今天的主题. 在日常业务开发中,我们可能经常听到 DBA 对我们说"不要"(注意:不是禁止)使用 join,那么为什么 DBA 对 join 这么抵触呢?是 ...

  7. Mysql和Redis数据同步该怎么做?含答案解析

    前言 关于技术人如何成长的问题,一直以来都备受关注,因为程序员职业发展很快,即使是相同起点的人,经过几年的工作或学习,会迅速拉开极大的差距,所以技术人保持学习,提升自己,才能够扛得住不断上赶的后浪,也 ...

  8. Canal监控MySQL数据库实现数据同步

    canal是阿里巴巴旗下的一款开源项目,纯Java开发.基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL. canal简介 需求:将MySQL中某些表的数据实时的同步到 ...

  9. Canal 实现 Mysql数据库实时数据同步

    简介 1.1 canal介绍 Canal是一个基于MySQL二进制日志的高性能数据同步系统.Canal广泛用于阿里巴巴集团(包括https://www.taobao.com),以提供可靠的低延迟增量数 ...

最新文章

  1. C#语言与面向对象技术(4)
  2. c语言 文件名变量,C语言中,如何用根据不同的变量来更改文件名?
  3. html5学习笔记1
  4. 理解GBDT算法(三)——基于梯度的版本
  5. Dart.Powerweb.livecontrols应用
  6. ARM Keil MDK开发STM32工程模板
  7. Spring注释,我从来没有机会使用第1部分:@primary
  8. oracle table 函数使用
  9. xbox360fsd更新游戏封面_游戏类短视频创作指南
  10. oracle查询列属性,Oracle中查看所有的表,列,属性,…
  11. pannel添加的子窗体很大_在WordPress中添加简书风格的连载目录和文章导航
  12. 【目瞪口呆】通信机房内部长这样
  13. 雷达威力图绘制matlab,用Matlab语言实现雷达探测范围图的绘制
  14. Linux 实用指令 -- 关机、重启和用户登录注销、用户管理
  15. 同事说关键字查询用Mysql,我上去就是一个高压锅,用ElasticSearch不香吗?
  16. cocos渲染引擎分析(五)-----FBO实现多分辨率渲染
  17. PyCharm的Requirement already satisfied 解决方法
  18. 千万别活成自己最讨厌的样子
  19. Pytho学习(3)——注释
  20. 满分作文生成器:生活在代码上

热门文章

  1. mysql 主从 通俗易懂_MySQL 主从同步架构中你不知道的“坑”(完结篇)
  2. linux 内核定时器精度_linux使用select实现精确定时器详解
  3. Centos R安装
  4. 精简Linux文件路径
  5. dotnetnuke|dnn 内网实现自动登录
  6. ALV的颜色分为行的颜色、列的颜色和CELL的颜色
  7. express不是内部或外部命令,也不是可运行的程序或批处理文件
  8. java的反射和它的类加载机制
  9. 雷林鹏分享:PHP If...Else 语句
  10. 自动化运维-Ansible (第三部:Playbook 介绍)