2019独角兽企业重金招聘Python工程师标准>>>

1. 前因

以前实现过一个Redis实现的全局锁, 虽然能用, 但是感觉很不完善, 不可重入, 参数太多等等.

最近看到了一个新的Redis客户端Redisson, 看了下源码, 发现了一个比较好的锁实现RLock, 于是记录下.

2. Maven依赖

<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>1.2.1</version>
</dependency>

3. 初试

Redisson中RLock的使用很简单, 来看看一个最简单的例子.

import org.redisson.Redisson;
import org.redisson.core.RLock;public class Temp {public static void main(String[] args) throws Exception {Redisson redisson = Redisson.create();RLock lock = redisson.getLock("haogrgr");lock.lock();try {System.out.println("hagogrgr");}finally {lock.unlock();}redisson.shutdown();}}

4. RLock接口

通过上面的例子可以看出, 使用起来和juc里面的Lock接口使用很类似, 那么来看看RLock这个接口.

Rlock
|
----------Lock|----------void lock()|----------void lockInterruptibly()|----------boolean tryLock()|----------boolean tryLock(long time, TimeUnit unit)|----------void unlock()|----------Condition newCondition()
|
----------RObject|----------String getName()|----------void delete()
|
----------void lockInterruptibly(long leaseTime, TimeUnit unit)
|
----------boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)
|
----------void lock(long leaseTime, TimeUnit unit)
|
----------void forceUnlock()
|
----------boolean isLocked();
|
----------boolean isHeldByCurrentThread()
|
----------int getHoldCount()

可以看到, 该接口主要继承了Lock接口, 然后扩展了部分方法, 比如:

boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)

新加入的leaseTime主要是用来设置锁的过期时间, 形象的解释就是, 如果超过leaseTime还没有解锁的话, 我就强制解锁.

5. RLock接口的实现

具体的实现类是RedissonLock, 下面来大概看看实现原理. 先看看 (3) 中例子执行时, 所运行的命令(通过monitor命令):

127.0.0.1:6379> monitor
OK
1434959509.494805 [0 127.0.0.1:57911] "SETNX" "haogrgr" "{\"@class\":\"org.redisson.RedissonLock$LockValue\",\"counter\":1,\"id\":\"c374addc-523f-4943-b6e0-c26f7ab061e3\",\"threadId\":1}"
1434959509.494805 [0 127.0.0.1:57911] "GET" "haogrgr"
1434959509.524805 [0 127.0.0.1:57911] "MULTI"
1434959509.529805 [0 127.0.0.1:57911] "DEL" "haogrgr"
1434959509.529805 [0 127.0.0.1:57911] "PUBLISH" "redisson__lock__channel__{haogrgr}" "0"
1434959509.529805 [0 127.0.0.1:57911] "EXEC"

可以看到, 大概原理是, 通过判断Redis中是否有某一key, 来判断是加锁还是等待, 最后的publish是一个解锁后, 通知阻塞在lock的线程.

分布式锁的实现依赖的单点, 这里Redis就是单点, 通过在Redis中维护状态信息来实现全局的锁. 那么来看看RedissonLock如何

实现可重入, 保证原子性等等细节.

6. 加锁源码分析

从最简单的无参数的lock参数来看源码.

public void lock() {try {lockInterruptibly();} catch (InterruptedException e) {Thread.currentThread().interrupt();return;}
}public void lockInterruptibly() throws InterruptedException {lockInterruptibly(-1, null);    //leaseTime : -1 表示key不设置过期时间
}public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {Long ttl;if (leaseTime != -1) {ttl = tryLockInner(leaseTime, unit);} else {ttl = tryLockInner();}// lock acquiredif (ttl == null) {return;}subscribe().awaitUninterruptibly();try {while (true) {if (leaseTime != -1) {ttl = tryLockInner(leaseTime, unit);} else {ttl = tryLockInner();}// lock acquiredif (ttl == null) {break;}// waiting for messageRedissonLockEntry entry = ENTRIES.get(getEntryName());if (ttl >= 0) {entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {entry.getLatch().acquire();}}} finally {unsubscribe();}
}

代码有点多, 但是没关系, 慢慢分解, 由于这里我们是调用无参数的lock方法, 所以最后执行到的方法是:

