DelayQueue

DelayQueue是一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的Delayed元素。
存放到DelayDeque的元素必须继承Delayed接口。Delayed接口使对象成为延迟对象,它使存放在DelayQueue类中的对象具有了激活日期,该接口强制执行下列两个方法:

  • CompareTo(Delayed o):Delayed接口继承了Comparable接口,因此有了这个方法
  • getDelay(TimeUnit unit):这个方法返回到激活日期的剩余时间,时间单位由单位参数指定

DelayQueue使用场景

  • 关闭空闲链接。服务器中,有很多客户端链接,空闲一段时间后需要关闭。
  • 缓存超过了缓存时间,就需要从缓存中移除。

DelayQueue超时订单处理案例

package com.rumenz.learn.delayqueue;import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;//DelayQueue里面的元素必须实现Delayedpublic class Item<T> implements Delayed {private Long expireTime;private T data;public Item(Long expireTime, T data) {this.expireTime = expireTime+System.currentTimeMillis();this.data = data;}@Overridepublic long getDelay(TimeUnit unit) {long d = unit.convert(this.expireTime - System.currentTimeMillis(),unit);return d;}@Overridepublic int compareTo(Delayed o) {long d=getDelay(TimeUnit.MILLISECONDS)-o.getDelay(TimeUnit.MILLISECONDS);if(d==0){return 0;}return d>0?1:-1;}public Long getExpireTime() {return expireTime;}public void setExpireTime(Long expireTime) {this.expireTime = expireTime;}public T getData() {return data;}public void setData(T data) {this.data = data;}
}// 订单实体类
package com.rumenz.learn.delayqueue;
public class OrderItem {private Double orderAmount;private String orderNo;//0未支付 1支付了private Integer orderStatus;public OrderItem(Double orderAmount, String orderNo, Integer orderStatus) {this.orderAmount = orderAmount;this.orderNo = orderNo;this.orderStatus = orderStatus;}public Double getOrderAmount() {return orderAmount;}public void setOrderAmount(Double orderAmount) {this.orderAmount = orderAmount;}public String getOrderNo() {return orderNo;}public void setOrderNo(String orderNo) {this.orderNo = orderNo;}public Integer getOrderStatus() {return orderStatus;}public void setOrderStatus(Integer orderStatus) {this.orderStatus = orderStatus;}
}//package com.rumenz.learn.delayqueue;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.*;public class DelayQueueExample {//3个线程 1个线程下单 1个线程支付  1个线程关闭超时订单  订单支付超时时间为10spublic static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(3);DelayQueue<Item<OrderItem>> delayeds = new DelayQueue<>();ConcurrentMap<String, OrderItem> map = new ConcurrentHashMap<>();//下单线程executorService.execute(()->{SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Integer orderNo=100;while (true){try{Thread.sleep(3000);Integer amount = new Random().nextInt(1000);OrderItem orderItem=new OrderItem(amount.doubleValue(), String.valueOf(orderNo), 0);Item<OrderItem> item=new Item<>(10*1000L,orderItem);Date date=new Date();date.setTime(item.getExpireTime());System.out.println("=======================下单==========================");System.out.println("生成订单时间:"+simpleDateFormat.format(new Date()));System.out.println("订单编号:"+orderNo);System.out.println("订单金额:"+orderItem.getOrderAmount());System.out.println("支付过期时间:"+simpleDateFormat.format(date));System.out.println("========================下单=========================");map.put(String.valueOf(orderNo),orderItem);orderNo++;delayeds.offer(item);}catch (Exception e){e.printStackTrace();}}});//支付线程executorService.execute(()->{while (true){try {//随机等待 再支付Thread.sleep(new Random().nextInt(15)*1000);String orderNo="";Iterator<Map.Entry<String, OrderItem>> iterator = map.entrySet().iterator();if(iterator.hasNext()){OrderItem orderItem = iterator.next().getValue();orderItem.setOrderStatus(1);orderNo=orderItem.getOrderNo();System.out.println("-----------------------支付订单-----------------------");System.out.println("订单支付"+orderNo);System.out.println("支付金额"+orderItem.getOrderAmount());System.out.println("-----------------------支付订单-----------------------");}map.remove(orderNo);}catch (Exception e){e.printStackTrace();}}});//关系过期的订单executorService.execute(()->{SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");while (true){try{Item<OrderItem> item = delayeds.take();OrderItem data = item.getData();Date date=new Date();date.setTime(item.getExpireTime());if(data.getOrderStatus()==0){System.out.println("########################过期订单########################");System.out.println("订单编号:"+data.getOrderNo());System.out.println("订单金额:"+data.getOrderAmount());System.out.println("订单到期支付时间:"+simpleDateFormat.format(date));System.out.println("########################过期订单########################");}map.remove(data.getOrderNo());}catch (Exception e){e.printStackTrace();}}});executorService.shutdown();}
}

