用 Redis 实现分布式锁(Java 版)

  • 核心代码
  • 完整代码

  分布式锁是一种解决分布式临界资源并发读写的一种技术。本文详细介绍了在 Java 中使用 Redis 实现分布式锁的方法。为了方便,这里使用了 Spring Boot 中的 RedisTemplate 来与 Redis 进行交互。本文的分布式锁有如下功能:

  • 是分布式锁,互斥粒度为线程级。

  • 可重入。同一线程可多次上锁。

  • 锁不唯一。可以持有多个名称不同的锁,不同名的锁之间的创建与释放互相独立。

  • 支持锁过期自动释放。

  • 支持持锁超时自动异步续时。


【渐进式问答】【Q & A】

  1. Q:Redis 实现分布式锁的原理是什么?

    A:锁本质上是一种逻辑控制,使用一个布尔型的变量就可以。比方说,可以让 Redis 中的某个键存在表示上了某种锁,当 Redis 中没有这个键时表示没有上这个锁。

    而 Redis 是独立于用户程序的一种拥有集群功能的全局分布式应用,因此可以用于实现分布式锁。

  2. Q:如何实现 Redis 分布式锁的线程级可重入。

    A:可以使用 ThreadLocal 记录每个线程当前上锁的重入次数。每当上锁时,就将记录中的重入次数加 1。每当释放锁时,就将其减 1。特别地,在释放锁时,如果重入次数为 1,就真正地在 Redis 中删除此锁。

  3. Q:对于这种情况如何应对:一个程序在设置了 Redis 分布式锁之后,然后业务代码中抛出了异常,结果程序跳过了后面的释放锁代码就退出了。

    A:可以将加分布式锁的代码置于一个 try 块 中,然后在 try 块 后面加不含 catch 块finally 子句,并在 finally 子句 中编写释放锁的代码。这样,无论中途发生了什么异常,释放锁的代码一定会执行。

  4. Q:在问题【3】中,如果一个程序在没有获得锁的情况下就退出,这不就可能会释放正在持有锁的程序的锁吗?

    A:对于这种情况可以借助 ThreadLocal,用两种方法来应对:

    • 使用 ThreadLocal 为每个线程生成一个 ID,然后将此 ID 存于 Redis 锁中,等释放锁之时,检查锁中的 ID 与本线程的 ID 是否一致。如果一致才真正释放锁。

    • 利用本 Redis 锁的互斥性。使用 ThreadLocal 记录每个线程当前上锁的重入次数。因为本 Redis 锁是互斥锁,所以只可能有一个线程,它的当前上锁次数大于 0。因此,释放锁的时候只需要判断自己当前的上锁次数是否为 0 即可。如果不为 0,才真正释放锁。

      本文使用的是这种方法。

  5. Q:对于这种情况如何应对:一个程序在设置了 Redis 分布式锁之后,还没来得及释放该锁就崩溃了。此时,所有的程序都无法获取受该锁束缚的资源。

    A:可以选择在上锁的同时引入超时时间。此时如果问题中的程序崩溃时,锁会自动释放。

  6. Q:在问题【5】中,如果该程序在上锁之后还没有来得及设置超过时间就崩溃呢?

    A:可以让上锁和设置超过时间这两个操作变成同一个原子操作。

    现在,Spring Boot 中的 RedisTemplate 有这种 API 可以实现这一点。

    如果有的技术没有提供这种 API,可以使用 Redis 中的 Eval 命令,这个命令支持运行一段 Lua 脚本,这个命令是原子性的。


    【错误的解决方案】

    • Q1:在问题【5】中,如果该程序在上锁之后还没有来得及设置超过时间就崩溃呢?

    • A1:可以将本次上锁时间作为 Redis 锁的值存入,同时规定某个键存在表示上了某种锁,没有这个键时表示没有上这个锁。然后令读取锁的程序通过比较上锁时间与当前时间来判断此锁有没有过期。

    • Q2:如果锁过期了,如何保证只有一个程序可以获得锁?

    • A2:可以使用类似于乐观锁的机制,在上锁时同时将上锁应用的 ID 存入,然后在加锁之后再读取锁数据,判断最后加锁成功的是不是自己即可。

    • Q3:要怎么做到对“最后加锁”的判断?如何解决这种情况:两个程序都要加锁,而第一个程序执行很快,加锁之后又认为自己成功加上了锁。然后第二个执行较慢的程序将锁覆盖,也认为自己成功加上了锁。现在,两个程序都认为自己加上了锁。

    • A3:这确实是错误的解决方案。


  7. Q:在问题【5】中,如果该程序在上锁后业务代码执行时间过长而锁超时怎么办?

    A:可以在加锁之后开启一个子线程进行异步周期性地续时。当释放锁时,再中断结束这个续时线程。

  8. Q:在问题【7】中,每次上锁都开启新线程,这个开销是不是有点大了?

    A:那可以选择让同一个名称的锁对应同一个续时线程。具体来说,事先开启一个续时线程,这个续时线程不会因锁释放而销毁。然后让这个续时线程完成所有线程上锁的续时任务。

  9. Q:在问题【8】中,如果程序需要使用 1w 个锁来锁 1w 条不同的数据,那这样在后台开启 1w 个续时线程是不是容易溢出?

    A:可以在创建续时线程时设置续时线程的个数上限。如果达到上限,可以采取很多策略,比如令新的续时线程像问题【7】一样在锁释放时销毁。

  10. Q:问一个与创建 Redis 分布式锁无关的问题。对于秒杀的业务,假设购买商品前要加锁,如果没有拿到锁,会自旋等待。现在如果有 1w 个购买请求,但商品数只有 100 个,这就意味着理论上在秒杀结束之后,有 9900 个请求是不需要拿到锁的。如何保证这一点?如何防止这样的一种情况:明明秒杀已经结束了,剩下的 9900 个请求仍然在自旋排队拿锁,并在拿到锁之后执行业务代码。

    A:如果这个秒杀项目使用了一种高速缓存技术,可以选择在秒杀结束之后,将秒杀结束这一信号存于高速缓存中。当请求在自旋等待时,不断在高速缓存中查询秒杀是否结束,如果是就结束自旋。同时在拿到锁之后,也要查询秒杀是否结束,如果是就跳过某些业务代码。

  11. Q:在问题【10】中,为什么在拿到锁之后,也要查询秒杀是否结束?

    A:在线程在自旋等待过程中,其可能会位于自旋等待过程中的任何一个时间点。如果有大量的线程位于拿锁的时间点,那么当其它其它线程释放锁时,即便是秒杀结束了,自旋等待中判断秒杀是否结束的代码也不会起作用。因为当它拿到锁的时候,就会马上退出循环,而不会经历这个自旋中的判断代码。因此在拿到锁之后,也要执行这个判断代码。


