一、前言

  在完成Map下的并发集合后,现在来分析ArrayBlockingQueue,ArrayBlockingQueue可以用作一个阻塞型队列,支持多任务并发操作,有了之前看源码的积累,再看ArrayBlockingQueue源码会很容易,下面开始正文。

二、ArrayBlockingQueue数据结构

  通过源码分析,并且可以对比ArrayList可知,ArrayBlockingQueue的底层数据结构是数组,数据结构如下

  说明:ArrayBlockingQueue底层采用数据才存放数据,对数组的访问添加了锁的机制,使其能够支持多线程并发。

三、ArrayBlockingQueue源码分析

  3.1 类的继承关系  

public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {}

  说明:可以看到ArrayBlockingQueue继承了AbstractQueue抽象类,AbstractQueue定义了对队列的基本操作;同时实现了BlockingQueue接口,BlockingQueue表示阻塞型的队列,其对队列的操作可能会抛出异常;同时也实现了Searializable接口,表示可以被序列化。

  3.2 类的属性  

public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {// 版本序列号private static final long serialVersionUID = -817911632652898426L;// 存放实际元素的数组final Object[] items;// 取元素索引int takeIndex;// 获取元素索引int putIndex;// 队列中的项int count;// 可重入锁final ReentrantLock lock;// 等待获取条件private final Condition notEmpty;// 等待存放条件private final Condition notFull;// 迭代器transient Itrs itrs = null;
}

View Code

  说明:从类的属性中可以清楚的看到其底层的结构是Object类型的数组,取元素和存元素有不同的索引,有一个可重入锁ReentrantLock,两个条件Condition。对ReentrantLock和Condition不太熟悉的读者可以参考笔者的这篇博客,【JUC】JDK1.8源码分析之ReentrantLock(三)。

  3.3 类的构造函数

  1. ArrayBlockingQueue(int)型构造函数 

    public ArrayBlockingQueue(int capacity) {// 调用两个参数的构造函数this(capacity, false);}

View Code

  说明:该构造函数用于创建一个带有给定的(固定)容量和默认访问策略的 ArrayBlockingQueue。

  2. ArrayBlockingQueue(int, boolean)型构造函数  

    public ArrayBlockingQueue(int capacity, boolean fair) {// 初始容量必须大于0if (capacity <= 0)throw new IllegalArgumentException();// 初始化数组this.items = new Object[capacity];// 初始化可重入锁lock = new ReentrantLock(fair);// 初始化等待条件notEmpty = lock.newCondition();notFull =  lock.newCondition();}

View Code

  说明:该构造函数用于创建一个具有给定的(固定)容量和指定访问策略的 ArrayBlockingQueue。

  3. ArrayBlockingQueue(int, boolean, Collection<? extends E>)型构造函数 

    public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {// 调用两个参数的构造函数this(capacity, fair);// 可重入锁final ReentrantLock lock = this.lock;// 上锁lock.lock(); // Lock only for visibility, not mutual exclusiontry {int i = 0;try {for (E e : c) { // 遍历集合// 检查元素是否为空
                    checkNotNull(e);// 存入ArrayBlockingQueue中items[i++] = e;}} catch (ArrayIndexOutOfBoundsException ex) { // 当初始化容量小于传入集合的大小时,会抛出异常throw new IllegalArgumentException();}// 元素数量count = i;// 初始化存元素的索引putIndex = (i == capacity) ? 0 : i;} finally {// 释放锁
            lock.unlock();}}

View Code

  说明:该构造函数用于创建一个具有给定的(固定)容量和指定访问策略的 ArrayBlockingQueue,它最初包含给定 collection 的元素,并以 collection 迭代器的遍历顺序添加元素。

  3.4 核心函数分析

  1. put函数  

    public void put(E e) throws InterruptedException {checkNotNull(e);// 获取可重入锁final ReentrantLock lock = this.lock;// 如果当前线程未被中断,则获取锁
        lock.lockInterruptibly();try {while (count == items.length) // 判断元素是否已满// 若满,则等待
                notFull.await();// 入队列
            enqueue(e);} finally {// 释放锁
            lock.unlock();}}

View Code

  说明:put函数用于存放元素,在当前线程被中断时会抛出异常,并且当队列已经满时,会阻塞一直等待。其中,put会调用enqueue函数,enqueue函数源码如下  

    private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;// 获取数组final Object[] items = this.items;// 将元素放入items[putIndex] = x;if (++putIndex == items.length) // 放入后存元素的索引等于数组长度(表示已满)// 重置存索引为0putIndex = 0;// 元素数量加1count++;// 唤醒在notEmpty条件上等待的线程
        notEmpty.signal();}

View Code

  说明:enqueue函数用于将元素存入底层Object数组中,并且会唤醒等待notEmpty条件的线程。

  2. offer函数  

    public boolean offer(E e) {// 检查元素不能为空
        checkNotNull(e);// 可重入锁final ReentrantLock lock = this.lock;// 获取锁
        lock.lock();try {if (count == items.length) // 元素个数等于数组长度,则返回return false; else { // 添加进数组
                enqueue(e);return true;}} finally {// 释放数组
            lock.unlock();}}

