文章目录

  • 1. synchronized解决单应用下并发安全问题
  • 2. 手写redis分布式锁解决分布式环境下并发安全问题
  • 3. redisson分布式锁解决分布式环境下并发安全问题
    • 3.0 Redission分布式锁的常用API
    • 3.1 lock.lock()
    • 3.2 lock.unlock()
    • 3.3 Redission分布式锁的应用
  • 4. redisson的其他的分布式锁

redis由于操作的是内存空间,访问速度比磁盘io(数据库)高很多,所以当项目中有较高的访问量时,热门数据一般会存放在redis中,这样可以有效减轻数据库的压力。但是在享受redis高性能的同时,也会存在各种缓存失效、并发安全、数据不一致等问题!

1. synchronized解决单应用下并发安全问题

说到多线程并发问题,根据以往经验,首先想到的应该是加锁保证并发安全。在单体项目架构中,确实是这样的,以减库存为例,操作如下

①:从redis中获取库存数量
②:如果库存大于0,执行减库存。如果库存小于0,给出提示
③:将减少后剩余的库存写入redis中

在上面三个步骤的执行过程中,如果多线程并发执行,那将会有数据不一致的问题。

比如线程1 和 线程2同时获取到库存数量为50,两个线程各自在自己的工作空间内减库存,并写入redis。这样一来,本来应该剩余48个库存,实际上redis的库存量却为49个,这样就会因数据不一致引发了超卖问题!

解决方案:使用synchronized代码块,或者在方法上加synchronized

