概述

DelayQueue是一个支持时延获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
DelayQueue可以运用在以下两个应用场景:

  1. 缓存系统的设计:使用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,就表示有缓存到期了。
  2. 定时任务调度:使用DelayQueue保存当天要执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如Tiner就是使用DelayQueue实现的。

用法实例

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/*** @author admin*/
public class Message implements Delayed {/***触发时间*/private long time;/***名称*/String name;public Message(String name,long time,TimeUnit unit){this.name = name;this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);}@Overridepublic long getDelay(TimeUnit unit) {return time - System.currentTimeMillis();}@Overridepublic int compareTo(Delayed o) {Message item = (Message) o;long diff = this.time - item.time;if (diff <= 0){return  -1;}else{return 1;}}@Overridepublic String toString() {return DelayQueueDemo.printDate() + "Message{" + "time=" + time + ", name=" + name + "/" + "}";}
}
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;/*** @author admin*/
public class DelayQueueDemo {public static void main(String[] args) throws InterruptedException {Message item1 = new Message("消息1",5, TimeUnit.SECONDS);Message item2 = new Message("消息2",10, TimeUnit.SECONDS);Message item3 = new Message("消息3",15, TimeUnit.SECONDS);DelayQueue<Message> queue  = new DelayQueue<Message>();queue.add(item1);queue.add(item2);queue.add(item3);int queueLengh = queue.size();System.out.println(printDate() + "开始!");for (int i = 0; i < queueLengh; i++) {Message take = queue.take();System.out.format(printDate() + " 消息出队,属性name=%s%n",take.name);}System.out.println(printDate() + "结束!");}static String printDate(){SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");return sdf.format(new Date());}
}

DelayQueue声明

DelayQueue声明如下:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E>

从DelayQueue声明可以看出,DelayQueue中的元素必须是Delayed接口的子类。

Delayed声明如下:

public interface Delayed extends Comparable<Delayed> {/***以给定的时间单位返回与此对象关联的剩余延迟*/long getDelay(TimeUnit unit);
}

DelayQueue属性

/**
*可重入锁
*/
private final transient ReentrantLock lock = new ReentrantLock();
/**
*缓存元素的优先级队列
*/
private final PriorityQueue<E> q = new PriorityQueue<E>();
/**
*特定的用于等待队列头中元素的线程
*Leader-Follower模式的变体形式
*用于最小化不必要的定时等待
*/
private Thread leader = null;
/**
*当更新的元素在队列的开头变得可用时
*或在新线程可能需要成为领导者时,会发出条件信号
*/
private final Condition available = lock.newCondition();

以上可以看出,延时队列主要使用优先级队列来实现,并辅以重入锁和条件来控制并发安全。

DelayQueue构造器

/**
*默认构造器
*/
public DelayQueue() {}
/**
*添加集合c中所有元素到队列中
*/
public DelayQueue(Collection<? extends E> c) {this.addAll(c);
}

DelayQueue入队

/*
*将指定元素插入此延时队列
*/
public boolean add(E e) {return offer(e);
}
/*
*将指定元素插入此延时队列
*由于队列是无界的,因此该方法将永远不会被阻塞
*/
public void put(E e) {offer(e);
}
/*
*将指定元素插入此延时队列
*由于队列是无界的,因此该方法将永远不会被阻塞
*/
public boolean offer(E e, long timeout, TimeUnit unit) {return offer(e);
}

以上几个方法都会调用offer()方法。

