目录

  • 基于 `Redis` 的分布式锁
    • `Redis` 实现分布式锁的主要步骤
    • `Redis` 实现分布式锁的问题
  • 基于 `Redisson` 的分布式锁
    • `Redisson` 概述
    • `Redisson` 实现分布式锁
    • `Redisson` 分布式锁的源码
      • `RLock` 接口
      • 加锁
        • `RedissonLock` 类实现的 `tryLock()`
        • `tryAcquire()`
        • `tryAcquireAsync()`
      • 解锁
        • `RedissonLock` 类中实现的 `unlock()`
        • `unlockAsync()`
        • `unlockInnerAsync()`
    • `Redisson` 分布式锁实现原理

基于 Redis 的分布式锁

其实 Redis 官网已经给出了实现:https://redis.io/topics/distlock,说各种书籍和博客用了各种手段去用 Redis 实现分布式锁,建议用 Redlock 实现,这样更规范、更安全

我们默认指定大家用的是 Redis 2.6.12 及更高的版本,就不再去讲 setnx、expire 这种了,直接 set 命令加锁

set key value[expiration EX seconds|PX milliseconds] [NX|XX]# 例如
SET resource_name my_random_value NX PX 30000

SET 命令的行为可以通过一系列参数来修改

  • EX second:设置键的过期时间为 second 秒。SET key value EX second 效果等同于 SETEX key second value
  • PX millisecond:设置键的过期时间为 millisecond 毫秒。SET key value PX millisecond 效果等同于 PSETEX key millisecond value
  • NX:只在键不存在时,才对键进行设置操作。SET key value NX 效果等同于 SETNX key value
  • XX :只在键已经存在时,才对键进行设置操作

这条指令的意思:当 key——resource_name 不存在时创建这样的 key,设值为 my_random_value,并设置过期时间 30000 毫秒。因为 Redis 是单线程的,这一条指令不会被打断,所以是原子性的操作

Redis 实现分布式锁的主要步骤

  • 指定一个 key 作为锁标记,存入 Redis 中,指定一个唯一的标识作为 value
  • key 不存在时才能设置值,确保同一时间只有一个客户端进程获得锁,满足 互斥性 特性
  • 设置一个过期时间,防止因系统异常导致没能删除这个 key,满足 防死锁 特性
  • 当处理完业务之后需要清除这个 key 来释放锁,清除 key 时需要校验 value 值,需要满足 解铃还须系铃人

设置一个随机值的意思是在解锁时候判断 key 的值和我们存储的随机数是不是一样,一样的话,才是自己的锁,直接 del 解锁就行

当然这个两个操作要保证原子性,所以 Redis 给出了一段 lua 脚本(Redis 服务器会单线程原子性执行 lua 脚本,保证 lua 脚本在处理的过程中不会被任意其它请求打断)

if redis.call("get",KEYS[1]) == ARGV[1] thenreturn redis.call("del",KEYS[1])
elsereturn 0
end

Redis 实现分布式锁的问题

  • 获取锁时,过期时间要设置多少合适呢?预估一个合适的时间,其实没那么容易,比如操作资源的时间最慢可能要 10s,而我们只设置了 5s 就过期,那就存在锁提前过期的风险。这个问题先记下,我们先看下 Javaer 要怎么在代码中用 Redis
  • 容错性如何保证呢?Redis 挂了怎么办,你可能会说上主从、上集群,但也会出现这样的极端情况,当我们上锁后,主节点就挂了,这个时候还没来的急同步到从节点,主从切换后锁还是丢了

带着这两个问题,我们接着看

基于 Redisson 的分布式锁

Redisson 概述

redissonRedis 官方的分布式锁组件。GitHub 地址:https://github.com/redisson/redisson

Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格。它不仅提供了一系列的分布式的 Java 常用对象,还实现了可重入锁 ReentrantLock、公平锁 FairLock、联锁 MultiLock、 红锁 RedLock、 读写锁 ReadWriteLock 等,还提供了许多分布式服务。Redisson 提供了使用 Redis 的最简单和最便捷的方法。Redisson 的宗旨是促进使用者对 Redis 的关注分离,从而让使用者能够将精力更集中地放在处理业务逻辑上

Redisson 支持单点模式、主从模式、哨兵模式、集群模式,只是配置的不同

