1、DelayQueue基本特征

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E>

DelayQueue延迟队列同时具备:无界队列、阻塞队列、优先队列的特征。分别来看下:

  • 无界队列:通过调用DelayQueue的offer方法(或add方法),把待执行的任务对象放入队列,该方法是非阻塞的。这个队列是无界队列,内存足够的情况下,理论上存放的任务对象数是无限的。
  • 阻塞队列:DelayQueue实现了BlockingQueue接口,是一个阻塞队列。但该队列只是在取对象时阻塞,对应两个方法:
    1. take()方法,获取并移除队列头的对象,如果时间还未到,就阻塞等待。
    2. poll(long timeout, TimeUnit unit) 方法,阻塞时间长度为timeout,然后获取并移除队列头的对象,如果对象延迟时间还未到,就返回null。
  • 优先队列:DelayQueue的一个重要的成员是一个优先队列PriorityQueue,PriorityQueue内部是一个二叉小顶堆实现,其特点就是头部元素对应的权值是队列中最小的,也就是通过poll()方法获取到的对象是最优先的。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {private final transient ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();

2、Delayed接口

DelayQueue延迟队列中存放的对象,必须是实现Delayed接口的类对象。Delayed接口,是Comparable的子类:

public interface Delayed extends Comparable<Delayed> {long getDelay(TimeUnit unit);
}

所有要实现Delayed接口必须重写其getDelay、compareTo方法。

看一个实现例子:

package com.juc.queue;import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/*** @author TyuIn* @version 1.0* @description* @date 2021/12/1 8:41*/
public class TaskInfo implements Delayed {/*** 任务id*/private int id;/*** 业务类型*/private int type;/*** 业务数据*/private String data;/*** 执行时间*/private long executeTime;public TaskInfo(int id, int type, String data, long executeTime) {this.id = id;this.type = type;this.data = data;this.executeTime = TimeUnit.NANOSECONDS.convert(executeTime, TimeUnit.MILLISECONDS) + System.nanoTime();}public int getId() {return id;}public void setId(int id) {this.id = id;}public int getType() {return type;}public void setType(int type) {this.type = type;}public String getData() {return data;}public void setData(String data) {this.data = data;}public long getExecuteTime() {return executeTime;}public void setExecuteTime(long executeTime) {this.executeTime = executeTime;}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(this.executeTime - System.nanoTime() , TimeUnit.NANOSECONDS);}@Overridepublic int compareTo(@Nonnull Delayed o) {return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));}}

通过DelayQueue的offer方法加入对象时,会根据对象compareTo方法把对象放到优先队列PriorityQueue中的指定位置;

通过DelayQueue的take方法获取对象时,会调用对象的getDelay方法,确定延迟获取时间,需要注意的是这里的时间单位为纳秒,示例代码中通过unit.convert(this.excuteTime - System.nanoTime() , TimeUnit.NANOSECONDS)进行转换。

3、DelayQueue使用示例

package com.juc.queue;import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;/*** @author TyuIn* @version 1.0* @description* @date 2021/12/1 8:49*/
public class DelayQueueTest {/*** 延迟队列*/private static DelayQueue<TaskInfo> queue = new DelayQueue<>();/*** 3个线程的线程池*/private static ExecutorService es =  Executors.newFixedThreadPool(3);public static void main(String[] args){while (true) {try {if (queue.size() <= 0){// 获取任务放入队列getTask();if(queue.size() <= 0){System.out.println("没有任务睡眠10秒");// 没有任务睡眠10秒TimeUnit.SECONDS.sleep(10);}}else{TaskInfo task = queue.take();es.submit(()->{System.out.println("执行任务:" + task.getId() + ":" + task.getData());});}}catch (Exception e){e.printStackTrace();}}}/*** 模拟从数据库获取 将来10秒中内即将执行的任务*/public static void getTask(){Random r = new Random();int t = r.nextInt(2);if(t==0){return;}TaskInfo t1 = new TaskInfo(1,1,"任务1",1000);TaskInfo t2 = new TaskInfo(2,2,"任务2",2000);TaskInfo t3 = new TaskInfo(3,3,"任务3",3000);TaskInfo t4 = new TaskInfo(4,4,"任务4",4000);TaskInfo t5 = new TaskInfo(5,5,"任务5",5000);TaskInfo t6 = new TaskInfo(6,6,"任务6",6000);TaskInfo t7 = new TaskInfo(7,7,"任务7",7000);TaskInfo t8 = new TaskInfo(8,8,"任务8",8000);queue.offer(t1);queue.offer(t2);queue.offer(t3);queue.offer(t4);queue.offer(t5);queue.offer(t6);queue.offer(t7);queue.offer(t8);}}

示例代码讲解:

