一、什么是阻塞队列:

阻塞队列最大的特性在于支持阻塞添加和阻塞删除方法:

  • 阻塞添加:当阻塞队列已满时,队列会阻塞加入元素的线程,直到队列元素不满时才重新唤醒线程执行加入元素操作。

  • 阻塞删除:但阻塞队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空再执行删除操作

Java 中的阻塞队列接口 BlockingQueue 继承自 Queue 接口,因此先来看看阻塞队列接口为我们提供的主要方法:

public interface BlockingQueue<E> extends Queue<E> {// 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量)// 在成功时返回 true,如果此队列已满,则抛IllegalStateException。 boolean add(E e); // 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量) // 如果该队列已满,则在到达指定的等待时间之前等待可用的空间,该方法可中断 boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; //将指定的元素插入此队列的尾部,如果该队列已满,则一直等到(阻塞)。 void put(E e) throws InterruptedException; //获取并移除此队列的头部,如果没有元素则等待(阻塞),直到有元素将唤醒等待线程执行该操作 E take() throws InterruptedException; //获取并移除此队列的头部,在指定的等待时间前一直等到获取元素, //超过时间方法将结束E poll(long timeout, TimeUnit unit) throws InterruptedException; //从此队列中移除指定元素的单个实例(如果存在)。 boolean remove(Object o);
}//除了上述方法还有继承自Queue接口的方法 //获取但不移除此队列的头元素,没有则跑异常NoSuchElementException E element(); //获取但不移除此队列的头;如果此队列为空,则返回 null。 E peek(); //获取并移除此队列的头,如果此队列为空,则返回 null。 E poll();

这里我们把上述操作进行分类:

(1)插入方法:

  • add(E e):添加成功返回 true,失败抛 IllegalStateException 异常
  • offer(E e):成功返回 true,如果此队列已满,则返回 false
  • put(E e):将元素插入此队列的尾部,如果该队列已满,则一直阻塞

(2)删除方法

  • remove(Object o):移除指定元素,成功返回 true,失败返回 false
  • poll():获取并移除此队列的头元素,若队列为空,则返回 null
  • take():获取并移除此队列头元素,若没有元素则一直阻塞

(3)检查方法:

  • element() :获取但不移除此队列的头元素,没有元素则抛异常
  • peek() :获取但不移除此队列的头;若队列为空,则返回 null

二、阻塞队列的实现原理:

1、ArrayBlockingQueue:

1.1、数据结构:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {/** 存储数据的数组 */final Object[] items;/**获取数据的索引,主要用于take,poll,peek,remove方法 */int takeIndex;/**添加数据的索引,主要用于 put, offer, or add 方法*/int putIndex;/** 队列元素的个数 */int count;/** 控制并非访问的锁 */final ReentrantLock lock;/**notEmpty条件对象,用于通知take方法队列已有元素,可执行获取操作 */private final Condition notEmpty;/** notFull条件对象,用于通知put方法队列未满,可执行添加操作 */private final Condition notFull;/** 迭代器 */transient Itrs itrs = null;
}

ArrayBlockingQueue 内部通过数组对象 items 来存储所有的数据,需要注意的是ArrayBlockingQueue 通过一个 ReentrantLock 来同时控制添加线程与移除线程的并发访问,这点与 LinkedBlockingQueue 区别很大(稍后会分析)。而对于 notEmpty 条件对象则是用于存放等待或唤醒调用 take() 方法的线程,告诉他们队列已有元素,可以执行获取操作。同理 notFull 条件对象是用于等待或唤醒调用 put() 方法的线程,告诉它们队列未满,可以执行添加元素的操作。takeIndex 代表的是下一个方法(take,poll,peek,remove)被调用时获取数组元素的索引,putIndex 则代表下一个方法(put, offer, or add)被调用时元素添加到数组中的索引。

1.2、阻塞添加:put() 

put() 方法特点是阻塞添加,当队列满时通过条件对象来阻塞当前调用 put() 方法的线程,直到线程又再次被唤醒执行。总得来说添加线程的执行存在以下两种情况:一是队列已满,那么新到来的put 线程将添加到 notFull 的条件队列中等待;二是有移除线程执行移除操作,移除成功同时唤醒put线程。

具体代码如下:

//put方法,阻塞时可中断public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();//该方法可中断try {//当队列元素个数与数组长度相等时,无法添加元素while (count == items.length)//将当前调用线程挂起,添加到notFull条件队列中等待唤醒notFull.await();enqueue(e);//如果队列没有满直接添加。。} finally {lock.unlock();}}//入队操作
private void enqueue(E x) {//获取当前数组final Object[] items = this.items;//通过putIndex索引对数组进行赋值items[putIndex] = x;//索引自增,如果已是最后一个位置,重新设置 putIndex = 0;if (++putIndex == items.length)putIndex = 0;count++;//队列中元素数量加1//唤醒调用take()方法的线程,执行元素获取操作。notEmpty.signal();
}

