1.什么是DelayQueue

DelayQueue 是 Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。即使无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size 方法同时返回到期和未到期元素的计数。此队列不允许使用 null 元素。

总结一下如下:

1、DelayQueue队列中的元素必须是Delayed接口的实现类,该类内部实现了getDelay()和compareTo()方法,第一个方法是比较两个任务的延迟时间进行排序,第二个方法用来获取延迟时间。 
     2、DelayQueue队列没有大小限制,因此向队列插数据不会阻塞 
     3、DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。否则线程阻塞。 
     4、DelayQueue中的元素不能为null。
     5、DelayQueue内部是使用PriorityQueue实现的。compareTo()比较后越小的越先取出来。

2.使用场景

DelayQueue能做什么?

在我们的业务中通常会有一些需求是这样的:

1、订单业务:下单之后如果三十分钟之内没有付款就自动取消订单。

2、订餐通知:下单成功后60s之后给用户发送短信通知。

那么这类业务我们可以总结出一个特点:需要延迟工作。
由此的情况,就是我们的DelayQueue应用需求的产生。

3.简单实例

下面通过一个简单实例来了解一用法

public static void main(String[] args) {DelayQueue<DelayedElement> delayQueue = new DelayQueue<DelayedElement>();//生产者producer(delayQueue);//消费者consumer(delayQueue);while (true) {try {TimeUnit.HOURS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}
}/*** 每100毫秒创建一个对象,放入延迟队列,延迟时间1毫秒* @param delayQueue*/
private static void producer(final DelayQueue<DelayedElement> delayQueue) {new Thread(new Runnable() {@Overridepublic void run() {while (true) {try {TimeUnit.MILLISECONDS.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}DelayedElement element = new DelayedElement(1000, "test");delayQueue.offer(element);}}}).start();/*** 每秒打印延迟队列中的对象个数*/new Thread(new Runnable() {@Overridepublic void run() {while (true) {try {TimeUnit.MILLISECONDS.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("delayQueue size:" + delayQueue.size());}}}).start();
}/*** 消费者,从延迟队列中获得数据,进行处理* @param delayQueue*/
private static void consumer(final DelayQueue<DelayedElement> delayQueue) {new Thread(new Runnable() {@Overridepublic void run() {while (true) {DelayedElement element = null;try {element = delayQueue.take();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("now=" + System.currentTimeMillis() + "---" + element);}}}).start();
}

}

class DelayedElement implements Delayed {private final long   delay; //延迟时间
private final long   expire;  //到期时间
private final String msg;   //数据
private final long   create; //创建时间public DelayedElement(long delay, String msg) {this.delay = delay;this.msg = msg;expire = System.currentTimeMillis() + delay;    //到期时间 = 当前时间+延迟时间create = System.currentTimeMillis();
}/*** 需要实现的接口,获得延迟时间   用过期时间-当前时间* @param unit* @return*/
@Override
public long getDelay(TimeUnit unit) {return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}/*** 用于延迟队列内部比较排序   当前时间的延迟时间 - 比较对象的延迟时间* @param o* @return*/
@Override
public int compareTo(Delayed o) {return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}@Override
public String toString() {final StringBuilder sb = new StringBuilder("DelayedElement{");sb.append("delay=").append(delay);sb.append(", expire=").append(expire);sb.append(", msg='").append(msg).append('\'');sb.append(", create=").append(create);sb.append('}');return sb.toString();
}

运行结果:

4.源码分析

下面来看看其主要实现的源码分析

(1)、从队列中取元素

public E take() throws InterruptedException {  final ReentrantLock lock = this.lock;  lock.lockInterruptibly();  try {  for (;;) {  E first = q.peek();  if (first == null)  available.await();  else {  long delay = first.getDelay(TimeUnit.NANOSECONDS);  if (delay <= 0)  return q.poll();  else if (leader != null)  available.await();  else {  Thread thisThread = Thread.currentThread();  leader = thisThread;  try {  available.awaitNanos(delay);  } finally {  if (leader == thisThread)  leader = null;  }  }  }  }  } finally {  if (leader == null && q.peek() != null)  available.signal();  lock.unlock();  }
}

可以看到,在这段代码里,在第一个元素的延迟时间还没到的情况下: 
a.如果当前没有其他线程等待,则阻塞当前线程直到延迟时间。 
b.如果有其他线程在等待,则阻塞当前线程。

(2)、向队列中放入元素

public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {leader = null;available.signal();}return true;} finally {lock.unlock();}
}

