文章目录

  • Pre
  • DelayQueue特征
  • Leader/Followers模式
  • DelayQueue源码分析
    • 类继承关系
    • 核心方法
    • 成员变量
    • 构造函数
    • 入队方法
      • offer(E e)
    • 出队方法
      • poll()
      • poll(long timeout, TimeUnit unit)
      • take()
      • peek
  • 小结


Pre

每日一博 - 延时任务的多种实现方式解读

DelayQueue 由优先级支持的、基于时间的调度队列,内部使用非线程安全的优先队列(PriorityQueue)实现,而无界队列基于数组的扩容实现。

在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。

创建队列

-

BlockingQueue<String> blockingQueue = new DelayQueue();

入队的对象必须要实现 Delayed接口,而Delayed集成自 Comparable 接口。

Delayed 接口使对象成为延迟对象,它使存放在DelayQueue类中的对象具有了激活日期。该接口强制实现下列两个方法。

  • compareTo(Delayed o):Delayed接口继承了Comparable接口,因此有了这个方法。让元素按激活日期排队
  • getDelay(TimeUnit unit):这个方法返回到激活日期的剩余时间,时间单位由单位参数指定。

DelayQueue特征

  • DelayQueue的泛型参数需要实现Delayed接口

Delayed接口继承了Comparable接口,DelayQueue内部使用非线程安全的优先队列(PriorityQueue),并使用Leader/Followers模式,最小化不必要的等待时间。

  • DelayQueue不允许包含null元素。


Leader/Followers模式

  • 有若干个线程(一般组成线程池)用来处理大量的事件

  • 有一个线程作为领导者,等待事件的发生;其他的线程作为追随者,仅仅是睡眠

  • 假如有事件需要处理,领导者会从追随者中指定一个新的领导者,自己去处理事件

  • 唤醒的追随者作为新的领导者等待事件的发生

  • 处理事件的线程处理完毕以后,就会成为追随者的一员,直到被唤醒成为领导者

  • 假如需要处理的事件太多,而线程数量不够(能够动态创建线程处理另当别论),则有的事件可能会得不到处理。

所以线程会有三种身份中的一种:leader 和 follower,以及一个干活中的状态:processser。

基本原则就 永远最多只有一个 leader。

而所有 follower 都在等待成为 leader。

线程池启动时会自动产生一个 Leader 负责等待网络 IO 事件,当有一个事件产生时,Leader 线程首先通知一个 Follower 线程将被其提拔为新的 Leader ,然后自己就去干活了,去处理这个网络事件,处理完毕后加入 Follower 线程等待队列,等待下次成为 Leader。

这种方法可以增强 CPU高速缓存相似性,以及消除动态内存分配和线程间的数据交换。


DelayQueue源码分析

类继承关系

核心方法

成员变量

DelayQueue 通过组合一个PriorityQueue 来实现元素的存储以及优先级维护,通过ReentrantLock 来保证线程安全,通过Condition 来判断是否可以取数据,对于leader 后面再来分析它的作用

// 可重入锁
private final transient ReentrantLock lock = new ReentrantLock();
// 存储元素的优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();// 获取数据 等待线程标识
private Thread leader = null;// 条件控制,表示是否可以从队列中取数据
private final Condition available = lock.newCondition();

构造函数


DelayQueue 内部组合PriorityQueue,对元素的操作都是通过PriorityQueue 来实现的,DelayQueue 的构造方法很简单,对于PriorityQueue 都是使用的默认参数,不能通过DelayQueue 来指定PriorityQueue的初始大小,也不能使用指定的Comparator,元素本身就需要实现Comparable ,因此不需要指定的Comparator。

/**
* 无参构造函数
*/
public DelayQueue() {}/**
* 通过集合初始化
*/
public DelayQueue(Collection<? extends E> c) {this.addAll(c);
}


入队方法

