java 之DelayQueue实际运用示例

在学习Java 多线程并发开发过程中,了解到DelayQueue类的主要作用:是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。

Delayed,一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。

在网上也看到两个示例,但这两个示例个人在实际运行时均没有达到满足业务场景的效果,因而对其进行了修改,供大家参考讨论。

业务场景一:多考生考试

该场景来自于http://ideasforjava.iteye.com/blog/657384,模拟一个考试的日子,考试时间为120分钟,30分钟后才可交卷,当时间到了,或学生都交完卷了考试结束。

这个场景中几个点需要注意:

  1. 考试时间为120分钟,30分钟后才可交卷,初始化考生完成试卷时间最小应为30分钟
  2. 对于能够在120分钟内交卷的考生,如何实现这些考生交卷
  3. 对于120分钟内没有完成考试的考生,在120分钟考试时间到后需要让他们强制交卷
  4. 在所有的考生都交完卷后,需要将控制线程关闭

实现思想:用DelayQueue存储考生(Student类),每一个考生都有自己的名字和完成试卷的时间,Teacher线程对DelayQueue进行监控,收取完成试卷小于120分钟的学生的试卷。当考试时间120分钟到时,先关闭Teacher线程,然后强制DelayQueue中还存在的考生交卷。每一个考生交卷都会进行一次countDownLatch.countDown(),当countDownLatch.await()不再阻塞说明所有考生都交完卷了,而后结束考试。

