1、特点:
当从这个队列中取元素时,它支持这样的操作:如果队列为空,那它就等待队列不为空时,再执行取操作。
当向这个队列中存元素时,它支持这样的操作:如果队列已满,那它就等待队列可用,再向队列中存放元素。

下面BlockingQueue中的方法支持这样的操作,当不同的方法操作不能立即被满足,但是在将来可能被满足:第一个抛出异常,第二个返回一个特殊的值(null或false,取决于哪种操作),第三块会使得当前线程无限期的等待,直到操作可以执行,第四块仅仅给定一个最大等待时间,一旦超过这个时间,就会放弃操作。

抛出异常 返回特殊值 阻塞 定时等待
插入 add(e) offer(e) put(e) offer(e, time, unit)
删除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用

BlockingQueue不接受空值。如果试图用add, put, 或者offer去添加一个null值,则会抛出NullPointerException异常。如果poll 操作失败,则会返回一个空值。

BlockingQueue可能是有界的。在任何一个给定的时间,它都有一个剩余容量,如果超过这个容量,那么超出的元素将会被阻塞。如果BlockingQueue没有任何内在容量的约束,那么它的剩余容量将会一直是Integer.MAX_VALUE。

BlockingQueue的实现类主要是设计实现生产者-消费者队列,但是额外支持Collection接口。所以你可以利用remove(x)从队列中删除一个元素。但是这样的操作并不特别高效,仅仅是为了在特定的场景中使用,比如当队列中消息被取消时。

BlockingQueue的实现类是线程安全的。通过内在的锁或者其他的同步机制,所有的队列方法都能够原子性的操作。然而大量的集合操作例如:addAll,containsAll,retainsAll,以及removeAll都不一定会保证原子操作,除非在实现中以其他方式指定。所以当你使用addAll方法将集合c加入到队列中时,可能会抛出一个异常。

用典型的生产者和消费者为例,表明BlockingQueue可以被安全的用于多个生产者和消费者。

class Producer implements Runnable {private final BlockingQueue queue;Producer(BlockingQueue q) { queue = q; }public void run() {try {while (true) { queue.put(produce()); }} catch (InterruptedException ex) { ... handle ...}}Object produce() { ... }}class Consumer implements Runnable {private final BlockingQueue queue;Consumer(BlockingQueue q) { queue = q; }public void run() {try {while (true) { consume(queue.take()); }} catch (InterruptedException ex) { ... handle ...}}void consume(Object x) { ... }}class Setup {void main() {BlockingQueue q = new SomeQueueImplementation();//这里的具体实现类,接下来会总结到Producer p = new Producer(q);Consumer c1 = new Consumer(q);Consumer c2 = new Consumer(q);new Thread(p).start();new Thread(c1).start();new Thread(c2).start();}}

接下来我们看一下这个BlockingQueue接口中的方法,都有什么用处

返回值 方法 描述
boolean add(E e) 如果还有剩余容量,则向队列中插入一个元素并返回true。如果没有可用空间,则抛出IllegalStateException
boolean contains(Object o) 如果队列中包含这个元素返回true
int drainTo(Collection<? super E> c) 将队列中所有可用元素移动到这个Collection集合中
int drainTo(Collection<? super E> c,int maxElements) 将队列中最多maxElement个元素中移动到Collection集合中
boolean offer(E e) 如果还有剩余容量,则向队列中插入一个元素并返回true,如果没有剩余空间则返回false
boolean offer(E e, long timeout, TimeUnit unit) 在给定时间内将元素插入,返回true,否则返回false
E poll(long timeout, TimeUnit unit) 在给定时间内获取到队首元素,则返回队首元素,否则返回null
void put(E e) 如果可以,就将元素插入到队列中,否则就一直等待到可以插入
int remainingCapacity() 返回此队列在不阻塞的情况下,可以接受的其他元素的数量,如果没有内在的限制就返回Integer.MAX_VALUE
E take() 获取并移除队首元素,如果队列中没有元素可用,它会等待直到队列中有元素可用

了解了BlockingQueue接口的设计思想后,我们来看一下它的几个实现类吧。
下面是它的所有实现类:


简单介绍一下其中几个常用的实现类

1、ArrayBlockingQueue:内部基于数组实现的有界阻塞队列,按照先进先出(FIFO)的原则对元素进行排序,支持公平锁和非公平锁。由于对于生产者放入数据和消费者取出数据使用的是同一个锁,所以二者无法真正并行运行。同时由于数组中存放的直接是放入的数据,所以不会产生额外的对象实例。

2、LinkedBlockingQueue:内部基于链表实现的有界阻塞队列(默认Integer.MAX_VALUE),按照先进先出的顺序对元素存取,内部使用非公平锁对于生产者和消费者使用的是两把不同的锁所以生产者和消费者可以同步进行。由于是使用的链表结构,所以会生成Node节点对象实例,销毁的时候需要额外处理。

3、DelayQueue:内部基于非线程安全的优先队列实现的无界阻塞队列,内部使用非公平锁。DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。使用场景:1.缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。2.定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。

4、PriorityQueue:内部基于实现的无界阻塞队列(可进行扩容,默认11),内部使用非公平锁。 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。该队列不会阻塞生产者产生数据,当队列中没有数据时,会阻塞消费者。所以如果生产者的速度一旦超过消费者,会快速消耗内存资源。

5、SynchronousQueue:内部是基于无缓冲并且无界的等待队列,但是由于该Queue本身的特性,在某次添加元素后必须等待其他线程取走后才能继续添加;可以认为SynchronousQueue是一个缓存值为1的阻塞队列;

7、LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。

8、LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。

下面就详细看一下ArrayBlockingQueue原理,然后简单说一下LinkedBlockingQueue。其他的阻塞队列源码就不介绍了,懂了一个,其他自然就容易看懂了。

1、ArrayBlockingQueue

其中重要的属性解释