虽然提供入队的接口方式很多,实际都是调用的offer 方法,通过PriorityQueue 来进行入队操作,入队超时方法并没有其超时功能。

  • add(E e),将指定的元素插入到此队列中,在成功时返回 true

  • put(E e),将指定的元素插入此队列中,队列达到最大值,则抛oom异常

  • offer(E e),将指定的元素插入到此队列中,在成功时返回 true

  • offer(E e, long timeout, TimeUnit unit),指定一个等待时间将元素放入队列中并没有意义


offer(E e)

    /*** Inserts the specified element into this delay queue.** @param e the element to add* @return {@code true}* @throws NullPointerException if the specified element is null*/public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {leader = null;available.signal();}return true;} finally {lock.unlock();}}

将指定的元素插入到此队列中,在成功时返回 true,其他几个方法内部都调用了offer 方法,我们也可以直接调用offer 方法来完成入队操作。

peek并不一定是当前添加的元素,队头是当前添加元素,说明当前元素e的优先级最小也就即将过期的,这时候激活avaliable变量条件队列里面的一个线程,通知他们队列里面有元素了。

public boolean offer(E e) {final ReentrantLock lock = this.lock;// 获取锁lock.lock();try {//通过PriorityQueue 来将元素入队q.offer(e);//peek 是获取的队头元素,唤醒阻塞在available 条件上的一个线程,表示可以从队列中取数据了if (q.peek() == e) {leader = null;// 唤醒通知available.signal();}return true;} finally {// 解锁lock.unlock();}
}


出队方法

  • poll(),获取并移除此队列的头,如果此队列为空,则返回 null
  • poll(long timeout, TimeUnit unit),获取并移除此队列的头部,在指定的等待时间前等待
  • take(),获取并移除此队列的头部,在元素变得可用之前一直等待
  • peek(),调用此方法,可以返回队头元素,但是元素并不出队

poll()

获取并移除此队列的头,如果此队列为空,则返回 null

public E poll() {final ReentrantLock lock = this.lock;// 获取同步锁lock.lock();try {// 获取队头元素E first = q.peek();// 如果对头为null 或者 延时还没有到,则返回 nullif (first == null || first.getDelay(NANOSECONDS) > 0)return null;elsereturn q.poll(); // 否则元素出队} finally {lock.unlock();}
}

poll(long timeout, TimeUnit unit)

获取并移除此队列的头部,在指定的等待时间前等待。

public E poll(long timeout, TimeUnit unit) throws InterruptedException {// 超时等待时间long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;// 获取可中断锁lock.lockInterruptibly();try // 无限循环for (;;) {// 获取队头元素E first = q.peek// 队头为空,也就是队列为空if (first == null) {// 达到超时指定时间,返回nullif (nanos <= 0)return null;else// 如果还没有超时,那么在available条件上进行等待nanos时间nanos = available.awaitNanos(nanos);} else {// 获取元素延迟时间long delay = first.getDelay(NANOSECONDS);// 延时到期if (delay <= 0)return q.poll(); // 返回出队元素// 延时未到期,超时到期,返回nullif (nanos <= 0)return null;first = null; // don't retain ref while waiting// 超时等待时间 < 延迟时间 或者 有其他线程再取数据if (nanos < delay || leader != null)// 在available条件上进行等待nanos时间nanos = available.awaitNanos(nanos);else {// 超时等待时间 > 延迟时间 // 并且没有其他线程在等待,那么当前元素成为leader,表示 leader线程最早 正在等待获取元素Thread thisThread = Thread.currentThread();leader = thisThread;try {// 等待 延迟时间 超时long timeLeft = available.awaitNanos(delay);// 还需要继续等待 nanosnanos -= delay - timeLeft;} finally {// 清除 leaderif (leader == thisThread)leader = null;}}}}} finally {// 唤醒阻塞在 available 的一个线程,表示可以取数据了if (leader == null && q.peek() != null)available.signal// 释放锁lock.unlock();}
}