  1. 首先创建了一个DelayQueue的延迟队列;并通过Executors.newFixedThreadPool(3)创建了一个3个线程数的线程池。
  2. main方法循环体中判断如果队列中没有对象,就模拟从数据库中获取10秒内即将执行的任务,并放入DelayQueue。如果数据库中没有10秒内即将执行的任务,程序睡眠10秒。
  3. 如果队列中有对象,调用DelayQueue的take()方法,获取到期的任务信息,并把任务信息交给线程池进行处理。

实例中,模拟创建了8个任务,每个任务的延迟执行时间分别为1到8秒。

执行main方法,每隔1秒打印一条信息,打印完整信息如下:

执行任务:1:任务1
执行任务:2:任务2
执行任务:3:任务3
执行任务:4:任务4
执行任务:5:任务5
执行任务:6:任务6
执行任务:7:任务7
执行任务:8:任务8

4、DelayQueue源码解析

首先看下DelayQueue的成员变量:

// 为了保证线程安全:对队列中每次存取操作,都需要进行加锁,采用的重入锁
private final transient ReentrantLock lock = new ReentrantLock();// 优先队列,延迟对象最终放到该队列中,保证每次从头部取出的对象,是应该最先被执行的
private final PriorityQueue<E> q = new PriorityQueue<E>();// leader线程,其等待延迟时间为优先队列中,最优先对象的延迟时间。其他线程无限期等待
private Thread leader = null;// 配合重入锁使用,对线程进行等待,唤醒等操作
private final Condition available = lock.newCondition();

4.1 三个加入队列方法

add、offer、put 三个加入队列方法,其中add和put都是直接调用offer方法,所以调用三个方法中的任意一个都是等效的。首先看下offer方法:

public boolean offer(E e) {final ReentrantLock lock = this.lock;// 加锁  lock.lock();try {// 调用PriorityQueue的offer方法,放入队列  q.offer(e);// 判断刚加入的对象,是不是头节点  if (q.peek() == e) {leader = null;// 唤醒 take() 或 poll(..) 方法中阻塞的线程  available.signal();}return true;} finally {// 释放锁lock.unlock();}
}/*** PriorityQueue 中的 peek() 方法*/
public E peek() {return (size == 0) ? null : (E) queue[0];
}

这个方法最值得关注的地方是,放入队列后,判断刚放入队列的对象是不是PriorityQueue队列的头节点,如果是需要唤醒take()poll(..)方法中的等待阻塞,重新获取头节点对象的延迟等待时间。

add、put方法都是直接调用offer方法,源码为:

public boolean add(E e) {return offer(e);
}public void put(E e) {offer(e);
}

4.2 四个获取对象方法

poll()、poll(..)、take()、peek()这四个方法都可以实现从队列头获取一个对象,但每个方法实现都不相同。

(1)peek()方法:非阻塞方法
public E peek() {final ReentrantLock lock = this.lock;// 加锁lock.lock();try {return q.peek();} finally {// 释放锁lock.unlock();}
}

DelayQueue的peek方法,本质上调用的是PriorityQueue的peek方法,只是多了一个加锁操作。该方法会返回头部节点对象,但不会从队列中移除。peek的含义为:瞟一眼。

(2)poll()方法:从队列头部获取并移除一个对象,非阻塞方法
public E poll() {final ReentrantLock lock = this.lock;// 加锁 lock.lock();try {// 获取队列中的头节点对象(不会移除)  E first = q.peek();if (first == null || first.getDelay(NANOSECONDS) > 0)// 调用对象的getDelay方法,如果延迟时间还未到,直接返回空  return null;else  // 如果延迟时间已经到达,直接调用PriorityQueue队列的取出并移除的poll方法return q.poll();} finally {// 释放锁lock.unlock();}
}

poll()方法 首先调用peek方法获取到头节点对象,通过调用对象的getDelay方法判断延迟时间是否到达,如果没有到达返回null,否则调用PriorityQueue的poll方法 取出并移除头节点对象 并返回。

(3)take()方法:DelayQueue的核心方法,常用于任务延迟执行,是阻塞方法
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;// 加中断锁lock.lockInterruptibly();try {for (;;) {// 获取头节点 E first = q.peek();if (first == null) // 头结点为空,释放锁无限期等待,等待offer方法放入对象,再次获得锁available.await();else {// 获取头节点对象延迟时间 long delay = first.getDelay(NANOSECONDS);if (delay <= 0)// 延迟时间已过,直接从队列中移除并取出返回 return q.poll();first = null; // don't retain ref while waitingif (leader != null)// 如果不是leader线程,无限期等待  available.await();else {Thread thisThread = Thread.currentThread();// 设置当前线程为leader线程leader = thisThread;try {// 释放锁,等待头结点延迟时间到来,再获得锁。available.awaitNanos(delay);} finally {if (leader == thisThread)// 释放leader线程引用leader = null;}}}}} finally {if (leader == null && q.peek() != null)// 唤醒某一个线程,获得锁,设置leader线程 available.signal();// 释放锁lock.unlock();}
}

take方法主要实现逻辑为(for循环体):

