目录

  • (一)分布式锁
    • (1)本地锁
    • (2)分布式锁演进——阶段一(加锁)
    • (3)分布式锁演进——阶段二(锁超时)
    • (4)分布式锁演进——阶段三
    • (5)分布式锁演进——阶段四
    • (6)分布式锁核心
  • (二)Redisson——专业的分布式锁解决方案
    • (1)配置,使用程序化配置(来自官网)
    • (2)使用redisson加锁,自动续期
    • (3)Redisson-lock看门狗原理
    • (4)redisson读写锁测试
    • (5)redisson信号量(semaphore)
    • (6)闭锁(CountDownLatch)
    • (7)案例终极优化
  • (三)缓存数据的一致性
    • (1)双写模式
    • (2)失效模式
    • (3)缓存一致性,解决方案
    • (4)缓存一致性解决-Canal

(一)分布式锁

锁是为了解决缓存击穿问题,防止所有请求突然全部查询数据库中某个数据。加锁后,请求一个一个进入,缓解数据库的压力。

(1)本地锁

案例是之前缓存中整理的例子。

public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithLocalLock(){//只要是同一把锁,就能锁住需要这个锁的所有线程//1.synchronized(this):SpringBoot所有的组件在容器中都是单例的。synchronized(this){//得到锁以后,我们应该再去缓存中确定一次,如果没有才需要继续查询String catelogJSON = redisTemplate.opsForValue().get("catelogJSON");List<CategoryEntity> selectList = baseMapper.selectList(null);if(!StringUtils.isEmpty(catelogJSON)){//缓存不为null,直接返回Map<String,  List<Catelog2Vo>> result = JSON.parseObject(catalogJsonFromDb,new TypeReference<Map<String, List<Catelog2Vo>>{});return result;}//............查询数据库的操作..............//3.查到的数据再放入缓存,将对象转为json放在缓存中String s = JSON.toJSONString(catelog2Vos);redisTemplate.opsForValue().set("catelogJSON",s,1,TimeUnit.Days);return catelog2Vos;}
}

本地锁使用synchronized(this),只能锁住当前进程,当服务部署多个,会有多个同时查询数据库

为了锁住所有的东西,我们就需要加分布式锁。

(2)分布式锁演进——阶段一(加锁)

public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock(){//1. 占分布式锁,去redis占坑Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock","11111");if(lock){//加锁成功。。。执行业务Map<String, List<Catelog2Vo>>  dataFromDb = getDataFromDb();redisTemplate.delete("lock");//执行完毕,删除锁return dataFromDb;}else{//加锁失败...重试。synchronized//休眠100ms重试return getCatalogJsonFromDbWithRedisLock(); //自旋的方式}
}

存在的问题:如果getDataFromDb()程序出错,直接退出该方法,还未来得及删锁。将会造成死锁。为了解决此问题,设置锁的过期时间,出现了阶段二。

(3)分布式锁演进——阶段二(锁超时)

public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock(){//1. 占分布式锁,去redis占坑Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock","11111");if(lock){//加锁成功。。。执行业务//2.设置过期时间redisTemplate.expire("lock",30,TimeUnit.SECONDS);Map<String, List<Catelog2Vo>>  dataFromDb = getDataFromDb();redisTemplate.delete("lock");//执行完毕,删除锁return dataFromDb;}else{//加锁失败...重试。synchronized//休眠100ms重试return getCatalogJsonFromDbWithRedisLock(); //自旋的方式}
}

存在的问题:若是刚拿到锁,机器宕机了,还是会死锁。占锁与设置过期时间不是原子操作。

解决方式:过期时间和占位必须是原子操作。由此,出现了阶段三

(4)分布式锁演进——阶段三

public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock(){//1. 占分布式锁,去redis占坑并设置过期时间,必须和加锁是同步的,原子的Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock","11111",300,TimeUnit.SECONDS);if(lock){//加锁成功。。。执行业务Map<String, List<Catelog2Vo>>  dataFromDb = getDataFromDb();redisTemplate.delete("lock");//执行完毕,删除锁return dataFromDb;}else{//加锁失败...重试。synchronized//休眠100ms重试return getCatalogJsonFromDbWithRedisLock(); //自旋的方式}
}

存在的问题:业务执行时间过长,锁过期了。别的请求占有锁,我们可能把别人的锁删除了。

(5)分布式锁演进——阶段四

