一、DelayQueue是什么

  DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。

二、DelayQueue能做什么

 1. 淘宝订单业务:下单之后如果三十分钟之内没有付款就自动取消订单。 
 2. 饿了吗订餐通知:下单成功后60s之后给用户发送短信通知。

 3.关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。

 4.缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。

 5.任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求等。

三、实例展示

 定义元素类,作为队列的元素

 DelayQueue只能添加(offer/put/add)实现了Delayed接口的对象,意思是说我们不能想往DelayQueue里添加什么就添加什么,不能添加int、也不能添加String进去,必须添加我们自己的实现了Delayed接口的类的对象,来代码:

/***  compareTo 方法必须提供与 getDelay 方法一致的排序*/
class MyDelayedTask implements Delayed{private String name ;private long start = System.currentTimeMillis();private long time ;public MyDelayedTask(String name,long time) {this.name = name;this.time = time;}/*** 需要实现的接口,获得延迟时间   用过期时间-当前时间* @param unit* @return*/@Overridepublic long getDelay(TimeUnit unit) {return unit.convert((start+time) - System.currentTimeMillis(),TimeUnit.MILLISECONDS);}/*** 用于延迟队列内部比较排序   当前时间的延迟时间 - 比较对象的延迟时间* @param o* @return*/@Overridepublic int compareTo(Delayed o) {MyDelayedTask o1 = (MyDelayedTask) o;return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));}@Overridepublic String toString() {return "MyDelayedTask{" +"name='" + name + '\'' +", time=" + time +'}';}
}

  其中,compareTo 方法 getDelay 方法 就是Delayed接口的方法,我们必须实现,而且按照JAVASE文档,compareTo 方法必须提供与 getDelay 方法一致的排序,也就是说compareTo方法里可以按照getDelay方法的返回值大小排序,即在compareTo方法里比较getDelay方法返回值大小

写main方法测试

  定义一个DelayQueue,添加几个元素,while循环获取元素

private static DelayQueue delayQueue  = new DelayQueue();public static void main(String[] args) throws InterruptedException {new Thread(new Runnable() {@Overridepublic void run() {delayQueue.offer(new MyDelayedTask("task1",10000));delayQueue.offer(new MyDelayedTask("task2",3900));delayQueue.offer(new MyDelayedTask("task3",1900));delayQueue.offer(new MyDelayedTask("task4",5900));delayQueue.offer(new MyDelayedTask("task5",6900));delayQueue.offer(new MyDelayedTask("task6",7900));delayQueue.offer(new MyDelayedTask("task7",4900));}}).start();while (true) {Delayed take = delayQueue.take();System.out.println(take);}}

执行结果

MyDelayedTask{name='task3', time=1900}
MyDelayedTask{name='task2', time=3900}
MyDelayedTask{name='task7', time=4900}
MyDelayedTask{name='task4', time=5900}
MyDelayedTask{name='task5', time=6900}
MyDelayedTask{name='task6', time=7900}
MyDelayedTask{name='task1', time=10000}

 DelayQueue属于排序队列,它的特殊之处在于队列的元素必须实现Delayed接口,该接口需要实现compareTo和getDelay方法。

static class Task implements Delayed{@Override//比较延时,队列里元素的排序依据public int compareTo(Delayed o) {return 0;}@Override//获取剩余时间public long getDelay(TimeUnit unit) {return 0;}}

  元素进入队列后,先进行排序,然后,只有getDelay也就是剩余时间为0的时候,该元素才有资格被消费者从队列中取出来,所以构造函数一般都有一个时间传入。

具体另一个实例:

import java.sql.Time;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;public class Delayquue {public static void main(String[] args) throws Exception {BlockingQueue<Task> delayqueue = new DelayQueue<>();long now = System.currentTimeMillis();delayqueue.put(new Task(now+3000));delayqueue.put(new Task(now+4000));delayqueue.put(new Task(now+6000));delayqueue.put(new Task(now+1000));System.out.println(delayqueue);for(int i=0; i<4; i++) {System.out.println(delayqueue.take());}}static class Task implements Delayed{long time = System.currentTimeMillis();public Task(long time) {this.time = time;}@Overridepublic int compareTo(Delayed o) {if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))return -1;else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) return 1;else return 0;}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS);}@Overridepublic String toString() {return "" + time;}}
} 

输出结果:

  可以看出来,每隔一段时间就会输出一个元素,这个间隔时间就是由构造函数定义的秒数来决定的。


