聊聊redisson的DelayedQueue
为什么80%的码农都做不了架构师?>>>
序
本文主要研究一下redisson的DelayedQueue
maven
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.8.1</version></dependency>
实例
@Testpublic void testDelayedQueue() throws InterruptedException {Config config = new Config();config.useSingleServer().setAddress("redis://192.168.99.100:6379");RedissonClient redisson = Redisson.create(config);RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("dest_queue1");RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);delayedQueue.offer("demo", 10, TimeUnit.SECONDS);Assert.assertFalse(blockingQueue.contains("demo"));TimeUnit.SECONDS.sleep(15);Assert.assertTrue(blockingQueue.contains("demo"));}
- 这里使用了两个queue,对delayedQueue的offer操作是直接进入delayedQueue,但是delay是作用在目标队列上,这里就是RBlockingQueue
源码解析
RDelayedQueue.offer
redisson-3.8.1-sources.jar!/org/redisson/RedissonDelayedQueue.java
public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelayedQueue<V> {private final QueueTransferService queueTransferService;private final String channelName;private final String queueName;private final String timeoutSetName;protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {super(codec, commandExecutor, name);channelName = prefixName("redisson_delay_queue_channel", getName());queueName = prefixName("redisson_delay_queue", getName());timeoutSetName = prefixName("redisson_delay_queue_timeout", getName());//QueueTransferTask task = ......queueTransferService.schedule(queueName, task);this.queueTransferService = queueTransferService;}public void offer(V e, long delay, TimeUnit timeUnit) {get(offerAsync(e, delay, timeUnit));}public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {long delayInMs = timeUnit.toMillis(delay);long timeout = System.currentTimeMillis() + delayInMs;long randomId = PlatformDependent.threadLocalRandom().nextLong();return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,"local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" + "redis.call('zadd', KEYS[2], ARGV[1], value);"+ "redis.call('rpush', KEYS[3], value);"// if new object added to queue head when publish its startTime // to all scheduler workers + "local v = redis.call('zrange', KEYS[2], 0, 0); "+ "if v[1] == value then "+ "redis.call('publish', KEYS[4], ARGV[1]); "+ "end;",Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName), timeout, randomId, encode(e));}public ByteBuf encode(Object value) {if (commandExecutor.isRedissonReferenceSupportEnabled()) {RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value);if (reference != null) {value = reference;}}try {return codec.getValueEncoder().encode(value);} catch (IOException e) {throw new IllegalArgumentException(e);}}public static String prefixName(String prefix, String name) {if (name.contains("{")) {return prefix + ":" + name;}return prefix + ":{" + name + "}";}//......
}
- 这里使用的是一段lua脚本,其中keys参数数组有四个值,KEYS[1]为getName(), KEYS[2]为timeoutSetName, KEYS[3]为queueName, KEYS[4]为channelName
- 变量有三个,ARGV[1]为timeout,ARGV[2]为randomId,ARGV[3]为encode(e)
- 这段lua脚本对timeoutSetName的zset添加一个结构体,其score为timeout值;对queueName的list的表尾添加结构体;然后判断timeoutSetName的zset的第一个元素是否是当前的结构体,如果是则对channel发布timeout消息
queueTransferService.schedule
redisson-3.8.1-sources.jar!/org/redisson/RedissonDelayedQueue.java
QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {@Overrideprotected RFuture<Long> pushTaskAsync() {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,"local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "+ "if #expiredValues > 0 then "+ "for i, v in ipairs(expiredValues) do "+ "local randomId, value = struct.unpack('dLc0', v);"+ "redis.call('rpush', KEYS[1], value);"+ "redis.call('lrem', KEYS[3], 1, v);"+ "end; "+ "redis.call('zrem', KEYS[2], unpack(expiredValues));"+ "end; "// get startTime from scheduler queue head task+ "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "+ "if v[1] ~= nil then "+ "return v[2]; "+ "end "+ "return nil;",Arrays.<Object>asList(getName(), timeoutSetName, queueName), System.currentTimeMillis(), 100);}@Overrideprotected RTopic<Long> getTopic() {return new RedissonTopic<Long>(LongCodec.INSTANCE, commandExecutor, channelName);}};queueTransferService.schedule(queueName, task);
- RedissonDelayedQueue构造器里头对QueueTransferTask进行调度
- 调度执行的是pushTaskAsync方法,主要就是将到期的元素从元素队列移到目标队列
- 这里使用一段lua脚本,KEYS[1]为getName(),KEYS[2]为timeoutSetName,KEYS[3]为queueName;ARGV[1]为当前时间戳,ARGV[2]为100
- 这里调用zrangebyscore,对timeoutSetName的zset使用timeout参数进行排序,取得分介于0和当前时间戳的元素,取前200条
- 如果有值表示该元素需要移交到目标队列,然后调用rpush移交到目标队列,再调用lrem从元素队列移除,最后在从timeoutSetName的zset中删除掉已经处理的这些元素
- 处理完过元素转移之后,再取timeoutSetName的zset的第一个元素的得分返回,如果没有返回nil
QueueTransferService.schedule
redisson-3.8.1-sources.jar!/org/redisson/QueueTransferService.java
public class QueueTransferService {private final ConcurrentMap<String, QueueTransferTask> tasks = PlatformDependent.newConcurrentHashMap();public synchronized void schedule(String name, QueueTransferTask task) {QueueTransferTask oldTask = tasks.putIfAbsent(name, task);if (oldTask == null) {task.start();} else {oldTask.incUsage();}}public synchronized void remove(String name) {QueueTransferTask task = tasks.get(name);if (task != null) {if (task.decUsage() == 0) {tasks.remove(name, task);task.stop();}}}
}
- 这里的schedule方法首先添加到ConcurrentMap中,如果该任务已经存在,则调用oldTask.incUsage(),不存在则启动该任务
QueueTransferTask.start
redisson-3.8.1-sources.jar!/org/redisson/QueueTransferTask.java
public void start() {RTopic<Long> schedulerTopic = getTopic();statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {@Overridepublic void onSubscribe(String channel) {pushTask();}});messageListenerId = schedulerTopic.addListener(new MessageListener<Long>() {@Overridepublic void onMessage(CharSequence channel, Long startTime) {scheduleTask(startTime);}});}private void scheduleTask(final Long startTime) {TimeoutTask oldTimeout = lastTimeout.get();if (startTime == null) {return;}if (oldTimeout != null) {oldTimeout.getTask().cancel();}long delay = startTime - System.currentTimeMillis();if (delay > 10) {Timeout timeout = connectionManager.newTimeout(new TimerTask() { @Overridepublic void run(Timeout timeout) throws Exception {pushTask();TimeoutTask currentTimeout = lastTimeout.get();if (currentTimeout.getTask() == timeout) {lastTimeout.compareAndSet(currentTimeout, null);}}}, delay, TimeUnit.MILLISECONDS);if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {timeout.cancel();}} else {pushTask();}}private void pushTask() {RFuture<Long> startTimeFuture = pushTaskAsync();startTimeFuture.addListener(new FutureListener<Long>() {@Overridepublic void operationComplete(io.netty.util.concurrent.Future<Long> future) throws Exception {if (!future.isSuccess()) {if (future.cause() instanceof RedissonShutdownException) {return;}log.error(future.cause().getMessage(), future.cause());scheduleTask(System.currentTimeMillis() + 5 * 1000L);return;}if (future.getNow() != null) {scheduleTask(future.getNow());}}});}
- 这里用到了RTopic,添加了StatusListener以及MessageListener
- StatusListener在订阅的时候触发pushTask,MessageListener主要是调用scheduleTask
- pushTaskAsync在RedissonDelayedQueue的实现就是上面讲的实现元素在原始队列及目标队列的转移
- scheduleTask方法会重新计算delay,对于大于10的延时触发pushTask,小于等于10的则立刻触发pushTask
- pushTask会对pushTaskAsync操作进行回调,如果执行不成功则重新触发scheduleTask,如果执行成功但是返回值(
timeoutSetName的zset的第一个元素的得分
)不为null的话,则以该值触发scheduleTask
小结
- redisson的DelayedQueue使用上是将元素及延时信息入队,之后定时任务将到期的元素转移到目标队列
- 这里使用了三个结构来存储,一个是目标队列list;一个是原生队列list,添加的是带有延时信息的结构体;一个是timeoutSetName的zset,元素是结构体,其score为timeout值
- redisson使用了很多异步回调来操作,整体代码阅读上会相对费劲些
doc
- delayed-queue
转载于:https://my.oschina.net/go4it/blog/2206612
聊聊redisson的DelayedQueue相关推荐
- 聊聊redisson的分布式锁
序 本文主要研究一下redisson的分布式锁 maven <dependency><groupId>org.redisson</groupId><artif ...
- 聊聊redis分布式锁的8大坑
在分布式系统中,由于redis分布式锁相对于更简单和高效,成为了分布式锁的首先,被我们用到了很多实际业务场景当中. 但不是说用了redis分布式锁,就可以高枕无忧了,如果没有用好或者用对,也会引来一些 ...
- 卧槽,redis分布式锁如果用不好,坑真多
在分布式系统中,由于redis分布式锁相对于更简单和高效,成为了分布式锁的首先,被我们用到了很多实际业务场景当中. 但不是说用了redis分布式锁,就可以高枕无忧了,如果没有用好或者用对,也会引来一些 ...
- 新来个技术总监,这Redis分布式锁设计的真漂亮!
前言 在分布式系统中,由于redis分布式锁相对于更简单和高效,成为了分布式锁的首先,被我们用到了很多实际业务场景当中. 但不是说用了redis分布式锁,就可以高枕无忧了,如果没有用好或者用对,也会引 ...
- 实际开发中使用Redis做分布式锁,躲坑指南,收藏起来
今天我们来聊聊Redis分布式锁,曾经被Redis分布式锁的坑给坑惨了,接下来,我就进行一个完整的整理,希望大家都能避免踩坑. 在分布式系统中,由于redis分布式锁相对于更简单和高效,成为了分布式锁 ...
- Redis入门笔记2
四 Redis 解决session共享[刚需] 4.1 session共享问题 我们之前都是单点项目,对于用户的信息存储都是使用session进行存储.但是在集群环境中,此时session就会有问题: ...
- redis分布式锁的8大坑【Redis分布式锁】
在分布式系统中,由于redis分布式锁相对于更简单和高效,成为了分布式锁的首先,被我们用到了很多实际业务场景当中. 但不是说用了redis分布式锁,就可以高枕无忧了,如果没有用好或者用对,也会引来一些 ...
- 聊聊分布式锁——Redis和Redisson的方式
聊聊分布式锁--Redis和Redisson的方式 一.什么是分布式锁 分布式~~锁,要这么念,首先得是『分布式』,然后才是『锁』 分布式:这里的分布式指的是分布式系统,涉及到好多技术和理论,包括CA ...
- 使用Redisson优雅关闭订单
在支付系统中,订单通常是具有时效性的,例如在下单30分钟后如果还没有完成支付,那么就要取消订单,不能再执行后续流程.说到这,可能大家的第一反应是启动一个定时任务,来轮询订单的状态是否完成了支付,如果超 ...
最新文章
- jquery学习之重要知识点
- 音频编码标准发展现状
- UVA 1622 Robot
- JAVA通过JCO连接SAP例子
- 刘强东为抗疫发声:我们送的不是货,是温暖和希望!
- 英国鬼死于狭隘和傲慢,中国鬼死于听天由命和漫不经心--《我的团长我的团》兰晓龙...
- 跟着百度学PHP[13]-文件上传
- idea无法正常使用SVN的解决方法
- 如何开发 Web 应用程序
- 祝微软北京.net俱乐部徐磊生日快乐
- stm32最小原理图的PCB图绘制(含AHT20温度传感器)
- JAVA IO流读取中文出现乱码
- Android系统开发
- 我们可能都低估了浪潮存储
- [总结] 全部笔记博文目录总结(持续更新...)
- 贵卅大学计算机研究生院导师,贵州大学机械工程学院研究生导师:罗绍华
- 一,Weston简介
- 【刷题】验证回文字符串
- 企业如何做好项目管理工作?
- 手机(小米10s)接收微信语音\视频通话时,蓝牙耳机(小米Air2s)无法使用,手机仍外放