目录

一、简介

二、常用API

三、示例

四、源码阅读

五、总结


一、简介

DelayQueue是JUC提供的一种无界延迟队列,它实现了BlockingQueue<E>阻塞队列接口,底层基于已有的PriorityBlockingQueue实现,类声明如下:

public class DelayQueue<E extends Delayed>
extends AbstractQueue<E>
implements BlockingQueue<E>

可见,DelayQueue队列中的元素必须继承自Delayed接口,Delayed接口继承自Comparable接口:

public interface Delayed extends Comparable<Delayed>

在Delayed接口中有一个方法Delayed用来获取队列中元素的剩余过期时间。

long getDelay(TimeUnit unit) : 在给定的时间单元中,返回与此对象关联的剩余延迟.

DelayQueue是延迟元素的无界阻塞队列,在该队列中,一个元素只能在过期时被获取。队列的头是延迟元素,其延迟在过去过期最远。如果元素没有过期,就没有head, poll将返回null。当元素的getDelay(timeunit, nanosecond)方法返回一个小于或等于零的值时,就会发生过期,此队列不允许空元素。

二、常用API

【a】构造方法:DelayQueue提供了两个构造方法:

DelayQueue()

创建一个最初为空的新DelayQueue

DelayQueue(Collection<? extends E> c)

创建一个DelayQueue,该队列最初包含给定的延迟实例集合的元素

【b】常用方法:

返回值类型

方法描述

boolean

add(E e)

将指定的元素插入此延迟队列

void

clear()

自动删除此延迟队列中的所有元素

int

drainTo(Collection<? super E> c)

从该队列中删除所有可用元素,并将它们添加到给定集合中

int

drainTo(Collection<? super E> c, int maxElements)

从该队列中最多删除给定数量的可用元素,并将它们添加到给定集合中

Iterator<E>

iterator()

返回此队列中所有元素(过期和未过期)的迭代器。

boolean

offer(E e)

将指定的元素插入此延迟队列

boolean

offer(E e, long timeout, TimeUnit unit)

将指定的元素插入此延迟队列

E

peek()

检索但不删除此队列的头,或在此队列为空时返回null

E

poll()

检索并删除此队列的头,如果此队列没有具有过期延迟的元素,则返回null

E

poll(long timeout, TimeUnit unit)

检索并删除此队列的头,如有必要,将一直等待,直到此队列中具有过期延迟的元素可用,或指定的等待时间过期

void

put(E e)

将指定的元素插入此延迟队列

int

remainingCapacity()

总是返回整数。MAX_VALUE,因为延迟队列没有容量限制

boolean

remove(Object o)

从此队列中删除指定元素的单个实例(如果存在),无论它是否已过期

int

size()

返回此集合中的元素数

E

take()

检索并删除此队列的头,如有必要,将一直等待,直到此队列上有一个具有过期延迟的元素可用为止。

Object[]

toArray()

返回一个包含此队列中所有元素的数组

<T> T[]

toArray(T[] a)

返回一个包含此队列中所有元素的数组;返回数组的运行时类型是指定数组的运行时类型

三、示例

下面通过一个简单的示例说明如何使用延迟队列DelayQueue:

