花了一天研究了下Redisson 的延时队列,RBlockingQueue ,RDelayedQueue 。 网上没一个说清楚的,而且都是说轮询redis的zset,都是错误的! 让我来纠正,如果我有错的也可指出。

Demo用法

public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException {Config config = new Config();config.useSingleServer().setAddress("redis://172.29.2.10:7000");RedissonClient redisson = Redisson.create(config);RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("dest_queue1");RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);new Thread() {public void run() {while(true) {try {//阻塞队列有数据就返回,否则waitSystem.err.println( blockingQueue.take());} catch (InterruptedException e) {e.printStackTrace();}}};}.start();for(int i=1;i<=5;i++) {// 向阻塞队列放入数据delayedQueue.offer("fffffffff"+i, 13, TimeUnit.SECONDS);}
}

上面构造了Redisson 阻塞延时队列,然后向里面塞了5条数据,都是13秒后到期。我们先不启动程序,先打开redis执行:

[root@localhost redis-cluster]# redis-cli -c -p 7000 -h 172.29.2.10 --raw
172.29.2.10:7000> monitor
OK

monitor 命令可以监控redis执行了哪些命令,注意线上不要乱搞,耗性能的。然后我们启动程序,观察redis执行命令情况,这里分为三个阶段:

第一介段: 客户端程序启动,offer方法执行之前 ,redis服务会收到如下redis命令:

1610452446.652126 [0 172.29.2.194:65025] "SUBSCRIBE" "redisson_delay_queue_channel:{dest_queue1}"
1610452446.672009 [0 lua] "zrangebyscore" "redisson_delay_queue_timeout:{dest_queue1}" "0" "1610452442403" "limit" "0" "2"
1610452446.672018 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0" "WITHSCORES"
1610452446.673896 [0 172.29.2.194:65034] "BLPOP" "dest_queue1" "0"

SUBSCRIBE

这里订阅了一个固定的队列 redisson_delay_queue_channel:{dest_queue1}, 就是为了开启进程里面的延时任务,很重要,redisson延时取数据都靠它了。后面会说。

zrangebyscore

zrangebyscore用法扫盲
>> zrangebyscore key min max [WITHSCORES] [LIMIT offset count]
分页获取指定区间内(min - max),带有分数值(可选)的有序集成员的列表。

redisson_delay_queue_timeout:{dest_queue1} 是一个zset,当有延时数据存入Redisson队列时,就会在此队列中插入 数据,排序分数为延时的时间戳。

zrangebyscore就是取出前2条(源码是100条,如下图)过了当前时间的数据。如果取的是0的话就执行下面的zrange, 这里程序刚启动肯定是0(除非是之前的队列数据没有取完)。之所以在刚启动时 这样取数据就是为了把上次进程宕机后没发完的数据发完。

zrange

取出第一个数,也就是判断上面的还有不有下一页。

BLPOP

移出并获取 dest_queue1 列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止 , 这里显然没有元素 ,就会一直阻塞。

第二介段: 执行offer向Redisson队列设置值

这个阶段我们发现redis干了下面事情:

1610452446.684465 [0 lua] "zadd" "redisson_delay_queue_timeout:{dest_queue1}" "1610452455407" ":\xdf\x0eO\x8c\xa7\xd4C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff1"
1610452446.684480 [0 lua] "rpush" "redisson_delay_queue:{dest_queue1}" ":\xdf\x0eO\x8c\xa7\xd4C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff1"
1610452446.684492 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0"
1610452446.684498 [0 lua] "publish" "redisson_delay_queue_channel:{dest_queue1}" "1610452455407"
1610452446.687922 [0 lua] "zadd" "redisson_delay_queue_timeout:{dest_queue1}" "1610452455422" "e\xfd\xfe?j?\xdbC\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff2"
1610452446.687943 [0 lua] "rpush" "redisson_delay_queue:{dest_queue1}" "e\xfd\xfe?j?\xdbC\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff2"
1610452446.687958 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0"
1610452446.690478 [0 lua] "zadd" "redisson_delay_queue_timeout:{dest_queue1}" "1610452455424" "\x80J\x01j\x11\xee\xda\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff3"
1610452446.690492 [0 lua] "rpush" "redisson_delay_queue:{dest_queue1}" "\x80J\x01j\x11\xee\xda\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff3"
1610452446.690502 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0"
1610452446.692661 [0 lua] "zadd" "redisson_delay_queue_timeout:{dest_queue1}" "1610452455427" "v\xb5\xd0r\xb48\xd4\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff4"
1610452446.692674 [0 lua] "rpush" "redisson_delay_queue:{dest_queue1}" "v\xb5\xd0r\xb48\xd4\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff4"
1610452446.692683 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0"
1610452446.696054 [0 lua] "zadd" "redisson_delay_queue_timeout:{dest_queue1}" "1610452455429" "\xe7\a\x8b\xee\t-\x94C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff5"
1610452446.696081 [0 lua] "rpush" "redisson_delay_queue:{dest_queue1}" "\xe7\a\x8b\xee\t-\x94C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff5"
1610452446.696098 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0"

我们客户端是设置了5条数据。上面也可以看出来。

zadd

往我们zset里面设置 数据截止的时间戳(当前执行的时间戳+延时的时间毫秒值),内容为我们的ffffff1 ,不过特殊编码了,加了点什么,不用管。

rpush

同步一份数据到list队列,这里也不知道干嘛的,先放到这里。

zrange+publish

取出排序好的第一个数据,也就是最临近要触发的数据,然后发送通知 (之前订阅了的客户端,可能是微服务就有多个客户端),内容为将要触发的时间。客户端收到通知后,就在自己进程里面开启延时任务(HashedWheelTimer),到时间后就可以从redis取数据发送。

后面又是我们的5条循环的设置数据 zadd...

第三介段:到延时时间取redis数据

1610452459.680953 [0 lua] "zrangebyscore" "redisson_delay_queue_timeout:{dest_queue1}" "0" "1610452455416" "limit" "0" "2"
1610452459.680967 [0 lua] "rpush" "dest_queue1" "\x04>\nfffffffff1"
1610452459.680976 [0 lua] "lrem" "redisson_delay_queue:{dest_queue1}" "1" ":\xdf\x0eO\x8c\xa7\xd4C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff1"
1610452459.680984 [0 lua] "zrem" "redisson_delay_queue_timeout:{dest_queue1}" ":\xdf\x0eO\x8c\xa7\xd4C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff1"
1610452459.680991 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0" "WITHSCORES" // 判断是否有值
1610452459.745813 [0 lua] "zrangebyscore" "redisson_delay_queue_timeout:{dest_queue1}" "0" "1610452455480" "limit" "0" "2"
1610452459.745829 [0 lua] "rpush" "dest_queue1" "\x04>\nfffffffff2"
1610452459.745837 [0 lua] "lrem" "redisson_delay_queue:{dest_queue1}" "1" "e\xfd\xfe?j?\xdbC\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff2"
1610452459.745845 [0 lua] "rpush" "dest_queue1" "\x04>\nfffffffff3"
1610452459.745848 [0 lua] "lrem" "redisson_delay_queue:{dest_queue1}" "1" "\x80J\x01j\x11\xee\xda\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff3"
1610452459.745855 [0 lua] "zrem" "redisson_delay_queue_timeout:{dest_queue1}" "e\xfd\xfe?j?\xdbC\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff2" "\x80J\x01j\x11\xee\xda\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff3"
1610452459.745864 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0" "WITHSCORES"
1610452459.756909 [0 172.29.2.194:65026] "BLPOP" "dest_queue1" "0"
1610452459.758092 [0 lua] "zrangebyscore" "redisson_delay_queue_timeout:{dest_queue1}" "0" "1610452455493" "limit" "0" "2"
1610452459.758108 [0 lua] "rpush" "dest_queue1" "\x04>\nfffffffff4"
1610452459.758114 [0 lua] "lrem" "redisson_delay_queue:{dest_queue1}" "1" "v\xb5\xd0r\xb48\xd4\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff4"
1610452459.758121 [0 lua] "rpush" "dest_queue1" "\x04>\nfffffffff5"
1610452459.758124 [0 lua] "lrem" "redisson_delay_queue:{dest_queue1}" "1" "\xe7\a\x8b\xee\t-\x94C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff5"
1610452459.758133 [0 lua] "zrem" "redisson_delay_queue_timeout:{dest_queue1}" "v\xb5\xd0r\xb48\xd4\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff4" "\xe7\a\x8b\xee\t-\x94C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff5"
1610452459.758143 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0" "WITHSCORES"
1610452459.759030 [0 172.29.2.194:65037] "BLPOP" "dest_queue1" "0"
1610452459.760933 [0 172.29.2.194:65036] "BLPOP" "dest_queue1" "0"
1610452459.763913 [0 172.29.2.194:65038] "BLPOP" "dest_queue1" "0"
1610452459.765999 [0 172.29.2.194:65039] "BLPOP" "dest_queue1" "0"

这个阶段是由客户端进程里面的延时任务执行的,延时任务是在第二阶段构造的,已经说了(通过redis的订阅/发布实现)。

zrangebyscore

取出前2条到时间的数据,第一阶段已说。

rpush

将上面取到的数据push到阻塞队列,注意我们第一阶段已经监听了这个阻塞队列

"BLPOP" "dest_queue1" "0" 

所以这里就会通知客户端取数据。

lrem + zrem

将取完的数据删掉。

zrange

取zset第一个数据,有的话继续上面逻辑取数据,否则进入下面。

BLPOP

继续监听这个阻塞队列。以便下次用。

小结一下

  • 客户端启动,redisson先订阅一个key,同时 BLPOP key 0 无限监听一个阻塞队列(等里面有数据了就返回)。
  • 当有数据put时,redisson先把数据放到一个zset集合(按延时到期时间的时间戳为分数排序),同时发布上面订阅的key,发布内容为数据到期的timeout,此时客户端进程开启一个延时任务,延时时间为发布的timeout。
  • 客户端进程的延时任务到了时间执行,从zset分页取出过了当前时间的数据,然后将数据rpush到第一步的阻塞队列里。然后将当前数据从zset移除,取完之后,又执行 BLPOP key 0 无限监听一个阻塞队列。
  • 上一步客户端监听的阻塞队列返回取到数据,回调到 RBlockingQueue 的 take方法。于是,我们就收到了数据。

大致原理就是这样,redisson不是通过轮询zset的,将延时任务执行放到进程里面实现,只有到时间才会取redis zset。

redisson里面还有很多异常,重试机制 没讲。毕竟时间就一天,没法全部吃透。有了这些原理,我相信你也能实现一个属于自己的redisson延时队列了。

redisson延时队列优化

由于我在线上使用了redisson延时队列,在数据量小的时候表现很佳也很稳定,但我们瞬时流量特别大,发生了到了延时时间了还给我延时十几分钟的情况,这个是我万万没想到的。

这个是我测试的情况

当我设置了14511条数据到redisson延时队列时,取出来的时间在本身的延时时间上还延时了198636多毫秒,而且时间随着数据增加而增加。 我们线上是微信自动发消息业务,这样会导致你跟你女朋友推晚安消息,结果她第二天早上收到,然后她就认为你勾搭上了美国的妞,和你分手,并暗暗自喜到老娘早就有外遇了,就盼着这天了。

为了你的性福,于是我想到了一个优化的法子,构建多个redisson队列,类似cluster模式。需要自己开发。

我把一个redisson延时队列实例放到了一数组里面,然后put值的时候采用轮训的负载均衡模式,put和take都是采用线程池,结果收到了很理想的效果,下面是测试结果:

10万多条数据,真实延时时间最大33399毫秒,已经表现很好了,毕竟我开发环境redis特别垃圾。

要了解实现的 请见这篇文章

解决 Redisson 延时队列 延时严重问题

兄弟(妹子)不要以为就这样江湖再见了,我还要烦你一点时间。

上面介绍了进程里面的延时任务都是一笔带过,下面来讲讲下它的原理。redisson使用的是netty里面的延时任务 io.netty.util.HashedWheelTimer

HashedWheelTimer 实现原理

HashedWheelTimer本质是一种类似延迟任务队列的实现,适用于对时效性不高的,可快速执行的,大量这样的“小”任务,能够做到高性能,低消耗

redisson是在这里用的 org.redisson.connection.MasterSlaveConnectionManager

   // 初始化  timer protected void initTimer(MasterSlaveServersConfig config) {int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout()};Arrays.sort(timeouts);int minTimeout = timeouts[0];if (minTimeout % 100 != 0) {minTimeout = (minTimeout % 100) / 2;} else if (minTimeout == 100) {minTimeout = 50;} else {minTimeout = 100;}// minTimeout 为100timer = new HashedWheelTimer(new DefaultThreadFactory("redisson-timer"), minTimeout, TimeUnit.MILLISECONDS, 1024, false);connectionWatcher = new IdleConnectionWatcher(this, config);subscribeService = new PublishSubscribeService(this, config);}@Overridepublic Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {try {System.err.println(time + " HadLuo ======================newTimeout==================" + task + " , "+ delay+"ms");return timer.newTimeout(task, delay, unit);} catch (IllegalStateException e) {if (isShuttingDown()) {return DUMMY_TIMEOUT;}throw e;}}

抽象出来就是这样

HashedWheelTimer timer = new HashedWheelTimer(new DefaultThreadFactory("redisson-timer"), 100,TimeUnit.MILLISECONDS, 1024, false);
// 构建一个延时任务
timer.newTimeout((time) -> {System.err.println("到了12s后了,该娶媳妇了~");
}, 12, TimeUnit.SECONDS);

算了,下篇文章讲下原理吧,篇幅还有点长,请移致下面文章。

HashedWheelTimer 源码解析​githubs.xyz

强烈推荐一个 进阶 JAVA架构师 的博客

Java架构师修炼​githubs.xyz

Redisson 延时队列 原理 详解相关推荐

  1. 单调队列-原理详解(deque实现)

    一.单调队列的概念: 单调队列,即单调递减或单调递增的队列. 二.单调队列的性质: 1.  队列中的元素在原来的列表中的位置是由前往后的(随着循环顺序入队). 2.  队列中元素的大小是单调递增或递减 ...

  2. AQS抽象队列同步器原理详解

    系列文章目录 第一节 synchronized关键字详解-偏向锁.轻量级锁.偏向锁.重量级锁.自旋.锁粗化.锁消除 AQS抽象队列同步器原理详解 系列文章目录 前言 一.AQS特性 二.AQS原理 1 ...

  3. Kafka 原理详解

    Kafka 原理详解 1 kakfa基础概念说明 Broker:消息服务器,就是我们部署的一个kafka服务 Partition:消息的水平分区,一个Topic可以有多个分区,这样实现了消息的无限量存 ...

  4. 操作系统:基于页面置换算法的缓存原理详解(下)

    概述: 在上一篇<操作系统:基于页面置换算法的缓存原理详解(上)>中,我们主要阐述了FIFO.LRU和Clock页面置换算法.接着上一篇说到的,本文也有三个核心算法要讲解.分别是LFU(L ...

  5. JAVA消息服务JMS规范及原理详解

    一.简介 JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进 ...

  6. Java LinkedList的实现原理详解

    LinkedList是Java List类型的集合类的一种实现,此外,LinkedList还实现了Deque接口.本文基于Java1.8,对于LinkedList的实现原理做一下详细讲解. (Java ...

  7. java8 stream运行原理之并行流原理详解

    上一篇文章<java8 stream运行原理之顺序流原理详解>介绍了顺序流的执行原理,本文接着上一篇介绍并行流的执行原理. 一.如何创建并行流 调用parallel()方法可以创建并行流, ...

  8. 并发编程五:java并发线程池底层原理详解和源码分析

    文章目录 java并发线程池底层原理详解和源码分析 线程和线程池性能对比 Executors创建的三种线程池分析 自定义线程池分析 线程池源码分析 继承关系 ThreadPoolExecutor源码分 ...

  9. BlockingQueue(阻塞队列)详解

    推荐:Java并发编程汇总 BlockingQueue(阻塞队列)详解 原文地址 BlockingQueue 一. 前言 在新增的Concurrent包中,BlockingQueue很好的解决了多线程 ...

最新文章

  1. 当 K8s 集群达到万级规模,阿里巴巴如何解决系统各组件性能问题?
  2. python中的import详解_python中的import
  3. MySQL修改数据类型语句
  4. python单词的含义-python 前面几个单词含义
  5. 带宽与虚拟桌面的考虑
  6. Linux下passwd和shadow文件内容详解
  7. 优词词根词典mdx_中外英语词典223种(Txt格式)免费分享
  8. MVC 生成PDf表格并插入图片
  9. 基于android物流快递服务系统app
  10. 零基础学习C语言的第一天
  11. Excel2013 基本用法(上)
  12. vbs恶搞小程序速成
  13. 《利用Python进行数据分析·第2版》第14章 数据分析案例
  14. H5实现全屏与F11全屏
  15. C51——初识PWM
  16. php网页画线,HTML5 canvas基本绘图之绘制线条
  17. 排课表--拓扑排序【自己写的拓扑排序方法】[1]
  18. Jade ( Translucency / Subsurface Scattering ) Shader——玉石效果——SSS深度图实现
  19. CVPR 2020 论文大盘点-图像质量评价篇
  20. 视频教程-《2048》-UGUI搞定2D游戏研发-Unity3D

热门文章

  1. L2-009 抢红包 (25分)
  2. IDEA打包Maven项目
  3. Matlab 产生伪随机调相信号与伪随机信号
  4. 分享5款轻量级的Win10神器,错过你会后悔的
  5. 视界云与华为云签署全面合作协议,强强联合共铸未来云生态
  6. tableau高级绘图(十)-Tableau绘制圆形图
  7. 基于录制的路径建立栅格地图--适用于室外的割草机扫地机
  8. 中国自推式割晒机行业市场供需与战略研究报告
  9. 水管混装存在哪些隐患
  10. 进销存软件如何解决服装行业问题与痛点