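Introduction

Redisson is a Java in-memory data grid (In-Memory Data Grid) built on top of Redis.

Built on the NIO-based Netty framework, Redisson takes full advantage of what the Redis key-value store offers. Starting from the familiar interfaces of the Java utility packages, it provides a set of commonly used utility classes with distributed semantics. Toolkits originally meant for coordinating multi-threaded programs on a single machine thereby gain the ability to coordinate multi-threaded systems spread across many machines, which greatly reduces the difficulty of designing and building large distributed systems. Combined with its various distributed services, Redisson further simplifies cooperation between programs in a distributed environment.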

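Features

  1. Supports cloud-hosted service mode (both AWS ElastiCache Redis and Azure Redis Cache):

    automatic discovery of master node changes

  2. Supports Redis cluster mode (both AWS ElastiCache Redis Cluster and Azure Redis Cache):

    automatic discovery of master and slave nodes

    automatic update of status and topology

    automatic discovery of slot changes

  3. Supports Redis sentinel mode:

    automatic discovery of master, slave and sentinel nodes

    automatic update of status and topology

  4. Supports Redis master/slave mode

  5. Supports Redis single-node mode

  6. All multi-node modes support read/write splitting: read from slaves and write to master; read and write on master; read from both master and slaves and write to master

  7. All objects and interfaces support asynchronous operations

  8. Self-managed, elastic asynchronous connection pool

  9. All operations are thread-safe

  10. Supports Lua scripting

  11. Provides distributed objects
    Object Bucket, Binary Stream, Geospatial Bucket, BitSet, AtomicLong, AtomicDouble, Topic (publish/subscribe), Bloom Filter and HyperLogLog (cardinality estimation)

  12. Provides distributed collections
    Map, Multimap, Set, List, SortedSet, ScoredSortedSet, LexSortedSet, Queue, Deque, Blocking Queue, Bounded Blocking Queue, Blocking Deque, Blocking Fair Queue, Delayed Queue, Priority Queue and Priority Deque

  13. Provides distributed locks and synchronizers
    Reentrant Lock, Fair Lock, MultiLock, RedLock, ReadWriteLock, Semaphore, PermitExpirableSemaphore and CountDownLatch

  14. Provides distributed services
    Remote Service (RPC), Live Object service, Executor Service, Scheduler Service and MapReduce

  15. Supports the Spring Framework

  16. Provides Spring Cache integration

  17. Provides Hibernate Cache integration

  18. Provides a JCache implementation

  19. Provides a Tomcat Session Manager

  20. Provides Spring Session integration

  21. Supports executing operations in reactive-stream style

  22. Supports Redis pipelining (batch execution)

  23. Supports Android

  24. Supports automatic reconnection after a dropped connection

  25. Supports automatic retry when sending a command fails

  26. Supports OSGi

  27. Supports several automatic serialization and deserialization codecs (Jackson JSON, Avro, Smile, CBOR, MsgPack, Kryo, FST, LZ4, Snappy and JDK serialization)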

Adding the dependency

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.12.1</version>
</dependency>

https://mvnrepository.com/artifact/org.redisson/redisson

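Source code walkthrough

Based on version 3.12.1.

RedissonClient

Before using Redis you must create a connection, that is, a RedissonClient.

RedissonClient is an interface that exposes a set of commands for working with Redis.

The Redisson class is the only implementation of the RedissonClient interface. It provides several static factory methods for creating a client.

public static RedissonClient create();                 // use the default configuration
public static RedissonClient create(Config config);    // use the given configuration
public static RedissonRxClient createRx();             // RxJava2
public static RedissonRxClient createRx(Config config);
public static RedissonReactiveClient createReactive(); // Reactive interface, Project Reactor
public static RedissonReactiveClient createReactive(Config config);

Redisson supports three ways of accessing Redis: RedissonClient, RedissonRxClient (RxJava2) and RedissonReactiveClient (Project Reactor). In most cases RedissonClient, that is, the Redisson class, is used with synchronous operations.

A configuration must be supplied when creating a client; this is done through the Config class.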

public class Config {

    // Server modes
    private SentinelServersConfig sentinelServersConfig;
    private MasterSlaveServersConfig masterSlaveServersConfig;
    private SingleServerConfig singleServerConfig;
    private ClusterServersConfig clusterServersConfig;
    private ReplicatedServersConfig replicatedServersConfig;

    // Connection management
    private ConnectionManager connectionManager;

    /**
     * Threads amount shared between all redis node clients
     */
    private int threads = 16;

    private int nettyThreads = 32;

    /**
     * Redis key/value codec. FST codec is used by default
     */
    private Codec codec;

    private ExecutorService executor;

    /**
     * Config option for enabling Redisson Reference feature.
     * Default value is TRUE
     */
    private boolean referenceEnabled = true;

    private TransportMode transportMode = TransportMode.NIO;
    private EventLoopGroup eventLoopGroup;

    // Lock watchdog timeout, i.e. the default lock lease: 30 seconds
    private long lockWatchdogTimeout = 30 * 1000;

    private boolean keepPubSubOrder = true;
    private boolean decodeInExecutor = false;
    private boolean useScriptCache = false;
    private int minCleanUpDelay = 5;
    private int maxCleanUpDelay = 30*60;
    private int cleanUpKeysAmount = 100;

    /**
     * AddressResolverGroupFactory switch between default and round robin
     */
    private AddressResolverGroupFactory addressResolverGroupFactory = new DnsAddressResolverGroupFactory();

    // ... (remaining members omitted)
}

Usage:

// 1. Build a Config object
Config config = ...
// 2. Create a Redisson instance
RedissonClient redisson = Redisson.create(config);
// 3. Obtain the objects you need
RMap map = redisson.getMap("myMap");
RLock lock = redisson.getLock("myLock");
RExecutorService executor = redisson.getExecutorService("myExecutorService");

Locks

To understand Redisson's locks, it helps to first understand the locks in Java's JUC (java.util.concurrent) package.

The Java Lock interface

// package java.util.concurrent.locks;
public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}

The main implementation classes are described in the sections that follow.

RLock (reentrant lock)

RLock is Redisson's counterpart of Java's reentrant lock. It extends Lock.

public interface RLock extends Lock, RLockAsync {

    // Name of the lock object
    String getName();

    void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException;

    boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;

    void lock(long leaseTime, TimeUnit unit);

    /** Forcibly releases the lock. Returns true if the lock was released. */
    boolean forceUnlock();

    /** Whether the lock is held by any thread. */
    boolean isLocked();

    /** Whether the lock is held by the given thread. */
    boolean isHeldByThread(long threadId);

    /** Whether the lock is held by the current thread. */
    boolean isHeldByCurrentThread();

    /** Number of holds by the current thread (the lock is reentrant). */
    int getHoldCount();

    long remainTimeToLive();
}

The asynchronous lock operations all return an RFuture.

public interface RLockAsync {
    RFuture<Boolean> forceUnlockAsync();
    RFuture<Void> unlockAsync();
    RFuture<Void> unlockAsync(long threadId);
    RFuture<Boolean> tryLockAsync();
    RFuture<Void> lockAsync();
    RFuture<Void> lockAsync(long threadId);
    RFuture<Void> lockAsync(long leaseTime, TimeUnit unit);
    RFuture<Void> lockAsync(long leaseTime, TimeUnit unit, long threadId);
    RFuture<Boolean> tryLockAsync(long threadId);
    RFuture<Boolean> tryLockAsync(long waitTime, TimeUnit unit);
    RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit);
    RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId);
    RFuture<Integer> getHoldCountAsync();
    RFuture<Boolean> isLockedAsync();
    RFuture<Long> remainTimeToLiveAsync();
}

RLock source

Obtaining a lock: getLock

@Override
public RLock getLock(String name) {
    return new RedissonLock(connectionManager.getCommandExecutor(), name);
}

This returns a RedissonLock object.

RedissonLock extends RedissonExpirable and implements RLock.

public class RedissonLock extends RedissonExpirable implements RLock {

    // Maps each entryName to its expiration-renewal entry
    private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();

    // Lock lease time; defaults to the 30-second watchdog timeout
    protected long internalLockLeaseTime;

    final String id;
    final String entryName;

    protected final LockPubSub pubSub;

    // Command executor (asynchronous)
    final CommandAsyncExecutor commandExecutor;

    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
        this.id = commandExecutor.getConnectionManager().getId();
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        this.entryName = id + ":" + name;
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }
}
lock()

@Override
public void lock() {
    try {
        lock(-1, null, false);
    } catch (InterruptedException e) {
        throw new IllegalStateException();
    }
}

@Override
public void lock(long leaseTime, TimeUnit unit) {
    try {
        lock(leaseTime, unit, false);
    } catch (InterruptedException e) {
        throw new IllegalStateException();
    }
}

/**
 * leaseTime: lock expiry time; -1 means no explicit lease, so the watchdog keeps renewing it.
 * interruptibly: whether waiting may be interrupted.
 */
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    // First attempt to acquire the lock.
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired: null means success
    if (ttl == null) {
        return;
    }

    // Acquisition failed: subscribe to the lock channel and wait for the lock to be released.
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    if (interruptibly) {
        commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        commandExecutor.syncSubscription(future);
    }

    try {
        // Loop until the lock is acquired.
        while (true) {
            // Try again.
            ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for message: the current thread blocks on a Semaphore
            if (ttl >= 0) {
                try {
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                // ttl < 0: no usable TTL, so wait without a timeout until an unlock message arrives
                if (interruptibly) {
                    future.getNow().getLatch().acquire();
                } else {
                    future.getNow().getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        unsubscribe(future, threadId);
    }
//        get(lockAsync(leaseTime, unit));
}

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    // An explicit lease time was given.
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // No lease time: lock with the watchdog timeout and renew it periodically.
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // lock acquired
        if (ttlRemaining == null) {
            // Start the watchdog, which resets the expiry every 1/3 of the lease time.
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

/* Acquires the lock. Returns nil on success; otherwise the remaining TTL of the lock held by another thread. */
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    // Redis runs the Lua script; KEYS indexes start at 1.
    // KEYS[1] is the key name, ARGV[1] the lease time in ms, ARGV[2] the hash field for this holder.
    // getLockName returns {id} + ':' + {threadId}
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            // The key does not exist: create the hash.
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                // Add the field for this holder with a hold count of 1.
                "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            // The key exists: if the current thread already holds it (field exists), increment the hold count.
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            // Otherwise return the remaining time to live in milliseconds.
            "return redis.call('pttl', KEYS[1]);",
            Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

protected RFuture<RedissonLockEntry> subscribe(long threadId) {
    return pubSub.subscribe(getEntryName(), getChannelName());
}
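The Lua script in tryLockInnerAsync can be hard to follow inside string concatenation. The following is a minimal in-memory sketch of the same decision logic, not Redisson code: the class ReentrantLockScriptSketch, its fields and the explicit nowMillis clock parameter are all hypothetical, and a plain HashMap stands in for the Redis hash.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for the lock hash; it only mirrors the script's branches. */
class ReentrantLockScriptSketch {
    // field (for example "id:threadId") -> hold count, like the Redis hash at KEYS[1]
    private final Map<String, Integer> holds = new HashMap<>();
    // absolute expiry of the whole key in ms (assumption: the caller supplies the clock)
    private long expiresAt;

    /** Returns null when the lock is acquired, otherwise the remaining TTL in ms. */
    Long tryLock(String lockName, long leaseMillis, long nowMillis) {
        if (!holds.isEmpty() && nowMillis >= expiresAt) {
            holds.clear(); // Redis would have expired the key by now
        }
        // Branch 1: key absent. Branch 2: re-entry by the current holder.
        if (holds.isEmpty() || holds.containsKey(lockName)) {
            holds.merge(lockName, 1, Integer::sum); // hset ... 1 / hincrby ... 1
            expiresAt = nowMillis + leaseMillis;    // pexpire
            return null;
        }
        // Branch 3: held by someone else, report the remaining TTL (pttl).
        return expiresAt - nowMillis;
    }

    int holdCount(String lockName) {
        return holds.getOrDefault(lockName, 0);
    }
}
```

A null result plays the role of the script's nil; any other value is what the waiting loop in lock() uses as its semaphore timeout.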

watch dog

The watchdog renews the key's expiry so that the lock does not expire while it is still held. It is built on Netty's timing wheel (hashed wheel timer).

private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        renewExpiration();
    }
}

public static class ExpirationEntry {
    private final Map<Long, Integer> threadIds = new LinkedHashMap<>();
    private volatile Timeout timeout;
}

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }

    // Schedule a one-shot task that fires after a third of the lease time.
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }

            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }

                if (res) {
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    ee.setTimeout(task);
}

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // Renew only if this thread still holds the lock.
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return 0;",
            Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
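To see why renewing every internalLockLeaseTime / 3 keeps the key alive, the timing can be replayed with a simulated clock. This is only an illustrative sketch: WatchdogSketch and its methods are hypothetical names, no timer or Redis is involved, and renewals are assumed to succeed instantly.

```java
/** Hypothetical simulation of the watchdog timing; it uses a fake clock instead of a real timer. */
class WatchdogSketch {
    /** Delay between renewals: one third of the lease, as in renewExpiration(). */
    static long renewalDelayMillis(long leaseMillis) {
        return leaseMillis / 3;
    }

    /**
     * Replays a lock that is held for holdMillis and reports whether the key
     * would still be alive for the whole time it is held.
     */
    static boolean staysAlive(long leaseMillis, long holdMillis, boolean watchdogEnabled) {
        long step = renewalDelayMillis(leaseMillis);
        if (step <= 0) {
            throw new IllegalArgumentException("lease too short for this sketch");
        }
        long expiresAt = leaseMillis; // set by the initial pexpire at time 0
        for (long now = step; now < holdMillis; now += step) {
            if (now >= expiresAt) {
                return false; // the key expired before this tick
            }
            if (watchdogEnabled) {
                expiresAt = now + leaseMillis; // renewal: pexpire again with the full lease
            }
        }
        return holdMillis <= expiresAt;
    }
}
```

With the default 30-second lease the renewal fires every 10 seconds, so in this model the key always has at least 20 seconds left when a renewal is due.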
RedissonExpirable

RedissonExpirable is an abstract class that implements the expiry operations.

RedissonObject: a Redis object.

public abstract class RedissonObject implements RObject {
    protected final CommandAsyncExecutor commandExecutor;
    // Object name
    protected String name;
    // Codec
    protected final Codec codec;
}
unlock()

@Override
public void unlock() {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
}

@Override
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<Void>();
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.onComplete((opStatus, e) -> {
        // Stop the watchdog for this thread.
        cancelExpirationRenewal(threadId);

        if (e != null) {
            result.tryFailure(e);
            return;
        }

        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }

        // Mark the operation as successful.
        result.trySuccess(null);
    });

    return result;
}

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    /*
     * KEYS[1]: key name
     * KEYS[2]: channel name
     * ARGV[1]: unlock message
     * ARGV[2]: lease time in ms
     * ARGV[3]: field name
     */
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            // Decrement the hold count.
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            // Still held: refresh the expiry.
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            // Count reached 0: delete the key and publish the unlock message.
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

@Override
public RFuture<Boolean> deleteAsync() {
    return forceUnlockAsync();
}

@Override
public RFuture<Boolean> forceUnlockAsync() {
    cancelExpirationRenewal(null);
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('del', KEYS[1]) == 1) then "
            + "redis.call('publish', KEYS[2], ARGV[1]); "
            + "return 1 "
            + "else "
            + "return 0 "
            + "end",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE);
}

protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
    pubSub.unsubscribe(future.getNow(), getEntryName(), getChannelName());
}
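The three possible results of unlockInnerAsync (nil, 0 and 1) drive the behaviour of unlockAsync, so it may help to see them in isolation. The sketch below models only the hold counter in memory; UnlockScriptSketch, hold and the published flag are hypothetical names, and expiry handling is left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of the unlock script's three outcomes. */
class UnlockScriptSketch {
    private final Map<String, Integer> holds = new HashMap<>();
    boolean published; // set when the script would publish the unlock message

    /** Test helper: pretend lockName currently holds the lock count times. */
    void hold(String lockName, int count) {
        holds.put(lockName, count);
    }

    /** null: caller does not hold the lock; false: still held after the decrement; true: released. */
    Boolean unlock(String lockName) {
        Integer count = holds.get(lockName);
        if (count == null) {
            return null;                    // hexists == 0 -> return nil
        }
        if (count > 1) {
            holds.put(lockName, count - 1); // hincrby -1, counter > 0 -> return 0
            return false;
        }
        holds.remove(lockName);             // del KEYS[1]
        published = true;                   // publish KEYS[2] ARGV[1]
        return true;
    }
}
```

The null case corresponds to the IllegalMonitorStateException raised in unlockAsync when a thread tries to release a lock it does not hold.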
State queries

@Override
public boolean isHeldByThread(long threadId) {
    // Checks whether the hash field exists; the two arguments are getName() and getLockName(threadId).
    RFuture<Boolean> future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(threadId));
    return get(future);
}

@Override
public int getHoldCount() {
    return get(getHoldCountAsync());
}

public RFuture<Integer> getHoldCountAsync() {
    return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, HGET, getName(), getLockName(Thread.currentThread().getId()));
}

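Fair lock (RedissonFairLock)

RedissonFairLock is a subclass of RedissonLock that implements a fair lock.

The idea behind the fair lock is to implement the waiting queue with a Redis list, alongside a sorted set that records each waiter's timeout.

public class RedissonFairLock extends RedissonLock implements RLock {
    private final long threadWaitTime;
    private final CommandAsyncExecutor commandExecutor;
    // Name of the waiting queue
    private final String threadsQueueName;
    private final String timeoutSetName;
}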

tryLockInnerAsync()

@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    long currentTime = System.currentTimeMillis();
    if (command == RedisCommands.EVAL_NULL_BOOLEAN) {
        // KEYS[1]: lock name
        // KEYS[2]: threadsQueue, the queue of waiting threads (a list whose elements are thread ids)
        // KEYS[3]: timeoutSet, a sorted set of per-thread timeouts
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                // remove stale threads: drop every waiter whose timeout is not later than the current time
                "while true do " +
                    // First element of the waiting queue.
                    "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
                    // Nobody is waiting: leave the loop.
                    "if firstThreadId2 == false then " +
                        "break;" +
                    "end;" +
                    // Score of the first element: its timeout.
                    "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
                    // Timed out: remove it from timeoutSet and pop it from threadsQueue.
                    "if timeout <= tonumber(ARGV[3]) then " +
                        // remove the item from the queue and timeout set
                        // NOTE we do not alter any other timeout
                        "redis.call('zrem', KEYS[3], firstThreadId2);" +
                        "redis.call('lpop', KEYS[2]);" +
                    "else " +
                        "break;" +
                    "end;" +
                "end;" +

                // The lock can be taken when the lock key does not exist and the queue
                // either does not exist or has the current thread at its head.
                "if (redis.call('exists', KEYS[1]) == 0) " +                        // lock key does not exist
                    "and ((redis.call('exists', KEYS[2]) == 0) " +                  // waiting queue does not exist
                        "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " + // current thread is first in the queue
                    "redis.call('lpop', KEYS[2]);" +                                // remove it from the waiting queue
                    "redis.call('zrem', KEYS[3], ARGV[2]);" +

                    // decrease timeouts for all waiting in the queue
                    "local keys = redis.call('zrange', KEYS[3], 0, -1);" +
                    "for i = 1, #keys, 1 do " +
                        "redis.call('zincrby', KEYS[3], -tonumber(ARGV[4]), keys[i]);" +
                    "end;" +

                    "redis.call('hset', KEYS[1], ARGV[2], 1);" +
                    "redis.call('pexpire', KEYS[1], ARGV[1]);" +
                    "return nil;" +
                "end;" +
                // Re-entry: the current thread already holds the lock.
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
                    "redis.call('pexpire', KEYS[1], ARGV[1]);" +
                    "return nil;" +
                "end;" +
                "return 1;",
                Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
                internalLockLeaseTime, getLockName(threadId), currentTime, threadWaitTime);
    }

    if (command == RedisCommands.EVAL_LONG) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                // remove stale threads: drop every waiter whose timeout is not later than the current time
                "while true do " +
                    "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
                    "if firstThreadId2 == false then " +
                        "break;" +
                    "end;" +

                    "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
                    "if timeout <= tonumber(ARGV[4]) then " +
                        // remove the item from the queue and timeout set
                        // NOTE we do not alter any other timeout
                        "redis.call('zrem', KEYS[3], firstThreadId2);" +
                        "redis.call('lpop', KEYS[2]);" +
                    "else " +
                        "break;" +
                    "end;" +
                "end;" +

                // check if the lock can be acquired now
                "if (redis.call('exists', KEYS[1]) == 0) " +
                    "and ((redis.call('exists', KEYS[2]) == 0) " +
                        "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +

                    // remove this thread from the queue and timeout set
                    "redis.call('lpop', KEYS[2]);" +
                    "redis.call('zrem', KEYS[3], ARGV[2]);" +

                    // decrease timeouts for all waiting in the queue
                    "local keys = redis.call('zrange', KEYS[3], 0, -1);" +
                    "for i = 1, #keys, 1 do " +
                        "redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
                    "end;" +

                    // acquire the lock and set the TTL for the lease
                    "redis.call('hset', KEYS[1], ARGV[2], 1);" +  // mark the lock as held by this thread
                    "redis.call('pexpire', KEYS[1], ARGV[1]);" +  // lease duration, 30 * 1000 ms by default
                    "return nil;" +
                "end;" +

                // check if the lock is already held, and this is a re-entry
                "if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2],1);" +
                    "redis.call('pexpire', KEYS[1], ARGV[1]);" +
                    "return nil;" +
                "end;" +

                // the lock cannot be acquired
                // check if the thread is already in the queue
                "local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
                "if timeout ~= false then " +
                    // the real timeout is the timeout of the prior thread
                    // in the queue, but this is approximately correct, and
                    // avoids having to traverse the queue
                    "return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
                "end;" +

                // add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of
                // the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the
                // threadWaitTime
                // In other words: the wait time of the current thread is derived from the timeout of the
                // last waiter, so threads that join later are served later.
                "local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
                "local ttl;" +
                "if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
                    "ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
                "else " +
                    "ttl = redis.call('pttl', KEYS[1]);" +
                "end;" +
                "local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
                // The lock was not acquired: append the current thread to the waiting queue.
                "if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
                    "redis.call('rpush', KEYS[2], ARGV[2]);" +
                "end;" +
                "return ttl;",
                Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
                internalLockLeaseTime, getLockName(threadId), threadWaitTime, currentTime);
    }

    throw new IllegalArgumentException();
}

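The lock is acquired in either of these cases:

1. The lock key xxx (KEYS[1]) does not exist, and the waiting queue (KEYS[2]) either does not exist or has the current thread as its first waiter.

2. The lock key xxx exists and already contains a field for the current thread (re-entry).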
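The queueing rule can be sketched with plain Java collections. This is a simplified model rather than the Redisson implementation: FairQueueSketch is a hypothetical class, an ArrayDeque stands in for the Redis list, and waiter timeouts, lease expiry and hold counts are deliberately left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model of the fair lock's "free and at the head of the queue" rule. */
class FairQueueSketch {
    private String holder; // null models "the lock key does not exist"
    private final Deque<String> waiters = new ArrayDeque<>(); // stand-in for the Redis list

    /** Returns true if threadId now holds the lock; otherwise enqueues it (once) and returns false. */
    boolean tryLock(String threadId) {
        boolean free = holder == null;
        boolean myTurn = waiters.isEmpty() || threadId.equals(waiters.peekFirst());
        if (free && myTurn) {
            waiters.pollFirst(); // no-op on an empty queue, like lpop
            holder = threadId;
            return true;
        }
        if (threadId.equals(holder)) {
            return true;         // re-entry (hold counts omitted in this sketch)
        }
        if (!waiters.contains(threadId)) {
            waiters.addLast(threadId); // rpush: later arrivals wait behind earlier ones
        }
        return false;
    }

    void unlock(String threadId) {
        if (threadId.equals(holder)) {
            holder = null;
        }
    }
}
```

In this model a thread that retries while someone else is at the head of the queue is refused even though the lock is free, which is what makes the lock fair.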

unlock()

@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // remove stale threads: drop every waiter whose timeout is not later than the current time
            "while true do "
                // Take the first thread in the waiting queue.
                + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
                + "if firstThreadId2 == false then "
                    + "break;"
                + "end; "
                + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
                + "if timeout <= tonumber(ARGV[4]) then "
                    + "redis.call('zrem', KEYS[3], firstThreadId2); "
                    + "redis.call('lpop', KEYS[2]); "
                + "else "
                    + "break;"
                + "end; "
            + "end;"

            // The lock key no longer exists: just notify the next waiter, if any.
            + "if (redis.call('exists', KEYS[1]) == 0) then " +
                "local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
                "if nextThreadId ~= false then " +
                    "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
                "end; " +
                "return 1; " +
            "end;" +
            // The current thread has no hold count, i.e. it does not hold the lock.
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            // Release one hold (decrement the count). If the count is still positive, refresh the expiry.
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "end; " +

            // Delete the lock.
            "redis.call('del', KEYS[1]); " +
            "local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
            "if nextThreadId ~= false then " +
                // Publish the unlock message to the next waiter.
                "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
            "end; " +
            "return 1; ",
            Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName, getChannelName()),
            LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());
}

RedissonMultiLock

MultiLock combines several locks into one large lock. The combined lock is acquired and released as a unit: you lock multiple resources in one step, do your work, and then release all the corresponding locks in one step.

public class RedissonMultiLock implements RLock {

    // Holds the underlying RLock instances
    final List<RLock> locks = new ArrayList<>();

    // Accepts any number of locks.
    public RedissonMultiLock(RLock... locks) {
        if (locks.length == 0) {
            throw new IllegalArgumentException("Lock objects are not defined");
        }
        this.locks.addAll(Arrays.asList(locks));
    }
}

// Usage
RedissonClient redisson = Redisson.create(config);

RLock lock1 = redisson.getLock("lock1");
RLock lock2 = redisson.getLock("lock2");
RLock lock3 = redisson.getLock("lock3");

RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
lock.lock();
lock.unlock();

lock()

public RFuture<Void> lockAsync(long leaseTime, TimeUnit unit, long threadId) {
    long baseWaitTime = locks.size() * 1500;
    long waitTime = -1;
    if (leaseTime == -1) {
        waitTime = baseWaitTime;
    } else {
        leaseTime = unit.toMillis(leaseTime);
        waitTime = leaseTime;
        if (waitTime <= 2000) {
            waitTime = 2000;
        } else if (waitTime <= baseWaitTime) {
            waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
        } else {
            waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
        }
    }

    RPromise<Void> result = new RedissonPromise<Void>();
    tryLockAsync(threadId, leaseTime, TimeUnit.MILLISECONDS, waitTime, result);
    return result;
}

@Override
public RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RPromise<Boolean> result = new RedissonPromise<Boolean>();
    LockState state = new LockState(waitTime, leaseTime, unit, threadId);
    state.tryAcquireLockAsync(locks.listIterator(), result);
    return result;
}
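The wait-time selection in lockAsync has four cases, two of them randomized. The sketch below restates that arithmetic as a pure function that returns the range a value would be drawn from; MultiLockWaitTimeSketch and waitTimeRange are hypothetical names introduced only for illustration.

```java
/** Hypothetical restatement of the wait-time arithmetic in lockAsync. */
class MultiLockWaitTimeSketch {
    /**
     * Returns {lowInclusive, highExclusive} in ms for the wait time.
     * When both values are equal, the wait time is that fixed value.
     */
    static long[] waitTimeRange(int lockCount, long leaseMillis) {
        long base = lockCount * 1500L;                 // baseWaitTime
        if (leaseMillis == -1) {
            return new long[] {base, base};            // no lease: wait baseWaitTime
        }
        if (leaseMillis <= 2000) {
            return new long[] {2000, 2000};            // short lease: fixed 2000 ms
        }
        if (leaseMillis <= base) {
            return new long[] {leaseMillis / 2, leaseMillis}; // random in [lease/2, lease)
        }
        return new long[] {base, leaseMillis};         // random in [base, lease)
    }
}
```

For three locks, for example, the base wait time is 4500 ms, so a 10-second lease yields a wait time somewhere between 4500 and 10000 ms.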

RReadWriteLock

Example:

RReadWriteLock rwLock = redisson.getReadWriteLock("anyRWLock");
// Code snippet
rwLock.readLock().lock();
rwLock.readLock().unlock();
rwLock.writeLock().lock();
rwLock.writeLock().unlock();

public interface RReadWriteLock extends ReadWriteLock {
    @Override
    RLock readLock();

    @Override
    RLock writeLock();
}

public class RedissonReadWriteLock extends RedissonExpirable implements RReadWriteLock {
    @Override
    public RLock readLock() {
        return new RedissonReadLock(commandExecutor, getName());
    }

    @Override
    public RLock writeLock() {
        return new RedissonWriteLock(commandExecutor, getName());
    }
}

RedissonReadLock

@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            // Read the current lock mode: read or write.
            "local mode = redis.call('hget', KEYS[1], 'mode'); " +
            // Not locked yet.
            "if (mode == false) then " +
                "redis.call('hset', KEYS[1], 'mode', 'read'); " +      // switch to read mode
                "redis.call('hset', KEYS[1], ARGV[2], 1); " +          // set the hold count
                "redis.call('set', KEYS[2] .. ':1', 1); " +            // per-hold timeout key
                "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            // Read mode, or write mode with the write lock held by the current thread.
            "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
                "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +  // increment this thread's hold count
                "local key = KEYS[2] .. ':' .. ind;" +
                "redis.call('set', key, 1); " +
                "redis.call('pexpire', key, ARGV[1]); " +
                "local remainTime = redis.call('pttl', KEYS[1]); " +
                "redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +
                "return nil; " +
            "end;" +
            "return redis.call('pttl', KEYS[1]);",
            Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)),
            internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
}

@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
    String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "local mode = redis.call('hget', KEYS[1], 'mode'); " +
            "if (mode == false) then " +                              // not locked: just publish the message
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
            "if (lockExists == 0) then " +
                "return nil;" +
            "end; " +

            "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +  // decrement the hold count
            "if (counter == 0) then " +
                "redis.call('hdel', KEYS[1], ARGV[2]); " +            // remove this thread's field
            "end;" +
            "redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +

            "if (redis.call('hlen', KEYS[1]) > 1) then " +
                "local maxRemainTime = -3; " +
                "local keys = redis.call('hkeys', KEYS[1]); " +
                "for n, key in ipairs(keys) do " +
                    "counter = tonumber(redis.call('hget', KEYS[1], key)); " +
                    "if type(counter) == 'number' then " +
                        "for i=counter, 1, -1 do " +
                            "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " +
                            "maxRemainTime = math.max(remainTime, maxRemainTime);" +
                        "end; " +
                    "end; " +
                "end; " +

                "if maxRemainTime > 0 then " +
                    "redis.call('pexpire', KEYS[1], maxRemainTime); " +
                    "return 0; " +
                "end;" +

                "if mode == 'write' then " +
                    "return 0;" +
                "end; " +
            "end; " +

            "redis.call('del', KEYS[1]); " +
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; ",
            Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, keyPrefix),
            LockPubSub.UNLOCK_MESSAGE, getLockName(threadId));
}

RedissonWriteLock

@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            "local mode = redis.call('hget', KEYS[1], 'mode'); " +
            "if (mode == false) then " +                                        // not locked yet
                "redis.call('hset', KEYS[1], 'mode', 'write'); " +
                "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            "if (mode == 'write') then " +                                      // a write lock already exists
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +     // and the current thread holds it
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "local currentExpire = redis.call('pttl', KEYS[1]); " +
                    "redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +
                    "return nil; " +
                "end; " +
            "end;" +
            "return redis.call('pttl', KEYS[1]);",                              // not acquired: return the remaining TTL
            Arrays.<Object>asList(getName()),
            internalLockLeaseTime, getLockName(threadId));
}

@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "local mode = redis.call('hget', KEYS[1], 'mode'); " +
            "if (mode == false) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            "if (mode == 'write') then " +
                "local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " +
                "if (lockExists == 0) then " +
                    "return nil;" +
                "else " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                    "else " +
                        "redis.call('hdel', KEYS[1], ARGV[3]); " +
                        "if (redis.call('hlen', KEYS[1]) == 1) then " +
                            "redis.call('del', KEYS[1]); " +
                            "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "else " +
                            // has unlocked read-locks
                            "redis.call('hset', KEYS[1], 'mode', 'read'); " +
                        "end; " +
                        "return 1; "+
                    "end; " +
                "end; " +
            "end; "
            + "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()),
            LockPubSub.READ_UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
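Two rules stand out in the read and write scripts: a thread that holds the write lock may also take the read lock, while a write lock is refused as long as the lock is in read mode. The JDK's ReentrantReadWriteLock follows the same two rules, so it can serve as a local analogy. The sketch below uses only java.util.concurrent and says nothing about Redisson's behaviour beyond what the scripts show; ReadWriteModeSketch is a hypothetical name.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Local analogy using the JDK lock; it does not talk to Redis. */
class ReadWriteModeSketch {
    /** A thread holding the write lock can also take the read lock (compare the mode == 'write' branch). */
    static boolean writerCanRead() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.writeLock().lock();
        try {
            boolean acquired = rw.readLock().tryLock();
            if (acquired) {
                rw.readLock().unlock();
            }
            return acquired;
        } finally {
            rw.writeLock().unlock();
        }
    }

    /** While a read lock is held, a write lock attempt fails (compare mode == 'read' in the write script). */
    static boolean writerBlockedByReader() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.readLock().lock();
        try {
            boolean acquired = rw.writeLock().tryLock();
            if (acquired) {
                rw.writeLock().unlock();
            }
            return !acquired;
        } finally {
            rw.readLock().unlock();
        }
    }
}
```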

CountDownLatch

public interface RCountDownLatch extends RObject, RCountDownLatchAsync {
    void await() throws InterruptedException;
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    void countDown();
    long getCount();
    boolean trySetCount(long count);
}

public class RedissonCountDownLatch extends RedissonObject implements RCountDownLatch {
    private final CountDownLatchPubSub pubSub;
    private final String id;
}

trySetCount

@Override
public boolean trySetCount(long count) {
    return get(trySetCountAsync(count));
}

@Override
public RFuture<Boolean> trySetCountAsync(long count) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // If the key does not exist, set it to count and publish NEW_COUNT_MESSAGE.
            "if redis.call('exists', KEYS[1]) == 0 then "
                + "redis.call('set', KEYS[1], ARGV[2]); "
                + "redis.call('publish', KEYS[2], ARGV[1]); "
                + "return 1 "
            + "else "
                + "return 0 "
            + "end",
            Arrays.<Object>asList(getName(), getChannelName()), CountDownLatchPubSub.NEW_COUNT_MESSAGE, count);
}

await

@Override
public void await() throws InterruptedException {
    if (getCount() == 0) {
        return;
    }

    RFuture<RedissonCountDownLatchEntry> future = subscribe();
    try {
        commandExecutor.syncSubscriptionInterrupted(future);

        while (getCount() > 0) {
            // waiting for open state
            future.getNow().getLatch().await();
        }
    } finally {
        unsubscribe(future);
    }
}

@Override
public long getCount() {
    return get(getCountAsync());
}

@Override
public RFuture<Long> getCountAsync() {
    return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET_LONG, getName());
}

countDown

@Override
public void countDown() {
    get(countDownAsync());
}

@Override
public RFuture<Void> countDownAsync() {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "local v = redis.call('decr', KEYS[1]);" +
            "if v <= 0 then redis.call('del', KEYS[1]) end;" +
            "if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;",
            Arrays.<Object>asList(getName(), getChannelName()), CountDownLatchPubSub.ZERO_COUNT_MESSAGE);
}
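The two latch scripts are short, but their edge cases (setting a count twice, counting down past zero) are easy to miss. Here is a minimal in-memory sketch of the same rules; LatchScriptSketch and zeroMessages are hypothetical names, and a nullable field models whether the Redis key exists. A missing key is read as a count of 0, which matches how await() treats it.

```java
/** Hypothetical in-memory model of the trySetCount and countDown scripts. */
class LatchScriptSketch {
    private Long count;   // null models "the key does not exist"
    int zeroMessages;     // how many zero-count messages would have been published

    /** Sets the count only if the key does not exist yet. */
    boolean trySetCount(long newCount) {
        if (count != null) {
            return false; // exists == 1 -> return 0
        }
        count = newCount; // set KEYS[1] (the new-count publish is not modelled)
        return true;
    }

    void countDown() {
        long v = (count == null ? 0 : count) - 1; // DECR treats a missing key as 0
        count = v;
        if (v <= 0) {
            count = null; // del KEYS[1]
        }
        if (v == 0) {
            zeroMessages++; // publish the zero-count message exactly once
        }
    }

    long getCount() {
        return count == null ? 0 : count;
    }
}
```

Because the key is deleted once the count drops to zero, the same latch name can be armed again with trySetCount afterwards.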

pubsub

Java的Lock在获取不到锁的时候,会阻塞,然后通过notify(),notifyAll()等来唤醒。Redisson的锁,类似的流程,使用了Redis的 pubsub 功能来唤醒线程。

RLock在获取锁时,会创建一个订阅。

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException
{RFuture<RedissonLockEntry> future = subscribe(threadId);if (interruptibly) {commandExecutor.syncSubscriptionInterrupted(future);} else {commandExecutor.syncSubscription(future);}try {while (true) {......}} finally {unsubscribe(future, threadId);}
}//protected final LockPubSub pubSub;protected RFuture<RedissonLockEntry> subscribe(long threadId) {return pubSub.subscribe(getEntryName(), getChannelName());}protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {pubSub.unsubscribe(future.getNow(), getEntryName(), getChannelName());}//channel名称:redisson_lock__channel:{XXX}String getChannelName() {return prefixName("redisson_lock__channel", getName());}

LockPubSub is a subclass of PublishSubscribe.

RedissonLockEntry

RedissonLockEntry controls blocking and wake-up.

    public interface PubSubEntry<E> {

        void acquire();

        int release();

        RPromise<E> getPromise();
    }

    public class RedissonLockEntry implements PubSubEntry<RedissonLockEntry> {

        // Reference count.
        private int counter;

        // A Java Semaphore is used for blocking.
        private final Semaphore latch;
        private final RPromise<RedissonLockEntry> promise;
        // Listeners.
        private final ConcurrentLinkedQueue<Runnable> listeners = new ConcurrentLinkedQueue<Runnable>();

        public RedissonLockEntry(RPromise<RedissonLockEntry> promise) {
            super();
            // The semaphore starts with 0 permits, so acquire() blocks.
            this.latch = new Semaphore(0);
            this.promise = promise;
        }

        public void acquire() {
            counter++; // increment the count
        }

        public int release() {
            return --counter; // decrement the count
        }

        public RPromise<RedissonLockEntry> getPromise() {
            return promise;
        }

        public void addListener(Runnable listener) {
            listeners.add(listener);
        }

        public boolean removeListener(Runnable listener) {
            return listeners.remove(listener);
        }

        public ConcurrentLinkedQueue<Runnable> getListeners() {
            return listeners;
        }

        public Semaphore getLatch() {
            return latch;
        }
    }
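The two mechanisms in this entry, a reference counter and a `Semaphore` created with zero permits, can be tried out in isolation. The sketch below uses a hypothetical `EntryModel` class that keeps only those two fields; it is not the Redisson class itself.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical cut-down entry: a reference counter plus a zero-permit semaphore.
class EntryModel {
    private int counter;                              // how many waiters share this entry
    private final Semaphore latch = new Semaphore(0); // 0 permits: waiters block until release()

    void acquire() { counter++; }
    int release() { return --counter; }
    Semaphore getLatch() { return latch; }
}

class EntryModelDemo {
    public static void main(String[] args) throws InterruptedException {
        EntryModel entry = new EntryModel();
        entry.acquire(); // first waiter registers
        entry.acquire(); // second waiter registers

        // No permit has been released yet, so a timed wait gives up.
        System.out.println(entry.getLatch().tryAcquire(50, TimeUnit.MILLISECONDS)); // false

        // A release (the "unlock message") lets exactly one waiter through.
        entry.getLatch().release();
        System.out.println(entry.getLatch().tryAcquire()); // true

        System.out.println(entry.release()); // 1: another waiter still uses the entry
        System.out.println(entry.release()); // 0: last waiter gone, the channel can be dropped
    }
}
```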

PublishSubscribe


    abstract class PublishSubscribe<E extends PubSubEntry<E>> {

        private final PublishSubscribeService service;
        // Map of PubSubEntry instances, keyed by entry name.
        private final ConcurrentMap<String, E> entries = new ConcurrentHashMap<>();

        PublishSubscribe(PublishSubscribeService service) {
            super();
            this.service = service;
        }

        // Create a subscription.
        public RFuture<E> subscribe(String entryName, String channelName) {
            AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
            AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
            RPromise<E> newPromise = new RedissonPromise<E>() {
                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                    return semaphore.remove(listenerHolder.get());
                }
            };

            // listener
            Runnable listener = new Runnable() {
                @Override
                public void run() {
                    // Look up an existing entry.
                    E entry = entries.get(entryName);
                    if (entry != null) {
                        entry.acquire();
                        semaphore.release();
                        entry.getPromise().onComplete(new TransferListener<E>(newPromise));
                        return;
                    }

                    // Create a new entry.
                    E value = createEntry(newPromise);
                    value.acquire();

                    E oldValue = entries.putIfAbsent(entryName, value);
                    if (oldValue != null) {
                        oldValue.acquire();
                        semaphore.release();
                        oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));
                        return;
                    }

                    // Create the Redis listener and subscribe.
                    RedisPubSubListener<Object> listener = createListener(channelName, value);
                    service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
                }
            };
            semaphore.acquire(listener);
            listenerHolder.set(listener);

            return newPromise;
        }

        public void unsubscribe(E entry, String entryName, String channelName) {
            AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
            semaphore.acquire(new Runnable() {
                @Override
                public void run() {
                    if (entry.release() == 0) {
                        // The reference count reached 0, so release the channel.
                        // just an assertion
                        boolean removed = entries.remove(entryName) == entry;
                        if (!removed) {
                            throw new IllegalStateException();
                        }
                        service.unsubscribe(new ChannelName(channelName), semaphore);
                    } else {
                        semaphore.release();
                    }
                }
            });
        }

        // Wraps the methods that subclasses override.
        private RedisPubSubListener<Object> createListener(String channelName, E value) {
            RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {

                @Override
                public void onMessage(CharSequence channel, Object message) {
                    if (!channelName.equals(channel.toString())) {
                        return;
                    }

                    // onMessage
                    PublishSubscribe.this.onMessage(value, (Long) message);
                }

                @Override
                public boolean onStatus(PubSubType type, CharSequence channel) {
                    if (!channelName.equals(channel.toString())) {
                        return false;
                    }

                    if (type == PubSubType.SUBSCRIBE) {
                        value.getPromise().trySuccess(value);
                        return true;
                    }
                    return false;
                }
            };
            return listener;
        }
    }

    public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {

        public static final Long UNLOCK_MESSAGE = 0L;
        public static final Long READ_UNLOCK_MESSAGE = 1L;

        public LockPubSub(PublishSubscribeService service) {
            super(service);
        }

        @Override
        protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
            return new RedissonLockEntry(newPromise);
        }

        // Message handling.
        @Override
        protected void onMessage(RedissonLockEntry value, Long message) {
            // Lock released.
            if (message.equals(UNLOCK_MESSAGE)) {
                Runnable runnableToExecute = value.getListeners().poll();
                if (runnableToExecute != null) {
                    runnableToExecute.run();
                }

                // Release one permit on the Semaphore.
                value.getLatch().release();
            } else if (message.equals(READ_UNLOCK_MESSAGE)) {
                // Read lock released.
                while (true) {
                    Runnable runnableToExecute = value.getListeners().poll();
                    if (runnableToExecute == null) {
                        break;
                    }
                    runnableToExecute.run();
                }

                // Release one permit per queued waiter.
                value.getLatch().release(value.getLatch().getQueueLength());
            }
        }
    }

    public class SemaphorePubSub extends PublishSubscribe<RedissonLockEntry> {

        public SemaphorePubSub(PublishSubscribeService service) {
            super(service);
        }

        @Override
        protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
            return new RedissonLockEntry(newPromise);
        }

        @Override
        protected void onMessage(RedissonLockEntry value, Long message) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute != null) {
                runnableToExecute.run();
            }

            // Release as many permits as the message says.
            value.getLatch().release(message.intValue());
        }
    }
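The difference between the two lock messages is easiest to see in a stripped-down model. The following sketch is a hypothetical `DispatchModel` that copies only the dispatch logic of `LockPubSub.onMessage`, with a local listener queue and semaphore in place of a `RedissonLockEntry`.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

// Hypothetical model of the message dispatch: unlock wakes one, read-unlock wakes all.
class DispatchModel {
    static final long UNLOCK_MESSAGE = 0L;
    static final long READ_UNLOCK_MESSAGE = 1L;

    final Queue<Runnable> listeners = new ConcurrentLinkedQueue<>();
    final Semaphore latch = new Semaphore(0);

    void onMessage(long message) {
        if (message == UNLOCK_MESSAGE) {
            // Run at most one queued listener and release a single permit.
            Runnable next = listeners.poll();
            if (next != null) {
                next.run();
            }
            latch.release();
        } else if (message == READ_UNLOCK_MESSAGE) {
            // Run every queued listener and release one permit per blocked thread.
            Runnable next;
            while ((next = listeners.poll()) != null) {
                next.run();
            }
            latch.release(latch.getQueueLength());
        }
    }
}
```

With three listeners queued, an unlock message runs one of them and a subsequent read-unlock message drains the remaining two. If no thread is blocked on the semaphore at that moment, `getQueueLength()` is 0 and the read-unlock branch adds no permits.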

AsyncSemaphore

    public class AsyncSemaphore {

        // Number of available permits.
        private volatile int counter;
        private final Set<Entry> listeners = new LinkedHashSet<Entry>();

        private static class Entry {
            private Runnable runnable;
            private int permits;
            // constructor and getters omitted in this excerpt
        }

        public AsyncSemaphore(int permits) {
            counter = permits;
        }

        public void acquire(Runnable listener) {
            acquire(listener, 1);
        }

        public void acquire(Runnable listener, int permits) {
            boolean run = false;

            synchronized (this) {
                if (counter < permits) {
                    // Not enough permits: add the request to the wait queue.
                    listeners.add(new Entry(listener, permits));
                    return;
                } else {
                    // Enough permits: take them.
                    counter -= permits;
                    run = true;
                }
            }

            if (run) {
                // The permits were granted, so run the listener.
                listener.run();
            }
        }

        public void release() {
            Entry entryToAcquire = null;

            synchronized (this) {
                counter++; // return one permit
                Iterator<Entry> iter = listeners.iterator();
                if (iter.hasNext()) {
                    Entry entry = iter.next();
                    if (entry.getPermits() <= counter) {
                        // Wake up the first waiter if it can now be satisfied.
                        iter.remove();
                        entryToAcquire = entry;
                    }
                }
            }

            if (entryToAcquire != null) {
                // A waiter was woken up: retry its acquire.
                acquire(entryToAcquire.getRunnable(), entryToAcquire.getPermits());
            }
        }
    }
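Unlike `java.util.concurrent.Semaphore`, this semaphore never blocks the caller: `acquire` takes a callback that runs either immediately or later, when a permit is released. A minimal sketch of that idea follows; `CallbackSemaphore` is a hypothetical, simplified variant that always acquires exactly one permit.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical simplified callback semaphore: one permit per acquire, FIFO waiters.
class CallbackSemaphore {
    private int counter;
    private final Queue<Runnable> waiting = new ArrayDeque<>();

    CallbackSemaphore(int permits) {
        counter = permits;
    }

    void acquire(Runnable listener) {
        synchronized (this) {
            if (counter < 1) {
                waiting.add(listener); // no permit: remember the callback, do not block
                return;
            }
            counter--;
        }
        listener.run(); // run outside the monitor, as the original does
    }

    void release() {
        Runnable next;
        synchronized (this) {
            counter++;
            next = waiting.poll();
        }
        if (next != null) {
            acquire(next); // retry on behalf of the oldest waiter
        }
    }
}

class CallbackSemaphoreDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        CallbackSemaphore semaphore = new CallbackSemaphore(1);
        semaphore.acquire(() -> log.add("first"));  // permit available: runs immediately
        semaphore.acquire(() -> log.add("second")); // no permit left: queued
        System.out.println(log); // [first]
        semaphore.release();                        // hands the permit to the queued callback
        System.out.println(log); // [first, second]
    }
}
```

Because nothing blocks, this style fits event-loop threads such as Netty's, where parking the calling thread would stall other I/O.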

PublishSubscribeService

PublishSubscribeService talks to Redis and performs the actual pub/sub operations.

    public class PublishSubscribeService {

        private static final Logger log = LoggerFactory.getLogger(PublishSubscribeService.class);

        private final ConnectionManager connectionManager;

        private final MasterSlaveServersConfig config;

        // Cache of AsyncSemaphore instances.
        private final AsyncSemaphore[] locks = new AsyncSemaphore[50];

        private final AsyncSemaphore freePubSubLock = new AsyncSemaphore(1);

        private final ConcurrentMap<ChannelName, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<>();

        private final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<>();

        // Wakes up threads waiting on a semaphore.
        private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this);

        private final CountDownLatchPubSub countDownLatchPubSub = new CountDownLatchPubSub(this);

        private final LockPubSub lockPubSub = new LockPubSub(this);

        public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig config) {
            super();
            this.connectionManager = connectionManager;
            this.config = config;
            for (int i = 0; i < locks.length; i++) {
                locks[i] = new AsyncSemaphore(1); // each semaphore starts with 1 permit
            }
        }

        // Call site in PublishSubscribe.subscribe(String entryName, String channelName):
        //     RedisPubSubListener<Object> listener = createListener(channelName, value);
        //     service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
        // Call site in PublishSubscribe.unsubscribe(E entry, String entryName, String channelName):
        //     service.unsubscribe(new ChannelName(channelName), semaphore);

        // 1)
        public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
            RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();
            subscribe(codec, new ChannelName(channelName), promise, PubSubType.SUBSCRIBE, semaphore, listeners);
            return promise;
        }

        // 2)
        private void subscribe(Codec codec, ChannelName channelName, RPromise<PubSubConnectionEntry> promise,
                PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {
            // Reuse the cached connection for this channel, if any.
            PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
            if (connEntry != null) {
                addListeners(channelName, promise, type, lock, connEntry, listeners);
                return;
            }

            freePubSubLock.acquire(new Runnable() {
                @Override
                public void run() {
                    if (promise.isDone()) {
                        // done
                        lock.release();
                        freePubSubLock.release();
                        return;
                    }

                    // Next free connection.
                    PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
                    if (freeEntry == null) {
                        connect(codec, channelName, promise, type, lock, listeners);
                        return;
                    }

                    int remainFreeAmount = freeEntry.tryAcquire();
                    if (remainFreeAmount == -1) {
                        throw new IllegalStateException();
                    }

                    PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
                    if (oldEntry != null) {
                        freeEntry.release();
                        freePubSubLock.release();
                        addListeners(channelName, promise, type, lock, oldEntry, listeners);
                        return;
                    }

                    if (remainFreeAmount == 0) {
                        freePubSubConnections.poll();
                    }
                    freePubSubLock.release();

                    RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, freeEntry, listeners);

                    ChannelFuture future;
                    if (PubSubType.PSUBSCRIBE == type) {
                        future = freeEntry.psubscribe(codec, channelName);
                    } else {
                        future = freeEntry.subscribe(codec, channelName);
                    }

                    // Netty ChannelFutureListener.
                    future.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                if (!promise.isDone()) {
                                    subscribeFuture.cancel(false);
                                }
                                return;
                            }

                            connectionManager.newTimeout(new TimerTask() {
                                @Override
                                public void run(Timeout timeout) throws Exception {
                                    subscribeFuture.cancel(false);
                                }
                            }, config.getTimeout(), TimeUnit.MILLISECONDS);
                        }
                    });
                }
            });
        }

        public RFuture<Void> unsubscribe(ChannelName channelName, AsyncSemaphore lock) {
            PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
            if (entry == null || connectionManager.isShuttingDown()) {
                lock.release();
                return RedissonPromise.newSucceededFuture(null);
            }

            AtomicBoolean executed = new AtomicBoolean();
            RedissonPromise<Void> result = new RedissonPromise<Void>();
            ChannelFuture future = entry.unsubscribe(channelName, new BaseRedisPubSubListener() {

                @Override
                public boolean onStatus(PubSubType type, CharSequence channel) {
                    if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) {
                        executed.set(true);

                        if (entry.release() == 1) {
                            freePubSubConnections.add(entry);
                        }

                        lock.release();
                        result.trySuccess(null);
                        return true;
                    }
                    return false;
                }
            });

            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        return;
                    }

                    connectionManager.newTimeout(new TimerTask() {
                        @Override
                        public void run(Timeout timeout) throws Exception {
                            if (executed.get()) {
                                return;
                            }
                            entry.getConnection().onMessage(new PubSubStatusMessage(PubSubType.UNSUBSCRIBE, channelName));
                        }
                    }, config.getTimeout(), TimeUnit.MILLISECONDS);
                }
            });

            return result;
        }
    }
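The bookkeeping around `name2PubSubConnection` and `freePubSubConnections` amounts to a small pool in which each connection carries a limited number of subscriptions. The sketch below models only that bookkeeping, synchronously and without any networking. `PubSubPoolModel`, `Conn` and the `capacity` parameter are hypothetical names, and creating a connection inline is an assumption standing in for the `connect(...)` call, whose body is not shown in the excerpt.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical model of channel-to-connection bookkeeping; no real connections involved.
class PubSubPoolModel {
    static class Conn {
        final int id;
        private int free; // remaining subscription slots on this connection

        Conn(int id, int capacity) {
            this.id = id;
            this.free = capacity;
        }

        int tryAcquire() { return free == 0 ? -1 : --free; } // slots left afterwards, -1 if full
        int release() { return ++free; }                     // slots free afterwards
    }

    private final int capacity;
    private int nextId;
    private final Map<String, Conn> byChannel = new HashMap<>(); // like name2PubSubConnection
    private final Queue<Conn> freeConns = new ArrayDeque<>();    // like freePubSubConnections

    PubSubPoolModel(int capacity) {
        this.capacity = capacity;
    }

    // Returns the id of the connection that serves the channel.
    synchronized int subscribe(String channel) {
        Conn existing = byChannel.get(channel);
        if (existing != null) {
            return existing.id; // channel already mapped: reuse its connection
        }
        Conn conn = freeConns.peek();
        if (conn == null) {
            conn = new Conn(nextId++, capacity); // assumption: stands in for connect(...)
            freeConns.add(conn);
        }
        if (conn.tryAcquire() == 0) {
            freeConns.poll(); // connection is now full: take it out of the free queue
        }
        byChannel.put(channel, conn);
        return conn.id;
    }

    synchronized void unsubscribe(String channel) {
        Conn conn = byChannel.remove(channel);
        // A result of 1 means the connection was full before, so it becomes free again.
        if (conn != null && conn.release() == 1) {
            freeConns.add(conn);
        }
    }
}
```

With a capacity of 2, channels `a` and `b` share connection 0 and `c` opens connection 1. After `a` is unsubscribed, connection 0 returns to the free queue behind connection 1, so the next new channel goes to connection 1 and the one after that to connection 0.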

Appendix

References

https://github.com/mrniko/redisson/wiki

Chinese documentation: https://github.com/redisson/redisson/wiki/Redisson%E9%A1%B9%E7%9B%AE%E4%BB%8B%E7%BB%8D

redisson-2.10.4 source code analysis: https://blog.csdn.net/ly1028826685/article/details/84922706