private Long tryLockInner() {final LockValue currentLock = new LockValue(id, Thread.currentThread().getId());    //保存锁的状态: 客户端UUID+线程ID来唯一标识某一JVM实例的某一线程currentLock.incCounter();    //用来保存重入次数, 实现可重入功能, 初始情况是1//Redisson封装了交互的细节, 具体的逻辑为execute方法逻辑.return connectionManager.write(getName(), new SyncOperation<LockValue, Long>() {@Overridepublic Long execute(RedisConnection<Object, LockValue> connection) {Boolean res = connection.setnx(getName(), currentLock);    //如果key:haogrgr不存在, 就set并返回true, 否则返回falseif (!res) {    //如果设置失败, 那么表示有锁竞争了, 于是获取当前锁的状态, 如果拥有者是当前线程, 就累加重入次数并set新值connection.watch(getName());    //通过watch命令配合multi来实现简单的事务功能LockValue lock = (LockValue) connection.get(getName());if (lock != null && lock.equals(currentLock)) {    //LockValue的equals实现为比较客户id和threadid是否一样lock.incCounter();    //如果当前线程已经获取过锁, 则累加加锁次数, 并set更新connection.multi();connection.set(getName(), lock);if (connection.exec().size() == 1) {return null;    //set成功, }}connection.unwatch();//走到这里, 说明上面set的时候, 其他客户端在  watch之后->set之前 有其他客户端修改了key值//则获取key的过期时间, 如果是永不过期, 则返回-1, 具体处理后面说明Long ttl = connection.pttl(getName());return ttl;}return null;}});
}

tryLockInner的逻辑已经看完了,  可以知道, 有三种情况:

(1) key不存在, 加锁:

当key不存在时, 设置锁的初始状态并set, 具体来看就是 setnx   haogrgr   LockValue{ id: Redisson对象的id,  threadId: 当前线程id,  counter: 当前重入次数,这里为第一次获取,所以为1}

通过上面的操作. 达到获取锁的目的, 通过setnx来达到实现类似于  if(map.get(key) == null) { map.put(key) } 的功能, 防止多个客户端同时set时, 新值覆盖老值.

(2)key存在, 且获取锁的当前线程, 重入:

这里就是锁重入的情况, 也就是锁的拥有者第二次调用lock方法, 这时, 通过先get, 然后比较客户端ID和当前线程ID来判断拥有锁的线程是不是当前线程.(客户端ID+线程ID才能唯一定位锁拥有者线程)

判断发现当前是重入情况, 则累加LockValue的counter, 然后重新set回去, 这里使用到了watch和multi命令, 防止   get -> set   期间其他客户端修改了key的值.

(3)key存在, 且是其他线程获取的锁, 等待:

首先尝试获取锁(setnx), 失败后发现锁拥有者不是当前线程, 则获取key的过期时间, 返回过期时间

