为什么80%的码农都做不了架构师?>>>   

本文主要研究一下redisson的DelayedQueue

maven

      <dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.8.1</version></dependency>

实例

    @Testpublic void testDelayedQueue() throws InterruptedException {Config config = new Config();config.useSingleServer().setAddress("redis://192.168.99.100:6379");RedissonClient redisson = Redisson.create(config);RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("dest_queue1");RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);delayedQueue.offer("demo", 10, TimeUnit.SECONDS);Assert.assertFalse(blockingQueue.contains("demo"));TimeUnit.SECONDS.sleep(15);Assert.assertTrue(blockingQueue.contains("demo"));}
  • 这里使用了两个queue,对delayedQueue的offer操作是直接进入delayedQueue,但是delay是作用在目标队列上,这里就是RBlockingQueue

源码解析

RDelayedQueue.offer

redisson-3.8.1-sources.jar!/org/redisson/RedissonDelayedQueue.java

public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelayedQueue<V> {private final QueueTransferService queueTransferService;private final String channelName;private final String queueName;private final String timeoutSetName;protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {super(codec, commandExecutor, name);channelName = prefixName("redisson_delay_queue_channel", getName());queueName = prefixName("redisson_delay_queue", getName());timeoutSetName = prefixName("redisson_delay_queue_timeout", getName());//QueueTransferTask task = ......queueTransferService.schedule(queueName, task);this.queueTransferService = queueTransferService;}public void offer(V e, long delay, TimeUnit timeUnit) {get(offerAsync(e, delay, timeUnit));}public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {long delayInMs = timeUnit.toMillis(delay);long timeout = System.currentTimeMillis() + delayInMs;long randomId = PlatformDependent.threadLocalRandom().nextLong();return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,"local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" + "redis.call('zadd', KEYS[2], ARGV[1], value);"+ "redis.call('rpush', KEYS[3], value);"// if new object added to queue head when publish its startTime // to all scheduler workers + "local v = redis.call('zrange', KEYS[2], 0, 0); "+ "if v[1] == value then "+ "redis.call('publish', KEYS[4], ARGV[1]); "+ "end;",Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName), timeout, randomId, encode(e));}public ByteBuf encode(Object value) {if (commandExecutor.isRedissonReferenceSupportEnabled()) {RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value);if (reference != null) {value = reference;}}try {return codec.getValueEncoder().encode(value);} catch (IOException e) {throw new IllegalArgumentException(e);}}public static String prefixName(String prefix, String name) {if (name.contains("{")) {return prefix + ":" + name;}return prefix + ":{" + name + "}";}//......
}
  • 这里使用的是一段lua脚本,其中keys参数数组有四个值,KEYS[1]为getName(), KEYS[2]为timeoutSetName, KEYS[3]为queueName, KEYS[4]为channelName
  • 变量有三个,ARGV[1]为timeout,ARGV[2]为randomId,ARGV[3]为encode(e)
  • 这段lua脚本对timeoutSetName的zset添加一个结构体,其score为timeout值;对queueName的list的表尾添加结构体;然后判断timeoutSetName的zset的第一个元素是否是当前的结构体,如果是则对channel发布timeout消息

queueTransferService.schedule

redisson-3.8.1-sources.jar!/org/redisson/RedissonDelayedQueue.java

        QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {@Overrideprotected RFuture<Long> pushTaskAsync() {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,"local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "+ "if #expiredValues > 0 then "+ "for i, v in ipairs(expiredValues) do "+ "local randomId, value = struct.unpack('dLc0', v);"+ "redis.call('rpush', KEYS[1], value);"+ "redis.call('lrem', KEYS[3], 1, v);"+ "end; "+ "redis.call('zrem', KEYS[2], unpack(expiredValues));"+ "end; "// get startTime from scheduler queue head task+ "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "+ "if v[1] ~= nil then "+ "return v[2]; "+ "end "+ "return nil;",Arrays.<Object>asList(getName(), timeoutSetName, queueName), System.currentTimeMillis(), 100);}@Overrideprotected RTopic<Long> getTopic() {return new RedissonTopic<Long>(LongCodec.INSTANCE, commandExecutor, channelName);}};queueTransferService.schedule(queueName, task);
  • RedissonDelayedQueue构造器里头对QueueTransferTask进行调度
  • 调度执行的是pushTaskAsync方法,主要就是将到期的元素从元素队列移到目标队列
  • 这里使用一段lua脚本,KEYS[1]为getName(),KEYS[2]为timeoutSetName,KEYS[3]为queueName;ARGV[1]为当前时间戳,ARGV[2]为100
  • 这里调用zrangebyscore,对timeoutSetName的zset使用timeout参数进行排序,取得分介于0和当前时间戳的元素,取前200条
  • 如果有值表示该元素需要移交到目标队列,然后调用rpush移交到目标队列,再调用lrem从元素队列移除,最后在从timeoutSetName的zset中删除掉已经处理的这些元素
  • 处理完过元素转移之后,再取timeoutSetName的zset的第一个元素的得分返回,如果没有返回nil

QueueTransferService.schedule

redisson-3.8.1-sources.jar!/org/redisson/QueueTransferService.java

public class QueueTransferService {private final ConcurrentMap<String, QueueTransferTask> tasks = PlatformDependent.newConcurrentHashMap();public synchronized void schedule(String name, QueueTransferTask task) {QueueTransferTask oldTask = tasks.putIfAbsent(name, task);if (oldTask == null) {task.start();} else {oldTask.incUsage();}}public synchronized void remove(String name) {QueueTransferTask task = tasks.get(name);if (task != null) {if (task.decUsage() == 0) {tasks.remove(name, task);task.stop();}}}
}
  • 这里的schedule方法首先添加到ConcurrentMap中,如果该任务已经存在,则调用oldTask.incUsage(),不存在则启动该任务

QueueTransferTask.start

redisson-3.8.1-sources.jar!/org/redisson/QueueTransferTask.java

    public void start() {RTopic<Long> schedulerTopic = getTopic();statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {@Overridepublic void onSubscribe(String channel) {pushTask();}});messageListenerId = schedulerTopic.addListener(new MessageListener<Long>() {@Overridepublic void onMessage(CharSequence channel, Long startTime) {scheduleTask(startTime);}});}private void scheduleTask(final Long startTime) {TimeoutTask oldTimeout = lastTimeout.get();if (startTime == null) {return;}if (oldTimeout != null) {oldTimeout.getTask().cancel();}long delay = startTime - System.currentTimeMillis();if (delay > 10) {Timeout timeout = connectionManager.newTimeout(new TimerTask() {                    @Overridepublic void run(Timeout timeout) throws Exception {pushTask();TimeoutTask currentTimeout = lastTimeout.get();if (currentTimeout.getTask() == timeout) {lastTimeout.compareAndSet(currentTimeout, null);}}}, delay, TimeUnit.MILLISECONDS);if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {timeout.cancel();}} else {pushTask();}}private void pushTask() {RFuture<Long> startTimeFuture = pushTaskAsync();startTimeFuture.addListener(new FutureListener<Long>() {@Overridepublic void operationComplete(io.netty.util.concurrent.Future<Long> future) throws Exception {if (!future.isSuccess()) {if (future.cause() instanceof RedissonShutdownException) {return;}log.error(future.cause().getMessage(), future.cause());scheduleTask(System.currentTimeMillis() + 5 * 1000L);return;}if (future.getNow() != null) {scheduleTask(future.getNow());}}});}
  • 这里用到了RTopic,添加了StatusListener以及MessageListener
  • StatusListener在订阅的时候触发pushTask,MessageListener主要是调用scheduleTask
  • pushTaskAsync在RedissonDelayedQueue的实现就是上面讲的实现元素在原始队列及目标队列的转移
  • scheduleTask方法会重新计算delay,对于大于10的延时触发pushTask,小于等于10的则立刻触发pushTask
  • pushTask会对pushTaskAsync操作进行回调,如果执行不成功则重新触发scheduleTask,如果执行成功但是返回值(timeoutSetName的zset的第一个元素的得分)不为null的话,则以该值触发scheduleTask

小结

  • redisson的DelayedQueue使用上是将元素及延时信息入队,之后定时任务将到期的元素转移到目标队列
  • 这里使用了三个结构来存储,一个是目标队列list;一个是原生队列list,添加的是带有延时信息的结构体;一个是timeoutSetName的zset,元素是结构体,其score为timeout值
  • redisson使用了很多异步回调来操作,整体代码阅读上会相对费劲些

doc

  • delayed-queue

转载于:https://my.oschina.net/go4it/blog/2206612

聊聊redisson的DelayedQueue相关推荐

  1. 聊聊redisson的分布式锁

    序 本文主要研究一下redisson的分布式锁 maven <dependency><groupId>org.redisson</groupId><artif ...

  2. 聊聊redis分布式锁的8大坑

    在分布式系统中,由于redis分布式锁相对于更简单和高效,成为了分布式锁的首先,被我们用到了很多实际业务场景当中. 但不是说用了redis分布式锁,就可以高枕无忧了,如果没有用好或者用对,也会引来一些 ...

  3. 卧槽,redis分布式锁如果用不好,坑真多

    在分布式系统中,由于redis分布式锁相对于更简单和高效,成为了分布式锁的首先,被我们用到了很多实际业务场景当中. 但不是说用了redis分布式锁,就可以高枕无忧了,如果没有用好或者用对,也会引来一些 ...

  4. 新来个技术总监,这Redis分布式锁设计的真漂亮!

    前言 在分布式系统中,由于redis分布式锁相对于更简单和高效,成为了分布式锁的首先,被我们用到了很多实际业务场景当中. 但不是说用了redis分布式锁,就可以高枕无忧了,如果没有用好或者用对,也会引 ...

  5. 实际开发中使用Redis做分布式锁,躲坑指南,收藏起来

    今天我们来聊聊Redis分布式锁,曾经被Redis分布式锁的坑给坑惨了,接下来,我就进行一个完整的整理,希望大家都能避免踩坑. 在分布式系统中,由于redis分布式锁相对于更简单和高效,成为了分布式锁 ...

  6. Redis入门笔记2

    四 Redis 解决session共享[刚需] 4.1 session共享问题 我们之前都是单点项目,对于用户的信息存储都是使用session进行存储.但是在集群环境中,此时session就会有问题: ...

  7. redis分布式锁的8大坑【Redis分布式锁】

    在分布式系统中,由于redis分布式锁相对于更简单和高效,成为了分布式锁的首先,被我们用到了很多实际业务场景当中. 但不是说用了redis分布式锁,就可以高枕无忧了,如果没有用好或者用对,也会引来一些 ...

  8. 聊聊分布式锁——Redis和Redisson的方式

    聊聊分布式锁--Redis和Redisson的方式 一.什么是分布式锁 分布式~~锁,要这么念,首先得是『分布式』,然后才是『锁』 分布式:这里的分布式指的是分布式系统,涉及到好多技术和理论,包括CA ...

  9. 使用Redisson优雅关闭订单

    在支付系统中,订单通常是具有时效性的,例如在下单30分钟后如果还没有完成支付,那么就要取消订单,不能再执行后续流程.说到这,可能大家的第一反应是启动一个定时任务,来轮询订单的状态是否完成了支付,如果超 ...

最新文章

  1. jquery学习之重要知识点
  2. 音频编码标准发展现状
  3. UVA 1622 Robot
  4. JAVA通过JCO连接SAP例子
  5. 刘强东为抗疫发声:我们送的不是货,是温暖和希望!
  6. 英国鬼死于狭隘和傲慢,中国鬼死于听天由命和漫不经心--《我的团长我的团》兰晓龙...
  7. 跟着百度学PHP[13]-文件上传
  8. idea无法正常使用SVN的解决方法
  9. 如何开发 Web 应用程序
  10. 祝微软北京.net俱乐部徐磊生日快乐
  11. stm32最小原理图的PCB图绘制(含AHT20温度传感器)
  12. JAVA IO流读取中文出现乱码
  13. Android系统开发
  14. 我们可能都低估了浪潮存储
  15. [总结] 全部笔记博文目录总结(持续更新...)
  16. 贵卅大学计算机研究生院导师,贵州大学机械工程学院研究生导师:罗绍华
  17. 一,Weston简介
  18. 【刷题】验证回文字符串
  19. 企业如何做好项目管理工作?
  20. 手机(小米10s)接收微信语音\视频通话时,蓝牙耳机(小米Air2s)无法使用,手机仍外放

热门文章

  1. 倒计时|全场书籍低至 3.5 折起,无门槛包邮!
  2. 厉害了程序员~凡尔赛文学现作 | 每日趣闻
  3. keras第二课:后端函数
  4. 分布式架构探索 - 2. WebService RPC框架之Apache CXF
  5. 交换机运维需要注意哪些问题,让我们一起来闲聊下
  6. DSAPI多功能组件编程应用-DS提示气泡
  7. Ubuntu安装ftp服务器
  8. 于XAML导入命名空间的代码
  9. ubuntu访问win7
  10. Nginx启动提示找不到libpcre.so.1解决方法