【编程难点】(这些问题的答案不方便文字描述,这里从略。读者可以在文末笔者的源代码中找到解决方案)

  1. 在规定一个分布式锁对应一个续时线程的情况下,如果需要使用多个锁,如何避免多线程并发时,为每一个锁创建了多个续时线程?

  2. 如何在多线程共用同一续时线程的情况下,控制此续时线程的续时停止与恢复?

  3. 如何保证在得到和释放分布式锁时,续时线程能立刻感知到?(如果续时线程刚好在休眠,那它就不能立刻感知到)

  4. 如何防止续时线程意外中止?


核心代码

package org.wangpai.demo.lock;import java.util.concurrent.TimeUnit;
import lombok.Setter;
import org.springframework.data.redis.core.RedisTemplate;/*** 分布式可重入锁** @since 2022-3-13*/
public class DistributedReentrantLock {@Setterprivate static RedisTemplate<String, String> redisTemplate;private final String name;/*** 线程级可重入** @since 2022-3-13*/private final ThreadLocal<Integer> lockedTimes = new ThreadLocal<>();@Setterprivate int lockedDuration = 10;private TimeUnit lockedDurationUnit = TimeUnit.SECONDS;public DistributedReentrantLock(String name) {this.name = name;this.lockedTimes.set(0);}/*** 尝试加锁,如果失败,返回 false** @since 2022-3-13*/public boolean tryLock(long timeout, TimeUnit unit) {var times = this.lockedTimes.get();boolean isSuccessful = true;if (times == 0) {isSuccessful = redisTemplate.opsForValue().setIfAbsent(this.name, this.name, timeout, unit);}if (isSuccessful) {this.lockedTimes.set(times + 1);var renewal = DistributedLockFactory.getRenewal(this.name);renewal.setTimeRenewal(this.lockedDuration).setTimeWaiting(this.lockedDuration / 2).resume();}return isSuccessful;}/*** 尝试最多持续 60s 的锁** @since 2022-3-13*/public boolean tryLock() {return this.tryLock(this.lockedDuration, this.lockedDurationUnit);}/*** 尝试加锁,如果失败,返回 false** @since 2022-3-13*/public boolean tryLock(long timeout) {return this.tryLock(timeout, this.lockedDurationUnit);}/*** 只有本线程上过锁时,调用此方法才有效** @since 2022-3-13*/public void unlock() {var times = this.lockedTimes.get();if (times == 0) {System.out.println("本线程没有上过锁,解锁失败");return;}// 本线程是否上过锁if (times == 1) {/*** 因为这个锁是互斥锁,所以只要本线程加锁过,其它线程不可能可以加锁,* 因此这锁一定是本线程加的,故无需验证线程 id*/redisTemplate.delete(this.name);var renewal = DistributedLockFactory.getRenewal(this.name);renewal.suspend();System.out.println("完全释放分布式锁");}this.lockedTimes.set(times - 1);}
}
package org.wangpai.demo.lock;import java.util.concurrent.ConcurrentHashMap;
import lombok.Setter;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;/*** @since 2022-3-13*/
@Component
public class DistributedLockFactory {private static ConcurrentHashMap<String, LockRenewal> asynchronousRenewals = new ConcurrentHashMap<>();private static LockTypeRegister lockTypeRegister;@Setterprivate static volatile int threadLimit = 100;public DistributedLockFactory(RedisTemplate<String, String> redisTemplate, LockTypeRegister register) {DistributedReentrantLock.setRedisTemplate(redisTemplate);LockRenewal.setRedisTemplate(redisTemplate);lockTypeRegister = register;}public static DistributedReentrantLock getDistributedLock(LockType lockType, String originKey) {var lockKey = LockTypeUtil.keyCompound(lockType, originKey);// 双重检查锁定:第一重判断if (!asynchronousRenewals.containsKey(lockKey)) {var lock = lockTypeRegister.getRegister().get(lockType);try {lock.lock(); // 对 lockType 上锁// 双重检查锁定:第二重判断if (!asynchronousRenewals.containsKey(lockKey)) {var timeRenewal = new LockRenewal();timeRenewal.setLockKey(lockKey).setStarted(true);// 当总线程数达到上限时,设置 timeRenewal 快速销毁if (asynchronousRenewals.entrySet().size() >= threadLimit) {timeRenewal.setFastClosed(true);}asynchronousRenewals.put(lockKey, timeRenewal);var renewalThead = new Thread(timeRenewal);timeRenewal.setRunningThread(renewalThead);renewalThead.start();}} finally {lock.unlock();}}return new DistributedReentrantLock(lockKey);}public static LockRenewal getRenewal(String name) {return asynchronousRenewals.get(name);}
}
package org.wangpai.demo.lock;import java.util.concurrent.TimeUnit;
import lombok.Setter;
import lombok.experimental.Accessors;
import org.springframework.data.redis.core.RedisTemplate;/*** 为了避免反复新建线程的开销,此类会事先就后台运行,然后供所有的线程共用** @since 2022-3-19*/
@Accessors(chain = true)
public class LockRenewal implements Runnable {@Setterprivate static RedisTemplate<String, String> redisTemplate;@Setterprivate Thread runningThread;/*** 锁的名称** @since 2022-3-19*/@Setterprivate volatile String lockKey;/*** 控制线程的启动与终止** @since 2022-3-19*/@Setterprivate volatile boolean started = false;/*** 控制续时任务的暂停与恢复** @since 2022-3-19*/private volatile boolean isRunning = false;/*** 当系统的总线程数过高时,将此字段置位。此时当 isSuspended 也为 true 时,销毁本线程,而不是静默执行空任务** @since 2022-3-19*/@Setterprivate volatile boolean fastClosed = false;/*** 控制续时任务执行间隔时间,单位:秒** 注意:timeWaiting 值不能大于 timeRenewal 值。建议 timeWaiting 为 timeRenewal 的 1/3。* timeWaiting 与 timeRenewal 过于接近容易导致碰巧因启动时间差,而使续时任务正处于休眠状态而没有及时续时** @since 2022-3-19*/@Setterprivate volatile long timeWaiting = 20;/*** 控制续时时长,单位:秒。** @since 2022-3-19*/@Setterprivate volatile long timeRenewal = 60;private int count = 0;@Overridepublic void run() {System.out.println("续时线程启动");while (this.started) {try {if (this.fastClosed && !this.isRunning) {return;}// 第一步应该先休眠,而不应该马上续时try {Thread.sleep(this.timeWaiting * 1000);} catch (InterruptedException interruptedException) {// 续时任务被外部中断时,线程不退出this.afterInterrupt();continue; // 中断后应该重新开始}this.count++;if (this.isRunning) {this.renewDistributedLock();}} catch (Throwable throwable) {// 此 catch 块是为了避免中途某代码引发异常而导致此线程意外中止throwable.printStackTrace();}}System.out.println("续时线程终止");}/*** 此方法必须中断续时任务的休眠** @since 2022-3-19*/public void resume() {this.isRunning = true;this.count = 0;this.runningThread.interrupt();}/*** 此方法必须中断续时任务的休眠** @since 2022-3-19*/public void suspend() {this.isRunning = false;this.count = 0;this.runningThread.interrupt();}private void afterInterrupt() {this.runningThread.isInterrupted(); // 清除中断标志System.out.println("续时任务休眠中断,计数重置");}private void renewDistributedLock() {redisTemplate.expire(this.lockKey, this.timeRenewal, TimeUnit.SECONDS);System.out.println("第" + this.count + "次续时成功");}
}
package org.wangpai.demo.service;import org.springframework.boot.autoconfigure.cache.CacheType;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.wangpai.demo.lock.DistributedLockFactory;
import org.wangpai.demo.lock.LockType;/*** @since 2022-3-20*/
@Service
public class DemoService {@Transactionalpublic DemoService demoService() {final var spinTime = 1; // 自旋时间,单位:秒var someKey="someKey";var lock = DistributedLockFactory.getDistributedLock(LockType.LOCK_1, someKey);try {int count = 0;// 获取分布式锁while (!lock.tryLock()) {try {Thread.sleep(spinTime * 1000);} catch (InterruptedException exception) {exception.printStackTrace();}// TODO:判断现在是否已经不需要得到锁了。如果是,退出此自旋System.out.println("第" + (++count) + "次没有拿到锁,尝试下一次");}System.out.println("得到分布式锁");// TODO:判断现在是否已经不需要得到锁了。如果是,直接放弃锁System.out.println("得到分布式锁,但可能已经不需要了"); // TODO:需要将此日志更正为更具体的日志信息// TODO:业务代码} finally {System.out.println("尝试释放分布式锁");// 无论前面是否抛出异常,此处都要释放锁。这不会释放别人的锁lock.unlock();}// TODO:不需要上锁的业务代码return this;}}

完整代码

