延时队列DelayQueue

延迟元素的无界阻塞队列,其中元素只能在其延迟到期后才能获取。

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {...}public interface Delayed extends Comparable<Delayed> {long getDelay(TimeUnit unit);
}

队列中的元素必须实现Delayed 接口,该接口继承Comparable接口,并指定了获取元素到期时间的方法。这里提供一个简单的基础实现例子:

public class DelayElem implements Delayed {/*** 延迟时间*/private final long delay;/*** 到期时间*/private final long expire;/*** 数据*/private final String msg;public DelayElem(long delay, String msg) {this.delay = delay;this.msg = msg;//到期时间 = 当前时间+延迟时间this.expire = System.currentTimeMillis() + this.delay;}/*** 需要实现的接口,获得延迟时间* 用过期时间-当前时间** @param unit 时间单位* @return 延迟时间*/@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}/*** 用于延迟队列内部比较排序* 当前时间的延迟时间 - 比较对象的延迟时间** @param o 比较对象* @return 结果*/@Overridepublic int compareTo(Delayed o) {return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));}@Overridepublic String toString() {return "DelayElem{" +"delay=" + delay +", expire=" + expire +", msg='" + msg + '\'' +'}';}
}

DelayElem 的 compareTo 方法实现必须合理,因为DelayQueue中存储元素实际使用的是PriorityQueue优先级队列,是依赖于compareTo 方法进行排序的。

这里不要认为到期时间短或已过期的元素就一定在队列头部(虽然这可能比较契合大多场景),一些特殊需求下,可能compareTo 更侧重于其他属性,那么过期的元素可能并不先出队。如修改上面DelayElem的compareTo 方法:

    @Overridepublic int compareTo(Delayed o) {DelayElem d = (DelayElem) o;return (this.msg.compareTo(d.getMsg()));}

然后写个测试

    public static void main(String[] args) {DelayQueue<DelayElem> deque = new DelayQueue<>();DelayElem d1 = new DelayElem(2000, "c");DelayElem d2 = new DelayElem(4000, "a");deque.offer(d1);deque.offer(d2);System.err.println(deque.poll());System.err.println(deque.poll());}// 输出结果如下// mainDelayElem{delay=4000, expire=1634798982416, msg='a'}// mainDelayElem{delay=2000, expire=1634798980416, msg='c'}

继续聊DelayQueue,简单看一下核心源码

    private final transient ReentrantLock lock = new ReentrantLock();private final Condition available = lock.newCondition();// 用来实际存储数据的优先级队列private final PriorityQueue<E> q = new PriorityQueue<E>();// 线程模型中的leader-follower模式private Thread leader = null;

只有这4个属性,其中leader 使用了leader-follower模式,查阅后在这里简单描述一下:

Leader-follower线程模型,其出现是为了解决“ 单线程接受请求,线程池线程处理请求 ”模式下,线程上下文切换以及线程间通信数据拷贝的开销,并且不需要维护一个队列。
在Leader-follower线程模型中每个线程有三种模式,leader,follower,processing。Leader-follower线程模型,一开始会创建一个线程池,并且会选取一个线程作为leader线程,leader线程负责监听网络请求,其它线程为follower处于waiting状态,当leader线程接受到一个请求后,会释放自己作为leader的权利,然后从follower线程中选择一个线程进行激活,然后激活的线程被选择为新的leader线程作为服务监听,然后老的leader则负责处理自己接受到的请求(现在老的leader线程状态变为了processing),处理完成后,状态从processing转换为follower。
可知这种模式下接受请求和进行处理使用的是同一个线程,这避免了线程上下文切换和线程通讯数据拷贝。

继续看添加元素的方法offer与获取元素的方法poll

