什么是阻塞队列?

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

阻塞队列提供了四种处理方法:

方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
检查 element() peek() 不可用 不可用
  • 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
  • 返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null
  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
  • 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出

多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。然而,在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。好在此时,强大的concurrent包横空出世了,而他也给我们带来了强大的BlockingQueue。(在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒)

BlockingQueue成员详细介绍

1. ArrayBlockingQueue

基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。

ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁

2. LinkedBlockingQueue

基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。

示例:

生产者

package cn.com.example.concurrent.blockingQueue;import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** Created by Jack on 2017/1/24.*/
public class Producer implements Runnable {private volatile boolean isRunning = true;private BlockingQueue queue;private static AtomicInteger count = new AtomicInteger();private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;public Producer(BlockingQueue queue) {this.queue = queue;}public void run() {String data = null;Random r = new Random();System.out.println("启动生产者线程!");try {while (isRunning) {System.out.println("正在生产数据...");Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));data = "data:" + count.incrementAndGet();System.out.println("将数据:" + data + "放入队列...");if (!queue.offer(data, 2, TimeUnit.SECONDS)) {System.out.println("放入数据失败:" + data);}}} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();} finally {System.out.println("退出生产者线程!");}}public void stop() {isRunning = false;}
}

消费者

package cn.com.example.concurrent.blockingQueue;import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;/*** Created by Jack on 2017/1/24.*/
public class Consumer implements Runnable {private BlockingQueue<String> queue;private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;public Consumer(BlockingQueue<String> queue) {this.queue = queue;}public void run() {System.out.println("启动消费者线程!");Random r = new Random();boolean isRunning = true;try {while (isRunning) {System.out.println("正从队列获取数据...");String data = queue.poll(2, TimeUnit.SECONDS);if (null != data) {System.out.println("拿到数据:" + data);System.out.println("正在消费数据:" + data);Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));} else {// 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。isRunning = false;}}} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();} finally {System.out.println("退出消费者线程!");}}
}

测试

package cn.com.example.concurrent.blockingQueue;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;/*** Created by Jack on 2017/1/24.*/
public class BlockingQueueTest {public static void main(String[] args) throws InterruptedException {// 声明一个容量为10的缓存队列BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);Producer producer1 = new Producer(queue);Producer producer2 = new Producer(queue);Producer producer3 = new Producer(queue);Consumer consumer = new Consumer(queue);// 借助ExecutorsExecutorService service = Executors.newCachedThreadPool();// 启动线程service.execute(producer1);service.execute(producer2);service.execute(producer3);service.execute(consumer);// 执行10sThread.sleep(10 * 1000);producer1.stop();producer2.stop();producer3.stop();Thread.sleep(2000);// 退出Executorservice.shutdown();}}

结果

启动消费者线程!
正从队列获取数据...
启动生产者线程!
启动生产者线程!
正在生产数据...
启动生产者线程!
正在生产数据...
正在生产数据...
将数据:data:1放入队列...
正在生产数据...
拿到数据:data:1
正在消费数据:data:1
将数据:data:2放入队列...
正在生产数据...
正从队列获取数据...
拿到数据:data:2
正在消费数据:data:2
将数据:data:3放入队列...
正在生产数据...
将数据:data:4放入队列...
正在生产数据...
将数据:data:5放入队列...
正在生产数据...
将数据:data:6放入队列...
正在生产数据...
将数据:data:7放入队列...
正在生产数据...
正从队列获取数据...
拿到数据:data:3
正在消费数据:data:3
将数据:data:8放入队列...
正在生产数据...
将数据:data:9放入队列...
正在生产数据...
将数据:data:10放入队列...
正在生产数据...
正从队列获取数据...
拿到数据:data:4
正在消费数据:data:4
将数据:data:11放入队列...
正在生产数据...
将数据:data:12放入队列...
正在生产数据...
将数据:data:13放入队列...
正在生产数据...
将数据:data:14放入队列...
正在生产数据...
将数据:data:15放入队列...

3. DelayQueue

DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
使用场景:
  DelayQueue使用场景较少,但都相当巧妙,常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。

4. PriorityBlockingQueue

基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。

5. SynchronousQueue

一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的BlockingQueue来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。
  声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:
  如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
  但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。

BlockingQueue不光实现了一个完整队列所具有的基本功能,同时在多线程环境下,他还自动管理了多线间的自动等待于唤醒功能,从而使得程序员可以忽略这些细节,关注更高级的功能。

转载于:https://www.cnblogs.com/Zombie-Xian/p/6347386.html

java.util.concurrent BlockingQueue详解相关推荐

