ArrayBlockingQueue源码分析
简介
一个先进先出的阻塞队列,且是一个阻塞队列,新元素从队列末尾加入,从队列头部开始检索操作。
这个队列一旦构造好,队列大小不可变。
类声明
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable
核心字段
/** 内部维护的数组 */final Object[] items;/**下一个 take, poll, peek 或 remove操作的items的索引 */int takeIndex;/**下一个put,offer或add操作的索引*/int putIndex;/** 这个队列中元素的个数 */int count;/** 所有操作依赖的锁 */final ReentrantLock lock;/** take操作的非空Condition*/private final Condition notEmpty;/**put操作的非满Condition */private final Condition notFull;
核心方法
- 构造函数
/*** Creates an {@code ArrayBlockingQueue} with the given (fixed)* capacity and the specified access policy.** @param capacity 队列的大小,固定的* @param fair if {@code true} then queue accesses for threads blocked* on insertion or removal, are processed in FIFO order;* if {@code false} the access order is unspecified.* @throws IllegalArgumentException if {@code capacity < 1}*/public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition();}
- 用于循环队列取索引的方法
/*** 循环队列的实现:对i进行加1操作*/final int inc(int i) {//i自增要越界的让它指向第0个元素return (++i == items.length) ? 0 : i;}/***循环队列的实现:对i进行减1操作*/final int dec(int i) {//i为零时,让i指向最后一个元素return ((i == 0) ? items.length : i) - 1;}
- 将Object类型转换为E的方法
static <E> E cast(Object item) {return (E) item;
}
下面介绍内部定义的几个方法,主要是为take()
,offer()
等方法服务的
/*** 在当前的putIndex位置插入元素并且发送非空信号* 在持有锁的情况下调用*/private void insert(E x) {items[putIndex] = x;putIndex = inc(putIndex);//保存下一个putIndex的位置++count;//队列元素加1notEmpty.signal();//发送非空信号 唤醒取元素线程}/***从takeIndex位置取元素 并发送信号 * 在持有锁的情况下调用*/private E extract() {final Object[] items = this.items;E x = this.<E>cast(items[takeIndex]);items[takeIndex] = null;//将当前位置的元素设为nulltakeIndex = inc(takeIndex);//保存下一个取元素的位置--count;//队列元素减1notFull.signal();//发送非满信号 唤醒存元素线程return x;//返回取到的值}
- 存数据的方法
首先看一下offer()
方法:
//返回是否插入成功
public boolean offer(E e) {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lock();try {if (count == items.length)//如果队列满了,直接返回falsereturn false;else {insert(e);return true;}} finally {lock.unlock();}}
offer()
还有一个带timeout
的版本,可以设置一个超时等待时间
用法queue.offer(2,10,TimeUnit.SECONDS);
public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {checkNotNull(e);long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length) {//满了if (nanos <= 0)//时间到了return false;//返回nanos>0,说明收到了signal()信号,继续while条件检测//返回<=0 说明时间到了nanos = notFull.awaitNanos(nanos);}insert(e);return true;} finally {lock.unlock();}}
解释一下awaitNanos()
方法:
/* 调用该方法的前提是,当前线程已经成功获得与该条件对象绑定的重入锁,否则调用该方法时会抛出IllegalMonitorStateException。nanosTimeout指定该方法等待信号的的最大时间(单位为纳秒)。若指定时间内收到signal()或signalALL()则返回nanosTimeout减去已经等待的时间;若指定时间内有其它线程中断该线程,则抛出InterruptedException并清除当前线程的打断状态;若指定时间内未收到通知,则返回0或负数。 */nanos = notFull.awaitNanos(nanos);
add()
方法是依赖offer()
方法实现的:
//返回true或者抛异常
public boolean add(E e) {return super.add(e);
}//in class AbstractQueue
public boolean add(E e) {if (offer(e))return true;elsethrow new IllegalStateException("Queue full");
}
put()
方法会在队列满的情况下进入阻塞:
public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length)notFull.await();//等待一个notFull信号,并释放锁//收到notFull()信号,会重新获得锁//并再一次执行一次while条件判断insert(e);} finally {lock.unlock();}}
- 取数据的方法
首先看下offer()
对应的版本poll()
//返回null或者一个元素public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {return (count == 0) ? null : extract();} finally {lock.unlock();}}//含有超时时间设置的版本public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}return extract();} finally {lock.unlock();}}
还有一个比较有意思的方法:peek()
,看一眼数据并不取出来:
//返回null或者takeIndex处的数据
public E peek() {final ReentrantLock lock = this.lock;lock.lock();try {return (count == 0) ? null : itemAt(takeIndex);} finally {lock.unlock();}}
- 容量大小相关的
/*** 返回队列中的元素个数**/public int size() {final ReentrantLock lock = this.lock;lock.lock();try {return count;} finally {lock.unlock();}}//返回队列剩余的大小public int remainingCapacity() {final ReentrantLock lock = this.lock;lock.lock();try {return items.length - count;} finally {lock.unlock();}}
ArrayBlockingQueue源码分析相关推荐
- 【JUC】JDK1.8源码分析之ArrayBlockingQueue(三)
一.前言 在完成Map下的并发集合后,现在来分析ArrayBlockingQueue,ArrayBlockingQueue可以用作一个阻塞型队列,支持多任务并发操作,有了之前看源码的积累,再看Arra ...
- 阻塞队列(ArrayBlockingQueue) 迭代器源码分析
文章目录 为什么 ArrayBlockingQueue 迭代器复杂呢? 提出几个 ArrayBlockingQueue 迭代器的问题用于下面代码分析时进行思考 Itrs Itr 1.重要变量 2. 构 ...
- Java线程池 源码分析
1.个人总结及想法: (1)ThreadPoolExecutor的继承关系? ThreadPoolExecutor继承AbstractExectorService,AbstractExecutorSe ...
- Android多线程之ArrayBlockingQueue源码解析
阻塞队列系列 Android多线程之LinkedBlockingQueue源码解析 Android多线程之SynchronousQueue源码解析 Andorid多线程之DelayQueue源码分析 ...
- Java8 ArrayBlockingQueue 源码阅读
一.什么是 ArrayBlockingQueue ArrayBlockingQueue 是 GUC(java.util.concurrent) 包下的一个线程安全的阻塞队列,底层使用数组实现. 除了线 ...
- 深入源码分析Java线程池的实现原理
转载自 深入源码分析Java线程池的实现原理 程序的运行,其本质上,是对系统资源(CPU.内存.磁盘.网络等等)的使用.如何高效的使用这些资源是我们编程优化演进的一个方向.今天说的线程池就是一种对 ...
- Java高并发程序设计学习笔记(五):JDK并发包(各种同步控制工具的使用、并发容器及典型源码分析(Hashmap等))...
转自:https://blog.csdn.net/dataiyangu/article/details/86491786#2__696 1. 各种同步控制工具的使用 1.1. ReentrantLoc ...
- c++ 线程池_JAVA并发编程:线程池ThreadPoolExecutor源码分析
前面的文章已经详细分析了线程池的工作原理及其基本应用,接下来本文将从底层源码分析一下线程池的执行过程.在看源码的时候,首先带着以下两个问题去仔细阅读.一是线程池如何保证核心线程数不会被销毁,空闲线程数 ...
- 并发-阻塞队列源码分析
阻塞队列 参考: http://www.cnblogs.com/dolphin0520/p/3932906.html http://endual.iteye.com/blog/1412212 http ...
- Druid.io index_realtime实时任务源码分析
目录 前言 以[消防]工作来形象类比 实时任务大体流程介绍 Ingest 阶段 Persist 阶段 Merge 阶段 Hand off 阶段 任务的提交到启动 任务的提交 相关源码分析 任务队列和o ...
最新文章
- 一家两位Fellow大满贯!北大谢涛当选ACM Fellow,与胞兄谢源完成会师
- sa执行命令方法总结
- 【深度学习】新的深度学习优化器探索(协同优化)
- python学习系列day4-python基础
- 编程生涯 21 载,那些我踩过的坑
- SQL Server2014 SP2新增的数据库克隆功能
- IO、NIO、AIO
- K_Nearest_Neighbot(knn)方法及其Pyhon 实现
- 游戏筑基开发之测试篇(C语言)
- Kubernetes 小白学习笔记(2)--基本概念2
- A* 算法求解八数码问题
- Latex数学公式-矩阵中省略号的表示
- R语言绘图 | Venn图
- 西门子PLC学习笔记十-(计数器)
- 易签指纹签到系统测试文档
- android 很多牛叉布局github地址
- 如何检查Mac配备的显卡(GPU)?
- 正确处理时间和时区问题(java+mysql)
- Vue 3 中 v-if 和 v-show 指令实现的原理(源码分析)
- 关于stc,stm32,gd32单片机 isp 通过CAT1 ota升级的说明