之前已经分享了一篇关于springboot + canal实现缓存更新的功能,但这种方案存在缺陷,当项目属于微服务架构时,一个服务可能有多个实例,即使多个实例都在监听canal的消息,但是只有一个实例能够消费该消息以此更新缓存,针对只是Redis做缓存的情况下没有问题,但是多级缓存Caffeine+Redis就有问题了,只有一个服务实例能够更新本地的Caffeine缓存,其他服务实例就无法同步更新。因此我想到可以RabbitMQ实现多服务实例的缓存同步更新操作。而canal本身就可以与RabbitMQ联合使用。

在Linux服务器上利用docker部署MySQL和RabbitMQ后,再部署canal,这次部署和之前有差异。

首先启动容器:

[root@iZ2vc97qcawcm6uqblda2fZ ~]# docker run -d canal/canal-server:v1.1.5
ddcce88c2050213f584849704ddb9401f6a8be1a923b1987f49729ea558e3a31

然后从容器中拷贝配置文件

[root@iZ2vc97qcawcm6uqblda2fZ canal]# docker cp ddcce88c2050:/home/admin/canal-server/conf/canal.properties ./conf/
docker cp ddcce88c2050:/home/admin/canal-server/conf/example/instance.properties ./conf/

修改canal.properties配置文件信息:


##################################################
#########           RabbitMQ         #############
##################################################
#不能加上端口号
rabbitmq.host =47.108.105.49
rabbitmq.virtual.host =/
rabbitmq.exchange =canal.exchange
rabbitmq.username =guest
rabbitmq.password =guest
rabbitmq.deliveryMode =fanout

修改instance.properties配置文件:

#myqsl主库地址
canal.instance.master.address=47.108.105.49:3306#mysql的binlog文件名
canal.instance.master.journal.name=mysql-bin.000006#正则表达式过滤需要监听的数据库
canal.instance.filter.regex=recl\\..*

重启canal-server:

​
[root@iZ2vc97qcawcm6uqblda2fZ canal]# docker run -d \
> -v /docker/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
> -v /docker/canal/conf/canal.properties:/home/admin/canal-server/conf/canal.properties \
> -p 11111:11111 \
> --name canal \
> canal/canal-server:v1.1.5
303291f6de8d7b6b77928e2aece00e52573b199f90c5000ff3102b72c2736268​

创建springboot项目,引入rabbitMQ起步依赖:

        <!-- RabbitMQ --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

application.properties中的配置信息:

# RabbitMQ配置
spring.rabbitmq.host=47.108.105.49
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.listener.direct.prefetch=1

创建canal消息监听类:

package com.ren.fm.listener;import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.ren.fm.constant.CachePrefix;
import com.ren.utils.canal.CanalBean;
import com.ren.utils.canal.CanalType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;/*** @ClassName: CanalMQListener* @Description: TODO* @Author: RZY* @DATE: 2022/7/17 9:13* @Version: v1.0*/@Component
@Slf4j
public class CanalMQListener {@AutowiredCachePrefix cachePrefix;@RabbitListener(queues = "canal.queue.fm.8001")public void CanalMessageHandler(Message message, Channel channel) {//获取消息体内容CanalBean canalBean = null;try {//序列化消息体canalBean = JSON.parseObject(new String(message.getBody(), StandardCharsets.UTF_8), CanalBean.class);log.info("来自队列 {} 的消息", 8001);log.info("收到canal消息: {}", canalBean);String tableName = canalBean.getTable();if(CachePrefix.isContainsTableName(tableName) && !canalBean.isDdl()) {//当为UPDATE时,更新缓存if(canalBean.getType().equals(CanalType.UPDATE)) cachePrefix.updateCache(tableName, canalBean);//当为DELETE时,删除缓存if(canalBean.getType().equals(CanalType.DELETE)) cachePrefix.deleteCache(tableName, canalBean);}} catch (Exception e) {log.info("canal消息处理失败: {}", canalBean);throw new RuntimeException(e);}}
}

创建交换机和相关队列:

启动项目测试:

①修改数据库中的某条记录

②查看RabbitMQ控制台(收到了日志信息):

③启动springboot项目(缓存更新成功):

SpringBoot + RabbitMQ + canal实现缓存更新操作相关推荐

  1. SpringBoot整合canal实现缓存更新

