21 DelayQueue 源码解析

引导语

之前我们说的阻塞队列,都是资源足够时立马执行。本章我们说的队列比较特殊,是一种延迟队列,意思是延迟执行,并且可以设置延迟多久之后执行,比如设置过 5 秒钟之后再执行,在一些延迟执行的场景被大量使用,比如说延迟对账等等。

1 整体设计

DelayQueue 延迟队列底层使用的是锁的能力,比如说要在当前时间往后延迟 5 秒执行,那么当前线程就会沉睡 5 秒,等 5 秒后线程被唤醒时,如果能获取到资源的话,线程即可立马执行。原理上似乎很简单,但内部实现却很复杂,有很多难点,比如当运行资源不够,多个线程同时被唤醒时,如何排队等待?比如说在何时阻塞?何时开始执行等等?接下来我们从源码角度来看下是如何实现的。

1.1 类注释

类注释上比较简单,只说了三个概念:

  1. 队列中元素将在过期时被执行,越靠近队头,越早过期;
  2. 未过期的元素不能够被 take;
  3. 不允许空元素。

这三个概念,其实就是三个问题,下文我们会一一看下这三点是如何实现的。

1.2 类图

DelayQueue 的类图和之前的队列一样,不多说,关键是 DelayQueue 类上是有泛型的,如下:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {

从泛型中可以看出,DelayQueue 中的元素必须是 Delayed 的子类,Delayed 是表达延迟能力的关键接口,其继承了 Comparable 接口,并定义了还剩多久过期的方法,如下:

public interface Delayed extends Comparable<Delayed> {long getDelay(TimeUnit unit);
}

也就是说 DelayQueue 队列中的元素必须是实现 Delayed 接口和 Comparable 接口的,并覆写了 getDelay 方法和 compareTo 的方法才行,不然在编译时,编译器就会提醒我们元素必须强制实现 Delayed 接口。

除此之外 DelayQueue 还大量使用了 PriorityQueue 队列的大量功能,这个和 SynchronousQueue 队列很像,大量复用了其它基础类的逻辑,代码示例如下:

PriorityQueue 中文叫做优先级队列,在此处的作用就是可以根据过期时间做优先级排序,让先过期的可以先执行,用来实现类注释中的第一点。

这里的复用的思想还是蛮重要的,我们在源码中经常会遇到这种思想,比如说 LinkedHashMap 复用 HashMap 的能力,Set 复用 Map 的能力,还有此处的 DelayQueue 复用 PriorityQueue 的能力。小结一下,如果想要复用需要做到哪些:

  1. 需要把能遇见可复用的功能尽量抽象,并开放出可扩展的地方,比如说 HashMap 在操作数组的方法中,都给 LinkedHashMap 开放出很多 after 开头的方法,便于 LinkedHashMap 进行排序、删除等等;
  2. 采用组合或继承两种手段进行复用,比如 LinkedHashMap 采用的继承、 Set 和 DelayQueue 采用的组合,组合的意思就是把可复用的类给依赖进来。

2 演示

为了方便大家理解,写了一个演示的 demo,演示了一下:

public class DelayQueueDemo {// 队列消息的生产者static class Product implements Runnable {private final BlockingQueue queue;public Product(BlockingQueue queue) {this.queue = queue;}@Overridepublic void run() {try {log.info("begin put");long beginTime = System.currentTimeMillis();queue.put(new DelayedDTO(System.currentTimeMillis() + 2000L,beginTime));//延迟 2 秒执行queue.put(new DelayedDTO(System.currentTimeMillis() + 5000L,beginTime));//延迟 5 秒执行queue.put(new DelayedDTO(System.currentTimeMillis() + 1000L * 10,beginTime));//延迟 10 秒执行log.info("end put");} catch (InterruptedException e) {log.error("" + e);}}}// 队列的消费者static class Consumer implements Runnable {private final BlockingQueue queue;public Consumer(BlockingQueue queue) {this.queue = queue;}@Overridepublic void run() {try {log.info("Consumer begin");((DelayedDTO) queue.take()).run();((DelayedDTO) queue.take()).run();((DelayedDTO) queue.take()).run();log.info("Consumer end");} catch (InterruptedException e) {log.error("" + e);}}}@Data// 队列元素,实现了 Delayed 接口static class DelayedDTO implements Delayed {Long s;Long beginTime;public DelayedDTO(Long s,Long beginTime) {this.s = s;this.beginTime =beginTime;}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(s - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));}public void run(){log.info("现在已经过了{}秒钟",(System.currentTimeMillis() - beginTime)/1000);}}// demo 运行入口public static void main(String[] args) throws InterruptedException {BlockingQueue q = new DelayQueue();DelayQueueDemo.Product p = new DelayQueueDemo.Product(q);DelayQueueDemo.Consumer c = new DelayQueueDemo.Consumer(q);new Thread(c).start();new Thread(p).start();}
}
打印出来的结果如下:
06:57:50.544 [Thread-0] Consumer begin
06:57:50.544 [Thread-1] begin put
06:57:50.551 [Thread-1] end put
06:57:52.554 [Thread-0] 延迟了2秒钟才执行
06:57:55.555 [Thread-0] 延迟了5秒钟才执行
06:58:00.555 [Thread-0] 延迟了10秒钟才执行
06:58:00.556 [Thread-0] Consumer end

写这个代码的目的主要想演示一下延迟执行的例子,我们大概的思路是:

  1. 新建队列的元素,如 DelayedDTO,必须实现 Delayed 接口,我们在 getDelay 方法中实现了现在离过期时间还剩多久的方法。
  2. 定义队列元素的生产者,和消费者,对应着代码中的 Product 和 Consumer。
  3. 对生产者和消费者就行初始化和管理,对应着我们的 main 方法。

虽然这只是一个简单的 demo,但实际工作中,我们使用 DelayQueue 基本上就是这种思想,只不过写代码的时候会更加通用和周全,接下来我们来看下 DelayQueue 是如何实现 put 和 take 的。

3 放数据

我们以 put 为例,put 调用的是 offer 的方法,offer 的源码如下:

public boolean offer(E e) {final ReentrantLock lock = this.lock;// 上锁lock.lock();try {// 使用 PriorityQueue 的扩容,排序等能力q.offer(e);// 如果恰好刚放进去的元素正好在队列头// 立马唤醒 take 的阻塞线程,执行 take 操作// 如果元素需要延迟执行的话,可以使其更快的沉睡计时if (q.peek() == e) {leader = null;available.signal();}return true;} finally {// 释放锁lock.unlock();}
}

可以看到其实底层使用到的是 PriorityQueue 的 offer 方法,我们来看下:

// 新增元素
public boolean offer(E e) {// 如果是空元素的话,抛异常if (e == null)throw new NullPointerException();modCount++;int i = size;// 队列实际大小大于容量时,进行扩容// 扩容策略是:如果老容量小于 64,2 倍扩容,如果大于 64,50 % 扩容if (i >= queue.length)grow(i + 1);size = i + 1;// 如果队列为空,当前元素正好处于队头if (i == 0)queue[0] = e;else// 如果队列不为空,需要根据优先级进行排序siftUp(i, e);return true;
}
// 按照从小到大的顺序排列private void siftUpComparable(int k, E x) {Comparable<? super E> key = (Comparable<? super E>) x;// k 是当前队列实际大小的位置while (k > 0) {// 对 k 进行减倍int parent = (k - 1) >>> 1;Object e = queue[parent];// 如果 x 比 e 大,退出,把 x 放在 k 位置上if (key.compareTo((E) e) >= 0)break;// x 比 e 小,继续循环,直到找到 x 比队列中元素大的位置queue[k] = e;k = parent;}queue[k] = key;}

可以看到,PriorityQueue 的 offer 方法主要做了三件事情:

  1. 对新增元素进行判空;
  2. 对队列进行扩容,扩容策略和集合的扩容策略很相近;
  3. 根据元素的 compareTo 方法进行排序,我们希望最终排序的结果是从小到大的,因为我们想让队头的都是过期的数据,我们需要在 compareTo 方法里面实现:通过每个元素的过期时间进行排序,如下:
(int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));