SynchronousQueue

它是一个特殊的队列交做同步队列,特点是当一个线程往队列里写一个元素,写入操作不会理解返回,需要等待另外一个线程来将这个元素拿走。同理,当一个读线程做读操作的时候,同样需要一个相匹配写线程的写操作。这里的Synchronous指的就是读写线程需要同步,一个读线程匹配一个写线程,同理一个写线程匹配一个读线程。

不像ArrayBlockingQueueLinkedBlockingDeque之类的阻塞队列依赖AQS实现并发操作,SynchronousQueue直接使用CAS实现线程的安全访问。

较少使用到 SynchronousQueue 这个类,不过它在线程池的实现类 ScheduledThreadPoolExecutor 中得到了应用。

public class SynchronousQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {//内部栈static final class TransferStack<E> extends Transferer<E> {}//内部队列static final class TransferQueue<E> extends Transferer<E> {}public SynchronousQueue() {this(false);}public SynchronousQueue(boolean fair) {transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();}
}

SynchronousQueue代码演示

package com.rumenz.learn.synchronousqueue;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;public class SynchronousQueueExample {public static void main(String[] args) {SynchronousQueue<String> queue = new SynchronousQueue<>();ExecutorService executorService = Executors.newFixedThreadPool(2);executorService.execute(()->{try {System.out.println(Thread.currentThread().getName()+"put 1");queue.put("1");System.out.println(Thread.currentThread().getName()+"put 2");queue.put("2");System.out.println(Thread.currentThread().getName()+"put 3");queue.put("3");System.out.println(Thread.currentThread().getName()+"put 4");queue.put("4");}catch (Exception e){e.printStackTrace();}});executorService.execute(()->{try{TimeUnit.SECONDS.sleep(1);System.out.println("获取数据:"+queue.take());TimeUnit.SECONDS.sleep(1);System.out.println("获取数据:"+queue.take());TimeUnit.SECONDS.sleep(1);System.out.println("获取数据:"+queue.take());TimeUnit.SECONDS.sleep(1);System.out.println("获取数据:"+queue.take());}catch (Exception e){e.printStackTrace();}});executorService.shutdown();}}

关注微信公众号:【入门小站】,关注更多知识点

