JUC学习 - 延迟队列 DelayQueue 详解
1、DelayQueue基本特征
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E>
DelayQueue延迟队列同时具备:无界队列、阻塞队列、优先队列的特征。分别来看下:
- 无界队列:通过调用DelayQueue的offer方法(或add方法),把待执行的任务对象放入队列,该方法是非阻塞的。这个队列是无界队列,内存足够的情况下,理论上存放的任务对象数是无限的。
- 阻塞队列:DelayQueue实现了BlockingQueue接口,是一个阻塞队列。但该队列只是在取对象时阻塞,对应两个方法:
take()
方法,获取并移除队列头的对象,如果时间还未到,就阻塞等待。poll(long timeout, TimeUnit unit)
方法,阻塞时间长度为timeout,然后获取并移除队列头的对象,如果对象延迟时间还未到,就返回null。
- 优先队列:DelayQueue的一个重要的成员是一个优先队列PriorityQueue,PriorityQueue内部是一个二叉小顶堆实现,其特点就是头部元素对应的权值是队列中最小的,也就是通过
poll()
方法获取到的对象是最优先的。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {private final transient ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();
2、Delayed接口
DelayQueue延迟队列中存放的对象,必须是实现Delayed接口的类对象。Delayed接口,是Comparable的子类:
public interface Delayed extends Comparable<Delayed> {long getDelay(TimeUnit unit);
}
所有要实现Delayed接口必须重写其getDelay、compareTo方法。
看一个实现例子:
package com.juc.queue;import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/*** @author TyuIn* @version 1.0* @description* @date 2021/12/1 8:41*/
public class TaskInfo implements Delayed {/*** 任务id*/private int id;/*** 业务类型*/private int type;/*** 业务数据*/private String data;/*** 执行时间*/private long executeTime;public TaskInfo(int id, int type, String data, long executeTime) {this.id = id;this.type = type;this.data = data;this.executeTime = TimeUnit.NANOSECONDS.convert(executeTime, TimeUnit.MILLISECONDS) + System.nanoTime();}public int getId() {return id;}public void setId(int id) {this.id = id;}public int getType() {return type;}public void setType(int type) {this.type = type;}public String getData() {return data;}public void setData(String data) {this.data = data;}public long getExecuteTime() {return executeTime;}public void setExecuteTime(long executeTime) {this.executeTime = executeTime;}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(this.executeTime - System.nanoTime() , TimeUnit.NANOSECONDS);}@Overridepublic int compareTo(@Nonnull Delayed o) {return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));}}
通过DelayQueue的offer
方法加入对象时,会根据对象compareTo方法把对象放到优先队列PriorityQueue中的指定位置;
通过DelayQueue的take
方法获取对象时,会调用对象的getDelay
方法,确定延迟获取时间,需要注意的是这里的时间单位为纳秒,示例代码中通过unit.convert(this.excuteTime - System.nanoTime() , TimeUnit.NANOSECONDS)
进行转换。
3、DelayQueue使用示例
package com.juc.queue;import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;/*** @author TyuIn* @version 1.0* @description* @date 2021/12/1 8:49*/
public class DelayQueueTest {/*** 延迟队列*/private static DelayQueue<TaskInfo> queue = new DelayQueue<>();/*** 3个线程的线程池*/private static ExecutorService es = Executors.newFixedThreadPool(3);public static void main(String[] args){while (true) {try {if (queue.size() <= 0){// 获取任务放入队列getTask();if(queue.size() <= 0){System.out.println("没有任务睡眠10秒");// 没有任务睡眠10秒TimeUnit.SECONDS.sleep(10);}}else{TaskInfo task = queue.take();es.submit(()->{System.out.println("执行任务:" + task.getId() + ":" + task.getData());});}}catch (Exception e){e.printStackTrace();}}}/*** 模拟从数据库获取 将来10秒中内即将执行的任务*/public static void getTask(){Random r = new Random();int t = r.nextInt(2);if(t==0){return;}TaskInfo t1 = new TaskInfo(1,1,"任务1",1000);TaskInfo t2 = new TaskInfo(2,2,"任务2",2000);TaskInfo t3 = new TaskInfo(3,3,"任务3",3000);TaskInfo t4 = new TaskInfo(4,4,"任务4",4000);TaskInfo t5 = new TaskInfo(5,5,"任务5",5000);TaskInfo t6 = new TaskInfo(6,6,"任务6",6000);TaskInfo t7 = new TaskInfo(7,7,"任务7",7000);TaskInfo t8 = new TaskInfo(8,8,"任务8",8000);queue.offer(t1);queue.offer(t2);queue.offer(t3);queue.offer(t4);queue.offer(t5);queue.offer(t6);queue.offer(t7);queue.offer(t8);}}
示例代码讲解:
- 首先创建了一个DelayQueue的延迟队列;并通过Executors.newFixedThreadPool(3)创建了一个3个线程数的线程池。
- main方法循环体中判断如果队列中没有对象,就模拟从数据库中获取10秒内即将执行的任务,并放入DelayQueue。如果数据库中没有10秒内即将执行的任务,程序睡眠10秒。
- 如果队列中有对象,调用DelayQueue的take()方法,获取到期的任务信息,并把任务信息交给线程池进行处理。
实例中,模拟创建了8个任务,每个任务的延迟执行时间分别为1到8秒。
执行main方法,每隔1秒打印一条信息,打印完整信息如下:
执行任务:1:任务1
执行任务:2:任务2
执行任务:3:任务3
执行任务:4:任务4
执行任务:5:任务5
执行任务:6:任务6
执行任务:7:任务7
执行任务:8:任务8
4、DelayQueue源码解析
首先看下DelayQueue的成员变量:
// 为了保证线程安全:对队列中每次存取操作,都需要进行加锁,采用的重入锁
private final transient ReentrantLock lock = new ReentrantLock();// 优先队列,延迟对象最终放到该队列中,保证每次从头部取出的对象,是应该最先被执行的
private final PriorityQueue<E> q = new PriorityQueue<E>();// leader线程,其等待延迟时间为优先队列中,最优先对象的延迟时间。其他线程无限期等待
private Thread leader = null;// 配合重入锁使用,对线程进行等待,唤醒等操作
private final Condition available = lock.newCondition();
4.1 三个加入队列方法
add、offer、put
三个加入队列方法,其中add和put都是直接调用offer方法,所以调用三个方法中的任意一个都是等效的。首先看下offer方法:
public boolean offer(E e) {final ReentrantLock lock = this.lock;// 加锁 lock.lock();try {// 调用PriorityQueue的offer方法,放入队列 q.offer(e);// 判断刚加入的对象,是不是头节点 if (q.peek() == e) {leader = null;// 唤醒 take() 或 poll(..) 方法中阻塞的线程 available.signal();}return true;} finally {// 释放锁lock.unlock();}
}/*** PriorityQueue 中的 peek() 方法*/
public E peek() {return (size == 0) ? null : (E) queue[0];
}
这个方法最值得关注的地方是,放入队列后,判断刚放入队列的对象是不是PriorityQueue队列的头节点,如果是需要唤醒take()
或poll(..)
方法中的等待阻塞,重新获取头节点对象的延迟等待时间。
add、put方法都是直接调用offer方法,源码为:
public boolean add(E e) {return offer(e);
}public void put(E e) {offer(e);
}
4.2 四个获取对象方法
poll()、poll(..)、take()、peek()
这四个方法都可以实现从队列头获取一个对象,但每个方法实现都不相同。
(1)peek()方法:非阻塞方法
public E peek() {final ReentrantLock lock = this.lock;// 加锁lock.lock();try {return q.peek();} finally {// 释放锁lock.unlock();}
}
DelayQueue的peek方法,本质上调用的是PriorityQueue的peek方法,只是多了一个加锁操作。该方法会返回头部节点对象,但不会从队列中移除。peek的含义为:瞟一眼。
(2)poll()方法:从队列头部获取并移除一个对象,非阻塞方法
public E poll() {final ReentrantLock lock = this.lock;// 加锁 lock.lock();try {// 获取队列中的头节点对象(不会移除) E first = q.peek();if (first == null || first.getDelay(NANOSECONDS) > 0)// 调用对象的getDelay方法,如果延迟时间还未到,直接返回空 return null;else // 如果延迟时间已经到达,直接调用PriorityQueue队列的取出并移除的poll方法return q.poll();} finally {// 释放锁lock.unlock();}
}
poll()方法 首先调用peek方法获取到头节点对象,通过调用对象的getDelay方法判断延迟时间是否到达,如果没有到达返回null,否则调用PriorityQueue的poll方法 取出并移除头节点对象 并返回。
(3)take()方法:DelayQueue的核心方法,常用于任务延迟执行,是阻塞方法
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;// 加中断锁lock.lockInterruptibly();try {for (;;) {// 获取头节点 E first = q.peek();if (first == null) // 头结点为空,释放锁无限期等待,等待offer方法放入对象,再次获得锁available.await();else {// 获取头节点对象延迟时间 long delay = first.getDelay(NANOSECONDS);if (delay <= 0)// 延迟时间已过,直接从队列中移除并取出返回 return q.poll();first = null; // don't retain ref while waitingif (leader != null)// 如果不是leader线程,无限期等待 available.await();else {Thread thisThread = Thread.currentThread();// 设置当前线程为leader线程leader = thisThread;try {// 释放锁,等待头结点延迟时间到来,再获得锁。available.awaitNanos(delay);} finally {if (leader == thisThread)// 释放leader线程引用leader = null;}}}}} finally {if (leader == null && q.peek() != null)// 唤醒某一个线程,获得锁,设置leader线程 available.signal();// 释放锁lock.unlock();}
}
take方法主要实现逻辑为(for循环体):
- 获取头节点对象,如果为空,线程释放锁,并进入无限期等待。等待调用offer方法,放入对象后,通过signal()方法唤醒。【看offer方法的源码】
- 如果头节点对象不为空,获取该对象的延迟时间,如果小于0,直接从队列中取出并移除该对象,返回。
- 如果头节点对象延迟时间大于0,判断是否“leader线程”是否已经存在,如果存在说明当前线程为“追随者线程”,进入无限期等待(等待leader线程take方法完成后,唤醒)。
- 如果“leader线程”不存在,把当前线程设置为“leader线程”,释放锁并等待头节点对象的延迟时间后,重新获得锁,下次循环获取头节点对象返回。
- finally代码块,每次leader线程执行完成take方法后,需要唤醒其他线程获得锁成为新的leader线程。
take方法实现了一个“领导者-追随者模式”的线程处理方式,只有leader线程会等待指定时间后获得锁,其他线程都会进入无限期等待。
这也是为什么在DelayQueue中都是使用signal唤醒,而不使用signalAll的原因(只需要一个线程成为leader线程)。
这个图,展示有3个线程调用DelayQueue的take方法,只会有一个线程成为”leader线程”,这里假设为线程1。其他两个线程为“追随者”,无限期等待,在”leader线程”执行完成之后调用signal方法随机唤醒一个线程成为新的”leader线程”。
(4)poll(…)方法:带延迟参数的poll方法,是阻塞方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {// 将时间转为纳秒表示long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;// 加中断锁lock.lockInterruptibly();try {for (;;) {// 获取头节点 E first = q.peek();if (first == null) {if (nanos <= 0)return null; // 指定延迟时间 小于0直接返回null else// 等待 指定延迟时间后,再重新获得锁 nanos = available.awaitNanos(nanos);} else {// 获取头节点对象的延迟时间 long delay = first.getDelay(NANOSECONDS);if (delay <= 0)// 如果对象延迟时间已过期,直接取出并移除该对象,返回return q.poll();if (nanos <= 0)// 如果对象延迟时间还未到,但指定延迟时间已到,返回nullreturn null;first = null; // don't retain ref while waitingif (nanos < delay || leader != null)// 如果"指定延迟时间"小于"对象延迟时间"或者不是leader线程// 等待指定时间后 再次被唤醒。 nanos = available.awaitNanos(nanos);else { // 如果"指定延迟时间"大于等于"对象延迟时间"并且 leader线程为空 Thread thisThread = Thread.currentThread();// 指定当前线程为leader线程 leader = thisThread;try {long timeLeft = available.awaitNanos(delay);// 重新计算最新的 "指定延迟时间"nanos -= delay - timeLeft;} finally {if (leader == thisThread)// 释放leader线程引用leader = null;}}}}} finally {if (leader == null && q.peek() != null)// leader线程执行结束后,唤醒某个“追随者”线程 available.signal();// 释放锁lock.unlock();}
}
poll(…) 方法: 如果指定的延迟时间,小于头结点对象的延迟时间,返回为空,非阻塞。
如果指定的延迟时间,大于头结点对象的延迟时间,会阻塞,阻塞长度为头结点对象的延迟时间。
这样说会比较抽象,看一个例子:
package com.juc.queue;import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;/*** @author TyuIn* @version 1.0* @description* @date 2021/12/1 9:40*/
public class DelayQueueTest1 {/*** 延迟队列*/private static DelayQueue<TaskInfo> queue = new DelayQueue<>();public static void main(String[] args) throws Exception{// 初始化队列getTask();// 启动线程数for(int i=0;i<3;i++){new Thread(new Runnable() {@Overridepublic void run() {try {// 延迟时间TaskInfo task = queue.poll(10000, TimeUnit.MILLISECONDS);if(task == null){System.out.println("任务为空");}else {System.out.println("执行任务:" + task.getId() + ":" + task.getData());}} catch (InterruptedException e) {e.printStackTrace();}}}).start();}}/*** 模拟从数据库获取 将来10秒中内即将执行的任务*/public static void getTask(){TaskInfo t1 = new TaskInfo(1,1,"任务1",1000);TaskInfo t2 = new TaskInfo(2,2,"任务2",2000);TaskInfo t3 = new TaskInfo(3,3,"任务3",3000);TaskInfo t4 = new TaskInfo(4,4,"任务4",4000);TaskInfo t5 = new TaskInfo(5,5,"任务5",5000);TaskInfo t6 = new TaskInfo(6,6,"任务6",6000);TaskInfo t7 = new TaskInfo(7,7,"任务7",7000);TaskInfo t8 = new TaskInfo(8,8,"任务8",8000);queue.offer(t1);queue.offer(t2);queue.offer(t3);queue.offer(t4);queue.offer(t5);queue.offer(t6);queue.offer(t7);queue.offer(t8);}}
该实例会启动3个线程同时调用queue.poll(10000, TimeUnit.MILLISECONDS)
方法,其中一个线程会被设置为“leader线程”,等待时间为头结点的延迟时间,其他线程的等待时间都为10000ms。
当“leader线程”执行完成后,会选择另外某个线程做为“leader线程”并且将等待时间改为当前头结点的延迟时间。
执行这段代码的main方法,会每隔1秒打印一条信息,完整打印信息如下:
执行任务:1:任务1
执行任务:2:任务2
执行任务:3:任务3
如果把指定延迟时间改为500,即:queue.poll(500, TimeUnit.MILLISECONDS)
,重新执行main()方法,该方法返回为空,这时不会阻塞,并立即打印三条消息:
任务为空
任务为空
任务为空
poll(…)方法的使用场景为:按指定时间段,分批次执行延迟队列中的任务。
从源码上看,在指定延迟时间大于头节点对象延迟时间时的实现 跟take()方法很像,只是“追随者线程”的等待时间有区别:poll(…)方法是等待指定延迟时间,take()方法是无限期等待。
4.3 其他方法
public boolean remove(Object o) {final ReentrantLock lock = this.lock;lock.lock();try {return q.remove(o);} finally {lock.unlock();}
}public void clear() {final ReentrantLock lock = this.lock;lock.lock();try {q.clear();} finally {lock.unlock();}
}
文章参考:http://www.itsoku.com/ 博主觉得这个文章的内容挺不错的,感兴趣的可以去了解一下。
JUC学习 - 延迟队列 DelayQueue 详解相关推荐
- java多线程学习-java.util.concurrent详解
http://janeky.iteye.com/category/124727 java多线程学习-java.util.concurrent详解(一) Latch/Barrier 博客分类: java ...
- ucos 消息队列代码详解_用python实现 多进程队的列数据处理详解,零基础记得都收藏哦
今天就为大家分享一篇python 多进程队列数据处理详解,具有很好的参考价值,希望对大家有所帮助.喜欢的话记得点赞转发关注不迷路哦!!! 总之了写到多进程队列数据处理问题,也就不多废话了,直接来上代码 ...
- BlockingQueue(阻塞队列)详解
推荐:Java并发编程汇总 BlockingQueue(阻塞队列)详解 原文地址 BlockingQueue 一. 前言 在新增的Concurrent包中,BlockingQueue很好的解决了多线程 ...
- c语言将AOE网络的数据写入TXT文档中,数据结构与算法学习辅导及习题详解.张乃孝版-C/C++文档类资源...
数据结构与算法学习辅导及习题详解.张乃孝版.04年10月 经过几年的努力,我深深体会到,编写这种辅导书要比编写一本湝通教材困难得多. 但愿我的上述理想,在本书中能够得以体现. 本书的组织 本书继承了& ...
- 消息队列超详解(以RabbitMQ和Kafka为例,为何使用消息队列、优缺点、高可用性、问题解决)
消息队列超详解(以RabbitMQ和Kafka为例) 为什么要用消息队列这个东西? 先说一下消息队列的常见使用场景吧,其实场景有很多,但是比较核心的有3个:解耦.异步.削峰. 解耦:现场画个图来说明一 ...
- python 消息队列 get是从队首还是队尾取东西_python分布式爬虫中消息队列知识点详解...
当排队等待人数过多的时候,我们需要设置一个等待区防止秩序混乱,同时再有新来的想要排队也可以呆在这个地方.那么在python分布式爬虫中,消息队列就相当于这样的一个区域,爬虫要进入这个区域找寻自己想要的 ...
- python多进程队列中的队列_python 多进程队列数据处理详解
我就废话不多说了,直接上代码吧! # -*- coding:utf8 -*- import paho.mqtt.client as mqtt from multiprocessing import P ...
- ELK学习笔记之Logstash详解
0x00 Logstash概述 官方介绍:Logstash is an open source data collection engine with real-time pipelining cap ...
- expect学习笔记及实例详解【转】
1. expect是基于tcl演变而来的,所以很多语法和tcl类似,基本的语法如下所示: 1.1 首行加上/usr/bin/expect 1.2 spawn: 后面加上需要执行的shell命令,比如说 ...
最新文章
- html表格联动,html前端基础:table和select操作
- Binder跨进程通信原理(一):动态内核加载模块
- 11. java 抽象类
- Boring Partition(CF-239D)
- C++实现整数值转中文大写
- C/C++ 笔试、面试题目大汇总收藏(上)
- MySQL下载安装、配置与使用(win7x64)
- 中科院-杨力祥视频教程	02课程
- 电脑垃圾太多?这几个清理电脑的软件来看看吗?
- 广东第一高中生_广东男篮签下全美第一高中生 NBA状元热门征战CBA
- global 与 $GLOBALS用法
- 2022年sublime安装教程超简单
- 流利阅读 2019.2.22 Duke University apologizes over professor’s email asking Chinese students to speak En
- 计算机策略组 网络,组策略
- 214情人节,使用微信小程序【信鸽相知】写情书吧
- 考研英语 长难句训练day65
- 自然语言处理(NLP)的基本概念 (未完待续)
- Featuretools快速使用指南--看这一篇就够了
- 百度董事长李彦宏的理念
- iconfont添加新图标_老项目中的iconfont字体图标添加新的图标