在开始提到Redis分布式锁之前,我想跟大家聊点Redis的基础知识。

说一下Redis的两个命令:

SETNX key value

setnx 是SET if Not eXists(如果不存在,则 SET)的简写。

用法如图,如果不存在set成功返回int的1,这个key存在了返回0。

SETEX key seconds value

将值 value 关联到 key ,并将 key 的生存时间设为 seconds (以秒为单位)。

如果 key 已经存在,setex命令将覆写旧值。

有小伙伴肯定会疑惑万一set value 成功 set time失败,那不就傻了么,这啊Redis官网想到了。

setex是一个原子性(atomic)操作,关联值和设置生存时间两个动作会在同一时间内完成。

我设置了10秒的失效时间,ttl命令可以查看倒计时,负的说明已经到期了。

跟大家讲这两个命名也是有原因的,因为他们是Redis实现分布式锁的关键。

正文

开始前还是看看场景:

我依然是创建了很多个线程去扣减库存inventory,不出意外的库存扣减顺序变了,最终的结果也是不对的。

单机加synchronized或者Lock这些常规操作我就不说了好吧,结果肯定是对的。

我先实现一个简单的Redis锁,然后我们再实现分布式锁,可能更方便大家的理解。

还记得上面我说过的命令么,实现一个单机的其实比较简单,你们先思考一下,别往下看。

setnx

可以看到,第一个成功了,没释放锁,后面的都失败了,至少顺序问题问题是解决了,只要加锁,缩放后面的拿到,释放如此循环,就能保证按照顺序执行。

但是你们也发现问题了,还是一样的,第一个仔set成功了,但是突然挂了,那锁就一直在那无法得到释放,后面的线程也永远得不到锁,又死锁了。

所以....

setex

知道我之前说这个命令的原因了吧,设置一个过期时间,就算线程1挂了,也会在失效时间到了,自动释放。

我这里就用到了nx和px的结合参数,就是set值并且加了过期时间,这里我还设置了一个过期时间,就是这时间内如果第二个没拿到第一个的锁,就退出阻塞了,因为可能是客户端断连了。

加锁

整体加锁的逻辑比较简单,大家基本上都能看懂,不过我拿到当前时间去减开始时间的操作感觉有点笨, System.currentTimeMillis()消耗很大的。

/*** 加锁** @param id* @return*/
public boolean lock(String id) {Long start = System.currentTimeMillis();try {for (; ; ) {//SET命令返回OK ,则证明获取锁成功String lock = jedis.set(LOCK_KEY, id, params);if ("OK".equals(lock)) {return true;}//否则循环等待,在timeout时间内仍未获取到锁,则获取失败long l = System.currentTimeMillis() - start;if (l >= timeout) {return false;}try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}} finally {jedis.close();}
}

System.currentTimeMillis消耗大,每个线程进来都这样,我之前写代码,就会在服务器启动的时候,开一个线程不断去拿,调用方直接获取值就好了,不过也不是最优解,日期类还是有很多好方法的。

@Service
public class TimeServcie {private static long time;static {new Thread(new Runnable(){@Overridepublic void run() {while (true){try {Thread.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}long cur = System.currentTimeMillis();setTime(cur);}}}).start();}public static long getTime() {return time;}public static void setTime(long time) {TimeServcie.time = time;}
}

解锁

解锁的逻辑更加简单,就是一段Lua的拼装,把Key做了删除。

你们发现没,我上面加锁解锁都用了UUID,这就是为了保证,谁加锁了谁解锁,要是你删掉了我的锁,那不乱套了嘛。

LUA是原子性的,也比较简单,就是判断一下Key和我们参数是否相等,是的话就删除,返回成功1,0就是失败。

/*** 解锁** @param id* @return*/
public boolean unlock(String id) {String script ="if redis.call('get',KEYS[1]) == ARGV[1] then" +"   return redis.call('del',KEYS[1]) " +"else" +"   return 0 " +"end";try {String result = jedis.eval(script, Collections.singletonList(LOCK_KEY), Collections.singletonList(id)).toString();return "1".equals(result) ? true : false;} finally {jedis.close();}
}

验证

我们可以用我们写的Redis锁试试效果,可以看到都按照顺序去执行了

思考

大家是不是觉得完美了,但是上面的锁,有不少瑕疵的,我没思考很多点,你或许可以思考一下,源码我都开源到我的GItHub了。

而且,锁一般都是需要可重入行的,上面的线程都是执行完了就释放了,无法再次进入了,进去也是重新加锁了,对于一个锁的设计来说肯定不是很合理的。

我不打算手写,因为都有现成的,别人帮我们写好了。

redisson

redisson的锁,就实现了可重入了,但是他的源码比较晦涩难懂。

使用起来很简单,因为他们底层都封装好了,你连接上你的Redis客户端,他帮你做了我上面写的一切,然后更完美。

简单看看他的使用吧,跟正常使用Lock没啥区别。

ThreadPoolExecutor threadPoolExecutor =new ThreadPoolExecutor(inventory, inventory, 10L, SECONDS, linkedBlockingQueue);
long start = System.currentTimeMillis();
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
final RedissonClient client = Redisson.create(config);
final RLock lock = client.getLock("lock1");for (int i = 0; i <= NUM; i++) {threadPoolExecutor.execute(new Runnable() {public void run() {lock.lock();inventory--;System.out.println(inventory);lock.unlock();}});
}
long end = System.currentTimeMillis();
System.out.println("执行线程数:" + NUM + "   总耗时:" + (end - start) + "  库存数为:" + inventory);

上面可以看到我用到了getLock,其实就是获取一个锁的实例。

RedissionLock也没做啥,就是熟悉的初始化。

public RLock getLock(String name) {return new RedissonLock(connectionManager.getCommandExecutor(), name);
}public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);//命令执行器this.commandExecutor = commandExecutor;//UUID字符串this.id = commandExecutor.getConnectionManager().getId();//内部锁过期时间this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();this.entryName = id + ":" + name;
}