Redisson 实现分布式锁

Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("").setDatabase(1);
RedissonClient redissonClient = Redisson.create(config);
RLock disLock = redissonClient.getLock("mylock");
boolean isLock;try {/*尝试获取锁的最大等待时间是100秒,超过这个值还没获取到,就认为获取失败锁的持有时是10秒*/isLock = disLock.tryLock(100, 10, TimeUnit.MILLISECONDS);if (isLock) {// 做自己的业务Thread.sleep(10000);}
} catch (Exception e) {e.printStackTrace();
} finally {disLock.unlock();
}

Redisson 分布式锁的源码

先看下 RLock 的类关系


跟着源码,可以发现 RedissonLockRLock 的直接实现,也是我们加锁、解锁操作的核心类

RLock 接口

// 获取RLock对象
RLock lock = redisson.getLock("myLock");

RLock 提供了各种锁方法,我们来解读下这个接口方法。可以看到继承自 JDKLock 接口和 Reddsion 的异步锁接口 RLockAsync

public interface RLock extends Lock, RLockAsync {/*获取锁的名字*/String getName();/*** 这个叫终端锁操作,表示该锁可以被中断 假如A和B同时调这个方法,A获取锁,B为获取锁,那么B线程可以通过* Thread.currentThread().interrupt(); 方法真正中断该线程*/void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException;/*** 这个应该是最常用的,尝试获取锁* waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败* leaseTime   锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)*/boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;/*** 锁的有效期设置为 leaseTime,过期后自动失效* 如果 leaseTime 设置为 -1, 表示不主动过期*/void lock(long leaseTime, TimeUnit unit);boolean forceUnlock();/*检查是否被另一个线程锁住*/boolean isLocked();/*检查当前线线程是否持有该锁*/boolean isHeldByCurrentThread();/*这个就明了了,检查指定线程是否持有锁*/boolean isHeldByThread(long threadId);/*返回当前线程持有锁的次数*/int getHoldCount();/*返回锁的剩余时间*/long remainTimeToLive();
}

加锁

RedissonLock 类实现的 tryLock()

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {// 获取等锁的最长时间long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();// 取得当前线程id(判断是否可重入锁的关键)long threadId = Thread.currentThread().getId();// 核心点1:尝试获取锁,若返回值为null,则表示已获取到锁,返回的ttl就是key的剩余存活时间Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);if (ttl == null) {return true;}// 还可以容忍的等待时长 = 获取锁能容忍的最大等待时长 - 执行完上述操作流程的时间time -= System.currentTimeMillis() - current;if (time <= 0) {// 等不到了,直接返回失败acquireFailed(waitTime, unit, threadId);return false;}current = System.currentTimeMillis();/*** 核心点2* 订阅解锁消息 redisson_lock__channel:{$KEY},并通过await方法阻塞等待锁释放,解决了无效的锁申请浪费资源的问题:* 基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争* 当 this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败* 当 this.await返回true,进入循环尝试获取锁*/RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);// await 方法内部是用CountDownLatch来实现阻塞,获取subscribe异步执行的结果(应用了Netty 的 Future)if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {unsubscribe(subscribeFuture, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;}// ttl 不为空,表示已经有这样的key了,只能阻塞等待try {time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// 来个死循环,继续尝试着获取锁while (true) {long currentTime = System.currentTimeMillis();ttl = tryAcquire(waitTime, leaseTime, unit, threadId);if (ttl == null) {return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}currentTime = System.currentTimeMillis();/*** 核心点3:根据锁TTL,调整阻塞等待时长;* 1、latch其实是个信号量Semaphore,调用其tryAcquire方法会让当前线程阻塞一段时间,避免在while循环中频繁请求获锁;*  当其他线程释放了占用的锁,会广播解锁消息,监听器接收解锁消息,并释放信号量,最终会唤醒阻塞在这里的线程* 2、该Semaphore的release方法,会在订阅解锁消息的监听器消息处理方法org.redisson.pubsub.LockPubSub#onMessage调用;*///调用信号量的方法来阻塞线程,时长为锁等待时间和租期时间中较小的那个if (ttl >= 0 && ttl < time) {subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {// 获取到锁或者抛出中断异常,退订redisson_lock__channel:{$KEY},不再关注解锁事件unsubscribe(subscribeFuture, threadId);}
}

tryAcquire()

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {/*调用方法如下*/return get(tryAcquireAsync(leaseTime, unit, threadId));
}private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;// leaseTime != -1 说明没过期if (leaseTime != -1) {// 实质是异步执行加锁Lua脚本ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {// 否则,已经过期了,传参变为新的时间(续期后)ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}// lock acquiredif (ttlRemaining == null) {if (leaseTime != -1) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {// 续期scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;
}