设置锁的值是uuid,删除时取值,判断和当前值是否一致,一致再删除

public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock(){//1. 占分布式锁,去redis占坑String uuid = UUID.randomUUID().toString();Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock",uuid,300,TimeUnit.SECONDS); //2.设置过期时间,必须和加锁是同步的,原子的if(lock){//加锁成功。。。执行业务Map<String, List<Catelog2Vo>>  dataFromDb = getDataFromDb();//获取值对比+对比成功删除=原子操作String lockValue = redisTemplate.opsForValue().get("lock");if(uuid.equals(lockValue)){//删除我自己的锁redisTemplate.delete("lock");}return dataFromDb;}else{//加锁失败...重试。synchronized//休眠100ms重试return getCatalogJsonFromDbWithRedisLock(); //自旋的方式}
}

存在的问题:判断是否是当前值与删除锁不是原子操作,有可能在判断完刚好锁过期。

解决:使用redis+Lua脚本,保证原子性

public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock(){//1. 占分布式锁,去redis占坑String uuid = UUID.randomUUID().toString();Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock",uuid,300,TimeUnit.SECONDS);//设置过期时间,必须和加锁是同步的,原子的if(lock){//加锁成功。。。执行业务Map<String, List<Catelog2Vo>>  dataFromDb = getDataFromDb();//获取值对比+对比成功删除=原子操作 Lua脚本解锁//KEYS[1]) == ARGV[1] ,相当于key和值String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";Long lock1 = redisTemplate.execute(new DefaultRedisScript<Long>(script,Long.class),Arrays.asList("lock"),uuid); //lock1为0删除失败,lock1为1删除成功return dataFromDb;}else{//加锁失败...重试。synchronized//休眠100ms重试return getCatalogJsonFromDbWithRedisLock(); //自旋的方式}
}

(6)分布式锁核心

总结

  • 三要素:加锁,解锁,锁超时
  • 加锁保证原子性,解锁保证原子性

解决方案

  1. 使用set key value 过期时间 nx 加锁(nx表示值不存在才能设置成功)
  2. value的设置不要使用固定字符串,使用随机字符串
  3. 通过Lua脚本删除指定的锁,而不是Del命令。

另外的问题,关于redis过期后,自动续期

简单的做法,把过期时间延长,查询数据库的部分加try…finally…不管最终是否出现问题都删除锁。

(二)Redisson——专业的分布式锁解决方案

(1)配置,使用程序化配置(来自官网)

Config config = new Config();
config.setTransportMode(TransportMode.EPOLL);
config.useClusterServers()//可以用"redis://"来启用SSL连接.addNodeAddress("redis://127.0.0.1:7181");

本案例中添加的redisson配置:MyRedissonConfig.class

@Configuration
public class MyRedissonConfig{/******所有对Redisson的使用都是通过RedissonClient对象*/@Bean(destroyMethod="shutdown")public RedissonClient redisson() throws IOException{//1.创建配置Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379");//2.根据Config创建出RedissonClient示例RedissonClient redissonClient = Redisson.create(config);return redissonClient;}
}

(2)使用redisson加锁,自动续期

锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s。不用担心业务时常,锁自动过期被删除。

加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除。

即使解锁代码没有执行,过了30s,锁自动过期。

@Autowired
RedissonClient redisson;@ResponseBody
@GetMapping("/hello")
public String hello(){//1.获取一把锁,只要锁的名字一样,就是同一把锁RLock lock = redisson.getLock("my-lock");//2.加锁lock.lock(); //进入了AQS同步队列排队,等待唤醒。默认加的锁都是30stry{System.out.println("加锁成功,执行业务。。。"+Thread.currentThread().getId());Thread.sleep(30000);}catch(Exception e){}finally{//3.解锁 假设解锁代码没有运行,redisson会不会出现死锁——不会,有默认超时时间。System.out.println("释放锁。。。"+Thread.currentThread().getId());lock.unlock();}return "hello";
}

(3)Redisson-lock看门狗原理