代码示例:

     synchronized (this) {//获取库存int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")if (stock > 0) {//减库存int realStock = stock - 1;//更新缓存stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}}

注意

  1. 使用synchronized代码块,或者在方法上加synchronized,都属于本地锁,只针对单个应用起作用,锁的对象是this,因为springboot默认单例,不会产生多个实例,如果在分布式环境下,因为锁的不是单个实例,所以本地锁不起作用!!!

  2. 查到的数据一定要在synchronized代码块内放入redis,“查库存-减库存-更新缓存” 要保证原子操作

2. 手写redis分布式锁解决分布式环境下并发安全问题

分布式环境下因为有多个应用实例,比如使用了8个商品微服务,用来做负载均衡。本地锁synchronized只能锁住当前服务下的对象,不能锁住8个对象,会产生缓存失效问题。

多应用情况下可以考虑使用redis的set NX PX来实现锁的功能!

NX : 如果当前key有值,则无法修改(修改失败返回null,成功返回ok)
PX: 设置当前key的过期时间(毫秒)
EX: 设置当前key的过期时间(秒)

步骤如下:

     boolean ok = set key value nxif(ok){   获取库存...减少库存...更新库存...}finally{删除key}

但还可能存在以下问题:

问题一:如果执行业务代码执行抛出异常,没人删除锁,就会造成死锁!

解决:设置锁的过期时间,set NX EX 设置分布式锁 和 设置过期时间一定要保证原子操作

问题二:如果业务代码超长(45秒),执行时间超过了锁的自动过期时间(30秒),当前主线程执行完业务代码后,删除锁时把别的线程的锁删除了,怎么办?

解决:
①:设置分布式锁(占坑)时,给value加上一个uuid,删除锁时比较value,如果相等才执行删除,保证每个线程只能删除自己的锁!
②:使用定时器给锁自动续命。主线程在执行任务时,开辟出一个子线程执行定时任务,每隔一定时间检测当前主线程的锁是否还存在,如果存在,再续30秒时间。如果不在,则结束子线程!

问题三:执行删除锁前,当前线程要从redis先获取到key的值,与自己的uuid比较,在这个从redis获取到key的值网络交互的过程中,是要消耗一定时间的。比如在获取到自己的key值返回的过程中,刚巧redis中自己的key过期了,这样,执行删除时还是把别人的key给删了!怎么办?

解决:使用redis提供的lua脚本,这个问题和第一条问题类似,都是需要保证原子性!

//redis 官网提供的解锁lua脚本
String lua =  "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";

最终代码示例如下:

 //从数据库中获取首页数据(分布式锁)public Map<String, List<Catelog2Vo>> getCatelogJsonFromDbWithRedisLock() {/**1 多个线程先去占分布式锁 ,去redis中占个坑相当于 set nx (key :lock, value:1111)*(1)设置过期时间,防止业务逻辑出现异常,锁没人删除,出现死锁!*(2)设置uuid,保证每个线程都删除的是自己的锁*/String uuid = UUID.randomUUID().toString();Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);if (lock) {try {//加锁成功 ..执行业务逻辑获取库存...减少库存...更新库存...} finally {/**执行完先比较value 再释放锁* 弃用原因:redis获取到key的值网络交互的过程中,是要消耗一定时间的。* 比如在获取到自己的key值返回的过程中,刚巧redis中自己的key过期了,这样,执行删除时还是把别人的key给删了!String lock1 = redisTemplate.opsForValue().get("lock");if (uuid.equals(lock1)){//比较,相等时才删除自己的锁,防止误删别人的redisTemplate.delete("lock");}*///保证解锁的原子性:使用redis 官网提供的解锁lua脚本String lua = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";//调用redisTemplate提供的调用脚本方法!execute//Integer:返回值类型  成功返回1 失败返回0//如果uuid和redis中的值一样才删除redisTemplate.execute(new DefaultRedisScript<Integer>(lua, Integer.class), Arrays.asList("lock"), uuid);}return dataFromDb;} else {//加锁失败,lock已经有值,被别的线程占用,重试,自旋try {Thread.sleep(100); //睡眠100毫秒,防止线程溢出return getCatelogJsonFromDbWithRedisLock(); //自旋} catch (InterruptedException e) {e.printStackTrace();}}return null;}

3. redisson分布式锁解决分布式环境下并发安全问题

redisson完全实现了juc的功能,不仅有锁,还都是分布式锁!

问题1:redisson框架的实现原理,为什么会满足分布式锁的要求?

原理图如上,虽然redisson可以保证大部分时候分布式锁的安全性,但是有些特殊情况,还是会出现并发安全问题!比如哨兵架构下:redis的主节点master 在向slave节点同步数据时,主节点挂掉了,剩余的slave节点通过哨兵选举出了一个新的master节点。此时旧的master节点中setnx的key还没有同步到新的master节点,新的master节点中由于已经对外提供服务,其他线程也会由setnx创建key,这样就存在多个setnx的key,此时redis的setnx失效!可能会出现并发安全(超卖)等问题!

解决方案:

  • ①:如果必须保证强一致性,可以使用zookeeper实现分布式锁,但zookeeper没有redis的性能高。需要权衡利弊来使用,其实大部分情况下。可以容忍一些极小概率才可能出险的问题,如果出现,可以使用日志记录下来并通过人工客服协助解决!
  • ②:如果还是想用redis解决这个问题,可以使用红锁redlock。redlock规定使用setnx命令时不再只向一个master节点发送命令,而是向所有节点发送,当超过半数以上节点加锁成功时才算加锁成功!但是不建议使用redis的redlock,最主要的原因还是性能问题!以前只需要与一台redis交互,现在变为多台,性能下降!

代码示例

        String lockKey = "product_001";RLock redissonLock = redisson.getLock(lockKey);try {//加锁redissonLock.lock();  // setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS)int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}} finally {//释放锁redissonLock.unlock();}

3.0 Redission分布式锁的常用API

public interface RRLock {//----------------------Lock接口方法-----------------------/*** 加锁 锁的有效期默认30秒*/void lock();/*** tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false .*/boolean tryLock();/*** tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,* 在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。** @param time 等待时间* @param unit 时间单位 小时、分、秒、毫秒等*/boolean tryLock(long time, TimeUnit unit) throws InterruptedException;/*** 解锁*/void unlock();/*** 中断锁 表示该锁可以被中断 假如A和B同时调这个方法,A获取锁,B为获取锁,那么B线程可以通过* Thread.currentThread().interrupt(); 方法真正中断该线程*/void lockInterruptibly();//----------------------RLock接口方法-----------------------/*** 加锁 上面是默认30秒这里可以手动设置锁的有效时间** @param leaseTime 锁有效时间* @param unit      时间单位 小时、分、秒、毫秒等*/void lock(long leaseTime, TimeUnit unit);/*** 这里比上面多一个参数,多添加一个锁的有效时间** @param waitTime  等待时间* @param leaseTime 锁有效时间* @param unit      时间单位 小时、分、秒、毫秒等*      * tryLock一般用于特定满足需求的场合,但不建议作为一般需求的分布式锁,* 一般分布式锁建议用void lock(long leaseTime, TimeUnit unit)。因为从性能上考虑,在高并发情况下后者效率是前者的好几倍*/boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;/*** 检验该锁是否被线程使用,如果被使用返回True*/boolean isLocked();/*** 检查当前线程是否获得此锁(这个和上面的区别就是该方法可以判断是否当前线程获得此锁,而不是此锁是否被线程占有)* 这个比上面那个实用*/boolean isHeldByCurrentThread();/*** 中断锁 和上面中断锁差不多,只是这里如果获得锁成功,添加锁的有效时间* @param leaseTime  锁有效时间* @param unit       时间单位 小时、分、秒、毫秒等*/void lockInterruptibly(long leaseTime, TimeUnit unit);
}
    /* 加锁 上面是默认30秒这里可以手动设置锁的有效时间** @param leaseTime 锁有效时间* @param unit      时间单位 小时、分、秒、毫秒等*/void lock(long leaseTime, TimeUnit unit);/*** 这里比上面多一个参数,多添加一个锁的有效时间** @param waitTime  等待时间* @param leaseTime 锁有效时间* @param unit      时间单位 小时、分、秒、毫秒等* * tryLock一般用于特定满足需求的场合,但不建议作为一般需求的分布式锁,* 一般分布式锁建议用void lock(long leaseTime, TimeUnit unit)。因为从性能上考虑,在高并发情况下后者效率是前者的好几倍*/boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;/*** 检验该锁是否被线程使用,如果被使用返回True*/boolean isLocked();/*** 检查当前线程是否获得此锁(这个和上面的区别就是该方法可以判断是否当前线程获得此锁,而不是此锁是否被线程占有)* 这个比上面那个实用*/boolean isHeldByCurrentThread();/*** 中断锁 和上面中断锁差不多,只是这里如果获得锁成功,添加锁的有效时间* @param leaseTime  锁有效时间* @param unit       时间单位 小时、分、秒、毫秒等*/void lockInterruptibly(long leaseTime, TimeUnit unit);
}

3.1 lock.lock()

lock:redisson在调用lock方法时,会使用lua脚本尝试获取锁。

  • ①:如果获取到锁,则以hash类型来存储分布式锁,默认30秒过期,并绑定当前线程。然后创建一个看门狗监听器,每隔10秒监听当前线程的锁是否过期,如果业务时间超长,10秒后锁未过期,看门狗就会再次为当前线程的锁续期30秒!
  • ②:如果通过比对线程发现锁发生了重入,就使用hincrby命令进行自增。这个步骤和ReentranLock类似!
  • ③:如果没有获取到锁,则返回锁的过期时间,用于看门狗判断!

lock.lock源码

    @Overridepublic void lock() {try {//代表是可中断的锁lockInterruptibly();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
    @Overridepublic void lockInterruptibly() throws InterruptedException {//这里默认传入的锁的过期时间是 -1 ,代表由redission的看门狗来控制lockInterruptibly(-1, null);}
    private Long tryAcquire(long leaseTime, TimeUnit unit) {if (leaseTime != -1) {return get(tryLockInnerAsync(leaseTime, unit, Thread.currentThread().getId()));}return get(tryLockInnerAsync(Thread.currentThread().getId()));}
  • 如果使用lock.lock();来加锁,则默认传入的过期时间为-1 ,走上述代码中下边的方法的逻辑,由redission的看门狗机制来控制锁的过期和续期

        private Future<Long> tryLockInnerAsync(final long threadId) {// 1. 使用lua脚本获取锁 // LOCK_EXPIRATION_INTERVAL_SECONDS = 30秒Future<Long> ttlRemaining = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId);// 2. 开启一个子线程。设置看门狗,每隔10秒获取一次锁ttlRemaining.addListener(new FutureListener<Long>() {@Overridepublic void operationComplete(Future<Long> future) throws Exception {if (!future.isSuccess()) {return;}Long ttlRemaining = future.getNow();if (ttlRemaining == null) {//看门狗逻辑scheduleExpirationRenewal();}}});return ttlRemaining;}
    

    tryLockInnerAsync获取锁方法也是通过lua脚本来保证原子性的,lua脚本如下!

        Future<Long> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}

    scheduleExpirationRenewal方法,看门狗逻辑如下:

        private void scheduleExpirationRenewal() {if (expirationRenewalMap.containsKey(getEntryName())) {return;}//看门狗 也是一个定时线程池Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {Future<Boolean> future = expireAsync(internalLockLeaseTime, TimeUnit.MILLISECONDS);future.addListener(new FutureListener<Boolean>() {@Overridepublic void operationComplete(Future<Boolean> future) throws Exception {expirationRenewalMap.remove(getEntryName());if (!future.isSuccess()) {log.error("Can't update lock " + getName() + " expiration", future.cause());return;}if (future.getNow()) {// reschedule itselfscheduleExpirationRenewal();}}});}// 定时线程池,每隔 internalLockLeaseTime / 3 = 10秒,检查一次锁是否过期!}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {task.cancel();}}
    
  • 如果使用lock.lock(5,TimeUnit.SECONDS);来加锁,自己设置这把锁5秒后过期,则走上述代码中if中的的方法逻辑,不再使用redission的看门狗机制来控制锁的过期和续期,直接使用lua脚本获取锁,没有续期功能!

        Future<Long> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}
    

问题:lock.lock() 和 lock.lock(10, TimeUnit.SECONDS)两种加锁方式有什么区别?

  • 首先如果我们自己设置了过期时间,就不会调用看门狗机制,不会给锁自动续期,当业务逻辑时间大于锁的超时时间时,会抛出异常!
  • 如果我们传递了锁的超时时间,就会发送redis执行脚本,进行占锁,超时时间就是我们自己设置的时间,没有定时任务续期机制!
  • 如果我们未指定锁的超时时间,就默认使用30s【lockWatchdogTimeout看门狗的默认时间】,只要占锁成功,就会启动一个定时任务,重新给锁设置30秒过期时间,定时任务每隔10秒执行一次,为看门狗时间的1/3

3.2 lock.unlock()

unlock:redisson在调用unlock方法时,也会使用lua脚本尝试解锁

  • unlock会使用del命令删除锁,如果是可重入锁,则一层一层删除!

unlock的Lua脚本如下:

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end;" +"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; "+"end; " +"return nil;",Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));}

3.3 Redission分布式锁的应用

以 获取商品详情信息为例

方式一:使用lock.tryLock(0, 5, TimeUnit.SECONDS)

    /*** 获取商品详情信息* @param id 产品ID*/public PmsProductParam getProductInfoNew(Long id) {//从 redis 中获取商品信息PmsProductParam productInfo = redisOpsUtil.get(RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE + id, PmsProductParam.class);if (null != productInfo) {//如果redis中有值直接返回return productInfo;}//如果redis中没值,就要查数据库,加分布式锁// 1. 获取分布式锁RLock lock = redission.getLock(lockPath + id);try {try {/**  加锁*  @param waitTime 等待时间:0s*  @param leaseTime 锁有效时间:5s*  @param unit 时间单位 小时、分、秒、毫秒等*/if (lock.tryLock(0, 5, TimeUnit.SECONDS)) {// 2. 查数据库productInfo = portalProductDao.getProductInfo(id);System.out.println("走数据库" + id);if (null == productInfo) {return null;}FlashPromotionParam promotion = flashPromotionProductDao.getFlashPromotion(id);if (!ObjectUtils.isEmpty(promotion)) {productInfo.setFlashPromotionCount(promotion.getRelation().get(0).getFlashPromotionCount());productInfo.setFlashPromotionLimit(promotion.getRelation().get(0).getFlashPromotionLimit());productInfo.setFlashPromotionPrice(promotion.getRelation().get(0).getFlashPromotionPrice());productInfo.setFlashPromotionRelationId(promotion.getRelation().get(0).getId());productInfo.setFlashPromotionEndDate(promotion.getEndDate());productInfo.setFlashPromotionStartDate(promotion.getStartDate());productInfo.setFlashPromotionStatus(promotion.getStatus());}// 3. 把查数据库的结果写入redisredisOpsUtil.set(RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE + id, productInfo, 360, TimeUnit.SECONDS);} else {Thread.sleep(50); //休眠getProductInfo(id); //没获取到锁的线程开始自旋}} catch (InterruptedException e) {e.printStackTrace();}} finally {// 最后 释放锁if (lock.isLocked()){if (lock.isHeldByCurrentThread()){lock.unlock();}}}return productInfo;}

方式二:使用lock.lock();双重检测redis中的数据

    /*** 获取商品详情信息* @param id 产品ID*/public PmsProductParam getProductInfoNew(Long id) {//从 redis 中获取商品信息PmsProductParam productInfo = redisOpsUtil.get(RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE + id, PmsProductParam.class);if (null != productInfo) {//如果redis中有值直接返回return productInfo;}//如果redis中没值,就要查数据库,加分布式锁// 1. 获取分布式锁RLock lock = redission.getLock(lockPath + id);try {try {// 2.加锁lock.lock();// 双重检测,再次查询redis 如果有数据直接返回productInfo = redisOpsUtil.get(RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE + id, PmsProductParam.class);if (null != productInfo) {return productInfo;}// 3.查数据库// 4.存redis//不需要自旋,没获取到就等待finally{// 5.释放锁lock.unlock();}

4. redisson的其他的分布式锁

4.1 读写锁
读写锁的作用是什么?
答:保证一定能读到最新的数据,写锁是一个排它锁(互斥锁),读锁是一个共享锁。当正常读的时候不用等待,如果写数据时,所有读锁必须等待写锁释放,才能读到数据!

读数据过程中,写数据用不用等待?
答:用,只要有写锁参与,就必须等待,高并发读时,无需等待,相当于没有加锁

  /**读写锁的作用是什么?答:保证一定能读到最新的数据,写锁是一个排它锁(互斥锁),读锁是一个共享锁。当正常读的时候不用等待,如果写数据时,所有读锁必须等待写锁释放,才能读到数据!*/@GetMapping("write")@ResponseBodypublic String writeLock(){//获取读写锁rw_lock  互斥锁RReadWriteLock aaa = redissonClient.getReadWriteLock("rw_lock");//获取写锁RLock lock = aaa.writeLock();//加写锁lock.lock();String s= UUID.randomUUID().toString();try {//业务代码redisTemplate.opsForValue().set("aaa",s);Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}finally {//业务代码执行完,释放写锁lock.unlock();}return s;}@GetMapping("read")@ResponseBodypublic String readLock(){//获取读写锁rw_lock 共享锁RReadWriteLock aaa = redissonClient.getReadWriteLock("rw_lock");//获取读锁RLock lock = aaa.readLock();//加读锁lock.lock();String s = "";try {//业务代码s = (String) redisTemplate.opsForValue().get("aaa");} catch (Exception e) {e.printStackTrace();}finally {//业务代码执行完,释放读锁lock.unlock();}return s;}

6.2 闭锁 // 场景:班级里5个人全部走完,才能锁门

    //分布式闭锁,//  场景:班级里5个人全部走完,才能锁门@GetMapping("/lockdoor")@ResponseBodypublic String lockDoor() throws Exception{//获取闭锁RCountDownLatch door = redissonClient.getCountDownLatch("door");//设置闭锁的数量door.trySetCount(5L);//在count数量执行完之前,等待!door.await();//count执行完,不再等待,执行业务代码return "人都走完了!放假!";}@GetMapping("/gogogo")@ResponseBodypublic String go() throws Exception{//获取闭锁RCountDownLatch door = redissonClient.getCountDownLatch("door");//每次请求,闭锁设置的数量 -1door.countDown();return "走了一个人";}

6.3 信号量
解释 :类似停车时等车位,只有当有车开走,空出车位时才能停
分布式场景应用:做限流,指定10000个总请求量,到达10000个请求时,阻塞(或者返回false),等其他请求释放再执行

问:acquire() 和 tryAcquire()的区别?
答:acquire()会一直等待车位!
tryAcquire()如果没有车位会返回false!

代码

   //信号量 占侧位@GetMapping("/park")@ResponseBodypublic String park() throws Exception {//获取信号量 比如redis中park原始有3个车位RSemaphore park = redissonClient.getSemaphore("park");//占一个车位,如果没有车位,就阻塞等待park.acquire();boolean b = park.tryAcquire();if (b){//执行业务}else {//给出提示return "error";}return "ok=》"+b;}//信号量 车开走了@GetMapping("/go1")@ResponseBodypublic String go1() throws Exception {//获取信号量 比如redis中park原始有3个车位RSemaphore park = redissonClient.getSemaphore("park");//释放一个车位park.release();return "ok=》";}

1、可重入锁,指一个类中,A方法和B方法同时上一把锁,当A获得锁后,B方法也可以执行。
2.公平锁:先申请锁的线程先得到锁,其余在队列中等待锁释放。

更多详情请关注 redis官网 redisson的其他分布式锁

redis专题:使用redis实现分布式锁相关推荐

  1. redis实现轮询算法_【07期】Redis中是如何实现分布式锁的?

    点击上方"Java面试题精选",关注公众号 面试刷图,查缺补漏 分布式锁常见的三种实现方式: 数据库乐观锁: 基于Redis的分布式锁: 基于ZooKeeper的分布式锁. 本地面 ...

  2. 【面试题】Redis中是如何实现分布式锁的

    分布式锁常见的三种实现方式: 数据库乐观锁: 基于Redis的分布式锁: 基于ZooKeeper的分布式锁. Redis的分布式锁 Redis要实现分布式锁,以下条件应该得到满足 互斥性:在任意时刻, ...

  3. Redis系列教程(八):分布式锁的由来、及Redis分布式锁的实现详解

    在很多场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务.分布式锁等.那具体什么是分布式锁,分布式锁应用在哪些业务场景.如何来实现分布式锁呢?今天来探讨分布式锁这个话题. ...

  4. 使用 Redis的SETNX命令实现分布式锁

    SETNX命令简介 SETNX key value 将key的值设为value,并且仅当key不存在. 若给定的key已经存在,则SETNX不做任何操作. SETNX 是SET if Not eXis ...

  5. 基于redis集群实现的分布式锁,可用于秒杀,定时器。

    在分布式系统中,经常会出现需要竞争同一资源的情况,使用redis可以实现分布式锁. 前提:redis集群已经整合项目,并且可以直接注入JedisCluster使用: @Autowiredprivate ...

  6. Redis 基础 - 优惠券秒杀《分布式锁(初级)》

    参考 Redis基础 - 基本类型及常用命令 Redis基础 - Java客户端 Redis 基础 - 短信验证码登录 Redis 基础 - 用Redis查询商户信息 Redis 基础 - 优惠券秒杀 ...

  7. 基于Redis(setnx)实现分布式锁

    什么是分布式锁? 分布式锁是控制分布式系统或不同系统之间共同访问共享资源的一种锁实现,如果不同的系统或同一个系统的不同主机之间共享了某个资源时,往往需要互斥来防止彼此干扰来保证一致性. 分布式锁需要具 ...

  8. 场景应用:Redis使用setnx命令实现分布式锁

    文章目录 何时需要分布式锁? 如何实现分布式锁? 探究单节点锁:setnx命令 1. 加锁: 2. 解锁: 带来的问题 探究分布式锁:RedLock算法 补充:setnx命令学习 何时需要分布式锁? ...

  9. java 通过redis实现倒计时_突破Java面试(42) - Redis amp; ZooKeeper两种分布式锁实现的优劣...

    0 Github 1 面试题 一般实现分布式锁都有哪些方式?使用redis如何设计分布式锁?使用zk来设计分布式锁可以吗?这两种分布式锁的实现方式哪种效率比较高? 2 考点分析 一般先问问你zk,然后 ...

  10. java redis set 过期时间_redis分布式锁自动延长过期时间

    分布式系统概念与设计(原书第5版) 93.8元 包邮 (需用券) 去购买 > 背景项目组已经有个分布式锁注解(参考前文<记一次分布式锁注解化>),但是在设置锁过期时间时,需要去预估业 ...

最新文章

  1. 福利 | 从生物学到神经元:人工神经网络 ( ANN ) 简介
  2. 使用 Android Studio 跑新浪微博SDK Demo遇到的问题及解决
  3. python批量下载文件-Python实现批量下载文件
  4. 00069_DateFormate
  5. Python基础教程:一个单列split转换为多行的练习题
  6. gulp+browserSync自动刷新页面
  7. Python机器学习算法 — 逻辑回归(Logistic Regression)
  8. activiti 批量 mysql_Activiti6系列(3)- 快速体验
  9. 第67课 选择排序 改进例67.1 《小学生C++编程入门》
  10. 博图能打开s7200吗_域名掉备案了,还能打开吗?域名掉备案了怎么办?
  11. 9 QM配置-检验计划配置-维护检验类型
  12. 2016030207 - sql50题练习(脚本)
  13. 洪水填充算法_区域填充算法和多边形填充的扫描线算法
  14. git命令行操作指南(git指令及使用场景详解及git stash、git branch、git分支关联等)
  15. matlab中solver函数_matlab solve函数的用法
  16. win7计算机怎么初始化,win7怎么初始化电脑的方式
  17. 求素数(质数)算法的N种境界 - 试除法和初级筛法
  18. 结合mahout的数据挖掘算法介绍
  19. BatchUpdateException: Incorrect string value: '\xE9\x87\x8D\xE5\xBA\x86...'
  20. http协议和https协议对应的端口号

热门文章

  1. python映射的主要特点_Python入门 4——字典及其映射
  2. PyQt5笔记(07) -- 变换控件颜色
  3. 从AIDL一窥Android Binder机制
  4. Java 异步回调机制实例解析
  5. DotText源码学习——从配置文件Web.config入手(一)
  6. shell 相关知识(1)
  7. 从远程服务器获取数据
  8. 经典SQL回顾之晋级篇
  9. 如何在eclipse中使用XYLayout布局?在此介绍如何把XYLayout导入到eclipse .
  10. jsp session