一、什么是延迟队列(DelayQueue)?

  DelayQueue 是 Java 并发包 java.util.concurrent 下的一个 Class,其官方定义如下所示。

     /*** An unbounded {@linkplain BlockingQueue blocking queue} of* {@code Delayed} elements, in which an element can only be taken* when its delay has expired.  The <em>head</em> of the queue is that* {@code Delayed} element whose delay expired furthest in the* past.  If no delay has expired there is no head and {@code poll}* will return {@code null}. Expiration occurs when an element's* {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less* than or equal to zero.  Even though unexpired elements cannot be* removed using {@code take} or {@code poll}, they are otherwise* treated as normal elements. For example, the {@code size} method* returns the count of both expired and unexpired elements.* This queue does not permit null elements.* /

  由定义可知,DelayQueue 是一个无界阻塞队列,队列中的元素只有在延迟期满后才能被取出。队列的头部存储的是最先到期的元素。添加进该队列的元素必须实现 Delayed 接口,指定延迟时间,元素过期的判断是根据 getDelay(TimeUnit unit) 方法的返回值,返回值小于等于 0,则认为元素过期。队列不允许存储空元素。

二、DelayQueue 的使用场景

  DelayQueue 被用于需要延迟处理任务的场景,例如,网民在网上商城下单后,如果超时未支付,订单会被后台系统关闭。这种需要延时处理的场景就可以采用 DelayQueue 实现。

三、原理解析(源码)
3.1 Class 定义
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {// ...
}

  DelayQueue 类继承了AbstractQueue,并实现了 BlockingQueue 接口,DelayQueue 的泛型参数(即队列中的元素)要实现 Delayed 接口。Delayed 接口定义如下。

public interface Delayed extends Comparable<Delayed> {long getDelay(TimeUnit unit);
}

  Delayed 接口继承了 Comparable 接口,Comparable 接口定义如下。

public interface Comparable<T> {public int compareTo(T o);
}

  所以,延迟队列中的元素要实现 getDelay(TimeUnit unit) 和 compareTo(T o) 两个方法。

  • compareTo(Delayed o):用于比较延时,这是队列里元素的排序依据。当生产者线程调用 put 之类的方法加入元素时,会触发 Delayed 接口中的 compareTo 方法进行排序,也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间赿晚。
  • getDelay(TimeUnit unit):这个接口返回元素是否到期,小于等于 0 表示元素已到期,大于 0 表示元素未到期。消费者线程查看队列头部的元素(注意是查看不是取出),然后调用元素的 getDelay 方法,如果此方法返回的值小于0或者等于0,则消费者线程会从队列中取出此元素,并进行处理。如果 getDelay 方法返回的值大于 0,则消费者线程 wait 返回的时间值后,再从队列头部取出元素,此时元素已经到期。
3.2 延迟队列的属性

  DelayQueue 中的重要属性如下所示。

// 可重入锁,用于保证线程安全
private final transient ReentrantLock lock = new ReentrantLock();
// DelayQueue 的实现依赖于 PriorityQueue(优先队列),用于存储元素,并按过期时间优先排序
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于优化内部阻塞通知的线程
// 第一个等待某个延时对象的线程,在延时对象还没有到期时其他线程看到这个 leader 不为 null,那么就直接 wait,主要是为了避免大量线程在同一时间点唤醒,导致大量的竞争,反而影响性能
private Thread leader = null;
// 用于实现阻塞的 Condition 对象
private final Condition available = lock.newCondition();

  DelayQueue 内部使用非线程安全的优先队列(PriorityQueue),并使用 Leader-Followers (领导者-追随者)模式,最小化不必要的等待时间。什么是领导者-追随者模式,具体见本文第四节示例。

3.3 DelayQueue 的主要方法
3.3.1 offer 添加元素
public boolean offer(E e) {// 获取全局独占锁final ReentrantLock lock = this.lock;lock.lock();try {// 向优先队列中插入元素q.offer(e);// 检验元素是否为队首,是则设置 leader 为 null, 并唤醒一个消费线程 if (q.peek() == e) {leader = null;available.signal();}return true;} finally {// 释放全局独占锁lock.unlock();}}
  • leader 是等待获取队头元素的线程,领导者-追随者模式设计减少不必要的等待。
  • 如果 leader != null,表示已经有线程在等待获取队头元素,会通过 await() 方法让出当前线程等待信号。
  • 如果 leader == null,则把当前线程设置为 leader,当一个线程为 leader 时,会使用 awaitNanos() 让当前线程等待接受信号,或等待 delay 时间。

  DelayQueue 的其他入队方法,如 add(E e) 和 put(E e) 方法,都是调用上述 offer(E e) 方法实现的。

3.3.2 take 取出元素

  take() 方法取出队列元素,当没有元素被取出时,该方法阻塞。

public E take() throws InterruptedException {// 获取全局独占锁final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {// 获取队头元素,peek 方法不会删除元素E first = q.peek();if (first == null)// 若队头为空,则阻塞当前线程available.await();else {// 否则获取队头元素的超时时间long delay = first.getDelay(NANOSECONDS);// 已超时,直接出队if (delay <= 0)return q.poll();// 释放 first 的引用,避免内存泄漏first = null; // don't retain ref while waiting// leader != null 表明有其他线程在操作,阻塞当前线程if (leader != null)available.await();else {// leader 指向当前线程Thread thisThread = Thread.currentThread();leader = thisThread;try {// 超时阻塞available.awaitNanos(delay);} finally {// 释放 leaderif (leader == thisThread)leader = null;}}}}} finally {// leader 为 null 并且队列不为空,说明没有其他线程在等待,那就通知条件队列if (leader == null && q.peek() != null)available.signal();// 释放全局独占锁    lock.unlock();}}
3.3.3 poll 取出元素

  取出队头元素,当延迟队列中没有到期的元素可以取出时,返回 null。

public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {E first = q.peek();if (first == null || first.getDelay(NANOSECONDS) > 0)return null;elsereturn q.poll();} finally {lock.unlock();}}
四、什么是 Leader-Follower 模式?

  DelayQueue 采用了 Leader-Follower 模式,那什么是 Leader-Follower 模式呢?举个 “饭店运作模式” 的例子来对照理解。

4.1 单 Reactor 多线程模式(Reactor 模式称为反应器模式或应答者模式)

  饭店的员工一般都是分角色的,比如接待员、服务员、厨师等等。
  假如有一个叫做 A 的人,固定他作为饭店接待员,来客人了就分给客人一个座位号,然后交给其他服务员,比如 B 进行后续处理。B 会根据座位号为客人引路,为客人点菜等等。如果把 A、B 比作两个线程,客人比作任务,任务由 A 处理,到交接给 B 处理,有一次线程上下文切换。

4.2 Leader-Follower 模式

  这次饭店不分角色了,每个人都是接待员和服务员,统称为员工。
  每次只能有一个员工在门口等待,比如 A 先在门口等待,其他员工在屋里歇着。来客人了的话,A 会叫一个其他员工,比如 B 来门口接替自己。然后 A 开始为客人服务,比如分配座位号,引路,点菜等全流程服务。拿线程来说的话,就是接受任务,处理任务都是由线程 A 负责,没有线程上下文切换。

4.3 DelayQueue 的 Leader-Follower 模式

  这次饭店也不分角色,都是员工,但是改变了经营策略,每个客人必须预约吃饭时间,预约采用 APP 预约。因为加入了延时,逻辑变得复杂了一些。
  每次还是只能有一个员工在门口等待,比如 A 先在门口等待,A 看了眼预约登记表,发现离预约最早到店的时间还有 30 分钟,A 就什么都不干了,先休息 30 分钟。
  其他员工还是先在屋里歇着,但是因为采用 APP 预约,客人约几点都有可能,如果此时有客人约的是 10 分钟后到店,因为 A 要 30 分钟后才能醒来干活,所以如果这位客人来了,门口就没有人接待了。
  对于这个问题,饭店的软件系统在监听到最早到店时间变了的话,会再叫一个员工来门口等待,此员工可能是新员工 B,也可能是叫醒了之前在门口休息的员工 A。我们叫这位新员工 X。 如果新员工 X 发现最早到店时间是现在,或者客人已经来了,就会叫一个员工 C 来门口接替自己,并立即开始为客人提供全流程服务。如果新员工 X 发现最早到店时间是 10 分钟后,新员工 X 就像 A 之前一样,什么都不干了,先休息 10 分钟。
  如果最早到店时间没有变化,还是 30 分钟后,软件系统不会叫人,其他员工看到 A 在门口等待,自己可以安心的在屋里歇着,等着 A 叫人替换他。
  员工 A 在 30 分钟后醒来,客人也到了,A 会叫一个同事(比如 B)接替自己,而 A 为客人提供全流程服务。

五、图解 DelayQueue 的生产/消费过程

  DelayQueue 是 Leader-Follower 模式的变种,以下通过队列及消费者线程状态变化大致说明一下 DelayQueue 的运行过程。

初始状态

  因为队列是没有边界的,向队列中添加元素的线程不会阻塞,添加操作相对简单,所以此图不考虑向队列添加元素的生产者线程。假设现在共有三个消费者线程。
  队列中的元素按到期时间排序,队列头部的元素 2s 以后到期。消费者线程1查看了头部元素以后,发现还需要 2s 才到期,于是它进入等待状态,2s 以后醒来,等待头部元素到期的线程称为 Leader 线程。
  消费者线程 2 与消费者线程 3 处于待命状态,它们不等待队列中的非头部元素。当消费者线程1拿到对象 5 以后,会向它们发送 signal。这个时候两个线程中的一个会结束待命状态而进入等待状态。

2S 以后

  消费者线程1已经拿到了对象5,从等待状态进入处理状态,处理它取到的对象5,同时向消费者线程 2 与消费者线程 3 发送 signal。
  消费者线程 2 与消费者线程 3 会争抢领导权,这里是消费者线程 2 进入等待状态,成为 Leader 线程,等待 2s 以后对象 4 到期。而消费者线程 3 则继续处于待命状态。
  此时队列中加入了一个新元素对象6,它 10s 后到期,排在队尾。

又 2S 以后

  先看线程1,如果它已经结束了对象 5 的处理,则进入待命状态。如果还没有结束,则它继续处理对象 5。
  消费线程2取到对象 4 以后,也进入处理状态,同时给处于待命状态的消费线程3发送信号,消费线程3进入等待状态,成为新的 Leader。现在头部元素是新插入的对象 7,因为它 1s 以后就过期,要早于其它所有元素,所以排到了队列头部。

又 1S 后

  一种不好的结果:

  消费线程3一定正在处理对象 7。消费线程1与消费线程2还没有处理完它们各自取得的对象,无法进入待命状态,也更加进入不了等待状态。此时对象 3 马上要到期,那么如果它到期时没有消费者线程空下来,则它的处理一定会延期。
  可以想见,如果元素进入队列的速度很快,元素之间的到期时间相对集中,而处理每个到期元素的速度又比较慢的话,则队列会越来越大,队列后边的元素延期处理的时间会越来越长。

  另外一种好的结果:

  消费线程1与消费线程2很快完成对取出对象的处理,及时返回重新等待队列中的到期元素。一个处于等待状态(Leader),对象 3 一到期就立刻处理。另一个则处于待命状态。这样,每一个对象都能在到期时被及时处理,不会发生明显的延期。
  所以,消费者线程的数量要够,处理任务的速度要快。否则,队列中的到期元素无法被及时取出并处理,造成任务延期、队列元素堆积等情况。
  如果消费者线程数过少,则来不及处理到期的任务。如果消费者线程数过多,在线程调度、同步上花更多的时间,无益改善性能。

文章参考
  • 延迟队列DelayQueue原理
  • DelayQueue
  • DelayQueue实现原理及应用场景分析
  • 读源码DelayQueue-Leader-Follower模式

Java 延迟队列 DelayQueue 的原理相关推荐

  1. java延迟队列,java高级面试笔试题

    我总结出了很多互联网公司的面试题及答案,并整理成了文档,以及各种学习的进阶学习资料,免费分享给大家. 扫描二维码或搜索下图红色VX号,加VX好友,拉你进[程序员面试学习交流群]免费领取.也欢迎各位一起 ...

  2. 【JAVA】延迟队列DelayQueue的应用

    最近在开发CRM管理系统时遇到一个需求:销售部门的人员在使用该系统时,可以从[线索公海]模块中 "领取" 潜在的客户线索到自己的[线索私海]模块中,成为自己私有的潜在客户线索,以便 ...

  3. JUC学习 - 延迟队列 DelayQueue 详解

    1.DelayQueue基本特征 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>impl ...

  4. 使用Java延时队列DelayQueue实现订单延时处理

    DelayQueue简单介绍 DelayQueue:一个使用优先级队列实现的无界阻塞队列. 支持延时获取的元素的阻塞队列,元素必须要实现Delayed接口. 适用场景:实现自己的缓存系统,订单到期,限 ...

  5. JAVA延迟队列(实现数据的缓存和定时清理)

    在延迟队列中所保存的每一个元素内容.每当时间一到,(compareTo进行比较,getDelay()获取延迟时间),都会自动进行队里数据的弹出操作; 使用延迟队列(模拟讨论会依次离开的场景) publ ...

  6. 【Java ASQ队列同步器实现原理】简单理解

    ASQ 1.概念 2.核心结构 3.实现原理 3.1 同步状态的获取 3.2 同步队列 3.3 独占式同步状态的获取 3.4 独占式同步状态的释放 1.概念 队列同步器AbstractQueuedSy ...

  7. python延时队列_如何通过Python实现RabbitMQ延迟队列

    最近在做一任务时,遇到需要延迟处理的数据,最开始的做法是现将数据存储在数据库,然后写个脚本,隔五分钟扫描数据表再处理数据,实际效果并不好.因为系统本身一直在用rabbitmq做异步处理任务的中间件,所 ...

  8. 完整案例:实现延迟队列的两种方法

    延迟队列是指把当前要做的事情,往后推迟一段时间再做. 延迟队列在实际工作中和面试中都比较常见,它的实现方式有很多种,然而每种实现方式也都有它的优缺点,接下来我们来看. 延迟队列的使用场景 延迟队列的常 ...

  9. java实现消息队列以及延迟消息(队列DelayQueue)

    1.java实现延迟消息(队列DelayQueue) DelayQueue是一个支持延时获取元素的无界阻塞队列.队列使用PriorityQueue来实现.队列中的元素必须实现Delayed接口,在创建 ...

  10. java 延时队列_Java实现简单延迟队列和分布式延迟队列

    在我们的工作中,很多地方使用延迟队列,比如订单到期没有付款取消订单,制订一个提醒的任务等都需要延迟队列,那么我们需要实现延迟队列.我们本文的梗概如下,同学们可以选择性阅读. 1. 实现一个简单的延迟队 ...

最新文章

  1. 创建并运行HelloWorld Java项目和类
  2. 编程语言介绍、python解释器执行代码的过程
  3. 整理:关于聚簇索引和非聚簇索引的区别
  4. android信鸽推送demo_【厚积薄发】手游推送方案
  5. 使用Commons Logging - Java异常处理
  6. python树代码_浅析AST抽象语法树及Python代码实现
  7. Ext.state.Manager.setProvider(new Ext.state.CookieProvider())
  8. beetl 国内下载量貌似快跟freemaker的差不多了
  9. 小学生计算机的样子,小学生状物作文:我家的电脑
  10. jdbc oracle clob blob long类型数据
  11. html js注册表单代码,用户注册常用javascript代码
  12. html实现点击下载文件
  13. 主流前端框架的介绍优缺点
  14. EV:ePWM+eCAP
  15. LeetCode—5757. 矩阵中最大的三个菱形和(Get Biggest ...)[中等]—分析及代码(Java)
  16. DW 6 CS6 通用破解方法
  17. Oracle安装手册
  18. 编程领域名词:魔法数值、魔法数字、魔法值
  19. 漏洞修复需要升级打补丁,打补丁准备工作
  20. 返回html404字符串,post提交数据后,返回的网页错误404怎么回事。

热门文章

  1. 商业银行资产托管业务读书笔记
  2. 飞思卡尔智能车知识总结
  3. 【SLAM学习】(三)激光雷达原理及分类
  4. 电子签章系统解决方案
  5. ArcGIS空间大数据平台与HadoopSpark大数据平台的集成与开发
  6. 体脂率在线计算机,如何简单测算出自己的体脂率?
  7. 破解Windows系统密码---利用PE系统破解
  8. 华为模拟器eNSP下载与安装(win10系统)
  9. 我的HTC G16 CHACHA A810e版手机如何解锁和一键root的
  10. Java学习笔记:2021年12月31日下午-2022年1月1日上午