原理分析:

 内部结构

  • 可重入锁
  • 用于根据delay时间排序的优先级队列
  • 用于优化阻塞通知的线程元素leader
  • 用于实现阻塞和通知的Condition对象

delayed和PriorityQueue

 在理解delayQueue原理之前我们需要先了解两个东西,delayed和PriorityQueue.

  • delayed是一个具有过期时间的元素
  • PriorityQueue是一个根据队列里元素某些属性排列先后的顺序队列

  delayQueue其实就是在每次往优先级队列中添加元素,然后以元素的delay/过期值作为排序的因素,以此来达到先过期的元素会拍在队首,每次从队列里取出来都是最先要过期的元素

offer方法

  1. 执行加锁操作
  2. 吧元素添加到优先级队列中
  3. 查看元素是否为队首
  4. 如果是队首的话,设置leader为空,唤醒所有等待的队列
  5. 释放锁

代码如下:

public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {leader = null;available.signal();}return true;} finally {lock.unlock();}}

take方法

  1. 执行加锁操作
  2. 取出优先级队列元素q的队首
  3. 如果元素q的队首/队列为空,阻塞请求
  4. 如果元素q的队首(first)不为空,获得这个元素的delay时间值
  5. 如果first的延迟delay时间值为0的话,说明该元素已经到了可以使用的时间,调用poll方法弹出该元素,跳出方法
  6. 如果first的延迟delay时间值不为0的话,释放元素first的引用,避免内存泄露
  7. 判断leader元素是否为空,不为空的话阻塞当前线程
  8. 如果leader元素为空的话,把当前线程赋值给leader元素,然后阻塞delay的时间,即等待队首到达可以出队的时间,在finally块中释放leader元素的引用
  9. 循环执行从1~8的步骤
  10. 如果leader为空并且优先级队列不为空的情况下(判断还有没有其他后续节点),调用signal通知其他的线程
  11. 执行解锁操作
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();if (first == null)available.await();else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0)return q.poll();first = null; // don't retain ref while waitingif (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && q.peek() != null)available.signal();lock.unlock();}}  

get点

 整个代码的过程中并没有使用上太难理解的地方,但是有几个比较难以理解他为什么这么做的地方

leader元素的使用

 大家可能看到在我们的DelayQueue中有一个Thread类型的元素leader,那么他是做什么的呢,有什么用呢?

 让我们先看一下元素注解上的doc描述:

Thread designated to wait for the element at the head of the queue.
This variant of the Leader-Follower pattern serves to minimize unnecessary timed waiting.
when a thread becomes the leader, it waits only for the next delay to elapse, but other threads await indefinitely.
The leader thread must signal some other thread before returning from take() or poll(...), unless some other thread becomes leader in the interim.
Whenever the head of the queue is replaced with an element with an earlier expiration time, the leader field is invalidated by being reset to null, and some waiting thread, but not necessarily the current leader, is signalled.
So waiting threads must be prepared to acquire and lose leadership while waiting.

 上面主要的意思就是说用leader来减少不必要的等待时间,那么这里我们的DelayQueue是怎么利用leader来做到这一点的呢:

 这里我们想象着我们有个多个消费者线程用take方法去取,内部先加锁,然后每个线程都去peek第一个节点.
 如果leader不为空说明已经有线程在取了,设置当前线程等待

if (leader != null)available.await();

 如果为空说明没有其他线程去取这个节点,设置leader并等待delay延时到期,直到poll后结束循环

     else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread) leader = null; } } 

take方法中为什么释放first元素

first = null; // don't retain ref while waiting

 我们可以看到doug lea后面写的注释,那么这段代码有什么用呢?

 想想假设现在延迟队列里面有三个对象。

  • 线程A进来获取first,然后进入 else 的else ,设置了leader为当前线程A
  • 线程B进来获取first,进入else的阻塞操作,然后无限期等待
  • 这时在JDK 1.7下面他是持有first引用的
  • 如果线程A阻塞完毕,获取对象成功,出队,这个对象理应被GC回收,但是他还被线程B持有着,GC链可达,所以不能回收这个first.
  • 假设还有线程C 、D、E.. 持有对象1引用,那么无限期的不能回收该对象1引用了,那么就会造成内存泄露.

链接:

https://www.jianshu.com/p/e0bcc9eae0ae
https://www.jianshu.com/p/bf9f6b08ba5b
https://blog.csdn.net/toocruel/article/details/82769595

转载于:https://www.cnblogs.com/myseries/p/10944211.html

