本文主要研究一下redisson的分布式锁

maven

     <dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.8.1</version></dependency>
复制代码

实例

    @Testpublic void testDistributedLock(){Config config = new Config();
//        config.setTransportMode(TransportMode.EPOLL);config.useSingleServer().setAddress("redis://192.168.99.100:6379");RedissonClient redisson = Redisson.create(config);IntStream.rangeClosed(1,5).parallel().forEach(i -> {executeLock(redisson);});executeLock(redisson);}public void executeLock(RedissonClient redisson){RLock lock = redisson.getLock("myLock");boolean locked = false;try{LOGGER.info("try lock");locked = lock.tryLock();
//            locked = lock.tryLock(1,2,TimeUnit.MINUTES);LOGGER.info("get lock result:{}",locked);if(locked){TimeUnit.HOURS.sleep(1);LOGGER.info("get lock and finish");}}catch (Exception e){e.printStackTrace();}finally {LOGGER.info("enter unlock");if(locked){lock.unlock();}}}
复制代码

源码解析

RedissonLock.tryLock

redisson-3.8.1-sources.jar!/org/redisson/RedissonLock.java

    @Overridepublic boolean tryLock() {return get(tryLockAsync());}@Overridepublic RFuture<Boolean> tryLockAsync() {return tryLockAsync(Thread.currentThread().getId());}@Overridepublic RFuture<Boolean> tryLockAsync(long threadId) {return tryAcquireOnceAsync(-1, null, threadId);}private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {if (leaseTime != -1) {return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);}RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);ttlRemainingFuture.addListener(new FutureListener<Boolean>() {@Overridepublic void operationComplete(Future<Boolean> future) throws Exception {if (!future.isSuccess()) {return;}Boolean ttlRemaining = future.getNow();// lock acquiredif (ttlRemaining) {scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;}<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}protected String getLockName(long threadId) {return id + ":" + threadId;}private void scheduleExpirationRenewal(final long threadId) {if (expirationRenewalMap.containsKey(getEntryName())) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {RFuture<Boolean> future = renewExpirationAsync(threadId);future.addListener(new FutureListener<Boolean>() {@Overridepublic void operationComplete(Future<Boolean> future) throws Exception {expirationRenewalMap.remove(getEntryName());if (!future.isSuccess()) {log.error("Can't update lock " + getName() + " expiration", future.cause());return;}if (future.getNow()) {// reschedule itselfscheduleExpirationRenewal(threadId);}}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {task.cancel();}}protected RFuture<Boolean> renewExpirationAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {if (leaseTime != -1) {return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.addListener(new FutureListener<Long>() {@Overridepublic void operationComplete(Future<Long> future) throws Exception {if (!future.isSuccess()) {return;}Long ttlRemaining = future.getNow();// lock acquiredif (ttlRemaining == null) {scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;}
复制代码
  • 这里leaseTime没有设置的话,默认是-1,使用的是commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),默认为30秒
  • tryLockInnerAsync使用的是一段lua脚本,该脚本有3个参数,第一个参数为KEYS数组,后面几个参数为ARGV数组的元素
  • 这里key的值为调用方指定的这个redissonLock的名称,两个变量,第一个为leaseTime,第二个为锁的名称,使用redissonLock的id+线程id
  • lua脚本第一个方法判断redissonLock的hashmap是否存在,如果不存在则创建,该hashmap有一个entry的key为锁名称,valude为1,之后设置该hashmap失效时间为leaseTime
  • lua脚本第二个方法是在redissonLock的hashmap存在的情况下,将该锁名的value增1,同时设置失效时间为leaseTime
  • 最后返回该redissonLock名称的key的ttl
  • 执行成功之后判断ttl是否还有值,有的话则调用scheduleExpirationRenewal,防止lock未执行完就失效
  • scheduleExpirationRenewal是注册一个延时任务,在internalLockLeaseTime / 3的时候触发,执行的方法是renewExpirationAsync,将该锁失效时间重置回internalLockLeaseTime
  • scheduleExpirationRenewal里头给scheduleExpirationRenewal任务增加listener,如果设置成功之后还会再次递归调用scheduleExpirationRenewal重新注册延时任务
  • tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId)方法是指定自动解锁时间时调用的方法,它与tryAcquireOnceAsync的区别在于,它对ttl的方回值采用long值来判断,如果是null,才执行延长失效时间的定时任务,而tryAcquireOnceAsync方法采用的是BooleanNullReplayConvertor,只要返回不是null,则返回true

RedissonLock.unlock

redisson-3.8.1-sources.jar!/org/redisson/RedissonLock.java

    @Overridepublic void unlock() {try {get(unlockAsync(Thread.currentThread().getId()));} catch (RedisException e) {if (e.getCause() instanceof IllegalMonitorStateException) {throw (IllegalMonitorStateException)e.getCause();} else {throw e;}}//        Future<Void> future = unlockAsync();
//        future.awaitUninterruptibly();
//        if (future.isSuccess()) {
//            return;
//        }
//        if (future.cause() instanceof IllegalMonitorStateException) {
//            throw (IllegalMonitorStateException)future.cause();
//        }
//        throw commandExecutor.convertException(future);}@Overridepublic RFuture<Void> unlockAsync(final long threadId) {final RPromise<Void> result = new RedissonPromise<Void>();RFuture<Boolean> future = unlockInnerAsync(threadId);future.addListener(new FutureListener<Boolean>() {@Overridepublic void operationComplete(Future<Boolean> future) throws Exception {if (!future.isSuccess()) {cancelExpirationRenewal(threadId);result.tryFailure(future.cause());return;}Boolean opStatus = future.getNow();if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);result.tryFailure(cause);return;}if (opStatus) {cancelExpirationRenewal(null);}result.trySuccess(null);}});return result;}protected RFuture<Boolean> unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end;" +"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; "+"end; " +"return nil;",Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));}String getChannelName() {return prefixName("redisson_lock__channel", getName());}void cancelExpirationRenewal(Long threadId) {ExpirationEntry task = expirationRenewalMap.get(getEntryName());if (task != null && (threadId == null || task.getThreadId() == threadId)) {expirationRenewalMap.remove(getEntryName());task.getTimeout().cancel();}}
复制代码
  • unlockInnerAsync通过lua脚本来释放锁,该lua使用两个key,一个是redissonLock名称,一个是channelName
  • 该lua使用的变量有三个,一个是pubSub的unlockMessage,默认为0,一个是internalLockLeaseTime,默认为commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),一个是锁名称
  • 如果该redissonLock不存在,则直接发布unlock消息返回1;如果该锁不存在则返回nil;
  • 如果该锁存在则将其计数-1,如果counter大于0,则重置下失效时间,返回0;如果counter不大于0,则删除该redissonLock锁,发布unlockMessage,返回1;如果上面条件都没有命中返回nil
  • unlockAsync里头对unlockInnerAsync注册了FutureListener,主要是调用cancelExpirationRenewal,取消掉scheduleExpirationRenewal任务

LockPubSub

redisson-3.8.1-sources.jar!/org/redisson/pubsub/LockPubSub.java

public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {public static final Long unlockMessage = 0L;@Overrideprotected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {return new RedissonLockEntry(newPromise);}@Overrideprotected void onMessage(RedissonLockEntry value, Long message) {if (message.equals(unlockMessage)) {Runnable runnableToExecute = value.getListeners().poll();if (runnableToExecute != null) {runnableToExecute.run();}value.getLatch().release();}}}
复制代码
  • 接收到unlockMessage的时候,会调用RedissonLockEntry的listener,然后触发latch的release
  • tryAcquireOnceAsync这个方法默认没有创建LockPubSub,而且没有指定自动解锁时间,则定时任务会一直延长失效时间,这个可能存在锁一直没释放的风险

小结

加锁有如下注意事项:

  • 加锁需要设置超时时间,防止出现死锁
  • 加锁以及设置超时时间的时候,需要保证两个操作的原子性,因而最好使用lua脚本或者使用支持NX以及EX的set方法
  • 加锁的时候需要把加锁的调用方信息,比如线程id给记录下来,这个在解锁的时候需要使用
  • 对于加锁时长不确定的任务,为防止任务未执行完导致超时被释放,需要对尚未运行完的任务延长失效时间

解锁有如下注意事项:

  • 解锁一系列操作(判断key是否存在,存在的话删除key等)需要保证原子性,因而最好使用lua脚本
  • 解锁需要判断调用方是否与加锁时记录的是否一致,防止锁被误删
  • 如果有延续失效时间的延时任务,在解锁的时候,需要终止掉该任务

doc

  • 一分钟实现分布式锁
  • 这才是真正的分布式锁
  • 漫画:什么是分布式锁?
  • Redis分布式锁的正确实现方式(Java版)
  • 基于Redis的分布式锁到底安全吗(上)?
  • 基于Redis的分布式锁到底安全吗(下)?
  • Redis分布式锁的正确实现姿势
  • Redisson分布式锁浅析

聊聊redisson的分布式锁相关推荐

  1. redisson的锁的类型_绝对干货:利用redisson完成分布式锁功能

    在单体架构中,我们使用synchronize或者Lock就能完成上锁同步的操作,但是这些在分布式,微服务的今天,失去了作用. 分布式锁的实现一般有三种解决方案:基于数据库表实现 基于缓存实现,比如re ...

  2. java设计前期工作基础和存在的困难_Java秒杀系统实战系列-基于Redisson的分布式锁优化秒杀逻辑...

    本文是"Java秒杀系统实战系列文章"的第十五篇,本文我们将借助综合中间件Redisson优化"秒杀系统中秒杀的核心业务逻辑",解决Redis的原子操作在优化秒 ...

  3. 22-09-20 西安 谷粒商城(04)Redisson做分布式锁、布隆过滤器、AOP赋能、自定义注解做缓存管理、秒杀测试

    Redisson 1.Redisson做分布式锁  分布式锁主流的实现方案: 基于数据库实现分布式锁 基于缓存(Redis),性能最高 基于Zookeeper,可靠性最高 Redisson是一个在Re ...

  4. Redisson实现分布式锁原理

    Redisson实现分布式锁原理 一.高效分布式锁 当我们在设计分布式锁的时候,我们应该考虑分布式锁至少要满足的一些条件,同时考虑如何高效的设计分布式锁,这里我认为以下几点是必须要考虑的. 1.互斥 ...

  5. Java秒杀系统实战系列~基于Redisson的分布式锁优化秒杀逻辑

    摘要: 本篇博文是"Java秒杀系统实战系列文章"的第十五篇,本文我们将借助综合中间件Redisson优化"秒杀系统中秒杀的核心业务逻辑",解决Redis的原子 ...

  6. Redisson实现分布式锁(3)—项目落地实现

    Redisson实现分布式锁(3)-项目落地实现 有关Redisson实现分布式锁前面写了两篇博客作为该项目落地的铺垫. 1.Redisson实现分布式锁(1)-原理 2.Redisson实现分布式锁 ...

  7. 基于后端开发Redisson实现分布式锁源码分析解读

    一.分布式锁的概念和使用场景 分布式锁是控制分布式系统之间同步访问共享资源的一种方式. 在分布式系统中,常常需要协调他们的动作.如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问 ...

  8. Redisson 实现分布式锁原理

    Redisson实现分布式锁 有关Redisson作为实现分布式锁,总的分3大模块来讲. 1.Redisson实现分布式锁原理 2.Redisson实现分布式锁的源码解析 3.Redisson实现分布 ...

  9. 再也不用担心面试官让我用Redis实现分布式锁啦(二、Redisson实现分布式锁)

    目录 一.Jedis实现分布式锁 二.Redisson实现分布式锁(单机Redis) 一.引入依赖(3.5.7) 二.配置redis 三.配置RedisonConfig 四.提供锁接口及实现,方便统一 ...

最新文章

  1. 【USACO training】Chapter 1 入门
  2. “比特币耶稣”罗杰·沃推特赠币,留下BCH钱包地址就有份
  3. linux madplay运行完成,Madplay移植到mini2440全过程详解
  4. 数字货币 分层确定性钱包(HD Wallets)
  5. 对于scanf的使用一点体会心得
  6. 前端学习(1837):前端面试题之变量提升
  7. hdu1540/poj2892 线段数区间合并
  8. 【STM32】HAL库 STM32CubeMX教程六----定时器中断
  9. 华为郭平:很愿意使用高通芯片制造手机
  10. 对象删除某个属性_充分了解JavaScript中【对象】的概念(二)
  11. 垃圾回收算法_划重点 JVM G1 垃圾回收算法
  12. linux仿win7软件,Ubuntu/Linux Mint用上仿Win7/Win8主题
  13. NiFi 学习 —自己实现处理器
  14. 软件常见的各种版本英文缩写
  15. 区块链报告会心得体会3000_区块链讲座观后感6
  16. android通讯录项目分析,Android 通讯录展示
  17. MySQL Deamon少量解读
  18. CondaHTTPError: HTTP 000 CONNECTION FAILED for url <https://mirror.tuna.tsinghua.edu.cn/anaconda/pkg
  19. dz.27z.co index.php,dc vip中心 专业版v2.2.1 discuz插件 dzvip插件 vip会员插件 积分充值插件...
  20. ORACLE进阶(十)start with connect by 实现递归查询

热门文章

  1. CLR via C# 3 读书笔记(5):第1章 CLR执行模型 — 1.5 本地代码生成器工具:NGen.exe...
  2. 【转】WCF、Net remoting、Web service概念及区别
  3. mysql_query
  4. 十分钟了解HTTPS协议
  5. 在运行时切换 WinForm 程序的界面语言 System.ComponentModel.ComponentResourceManager .ApplyResources...
  6. 【传】玩转Android---UI篇---ImageButton(带图标的按钮)
  7. javascript 获取图片原始尺寸
  8. jQuery学习(一):鼠标移动显示大图
  9. Python 3 与 Javascript escape 传输确保数据正确方法和中文乱码解决方案
  10. 安装office2010出现了错误,提示要安装MSXML6.10.1129.0解决方法