  1. 获取头节点对象,如果为空,线程释放锁,并进入无限期等待。等待调用offer方法,放入对象后,通过signal()方法唤醒。【看offer方法的源码】
  2. 如果头节点对象不为空,获取该对象的延迟时间,如果小于0,直接从队列中取出并移除该对象,返回。
  3. 如果头节点对象延迟时间大于0,判断是否“leader线程”是否已经存在,如果存在说明当前线程为“追随者线程”,进入无限期等待(等待leader线程take方法完成后,唤醒)。
  4. 如果“leader线程”不存在,把当前线程设置为“leader线程”,释放锁并等待头节点对象的延迟时间后,重新获得锁,下次循环获取头节点对象返回。
  5. finally代码块,每次leader线程执行完成take方法后,需要唤醒其他线程获得锁成为新的leader线程。

take方法实现了一个“领导者-追随者模式”的线程处理方式,只有leader线程会等待指定时间后获得锁,其他线程都会进入无限期等待。

这也是为什么在DelayQueue中都是使用signal唤醒,而不使用signalAll的原因(只需要一个线程成为leader线程)。

这个图,展示有3个线程调用DelayQueue的take方法,只会有一个线程成为”leader线程”,这里假设为线程1。其他两个线程为“追随者”,无限期等待,在”leader线程”执行完成之后调用signal方法随机唤醒一个线程成为新的”leader线程”。

(4)poll(…)方法:带延迟参数的poll方法,是阻塞方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {// 将时间转为纳秒表示long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;// 加中断锁lock.lockInterruptibly();try {for (;;) {// 获取头节点 E first = q.peek();if (first == null) {if (nanos <= 0)return null; // 指定延迟时间 小于0直接返回null  else// 等待 指定延迟时间后,再重新获得锁  nanos = available.awaitNanos(nanos);} else {// 获取头节点对象的延迟时间 long delay = first.getDelay(NANOSECONDS);if (delay <= 0)// 如果对象延迟时间已过期,直接取出并移除该对象,返回return q.poll();if (nanos <= 0)// 如果对象延迟时间还未到,但指定延迟时间已到,返回nullreturn null;first = null; // don't retain ref while waitingif (nanos < delay || leader != null)// 如果"指定延迟时间"小于"对象延迟时间"或者不是leader线程// 等待指定时间后 再次被唤醒。  nanos = available.awaitNanos(nanos);else { // 如果"指定延迟时间"大于等于"对象延迟时间"并且 leader线程为空  Thread thisThread = Thread.currentThread();// 指定当前线程为leader线程  leader = thisThread;try {long timeLeft = available.awaitNanos(delay);// 重新计算最新的 "指定延迟时间"nanos -= delay - timeLeft;} finally {if (leader == thisThread)// 释放leader线程引用leader = null;}}}}} finally {if (leader == null && q.peek() != null)// leader线程执行结束后,唤醒某个“追随者”线程  available.signal();// 释放锁lock.unlock();}
}

poll(…) 方法: 如果指定的延迟时间,小于头结点对象的延迟时间,返回为空,非阻塞。

如果指定的延迟时间,大于头结点对象的延迟时间,会阻塞,阻塞长度为头结点对象的延迟时间。

这样说会比较抽象,看一个例子:

package com.juc.queue;import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;/*** @author TyuIn* @version 1.0* @description* @date 2021/12/1 9:40*/
public class DelayQueueTest1 {/*** 延迟队列*/private static DelayQueue<TaskInfo> queue = new DelayQueue<>();public static void main(String[] args) throws Exception{// 初始化队列getTask();// 启动线程数for(int i=0;i<3;i++){new Thread(new Runnable() {@Overridepublic void run() {try {// 延迟时间TaskInfo task = queue.poll(10000, TimeUnit.MILLISECONDS);if(task == null){System.out.println("任务为空");}else {System.out.println("执行任务:" + task.getId() + ":" + task.getData());}} catch (InterruptedException e) {e.printStackTrace();}}}).start();}}/*** 模拟从数据库获取 将来10秒中内即将执行的任务*/public static void getTask(){TaskInfo t1 = new TaskInfo(1,1,"任务1",1000);TaskInfo t2 = new TaskInfo(2,2,"任务2",2000);TaskInfo t3 = new TaskInfo(3,3,"任务3",3000);TaskInfo t4 = new TaskInfo(4,4,"任务4",4000);TaskInfo t5 = new TaskInfo(5,5,"任务5",5000);TaskInfo t6 = new TaskInfo(6,6,"任务6",6000);TaskInfo t7 = new TaskInfo(7,7,"任务7",7000);TaskInfo t8 = new TaskInfo(8,8,"任务8",8000);queue.offer(t1);queue.offer(t2);queue.offer(t3);queue.offer(t4);queue.offer(t5);queue.offer(t6);queue.offer(t7);queue.offer(t8);}}

该实例会启动3个线程同时调用queue.poll(10000, TimeUnit.MILLISECONDS)方法,其中一个线程会被设置为“leader线程”,等待时间为头结点的延迟时间,其他线程的等待时间都为10000ms。

当“leader线程”执行完成后,会选择另外某个线程做为“leader线程”并且将等待时间改为当前头结点的延迟时间。

执行这段代码的main方法,会每隔1秒打印一条信息,完整打印信息如下:

执行任务:1:任务1
执行任务:2:任务2
执行任务:3:任务3

如果把指定延迟时间改为500,即:queue.poll(500, TimeUnit.MILLISECONDS),重新执行main()方法,该方法返回为空,这时不会阻塞,并立即打印三条消息:

任务为空
任务为空
任务为空

poll(…)方法的使用场景为:按指定时间段,分批次执行延迟队列中的任务。

从源码上看,在指定延迟时间大于头节点对象延迟时间时的实现 跟take()方法很像,只是“追随者线程”的等待时间有区别:poll(…)方法是等待指定延迟时间,take()方法是无限期等待。

4.3 其他方法

public boolean remove(Object o) {final ReentrantLock lock = this.lock;lock.lock();try {return q.remove(o);} finally {lock.unlock();}
}public void clear() {final ReentrantLock lock = this.lock;lock.lock();try {q.clear();} finally {lock.unlock();}
}

文章参考:http://www.itsoku.com/ 博主觉得这个文章的内容挺不错的,感兴趣的可以去了解一下。

JUC学习 - 延迟队列 DelayQueue 详解相关推荐