1.3、阻塞删除:take() 

take() 方法其实很简单,有就删除没有就阻塞,注意这个阻塞是可以中断的,如果队列没有数据那么就加入 notEmpty 条件队列等待(有数据就直接取走,方法结束),如果有新的put线程添加了数据,那么 put 操作将会唤醒 take 线程,执行 take 操作,图示如下:

具体代码如下:

//从队列头部删除,队列没有元素就阻塞,可中断public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();//中断try {//如果队列没有元素while (count == 0)//执行阻塞操作notEmpty.await();return dequeue();//如果队列有元素执行删除操作} finally {lock.unlock();}}//删除队列头元素并返回private E dequeue() {//拿到当前数组的数据final Object[] items = this.items;@SuppressWarnings("unchecked")//获取要删除的对象E x = (E) items[takeIndex];将数组中takeIndex索引位置设置为nullitems[takeIndex] = null;//takeIndex索引加1并判断是否与数组长度相等,//如果相等说明已到尽头,恢复为0if (++takeIndex == items.length)takeIndex = 0;count--;//队列个数减1if (itrs != null)itrs.elementDequeued();//同时更新迭代器中的元素数据//删除了元素说明队列有空位,唤醒notFull条件对象添加线程,执行添加操作notFull.signal();return x;}

2、LinkedBlockingQueue:

LinkedBlockingQueue 是一个基于链表的阻塞队列,其内部维持一个基于链表的数据队列,但大小默认值为 Integer.MAX_VALUE,建议使用 LinkedBlockingQueue时手动传值,避免队列过大造成机器负载或者内存爆满等情况

2.1、数据结构:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {/*** 节点类,用于存储数据*/static class Node<E> {E item;Node<E> next;Node(E x) { item = x; }}/** 阻塞队列的大小,默认为Integer.MAX_VALUE */private final int capacity;/** 当前阻塞队列中的元素个数 */private final AtomicInteger count = new AtomicInteger();/** 阻塞队列的头结点 */transient Node<E> head;/** 阻塞队列的尾节点 */private transient Node<E> last;/** 获取并移除元素时使用的锁,如take, poll, etc */private final ReentrantLock takeLock = new ReentrantLock();/** notEmpty条件对象,当队列没有数据时用于挂起执行删除的线程 */private final Condition notEmpty = takeLock.newCondition();/** 添加元素时使用的锁如 put, offer, etc */private final ReentrantLock putLock = new ReentrantLock();/** notFull条件对象,当队列数据已满时用于挂起执行添加的线程 */private final Condition notFull = putLock.newCondition();
}

从上述可看成,每个添加到 LinkedBlockingQueue 队列中的数据都将被封装成 Node 节点,添加的链表队列中,其中 head 和 last 分别指向队列的头结点和尾结点。与 ArrayBlockingQueue 不同的是,LinkedBlockingQueue 内部分别使用了 takeLock 和 putLock 对并发进行控制,也就是说,添加和删除操作并不是互斥操作,可以同时进行,可以大大提高吞吐量。这里再次强调如果没有给 LinkedBlockingQueue 指定容量大小,其默认值将是 Integer.MAX_VALUE,如果存在添加速度大于删除速度时候,有可能会内存溢出。至于 LinkedBlockingQueue 的实现原理图与 ArrayBlockingQueue 是类似的,除了对添加和移除方法使用单独的锁控制外,两者都使用了不同的 Condition 条件对象作为等待队列,用于挂起 take 线程和 put 线程。