梳理一下

  1. 如果队列为空,如果超时时间未到,则进行等待,否则返回null
  2. 队列不为空,取出队头元素,如果延迟时间到来,则返回元素,否则如果超时时间到返回null
  3. 超时时间未到,并且超时时间 < 延迟时间 或者 有线程正在获取元素,那么进行等待
  4. 超时时间 > 延迟时间,那么肯定可以取到元素,设置 leader为当前线程,等待延迟时间到期。

需要注意的时Condition 条件在阻塞时会释放锁,在被唤醒时会再次获取锁,获取成功才会返回。 当进行超时等待时,阻塞在Condition 上后会释放锁,一旦释放了锁,那么其它线程就有可能参与竞争,某一个线程就可能会成为leader(参与竞争的时间早,并且能在等待时间内能获取到队头元素那么就可能成为leader) leader是用来减少不必要的竞争,如果leader不为空说明已经有线程在取了,设置当前线程等待即可。

leader 就是一个信号,告诉其它线程:你们不要再去获取元素了,它们延迟时间还没到期,我都还没有取到数据呢,你们要取数据,等我取了再说

take()

获取并移除此队列的头部,在元素变得可用之前一直等待

public E take() throws InterruptedException {final ReentrantLock lock = this.lock;// 获取可中断锁lock.lockInterruptibly();try {for (;;) {// 获取队头元素E first = q.peek();// 队头元素为空,则阻塞等待if (first == null)available.await();else {// 获取元素延迟时间long delay = first.getDelay(NANOSECONDS);// 延时到期if (delay <= 0)return q.poll(); // 返回出队元素first = null; // don't retain ref while waiting// 如果有其它线程在等待获取元素,则当前线程不用去竞争,直接等待if (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {// 等待延迟时间到期available.awaitNanos(delay);} finally {//清除 leaderif (leader == thisThread)leader = null;}}}}} finally {//唤醒阻塞在available 的一个线程,表示可以取数据了if (leader == null && q.peek() != null)available.signal();// 释放锁lock.unlock();}
}

该方法就是相当于在前面的超时等待中,把超时时间设置为无限大,那么这样只要队列中有元素,要是元素延迟时间要求,那么就可以取出元素,否则就直接等待元素延迟时间到期,再取出元素,最先参与等待的线程会成为leader。

peek

调用此方法,可以返回队头元素,但是元素并不出队。