tryAcquireAsync()

异步执行加锁 Lua 脚本

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,// 1.如果缓存中的key不存在,则执行 hincrby 命令(hincrby key UUID+threadId 1), 设值重入次数1// 然后通过 pexpire 命令设置锁的过期时间(即锁的租约时间)// 返回空值 nil ,表示获取锁成功"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +// 如果key已经存在,并且value也匹配,表示是当前线程持有的锁,则执行 hincrby 命令,重入次数加1,并且设置失效时间"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +//如果key已经存在,但是value不匹配,说明锁已经被其他线程持有,通过 pttl 命令获取锁的剩余存活时间并返回,至此获取锁失败"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
  • KEYS[1] 就是 Collections.singletonList(getName()),表示分布式锁的 key
  • ARGV[1] 就是 internalLockLeaseTime,即锁的租约时间(持有锁的有效时间),默认30s
  • ARGV[2] 就是 getLockName(threadId),是获取锁时 set 的唯一值 value,即 UUID+threadId

解锁

RedissonLock 类中实现的 unlock()

@Override
public void unlock() {try {// 获取当前调用解锁操作的线程IDget(unlockAsync(Thread.currentThread().getId()));} catch (RedisException e) {// IllegalMonitorStateException一般是A线程加锁,B线程解锁,内部判断线程状态不一致抛出的if (e.getCause() instanceof IllegalMonitorStateException) {throw (IllegalMonitorStateException) e.getCause();} else {throw e;}}
}

unlockAsync()

@Override
public RFuture<Void> unlockAsync(long threadId) {// 构建一个结果RedissonPromiseRPromise<Void> result = new RedissonPromise<>();// 返回的RFuture如果持有的结果为true,说明解锁成功,返回NULL说明线程ID异常,加锁和解锁的客户端线程不是同一个线程RFuture<Boolean> future = unlockInnerAsync(threadId);future.onComplete((opStatus, e) -> {// 取消看门狗的续期任务cancelExpirationRenewal(threadId);if (e != null) {result.tryFailure(e);return;}if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);result.tryFailure(cause);return;}result.trySuccess(null);});return result;
}

unlockInnerAsync()

// 真正的内部解锁的方法,执行解锁的Lua脚本
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,// 如果分布式锁存在,但是value不匹配,表示锁已经被其他线程占用,无权释放锁,那么直接返回空值(解铃还须系铃人)"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +// 如果value匹配,则就是当前线程占有分布式锁,那么将重入次数减1"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +// 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只能更新失效时间,还不能删除"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +// 重入次数减1后的值如果为0,这时就可以删除这个KEY,并发布解锁消息,返回1"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end; " +"return nil;",// 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

Redisson 分布式锁实现原理

关于原理可查看:https://blog.csdn.net/weixin_38192427/article/details/115319628

关于更为详细的 Redisson 源码解析 1:https://blog.csdn.net/qq_24384579/article/details/90451059
关于更为详细的 Redisson 源码解析 2:https://artisan.blog.csdn.net/article/details/105449188

Redis与Redisson的分布式锁相关推荐

  1. Redis实现分布式锁全局锁—Redis客户端Redisson中分布式锁RLock实现

    2019独角兽企业重金招聘Python工程师标准>>> 1. 前因 以前实现过一个Redis实现的全局锁, 虽然能用, 但是感觉很不完善, 不可重入, 参数太多等等. 最近看到了一个 ...

  2. 【Redis】Redis 使用 redisson 做分布式锁 复盘 maven 依赖 netty 冲突

    本文为博主九师兄(QQ:541711153 欢迎来探讨技术)原创文章,未经允许博主不允许转载.有问题可以先私聊我,本人每天都在线,会帮助需要的人. 文章目录 1.概述 1.概述 今天想要做一个功能,大 ...

  3. redisson redlock(基于redisson框架和redis集群使用分布式锁)

    一.关于分布式锁的两篇文章 文章1 文章2 二.redis分布式锁存在的问题 redis实现分布式锁有很多种方案,比较完善的方案应该是用setNx + lua进行实现.简单实现如下: java代码-加 ...

  4. 再也不用担心面试官让我用Redis实现分布式锁啦(二、Redisson实现分布式锁)

    目录 一.Jedis实现分布式锁 二.Redisson实现分布式锁(单机Redis) 一.引入依赖(3.5.7) 二.配置redis 三.配置RedisonConfig 四.提供锁接口及实现,方便统一 ...

  5. redisson的锁的类型_绝对干货:利用redisson完成分布式锁功能

    在单体架构中,我们使用synchronize或者Lock就能完成上锁同步的操作,但是这些在分布式,微服务的今天,失去了作用. 分布式锁的实现一般有三种解决方案:基于数据库表实现 基于缓存实现,比如re ...

  6. java设计前期工作基础和存在的困难_Java秒杀系统实战系列-基于Redisson的分布式锁优化秒杀逻辑...

    本文是"Java秒杀系统实战系列文章"的第十五篇,本文我们将借助综合中间件Redisson优化"秒杀系统中秒杀的核心业务逻辑",解决Redis的原子操作在优化秒 ...

  7. Redisson 实现分布式锁

    Java提供的内置锁机制只在单机系统里有效,在分布式系统,一般多机部署的环境下无法保证线程安全,这时需要在整个系统提供一个全局唯一的锁. 可以通过Redis,ZooKeeper,数据库实现. Redi ...

  8. Spring Boot学习总结(19)——使用Redisson实现分布式锁

    一.什么是分布式? 要想说什么是分布式,那么首先要知道分布式之前的系统是什么样的架构,之前的架构又存在什么样的问题? 单体架构 分布式之前就是单体架构,单体架构顾名思义就是将所有的业务功能打包在一个应 ...

  9. 聊聊redisson的分布式锁

    序 本文主要研究一下redisson的分布式锁 maven <dependency><groupId>org.redisson</groupId><artif ...

  10. redis set 超时_redis分布式锁3种实现方式对比分析总结

    我在这篇文章提到了分布式锁,但没有展开来讲,抛砖引玉,今天就来说说高并发服务编程中的redis分布式锁. 这里罗列出3种redis实现的分布式锁,并分别对比说明各自特点. Redis单实例分布式锁 实 ...

最新文章

  1. R语言dplyr包if_else条件判断选择函数实战
  2. 安装linux办公软件,Centos7如何安装开源办公软件Libreoffice
  3. python3语法错误-关于在python3.7当中的语法错误!
  4. Android dialog 全屏
  5. POJ 3250 Bad Hair Day (单调栈)
  6. 数据结构与算法(Python)– 回溯法(Backtracking algorithm)
  7. Spring注解开发-@Scope作用域注解
  8. how does Fiori Mock server serve OData request with 202
  9. php养老院管理系统,XYCMS养老院建站系统 v3.8
  10. nagios服务配置
  11. iOS开发UI篇—常见的项目文件介绍
  12. MATLAB灰度显示和彩色显示
  13. 【视频直播篇七】Aliplayer的使用
  14. BCNF无损分解例题
  15. php中case的使用,php:switchcase语句的使用案例
  16. 浅谈python-docx的缩进问题——如何缩进两个字符
  17. hp服务器时间修改,HP服务器bios时间设置确认
  18. How to delete files beginning with -- in Linux
  19. SpringBoot整合dubbo详解(阿里官方dubbo-spring-boot-starter)
  20. 【电源设计】01电源参数及LDO

热门文章

  1. 容器技术Docker K8s 5 容器技术在阿里巴巴落地历程
  2. StarUML接口视图修改为类的形式
  3. 计算二叉树的叶子结点个数
  4. Linux系统重要日志文件
  5. 利用反射给属性赋值,调用方法,调用构造器--getDeclaredField,getDeclaredMethod,getDeclardConstructor
  6. elisa数据处理过程图解_ELISA操作流程
  7. 小波变换 分离影像低频部分_连续小波变换(1)
  8. 【数字图像处理系列一】opencv-python快速入门篇
  9. [译]直观理解信息论
  10. Neural Style Transfer