  1. 如果我们传递了锁的超时时间,就发送给Redis执行脚本,进行占锁,默认超时就是我们指定的时间。
 lock.lock(10,TimeUnit.Seconds); //10s自动解锁,自动解锁时间一定要大于业务的执行时间。锁时间到了后不会自动续期
  1. 如果我们未指定锁的超时时间,就使用30*1000【LockWatchdogTimeout看门狗的默认时间】。只要占锁成功,就会执行一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10秒都会自动再次续期,续成30秒

(4)redisson读写锁测试

写锁是一个排他锁(互斥锁,独享锁),读锁是一个共享锁。写锁没释放,读锁必须等待

// 写锁
@ResponseBody
@GetMapping("/write")
public String writeValue(){RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");RLock rLock = lock.writeLock(); //得到写锁String s = "";try{//1.改数据加写锁,读数据加读锁rLock.lock();s = UUID.randomUUID().toString();Thread.sleep(); redisTemplate.opsForValue().set("writeValue",s);}catch(Exception e){e.printStackTrace();}finally{rLock.unlock();}return s;
}//读锁
@ResponseBody
@GetMapping("/read")
public String readValue(){RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");RLock rLock = lock.readLock(); //加读锁String s = "";rLock.lock();try{s = redisTemplate.opsForValue().get("writeValue");}catch(Exception e){e.printStackTrace();}finally{rLock.unlock();}return s;
}

总结

  1. 读+读:相当于无锁,并发读,只会在redis中记录好,所有当前的读锁,他们都会同时加锁成功
  2. 写+读:等待写锁释放
  3. 写+写:阻塞方式
  4. 读+写:有读锁;写也需要等待

只要有写的存在,都必须等待

(5)redisson信号量(semaphore)

可以做分布式限流,一共允许多少的流量,超额了等待或直接返回false。秒杀也可以用

/**场景:车库停车,一共只有3车位**/
@GetMapping("/park")
@ResponseBody
public String  park() throws InterruptedException{RSemaphore park = redisson.getSemaphore("/park");//park.acquire();//获取一个信号,获取一个值,占一个车位(redis中存在的是剩余的车位数量)boolean b = park.tryAcquire(); //直接返回获取结果,不做等待if(b){//执行业务}else{return "error";}return "ok=>"+b;
}@GetMapping("/go")
@ResponseBody
public String  go() throws InterruptedException{RSemaphore park = redisson.getSemaphore("/park");park.release();//释放一个车位return "ok";
}

(6)闭锁(CountDownLatch)

/**
**放假,锁门
* 1班没人了,
** 5个班全部走完,我们可以锁大门
*/
@GetMapping("/lockDoor")
@ResponseBody
public String lockDoor() throws InterruptedException{//在Redis中设置属性为door,值为5。RCountDownLatch door = redissonClient.getCountDownLatch("door");door.trySetCount(5);  door.await();  //等待闭锁都完成return "放假了";
}@GetMapping("/gogogo/{id}")
@ResponseBody
public String gogogo() throws InterruptedException{RCountDownLatch door = redissonClient.getCountDownLatch("door");door.countDown();//计数减一return id+"班的人都走了。。。。";
}

(7)案例终极优化

 public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedissonLock(){//锁的名字。锁的粒度,越细越快//锁的粒度:具体缓存的是某个数据,11-号商品;product-11-lock ;product-12-lock product-lockRLock lock = redisson.getLock("CatalogJson-lock");lock.lock();Map<String, List<Catelog2Vo>>  dataFromDb;try{dataFromDb = getDataFromDb();}finally{lock.unlock();}return dataFromDb;
}

(三)缓存数据的一致性

缓存里的数据如何和数据库保持一致性

(1)双写模式

数据库数据修改,连带着修改缓存

存在的问题:大并发下,两个机器A,B同时先后请求。理论上应该是以机器B最后修改的数据为准,但是由于机器A处理比较慢,等机器B改完了,机器A才操作数据库,导致脏数据问题。

解决方案:给修改数据库和修改缓存操作加锁。看对这种误差的容忍度,若不需要过于精确,可以等待缓存中数据过期

(2)失效模式

数据库数据修改后,直接删除缓存中数据。等待下次主动查询进行更新

(3)缓存一致性,解决方案

(4)缓存一致性解决-Canal

一般解决方案:

  • 缓存的所有数据都加过期时间,数据过期下一次查询触发主动更新
  • 读写数据的时候,加上分布式读写锁。(经常写,经常读)

分布式锁及数据一致性相关推荐

  1. redis订阅执行一段时间自动停止_面试系列 redis 分布式锁amp;数据一致性