View Code

  说明:offer函数也用于存放元素,在调用ArrayBlockingQueue的add方法时,会间接的调用到offer函数,offer函数添加元素不会抛出异常,当底层Object数组已满时,则返回false,否则,会调用enqueue函数,将元素存入底层Object数组。并唤醒等待notEmpty条件的线程。

  3. take函数  

    public E take() throws InterruptedException {// 可重入锁final ReentrantLock lock = this.lock;// 如果当前线程未被中断,则获取锁,中断会抛出异常
        lock.lockInterruptibly();try {while (count == 0) // 元素数量为0,即Object数组为空// 则等待notEmpty条件
                notEmpty.await();// 出队列return dequeue();} finally {// 释放锁
            lock.unlock();}}

View Code

  说明:take函数用于从ArrayBlockingQueue中获取一个元素,其与put函数相对应,在当前线程被中断时会抛出异常,并且当队列为空时,会阻塞一直等待。其中,take会调用dequeue函数,dequeue函数源码如下  

    private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")// 取元素E x = (E) items[takeIndex];// 该索引的值赋值为nullitems[takeIndex] = null;// 取值索引等于数组长度if (++takeIndex == items.length)// 重新赋值取值索引takeIndex = 0;// 元素个数减1count--;if (itrs != null) itrs.elementDequeued();// 唤醒在notFull条件上等待的线程
        notFull.signal();return x;}

View Code

  说明:dequeue函数用于将取元素,并且会唤醒等待notFull条件的线程。

  4. poll函数  

    public E poll() {// 重入锁final ReentrantLock lock = this.lock;// 获取锁
        lock.lock();try {// 若元素个数为0则返回null,否则,调用dequeue,出队列return (count == 0) ? null : dequeue();} finally {// 释放锁
            lock.unlock();}}

View Code

  说明:poll函数用于获取元素,其与offer函数相对应,不会抛出异常,当元素个数为0是,返回null,否则,调用dequeue函数,并唤醒等待notFull条件的线程。并返回。

  5. clear函数  

    public void clear() {// 数组final Object[] items = this.items;// 可重入锁final ReentrantLock lock = this.lock;// 获取锁
        lock.lock();try {// 保存元素个数int k = count;if (k > 0) { // 元素个数大于0// 存数元素索引final int putIndex = this.putIndex;// 取元素索引int i = takeIndex;do {// 赋值为nullitems[i] = null;if (++i == items.length) // 重新赋值ii = 0;} while (i != putIndex);// 重新赋值取元素索引takeIndex = putIndex;// 元素个数为0count = 0;if (itrs != null)itrs.queueIsEmpty();for (; k > 0 && lock.hasWaiters(notFull); k--) // 若有等待notFull条件的线程,则逐一唤醒
                    notFull.signal();}} finally {// 释放锁
            lock.unlock();}}

View Code

  说明:clear函数用于清空ArrayBlockingQueue,并且会释放所有等待notFull条件的线程(存放元素的线程)。

四、示例

  下面给出一个具体的示例来演示ArrayBlockingQueue的使用  