2.2、阻塞添加:put()

    public void put(E e) throws InterruptedException {//添加元素为null直接抛出异常if (e == null) throw new NullPointerException();int c = -1;//构建节点Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;//获取队列的个数final AtomicInteger count = this.count;putLock.lockInterruptibly();try {//判断队列是否已满,如果已满则阻塞当前线程while (count.get() == capacity) {notFull.await();}//添加元素并更新count值enqueue(node);c = count.getAndIncrement();//如果队列容量还没满,唤醒下一个添加线程,执行添加操作if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}//由于存在添加锁和消费锁,而消费锁和添加锁都会持续唤醒等待线程,因此count肯定会变化//这里的if条件表示如果队列中还有1条数据,由于队列中存在数据那么就唤醒消费锁if (c == 0)signalNotEmpty();}

这里的 put()方法做了两件事,第一件事是判断队列是否满,满了将当前线程加入等下队列,没满就将节点封装成 Node入队,然后再次判断队列添加完成后是否已满,不满就继续唤醒等到在条件对象 notFull 上的添加线程。第二件事是,判断是否需要唤醒等到在 notEmpty 条件对象上的消费线程。这里我们可能会有点疑惑,为什么添加完成后是继续唤醒在条件对象 notFull 上的添加线程而不是像 ArrayBlockingQueue 那样直接唤醒 notEmpty 条件对象上的消费线程?而又为什么要当 if (c == 0) 时才去唤醒消费线程呢?

  • (1)唤醒添加线程的原因:在添加新元素完成后,会判断队列是否已满,不满就继续唤醒在条件对象 notFull 上的添加线程,这点与前面分析的 ArrayBlockingQueue 很不相同,在ArrayBlockingQueue 内部完成添加操作后,会直接唤醒消费线程对元素进行获取,这是因为ArrayBlockingQueue 只用了一个 ReenterLock 同时对添加线程和消费线程进行控制,这样如果在添加完成后再次唤醒添加线程的话,消费线程可能永远无法执行,而对于 LinkedBlockingQueue 来说就不一样了,其内部对添加线程和消费线程分别使用了各自的 ReenterLock 锁对并发进行控制,也就是说添加线程和消费线程是不会互斥的,所以添加锁只要管好自己的添加线程即可,添加线程自己直接唤醒自己的其他添加线程,如果没有等待的添加线程,直接结束了。如果有就直到队列元素已满才结束挂起,注意消费线程的执行过程也是如此。这也是为什么 LinkedBlockingQueue 的吞吐量要相对大些的原因。
  • (2)为什么 if (c == 0) 时才去唤醒消费线程:这是因为消费线程一旦被唤醒,就一直处于消费的状态,直到队列为空才结束,所以 c 值是一直在变化的(c值是添加完元素前队列的大小),此时 c 只可能是等于0或大于0,如果是 c=0,那么说明之前消费线程已停止,条件对象上可能存在等待的消费线程,添加完数据后应该是 c+1,那么有数据就直接唤醒等待消费线程,如果没有就结束啦,等待下一次的消费操作。如果 c>0 那么消费线程就不会被唤醒,只能等待下一个消费操作(poll、take、remove)的调用,那为什么不是条件 c>0 才去唤醒呢?我们要明白的是消费线程一旦被唤醒会和添加线程一样,一直不断唤醒其他消费线程,如果添加前 c>0,那么很可能上一次调用的消费线程后,数据并没有被消费完,条件队列上也就不存在等待的消费线程了,所以 c>0 唤醒消费线程得意义不是很大,当然如果添加线程一直添加元素,那么一直 c>0,消费线程执行的换就要等待下一次调用消费操作了(poll、take、remove)

2.3、阻塞删除:take()

public E take() throws InterruptedException {E x;int c = -1;//获取当前队列大小final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();//可中断try {//如果队列没有数据,挂机当前线程到条件对象的等待队列中while (count.get() == 0) {notEmpty.await();}//如果存在数据直接删除并返回该数据x = dequeue();c = count.getAndDecrement();//队列大小减1if (c > 1)notEmpty.signal();//还有数据就唤醒后续的消费线程} finally {takeLock.unlock();}//满足条件,唤醒条件对象上等待队列中的添加线程if (c == capacity)signalNotFull();return x;}

take() 方法是一个可阻塞可中断的移除方法,主要做了两件事,一是,如果队列没有数据就挂起当前线程到 notEmpty 条件对象的等待队列中一直等待,如果有数据就删除节点并返回数据项,同时唤醒后续消费线程,二是尝试唤醒条件对象 notFull 上等待队列中的添加线程。

3、ArrayBlockingQueue 和 LinkedBlockingQueue 迥异:

通过上述的分析,对于 ArrayBlockingQueue 和 LinkedBlockingQueue 的基本使用以及内部实现原理我们已较为熟悉了,这里我们就对它们两间的区别来个小结:

  • (1)队列大小有所不同,ArrayBlockingQueue 是有界的初始化必须指定大小,而LinkedBlockingQueue 可以是有界的也可以是无界的(默认是 Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题
  • (2)数据存储容器不同,ArrayBlockingQueue 采用的是数组作为数据存储容器,而LinkedBlockingQueue 采用的则是以 Node 节点作为连接对象的链表
  • (3)创建与销毁对象的开销不同,ArrayBlockingQueue 采用数组作为存储容器,在插入或删除元素时不会产生或销毁任何额外的对象实例,而 LinkedBlockingQueue 则会生成一个额外的 Node 对象。在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。
  • (4)队列添加或移除的锁不一样,ArrayBlockingQueue 的锁是没有分离的,添加操作和移除操作采用同一个 ReenterLock 锁,而 LinkedBlockingQueue 的锁是分离的,添加采用的是 putLock,移除采用的是 takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

参考文章:https://blog.csdn.net/javazejian/article/details/77410889

JUC多线程:阻塞队列ArrayBlockingQueue与LinkedBlockingQueue相关推荐

  1. Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析

    转载自  Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析 Java中的阻塞队列接口BlockingQueue继承自Queue接口. Block ...

  2. Java核心知识点学习----多线程中的阻塞队列,ArrayBlockingQueue介绍

    1.什么是阻塞队列? 所谓队列,遵循的是先进先出原则(FIFO),阻塞队列,即是数据共享时,A在写数据时,B想读同一数据,那么就将发生阻塞了. 看一下线程的四种状态,首先是新创建一个线程,然后,通过s ...

  3. 有界阻塞队列ArrayBlockingQueue和无界阻塞队列LinkedBlockingQueue

    ArrayBlockingQueue和LinkedBlockingQueue最大的区别是一个是有界无界,各有优劣. 先看实例代码: main函数起2个线程模拟生成消费者 import java.uti ...

  4. Java多线程-新特征-阻塞队列ArrayBlockingQueue

    阻塞队列是Java5线程新特征中的内容,Java定义了阻塞队列的接口java.util.concurrent.BlockingQueue,阻塞队列的概念是,一个指定长度的队列,如果队列满了,添加新元素 ...

  5. java 多线程阻塞队列 与 阻塞方法与和非阻塞方法

    Queue是什么 队列,是一种数据结构.除了优先级队列和LIFO队列外,队列都是以FIFO(先进先出)的方式对各个元素进行排序的.无论使用哪种排序方式,队列的头都是调用remove()或poll()移 ...

  6. java多线程-阻塞队列BlockingQueue

    大纲 BlockingQueue接口 ArrayBlockingQueue 一.BlockingQueue接口 public interface BlockingQueue<E> exte ...

  7. 阻塞队列 java 源码_Java源码解析阻塞队列ArrayBlockingQueue常用方法

    本文基于jdk1.8进行分析 首先看一下ArrayBlockingQueue的成员变量.如下图.最主要的成员变量是items,它是一个Object类型的数组用于保存阻塞队列中的元素.其次是takeIn ...

  8. java 队列 array_Java源码解析阻塞队列ArrayBlockingQueue常用方法

    本文基于jdk1.8进行分析 首先看一下ArrayBlockingQueue的成员变量.如下图.最主要的成员变量是items,它是一个Object类型的数组用于保存阻塞队列中的元素.其次是takeIn ...

  9. JAVA可阻塞队列-ArrayBlockingQueue

    在前面的的文章,写了一个带有缓冲区的队列,是用JAVA的Lock下的Condition实现的,但是JAVA类中提供了这项功能,就是ArrayBlockingQueue, ArrayBlockingQu ...

最新文章

  1. UA SIE545 优化理论基础1 凸分析2 仿射组合与仿射包
  2. Hebb负向规则与矛盾解对
  3. linux页表,arm linux 页表(转)
  4. linux prel安装_Linux下Perl的安装(转)
  5. 一份好的工作总结才能帮你升职加薪
  6. vim 底行命令模式下的全局命令 g(global)
  7. oracle10g启动顺序,oracle 10g rac维护:开机 关机顺序,流程
  8. [每日一题] 11gOCP 1z0-052 :2013-09-1 RMAN-- repair failure........................................A20...
  9. 循环自增_大学C语言—循环结构及应用
  10. UML部署图和构件图
  11. (16):Silverlight 2 数据与通信之JSON
  12. python抽取指定url页面的title_Python新手写爬虫全过程记录分析
  13. Java Netty 初步
  14. 解析DATASTAGE导出文件dsx和congnos的mdl文件
  15. Adobe Illustrator (AI)安装教程 (附安装包下载资源)
  16. Au 音频效果参考:振幅和压限
  17. Matlab 仿真——直流电机速度控制(1)直流电机建模
  18. java里什么是注释,全面解析Java中的注解与注释
  19. Tomcat部署及优化
  20. 媒体查询、px和rem转换、浏览器兼容、手机端视频播放解决方案

热门文章

  1. 二十五、爬取毛豆新车的数据
  2. 六十一、分析Springboot中的项目结构介绍
  3. 前端利用JS导出数据到Excel表 数字是文本类型 无法计算
  4. 史上最全推荐系统传统算法合集
  5. 对抗训练浅谈:意义、方法和思考(附Keras实现)
  6. 如何用最简单的方式理解傅立叶变换?
  7. KDD 2019 | 使用神经网络为A*搜索算法赋能:以个性化路径推荐为例
  8. 本周值得读的15篇AI论文,还有源码搭配服用
  9. NLP、CV、ML全覆盖,这份私藏论文清单你一定要看看
  10. acm公选课笔记 2020.3.31