原理

一般写法如下:

        RLock lock = redissonClient.getLock("myLock");lock.lock();try{//业务逻辑}finally {lock.unlock();}

其实redis分布式锁就是基于redis的hash数据类型实现的,key为:锁名称,即myLock,field为:uuid+threadId,value为:上锁次数,从此可以看出redis锁是可重入的

一. 初始化锁RedissonLock

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor = commandExecutor;this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();//若锁被其它线程占用,使用redis的发布订阅pub/sub功能来订阅释放锁消息this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {//1.RedissonObject.name赋值super(commandExecutor, name);//2.执行lua脚本的执行器this.commandExecutor = commandExecutor;//3.创建ConnectionManager时产生id:UUID id = UUID.randomUUID();e12badb5-7396-4f05-8290-aaa352e04bc4//被用来当做 和threadId组成 value值,用作判断加锁和释放锁是否是同一个线程的校验this.id = commandExecutor.getConnectionManager().getId();//4.Config的lockWatchdogTimeout 默认lockWatchdogTimeout = 30 * 1000this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();this.entryName = id + ":" + name;
}

二:上锁lock

//RedissonLock.lock()@Override
public void lock(long leaseTime, TimeUnit unit) {try {lock(leaseTime, unit, false);} catch (InterruptedException e) {throw new IllegalStateException();}
}private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {//1.获取当前线程idlong threadId = Thread.currentThread().getId();//2.尝试上锁Long ttl = tryAcquire(-1, leaseTime, unit, threadId);//3.锁获取成功,直接返回 lock acquiredif (ttl == null) {return;}//4.锁被其它线程占用,订阅释放锁通知,并同步阻塞future完成,即获取释放信号量通知RFuture<RedissonLockEntry> future = subscribe(threadId);if (interruptibly) {commandExecutor.syncSubscriptionInterrupted(future);} else {//同步阻塞future完成,即创建监听runnable,如果是第一个获取锁失败的线程则真正创建释放锁监听//int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();//最大阻塞时间为:3s+1.5s*3commandExecutor.syncSubscription(future);}try {//5.收到释放信号量的通知后,进入死循环尝试获取锁while (true) {//5.1再次尝试加锁,如果获取到锁,直接返回,停止死循环ttl = tryAcquire(-1, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {break;}// waiting for message//5.2锁未获取成功,说明有其它线程持有锁,阻塞获取锁if (ttl >= 0) {try {// 在锁剩余时间内,阻塞等待获取信号量Semaphore, Semaphore.release()会在订阅释放锁消息的监听器消息处理方法org.redisson.pubsub.LockPubSub#onMessage调用;当其他线程释放了占用的锁时,会广播释放锁消息,监听器接收释放锁消息后,释放信号量,最终会唤醒阻塞在这里的线程//tryAcquire()让当前线程阻塞获取信号量,,避免了在while无限循环中频繁请求获取锁// 若在最大等待时间内仍未获取到信号量,进入下一个循环尝试获取锁future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {if (interruptibly) {throw e;}future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {// 锁过期时间小于零, 则一直等if (interruptibly) {future.getNow().getLatch().acquire();} else {future.getNow().getLatch().acquireUninterruptibly();}}}} finally {//6.取消订阅unsubscribe(future, threadId);}
//        get(lockAsync(leaseTime, unit));}//尝试获取锁并等待上锁结果
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}//执行lua脚本尝试获取锁
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;//1.异步执行lua脚本,尝试上锁if (leaseTime != -1) {//使用自定义锁时间ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {//使用默认的30sttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}//2.等待上锁结果ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {//上锁不成功return;}// lock acquired上锁成功if (ttlRemaining == null) {if (leaseTime != -1) {//重置锁时间为自定义时间internalLockLeaseTime = unit.toMillis(leaseTime);} else {//2.1使用默认时间上锁即30s,定时任务每隔10s给锁重置一下过期时间直到锁被释放//使用自定义时间就不会不断给锁续延过期时间scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;
}    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,//keys[1]为锁名,即redissonClient.getLock("myLock")中的myLock//ARGV[1]:锁时间,即lock.lock(3, TimeUnit.MINUTES),3*60s,默认internalLockLeaseTime=30s//ARGV[2]:锁的唯一标识,uuid + ":" + threadId加索的整体逻辑使用hset报错上锁的线程,key为索命myLock,field为uuid+threadId,value为field上锁的次数//1.若key不存在,则进行加锁,返回nil,即null"if (redis.call('exists', KEYS[1]) == 0) then " +//1.1对key加锁,并设置加索次数为1"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +//1.2设置key的过期时间"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +//2.若key存在,判断上锁的是否为此线程,若是次数+1,返回nil,即null"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +//2.1 上锁线程数+1"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +//2.2 重置过期时间"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +//3.走到这,说明锁获取不成功,即被其它线程已占用,则返回锁的剩余时间"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