加锁

有没有发现很多跟Lock很多相似的地方呢?

尝试加锁,拿到当前线程,然后我开头说的ttl也看到了,是不是一切都是那么熟悉?

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {//当前线程IDlong threadId = Thread.currentThread().getId();//尝试获取锁Long ttl = tryAcquire(leaseTime, unit, threadId);// 如果ttl为空,则证明获取锁成功if (ttl == null) {return;}//如果获取锁失败,则订阅到对应这个锁的channelRFuture<RedissonLockEntry> future = subscribe(threadId);commandExecutor.syncSubscription(future);try {while (true) {//再次尝试获取锁ttl = tryAcquire(leaseTime, unit, threadId);//ttl为空,说明成功获取锁,返回if (ttl == null) {break;}//ttl大于0 则等待ttl时间后继续尝试获取if (ttl >= 0) {getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {getEntry(threadId).getLatch().acquire();}}} finally {//取消对channel的订阅unsubscribe(future, threadId);}//get(lockAsync(leaseTime, unit));
}

获取锁

获取锁的时候,也比较简单,你可以看到,他也是不断刷新过期时间,跟我上面不断去拿当前时间,校验过期是一个道理,只是我比较粗糙。

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {//如果带有过期时间,则按照普通方式获取锁if (leaseTime != -1) {return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}//先按照30秒的过期时间来执行获取锁的方法RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);//如果还持有这个锁,则开启定时任务不断刷新该锁的过期时间ttlRemainingFuture.addListener(new FutureListener<Long>() {@Overridepublic void operationComplete(Future<Long> future) throws Exception {if (!future.isSuccess()) {return;}Long ttlRemaining = future.getNow();// lock acquiredif (ttlRemaining == null) {scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;
}

底层加锁逻辑

你可能会想这么多操作,在一起不是原子性不还是有问题么?

大佬们肯定想得到呀,所以还是LUA,他使用了Hash的数据结构。

主要是判断锁是否存在,存在就设置过期时间,如果锁已经存在了,那对比一下线程,线程是一个那就证明可以重入,锁在了,但是不是当前线程,证明别人还没释放,那就把剩余时间返回,加锁失败。