package com.my.base.concurrent.delayQueue;import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/***this project is created for my partactice.*In the  project I will write the mybatis by myself**2014-1-10  下午9:43:48*@author 孙振超   mychaoyue2011@163.com*/public class Exam {/****2014-1-10 下午9:43:48 by 孙振超**@param args*void* @throws InterruptedException */public static void main(String[] args) throws InterruptedException {// TODO Auto-generated method stubint studentNumber = 20;CountDownLatch countDownLatch = new CountDownLatch(studentNumber+1);DelayQueue< Student> students = new DelayQueue<Student>();Random random = new Random();for (int i = 0; i < studentNumber; i++) {students.put(new Student("student"+(i+1), 30+random.nextInt(120),countDownLatch));}Thread teacherThread =new Thread(new Teacher(students)); students.put(new EndExam(students, 120,countDownLatch,teacherThread));teacherThread.start();countDownLatch.await();System.out.println(" 考试时间到,全部交卷!");  }}class Student implements Runnable,Delayed{private String name;private long workTime;private long submitTime;private boolean isForce = false;private CountDownLatch countDownLatch;public Student(){}public Student(String name,long workTime,CountDownLatch countDownLatch){this.name = name;this.workTime = workTime;this.submitTime = TimeUnit.NANOSECONDS.convert(workTime, TimeUnit.NANOSECONDS)+System.nanoTime();this.countDownLatch = countDownLatch;}@Overridepublic int compareTo(Delayed o) {// TODO Auto-generated method stubif(o == null || ! (o instanceof Student)) return 1;if(o == this) return 0; Student s = (Student)o;if (this.workTime > s.workTime) {return 1;}else if (this.workTime == s.workTime) {return 0;}else {return -1;}}@Overridepublic long getDelay(TimeUnit unit) {// TODO Auto-generated method stubreturn unit.convert(submitTime - System.nanoTime(),  TimeUnit.NANOSECONDS);}@Overridepublic void run() {// TODO Auto-generated method stubif (isForce) {System.out.println(name + " 交卷, 希望用时" + workTime + "分钟"+" ,实际用时 120分钟" );}else {System.out.println(name + " 交卷, 希望用时" + workTime + "分钟"+" ,实际用时 "+workTime +" 分钟");  }countDownLatch.countDown();}public boolean isForce() {return isForce;}public void setForce(boolean isForce) {this.isForce = isForce;}}class EndExam extends Student{private DelayQueue<Student> students;private CountDownLatch countDownLatch;private Thread teacherThread;public EndExam(DelayQueue<Student> students, long workTime, CountDownLatch countDownLatch,Thread teacherThread) {super("强制收卷", workTime,countDownLatch);this.students = students;this.countDownLatch = countDownLatch;this.teacherThread = teacherThread;}@Overridepublic void run() {// TODO Auto-generated method stubteacherThread.interrupt();Student tmpStudent;for (Iterator<Student> iterator2 = students.iterator(); iterator2.hasNext();) {tmpStudent = iterator2.next();tmpStudent.setForce(true);tmpStudent.run();}countDownLatch.countDown();}}class Teacher implements Runnable{private DelayQueue<Student> students;public Teacher(DelayQueue<Student> students){this.students = students;}@Overridepublic void run() {// TODO Auto-generated method stubtry {System.out.println(" test start");while(!Thread.interrupted()){students.take().run();}} catch (Exception e) {// TODO: handle exceptione.printStackTrace();}}}

业务场景二:具有过期时间的缓存

该场景来自于http://www.cnblogs.com/jobs/archive/2007/04/27/730255.html,向缓存添加内容时,给每一个key设定过期时间,系统自动将超过过期时间的key清除。

这个场景中几个点需要注意:

  1. 当向缓存中添加key-value对时,如果这个key在缓存中存在并且还没有过期,需要用这个key对应的新过期时间
  2. 为了能够让DelayQueue将其已保存的key删除,需要重写实现Delayed接口添加到DelayQueue的DelayedItem的hashCode函数和equals函数
  3. 当缓存关闭,监控程序也应关闭,因而监控线程应当用守护线程

具体实现如下:

package com.my.base.concurrent.delayQueue;import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/***Cache.java** Created on 2014-1-11 上午11:30:36 by sunzhenchao mychaoyue2011@163.com*/
public class Cache<K, V> {public ConcurrentHashMap<K, V> map = new ConcurrentHashMap<K, V>();public DelayQueue<DelayedItem<K>> queue = new DelayQueue<DelayedItem<K>>();public void put(K k,V v,long liveTime){V v2 = map.put(k, v);DelayedItem<K> tmpItem = new DelayedItem<K>(k, liveTime);if (v2 != null) {queue.remove(tmpItem);}queue.put(tmpItem);}public Cache(){Thread t = new Thread(){@Overridepublic void run(){dameonCheckOverdueKey();}};t.setDaemon(true);t.start();}public void dameonCheckOverdueKey(){while (true) {DelayedItem<K> delayedItem = queue.poll();if (delayedItem != null) {map.remove(delayedItem.getT());System.out.println(System.nanoTime()+" remove "+delayedItem.getT() +" from cache");}try {Thread.sleep(300);} catch (Exception e) {// TODO: handle exception}}}/*** TODO* @param args* 2014-1-11 上午11:30:36* @author:孙振超* @throws InterruptedException */public static void main(String[] args) throws InterruptedException {Random random = new Random();int cacheNumber = 10;int liveTime = 0;Cache<String, Integer> cache = new Cache<String, Integer>();for (int i = 0; i < cacheNumber; i++) {liveTime = random.nextInt(3000);System.out.println(i+"  "+liveTime);cache.put(i+"", i, random.nextInt(liveTime));if (random.nextInt(cacheNumber) > 7) {liveTime = random.nextInt(3000);System.out.println(i+"  "+liveTime);cache.put(i+"", i, random.nextInt(liveTime));}}Thread.sleep(3000);System.out.println();}}class DelayedItem<T> implements Delayed{private T t;private long liveTime ;private long removeTime;public DelayedItem(T t,long liveTime){this.setT(t);this.liveTime = liveTime;this.removeTime = TimeUnit.NANOSECONDS.convert(liveTime, TimeUnit.NANOSECONDS) + System.nanoTime();}@Overridepublic int compareTo(Delayed o) {if (o == null) return 1;if (o == this) return  0;if (o instanceof DelayedItem){DelayedItem<T> tmpDelayedItem = (DelayedItem<T>)o;if (liveTime > tmpDelayedItem.liveTime ) {return 1;}else if (liveTime == tmpDelayedItem.liveTime) {return 0;}else {return -1;}}long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);return diff > 0 ? 1:diff == 0? 0:-1;}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(removeTime - System.nanoTime(), unit);}public T getT() {return t;}public void setT(T t) {this.t = t;}@Overridepublic int hashCode(){return t.hashCode();}@Overridepublic boolean equals(Object object){if (object instanceof DelayedItem) {return object.hashCode() == hashCode() ?true:false;}return false;}}

分类: Java基础

标签: Java, Delayed, DelayQueue, CountDownLatch

好文要顶 关注我 收藏该文  

孙振超
关注 - 6
粉丝 - 102

+加关注

6

1

« 上一篇:JVM之---垃圾回收
» 下一篇:CDN和镜像站点比较

posted on 2014-01-11 16:21 孙振超 阅读(31146) 评论(27) 编辑 收藏

评论:#1楼 2015-08-24 16:25 | snowwolf101

写的非常不错啊 谢谢楼主的总结

支持(0)反对(0)

#2楼 2015-09-19 22:57 | 龙戏天

this.submitTime = TimeUnit.NANOSECONDS.convert(workTime, TimeUnit.NANOSECONDS)+System.nanoTime();
这个时间转化没用。

支持(0)反对(0)

#3楼 2016-03-27 20:56 | 被罚站的树

mark!

支持(0)反对(0)

#4楼 2016-07-13 14:18 | Eric Lan

nice job and a lot help for me!

支持(0)反对(0)

#5楼 2016-12-02 16:20 | 李鸿远

好坑的一篇文章,为了指出楼主的错误,专门注册了一个账号。
问题1,既然是根据失效时间来判断出栈,怎么能用livetime,如果live时间是一样的,中间我删除了几个元素再加入几个元素,livetime都是一样的,顺序就乱了。需要用removeTime判断。
问题2,为何要重写equals,如果我中间删除一个元素,再加入一个元素,key是一样的,之前我删除的那个元素,时间到了,仍然会take出来。
问题3:,while(true)里面应该用take方法,大负载下poll会造成cpu过高。

支持(3)反对(0)

#6楼[楼主] 2016-12-05 16:48 | 孙振超

@ 李鸿远
非常难得!!
有换工作的计划吗?

支持(0)反对(0)

#7楼 2016-12-05 18:22 | 李鸿远

@ 孙振超
哈哈 谢谢
你要是西安的就考虑

支持(0)反对(0)

#8楼[楼主] 2016-12-15 12:20 | 孙振超

@ 李鸿远
哎,我这边是在北京。有在北京的同学或者朋友可以推荐,我们这边是java服务端,有不少牛人,要处理每秒几十万的请求。

支持(0)反对(0)

#9楼 2016-12-19 11:05 | 李鸿远

@ 孙振超
北上广还是领先呀,我们很少有这样的机会处理这么多请求。
首先光这网络带宽都不是一般用户负担的起的,你们使用弹性计算了吗,就是自动扩容?

支持(0)反对(0)

#10楼[楼主] 2017-01-04 11:15 | 孙振超

@ 李鸿远
有一个管理平台,可以设置cpu使用上限多少扩容多少机器,cpu使用下限多少缩容多少机器。配置好后会自动进行处理。

支持(0)反对(0)

#11楼 2017-02-26 23:45 | leon66666

对于场景一。有一些不同的观点。
首先,宣布考试结束和强制收试卷这些事情由老师线程来处理更加的符合实际。而不应该终止老师线程。
其次,两个类能实现的场景。却使用了三个类,涉及到了继承关系,增加了复杂度,违背了基本的设计原则。
最后,我在您的基础上进行了改造,去除了EndExam类和countDownLatch,这些都封装到了teacher类中。
源码放到了这里。说的不对的地方欢迎批评指正

支持(0)反对(0)

#12楼 2017-03-01 08:34 | 举个栗子

谢楼主总结。参考实现了一个模拟Session的应用场景
http://blog.csdn.net/soonfly/article/details/58599087

支持(0)反对(0)

#13楼[楼主] 2017-03-06 16:09 | 孙振超

@ 王中秋
hi,请问你在哪里工作啊?如果有大并发量的应用场景、有可以和行业技术高手共事的机会 可能换工作吗?

支持(0)反对(0)

#14楼 2017-03-14 19:00 | leon66666

@ 孙振超
目前在朝阳区望京附近工作,金融行业。很乐意进一步的了解一下贵公司

支持(0)反对(0)

#15楼 2017-03-22 18:06 | 雾和狼

this.submitTime = TimeUnit.NANOSECONDS.convert(workTime, TimeUnit.NANOSECONDS)+System.nanoTime();
应该是
this.submitTime = TimeUnit.NANOSECONDS.convert(workTime, TimeUnit.MINUTES)+System.nanoTime();
单位用错了吧,submitTime的单位不是分钟吗

支持(0)反对(0)

#16楼 2017-04-11 10:05 | 杉木的征途

@王中秋 
你的程序有问题,如果现在时间不是毫微妙,你的程序就会有问题,因为 Student student = students.poll();并没有保证student为null啊。可以使用take方法,我将时间改为秒,你的程序就出问题了,此为拙见。

支持(2)反对(0)

#17楼 2017-08-24 19:05 | 荐辕

DelayedItem<K> tmpItem = new DelayedItem<K>(k, liveTime);
if (v2 != null) {
queue.remove(tmpItem);
}
这段代码有问题。原因在于:
1. 重写hashcode不应重写equals方法,原因上面有人已经说明。
2. 如果不重写equals方法,那么上面从queue中删除tmpItem的方法也不对。新new出来的对象,和原来的DelayedItem物理地址不同,调用Object基类的equals方法也不可能删掉。
3. hashCode方法可能引发潜在问题。如果是根据t.hashCode来确定DelayedItem的hash值,但是DelayedItem中同时由set(T t)方法会动态改变hash值。若将DelayedItem存在HashSet等集合当中,容易引发内存泄漏无法GC对象。

支持(1)反对(0)

#18楼 2017-08-24 19:23 | 荐辕

另外还存在一个问题。
多线程环境中。假设dameonCheckOverdueKey线程在执行if (delayedItem != null) 判断之后,另一个线程更新了map缓存中key值对应的value。然后map.remove(delayedItem.getT())。这会导致最近put进去的缓存丢失。问题不是很严重,但是缓存系统应该充分考虑到这种情况。
建议这部分代码加上多线程同步控制。

支持(0)反对(1)

#19楼[楼主] 2017-09-10 16:16 | 孙振超

@ 荐辕
顶!d=====( ̄▽ ̄*)b

支持(0)反对(0)

#20楼 2018-04-25 14:00 | 承载

@ leon66666
你写的代码是有错误的,你添加的是6个人。。。学生编号只有5个。你可以多运行几次,肯定会有一个学生叫student 这是不对的。

支持(0)反对(0)

#21楼 2018-04-25 14:02 | 承载

students.put(new EndExam(students, 120,countDownLatch,teacherThread));为什么要多加一个人。

支持(0)反对(0)

#22楼 2018-05-31 11:20 | wumc_time

@ 杉木的征途
改为秒后,pool老拿的是头部元素,意味着很多后面不达到强制交卷的都被迫交卷了(也就是部分提前交卷的生生被搞成被迫交卷了

支持(0)反对(0)

#23楼 2018-05-31 11:28 | wumc_time

@ 荐辕
ConcurrentHashMap

支持(0)反对(0)

#24楼 2018-07-21 15:35 | 林冲—first

第一个场景的代码Studnet不是根据getdelay()来进行排序的,实现了compareable接口利用workTimke来进行排序,submitTime这个没啥用,getdelay()直接返回零也没关系,这样容易误导读者

支持(0)反对(0)

#25楼 2018-11-15 09:42 | 林雪莲

@ 荐辕
为啥不能重写equals方法 能不能解释下?这点没明白

支持(0)反对(0)

#26楼 2018-11-15 09:43 | 林雪莲

@ 李鸿远
问题2没明白 能否解释一下

支持(0)反对(0)

#27楼 2019-01-29 15:57 | zhO

重写的DelayedItem<T>的equals方法应该比较T(key)是否一样。

java 之DelayQueue实际运用示例相关推荐

  1. 10个Java 8 Lambda表达式经典示例

    Java 8 刚于几周前发布,日期是2014年3月18日,这次开创性的发布在Java社区引发了不少讨论,并让大家感到激动.特性之一便是随同发布的lambda表 达式,它将允许我们将行为传到函数里.在J ...

  2. Java IOUtils.copy方法代码示例(亲测)

    本文整理汇总了Java中org.apache.commons.io.IOUtils.copy方法的典型用法代码示例.如果您正苦于以下问题:Java IOUtils.copy方法的具体用法?Java I ...

  3. java中DelayQueue的使用

    文章目录 简介 DelayQueue DelayQueue的应用 总结 java中DelayQueue的使用 简介 今天给大家介绍一下DelayQueue,DelayQueue是BlockingQue ...

  4. Java中的Volatile如何工作? Java中的volatile关键字示例

    如何在Java中使用Volatile关键字 在Java采访中,什么是volatile变量以及何时在Java中使用volatile变量是Java 采访中一个著名的多线程采访问题 . 尽管许多程序员都知道 ...

  5. java自建ocr完整示例_Java 7:完整的invokedynamic示例

    java自建ocr完整示例 我当前的Java 7系列中的另一个博客条目. 这次它处理的是invokedynamic,这是JVM上用于方法调用的新字节码指令. invokedynamic指令允许呼叫站点 ...

  6. Java异常处理教程(包含示例和最佳实践)

    异常是可能在程序执行期间发生的错误事件,它会破坏其正常流程. Java提供了一种健壮且面向对象的方式来处理异常情况,称为Java异常处理 . 我们将在本教程中研究以下主题. Java异常处理概述 异常 ...

  7. Java依赖注入 - DI设计模式示例教程

    Java依赖注入 - DI设计模式示例教程 Java依赖注入 设计模式允许我们删除硬编码的依赖项,并使我们的应用程序松散耦合,可扩展和可维护.我们可以在java中实现依赖注入,以将依赖项解析从编译时移 ...

  8. java组合与继承始示例_排列组合:用公式示例解释的差异

    java组合与继承始示例 Permutations and Combinations are super useful in so many applications – from Computer ...

  9. java基础之----java常见异常及代码示例

    java基础之----java常见异常及代码示例 参考文章: (1)java基础之----java常见异常及代码示例 (2)https://www.cnblogs.com/gunduzi/p/1203 ...

  10. java组合与继承始示例_Java 8特性与示例

    java组合与继承始示例 Java 8 was released on 18th March 2014, so it's high time to look into Java 8 Features. ...

最新文章

  1. 作为互联网流量入口,CDN日志大数据你该怎么玩?
  2. VS Code 1.29 发布,众多新功能有没有你想要的?
  3. boost::mpi::wait_any相关用法的测试程序
  4. php autoload用法,php自动加载__autoload()函数用法
  5. ReportViewer不连接数据库,自定义DataSet导出到报表
  6. 启动activity的标准的action常量及对应的字符串
  7. matlab中D A1在哪,A1=d(1:15,:);A2=d(16:30,:);A3=
  8. vbreport8.wpf.viewer 个别电脑不显示_【电脑手机小技巧】新买的电脑,第一次开机最好要这样设置...
  9. Google位置服务模板
  10. 华为荣耀9升降级系统 | 华为荣耀9变砖后如何救砖 | 华为荣耀9获取BL解锁码以及如何解BL锁 | 华为荣耀9如何通过写ramdisk.img来获取root
  11. 细说php完美分页类
  12. 数学教育与计算机教育ppt,计算机基础教育课件.ppt
  13. AI配音专家(文字转语音真人发声工具)官方中文版V1.0.5 | 文字转语音软件下载 | 这是一款阿里语音合成引擎且能把文字读出来的软件
  14. 软件模拟I2C(万能模板)
  15. 如何用云计算搭建服务器,如何搭建一个云服务器
  16. 发表Nature等杂志四十多篇论文老师带您学单细胞测序数据挖掘和课题设计 2020年1月11-12日 上海...
  17. PS制作透明的BMP图片
  18. 阿里云服务器CentOS开放特定端口
  19. 【牛客】链表的回文结构
  20. Open3d之计算点云边界框

热门文章

  1. 锐捷530-E无线AP配置
  2. Redis端口为什么是6379?
  3. 关于各种网站音频mp3的外链地址,真实的外链播放地址
  4. OO系统分析员之路--用例分析系列(1)--什么是用例
  5. linux服务添加互信,Linux多节点互信配置
  6. DART booster
  7. Redis Cluster集群原理+三主三从交叉复制实战+故障切换(十)
  8. ARM CPU Cortex-X3,Cortex-A715,Cortex-A510 | GPU Immortalis-G715
  9. mooc-人工智能与信息社会-基于决策树和搜索的智能系统(上)
  10. android自动打开软键盘,Android打开关闭软键盘