这样便可实现越快过期的元素越能排到队头。

可以看到,新增数据时,只是使用到了 compareTo 方法,来对队列中的元素进行排序,接下来我们看下,取数据时,是如何操作的。

4 拿数据

取数据时,如果发现有元素的过期时间到了,就能拿出数据来,如果没有过期元素,那么线程就会一直阻塞,我们以 take 为例子,来看一下核心源码:

for (;;) {// 从队头中拿数据出来E first = q.peek();// 如果为空,说明队列中,没有数据,阻塞住if (first == null)available.await();else {// 获取队头数据的过期时间long delay = first.getDelay(NANOSECONDS);// 如果过期了,直接返回队头数据if (delay <= 0)return q.poll();// 引用置为 null ,便于 gc,这样可以让线程等待时,回收 first 变量first = null;// leader 不为空的话,表示当前队列元素之前已经被设置过阻塞时间了// 直接阻塞当前线程等待。if (leader != null)available.await();else {// 之前没有设置过阻塞时间,按照一定的时间进行阻塞Thread thisThread = Thread.currentThread();leader = thisThread;try {// 进行阻塞available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}
}

可以看到阻塞等待的功能底层使用的是锁的能力,这个我们在后面章节中会说到。

以上演示的 take 方法是会无限阻塞,直到队头的过期时间到了才会返回,如果不想无限阻塞,可以尝试 poll 方法,设置超时时间,在超时时间内,队头元素还没有过期的话,就会返回 null。