DelayQueue详解相关推荐

  1. JUC学习 - 延迟队列 DelayQueue 详解

    1.DelayQueue基本特征 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>impl ...

  2. 【Live555】live555源码详解(二):BasicHashTable、DelayQueue、HandlerSet

    [Live555]live555源码详解系列笔记 3.BasicHashTable 哈希表 协作图: 3.1 BasicHashTable BasicHashTable 继承自 HashTable 重 ...

  3. 【Live555】live555源码详解系列笔记

    [Live555]liveMedia下载.配置.编译.安装.基本概念 [Live555]live555源码详解(一):BasicUsageEnvironment.UsageEnvironment [L ...

  4. ScheduledThreadPoolExecutor详解

    2019独角兽企业重金招聘Python工程师标准>>> 本文主要分为两个部分,第一部分首先会对ScheduledThreadPoolExecutor进行简单的介绍,并且会介绍其主要A ...

  5. java多线程学习-java.util.concurrent详解

    http://janeky.iteye.com/category/124727 java多线程学习-java.util.concurrent详解(一) Latch/Barrier 博客分类: java ...

  6. java 高并发第三阶段实战_Java 高并发第三阶段实战---Java并发包深入解析与使用详解...

    第三阶段的课程主要围绕着Java并发包的使用,展开详细的介绍,主要内容有1.原子包源码剖析,2.并发包工具类详细介绍,3.线程服务以及Future和callable等详细介绍,4.高并发容器和阻塞容器 ...

  7. 转:Java 7 种阻塞队列详解

    转自: Java 7 种阻塞队列详解 - 云+社区 - 腾讯云队列(Queue)是一种经常使用的集合.Queue 实际上是实现了一个先进先出(FIFO:First In First Out)的有序表. ...

  8. scheduledthreadpoolexecutor使用_ScheduledThreadPoolExecutor详解

    本文主要分为两个部分,第一部分首先会对ScheduledThreadPoolExecutor进行简单的介绍,并且会介绍其主要API的使用方式,然后介绍了其使用时的注意点,第二部分则主要对Schedul ...

  9. Zookeeper客户端Curator使用详解

    http://www.jianshu.com/p/70151fc0ef5d Zookeeper客户端Curator使用详解 简介 Curator是Netflix公司开源的一套zookeeper客户端框 ...

最新文章

  1. 适用于AMD ROC GPU的Numba概述
  2. 01-01java概述 doc命令、jdk\jre下载安装、path、classpath配置、开发中常见小问题
  3. Java黑皮书课后题第2章:*2.22(金融应用:货币单位)改写程序清单2-10,解决将double转int可能会造成精度损失问题。以整数值作为输入,其最后两位代表的是美分币值
  4. MaxCompute与OSS非结构化数据读写互通(及图像处理实例)
  5. 以ThreadStart方式实现多线程
  6. 【UOJ#246】套路(动态规划)
  7. Java和Android中的注解
  8. Hibernate之必须导入jar包
  9. (转)将cocos2dx项目从VS移植到Eclipse
  10. 随机森林RF、XGBoost、GBDT和LightGBM的原理和区别
  11. 【script】lambda的使用
  12. android 自定义加载动画效果,Android 自定义View修炼-自定义加载进度动画LoadingImageView...
  13. 移动互联网实时视频通讯之视频采集
  14. Altera下载器使用说明 Intel(Altera)FPGA高速下载器线PL-USB2-BLASTER中文详细使用手册
  15. win10如何桌面添加计算机,win10系统桌面怎么添加计算机等图标
  16. 如何通过一根网线连接两台电脑,实现数据的传输?
  17. 2019 秋季最新最全面 JAVA 面试题 附答案
  18. 在 V2EX 的开发环境里尝试了一下 OneAPM @livid
  19. 为什么手机八核心还会卡?
  20. Tensorflow中的masking和padding

热门文章

  1. Qtum量子链帅初受邀火星特训营面对面授课
  2. 英语写作学习笔录 task1 conclusion
  3. 脑芯编:窥脑究竟,织网造芯(二)
  4. 关于函数返回值的讨论与总结
  5. 了解模型、视图和控制器
  6. 在颜值上,我 Bootstrap 真的没怕过谁
  7. java连接mysql 不推荐_java连接mysql
  8. jsch设置代理_尽管在JSch中设置了STRICT_HOST_CHECKING,但仍获取UnknownHostKey异常
  9. 创建spring配置
  10. 利用死信交换机接收死信