是不是有点绕,多理解一遍。

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit,     long threadId, RedisStrictCommand<T> command) {//过期时间internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,//如果锁不存在,则通过hset设置它的值,并设置过期时间"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +//如果锁已存在,并且锁的是当前线程,则通过hincrby给数值递增1"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +//如果锁已存在,但并非本线程,则返回过期时间ttl"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}

解锁

锁的释放主要是publish释放锁的信息,然后做校验,一样会判断是否当前线程,成功就释放锁,还有个hincrby递减的操作,锁的值大于0说明是可重入锁,那就刷新过期时间。

如果值小于0了,那删掉Key释放锁。

是不是又和AQS很像了?

AQS就是通过一个volatile修饰status去看锁的状态,也会看数值判断是否是可重入的。

所以我说代码的设计,最后就万剑归一,都是一样的。

public RFuture<Void> unlockAsync(final long threadId) {final RPromise<Void> result = new RedissonPromise<Void>();//解锁方法RFuture<Boolean> future = unlockInnerAsync(threadId);future.addListener(new FutureListener<Boolean>() {@Overridepublic void operationComplete(Future<Boolean> future) throws Exception {if (!future.isSuccess()) {cancelExpirationRenewal(threadId);result.tryFailure(future.cause());return;}//获取返回值Boolean opStatus = future.getNow();//如果返回空,则证明解锁的线程和当前锁不是同一个线程,抛出异常if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);result.tryFailure(cause);return;}//解锁成功,取消刷新过期时间的那个定时任务if (opStatus) {cancelExpirationRenewal(null);}result.trySuccess(null);}});return result;
}protected RFuture<Boolean> unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL,//如果锁已经不存在, 发布锁释放的消息"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end;" +//如果释放锁的线程和已存在锁的线程不是同一个线程,返回null"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +//通过hincrby递减1的方式,释放一次锁//若剩余次数大于0 ,则刷新过期时间"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +//否则证明锁已经释放,删除key并发布锁释放的消息"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; "+"end; " +"return nil;",Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));}

字节面试:如何用Redis实现一个分布式锁?相关推荐

  1. 【Redis笔记】一起学习Redis | 如何利用Redis实现一个分布式锁?

    一起学习Redis | 如何利用Redis实现一个分布式锁? 前提知识 什么是分布式锁? 为什么需要分布式锁? 分布式锁的5要素和三种实现方式 实现分布式锁 思考思考 基础方案 改进方案 保证setn ...

  2. 小王,在 Java 中如何利用 redis 实现一个分布式锁服务呢???

    作者:杨高超 juejin.im/post/5a4984af6fb9a0450b66bc57 在现代的编程语言中,接触过多线程编程的程序员多多少少对锁有一定的了解.简单的说,多线程中的锁就是在多线程环 ...

  3. 基于Redis实现一个分布式锁

    与分布式锁相对应的是「单机锁」,我们在写多线程程序时,避免同时操作一个共享变量产生数据问题,通常会使用一把锁来「互斥」,以保证共享变量的正确性,其使用范围是在「同一个进程」中. 一.为什么需要分布式锁 ...

  4. Redis面试常问3 如何实现分布式锁 记住Redis的原子性

    Redis面试常问3 如何实现分布式锁 上面的伪代码有问题 从 Redis 2.6.12 版本开始, SET 命令的行为可以通过一系列参数来修改: http://redisdoc.com/string ...

  5. Redlock——Redis集群分布式锁

    欢迎关注方志朋的博客,回复"666"获面试宝典 前言 分布式锁是一种非常有用的技术手段.实现高效的分布式锁有三个属性需要考虑: 安全属性:互斥,不管什么时候,只有一个客户端持有锁 ...

  6. 阿里JAVA面试题剖析:一般实现分布式锁都有哪些方式?使用 Redis 如何设计分布式锁?...

    面试原题 一般实现分布式锁都有哪些方式?使用 redis 如何设计分布式锁?使用 zk 来设计分布式锁可以吗?这两种分布式锁的实现方式哪种效率比较高? 面试官心理分析 其实一般问问题,都是这么问的,先 ...

  7. 基于 Redis 实现的分布式锁

    点击上方 好好学java ,选择 星标 公众号 重磅资讯.干货,第一时间送达 今日推荐:我的大学到研究生自学 Java 之路,过程艰辛,不放弃,保持热情,最终发现我是这样拿到大厂 offer 的! 作 ...

  8. Redis:Redisson分布式锁的使用(推荐使用)

    Redis:Redisson分布式锁的使用(生产环境下)(推荐使用) 关键词 基于NIO的Netty框架,生产环境使用分布式锁 redisson加锁:lua脚本加锁(其他客户端自旋) 自动延时机制:启 ...

  9. redistemplate分布式锁实现_基于 Redis SETNX 实现分布式锁

    环境与配置 Redis 任意版本即可 SpringBoot 任意版本即可,但是需要依赖 spring-boot-starter-data-redis <dependency><gro ...

最新文章

  1. 微型计算机步进电机控制,步进电机的微型计算机控制
  2. 如何快速开发一个博客
  3. Landsat 8 OLI_TIRS 卫星数字产品
  4. winFrom简单引用Webservice
  5. 计算机应用基础中专教材pdf,中等职业教育通用教材-计算机应用基础.pdf
  6. CodeForces - 1366E Two Arrays(组合数学+思维)
  7. 美汽车销售商使用RFID汽车,加快销售速度
  8. 03_SpringCloud整合Ribbon实现负载均衡
  9. DatabaseMetaData 获取mysql表和字段注释
  10. 让UpdatePanel支持上传文件:解决当页面显式设置document.domain时提示的500错误
  11. 用了10年海尔家电,青岛一音乐老师为海尔写了1首歌
  12. SylixOS Makefile 源代码解析
  13. hbase权威指南-客户端API高级特性
  14. 透视特洛伊木马程序开发技术(转)
  15. msconfig蓝屏_电脑msconfig改动后蓝屏怎么修复
  16. ips 测试软件,IPS测试方法.doc
  17. 李白:下终南山过斛斯山人宿置酒
  18. 利用α-β搜索的博弈树算法编写一字棋游戏 python
  19. android 电视语音遥控器,基于遥控器的Android电视语音聊天系统及其方法与流程
  20. JS将秒数换算成时分秒 以及转化为年月日 时分秒

热门文章

  1. 基于jquery的tab切换
  2. centos 7 vs centos6 的不同
  3. Selenium2+python自动化34-获取百度输入联想词
  4. runtime简单的使用解决实际问题(交换方法)
  5. MySQL 5.6 for Windows 解压缩版配置安装(转)
  6. 浮点数:一种有漏洞的抽象【译】
  7. 12、(12.4.2)保护模式下数据段和栈段保护
  8. 5v 3.3v电平转换电路_MOS管电平转换电路,硬件工程师居家旅行、看门护院的必备良药...
  9. 如何在input输入框中加一个搜索的小图片_仿淘宝搜索栏
  10. LeetCode---binary-tree-inorder-traversal