5 总结

DelayQueue 是非常有意思的队列,底层使用了排序和超时阻塞实现了延迟队列,排序使用的是 PriorityQueue 排序能力,超时阻塞使用得是锁的等待能力,可以看出 DelayQueue 其实就是为了满足延迟执行的场景,在已有 API 的基础上进行了封装,我们在工作中,可以学习这种思想,对已有的功能能复用的尽量复用,减少开发的工作量。

面试官系统精讲Java源码及大厂真题 - 21 DelayQueue 源码解析相关推荐

  1. 面试官系统精讲Java源码及大厂真题 - 34 只求问倒:连环相扣系列锁面试题

    34 只求问倒:连环相扣系列锁面试题 自信和希望是青年的特权. 引导语 面试中,问锁主要是两方面:锁的日常使用场景 + 锁原理,锁的日常使用场景主要考察对锁 API 的使用熟练度,看看你是否真的使用过 ...

  2. 面试官系统精讲Java源码及大厂真题 - 48 一起看过的 Java 源码和面试真题

    48 一起看过的 Java 源码和面试真题 不为了源码而读源码,只为了更好的实践 持续几个月,我们的专栏终于结束了,这篇总结篇,我们又想回到当初写这篇专栏的初心:我们不为读源码而读源码,只是为了更好的 ...

  3. 面试官系统精讲Java源码及大厂真题 - 36 从容不迫:重写锁的设计结构和细节

    36 从容不迫:重写锁的设计结构和细节 受苦的人,没有悲观的权利. --尼采 引导语 有的面试官喜欢让同学在说完锁的原理之后,让你重写一个新的锁,要求现场在白板上写出大概的思路和代码逻辑,这种面试题目 ...

  4. 面试官系统精讲Java源码及大厂真题 - 38 线程池源码面试题

    38 线程池源码面试题 与有肝胆人共事,从无字句处读书. --周恩来 引导语 线程池在日常面试中占比很大,主要是因为线程池内容涉及的知识点较广,比如涉及到队列.线程.锁等等,所以很多面试官喜欢把线程池 ...

  5. 面试官系统精讲Java源码及大厂真题 - 24 举一反三:队列在 Java 其它源码中的应用

    24 举一反三:队列在 Java 其它源码中的应用 世上无难事,只要肯登攀. 引导语 队列除了提供 API 供开发者使用外,自身也和 Java 中其他 API 紧密结合,比如线程池和锁,线程池直接使用 ...

  6. 面试官系统精讲Java源码及大厂真题 - 17 并发 List、Map源码面试题

    17 并发 List.Map源码面试题 梦想只要能持久,就能成为现实.我们不就是生活在梦想中的吗? 引导语 并发 List 和 Map 是技术面时常问的问题,问的问题也都比较深入,有很多问题都是面试官 ...

  7. 面试官系统精讲Java源码及大厂真题 - 11 HashSet、TreeSet 源码解析

    11 HashSet.TreeSet 源码解析 更新时间:2019-09-16 19:37:35 成功的奥秘在于目标的坚定. --迪斯雷利 引导语 HashSet.TreeSet 两个类是在 Map ...

  8. 面试官系统精讲Java源码及大厂真题 - 10 Map源码会问哪些面试题

    10 Map源码会问哪些面试题 更新时间:2019-09-10 10:34:08 人的一生可能燃烧也可能腐朽,我不能腐朽,我愿意燃烧起来! --奥斯特洛夫斯基 引导语 Map 在面试中,占据了很大一部 ...

  9. 面试官系统精讲Java源码及大厂真题 - 07 List 源码会问哪些面试题

    07 List 源码会问哪些面试题 勤学如春起之苗,不见其增,日有所长. --陶潜 引导语 List 作为工作中最常见的集合类型,在面试过程中,也是经常会被问到各种各样的面试题,一般来说,只要你看过源 ...