    public boolean offer(E e) {//获取可重入锁final ReentrantLock lock = this.lock;//可重入锁加锁lock.lock();try {//调用优先级队列的offer()方法入队q.offer(e);//如果入队元素在队首,则唤醒一个出队线程if (q.peek() == e) {leader = null;available.signal();}//返回入队成功return true;} finally {//解锁lock.unlock();}}

leader是等待获取队列头元素的线程,应用主从式设计减少不必要的等待。如果leader不为空,表示已经有线程在等待获取队列的头元素。所以,通过await()方法让出当前线程等待信号。如果leader为空,则把当前线程设置为leader,当一个线程为leader,它使用awaitNanos()方法让当前线程等待接收信号或等待delay时间。

DelayQueue出队

poll()方法

 /**检索并删除次队列的头*如果此队列没有延迟过期的元素,则返回null*/public E poll() {//获取可重入锁final ReentrantLock lock = this.lock;//可重入锁加锁lock.lock();try {//检索但不删除队列头部元素E first = q.peek();//如果first为null或者返回与此对象关联的剩余延迟时间大于0//返回nullif (first == null || first.getDelay(NANOSECONDS) > 0)return null;else//否则通过优先队列poll()方法出队return q.poll();} finally {//可重入锁解锁lock.unlock();}}

take()方法

 /**检索并除去此队列的头*等待直到该队列上具有过期延迟的元素可用*/public E take() throws InterruptedException {//获取可重入锁final ReentrantLock lock = this.lock;//可重入锁加锁lock.lockInterruptibly();try {for (;;) {//检索但不删除队列头部元素E first = q.peek();//如果first为空if (first == null)//在available条件上等待available.await();else {//如果first非空//获取first的剩余延迟时间long delay = first.getDelay(NANOSECONDS);//如果delay小于等于0if (delay <= 0)//延迟时间到期,获取并删除头部元素return q.poll();//如果delay大于0,即延迟时间未到期//将first置为nullfirst = null; //如果leader线程非空if (leader != null)//当前线程无限期阻塞//等待leader线程唤醒available.await();else {//如果leader线程为空//获取当前线程Thread thisThread = Thread.currentThread();//是当前线程成为leader线程leader = thisThread;try {//当前线程等待剩余延迟时间available.awaitNanos(delay);} finally {//如果当前线程是leader线程//释放leader线程if (leader == thisThread)leader = null;}}}}} finally {//如果leader线程为null并且队列不为空//说明没有其他线程在等待,那就通知条件队列if (leader == null && q.peek() != null)//通过signal()方法唤醒一个出队线程available.signal();//解锁lock.unlock();}}

take()方法总结:

  1. 获取锁。
  2. 取出优先级队列q的首元素。
  3. 如果元素q的队首为空则阻塞。
  4. 如果元素q的队首(first)不为空;获取这个元素的delay时间值,如果first的延迟delay时间小于等于0,说明元素已经到了可以使用的时间,调用poll()方法弹出该元素,跳出方法。
  5. 如果first的延迟delay时间大于0,释放元素first的引用,避免内存泄漏。
  6. 如果延迟delay时间大于0,leader非空,当前线程等待。
  7. 如果延迟delay时间大于0,leader为空,将当前线程设置为leader线程,等待剩余时间。
  8. 自旋,循环以上操作,直到return。

重载poll()方法

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {//获取等待时间long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();//如果first为空if (first == null) {//如果nanos小于等于0if (nanos <= 0)//返回nullreturn null;else//如果nanos大于0//等待nanos时间nanos = available.awaitNanos(nanos);} else {//如果队首非空//获取first的剩余延迟时间long delay = first.getDelay(NANOSECONDS);//如果delay小于等于0if (delay <= 0)//延迟时间到期,获取并删除头部元素return q.poll();//如果delay大于0//如果nanos小于等于0if (nanos <= 0)//返回nullreturn null;//如果delay大于0且nanos大于0//first置为nullfirst = null; //如果nanos小于delay或者leader非空if (nanos < delay || leader != null)//等待delay时间nanos = available.awaitNanos(nanos);else {//如果nanos大于等于delay或者leader为空//获取当前线程Thread thisThread = Thread.currentThread();//设置当前线程为leaderleader = thisThread;try {//等待delay时间long timeLeft = available.awaitNanos(delay);//修改nanosnanos -= delay - timeLeft;} finally {//如果当前线程为leader线程//释放leader线程if (leader == thisThread)leader = null;}}}}} finally {//如果leader为null并且队列不为空//说明没有其他线程在等待,那就通知条件队列if (leader == null && q.peek() != null)//通过singnal()方法唤醒一个出队线程available.signal();//解锁lock.unlock();}}

DelayQueue相关推荐

  1. 【Java并发编程】20、DelayQueue实现订单的定时取消

    当订单定时取消需要修改数据库订单状态,但是怎么确定订单什么时候应该改变状态,解决方案有下面两种:  第一种,写个定时器去每分钟扫描数据库,这样更新及时,但是如果数据库数据量大的话,会对数据库造成很大的 ...

  2. java加载c库阻塞_【死磕Java並發】-----J.U.C之阻塞隊列:DelayQueue

    DelayQueue是一個支持延時獲取元素的無界阻塞隊列.里面的元素全部都是"可延期"的元素,列頭的元素是最先"到期"的元素,如果隊列里面沒有元素到期,是不能從 ...

  3. 每日一博 - DelayQueue阻塞队列源码解读

    文章目录 Pre DelayQueue特征 Leader/Followers模式 DelayQueue源码分析 类继承关系 核心方法 成员变量 构造函数 入队方法 offer(E e) 出队方法 po ...

  4. Java DelayQueue延迟队列的使用和源码分析

    文章目录 概述 示例 原理分析 概述 DelayQueue 是JAVA提供的延时队列,队列内部的对象必须实现 Delayed 接口,该接口只有一个 getDelay 方法,返回延迟执行的时长. pub ...

  5. 使用DelayQueue 和 FutureTask 实现java中的缓存

    使用DelayQueue.ConcurrentHashMap.FutureTask实现的缓存工具类. DelayQueue 简介 DelayQueue是一个支持延时获取元素的无界阻塞队列.DelayQ ...

  6. DelayQueue源码

    介绍 一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素.只有延时期满后才能从队列中获取元素.(DelayQueue可以运用在 ...

  7. DelayQueue详解

    一.DelayQueue是什么 DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走.这种队列是有序的,即队头对 ...

  8. java中DelayQueue的使用

    文章目录 简介 DelayQueue DelayQueue的应用 总结 java中DelayQueue的使用 简介 今天给大家介绍一下DelayQueue,DelayQueue是BlockingQue ...

  9. 阻塞队列之七:DelayQueue延时队列

    一.DelayQueue简介 是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走.这种队列是有序的(PriorityQueue实际 ...

  10. delayqueue_在DelayQueue中更改延迟,从而更改顺序

    delayqueue 因此,我正在考虑构建一个简单的对象缓存,该缓存在给定时间后会使对象过期. 显而易见的机制是使用Java并发包中的DelayedQueue类. 但是我想知道是否有可能在将对象添加到 ...

最新文章

  1. OpenGL 几何着色器Geometry Shader
  2. 如何开始使用centos_如何开始使用CentOS
  3. CombineFileInputFormat 文件分片总结
  4. 数据不平衡问题及解决方案
  5. idea 注释模板_常用的模板函数
  6. zedboard的DDR3型号MT41K128M16HA
  7. 5.7 Universal Transformers
  8. 欧姆龙CP-X显示 END重复 以及 条 0 -重叠条
  9. c语言对称矩阵的压缩存储_对称矩阵的压缩存储和输出
  10. UE4 Ultra Dynamic Sky 参数翻译及功能概述
  11. java 实现soa_Java实现SOA的标准途径
  12. 2021年常规赛NBA球员数据分析
  13. Vue 安装@vue/cli报错npmERR gyp ERR
  14. numpy弧度制和角度制转换deg2rad, rad2deg
  15. echarts 乡镇级地图制作办法
  16. ‘primordial is not defined‘ node 报错解决方法 终极篇!!
  17. 修改服务器的返回数据,使用charles 修改服务器返回数据
  18. 北京建行个人信贷客户资信调查函.doc
  19. PhoneRescue for Mac(iOS数据恢复软件)
  20. C++ 机房预约系统

热门文章

  1. win7美化_win7/8/10桌面插件美化
  2. 计算机网络工具软件包括,计算机网络常用工具软件
  3. 我们为什么存在于三维空间而不是四维空间
  4. 思科Packet Tracer基础使用教程
  5. 使用机器学习算法打造一个简单的“微博指数”
  6. 数据分析 | 异常数据识别小结
  7. 移动前端开发和web前端开发有什么区别
  8. Minecraft 1.18.1、1.18.2模组开发 12.动态物品材质
  9. [转]挑礼物指南:价格不高却有格调的礼物,送给挑礼物困难症的你(多图预警)...
  10. 开源项目推荐:SCADA组态软件Qt,kanzi,C#,MFC和WEB大全(收藏版)