Java高并发BlockingQueue重要的实现类二相关推荐

  1. Java高并发BlockingQueue重要的实现类

    ArrayBlockingQueue 有界的阻塞队列,内部是一个数组,有边界的意思是:容量是有限的,必须进行初始化,指定它的容量大小,以先进先出的方式存储数据,最新插入的在对尾,最先移除的对象在头部. ...

  2. Java高并发编程:同步工具类

    内容摘要 这里主要介绍了java5中线程锁技术以外的其他同步工具,首先介绍Semaphore:一个计数信号量.用于控制同时访问资源的线程个数,CyclicBarrier同步辅助类:从字面意思看是路障, ...

  3. java queue 线程安全_详解Java高并发——设计线程安全的类

    前言: 将现有的线程安全的组件组合为更大规模的组件或程序. 通过使用封装技术可以使得在不对整个程序进行分析的情况下就可以判断一个类是否是线程安全的. 一. 基本要素 1. 找出对象状态的所有变量 如果 ...

  4. 【高并发】JUC底层工具类Unsafe

    1.概述 转载:添加链接描述 参考:[java]java的unsafe 参考:JUC原子类: CAS, Unsafe和原子类详解 1.1 本文主要内容 Unsafe基本介绍 获取Unsafe实例 Un ...

  5. java unsafe获取指针_【实战Java高并发程序设计 1】Java中的指针:Unsafe类

    是<实战Java高并发程序设计>第4章的几点. 如果你对技术有着不折不挠的追求,应该还会特别在意incrementAndGet() 方法中compareAndSet()的实现.现在,就让我 ...

  6. JAVA高并发程序设计(葛一鸣著)读书笔记

    本文为JAVA高并发程序设计(葛一鸣著)读书笔记.这本书对于刚刚入门的童鞋来讲可能有点深,我推荐可以先看看Java多线程编程核心技术(高洪岩著)一书. 第一章 走入并行世界 什么是同步和异步? 同步就 ...

  7. [Java高并发系列(5)][详细]Java中线程池(1)--基本概念介绍

    1 Java中线程池概述 1.1 什么是线程池? 在一个应用当中, 我们往往需要多次使用线程, 这意味着我们需要多次创建和销毁线程.那么为什么不提供一个机制或概念来管理这些线程呢? 该创建的时候创建, ...

  8. Java高并发程序设计入门

    转自:http://blog.csdn.net/johnstrive/article/details/50667557 说在前面 本文绝大部分参考<JAVA高并发程序设计>,类似读书笔记和 ...

  9. Java高并发系列5-线程池

    Java高并发系列5-线程池 接上一篇Java并发系列4-并发容器我们继续 在编程中经常会使用线程来异步处理任务,但是每个线程的创建和销毁都需要一定的开销.如果每次执行一个任务都需要开个新线程去执行, ...

最新文章

  1. 解决IE8,7下设置背景图片 background-size 不支持问题
  2. Spring教程--IOC(控制反转)详解
  3. java 递归编译_java计算x^n的递归方法?求高手给个算法最佳的 最好能编译通过 本人处于java初学者时期^^...
  4. linux删除modules文件夹,linux – 为什么我不能删除这个dkms模块?
  5. ubuntu安装codeblock的方法
  6. ARM 寄存器 详解
  7. 口琴膜片什么作用_2020年半音阶口琴选购攻略,让小白告别选择困难
  8. python selenium headless chrome chromedriver 等安装
  9. linux复制文件命令
  10. 【8001】解决打开idea出现红色感叹号报错信息Cannot find keymap Windows copy?
  11. 实验5、D/A转换实验
  12. 次氯酸钠phP,次氯酸钠
  13. 查询至少选了1班2号同学所选课的所有同学班号、学号
  14. CANVAS LMS开源系统
  15. Threejs实现模拟河流,水面水流,水管水流,海面
  16. Linux系统中的mount挂载命令及参数详解
  17. 小丸子学Docker系列之——实战Dockerfile
  18. MATLAB科学计算机lnx代码,[2018年最新整理]Matlab科学计算.ppt
  19. jQuery css选择器大全,总有你用得到的东西。
  20. 【安全资讯】东京奥运会现钓鱼网站已有观众受骗

热门文章

  1. 通用Makefile模板
  2. 1062. Talent and Virtue (25)-PAT甲级真题
  3. L1-017. 到底有多二-PAT团体程序设计天梯赛GPLT
  4. Perl 监控 tomcat,可以安心回家过年了
  5. 非广告--推荐Dynatrace:树立数字化性能管理DPM标杆
  6. linux下重命名文件
  7. jQuery常用工具方法
  8. centos 6.7 安装php7
  9. 第三次冲刺--软件工程
  10. springMvc整合hibernate出现问题