上面值得注意的是:

  1. 只有lock.lcok()没有指定leaseTime时,即使用默认过期时间internalLockLeaseTime:30s时,才会每隔internalLockLeaseTime/3即10s重置一下锁的过期时间,即延续锁的过期时间,后面会讲到scheduleExpirationRenewal();如果lock.lock()上锁时指定leaseTime,不会续锁,那么到底使用那种方式合适那,每种方式都有各自优点和弊端:
        a. 指定leaseTime:
            优点:此种方式就算因为异常没有执行unlock(),也有自身的过期时间,等过期后,别的线程就能得到此锁了
            弊端:如果代码在超时释放锁时还未完成业务,就可能出现并发带来的数据问题,若怕出现此问题,可以将leaseTime设大一点;
        b. 不指定leaseTime:
            优点:有效解决了上面业务未执行完锁就释放了的问题
            弊端:如果因为代码异常导致取消续锁任务未执行到,该锁就永远不能自动释放,造成死锁,除非服务挂掉(续锁任务停止)或手动将key删除后,别的线程才能获取到该锁,所以,unLock()一般写在finally里,尽可能避免死锁

综上: 个人觉得还是指定合适的leaseTime好,能确保系统出现故障未释放锁后,在一定时间内能够主动去释放锁,避免造成死锁

  1. 在获取锁失败后,利用通过redis的channel 发布/订阅功能来实现订阅释放锁事件尝试再次获取锁,再次获取锁失败,利用Semaphore.tryAcquire()阻塞等待信号量,即等待锁释放,如果在等待的过程中一直未等到锁的释放事件通知,当超过最大等待时间则获取锁失败,返回 false;如果等到了锁的释放事件的通知,则开始进入下一个while循环重试获取锁(由此处可看出,我们这讲的是非公平锁,redisson实现了公平锁,读写锁等等),避免了当获取不到锁时一直while死循环无效尝试获取锁造成资源浪费

获取锁的整体流程:

模拟多线程同时请求锁的情况:
(注意redis中锁值得变化)

三:释放锁unLock

//RedissonLock.unlock()@Override
public void unlock() {try {//释放当前线程持有锁get(unlockAsync(Thread.currentThread().getId()));} catch (RedisException e) {if (e.getCause() instanceof IllegalMonitorStateException) {throw (IllegalMonitorStateException) e.getCause();} else {throw e;}}
}    @Override
public RFuture<Void> unlockAsync(long threadId) {RPromise<Void> result = new RedissonPromise<>();//1.利用lua脚本释放锁RFuture<Boolean> future = unlockInnerAsync(threadId);future.onComplete((opStatus, e) -> {//2.取消续锁的定时任务cancelExpirationRenewal(threadId);if (e != null) {result.tryFailure(e);return;}//2.1 返回==null,说明此线程没有持有锁,抛异常if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);result.tryFailure(cause);return;}result.trySuccess(null);});return result;
}//使用lua脚本释放锁
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,//keys[1]为锁名,即redissonClient.getLock("myLock")中的myLock// //keys[2]为订阅通道:redisson_lock__channel:{myLock}//ARGV[1]:发布锁释放事件类型:LockPubSub.UNLOCK_MESSAGE//ARGV[2]:锁过期时间//ARGV[3]:锁的唯一标识,uuid + ":" + threadId//1.若field不存在,即当前线程不持有锁,返回null"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +//2.若field存在,value-1,即当前线程持有锁。再判断锁的重入次数,即value值"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +//2.1若value-1后仍 >0,说明此线程lock了多次,重置锁剩余时间,返回0"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +//2.2若value-1后 不>0,说明此线程不再持有锁,删除key并发布LockPubSub.UNLOCK_MESSAGE事件,返回1"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end; " +//3.返回null"return nil;",Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

释放锁的整理流程:

四:续锁 /取消续锁

  1. 锁续时:使用默认过期时间上锁成功后调用
// RedissonBaseLock.scheduleExpirationRenewal()//过期时间更新,当有新线程占用锁时就开始定时任务,定时更新key的过期时间
protected void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {//1.说明锁已经有定时任务在续时了(因为锁可重入,重入时走此分支),增加锁重入次数,在是否要终止续锁时有用oldEntry.addThreadId(threadId);} else {//需要续锁次数+1entry.addThreadId(threadId);//2.要定时更新锁过期时间renewExpiration();}
}private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}//使用默认过期时间上锁成功后,10后执行一次续锁任务,如果线程仍然占用着锁,则10s继续重置过期时间为internalLockLeaseTime//直到锁(即uuid +  name)没有对应的ExpirationEntry ,即没有线程占用锁定时任务停止//在取消续锁RedissonBaseLock.cancelExpirationRenewal()时,如果ExpirationEntry 没有对应的线程后,就删除对应的ExpirationEntry Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {//1.判断线程是否还持有锁ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}//2.还有线程持有锁,利用lua脚本判断线程是否还持有锁,若是,重置过期时间RFuture<Boolean> future = renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getRawName() + " expiration", e);EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}if (res) {// 3.说明线程还在持有锁,10s后继续重置过期时间,调用自己renewExpiration();}});}// 10s后执行}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);
}//判断threadId是否存在,存在重置key的过期时间
protected RFuture<Boolean> renewExpirationAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,//KEYS[1]:锁名,myLock//ARGV[1]:过期时间//ARGV[2]:uuid+threadId"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +//1.若线程还持有锁,重置过期时间,返回1"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +//2.否则返回0"return 0;",Collections.singletonList(getRawName()),internalLockLeaseTime, getLockName(threadId));
}