    分布式锁 多个系统同时操作一个redis,因为jvm锁是线程级别的,所以没有办法锁住多个系统. Redis锁实现: setnx key value 只有在key不存在时设置key的值 此时key相当于 ...

  2. 分布式锁和数据一致性的讨论——redis集群做分布式锁的风险

    文章目录 写在前面 分布式锁的三个属性 分布式锁就⼀定要实现这三个属性吗? 实现容错性 方法一:基于多个 Redis 节点实现分布式锁 问题一:进程可能会被挂起,直到锁的 TTL 过期 问题二:墙上时 ...

  3. 分布式场景下数据一致性的问题——【分布式锁】 Java常用技术方案

    2019独角兽企业重金招聘Python工程师标准>>> 前言: 由于在平时的工作中,线上服务器是分布式多台部署的,经常会面临解决分布式场景下数据一致性的问题,那么就要利用分布式锁来解 ...

  4. etcd 笔记(08)— 基于 etcd 实现分布式锁

    1. 为什么需要分布式锁? 在分布式环境下,数据一致性问题一直是个难点.分布式与单机环境最大的不同在于它不是多线程而是多进程.由于多线程可以共享堆内存,因此可以简单地采取内存作为标记存储位置.而多进程 ...

  5. api 创建zookeeper客户端_zookeeper分布式锁原理及实现

    前言 本文介绍下 zookeeper方式 实现分布式锁 原理简介 zookeeper实现分布式锁的原理就是多个节点同时在一个指定的节点下面创建临时会话顺序节点,谁创建的节点序号最小,谁就获得了锁,并且 ...

  6. 真牛逼!我司用了7年的分布式锁方案...

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 提到数据一致性.操作原子性,诸如此类的一些与并发有关的词汇时不知道 ...

  7. 分布式下必备神器之分布式锁

    点击上方"方志朋",选择"设为星标" 做积极的人,而不是积极废人 今天这篇文章我们来聊聊在分布式环境下的一个神兵利器--分布式锁!在看这篇文章的时候,默认大家对 ...

  8. chubby分布式锁服务概述

    分布式锁与chubby 分布式锁,是控制分布式系统之间同步访问共享资源的一种方式.Chubby是一种面向松耦合的分布式系统的锁服务,通常用于为一个由适度规模的大量小型计算机构成的的松耦合的分布式系统提 ...

  9. 从零到一编码实现Redis分布式锁

    问: 不是有redission等现成工具吗?咋不用? 答: 不,我就想自己写一个! 陈建斌说 : 你这个男的怎么回事 ?! 有的同学,就是这么尿性.也能理解,不自己弄一下,怎么能理解透彻,那就一起来搞 ...

最新文章

  1. 用Discuz/UCenter账号实现Wifi登录认证
  2. frame,iframe,frameset用法和区别
  3. webpack+react多页面开发架构
  4. 基于DOS命令打war包
  5. sscanf的常见用法
  6. 旧闻新看 ---- 西门子为什么要收购TESIS PLMWare
  7. 什么是WEB?如何学习web
  8. 初识ObjectBox--Android平台
  9. Membership Leakage in Label-Only Exposures论文解读
  10. 腾讯云对象存储(cos) js jdk上传文件
  11. SubSonic学习(一)
  12. oppo手机android 版本号,OPPO R11有几个版本?OPPO R11各版本区别对比详细评测
  13. Python解决数字棒球游戏
  14. python 谷歌翻译接口_使用python调用谷歌翻译接口实现英文到中文的翻译
  15. 软件开发培训要学多久?怎么学?软件开发培训班多少钱?
  16. U盘 如何自定义U盘图标并彻底隐藏配置文件
  17. elasticsearch中对于空字符串的过滤操作
  18. Bootstrap浏览器兼容性
  19. XX一中母亲写给高一禽兽儿子的信
  20. 浙大数据结构课后题-堆的路径

热门文章

  1. Serverlet理解
  2. allocate,malloc,new和deallocate,free,delete的区别
  3. 首款有无代码系统搭建平台的固定资产管理系统EAM
  4. MoveWindow and SetWindowPos
  5. 克隆clone() (Java)
  6. HTTP状态码一共分5种类型
  7. 动态规划入门到熟悉,看不懂来打我啊
  8. linux安装nvm
  9. iis网站服务器端口设置,iis服务器端口设置方法
  10. 【附源码】Java计算机毕业设计宠物领养与物品捐赠小程序(程序+LW+部署)