最新文章

  1. python模拟全部代码_Python模拟登陆实现代码
  2. HBase、Redis、MongoDB、Couchbase、LevelDB 五款主流NoSQL数据库大比拼
  3. Android API之android.net.wifi.WifiConfiguration
  4. python整数转换字符串_使用Python中的str()函数将整数值转换为字符串
  5. 五四,阿里巴巴新青年了解下?
  6. mysql开启权限控制_mysql开启远程访问及相关权限控制
  7. 面试官问你Java内存区域你用new创建对象来解释
  8. 双十一重磅福利来袭,拯救 “四大皆空” 的你!
  9. mysql 历史记录查询
  10. android studio2.4,Android Studio 2.4 Preview 7 发布
  11. vnr光学识别怎么打开_物流仓库安防监控系统安装的作用和功能
  12. excel表格怎么关闭函数计算机,excel表格的数据如何去掉公式-如何取消Excel表格里面的函数公式...
  13. 红蜘蛛10013端口被占用
  14. stm32花式点流水灯
  15. matlab 如何读取二进制、十六进制txt文档
  16. 【AWS云从业者基础知识笔记】——模块8:定价和支持
  17. VB / VBA 自制二维码小工具
  18. t微信小程序开发-获取微信运动步数
  19. 中科大何向南团队+快手App联合出品 KuaiRec | 快手首个稠密为99.6%的数据集 | 相关介绍、下载、处理、使用方法
  20. C#调用斑马打印机打印条码标签(支持COM/LPT/USB/ZPL/EPL/Bitmap)

热门文章

  1. MVVM教程(2):
  2. Magento教程 24:如何发送电子报! (Newsletter)
  3. Magento教程 5:系统安装与备份
  4. 品牌不可不知的YouTube的影片行销策略
  5. PHP结合HTML5使用FormData对象提交表单及上传图片
  6. 使用SSH更新WordPress
  7. 使用Google Custom Search打造站内搜索
  8. 分布式任务调度平台XXL-JOB搭建使用
  9. 通过docker build --build-arg 传值进 Dockerfile 动态构建容器
  10. 使用runnable创建线程