    public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);/*** 这里如果新插入的元素位于队列头部,无论是替代原头部还是作为队列中唯一的元素,* 都意味着所有正在阻塞,尝试获取头元素的线程需要重新获取头部元素*/if (q.peek() == e) {leader = null;// 等待队列是有序的FIFO先进先出队列,首先激活的一定是leader线程available.signal();}return true;} finally {lock.unlock();}}public boolean offer(E e, long timeout, TimeUnit unit) {return offer(e);}

该队列是无界队列,offer方法永远不会发生阻塞,所以我们需要关心的是poll(take方法逻辑看起来比poll明朗一些,两者思路是一致的,所以我选取take进行分析),leader-follower模式也是针对于多个线程并发阻塞获取的情景。

另外,我们发现其实唤醒操作available.signal(),只发生在两种情况下:

  1. 队列中新增了元素,并且该元素替换了原头元素,需要唤醒获取线程重新获取头元素
  2. 某个获取线程成功获取到元素或超时退出后,释放leader标记,唤醒下一个线程使其成为新的leader
    public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// 循环,每次醒来重新获取头元素for (;;) {E first = q.peek();if (first == null)available.await();else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0)return q.poll();first = null; // don't retain ref while waiting// 说明只有leader线程才有执行权if (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {// 表示当前线程成功作为leader获取到了头部元素,唤醒下一个线程if (leader == null && q.peek() != null)available.signal();lock.unlock();}}

分析一下:假设队列中存在元素,但未到期,暂不可获取时,其他多个线程进行尝试获取

  1. 第一个进入take的线程,当前其他无排队线程,leader为初始值null,之后当前线程成为leader,在当前线程未释放leader之前,即leader != null,其他后续进入的线程直接阻塞,直至被唤醒。
  2. leader等待一定时间后自动苏醒,成功获取到元素,或者在等待期间,队列中头元素发生了变化,被添加线程唤醒。唤醒(被唤醒的不一定是leader线程哦)后,重新尝试获取新的头部元素。
  3. leader每次醒来都会将leader字段置null,之后如果能成功获取,则会释放leader,唤醒下一个线程,功成身退。否则将再次将leader指向自己,继续等待。
  4. 被leader唤醒的线程醒来后,重复以上逻辑,如果需要等待,则此时leader == null,该线程会将leader指向自己,称为新的leader。

那么leader-follwer模式的优势在哪呢?
好处在于,leader线程等待指定时间,其他线程无限阻塞,leader醒后唤醒其他等待者(即等待队列中有人在排队时,其他人可以放心一直等待,而不是等待一段时间后就自己醒来检测一次),假设没有leader,每个排队线程等到队列元素到期可用时,都会醒来一次,然而只有一个线程能成功获取,这是没有必要的,类似惊群效应。
说的不是很明白,可以自己想象一下,假设不设置leader,take方法该怎么写,有什么问题

好像就这样完了吧。

队列总结(六)DelayQueue相关推荐

  1. 阻塞队列之七:DelayQueue延时队列

    一.DelayQueue简介 是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走.这种队列是有序的(PriorityQueue实际 ...

  2. 【JAVA】延迟队列DelayQueue的应用

    最近在开发CRM管理系统时遇到一个需求:销售部门的人员在使用该系统时,可以从[线索公海]模块中 "领取" 潜在的客户线索到自己的[线索私海]模块中,成为自己私有的潜在客户线索,以便 ...

  3. JUC学习 - 延迟队列 DelayQueue 详解

    1.DelayQueue基本特征 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>impl ...

  4. 使用Java延时队列DelayQueue实现订单延时处理

    DelayQueue简单介绍 DelayQueue:一个使用优先级队列实现的无界阻塞队列. 支持延时获取的元素的阻塞队列,元素必须要实现Delayed接口. 适用场景:实现自己的缓存系统,订单到期,限 ...

  5. 锁与并发工具包与线程池与LockSupport与Fork/Join框架与并行流串行流与阻塞队列与JPS,jstack命令查看死锁查看线程状态与AQS个人笔记九

    朝闻道,夕死可矣 本文共计 86564字,估计阅读时长1小时 点击进入->Thread源码万字逐行解析 文章目录 本文共计 86564字,估计阅读时长1小时 一锁 二Java中13个原子操作类 ...

  6. Java中的5大队列,你知道几个?

    作者 | 王磊 来源 | Java中文社群(ID:javacn666) 通过前面文章的学习<一文详解「队列」,手撸队列的3种方法!>我们知道了队列(Queue)是先进先出(FIFO)的,并 ...

  7. java add offer_图解Java中的5大队列!(干货收藏)

    Java 中的队列有很多,例如:ArrayBlockingQueue.LinkedBlockingQueue.PriorityQueue.DelayQueue.SynchronousQueue等,那它 ...

  8. java线程池_Java多线程并发:线程基本方法+线程池原理+阻塞队列原理技术分享...

    线程基本方法有哪些? 线程相关的基本方法有 wait,notify,notifyAll,sleep,join,yield 等. 线程等待(wait) 调用该方法的线程进入 WAITING 状态,只有等 ...

  9. 使用Redis 实现消息队列

    一 .为什么要用Redis实现轻量级MQ? MQ的主要作用: 应用解耦 异步化消息 流量削峰填谷 目前使用比较多的是ActiveMQ . RabbitMQ . ZeroMQ . Kafka . Met ...

  10. DelayQueue详解

    一.DelayQueue是什么 DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走.这种队列是有序的,即队头对 ...

最新文章

  1. QQ远程协助没动静?QQ版本有讲究
  2. Unity3D教程:自定义mesh做指向信息面板效果
  3. RichTextBox读写数据库
  4. 对C语言 结构体 和 结构变量
  5. python操作Elasticsearch7.17.0
  6. python开发专属表情包_Python开发个人专属表情包网站
  7. 【转帖】.Net中C#的DllImport的用法
  8. Struts 动态Form的验证框架步骤
  9. python展示文件_python 文件操作实力显示
  10. C# ASP 面试题 2017
  11. retain、strong、weak、assign区别
  12. 操作系统(五)输入/输出(I/O)管理
  13. 动态数组与迭代器 0119
  14. asp.net调试技巧
  15. Flex4中使用WCF
  16. linux 常用到的命令(centos 6.5)
  17. spring boot websocket 客户端_Spring Boot 开发集成 WebSocket,实现私有即时通信系统
  18. 阿里巴巴JAVA编码规范考试
  19. 手机1像素线粗_移动端1像素边框问题
  20. ps模糊照片变清晰步骤东方逐梦

热门文章

  1. 70.(cesium之家)cesium接入天地图影像与注记(经纬度)
  2. PCI/PCIe接口卡Windows驱动程序(4)- 驱动程序代码(源文件)
  3. 《第一本无人驾驶技术书》
  4. PyRadiomics工具包使用说明
  5. CODESYS Automation Server
  6. 金蝶生成凭证模板_软件-金蝶外购入库凭证模版
  7. 长尾关键词是什么意思?如何使用5118挖掘和下载长尾词?
  8. d3中元素拖拽drag实例
  9. lora calculator的使用
  10. AI语音机器人,人工智能系统转型相应的配套和未来趋势