 final Object[] items;    //内部存储数组int takeIndex;         //下一个要取走的元素的下标,用于remove、poll、take方法int putIndex;         //下一个要放入的元素的下标,用于add、offer、put方法int count;               //该队列中的元素final ReentrantLock lock; //保护所有通道的主锁private final Condition notEmpty;//Condition for waiting takes,用于阻塞和唤醒take操作private final Condition notFull;// Condition for waiting puts,用于阻塞和唤醒put操作

接下来是构造函数,从构造函数中我们可以看出ArrayBlockingQueue支持公平锁和非公平锁。

public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull =  lock.newCondition();}

默认构造函数是非公平的

public ArrayBlockingQueue(int capacity) {this(capacity, false);}

还支持将Collection集合类中的元素直接放到ArrayBlockingQueue的items数组中,进行初始化。

public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c){....
}

1、add方法

public boolean add(E e) {return super.add(e);}
//找到父类操作
public boolean add(E e) {if (offer(e))return true;elsethrow new IllegalStateException("Queue full");}

可以看出,它是通过offer方法添加到数组中,如果已满,则抛出异常。
2、offer方法

public boolean offer(E e) {Objects.requireNonNull(e);ReentrantLock lock = this.lock;lock.lock();boolean var3;try {if (this.count != this.items.length) {//这里确保可以添加到数组中去this.enqueue(e); //offer通过enqueue方法添加到数组var3 = true;return var3;    //成功返回true}var3 = false;    //否则标记为false} finally {lock.unlock();}return var3;      //返回false}
private void enqueue(E e) {Object[] items = this.items;items[this.putIndex] = e;  //这里putIndex即为下一个可放元素的下标if (++this.putIndex == items.length) { //如果下一个下标越界,则返回数组头部this.putIndex = 0;  //这里恰好构成一个循环,从第一个一直加到最后一个,然后返回头部继续添加}++this.count;this.notEmpty.signal();   //这里用来唤醒因数组为空而进入等待状态的方法}

3、put方法

public void put(E e) throws InterruptedException {Objects.requireNonNull(e);ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while(this.count == this.items.length) {//如果已满,则释放锁等待唤醒this.notFull.await();}this.enqueue(e);  //这里put方法也是通过enqueue方法添加元素} finally {lock.unlock();}}

这里对比一下这三个添加方法,虽然前面已经总结过了,但是看过源代码后,应该更直观一些。

方法 是否允许插入空值 队列元素已满
add(E e) 不允许 抛出IllegalStateException异常
offer(E e) 不允许 返回false
put(E e) 不允许 释放锁,等待唤醒

很明显,这三个方法,第一步都对插入元素进行了判空,如果为空则抛出异常。

4、remove方法

public boolean remove(Object o) {if (o == null) {   //如果要移除的元素为空,返回falsereturn false;} else {ReentrantLock lock = this.lock;lock.lock();try {if (this.count > 0) {  //如果数组元素数目大于0Object[] items = this.items;int i = this.takeIndex;int end = this.putIndex;int to = i < end ? end : items.length;label96:while(true) {while(i >= to) {if (to == end) {break label96;}i = 0;to = end;}if (o.equals(items[i])) { //找到元素并移除,返回truethis.removeAt(i);boolean var7 = true;return var7;}++i;}}boolean var11 = false;   //找不到元素返回falsereturn var11;} finally {lock.unlock();}}}

5、poll方法

public E poll() {ReentrantLock lock = this.lock;lock.lock();Object var2;try {var2 = this.count == 0 ? null : this.dequeue();//如果没有元素可取,返回null,否则返回队首元素} finally {lock.unlock();}return var2;}
private E dequeue() {           //利用这个方法来取走队首元素Object[] items = this.items;E e = items[this.takeIndex];items[this.takeIndex] = null;//这里仅需将队首元素赋值为空,其内存交由GC回收if (++this.takeIndex == items.length) {this.takeIndex = 0;}--this.count;if (this.itrs != null) {this.itrs.elementDequeued();}this.notFull.signal();  //这里唤醒因为队列已满而等待的方法。return e;}

5、take方法


public E take() throws InterruptedException {ReentrantLock lock = this.lock;lock.lockInterruptibly();Object var2;try {while(this.count == 0) {  //如果没有元素,则等待唤醒this.notEmpty.await();}var2 = this.dequeue();   //如果有元素则取走队首元素} finally {lock.unlock();}return var2;}

这里再对比一下remove、poll、take方法取走元素时,队列为空的情况

方法 队列为空
remove() 返回false
poll() 返回null
take() 等待唤醒

以上就是对ArrayBlockingQueue添加和删除方法的介绍

2、LinkedBlockingQueue
存储结构(单向链表)

static class Node<E> {E item;Node<E> next;Node(E x) { item = x; }
}

属性

private final int capacity;//链表的最大容量
private final AtomicInteger count;//链表中插入的数据个数
transient LinkedBlockingQueue.Node<E> head;//链表头结点指针
private transient LinkedBlockingQueue.Node<E> last;//链表尾节点指针
private final ReentrantLock takeLock;//取锁
private final Condition notEmpty;
private final ReentrantLock putLock;//放锁
private final Condition notFull;

方法和ArrayBlockingQueue实现原理类似,不同的是ArrayBlockingQueue基于数组实现,LinkedBlockingQueue基于链表实现,所以对元素的操作会有所不同。

参考:https://www.cnblogs.com/WangHaiMing/p/8798709.html
参考:https://www.cnblogs.com/KingIceMou/p/8075343.html

详解BlockingQueue相关推荐

  1. java线程池使用最全详解

    线程池使用 前言 在执行一个异步任务或并发任务时,往往是通过直接new Thread()方法来创建新的线程,这样做弊端较多,更好的解决方案是合理地利用线程池,线程池的优势很明显,如下: 降低系统资源消 ...

  2. BlockingQueue(阻塞队列)详解

    推荐:Java并发编程汇总 BlockingQueue(阻塞队列)详解 原文地址 BlockingQueue 一. 前言 在新增的Concurrent包中,BlockingQueue很好的解决了多线程 ...

  3. ThreadPoolExecutor运转机制及BlockingQueue详解

    1.ThreadPoolExecutor的构建参数 最近发现几起对ThreadPoolExecutor的误用,其中包括自己,发现都是因为没有仔细看注释和内部运转机制,想当然的揣测参数导致,先看一下新建 ...

  4. 高并发编程_高并发编程系列:7大并发容器详解(附面试题和企业编程指南)...

    不知道从什么时候起,在Java编程中,经常听到Java集合类,同步容器.并发容器,高并发编程成为当下程序员需要去了解掌握的技术之一,那么他们有哪些具体分类,以及各自之间的区别和优劣呢? 只有把这些梳理 ...

  5. Java线程池详解学习:ThreadPoolExecutor

    Java线程池详解学习:ThreadPoolExecutor Java的源码下载参考这篇文章:Java源码下载和阅读(JDK1.8) - zhangpeterx的博客 在源码的目录java/util/ ...

  6. ScheduledThreadPoolExecutor详解

    2019独角兽企业重金招聘Python工程师标准>>> 本文主要分为两个部分,第一部分首先会对ScheduledThreadPoolExecutor进行简单的介绍,并且会介绍其主要A ...

  7. Executor框架的详解(转载)

    在Java中,使用线程来异步执行任务.Java线程的创建与销毁需要一定的开销,如果我们为每一个任务创建一个新线程来执行,这些线程的创建与销毁将消耗大量的计算资源.同时,为每一个任务创建一个新线程来执行 ...

  8. GitHub轻松阅读微服务实战项目流程详解【第二天:API网关的设计与实现】

    Two Day 1.配置文件精解 (1)bootstrap.yml文件 (2)nacos中关于gateway的配置信息 (3)applicaton.properties白名单配置 2.代码详解 (1) ...

  9. java多线程学习-java.util.concurrent详解

    http://janeky.iteye.com/category/124727 java多线程学习-java.util.concurrent详解(一) Latch/Barrier 博客分类: java ...

最新文章

  1. RocketMQ:NameServer架构设计以及启动关闭流程源码分析
  2. cron和crontab
  3. 学习C++的五十个建议(转
  4. 迁移学习之域自适应理论简介(Domain Adaptation Theory)
  5. TCM与Cache介绍
  6. C# 8 新增小功能
  7. 【LCT】遥远的国度(P3979)
  8. Python+sklearn使用支持向量机算法实现数字图片分类
  9. contenttype文件ajax_jquery ajax contentType设置
  10. 【python】59个Python使用技巧,从此你的Python与众不同(一)
  11. 揭露微信朋友圈当中出现刷票群0.01一票微信号、刷票0.01一票微信号的虚假面目
  12. 当tomcat启动遇到(你的项目名字) is required and cannot be removed from the server(不能部署到server上)
  13. Unity-Photon Pun2个人总结
  14. C++ -Pointer指针总结(一)
  15. 案例6-1.3 哥尼斯堡的“七桥问题”
  16. obs多推流地址_腾讯推流直播教程OBS下载、安装、使用
  17. Windows家庭版添加本地组策略编辑器的方法
  18. b2b2c 电子商务平台涉及的技术、运营方案
  19. 拼多多面试问了数据库基础知识,今天分享出来
  20. 在公司三年跌宕起伏的经历

热门文章

  1. 希冀平台1-5:针对salaries表emp_no字段创建索引idx_emp_no,查询emp_no为10005, 使用强制索引。 CREATE TABLE `salaries` ( `emp_no`
  2. VS中C++导入并使用DLL文件使用步骤
  3. 在misc中涉及的二维码
  4. 使用vba进行excel超链接设置(链接到当前文档某一单元格)
  5. Win10使用VS2017安装Caffe详细总结
  6. PRA10.3平台API接口调用
  7. 快进来看王冰冰!青年大学习提醒系统来了!!
  8. java map扩容机制_Java HashMap的原理、扩容机制、以及性能思考
  9. 防止用户调整微信浏览器字体大小导致的显示异常
  10. LeetCode 1646. 获取生成数组中的最大值