package com.hust.grid.leesf.collections;import java.util.concurrent.ArrayBlockingQueue;class PutThread extends Thread {private ArrayBlockingQueue<Integer> abq;public PutThread(ArrayBlockingQueue<Integer> abq) {this.abq = abq;}public void run() {for (int i = 0; i < 10; i++) {try {System.out.println("put " + i);abq.put(i);Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}}
}class GetThread extends Thread {private ArrayBlockingQueue<Integer> abq;public GetThread(ArrayBlockingQueue<Integer> abq) {this.abq = abq;}public void run() {for (int i = 0; i < 10; i++) {try {System.out.println("take " + abq.take());Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}}
}public class ArrayBlockingQueueDemo {public static void main(String[] args) {ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<Integer>(10);PutThread p1 = new PutThread(abq);GetThread g1 = new GetThread(abq);p1.start();g1.start();}
}

View Code

  运行结果:  

put 0
take 0
put 1
take 1
put 2
take 2
put 3
take 3
put 4
take 4
put 5
take 5
put 6
take 6
put 7
take 7
put 8
take 8
put 9
take 9

View Code

  说明:示例中使用了两个线程,一个用于存元素,一个用于读元素,存和读各10次,每个线程存一个元素或者读一个元素后都会休眠100ms,可以看到结果是交替打印,并且首先打印的肯定是put线程语句(因为若取线程先取元素,此时队列并没有元素,其会阻塞,等待存线程存入元素),并且最终程序可以正常结束。

  ① 若修改取元素线程,将存的元素的次数修改为15次(for循环的结束条件改为15即可),运行结果如下:  

put 0
take 0
put 1
take 1
put 2
take 2
put 3
take 3
put 4
take 4
put 5
take 5
put 6
take 6
put 7
take 7
put 8
take 8
put 9
take 9

View Code

  说明:运行结果与上面的运行结果相同,但是,此时程序无法正常结束,因为take方法被阻塞了,等待被唤醒。

五、总结

  总的来说,有了前面分析的基础,分析ArrayBlockingQueue就会非常的简单,ArrayBlockingQueue是通过ReentrantLock和Condition条件来保证多线程的正确访问的。ArrayBockingQueue的分析就到这里,欢迎交流,谢谢各位园友的观看~

转载于:https://www.cnblogs.com/leesf456/p/5533770.html

【JUC】JDK1.8源码分析之ArrayBlockingQueue(三)相关推荐

  1. 【集合框架】JDK1.8源码分析之HashMap(一)

    转载自  [集合框架]JDK1.8源码分析之HashMap(一) 一.前言 在分析jdk1.8后的HashMap源码时,发现网上好多分析都是基于之前的jdk,而Java8的HashMap对之前做了较大 ...

  2. 【集合框架】JDK1.8源码分析HashSet LinkedHashSet(八)

    一.前言 分析完了List的两个主要类之后,我们来分析Set接口下的类,HashSet和LinkedHashSet,其实,在分析完HashMap与LinkedHashMap之后,再来分析HashSet ...

  3. JDK1.8源码分析:可重入锁ReentrantLock和Condition的实现原理

    synchronized的用法和实现原理 synchronized实现线程同步的用法和实现原理 不足 synchronized在线程同步的使用方面,优点是使用简单,可以自动加锁和解锁,但是也存在一些不 ...

  4. synchronousqueue场景_【JUC】JDK1.8源码分析之SynchronousQueue(九)

    一.前言 本篇是在分析Executors源码时,发现JUC集合框架中的一个重要类没有分析,SynchronousQueue,该类在线程池中的作用是非常明显的,所以很有必要单独拿出来分析一番,这对于之后 ...

  5. 【JUC】JDK1.8源码分析之ConcurrentHashMap

    一.前言 最近几天忙着做点别的东西,今天终于有时间分析源码了,看源码感觉很爽,并且发现ConcurrentHashMap在JDK1.8版本与之前的版本在并发控制上存在很大的差别,很有必要进行认真的分析 ...

  6. 【JUC】JDK1.8源码分析之ConcurrentLinkedQueue(五)

    一.前言 接着前面的分析,接下来分析ConcurrentLinkedQueue,ConcurerntLinkedQueue一个基于链接节点的无界线程安全队列.此队列按照 FIFO(先进先出)原则对元素 ...

  7. 【JUC】JDK1.8源码分析之AbstractQueuedSynchronizer

    一.前言 在锁框架中,AbstractQueuedSynchronizer抽象类可以毫不夸张的说,占据着核心地位,它提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架.所以很有必 ...

  8. Java中ConcurrentHashMap底层实现原理(JDK1.8)源码分析2

    https://blog.csdn.net/programmer_at/article/details/79715177 https://blog.csdn.net/qq_41737716/categ ...

  9. JUC AQS ReentrantLock源码分析

    Java的内置锁一直都是备受争议的,在JDK 1.6之前,synchronized这个重量级锁其性能一直都是较为低下,虽然在1.6后,进行大量的锁优化策略,但是与Lock相比synchronized还 ...

最新文章

  1. 暑期集训2:ACM基础算法 练习题A:CF-1008C
  2. centos6.9安装oracle10,2018-10-09 centos6.9 安装cx_Oracle
  3. 绝对养眼 Tech·Ed 2008大会SHOW GIRL动感热舞
  4. matlab用循环批量生成多个变量,神奇的eval()函数
  5. ftl数据类型转换以及list遍历的使用
  6. 杨幂掐点祝福唐嫣,打破不和传言,情感营销还能这么玩?
  7. java学习(104):字符串equals,charAt,endwith,startwith方法
  8. 这些面试题你需要知道
  9. 内存管理之memblock探寻
  10. PPT双屏抽奖,大气,能Hold住全场!与其它PPT内容可融为一体,实现无缝切换!
  11. Charles 导出所有request, response, cookie, 导出为HTTP Archive .har
  12. 如何用3D Max进行三维建模
  13. 学生社团管理系统(Java+Swing+mysql)(超简陋)
  14. 读取日志时发生乱码的解决方法
  15. UG二次开发GRIP标准件库
  16. sofa-seata
  17. python实现四种基本图形的面积计算 :圆形,长方形,正方形,梯形。
  18. 深入理解Java虚拟机——魔数与Class文件的版本
  19. Excel如何批量在空白单元格录入相同内容
  20. JAVA开源仿知乎问答源码

热门文章

  1. python3将列表当作队列使用
  2. eureka实例相关配置
  3. redis sorted_set数据类型常用命令及跳表skip_list原理
  4. MySQL搭建主从复制架构实战
  5. Java设计模式--使用内部类实现线程安全且懒加载的单例模式
  6. jQuery easyUI--layout布局页面
  7. 2015国产手机圈成绩单
  8. main()与_tmain()区别
  9. python从标准输入读取数据_在PYTHON中如何从标准输入读取内容stdin
  10. python排行_如何看待Python排名超越C++?