上一篇 分布式缓存重建并发冲突问题以及zookeeper分布式锁解决方案, 主要讲解了分布式缓存重建冲突原因及利用zookeeper分布式锁解决缓存重建冲突问题,本篇接着上篇,实现上篇思路,带你利用zookeeper代码实现分布式锁解决重建缓存冲突问题。

缓存重建分析图

从上图我们可以看出:

  • 缓存主动更新
    我们监听kafka中的缓存操作消息队列,当接收到一个商品变更消息后,我们会立即根据源数据服务获取商品最新信息,然后更新到ehcache 和 redis cluster 中,这种情况笔者将之称为缓存主动更新
  • 缓存被动重建
    当nginx 请求获取商品信息时,发现redis cluster 和 ehcache 中都没有获取到相关商品信息,这时候就需要到源数据服务中拉取商品信息,这时候我们需要同步更新到redis cluster 和 ehcache 中,然后返回nginx 并进行nginx 本地缓存,这种情况笔者将之称为缓存被动重建

那么前面一章中,笔者分析了缓存重建情况

这里笔者在着重讲下,当缓存数据由于个方面因素(如LRU等算法)清理了,这时候缓存主动更新 和 缓存在高并发或者特殊情况下,同时进行时,缓存重建冲突就悲剧的发生了(注:上篇说多个缓存服务实例时,出现分布式缓存重建冲突没错,但是就算不是多缓存实例服务,单个也会发生,只要两者同时发生即可,这里着重补充一下

上篇讲了可以通过分布式锁方案解决

zookeeper分布式锁的解决逻辑思路

  • 变更缓存重建或者空缓存请求重建,更新redis之前,先获取对应商品id的分布式锁
  • 拿到分布式锁后,做时间版本比较,如果自己的版本新于redis中的版本,那么就更新,否则就不更新
  • 如果拿不到分布式锁,那么就等待,不断轮询等待,直到自己获取到分布式的锁

那下面我们就进入coding 环节,来吧!(注:以下代码层面只针对分布式锁,其他不做介绍,具体其他设计实现会单独剥离讲解

首先,我们来写个zookeeper 工具类,提供 获取锁 acquireDistributedLock、释放锁releaseDistributedLock 方法

  • 添加zookeeper client 依赖(注:如果你添加了kafka 依赖,这里就不需要单独依赖了
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.4.5</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions>
</dependency>
  • 创建 zookeeperSession 工具类,代码如下,对应代码都有注释说明,故不做过多解释了
/*** * zookeeper 分布式锁工具类* @author bill* @date 2017年10月3日 上午11:39:08*/
public class ZookeeperSession {private ZooKeeper zookeeper;//计数器(同步锁),连接信号量,用于控制并发请求时,确保 zookeeper client 与 server 已连接private static CountDownLatch connectSemaphore = new CountDownLatch(1);private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperSession.class);public ZookeeperSession(){try {// 连接 zookeeper serverthis.zookeeper = new ZooKeeper("192.168.0.16:2181,192.168.0.17:2181,192.168.0.18:2181", 50000, new ZookeeperWatcher());// 等待,保证 client、server连接connectSemaphore.await();LOGGER.debug(" zookeeper session established ...");} catch (Exception e) {e.printStackTrace();}}/***  获取分布式锁* @param productId 商品id*/public void acquireDistributedLock(Long productId){String path = "/product-lock-" + productId;try {// 尝试获取分布式锁zookeeper.create(path, "".getBytes(), Ids.READ_ACL_UNSAFE, CreateMode.EPHEMERAL);LOGGER.debug("success to acquire lock for productId [{}]", productId);} catch (Exception e) {// 如果报 nodeExitsException,说明已经有请求获取了锁,所有当前重复尝试获取锁,知道获取到锁为止int count = 0;while(true){try {// 睡眠一下下,为了测试效果,生产环境,可以20msThread.sleep(1000);// 再次尝试获取分布式锁zookeeper.create(path, "".getBytes(), Ids.READ_ACL_UNSAFE, CreateMode.EPHEMERAL);} catch (Exception e2) {// 如果报 nodeExitsException,说明已经有请求获取了锁,所有当前重复尝试获取锁,知道获取到锁为止LOGGER.debug("repeat to acquire lock for productId:[{}] - count:[{}] ...", productId, count);count ++;continue;}LOGGER.debug("success to acquire lock for productId:[{}] after count:[{}] repeat 。。。", productId, count);break;}}}/*** 释放分布式锁* @param productId 商品id*/public void releaseDistributedLock(Long productId){String path = "/product-lock-" + productId;try {// 删除node,释放锁zookeeper.delete(path, -1);} catch (Exception e) {e.printStackTrace();}}/*** 创建 zookeeper session watcher* @author bill* @date 2017年10月3日 下午12:05:21*/private class ZookeeperWatcher implements Watcher{@Overridepublic void process(WatchedEvent evt) {LOGGER.debug("receive zookeeper watched event: {}", evt.getState());if(KeeperState.SyncConnected == evt.getState()){// client、server 已连接 是否等待信号量锁connectSemaphore.countDown();}}}/*** 单例有很多种方式去实现,这里采取绝对线程安全的一种方式* 静态内部类的方式,去初始化单例*/private static class Singleton {private static ZookeeperSession instance;static{instance = new ZookeeperSession();}public static ZookeeperSession getInstance(){return instance;}}/*** jvm 的机制去保证多线程并发安全* 内部类的初始化,一定只会发生一次,不管多少个线程并发去初始化*/public static ZookeeperSession getIntance(){return Singleton.getInstance();}/*** 初始化单例 zookeeperSession*/public static void init(){getIntance();}
}

好,工具类写完了呢,我们不要忘了,对它进行初始化,笔者这里就直接在 InitListener 监听器中初始化了,如下图:

初始化zookeeper

那,接下来要干什么呢,自然就是缓存主动更新 或者 缓存被动重建代码中,加入分布式锁啦,让多个请求串行执行,即可

下面笔者先讲 缓存主动更新 中怎么做,既然我们是通过接收kafka 商品变更消息去更新缓存,那么对应的就是在消费kafka 消息的时候先获取分布式锁,得到锁后,对比时间版本,决定是否更新缓存

缓存主动更新

我们找呀找,终于找到了消费kafka 商品变更消息线程 KafkaMessageProcessor

在更新之前先获取锁,得到锁后,先获取redis 中 数据跟 当前商品数据时间版本对比,当前数据比缓存数据更靠后(更新),则更新,相关代码如下:

        // 在数据写入redis 缓存之前,先获取 zookeeper 分布式锁,确保缓存重建冲突ZookeeperSession zkSession = ZookeeperSession.getIntance();zkSession.acquireDistributedLock(productId);// 获取到了锁// 先从redis 中获取当前最新数据ProductInfo redisLastProductInfo = cacheService.getProductInfoFromRedisCache(productId);if(null != redisLastProductInfo){//比较更新时间,redis中的时间 与现有数据比较.redis product info 比现有数据小则更新,否则不更新redistry {Date date = sdf.parse(productInfo.getModifiedTime());Date redisLastProductInfoDate = sdf.parse(redisLastProductInfo.getModifiedTime());if(date.before(redisLastProductInfoDate)){LOGGER.debug("无需更新  > 现有数据 date:[{}] - before redis 最新版本  date:[{}]", date, redisLastProductInfoDate);// 无需更新,直接返回return;}} catch (ParseException e) {e.printStackTrace();}LOGGER.debug("current product info date is after redis product info date , to update redis");}else{LOGGER.debug("product Info is null, to update redis");}/** 此休眠为了延迟,更好的查看打印效果 -----生产环境去掉即可----- start*/try {Thread.sleep(10 * 1000);} catch (InterruptedException e) {e.printStackTrace();}/** 此休眠为了延迟,更好的查看打印效果 -----生产环境去掉即可----- end*/// 更新本地 ehcache 缓存cacheService.saveProductInfo2LocalCache(productInfo);LOGGER.debug("获取刚保存到本地缓存的商品信息:[{}]", cacheService.getProductInfoFromLocalCache(productId));// 更新redis 缓存cacheService.saveProductInfo2RedisCache(productInfo);// 释放 zookeeper 分布式锁zkSession.releaseDistributedLock(productId);

好了,缓存主动更新就完了,其实就这么简单,你懂了没有?

接下来继续,缓存被动重建那就是从http 入手了,请求进来后,先到redis cluster中获取商品数据,发现没有,然后又到本地ehcache 中获取,发现也没有,这时候就到源数据服务中拉取mysql 商品数据,这时候是不是要更新到redis cluster 以及 ehcache 中呢,那是必须的,所以这里就有发生缓存重建冲突的可能。

缓存被动重建

注:这里的核心实现和缓存主动更新差不多,但是处理流程稍微有点不一样

  • 创建一个缓存重建队列,提供加入队列、获取队列数据方法
  • 创建一个缓存重建队列消费线程,设置商品数据缓存,同时做缓存重建冲突处理
  • 请求进来,如果缓存中都没有商品数据,到源数据服务拉取商品数据,然后将商品数据加入缓存重建队列,同时响应http 商品数据

下面直接 coding 吧!

  • 创建一个缓存重建队列 RebuildCacheQueue,这是一个单例类,代码如下:
/*** * 重建缓存的内存队列* @author bill* @date 2017年10月3日 上午11:39:48*/
public class RebuildCacheQueue {/*** 内存队列*/private ArrayBlockingQueue<ProductInfo> queue = new ArrayBlockingQueue<ProductInfo>(1000);/*** 将商品信息对象加入队列* @param productInfo 商品信息对象*/public void putProductInfo(ProductInfo productInfo){try {queue.put(productInfo);} catch (InterruptedException e) {e.printStackTrace();}}/***  从队列中获取商品信息对象* @return 商品信息对象*/public ProductInfo takeProductInfo(){try {return queue.take();} catch (InterruptedException e) {e.printStackTrace();}return null;}/*** 单例有很多种方式去实现,这里采取绝对线程安全的一种方式* 静态内部类的方式,去初始化单例*/private static class Singleton {private static RebuildCacheQueue instance;static {instance = new RebuildCacheQueue();}private static RebuildCacheQueue getInstance(){return instance;}}/*** jvm 的机制去保证多线程并发安全* 内部类的初始化,一定只会发生一次,不管多少个线程并发去初始化*/public static RebuildCacheQueue getInstance(){return Singleton.getInstance();}
}
  • 创建一个缓存重建队列消费线程 RebuilCacheThread,进行重建缓存队列消费,没什么好说的了,上面流程已经讲了很清楚了,直接看代码吧:
/*** * 重建缓存队列消费线程* @author bill* @date 2017年10月3日 上午11:52:40*/
public class RebuilCacheThread implements Runnable{private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageProcessor.class);private static final SimpleDateFormat sdf = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");@Overridepublic void run() {// 获取重建缓存队列实例RebuildCacheQueue rebuildCacheQueue = RebuildCacheQueue.getInstance(); // 获取zookeeperSession 实例ZookeeperSession zkSession = ZookeeperSession.getIntance();CacheService cacheService = SpringContext.applicationContext.getBean(CacheService.class);while(true){ProductInfo productInfo = rebuildCacheQueue.takeProductInfo();// 获取zookeeper分布式锁zkSession.acquireDistributedLock(productInfo.getId());// 获取到了锁// 先从redis 中获取当前最新数据ProductInfo redisLastProductInfo = cacheService.getProductInfoFromRedisCache(productInfo.getId());if(null != redisLastProductInfo){//比较更新时间,redis中的时间 与现有数据比较.redis product info 比现有数据小则更新,否则不更新redistry {Date date = sdf.parse(productInfo.getModifiedTime());Date redisLastProductInfoDate = sdf.parse(redisLastProductInfo.getModifiedTime());if(date.before(redisLastProductInfoDate)){LOGGER.debug("无需更新  > 现有数据 date:[{}] - before redis 最新版本  date:[{}]", date, redisLastProductInfoDate);// 无需更新,直接返回continue;}} catch (ParseException e) {e.printStackTrace();}LOGGER.debug("current product info date is after redis product info date , to update redis");}else{LOGGER.debug("product Info is null, to update redis");}// 更新本地 ehcache 缓存cacheService.saveProductInfo2LocalCache(productInfo);// redis 缓存cacheService.saveProductInfo2RedisCache(productInfo);// 释放 zookeeper 分布式锁zkSession.releaseDistributedLock(productInfo.getId());}}
}
  • http 请求时,无缓存数据,重建缓存,加入重建缓存队列
    /*** 获取商品信息* @param productId 商品id* @return 商品信息*/@GetMapping("/getProductInfo")public ProductInfo getProductInfo(Long productId){ProductInfo productInfo = null;try {productInfo = cacheService.getProductInfoFromRedisCache(productId);LOGGER.debug("从redis中获取缓存,商品信息: {}", productInfo); if(productInfo == null){productInfo = cacheService.getProductInfoFromLocalCache(productId);LOGGER.debug("从ehcache中获取缓存,商品信息: {}", productInfo);}if(productInfo == null){//走 数据源重新拉数据并重建缓存,注意这里笔者就直接写死数据了String productInfoJSON = "{\"id\": 10, \"name\": \"iphone7手机\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的规格\", \"service\": \"iphone7的售后服务\", \"color\": \"红色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1, \"modifiedTime\": \"2017-10-3 12:30:01\"}";productInfo = JSONObject.parseObject(productInfoJSON, ProductInfo.class);// 将数据推送到一个内存队列中消费(重建缓存的内存队列)RebuildCacheQueue.getInstance().putProductInfo(productInfo);}} catch (Exception e) {}return productInfo;}

好了,缓存主动更新 和 缓存被动重建三部曲就完了,可以很清楚的看到,代码量不多,主要集中在更新缓存前获取分布式锁即可,结合缓存重建分析图

接下来要干啥呢,有个很重要的环节没有做,那就是测试,不到黄河不死心,看不到效果,我也不信,那我们就拿出来遛遛吧

代码测试

测试数据

缓存主动更新

kafka 生产数据:

 {"serviceId":"productInfoService","productId":10}

从数据库中拉取数据为(注意modifiedTime时间:2017-10-3 12:30:00):

String productInfoJSON = "{\"id\": 10, \"name\": \"iphone7手机\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的规格\", \"service\": \"iphone7的售后服务\", \"color\": \"红色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1, \"modifiedTime\": \"2017-10-3 12:30:00\"}";

缓存被动重建

http 请求:

http://localhost:81/getProductInfo?productId=10

从数据库中拉取数据为(注意modifiedTime时间:2017-10-3 12:30:01):

String productInfoJSON = "{\"id\": 10, \"name\": \"iphone7手机\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的规格\", \"service\": \"iphone7的售后服务\", \"color\": \"红色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1, \"modifiedTime\": \"2017-10-3 12:30:01\"}";

准备环境:
启动 redis cluster
启动 zookeeper 集群
启动 kafka 集群
启动 缓存服务(缓存项目服务)

注:由于笔者使用Windows 系统,可能避免windows 中 centos ip与主机名映射不了,所以作以下配置,推荐使用 SwitchHosts 的工具,很方便

C:\Windows\System32\drivers\etc\hosts*   ################################### 配置本地hosts #################### 很重要 ######################*   # 缓存架构方案*       192.168.0.16 my-cache1*       192.168.0.17 my-cache2*       192.168.0.18 my-cache3*   ################################### 配置本地hosts #################### 很重要 ######################

利用kafka-console-producer.sh 生产一条商品变更消息,并回车(由于代码中更新时为了演示效果,休眠了10s,把握时间)

cd /usr/local/kafka && bin/kafka-console-producer.sh --broker-list my-cache1:9092,my-cache2:9092,my-cache3:9092 --topic cache-message

商品消息如下:

{"serviceId":"productInfoService","productId":10}

浏览器 http 请求

http://localhost:81/getProductInfo?productId=10

测试效果

期望效果,先看到控制台打印kafka 的消费日志,等kafka 消费线程释放分布式锁后,才能看到缓存被动重建 获取分布式锁,并更新redis,并且返回modifiedTime:2017-10-3 12:30:01 的商品信息,同时redis cluster 中的数据也必须是 modifiedTime:2017-10-3 12:30:01 的商品信息,表示测试通过,这里都说得我有点迫不及待了,来吧,试试

主动更新缓存

浏览器请求

打印日志

redis cluster 最新商品数据

好了,是不是很激动呀,今天的讲解就到这里哈,赶紧去试试吧!

注:这里只是以商品信息为例来讲解利用分布式锁解决缓存重建冲突,其他如商铺信息等也是同理,这里希望告诉你的是方法,就不一一演示了。

代码地址附上:https://github.com/bill5/cache-project/tree/master/cache-cache

作者:逐暗者
链接:https://www.jianshu.com/p/151512add01e

架构系列---利用zookeeper 分布式锁解决缓存重建冲突实战相关推荐

  1. Spring Cache使用Redisson分布式锁解决缓存击穿问题

    文章目录 1 什么是缓存击穿 2 为什么要使用分布式锁 3 什么是Redisson 4 Spring Boot集成Redisson 4.1 添加maven依赖 4.2 配置yml 4.3 配置Redi ...

  2. 死磕 java同步系列之redis分布式锁进化史

    问题 (1)redis如何实现分布式锁? (2)redis分布式锁有哪些优点? (3)redis分布式锁有哪些缺点? (4)redis实现分布式锁有没有现成的轮子可以使用? 简介 Redis(全称:R ...

  3. java redis的同步_java同步系列之redis分布式锁进化史

    标题: 死磕 java同步系列之redis分布式锁进化史 - 彤哥读源码 - 博客园 转帖原地址: https://www.cnblogs.com/tong-yuan/p/11621361.html ...

  4. Redis分布式锁防止缓存击穿

    缓存击穿 和缓存穿透不同的是,缓存击穿是指:缓存中没有,但是数据库中存在的热点数据. 例如:首页的热点新闻,并发访问量非常大的热点数据,如果缓存过期失效,服务器会去查询DB,这时候如果大量的并发去查询 ...

  5. 【高并发秒杀系统】对分布式锁、缓存、消息队列、限流等的原理分析及代码实现

    前言:在一些商城项目中,秒杀是不可或缺的.但是,如果将普通的购买.消费等业务流程用于秒杀系统,不做任何的处理,会导致请求阻塞严重.超买超卖等严重后果,服务器.数据库也可能因为瞬时的流量而奔溃.所以,设 ...

  6. 关于分布式锁的面试题都在这里了|Reids分布式锁|ZooKeeper分布式锁

    我今天班儿都没上,就为了赶紧把这篇文章分布式锁早点写完.我真的不能再贴心了. 边喝茶边构思,你们可不要白嫖了! 三连来一遍? 引言 为什么要学习分布式锁? 最简单的理由就是作为一个社招程序员,面试的时 ...

  7. zookeeper分布式锁原理及实现

    前言 本文介绍下 zookeeper方式 实现分布式锁 原理简介 zookeeper实现分布式锁的原理就是多个节点同时在一个指定的节点下面创建临时会话顺序节点,谁创建的节点序号最小,谁就获得了锁,并且 ...

  8. Zookeeper分布式锁原理

    1.分布式锁介绍 单机应用开发,涉及并发同步的时候,我们往往采用synchronized 或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题. 但当我 ...

  9. 简单介绍redis分布式锁解决表单重复提交的问题

    在系统中,有些接口如果重复提交,可能会造成脏数据或者其他的严重的问题,所以我们一般会对与数据库有交互的接口进行重复处理.本文就详细的介绍一下redis分布式锁解决表单重复提交,感兴趣的可以了解一下 假 ...

最新文章

  1. RAID2.0核心思想:数据保护与物理资源管理域分离
  2. vue中使用百度地图为啥是空白_vue中使用腾讯地图(尝试篇)
  3. 多CPU,多核,多进程,多线程以及进程和线程的简单理解以及区别
  4. 前端学习(2370):组件之间的通讯方式
  5. CSS3学习笔记-技术提示
  6. android虚拟机的使用教程,Android 虚拟机可以这么用了 ?
  7. 元宇宙系统全面学习线路
  8. 我的世界电脑服务器怎么显示键盘,我的世界电脑版操作按键
  9. 使用百度ai识别身份证信息
  10. 二维码制作教程分享,大家一起来学习吧!
  11. 计算机显卡怎样安装方法,电脑装机小知识,新手如何正确安装好独立显卡
  12. 传智播客软件测试学习视频汇总:
  13. 王道数据结构课代表 - 考研数据结构 第五章 树和二叉树 究极精华总结笔记
  14. C. Ehab and Path-etic MEXs
  15. Apache 屏蔽YisouSpider一搜蜘蛛神马的方法
  16. 携职教育:中级职称有用吗?怎么评中级?
  17. Python小游戏——Pygame制作2048小游戏
  18. 优化器的RBO和CBO
  19. 每周3课:电机、舵机、LM35温度传感器的使用方法 | Mixly纯干货课程
  20. 互联网广告的背后是什么(3):今日头条DSP的基本信息和主要特点

热门文章

  1. 《草书识别》隐私政策
  2. 检测并发程序Bug:[PLDI2021] Canary: Practical Static Detection of Inter-thread Value-Flow Bugs
  3. [HTML5]配置Ngnix服务器支持manifest
  4. Koa洋葱圈模型源码浅析(`await next()`为什么能够形成洋葱圈模型?)
  5. 如何才能做到用户持续增长?
  6. stm32【RGB_LED_WS2812灯珠】
  7. 计算机开机无法选择用户界面,带有win10双系统中win10关机后开机无法进入启动菜单选择及BIOS界面的原因你知道吗?...
  8. vi、vim的使用 查找关键字命令(一文彻底搞懂)
  9. Ubuntu整个系统迁移到其他盘办法
  10. 安卓java 模拟点击类_Android模拟用户点击的实现方法