在放入元素的时候,会唤醒等待中的读线程。

5.进阶-实现一个延迟缓存队列

实现一个带延迟缓存队列

(1)、定义delayIetm对象

public class DelayItem<T> implements Delayed  {//创建时间
private long MILLISECONDS_ORIGIN = System.currentTimeMillis();//元素
private T item;//元素的存活时间,单位为毫秒 (unit:milliseconds)
private long liveMilliseconds;public DelayItem(T item, long liveMilliseconds) {this.liveMilliseconds = liveMilliseconds;this.item = item;
}private final long now() {return System.currentTimeMillis() - MILLISECONDS_ORIGIN;
}public void setMilliseconds(long milliseconds) {MILLISECONDS_ORIGIN = System.currentTimeMillis();this.liveMilliseconds = milliseconds;
}/*** 如果超时,或者Map缓存中已经没有该元素,都会导致失效** @param unit* @return*/
public long getDelay(TimeUnit unit) {long d = unit.convert(liveMilliseconds - now(), TimeUnit.MILLISECONDS);//        LOGGER.debug("=============key:" + item + ",time:" + milliseconds + " , now:" + now() + ",times:{}", checkTimesLeft);return d;
}@Override
public boolean equals(Object obj) {if (obj instanceof com.github.lin.DelayItem) {return item.equals(((com.github.lin.DelayItem) obj).getItem());} else {return false;}
}public int compareTo(Delayed o) {if (o == this) {return 0;}//根据距离下次超时时间的长短来排优先级,越接近下次超时时间的优先级越高long d = (getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS));return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}public T getItem() {return item;}
}

(2)、定义缓存工厂

public class CacheFactory<K, V> {/**缓存map*/
private ConcurrentHashMap<K, V> concurrentHashMap = new ConcurrentHashMap<K, V>();/**延迟队列*/
private DelayQueue<com.github.lin.DelayItem<K>> delayQueue = new DelayQueue<com.github.lin.DelayItem<K>>();/**过期检查队列*/
private Thread expireCheckThread;public CacheFactory() {//定时清理过期缓存expireCheckThread = new Thread() {@Overridepublic void run() {dameonCheckOverdueKey();}};expireCheckThread.setDaemon(true);expireCheckThread.start();
}/*** 放入带有过期时间的元素* @param key* @param value* @param liveMilliseconds*/
public void put(K key, V value, long liveMilliseconds) {V oldValue = concurrentHashMap.put(key, value);if (oldValue != null) {//todo 这个地方性能比较差,DelayQueue删除元素慢boolean result = delayQueue.remove(new com.github.lin.DelayItem<K>(key, 0L));}com.github.lin.DelayItem delayItem = new com.github.lin.DelayItem(key, liveMilliseconds);delayQueue.put(delayItem);
}/*** 取元素* @param key* @return*/
public V get(K key) {return concurrentHashMap.get(key);
}/*** 检查过期的key,从cache中删除*/
private void dameonCheckOverdueKey() {DelayItem<K> delayedItem;while (true) {try {delayedItem = delayQueue.take();if (delayedItem != null) {concurrentHashMap.remove(delayedItem.getItem());System.out.println(System.nanoTime() + " remove " + delayedItem.getItem() + " from cache");}} catch (InterruptedException e) {e.printStackTrace();}}
}
}

其实就是一个map和一个queue来实现,后台有一个守护线程,如果检查到有过期的key。就从queue里取出来,再从map删除。

(3)、测试用例

public class CacheFactoryTest {public static void main(String[] args) throws InterruptedException {CacheFactory<String,String> cacheFactory = new CacheFactory<String, String>();//存活1scacheFactory.put("key1","value1",1000);//存活10scacheFactory.put("key2","value2",10000);System.out.println("begin get key1:" + cacheFactory.get("key1"));System.out.println("begin get key2:" +cacheFactory.get("key2"));//等待2sThread.sleep(2000);System.out.println("after 2s:" +cacheFactory.get("key1"));System.out.println("after 2s:" +cacheFactory.get("key2"));//等待10sThread.sleep(10000);System.out.println("after 10s:" +cacheFactory.get("key1"));System.out.println("after 10s:" +cacheFactory.get("key2"));}
}

执行结果


扫码向博主提问

zhangxing52077

非学,无以致疑;非问,无以广识

延迟队列DelayQueue研究相关推荐