  1. 将java.util.concurrent.BlockingQueue用作rx.Observable

    在Java中,经典的生产者-消费者模式相对简单,因为我们有java.util.concurrent.BlockingQueue . 为了避免繁忙的等待和容易出错的手动锁定,我们只需利用put()和ta ...

  2. java.util.concurrent.BlockingQueue指南

    2. 概述 在本文中,我们将介绍一个最有用的java.util.concurrent.BlockingQueue来解决并发生产者 - 消费者问题.我们可以看一下BlockingQueue 接口的API ...

  3. java utill scanner_(转)java.util.Scanner应用详解

    java.util.Scanner应用详解 java.util.Scanner是Java5的新特征,主要功能是简化文本扫描.这个类最实用的地方表现在获取控制台输入,其他的功能都很鸡肋,尽管Java A ...

  4. java的concurrent用法详解

    我们都知道,在JDK1.5之前,Java中要进行业务并发时,通常需要有程序员独立完成代码实现,当然也有一些开源的框架提供了这些功能,但是这些依然没有JDK自带的功能使用起来方便.而当针对高质量Java ...

  5. 【集合类】 1 java.util.ConcurrentModificationException异常详解ArrayListCopyOnWriteArrayList原理探究

    环境:JDK 1.8.0_111 文章目录 概述 一.单线程情况下问题分析及解决方案 1.1 问题复现 1.2 问题原因分析 1.3 问题解决方案 二. 多线程情况下的问题分析及解决方案 2.1 问题 ...

  6. Java日志框架之JUL(java util logging)详解

    定义: JUL全称Java util logging,是java原生的日志框架,使用时不需要另外引入第三方类库,相对于其他框架使用方便,学习简单,能够在小型的应用中灵活使用. 架构: Applicat ...

  7. java.util.stream.Stream详解

    Stream(流)是一个支持顺序和平行聚合操作的元素序列,和java.util.List类似,是jdk1.8的新特性.可以看出Stream还有几个兄弟类IntStream.LongStream和Dou ...

  8. java.util.ResourceBundle使用详解

    为什么80%的码农都做不了架构师?>>>      2009-07-29 00:47:17     一.认识国际化资源文件 这个类提供软件国际化的捷径.通过此类,可以使您所编写的程序 ...

  9. 【转】java.util.ResourceBundle使用详解

    原文链接:http://lavasoft.blog.51cto.com/62575/184605/ 人家写的太好了,条理清晰,表达准确. 一.认识国际化资源文件 这个类提供软件国际化的捷径.通过此类, ...

最新文章

  1. 开源性能监控工具APM之Skywalking和Pinpoint的实测对比
  2. 历届智能车哪个组别最难,哪个竞争最激烈?
  3. oracle视图能增删改,oracle视图的增删改
  4. 开发日记-20190624 关键词 读书笔记《Linux 系统管理技术手册(第二版)》DAY 1
  5. 用Nginx如何配置运行无扩展名PHP文件或非.PHP扩展名文件
  6. Ubuntu终端多窗口分屏Terminator
  7. 计算机用于尖端科技,【判断题】用演绎法教问句的方法适用于中高级型学生
  8. 阿里云mysql5.7 窗口函数_关于阿里云centos版本,mysql5.7的一些注意事项
  9. Laravel 5 多个视图共享数据的方法
  10. SequoiaDB 系列之六 :源码分析之coord节点
  11. 深入浅出oracle锁原理篇
  12. 批量复制文件并改成有顺序的文件名
  13. ElasticSearch搜索底层基础原理总结
  14. Codeforces 71A Way Too Long Words
  15. C#-XML-数据传输
  16. 屏幕空间的动态全局光照(漫反射)
  17. 字节学妹的数据分析笔记,收藏
  18. 百度地图---之---行政区域划分
  19. 路由器与交换机的作用及区别
  20. 11.4王者荣耀服务器维护中,4月11日全服不停机更新公告

热门文章

  1. java web插件_javaweb项目插件实现机制
  2. Picasso-源码解析(三)
  3. 阿里云高级总监谈超大规模超高性能分布式快存储系统
  4. socks5协议RFC文档
  5. 在Exchange Server 2007中使用多主机名称证书
  6. 移植uboot第七步:支持DM9000
  7. python程序开发入门_Python开发入门14天集训营-第一章
  8. 【转载】yolo数据增强和评价方法
  9. openLDAP的编译安装以及配置
  10. 对C语言 结构体 和 结构变量