那么接下来看看tryLockInner调用完成后的处理代码.

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {Long ttl;if (leaseTime != -1) {ttl = tryLockInner(leaseTime, unit);} else {ttl = tryLockInner();   //lock()方法调用会走的逻辑}// lock acquiredif (ttl == null) {   //加锁成功(新获取锁, 重入情况) tryLockInner会返回null, 失败会返回key超时时间, 或者-1(key未设置超时时间)return;   //加锁成功, 返回}//subscribe这个方法代码有点多, Redisson通过netty来和redis通讯, 然后subscribe返回的是一个Future类型,//Future的awaitUninterruptibly()调用会阻塞, 然后Redisson通过Redis的pubsub来监听unlock的topic(getChannelName())//例如, 5中所看到的命令 "PUBLISH" "redisson__lock__channel__{haogrgr}" "0"//当解锁时, 会向名为 getChannelName() 的topic来发送解锁消息("0")//而这里 subscribe() 中监听这个topic, 在订阅成功时就会唤醒阻塞在awaitUninterruptibly()的方法. //所以线程在这里只会阻塞很短的时间(订阅成功即唤醒, 并不代表已经解锁)subscribe().awaitUninterruptibly();try {while (true) {    //循环, 不断重试lockif (leaseTime != -1) {ttl = tryLockInner(leaseTime, unit);} else {ttl = tryLockInner();   //不多说了}// lock acquiredif (ttl == null) {break;}// 这里才是真正的等待解锁消息, 收到解锁消息, 就唤醒, 然后尝试获取锁, 成功返回, 失败则阻塞在acquire().// 收到订阅成功消息, 则唤醒阻塞上面的subscribe().awaitUninterruptibly();// 收到解锁消息, 则唤醒阻塞在下面的entry.getLatch().acquire();RedissonLockEntry entry = ENTRIES.get(getEntryName());if (ttl >= 0) {entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {entry.getLatch().acquire();}}} finally {unsubscribe();  //加锁成功或异常,解除订阅}
}

主要的代码都加上了详细的注释, subscribe() 方法的代码复杂些, 但具体就是利用redis的pubsub提供一个通知机制来减少不断的重试.

很多的Redis锁实现都是失败后sleep一定时间后重试, 在锁被占用时间较长时, 不断的重试是浪费, 而sleep也会导致不必要的时间浪费(在sleep期间可能已经解锁了), sleep时间太长, 时间浪费, 太短, 重试次数会增加~~~.

到这里lock的逻辑已经看完了, 其他的比如tryLock方法逻辑和lock类似, 不过加了超时时间, 然后还有一种lock方法就是对key加上了过期时间.

7. 解锁源码

unlock的逻辑相对简单.

public void unlock() {connectionManager.write(getName(), new SyncOperation<Object, Void>() {@Overridepublic Void execute(RedisConnection<Object, Object> connection) {LockValue lock = (LockValue) connection.get(getName());if (lock != null) {LockValue currentLock = new LockValue(id, Thread.currentThread().getId());if (lock.equals(currentLock)) {if (lock.getCounter() > 1) {lock.decCounter();connection.set(getName(), lock);} else {unlock(connection);}} else {throw new IllegalMonitorStateException("Attempt to unlock lock, not locked by current id: "+ id + " thread-id: " + Thread.currentThread().getId());}} else {// could be deleted}return null;}});
}private void unlock(RedisConnection<Object, Object> connection) {int counter = 0;while (counter < 5) {connection.multi();connection.del(getName());connection.publish(getChannelName(), unlockMessage);List<Object> res = connection.exec();if (res.size() == 2) {return;}counter++;}throw new IllegalStateException("Can't unlock lock after 5 attempts. Current id: "+ id + " thread-id: " + Thread.currentThread().getId());
}

具体的逻辑比较简单, 我就不注释了, 大概就是, 如果是多次重入的, 就以此递减然后 set, 如果是只lock一次的, 就删除, 然后publish一条解锁的message到getChannelName() tocpic.

这里解锁会重试五次, 失败就抛异常.

8.总结

逻辑并不复杂, 但是通过记录客户端ID和线程ID来唯一标识线程, 实现重入功能, 通过pub sub功能来减少空转.

优点: 实现了Lock的大部分功能, 提供了特殊情况方法(如:强制解锁, 判断当前线程是否已经获取锁, 超时强制解锁等功能), 可重入, 减少重试.

缺点: 使用依赖Redisson, 而Redisson依赖netty, 如果简单使用, 引入了较多的依赖, pub sub的实时性需要测试, 没有监控等功能, 查问题麻烦, 统计功能也没有(例如慢lock日志, 2333333).

感觉靠谱, 可以用,233333333.

转载于:https://my.oschina.net/haogrgr/blog/469439

Redis实现分布式锁全局锁—Redis客户端Redisson中分布式锁RLock实现相关推荐

  1. redisson的锁的类型_利用Redisson实现分布式锁,并防止重复提交

    关于Redisson的基础概念,参照Redisson基础. 要想实现此功能需要以下几步: 1.依赖包 这里用的是jdk8+的版本 org.redisson redisson 3.3.2 2.Sprin ...

  2. mysql悲观锁只用于读取吗_MySQL中悲观锁和乐观锁到底是什么?

    索引和锁是数据库中的两个核心知识点,隔离级别的实现都是通过锁来完成的 按照锁颗粒对锁进行划分 ? 锁用来对数据进行锁定,我们可以从锁定对象的粒度大小来对锁进行划分,分别为行锁.页锁和表锁.行锁就是按照 ...

  3. 阿里mysql锁_【mysql】mysql中的锁机制

    一.分类 MySQL的锁机制不同的存储引擎支持不同的锁机制,分为表级锁.行级锁.页面锁.MyISAM和MEMORY存储引擎采用的是表级锁(table-level locking):BDB存储引擎采用的 ...

  4. mysql 查询 锁表_怎么查找mysql中的锁表语句?

    查看sql server数据库被锁表可以用用如下语句: 也可以用如下语句: 拓展资料: 锁定数据库的一个表的区别SELECT * FROM table WITH (HOLDLOCK) 其他事务可以读取 ...

  5. 老夫带你深度剖析Redisson实现分布式锁的原理

    Redis实现分布式锁的原理 前面讲了Redis在实际业务场景中的应用,那么下面再来了解一下Redisson功能性场景的应用,也就是大家经常使用的分布式锁的实现场景. 引入redisson依赖 < ...

  6. Redisson实现分布式锁(3)—项目落地实现

    Redisson实现分布式锁(3)-项目落地实现 有关Redisson实现分布式锁前面写了两篇博客作为该项目落地的铺垫. 1.Redisson实现分布式锁(1)-原理 2.Redisson实现分布式锁 ...

  7. sql 闩锁 原因_关于SQL Server中的闩锁

    sql 闩锁 原因 SQL Server locks, discussed in the article All about locking in SQL Server, which is appli ...

  8. JAVA偏向锁的什么时候释放_Java中的偏向锁

    该楼层疑似违规已被系统折叠 隐藏此楼查看此楼 Java偏向锁(Biased Locking)是Java6引入的一项多线程优化. 偏向锁,顾名思义,它会偏向于第一个访问锁的线程,如果在运行过程中,同步锁 ...

  9. 一篇blog带你了解java中的锁

    前言 最近在复习锁这一块,对java中的锁进行整理,本文介绍各种锁,希望给大家带来帮助. Java的锁 乐观锁 乐观锁是一种乐观思想,即认为读多写少,遇到并发写的可能性低,每次去拿数据的时候都认为别人 ...

最新文章

  1. 接口测试用例设计思路_用了Swagger2后,接口设计文档,测试用例都不用自己写了,爽...
  2. mysql模糊查询的优化方法--亲自实践
  3. HSQLDB两条基本命令
  4. IDEA把Java Web导出为war文件
  5. ubuntu挂载windows下的文件目录的步骤
  6. C++学习之路 | PTA乙级—— 1025 反转链表 (20分)(精简)
  7. flexcell控件 许可证信息没有找到_报表控件 ActiveReports 全面迎来 .Net Core 时代
  8. shell之for循环使用---更新中
  9. Java 容器之 Connection栈队列及一些常用
  10. textarea光标处插入文字
  11. Struts2升级版本至2.5.10,高危漏洞又来了
  12. SaaS应用出路何在?
  13. android bugly qq,Android如何快速集成腾讯Bugly
  14. Java poi生成Excel加密文件
  15. Servlet内存马
  16. docker 安装mysql8.0并且暴漏外部的连接
  17. 《中国垒球协会》:新春贺词
  18. CS0012 错误。必须添加对程序集”xxxxx,Version=4.0.0.0,Culture=neutral,PublicKeyToken=xxxxxxx“的引用
  19. 数据分析师是否是青春饭,对年龄有限制吗?
  20. C#中文和UNICODE编码转换【转】

热门文章

  1. 读书狂想之《穷爸爸,富爸爸》财富观
  2. oracle索引大小暴增_oracle 如何预估将要创建的索引的大小
  3. Count Color(poj 2777)
  4. 插入10000条数据测试DB性能
  5. 解决set /p yn= 接受键盘输入导致ECHO 处于关闭状态的问题
  6. 解决win8.1企业版安装WP8 SDK出现“根据当前系统时钟或签名文件中的时间戳验证时要求的证书不在有效期内”的问题
  7. stale element reference: element is not attached to the page document 异常
  8. 阿里云高可用-容灾解决方案
  9. Java执行main方法,异常为:could not find the main class.program will exit
  10. C# 加载图片image --(C#)Image.FromFile 方法会锁住文件的原因及可能的解决方法