  1. 【JAVA】延迟队列DelayQueue的应用

    最近在开发CRM管理系统时遇到一个需求:销售部门的人员在使用该系统时,可以从[线索公海]模块中 "领取" 潜在的客户线索到自己的[线索私海]模块中,成为自己私有的潜在客户线索,以便 ...

  2. JUC学习 - 延迟队列 DelayQueue 详解

    1.DelayQueue基本特征 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>impl ...

  3. 完整案例:实现延迟队列的两种方法

    延迟队列是指把当前要做的事情,往后推迟一段时间再做. 延迟队列在实际工作中和面试中都比较常见,它的实现方式有很多种,然而每种实现方式也都有它的优缺点,接下来我们来看. 延迟队列的使用场景 延迟队列的常 ...

  4. java实现消息队列以及延迟消息(队列DelayQueue)

    1.java实现延迟消息(队列DelayQueue) DelayQueue是一个支持延时获取元素的无界阻塞队列.队列使用PriorityQueue来实现.队列中的元素必须实现Delayed接口,在创建 ...

  5. java 延时队列_Java实现简单延迟队列和分布式延迟队列

    在我们的工作中,很多地方使用延迟队列,比如订单到期没有付款取消订单,制订一个提醒的任务等都需要延迟队列,那么我们需要实现延迟队列.我们本文的梗概如下,同学们可以选择性阅读. 1. 实现一个简单的延迟队 ...

  6. RabbitMQ 延迟队列-对于入门来说可以快速上手

    RabbitMQ 延迟队列-非常非常实用 RabbitMQ 延迟队列-非常非常实用 一.使用场景 二.消息延迟推送的实现 三.项目具体实现 RabbitMQ 延迟队列-非常非常实用 一.使用场景 ​ ...

  7. RabbitMQ 延迟队列实现定时任务的正确姿势,你学会了么?

    以下文章来源方志朋的博客,回复"666"获面试宝典 场景 开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期.订单定时关闭.微信支付2小时未支付关闭订单等 ...

  8. RabbitMQ 延迟队列,太实用了!

    点击关注公众号,Java干货及时送达 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付 ...

  9. rabbitmq利用死信队列+TTL 实现延迟队列

    2019独角兽企业重金招聘Python工程师标准>>> 适用场景:订单超时未支付,倘若适用定时器的话,那么数据量大的话,轮询查询数据,首先IO开销大,其次任务时间要求高,扫描越频繁性 ...

  10. RabbitMQ(九):RabbitMQ 延迟队列,消息延迟推送(Spring boot 版)

    应用场景 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持 ...

最新文章

  1. 解决虚拟机vmware安装64位系统“此主机支持 Intel VT-x,但 Intel VT-x 处于禁用状态”的问题...
  2. 如何识别数据中心的能源浪费?
  3. ACM错误提示/错误原因
  4. spring发展历程
  5. pfile文件怎么恢复格式_回收站清空的文件怎么恢复?值得收藏的恢复方法
  6. 面试官面试前端_如何面试面试官
  7. 虚拟化技术--服务器虚拟化
  8. Go工程化 - 手摸手带你理解依赖注入
  9. GPIO应用开发方法【ZT】
  10. C#中的 Stream
  11. mysql基础知识(二)
  12. HDOJ水题集合2:最短路(Dijkstra)
  13. Android GMS重要工具和资料下载
  14. git制作patch补丁
  15. bandwidth看内存带宽性能
  16. 集线器工作原理简要说明
  17. 怎么建立局域网_win8系统如何建立局域网 win8建立局域网操作方法【步骤详解】...
  18. 神马都是浮云!神马浮云是什么意思?-出自小月月
  19. 最常用的几个数据验证正则判断,手机号,车牌号,身份证,Email,IP
  20. iOS App由生到死的过程

热门文章

  1. win11 windows 服务打开word 另存为pdf
  2. cesium添加高德路网中文注记图及高德在线地图介绍
  3. Arduino Uno + HMC5883L电子罗盘 实验
  4. java文件上传下载接口_java 文件上传下载
  5. 关于vue、js连接打印机
  6. xmapp 终端数据库问题记录 已解决
  7. Elasticsearch 相关知识
  8. freeswitch通话记录mysql_freeswitch电话计费详单入库方法
  9. EQ一卡通踩过的坑,解决加载不到动态库,输出数据到led屏乱码问题
  10. 安徽新华学院计算机学院官网,安徽新华学院计算机协会第十八届换届大会