  已上传至 GitCode 中,可免费下载:https://gitcode.net/wangpaiblog/20220321-distributedlock-redis

用 Redis 实现分布式锁(Java 版)相关推荐

  1. java如何保证redis设置过期时间的原子性_redis专题系列22 -- 如何优雅的基于redis实现分布式锁

    几个概念 线程锁:主要用来给方法.代码块加锁.当某个方法或代码使用锁,在同一时刻仅有一个线程执行该方法或该代码段.线程锁只在同一JVM中有效果,因为线程锁的实现在根本上是依靠线程之间共享内存实现的,比 ...

  2. java实现分布式redis锁_使用redis实现分布式锁

    # 简介: 当高并发访问某个接口的时候,如果这个接口访问的数据库中的资源,并且你的数据库事务级别是可重复读(Repeatable read)的话,确实是没有线程问题的,因为数据库锁的级别就够了:但是如 ...

  3. Java使用Redis实现分布式锁来防止重复提交问题

    如何用消息系统避免分布式事务? - 少年阿宾 - BlogJava http://www.blogjava.net/stevenjohn/archive/2018/01/04/433004.html ...

  4. Java基于redis实现分布式锁(SpringBoot)

    前言 分布式锁,其实原理是就是多台机器,去争抢一个资源,谁争抢成功,那么谁就持有了这把锁,然后去执行后续的业务逻辑,执行完毕后,把锁释放掉. 可以通过多种途径实现分布式锁,例如利用数据库(mysql等 ...

  5. java中使用Redis实现分布式锁

    前言 目前很多大型的互联网公司后端都采用了分布式架构来支撑前端应用,其中服务拆分就是分布式的一种体现,既然服务拆分了,那么多个服务协调工作就会出现一些资源竞争的情况.比如多个服务对同一个表中的数据进行 ...

  6. Java基于Redis实现分布式锁(原子性操作、续命)——90%以上都搞错了

    在使用分布式锁之前,要先思考一个问题,我们为什么要使用分布式锁? 这是因为,在分布式的部署环境下,原来的这个synchronized 只能在当前的JVM中加锁,不能跨JVM实现加锁,所以这种情况下我们 ...

  7. js 拉勾网效果_Node.js 中实践基于 Redis 的分布式锁实现

    在一些分布式环境下.多线程并发编程中,如果对同一资源进行读写操作,避免不了的一个就是资源竞争问题,通过引入分布式锁这一概念,可以解决数据一致性问题. 作者简介:五月君,Nodejs Developer ...

  8. 分布式锁的三种实现方式_基于 redis 的分布式锁实现

    云龙 资深运维开发工程师,负责游戏系统配置管理平台的设计和开发,目前专注于新 CMDB 系统的开发,平时也关注运维自动化,devops,python 开发等技术. 背景 CMDB 系统里面的机器数据会 ...

  9. 基于 Redis 实现分布式锁思考

    以下文章来源方志朋的博客,回复"666"获面试宝典 来源:blog.csdn.net/xuan_lu/article/details/111600302 分布式锁 基于redis实 ...

最新文章

  1. WordPress 2.9.2 使用感受
  2. 一张照片就能生成3D模型,GAN和自动编码器碰撞出奇迹,苏黎世联邦理工学院出品...
  3. Qt仿win7自动顶部最大化左侧右侧半屏效果
  4. 文档丨暴力破解性能问题
  5. android开不了机怎么办手机号码,手机开不了机怎么办 原因分析及其解决方法
  6. imac java7下载地址,如何在苹果电脑上安装JAVA开发工具,经验告诉你该这样
  7. 让我们深入了解PP YOLO做出的贡献
  8. MySQL必知必会(一)
  9. HCIE-Security Day3:防火墙特征和组网方式
  10. android webview 字体被放大,解决因为手机设置字体大小导致h5页面在webview中变形的BUG...
  11. hadoop 运行原理
  12. 外企常用英语词汇或短语
  13. spatial transformer network (STN)
  14. 《紫川》之远东战火 十二卷
  15. 1fichier.com-1TB免费FTP空间的使用
  16. Rank loss调研
  17. 电源管理芯片之 Regulator用法 Regulator framework
  18. java命令+eclipse.exe,我运行exe时Eclipse无法启动?
  19. WSL2 安装 Ubuntu-20.04 子系统CUDA(Win10和Win11)
  20. 【用户价值分析 RFM模型】用户价值分析

热门文章

  1. 第二天 PYTHON 基本数据类型 - 数字 - 字符串
  2. 疯狂.NET架构通用权限后台管理工具演示版2.0下载
  3. MySql增加字段、删除字段、修改字段名称、修改字段类型
  4. 前端开发笔记(2)css基础(上)
  5. springmvc中的全注解模式
  6. JSON解析---初识
  7. 如何使用动态链接库中的资源
  8. 工作多年的.NET程序员,是否建立了自己的开发知识库?分享制作电子书的经验...
  9. 《Effective C#》的读书笔记
  10. BAP存储属性的思想