public E peek() {final ReentrantLock lock = this.lock;lock.lock();try {//返回队列头部元素,元素不出队return q.peek();} finally {lock.unlock();}
}

小结

  • DelayQueue 内部通过组合PriorityQueue 来实现存储和维护元素顺序的;
  • DelayQueue 存储元素必须实现Delayed 接口,通过实现Delayed 接口,可以获取到元素延迟时间,以及可以比较元素大小(Delayed 继承Comparable);
  • DelayQueue 通过一个可重入锁来控制元素的入队出队行为;
  • DelayQueue 中leader 标识 用于减少线程的竞争,表示当前有其它线程正在获取队头元素;
  • PriorityQueue 只是负责存储数据以及维护元素的顺序,对于延迟时间取数据则是在DelayQueue 中进行判断控制的;
  • DelayQueue 没有实现序列化接口

每日一博 - DelayQueue阻塞队列源码解读相关推荐

  1. 并发编程5:Java 阻塞队列源码分析(下)

    上一篇 并发编程4:Java 阻塞队列源码分析(上) 我们了解了 ArrayBlockingQueue, LinkedBlockingQueue 和 PriorityBlockingQueue,这篇文 ...

  2. 并发-阻塞队列源码分析

    阻塞队列 参考: http://www.cnblogs.com/dolphin0520/p/3932906.html http://endual.iteye.com/blog/1412212 http ...

  3. Java并发包源码学习系列:LBD双端阻塞队列源码解析

    尝试将节点加入到first之前,更新first,如果插入之后超出容量,返回false. private boolean linkFirst(Node node) { // assert lock.is ...

  4. Alamofire源码解读系列(十二)之请求(Request)

    本篇是Alamofire中的请求抽象层的讲解 前言 在Alamofire中,围绕着Request,设计了很多额外的特性,这也恰恰表明,Request是所有请求的基础部分和发起点.这无疑给我们一个Req ...

  5. Alamofire源码解读系列(七)之网络监控(NetworkReachabilityManager)

    Alamofire源码解读系列(七)之网络监控(NetworkReachabilityManager) 本篇主要讲解iOS开发中的网络监控 前言 在开发中,有时候我们需要获取这些信息: 手机是否联网 ...

  6. Alamofire源码解读系列(九)之响应封装(Response)

    本篇主要带来Alamofire中Response的解读 前言 在每篇文章的前言部分,我都会把我认为的本篇最重要的内容提前讲一下.我更想同大家分享这些顶级框架在设计和编码层次究竟有哪些过人的地方?当然, ...

  7. Alamofire源码解读系列(五)之结果封装(Result)

    本篇讲解Result的封装 前言 有时候,我们会根据现实中的事物来对程序中的某个业务关系进行抽象,这句话很难理解.在Alamofire中,使用Response来描述请求后的结果.我们都知道Alamof ...

  8. SSD源码解读1-数据层AnnotatedDataLayer

    前言 年后到现在,利用自己的业余时间断断续续将caffe的SSD源码看完了,虽然中间由于工作原因暂停了一段时间,但最终还算顺利完成了,SSD源码的阅读也是今年的年度计划中比较重要的一项内容,完成了还是 ...

  9. Alamofire源码解读系列(十一)之多表单(MultipartFormData)

    本篇讲解跟上传数据相关的多表单 前言 我相信应该有不少的开发者不明白多表单是怎么一回事,然而事实上,多表单确实很简单.试想一下,如果有多个不同类型的文件(png/txt/mp3/pdf等等)需要上传给 ...

最新文章

  1. Minimax 和 Alpha-beta 剪枝算法简介,及以此实现的井字棋游戏(Tic-tac-toe)
  2. python自动测试p-Python-selenium-自动化测试模型
  3. python数据包的作用_使用Python将登录数据包发送到Minecraft服务器不起作用
  4. matlab逆变换法产生随机数_matlab中产生随机数的程序
  5. 活在无尽梦境的后续 β
  6. 《复杂》读书笔记(part1)--一些思想是由简单的思想组合而成,我称此为复杂
  7. Java EE 8的前5个新功能
  8. burpsuite小米手机抓包_使用burpsuite实现Android APP的HTTPS抓包
  9. 向圣诞老人和他的精灵学习Google Analytics(分析)
  10. 高效pycharm使用技巧_您是否正在使用这种高效的采访技巧?
  11. Lesson 2 Gradient Desent
  12. Android Studio生成签名文件和自动签名
  13. 以太坊 智能合约IDE 在线 Solidity IDE
  14. 高等数学(第七版)同济大学 习题7-3 个人解答
  15. 织梦采集插件-简单好用织梦采集插件
  16. 硕士学位论文多级标题编号与图表编号
  17. 1g1h1m mysql_mysql服务器优化
  18. 如何通过压缩视频软件,减少大小且画质无损技巧
  19. 微软研发中心招聘的背后
  20. dedecms织梦后台不显示模块管理和辅助插件

热门文章

  1. java 中required_通过实例学习Spring @Required注释原理
  2. android 之ListView的布局填充器
  3. matlab中所遇到的问题,【总结】【matlab】【机器学习】学习过程中遇到的问题总结...
  4. python交互式程序设计导论第二周_沧州学堂云Python 交互式程序设计导论搜题公众号...
  5. Linux下CMake简明教程(三)同一目录下多个源文件
  6. tensorflow加载模型
  7. python super理解(二)
  8. 图像识别 43个模型
  9. 分计算iv值_S71200PLC模拟量编程方法与计算原理
  10. 背景和文字分离的matlab实现