  1. java多线程学习-java.util.concurrent详解

    http://janeky.iteye.com/category/124727 java多线程学习-java.util.concurrent详解(一) Latch/Barrier 博客分类: java ...

  2. ucos 消息队列代码详解_用python实现 多进程队的列数据处理详解,零基础记得都收藏哦

    今天就为大家分享一篇python 多进程队列数据处理详解,具有很好的参考价值,希望对大家有所帮助.喜欢的话记得点赞转发关注不迷路哦!!! 总之了写到多进程队列数据处理问题,也就不多废话了,直接来上代码 ...

  3. BlockingQueue(阻塞队列)详解

    推荐:Java并发编程汇总 BlockingQueue(阻塞队列)详解 原文地址 BlockingQueue 一. 前言 在新增的Concurrent包中,BlockingQueue很好的解决了多线程 ...

  4. c语言将AOE网络的数据写入TXT文档中,数据结构与算法学习辅导及习题详解.张乃孝版-C/C++文档类资源...

    数据结构与算法学习辅导及习题详解.张乃孝版.04年10月 经过几年的努力,我深深体会到,编写这种辅导书要比编写一本湝通教材困难得多. 但愿我的上述理想,在本书中能够得以体现. 本书的组织 本书继承了& ...

  5. 消息队列超详解(以RabbitMQ和Kafka为例,为何使用消息队列、优缺点、高可用性、问题解决)