public class T04_DelayQueue {public static void main(String[] args) {DelayQueue<DelayData> delayQueue = new DelayQueue<>();//生产者线程new Thread(() -> {for (int i = 0; i < 5; i++) {long currentTime = System.nanoTime();//返回指定原点(包括)和指定边界(排除)之间的伪随机长值long validTime = ThreadLocalRandom.current().nextLong(1000000000L, 7000000000L);DelayData delayData = new DelayData(currentTime + validTime);delayQueue.put(delayData);System.out.println(Thread.currentThread().getName() + ": put ->" + delayData);//模拟延时try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}}, "生产者").start();//消费者线程new Thread(() -> {for (int i = 0; i < 5; i++) {try {System.out.println(Thread.currentThread().getName() + ": take -> " + delayQueue.take());Thread.yield();} catch (InterruptedException e) {e.printStackTrace();}}}, "消费者").start();}
}/*** 队列元素必须实现Delayed接口,重写getDelay和compareTo方法*/
class DelayData implements Delayed {/*** 数据过期时间*/private long delayTime;public DelayData(long delayTime) {this.delayTime = delayTime;}/*** 返回剩余有效时间** @param unit 时间单位  纳秒*/@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(this.delayTime - System.nanoTime(), TimeUnit.NANOSECONDS);}@Overridepublic int compareTo(Delayed o) {if (o == this) {return 0;}if (o instanceof DelayData) {DelayData x = (DelayData) o;// 优先比较失效时间long diff = this.delayTime - x.delayTime;return diff < 0 ? -1 : diff > 0 ? 1 : 0;}long diff = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}@Overridepublic String toString() {return "DelayData{" +"delayTime=" + delayTime +'}';}
}

运行结果:

生产者: put ->DelayData{delayTime=178412709161927}
生产者: put ->DelayData{delayTime=178418135952860}
消费者: take -> DelayData{delayTime=178412709161927}
生产者: put ->DelayData{delayTime=178417873829764}
生产者: put ->DelayData{delayTime=178415526587641}
生产者: put ->DelayData{delayTime=178417839279680}
消费者: take -> DelayData{delayTime=178415526587641}
消费者: take -> DelayData{delayTime=178417839279680}
消费者: take -> DelayData{delayTime=178417873829764}
消费者: take -> DelayData{delayTime=178418135952860}

从运行结果可以看出,消费者每次获取到的元素都是有效期最小的,且都是已经失效了的。

四、源码阅读

(1)重要属性说明

//可重入锁,保障线程安全
private final transient ReentrantLock lock = new ReentrantLock();
//优先级队列,用于存储元素,并按优先顺序
private final PriorityQueue<E> q = new PriorityQueue<E>();/*** 指定为等待队列头部的元素的线程.*/
private Thread leader = null;/*** 用于实现阻塞的Condition对象* 当一个新的元素在队列的最前面可用时,或者一个新线程需要成为leader时,就会发出条件信号*/
private final Condition available = lock.newCondition();

为了最小化不必要的时间等待,DelayQueue并不会让所有出队线程都无限等待,而是用leader保存了第一个尝试出队的线程,该线程的等待时间是队首元素的剩余有效期。这样,一旦leader线程被唤醒(此时队首元素也失效了),就可以出队成功,然后唤醒一个其它在available条件队列上等待的线程。之后,会重复上一步,新唤醒的线程可能取代成为新的leader线程。这样,就避免了无效的等待,提升了性能。

(2)常用方法说明

【a】offer(E e)、put(E e)、offer(E e, long timeout, TimeUnit unit)、add(E e)

/*** 将指定的元素插入此延迟队列*/
public boolean add(E e) {return offer(e);
}/*** 将指定的元素插入此延迟队列*/
public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {//底层调用优先级队列PriorityQueue的offer方法q.offer(e);//如果准备入队的元素在队首, 则唤醒一个出队线程if (q.peek() == e) {leader = null;available.signal();}return true;} finally {lock.unlock();}
}/*** 将指定的元素插入此延迟队列。因为队列是无界的,所以这个方法永远不会阻塞*/
public void put(E e) {offer(e);
}/*** 将指定的元素插入此延迟队列。因为队列是无界的,所以这个方法永远不会阻塞*/
public boolean offer(E e, long timeout, TimeUnit unit) {return offer(e);
}

【b】take()

/*** 检索并删除此队列的头,如有必要,将一直等待,直到此队列上有一个具有过期延迟的元素可用为止.*/
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {//调用PriorityQueue的peek()方法获取队首元素E first = q.peek();if (first == null)//如果队首元素为空,则阻塞当前线程available.await();else {//队首元素不为空,则获取对应的过期时间,单位为纳秒.long delay = first.getDelay(NANOSECONDS);//如果过期时间 <= 0,执行出队操作if (delay <= 0)return q.poll();//释放first的引用,避免内存泄漏 first = null;if (leader != null)//领导者线程为空,则无限阻塞当前线程available.await();else {//设置领导者线程为新的leader线程Thread thisThread = Thread.currentThread();leader = thisThread;try {//限时等待当前线程available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {// 不存在leader线程但队列中存在元素, 说明没有其他线程在等待, 则唤醒一个其它出队线程if (leader == null && q.peek() != null)available.signal();lock.unlock();}
}

【c】poll()、poll(long timeout, TimeUnit unit)

/*** 检索并删除此队列的头,如果此队列没有具有过期延迟的元素,则返回null*/
public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {//调用优先级队列的peek方法出队E first = q.peek();//如果队首元素为空或者未过期,则返回nullif (first == null || first.getDelay(NANOSECONDS) > 0)return null;elsereturn q.poll();} finally {lock.unlock();}
}/*** 检索并删除此队列的头,如有必要,将等待直到此队列上有一个具有过期延迟的元素可用,或指定的等待时间过期*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();//队首元素为空if (first == null) {if (nanos <= 0)return null;elsenanos = available.awaitNanos(nanos);} else { //队首元素不为空//获取队首元素的过期时间long delay = first.getDelay(NANOSECONDS);//已过期,执行出队操作if (delay <= 0)return q.poll();if (nanos <= 0)return null;//置空队首元素first = null; // don't retain ref while waitingif (nanos < delay || leader != null)//限时阻塞等待nanos = available.awaitNanos(nanos);else {//设置领导者线程为新的leader线程Thread thisThread = Thread.currentThread();leader = thisThread;try {long timeLeft = available.awaitNanos(delay);nanos -= delay - timeLeft;} finally {if (leader == thisThread)leader = null;}}}}} finally {// 不存在leader线程但队列中存在元素, 说明没有其他线程在等待, 则唤醒一个其它出队线程if (leader == null && q.peek() != null)available.signal();lock.unlock();}
}

五、总结

DelayQueue是阻塞队列中非常有用的一种队列,经常被用于缓存或定时任务等的设计。以上就是关于DelayQueue延时队列的一些介绍和使用总结,如果不对之处,还请大家不吝指正。

并发编程学习之延时队列DelayQueue相关推荐

  1. libevent c++高并发网络编程_高并发编程学习(2)——线程通信详解

    前序文章 高并发编程学习(1)--并发基础 - https://www.wmyskxz.com/2019/11/26/gao-bing-fa-bian-cheng-xue-xi-1-bing-fa-j ...

  2. java公平锁和非公平锁_java并发编程学习之再谈公平锁和非公平锁

    在java并发编程学习之显示锁Lock里有提过公平锁和非公平锁,我们知道他的使用方式,以及非公平锁的性能较高,在AQS源码分析的基础上,我们看看NonfairSync和FairSync的区别在什么地方 ...

  3. 高并发编程学习(2)——线程通信详解

    前序文章 高并发编程学习(1)--并发基础 - https://www.wmyskxz.com/2019/11/26/gao-bing-fa-bian-cheng-xue-xi-1-bing-fa-j ...

  4. Java高并发编程学习(三)java.util.concurrent包

    简介 我们已经学习了形成Java并发程序设计基础的底层构建块,但对于实际编程来说,应该尽可能远离底层结构.使用由并发处理的专业人士实现的较高层次的结构要方便得多.要安全得多.例如,对于许多线程问题,可 ...

  5. java并发编程学习一

    java并发编程学习一 什么是进程和线程? 进程是操作系统进行资源分配的最小单位 进程跟进程之间的资源是隔离的,同一个进程之中的线程可以共享进程的资源. 线程是进程的一个实体,是CPU 调度和分派的基 ...

  6. java中解决脏读_java并发编程学习之脏读代码示例及处理

    使用interrupt()中断线程     当一个线程运行时,另一个线程可以调用对应的Thread对象的interrupt()方法来中断它,该方法只是在目标线程中设置一个标志,表示它已经被中断,并立即 ...

  7. JAVA中的延时队列DelayQueue

    Java中的延时队列DelayQueue是基于优先队列PriorityQueue实现的. 注:PriorityQueue是基于堆(Heap)实现的.堆(Heap)在本质上是一个数组. Priority ...

  8. java并行任务,Java 并发编程学习(五):批量并行执行任务的两种方式

    Java 并发编程学习(五):批量并行执行任务的两种方式 背景介绍 有时候我们需要执行一批相似的任务,并且要求这些任务能够并行执行.通常,我们的需求会分为两种情况: 并行执行一批任务,等待耗时最长的任 ...

  9. Java并发编程学习记录

    Java并发编程汇总 并发问题的分解 多线程并发的特性 volatile 在并发编程中可能出现的问题: 管程 wait() 的正确姿势 notify() 何时可以使用 在使用多线程编程的时候,开启多少 ...

  10. Java并发编程学习 + 原理分析(建议收藏)

    总结不易,如果对你有帮助,请点赞关注支持一下 微信搜索程序dunk,关注公众号,获取博客源码 Doug Lea是一个无私的人,他深知分享知识和分享苹果是不一样的,苹果会越分越少,而自己的知识并不会因为 ...

最新文章

  1. HTML5实践 -- 使用css装饰你的图片画廊 - part2
  2. 使用JQuery完成仿百度的信息提示
  3. Redis的编译安装
  4. [skill] C与C++对于类型转换的验证
  5. 设计模式:Abstract Factory和Builder(比较区别,个人认为讲得很明白)
  6. UE3 后期处理编辑器用户指南
  7. 冯诺依曼体系下 计算机主机不包括,计算机组装第一张测试
  8. java中ares框架_ARES辅助开发工具-用户手册.doc
  9. Nginx之11吸星大法 - (页面缓存) 1
  10. JavaScript:面向对象简单实例——图书馆
  11. go-micro 框架初探
  12. 最受欢迎的网管工具集
  13. TOGAF9.2 第I部分 第1章简介
  14. label怎么换行 vb_C#与VB.NET换行符的对比及某些string在label能正常换行,在textbox不能换行的问题...
  15. swift搭建苹果软件模版代码
  16. 写在博士旅程之前|博士第一年|博士第三年|博士第四年
  17. 2022年阿里云服务器租用价格表(最新收费标准及活动价格表)
  18. 功分器和耦合器以及合路器的区别
  19. html实训QQ音乐官网首页制作
  20. 关于笔记本连接显示器检测不到的问题(NoVideoInput)

热门文章

  1. 2015年c语言等级考试题1 10 2分,2015年计算机二级《C语言》精选练习题及答案(2)...
  2. python %号_python基础集结号
  3. 阿里云云计算6 ECS的概念
  4. NumPy库—random模块
  5. android.mk 依赖关系,Android NDK学习(二):编译脚本语法Android.mk和Application.mk
  6. 2021-08-26BERT: Pre-training of Deep Bidirectional Transformers forLanguage Understanding
  7. 433.最小基因变化
  8. 341.扁平化嵌套列表迭代器
  9. 释放空间后将指针置空
  10. 【Codeforces Round #585 (Div. 2) E】Marbles【状压DP】