Reddission 分布式锁原理

总结:

  • 使用无参的tryLock()方法时,redisson会自动添加一个定时任务,定时刷新锁的失效时间,如果unlock时失败,则会出现该锁一直不释放的情况, 因为定时刷新的任务一直存在。
  • 使用两个参数的tryLock(long waitTime, TimeUnit unit)方法时,比无参的多了个功能就是在waitTime内,重试获取锁,直到超时,返回失败
  • tryLock(long waitTime, long leaseTime, TimeUnit unit)传释放时间leaseTime时,在waitTime内,重试获取锁,直到超时,返回失败。但不会添加定时刷新锁的失效时间的任务

加锁过程与解锁过程

redission 加锁采用了redis 的hash结构,

源码进入RedissionLock无参的tryLock()

尝试获取锁的代码


将当前获取锁线程的id传进去

   private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {// leaseTime 就是租约时间,就是Redis key 的过期时间// 无参的tryLock 这里必定为-1 不会进入这个判断if (leaseTime != -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);}// commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout() 获取到的值//    private long lockWatchdogTimeout = 30 * 1000; 这个是获取的默认的超时时间30sRFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);//等待获取加锁的结果ttlRemainingFuture.onComplete((ttlRemaining, e) -> {// 存在异常 直接返回if (e != null) {return;}// 加锁成功,下面代码实现锁超时重试,也就是看门狗的逻辑// lock acquiredif (ttlRemaining) {scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture;}

这块代码执行获取锁的流程
锁可重入的逻辑在lua脚本中

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {// 将超时时间保存在RedissonLock的 internalLockLeaseTime 变量中,用来解决锁超时问题 watchDog机制internalLockLeaseTime = unit.toMillis(leaseTime);// 真实的获取锁的过程 // getName()     lock = redissonClient.getLock("order"); 中的order//Collections.singletonList(getName()) 获取锁的key也就是当前锁的名称  传入lua 脚本中的key 对应KEYS[1]// internalLockLeaseTime 超时时间 传入lua脚本的参数[1] 对应ARGV[1]// getLockName(threadId) 线程唯一标识   对应hash结构的field hash结构的value对应重入次数return evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}

上面 字符串对应的lua文件,这个是我自己写的 和代码中的逻辑是一样的

--- KEYS[1] 是锁的key
--- ARGV[1] 是超时时间
--- ARGV[2] 是锁线程名称
local key = KEYS[1] --锁的key
local releaseTime = ARGV[1] -- 锁超时时间
local threadId = ARGV[2]  -- 线程唯一标识
--- 所不存在执行的流程
if (redis.call("exists", key) == 0) then-- 不存在获取锁redis.call("hset", key, threadId, '1');-- 设置有效期redis.call("pexpire", key, releaseTime);return nil;
end
--- 锁存在执行的流程
if (redis.call("hexists", key, threadId) == 1) then-- 锁重入 当前线程对应的value增加一redis.call("hincrby",key,threadId,'1');-- 重置超时时间redis.call("pexpire", key, releaseTime);return nil;  --返回结果
end--- 当 key 不存在时,返回 -2 。 当 key 存在但没有设置剩余生存时间时,返回 -1 。 否则,以毫秒为单位,返回 key 的剩余生存时间。
return redis.call('pttl', KEYS[1])

scheduleExpirationRenewal(long threadId)

   private void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();//判断map 中是否存在当前线程对象的实体,如果存在则返回实体,如果不存在则创建新的返回nullExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);// 看门狗实现,创建新的实体需要增加看门狗的逻辑renewExpiration();}}

renewExpiration

   private void renewExpiration() {// 从map中获取ExpirationEntry 如果为null则直接返回ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}// 如果不为空则创建一个延时任务 task 与Timer返回的TimerTask关联的句柄。Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}//在重新设置超时时间RFuture<Boolean> future = renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getName() + " expiration", e);return;}if (res) {// reschedule itselfrenewExpiration();}});}//internalLockLeaseTime   就是我们之前获取到的leaseTime  不传默认30秒}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);// 将延时任务放入到ExpirationEntry 中    ee.setTimeout(task);}// 重新设置超时时间的代码protected RFuture<Boolean> renewExpirationAsync(long threadId) {return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.singletonList(getName()),internalLockLeaseTime, getLockName(threadId));}

源码进入RedissionLock两个参数的tryLock(long waitTime, TimeUnit unit)


 @Overridepublic boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {// 将等待时间转换成毫秒long time = unit.toMillis(waitTime);// 获取当前时间long current = System.currentTimeMillis();// 获取当前线程idlong threadId = Thread.currentThread().getId();// 源码看下图Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}current = System.currentTimeMillis();RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {unsubscribe(subscribeFuture, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;}try {time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}while (true) {long currentTime = System.currentTimeMillis();ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// waiting for messagecurrentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) {subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {unsubscribe(subscribeFuture, threadId);}
//        return get(tryLockAsync(waitTime, leaseTime, unit));}

    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}//  获取锁成功,添加锁超时延时任务if (ttlRemaining == null) {scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture;}private void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();// 判断是否是重入的锁ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);// 是的话,直接将线程id添加到oldEntry中if (oldEntry != null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);//添加锁延时任务    源码是无参的renewExpirationrenewExpiration();}}

Redission 分布式锁原理相关推荐

  1. redis和redission分布式锁原理及区别

    redis和redission分布式锁原理及区别 我最近做租车项目,在处理分布式时用到分布式锁,我发现很多同事都在网上找分布式锁的资料,但是看的资料都不是很全,所以在这里我谈谈自己的分布式锁理解. 结 ...

  2. api 创建zookeeper客户端_zookeeper分布式锁原理及实现

    前言 本文介绍下 zookeeper方式 实现分布式锁 原理简介 zookeeper实现分布式锁的原理就是多个节点同时在一个指定的节点下面创建临时会话顺序节点,谁创建的节点序号最小,谁就获得了锁,并且 ...

  3. zookeeper 分布式锁原理

    zookeeper 分布式锁原理: 1 大家也许都很熟悉了多个线程或者多个进程间的共享锁的实现方式了,但是在分布式场景中我们会面临多个Server之间的锁的问题,实现的复杂度比较高.利用基于googl ...

  4. Apache Curator之分布式锁原理(二)

    本文主要讲解如下内容: 为什么要使用分布式锁? 分布式锁特性! 分布式锁的实现方式有哪些? Curator分布式锁原理 Curator分布式锁实现类UML及相关类的介绍 基于Redis,数据库实现分布 ...

  5. zookeeper分布式锁原理及实现

    前言 本文介绍下 zookeeper方式 实现分布式锁 原理简介 zookeeper实现分布式锁的原理就是多个节点同时在一个指定的节点下面创建临时会话顺序节点,谁创建的节点序号最小,谁就获得了锁,并且 ...

  6. Redisson实现分布式锁原理

    Redisson实现分布式锁原理 一.高效分布式锁 当我们在设计分布式锁的时候,我们应该考虑分布式锁至少要满足的一些条件,同时考虑如何高效的设计分布式锁,这里我认为以下几点是必须要考虑的. 1.互斥 ...

  7. spring定时任务(Scheduled)运行阻塞不执行/Redission分布式锁阻塞问题

    spring定时任务(Scheduled)运行阻塞不执行/Redission分布式锁阻塞问题 最近项目中发现一个bug,排查了很久,最后发现问题所在,在此记录一下. 问题描述: 项目运行一段时间后,c ...

  8. Redis分布式锁原理

    业务背景: 后台定时任务刷新Redis的数据到数据库中,有多台机器开启了此定时同步的任务,但是需要其中一台工作,其他的作为备用,提高可用性.使用Redis分布式锁进行限制,拿到锁的机器去执行具体业务, ...

  9. Zookeeper分布式锁原理

    1.分布式锁介绍 单机应用开发,涉及并发同步的时候,我们往往采用synchronized 或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题. 但当我 ...

最新文章

  1. java中的Random()注意!
  2. celery源码分析:multi命令分析
  3. 移动app测试的多样性_app移动端接口性能测试
  4. linux 多源代码文件编译
  5. java 乘法 位移_java 位移运算与乘法运算
  6. SRAM(静态随机存储器)
  7. shp2sde命令行方式向arcsde批量导入数据脚本的生成步骤
  8. 俞敏洪最新干货演讲:在动荡的时代做不动荡的自己
  9. java 延迟加载_hibernate延迟加载(懒加载)教程讲解
  10. 呷哺呷哺:预期2021年净亏损约2.75亿元至2.95亿元
  11. cadence导入dxf文件_DXF如何导入为图纸?
  12. 拓端tecdat|R语言中的风险价值模型度量指标TVaR与VaR
  13. JAVE实现音频截取并上传OSS
  14. 【NanoPi2试用体验】nanopi2下的二维码识别
  15. Word文档恢复,2大方案教你找回没有保存或者被删除的数据
  16. ce修改魔兽争霸服务器存档,魔兽争霸3用CE修改找金钱基址
  17. 韦东山嵌入式linux第一期_裸机实战之开发板熟悉与体验篇
  18. 按键拨号声音 DTMF MATLAB程序样例
  19. 视频去水印的Python代码
  20. IOS停机卡免流线路下载更新

热门文章

  1. 7-360 ZJW系列之电玩积木
  2. ChatGPT的朋友们:大语言模型经典论文一次读到吐
  3. MachineKey
  4. Python逃生游戏
  5. datagrid表头合并
  6. MyBatis Plus Caused by: java.lang.ClassNotFoundException: org.mybatis.logging.LoggerFactory
  7. 软件测试--测试过程模型(V,W,H,X)
  8. linux下配置gradle环境
  9. 抖音1000个粉丝有什么用?能赚多少钱?
  10. 用于渗透测试的10种漏洞扫描工具