    消息队列超详解(以RabbitMQ和Kafka为例) 为什么要用消息队列这个东西? 先说一下消息队列的常见使用场景吧,其实场景有很多,但是比较核心的有3个:解耦.异步.削峰. 解耦:现场画个图来说明一 ...

  6. python 消息队列 get是从队首还是队尾取东西_python分布式爬虫中消息队列知识点详解...

    当排队等待人数过多的时候,我们需要设置一个等待区防止秩序混乱,同时再有新来的想要排队也可以呆在这个地方.那么在python分布式爬虫中,消息队列就相当于这样的一个区域,爬虫要进入这个区域找寻自己想要的 ...

  7. python多进程队列中的队列_python 多进程队列数据处理详解

    我就废话不多说了,直接上代码吧! # -*- coding:utf8 -*- import paho.mqtt.client as mqtt from multiprocessing import P ...

  8. ELK学习笔记之Logstash详解

    0x00 Logstash概述 官方介绍:Logstash is an open source data collection engine with real-time pipelining cap ...

  9. expect学习笔记及实例详解【转】

    1. expect是基于tcl演变而来的,所以很多语法和tcl类似,基本的语法如下所示: 1.1 首行加上/usr/bin/expect 1.2 spawn: 后面加上需要执行的shell命令,比如说 ...

最新文章

  1. html表格联动,html前端基础:table和select操作
  2. Binder跨进程通信原理(一):动态内核加载模块
  3. 11. java 抽象类
  4. Boring Partition(CF-239D)
  5. C++实现整数值转中文大写
  6. C/C++ 笔试、面试题目大汇总收藏(上)
  7. MySQL下载安装、配置与使用(win7x64)
  8. 中科院-杨力祥视频教程 02课程
  9. 电脑垃圾太多?这几个清理电脑的软件来看看吗?
  10. 广东第一高中生_广东男篮签下全美第一高中生 NBA状元热门征战CBA
  11. global 与 $GLOBALS用法
  12. 2022年sublime安装教程超简单
  13. 流利阅读 2019.2.22 Duke University apologizes over professor’s email asking Chinese students to speak En
  14. 计算机策略组 网络,组策略
  15. 214情人节,使用微信小程序【信鸽相知】写情书吧
  16. 考研英语 长难句训练day65
  17. 自然语言处理(NLP)的基本概念 (未完待续)
  18. Featuretools快速使用指南--看这一篇就够了
  19. 百度董事长李彦宏的理念
  20. iconfont添加新图标_老项目中的iconfont字体图标添加新的图标

热门文章

  1. 解决Visual C++2008安装失败,error 1935
  2. 用这个酷炫数据地图,老板口中别人家的可视化大屏你也能搞定
  3. html展示列表,如何在HTML中展示列表?
  4. kido机器人没反应_机器人示教器常见故障及解决方案
  5. Word论文中关于章、节、图、表、公式自动编号及引用
  6. 通过JS方式实现隐藏手机号码中间4位数
  7. win7访问共享文件提示:禁用当前账户
  8. 讯飞-糖尿病遗传风险检测挑战赛
  9. 基于物体路标的仿人机器人实时里程计
  10. Spring Cloud学习笔记(三)