值得注意的: 只有使用默认过期时间,即lock()时没有指定leaseTime时,才会续锁

  1. 取消续锁:在释放锁时调用
// RedissonBaseLock.cancelExpirationRenewal()//取消续锁的定时任务
protected void cancelExpirationRenewal(Long threadId) {ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (task == null) {return;}if (threadId != null) {//1.线程上锁次数-1,如果-1后为0,则删除锁持有的此线程//次数用来适应重入锁的,如第三次重入,第一次unlock时,还有其它两次上着锁那,所以仍需续锁task.removeThreadId(threadId);}//2.若锁已没有上锁的线程了则取消在lock时设置的续锁的定时任务,且将锁对应的ExpirationEntry删除//删除后在lock成功后的那个续锁的定时任务就不会执行了,因为取不到锁对应的ExpirationEntry了if (threadId == null || task.hasNoThreads()) {Timeout timeout = task.getTimeout();if (timeout != null) {timeout.cancel();}EXPIRATION_RENEWAL_MAP.remove(getEntryName());}
}

存在的问题: 若lock()后时,因为异常续锁没有被取消,那么问题就来了,锁就会一直被持有,其它线程将永远获取不到锁,所以unlock()一定要放到finally{}中

五:未获取锁时订阅获取锁逻辑

这简单说一下如何利用发布订阅获取锁的流程,有关redis发布订阅详细内容会在专门的文章中讲解

  1. 订阅释放锁消息
PublishSubscribe//entryName:uuid:name//channelName:redisson_lock__channel:{name}public RFuture<E> subscribe(String entryName, String channelName) {//1.获取semaphore,在初始化时允许请求数为1,同一个锁获取的semaphore一样AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));RPromise<E> newPromise = new RedissonPromise<>();//2.添加到监听队列,并tryRun()semaphore.acquire(() -> {if (!newPromise.setUncancellable()) {semaphore.release();return;}E entry = entries.get(entryName);if (entry != null) {//2.1走到这,说明是第2,3...个线程获取锁失败,将监听器保存到。。。。entry.acquire();//执行监听任务semaphore.release();entry.getPromise().onComplete(new TransferListener<E>(newPromise));return;}//2.2走到这,说明是第一个获取锁失败的线程,//2.2.1创建RedissonLockEntry,默认允许信号量请求数为0,然后 +1E value = createEntry(newPromise);value.acquire();//2.2.2将创建的lockEntry添加到entries中,在续锁和取消续锁时用E oldValue = entries.putIfAbsent(entryName, value);if (oldValue != null) {oldValue.acquire();semaphore.release();oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));return;}//***2.2.3真正的创建并注册监听,监听redisson_lock__channel:{name}RedisPubSubListener<Object> listener = createListener(channelName, value);service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);});return newPromise;}//redisson_lock__channel:{name}private RedisPubSubListener<Object> createListener(String channelName, E value) {RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {//1.收到消息后,拉取第一个监听者任务,开始监听下一个释放锁消息,并释放一个请求@Overridepublic void onMessage(CharSequence channel, Object message) {if (!channelName.equals(channel.toString())) {return;}PublishSubscribe.this.onMessage(value, (Long) message);}@Overridepublic boolean onStatus(PubSubType type, CharSequence channel) {if (!channelName.equals(channel.toString())) {return false;}if (type == PubSubType.SUBSCRIBE) {value.getPromise().trySuccess(value);return true;}return false;}};return listener;}
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {public static final Long UNLOCK_MESSAGE = 0L;public static final Long READ_UNLOCK_MESSAGE = 1L;public LockPubSub(PublishSubscribeService service) {super(service);}@Overrideprotected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {return new RedissonLockEntry(newPromise);}@Overrideprotected void onMessage(RedissonLockEntry value, Long message) {if (message.equals(UNLOCK_MESSAGE)) {//1.拉取监听队列中第一个任务并执行run()(又创建监听释放锁监听),//2.释放1个信号量唤醒等待的entry.getLatch().tryAcquire去尝试申请锁Runnable runnableToExecute = value.getListeners().poll();if (runnableToExecute != null) {runnableToExecute.run();}value.getLatch().release();} else if (message.equals(READ_UNLOCK_MESSAGE)) {//2.使用RedissonWriteLock时,释放锁时为事件while (true) {Runnable runnableToExecute = value.getListeners().poll();if (runnableToExecute == null) {break;}runnableToExecute.run();}value.getLatch().release(value.getLatch().getQueueLength());}}}
//AsyncSemaphore //默认为1,同时只能有1个监听,因为业务只能允许同一时间只能有一个线程获取锁,//如果counter=2,说明同时能有两个线程获取锁,未被了锁的设计private final AtomicInteger counter;//决定了要监听几次锁释放,即release几次信号量,也即几个线程正在等待获取锁private final Queue<Runnable> listeners = new ConcurrentLinkedQueue<>();//在初始化PublishSubscribeService创建,permits=1public AsyncSemaphore(int permits) {counter = new AtomicInteger(permits);}public void acquire(Runnable listener) {//1.将listener添加到监听队列,在收到释放锁消息时,就会从listeners中拉取首部runnable并尝试执行listeners.add(listener);//2.尝试创建释放锁监听tryRun();}//根据信号量counter决定是否创建释放锁监听private void tryRun() {//默认为1,同时只能有1个监听//1.说明是第2,3。。。个线程获取锁失败或在收到释放锁时想要拉取下一个监听任务,为null,即不再执行run()创建监听if (counter.get() == 0|| listeners.peek() == null) {return;}if (counter.decrementAndGet() >= 0) {//2.说明是第一个线程获取锁失败,可以获取信号量,直接执行监听者任务,不会阻塞Runnable listener = listeners.poll();if (listener == null) {counter.incrementAndGet();return;}listener.run();} else {//3.按理来说应该走不到此处if (counter.incrementAndGet() > 0) {//+1,调用自己执行监听tryRun();}}}

订阅释放锁整体流程:

  1. 取消订阅释放锁
//PublishSubscribe//entryName:uuid:name//channelName:redisson_lock__channel:{name}
public void unsubscribe(E entry, String entryName, String channelName) {AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));semaphore.acquire(() -> {if (entry.release() == 0) {//1.counter-1==0说明不再需要监听说明是在前面subscription()创建监听时创建的Entry//发布PubSubType.UNSUBSCRIBE事件,完成后并释放信号量// just an assertionboolean removed = entries.remove(entryName) == entry;if (!removed) {throw new IllegalStateException();}service.unsubscribe(PubSubType.UNSUBSCRIBE, new ChannelName(channelName)).onComplete((r, e) -> {semaphore.release();});} else {semaphore.release();}});}

六:redisson分布式锁问题

如果是redis-cluster模式,在高并发时,线程1对master写入了myLock锁,在异步赋值给master对应的slave节点时发生当即(此时还未写入到slave成功),主备切换,slave变为了master,此时线程2请求锁,发现没有线程锁定也获取到了锁,此时就有两个线程1,线程2同时获取到了锁,会造成脏数据

redisson分布式锁实现原理相关推荐

  1. Redis进阶- Redisson分布式锁实现原理及源码解析

    文章目录 Pre 用法 Redisson分布式锁实现原理 Redisson分布式锁源码分析 redisson.getLock(lockKey) 的逻辑 redissonLock.lock()的逻辑 r ...

  2. java redisson_Java使用Redisson分布式锁实现原理

    本篇文章摘自:https://www.jb51.net/article/149353.htm 由于时间有限,暂未验证 仅先做记录.有大家注意下哈(会尽快抽时间进行验证) 1. 基本用法 添加依赖 or ...

  3. 17、Redis、Zk分布式锁实现原理

    我们在编程有很多场景使用本地锁和分布式锁,但是是否考虑这些锁的原理是什么?本篇讨论下实现分布式锁的常见办法及他们实现原理. 一.使用锁的原则 使用本地锁和分布式锁是为了解决并发导致脏数据的场景,使用锁 ...

  4. 老夫带你深度剖析Redisson实现分布式锁的原理

    Redis实现分布式锁的原理 前面讲了Redis在实际业务场景中的应用,那么下面再来了解一下Redisson功能性场景的应用,也就是大家经常使用的分布式锁的实现场景. 引入redisson依赖 < ...

  5. Redisson分布式锁实战-2:解决wait_time之坑

    我们一起来分析一下原因,我们获取锁之后,我们只打印了一个日志,然后从配置文件里面拿到一个hour,然后就结束了,结束之后就来到finally里边,而这个时间并没有执行SQL语句,所以他的时间会非常非常 ...

  6. 年轻人,看看 Redisson 分布式锁—可重入锁吧!太重要了

    作者 | 李祥    责编 | 张文 来源 | 企鹅杏仁技术站(ID:xingren-tech) 引言 作为后端开发,对于所谓的线程安全.高并发等一系列名词肯定都不会陌生,相关的一些概念及技术框架是面 ...

  7. Redis实战——Redisson分布式锁

    目录 1 基于Redis中setnx方法的分布式锁的问题 2 Redisson 2.1 什么是Redisson 2.2 Redisson实现分布式锁快速入门 2.3 Redisson 可重入锁原理 什 ...

  8. redisson分布式锁,实战

    目录 什么时候用分布式锁? 分布式锁入门 超时设置 释放了不是自己加的锁 正确设置锁超时 加解锁代码位置有讲究 实现可重入锁 Redis Hash 可重入锁 主从架构带来的问题 什么是 Redlock ...

  9. Redis:Redisson分布式锁的使用(推荐使用)

    Redis:Redisson分布式锁的使用(生产环境下)(推荐使用) 关键词 基于NIO的Netty框架,生产环境使用分布式锁 redisson加锁:lua脚本加锁(其他客户端自旋) 自动延时机制:启 ...

  10. Redisson分布式锁详解

    概述 setnx分布式锁的问题 重入问题 重入问题是指获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,它的方法都是使用synchroniz ...

最新文章

  1. ExtJs之gridPanel的属性表格,编辑表格,表格分页,分组等技巧
  2. 10个python使用技巧
  3. 44个Java代码性能优化总结
  4. CentOS离线安装httpd服务
  5. 2022虎年背景全新UI头像框制作微信小程序源码
  6. 从单体架构迁移到微服务,8个关键的思考、实践和经验
  7. Gitlab的使用总结
  8. 20155320 2016-2017-2 《Java程序设计》第五周学习总结
  9. @程序员,如何轻松实现数据可视化?
  10. 网络安全——流量分析
  11. 浅谈JavaScript面向对象编程(转自酷勤网)
  12. 百度谷歌淘宝自定义搜索乱码问题的解决
  13. prosper loan data EDA分析(特征字典)
  14. ARCore:从哪里冒出来的ARCore
  15. Python·os.path.abspath和os.path.realpath区别
  16. python基础 判断题
  17. 常识1:机器语言与高级语言
  18. java 删除桌面快捷方式_万能方法用指定浏览器打开桌面上的网页快捷方式
  19. Unity 游戏设计模式 — 策略模式(Strategy)
  20. HTML简单练习案例

热门文章

  1. Kafka集群搭建(四节点)
  2. PAT日志 1031
  3. 央行二代征信系统即将上线 有哪些变化?
  4. vm虚拟机 加密密码 破解术(亲测可用!)
  5. nginx配置-优化静态资源
  6. 电子取证技术--概述
  7. 2018.11.07 NOIP训练 lzy的游戏(01背包)
  8. 基于深度学习的单视图三维重建算法学习路线
  9. HTML和CSS小知识点笔记
  10. 域名WHOIS信息隐私保护