    canal是阿里巴巴的开源组件,用于监听MySQL的binlog日志而实现消息的同步机制,提供增量数据订阅和消费. canal必须基于MySQL的主从架构才可使用,canal会伪装成MySQL的一个s ...

  2. springboot集成canal,实现缓存实时刷新,驼峰问题

    1.cancl安装 下载路径:cancl下载路径 下载完安装包,安装完成后,需要修改conf\example路径下配置文件instance.properties:设置position info和tab ...

  3. SpringBoot配置使用Redis缓存

    1.添加依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>sp ...

  4. 更新操作先删除缓存后更新DB,还是先更新DB后删除缓存问题???

    以前自己在学校学习redis的时候还真没想到这么多,上班后看公司的项目代码,发现都是先更新DB,然后删除缓存,而且更新DB后不会立马将DB数据放入缓存,然而我以前不管是查询还是update都是操作完D ...

  5. SpringBoot RabbitMQ 商品秒杀【SpringBoot系列15】

    SpringCloud 大型系列课程正在制作中,欢迎大家关注与提意见. 程序员每天的CV 与 板砖,也要知其所以然,本系列课程可以帮助初学者学习 SpringBooot 项目开发 与 SpringCl ...

  6. 讲讲 Redis 缓存更新一致性

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 作者:Finley www.cnblogs.com/Finley/ ...

  7. springboot+mybatis集成自定义缓存ehcache用法笔记

    今天小编给大家整理了springboot+mybatis集成自定义缓存ehcache用法笔记,希望对大家能有所办帮助! 一.ehcache介绍 EhCache 是一个纯Java的进程内缓存管理框架,属 ...

  8. 利用SpringBoot+RabbitMQ,实现一个邮件推送服务

    一.流程图 本文内容主要围绕这个流程图展开,利用 RabbitMQ 消息队列,实现生产者与消费者解耦,所以有必要先贴出来,涵盖了 RabbitMQ 很多知识点,如: 消息发送确认机制 消费确认机制 消 ...

  9. SpringBoot (14)---使用Redis缓存

    SpringBoot 中使用Redis缓存 在项目中对数据的访问往往都是直接访问数据库的方式,但如果对数据的访问量很大或者访问很频繁的话,将会对数据库来很大的压力,甚至造成数据库崩溃.为了解决这类问题 ...

最新文章

  1. 开源使得所有的软件卖成白菜价,但终将普惠世界!
  2. php微信墙开发,Node.js如何开发微信墙
  3. U-Boot启动过程--详细版的完全分析
  4. ELSE 技术周刊(2017.11.27期)
  5. POSIX线程专有数据的空间释放问题,pthread_key_create
  6. 文本分类模型_【文本分类】几个可作为Baseline的模型
  7. ARP、RARP、ICMP、ping
  8. 飞秋命令行发送消息和文件
  9. 对称加密和非对称加密、公钥和私钥、单向认证和双向认证、数字签名、数字证书、根证书
  10. node.js+uni计算机毕设项目鲸落图书商城小程序LW(程序+小程序+LW)
  11. leetcode 初级算法 数组
  12. 电脑WIN XP蓝屏错误代码大全查询
  13. 局部刷新的两种实现方式
  14. Nachos实验实现线程id、限制线程数和更改调度算法(按优先级调度)
  15. tick_params()--matplotlib
  16. 弘辽科技:拼多多专属推广怎么设置时间。
  17. 【5分钟教你】3种实现验证码功能-数字短信验证码-图形验证码-滑动验证码
  18. Git基于已有分支创建分支
  19. 【自定义表单】自定义表单设计
  20. js实现父页面的刷新

热门文章

  1. 0921深度学习硬件CPU和GPU
  2. 【NLP】第 3 章 :BERT
  3. ORB-SLAM2:(二)Monocular/Stereo/RGB-D数据集
  4. 福建数字校园一卡通投入使用一卡一刷就搞定
  5. 如何在线完成视频转gif制作?分享一招视频秒转gif
  6. Win10C盘文件夹内容详解(持续更新,欢迎留言)
  7. 蚂蚁集团重组支付宝高层,井贤栋辞去支付宝法人和董事长职位
  8. 2019年4月9日 星期二(退休不是目的 自由才是)
  9. Excel数据分析学习笔记(一)数据分析六步法和重要分析模型
  10. Css中常用中文字体的Unicode编码对照