redis ,redisson 分布式锁深入剖析
目录
为什么要用分布式锁?
分布式锁所遵循的原则?
redis 分布式锁
redis 原始分布式锁实现
加锁
释放锁
redis 分布式锁存在的问题
redisson 实现分布式锁
redisson 是什么
redisson 加锁方法源码剖析
redisson watchdog 是什么?
redisson 释放锁源码
redis ,redisson 分布式锁存在的问题
参考
为什么要用分布式锁?
为了保证一个方法或属性在高并发的情况下的同一时间只能被同一个线程执行,在传统单体应用单机部署的情况下,可以使用并发处理相关的功能进行互斥控制( jvm 锁),但是随着业务发展的需要,原来单体单机部署的系统被演化成分布式集全系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这时原来的jvm锁失效,单纯的应用并不能提供分布式锁的能力,为了解决这个问题,这个时候就需要 分布式锁了。
分布式锁所遵循的原则?
- 互斥性: 在分布式系统环境下, 一个锁只能被一个线程持有.
- 高可用: 不会发生死锁、即使客户端崩溃也可超时释放锁.
- 非阻塞: 获取锁失败即返回.
redis 分布式锁
redis 原始分布式锁实现
加锁
redis 分布式锁背景首先是基于setnx 实现,setnx 当key 不存在时才会创建value ,并且返回1 ,否则key 值存在,创建value 失败,返回0;基于这个属性,我们可以满足分布式做的互斥性。但是还会存在一个问题,比如客户端上锁后,还未释放锁,异常宕机或者hang 住了。这时候其他客户端就始终无法获取锁,造成业务不可用情况;
解决方案就是在给锁的key 设置过期时间,及expire key score ; 需要注意的是,要保证setNx 操作和expire 两个操作是原子性的,否则setNx 设置后,expire 还未执行,同样无法解决上述问题。redis 想要保证两个操作是原子性,可以通过lua 脚本来实现;实现方法如下
/***@描述 *@参数 @Param lockKey 锁key* @param lockSeconds 过期时间*@返回值 *@创建人 corn*@创建时间 2021/3/8*/private boolean doTryLock(String lockKey, int lockSeconds) {RedisScript<Boolean> SETNX_AND_EXPIRE_SCRIPT;StringBuilder sb = new StringBuilder();sb.append("if (redis.call('setnx', KEYS[1], ARGV[1]) == 1) then\n");sb.append("\tredis.call('expire', KEYS[1], tonumber(ARGV[2]))\n");sb.append("\treturn true\n");sb.append("else\n");sb.append("\treturn false\n");sb.append("end");SETNX_AND_EXPIRE_SCRIPT = new RedisScriptImpl<Boolean>(sb.toString(), Boolean.class);return stringRedisTemplate.execute(SETNX_AND_EXPIRE_SCRIPT, Collections.singletonList(lockKey), lockValue,String.valueOf(lockSeconds));}
释放锁
锁的释放要遵循解铃还须系铃人,不可以出现a 把b 的锁给释放,这样的话锁就失去了意义;redis 释放锁首先判断释放锁的线程是否是加锁的线程,如果是允许删除,不是则无法删除key。集成实现下图所示
/***@描述 keys[1] 表示lockey 值,keys[2] 表示lockvalue 值*@参数 *@返回值 *@创建人 corn*@创建时间 2021/3/8*/public void unlock(String lockKey) {RedisScript<Boolean> DEL_IF_GET_EQUALS;StringBuilder sbr = new StringBuilder();sbr.append("if (redis.call('get', KEYS[1]) == ARGV[1]) then\n");sbr.append("\tredis.call('del', KEYS[1])\n");sbr.append("\treturn true\n");sbr.append("else\n");sbr.append("\treturn false\n");sbr.append("end");DEL_IF_GET_EQUALS = new RedisScriptImpl<Boolean>(sbr.toString(), Boolean.class);// 忽略结果stringRedisTemplate.execute(DEL_IF_GET_EQUALS, Collections.singletonList(lockKey), lockValue);}
redis 分布式锁存在的问题
上面的redis 分布式锁解决方案近乎完美,但是需要考虑的一种情况,就是锁续期的问题,比如我们锁的超时时间设置3s ,但是业务逻辑复杂还是其他原因导致在3s 内导致锁被释放了,这样其他的客户端同样拿到了锁,这样就没有做到锁的互斥性,同样出现并发问题。
解决问题的思路:
延长锁过期时间(治标不治本)
为锁添加守护线程,进行续期(推荐,redisson watch dog 实现)
基于redis 订阅 pub/sub 实现
redisson 实现分布式锁
基于redis 原始分布式锁的一些不便,可以考虑引入redisson 来解决这些问题;比如watch dog 就可以解决锁续期的问题;
redisson 是什么
redisson 是基于redis 基础上实现的java 驻内存数据网格,Redisson还采用了基于NIO的Netty框架,不仅能作为Redis底层驱动客户端,具备提供对Redis各种组态形式的连接功能,对Redis命令能以同步发送、异步形式发送、异步流形式发送或管道形式发送的功能,LUA脚本执行处理,以及处理返回结果的功能等等。详细可以参考redisson 官网了解;redisson 官网译文
springboot 集成redisson 也很简单,可以参考基于spring boot 实现redisson,redisson 分布式锁属性和使用文档
redisson 加锁方法源码剖析
/***@描述 redisson 加锁源码分析(redis lock 核心在RedssonLock类中)*@参数 @param waitTime 获取锁等待时间,等待时间未获取到锁,立刻返回* @param leaseTime 锁超时时间,查过锁超时时间立即释放* @param unit 时间单位*@返回值*@创建人 corn*@创建时间 2021/3/8*/public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);// 获取当前时间 和当前线程idlong current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();// 尝试加锁,如果加锁失败,则返回上个锁ttl 时间,返回null 表示加锁成功Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);if (ttl == null) {return true;} else {// 当前时间和进入方法时的时间的时间差如果大于waitTime 等待时间,则加锁失败返回false;time -= System.currentTimeMillis() - current;if (time <= 0L) {this.acquireFailed(waitTime, unit, threadId);return false;} else {// 订阅当前线程id 释放事件current = System.currentTimeMillis();RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);// 阻塞等待锁释放,返回false 表示当前等待时间超过锁最大等待时间,取消订阅,返回加锁失败;返回true 进入循环等待if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {this.unsubscribe(subscribeFuture, threadId);}});}this.acquireFailed(waitTime, unit, threadId);return false;} else {boolean var16;try {time -= System.currentTimeMillis() - current;if (time <= 0L) {this.acquireFailed(waitTime, unit, threadId);boolean var20 = false;return var20;}do {// do..while 循环尝试获取锁long currentTime = System.currentTimeMillis();ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);if (ttl == null) {var16 = true;return var16;}time -= System.currentTimeMillis() - currentTime;if (time <= 0L) {this.acquireFailed(waitTime, unit, threadId);var16 = false;return var16;}currentTime = System.currentTimeMillis();// 利用共享锁来阻塞等待判断是否允许等待共享锁,允许则加入共享锁等待释放信号// 1、latch其实是个信号量Semaphore,调用其tryAcquire方法会让当前线程阻塞一段时间,避免了在while循环中频繁请求获取锁;// 2、该Semaphore的release方法,会在订阅解锁消息的监听器消息处理方法org.redisson.pubsub.LockPubSub#onMessage调用;当其他线程释放了占用的锁,会广播解锁消息,监听器接收解锁消息,并释放信号量,最终会唤醒阻塞在这里的线程。if (ttl >= 0L && ttl < time) {((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;} while(time > 0L);this.acquireFailed(waitTime, unit, threadId);var16 = false;} finally {this.unsubscribe(subscribeFuture, threadId);}return var16;}}}/***@描述 tryAcquire 所调用的设置锁的核心方法*@参数 [waitTime, leaseTime, unit]*@返回值 boolean*@创建人 corn*@创建时间 2021/3/8*/private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {// 如果leaseTime 设置了非-1 值,则redisson 锁过释放事件未leaseTime。if (leaseTime != -1L) {return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {// 如果未设置leaseTime 或者leaseTime 值为-1 ,则启用redisson的看门狗机制,redisson watch dog 默认lock时间是30s,然后会有一个守护线程// 每隔 lockWatchdogTimeout/3 (例如默认是10秒跑一次) 秒就会检查锁是否存在,如果存在则进行续期;RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e == null) {if (ttlRemaining == null) {this.scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;}/***@描述 加锁核心方法,最终还是通过lua 脚本来实现的*@参数 [waitTime, leaseTime, unit]*@返回值 boolean*@创建人 corn*@创建时间 2021/3/8*/<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {this.internalLockLeaseTime = unit.toMillis(leaseTime);return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));}}}
redisson watchdog 是什么?
通过redission 加锁源码阅读,了解到一个概念就是watch dog (俗称看门狗);他的作用,就是解决redis 原生分布式锁的锁续期的问题;首先需要注意的问题,就是watch dog 只有在不设置leaseTime ,或者leaseTime 为-1 时,才会有效,否则锁的强制过期时间是我们设置的leaseTime ;watch dog 默认锁的时间是30s ,他会每隔 lockWatchdogTimeout/3 (例如默认是10秒跑一次) 秒就会检查锁是否存在,如果存在则进行续期;
redisson 释放锁源码
/***@描述 *@参数 *@返回值 *@创建人 corn*@创建时间 2021/3/8*/public void unlock() {try {// 传入线程的id ,满足解锁时,解铃还须系铃人this.get(this.unlockAsync(Thread.currentThread().getId()));} catch (RedisException var2) {if (var2.getCause() instanceof IllegalMonitorStateException) {throw (IllegalMonitorStateException)var2.getCause();} else {throw var2;}}}/***@描述 *@参数*@返回值 *@创建人 corn*@创建时间 2021/3/8*/public RFuture<Void> unlockAsync(long threadId) {RPromise<Void> result = new RedissonPromise();// 传入线程id ,调用解锁核心方法RFuture<Boolean> future = this.unlockInnerAsync(threadId);// 解锁回调future.onComplete((opStatus, e) -> {this.cancelExpirationRenewal(threadId);if (e != null) {result.tryFailure(e);} else if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);result.tryFailure(cause);} else {result.trySuccess((Object)null);}});return result;}/***@描述 解锁核心方法;原理和redis 原生分布式锁相似*@参数*@返回值*@创建人 corn*@创建时间 2021/3/8*/protected RFuture<Boolean> unlockInnerAsync(long threadId) {return this.evalWriteAsync(this.getName(),LongCodec.INSTANCE,RedisCommands.EVAL_BOOLEAN,// 若锁存在,但唯一标识不匹配:则表明锁不被当前线程使用,当前线程不允许解锁其他线程持有的锁"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end;" +// 锁存在,则将可重入计数器-1" local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);" +// 若可重入计数器》0 表明当前线程还有锁还有可冲入,不能释放锁。并为其设置过期时间" if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); " +// 若可重入计数器《0 ,表明锁释放完毕,并通知订阅消息,去唤醒其他等待获取锁的线程"return 0; else redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; end; return nil;",Arrays.asList(this.getName(),this.getChannelName()),LockPubSub.UNLOCK_MESSAGE,this.internalLockLeaseTime,this.getLockName(threadId));}
redisson 释放锁过程中,出现了一个概念,就是可重入锁,可重入锁简单讲就是一个线程获取到了锁,可以再次获取锁而不出现死锁现象;synchronized,ReentrantLock 都是可重入锁;
redis ,redisson 分布式锁存在的问题
1. redis ,redisson 分布式锁的高可用,归根还是依赖于redis 的集群,redis 常用集群方式,基于官网的redis cluster,或者中间件codis (使用较多,相对官网redis cluster 更加成熟);假如在master->slave 同步数据 的时候,master 突然挂了,锁并么有同步到从服务器,此时经过从新选举出一个新的master ,并不知道redis 锁的信息,那么这是就会产生并发问题;
解决方案:
1.针对这个问题,redis 作者给出了一个方案:就是采取红锁(redlock) 的方式,大概思想是在理想情况采用N 个master 方式部署redis ,这些节点完全独立,不存在主从复制或者其他集群机制,在上锁时,同时向N 个master 发送加锁命令,如果成功加锁个数> n/2+1 (例如有3个master 节点,如果加锁成功为2,那么算加锁成功,否则加锁失败)则加锁成功,否则加锁失败;这种算法在某种意义上来讲,可以解决redis 单节点挂掉导致锁不存在问题;
2. 使用zk 锁
参考
1. redisson 操作文档
2. redlock 作者认为redlock 可行文档
redis ,redisson 分布式锁深入剖析相关推荐
- redis redisson 分布式锁 WRONGTYPE Operation against a key holding the wrong kind of value
在使用redisson加锁的时候报错如下 trylock WRONGTYPE Operation against a key holding the wrong kind of value 错误场景: ...
- Redis进阶- Redisson分布式锁实现原理及源码解析
文章目录 Pre 用法 Redisson分布式锁实现原理 Redisson分布式锁源码分析 redisson.getLock(lockKey) 的逻辑 redissonLock.lock()的逻辑 r ...
- 京东秒杀系统模块的Redis分布式锁深度剖析,没给你讲明白你打我
1|0背景 目前开发过程中,按照公司规范,需要依赖框架中的缓存组件.不得不说,做组件的大牛对CRUD操作的封装,连接池.缓存路由.缓存安全性的管控都处理的无可挑剔.但是有一个小问题,该组件没有对分布式 ...
- Redis实战——Redisson分布式锁
目录 1 基于Redis中setnx方法的分布式锁的问题 2 Redisson 2.1 什么是Redisson 2.2 Redisson实现分布式锁快速入门 2.3 Redisson 可重入锁原理 什 ...
- Redis:Redisson分布式锁的使用(推荐使用)
Redis:Redisson分布式锁的使用(生产环境下)(推荐使用) 关键词 基于NIO的Netty框架,生产环境使用分布式锁 redisson加锁:lua脚本加锁(其他客户端自旋) 自动延时机制:启 ...
- 积跬步以至千里,深入剖析Redis实战——分布式锁和延时队列
前言 之前咱们简单介绍了一下Redis的简单结构,相信很多读者看着比较入门.的确,笔者在介绍任何技术时,都是由浅及深的路数,为的是刚入门不久的新人,毕竟相对于久经沙场的老将,新人更需要这方便的普及. ...
- 再也不用担心面试官让我用Redis实现分布式锁啦(二、Redisson实现分布式锁)
目录 一.Jedis实现分布式锁 二.Redisson实现分布式锁(单机Redis) 一.引入依赖(3.5.7) 二.配置redis 三.配置RedisonConfig 四.提供锁接口及实现,方便统一 ...
- Redis高阶使用之Redisson分布式锁源码解析
一.何为分布式锁 1.锁synchronized lock 单机锁 2.我们的一些互斥资源 不能并行执行 需要一个东西来保证是串行执行的 锁synchronized 3.分布式锁:这个锁存储一定是独立 ...
- Redis实现分布式锁的深入探究
点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 一.分布式锁简介 锁 是一种用来解决多个执行线程 访问共享资源 错 ...
最新文章
- java 内存测试_请你说一下java jvm的内存机制
- 在matlab中清除command history中的内容
- python画图哆啦a梦-【Python】绘制哆啦A梦
- 深度拆解:直播带货的现状与未来?
- oracle经典增删该查,oracle基本语法(增删改查
- 解决Win7英文版显示中文乱码
- cfg桩设备型号_试桩、试验桩、工程桩是一回事吗?
- unity 3d物体描边效果_从零开始的卡通渲染描边篇
- 洗牌算法汇总以及测试洗牌程序的正确性
- PS教程第十五课:图层是最基本的要求
- linux c++开发_Linux/Windows下进行C/C++开发的差异
- [转]非极大值抑制(Non-Maximum Suppression)
- 蓝牙通信-搜索附近的蓝牙设备
- 手机上怎么去掉a 标签中的img点击时的阴影?
- 软件工程课程设计小组人员分工
- 【Week 8 作业 B】猫猫向前冲
- ffmpeg linux 升级_linux系统部署ffmpeg视频转码环境及使用方法 | linux系统运维
- IO流-节点流和处理流(涵盖底层调用关系)
- 为什么我们要掌握Linux系统编程?
- 8086CPU指令系统 串操作指令和处理机控制指令