Java Review - 并发编程_ArrayBlockingQueue原理源码剖析
文章目录
- 概述
- 类图结构
- 构造函数
- 主要方法源码解析
- offer操作
- put操作
- poll操作
- take操作
- peek操作
- size
- 小结
概述
Java Review - 并发编程_LinkedBlockingQueue原理&源码剖析
介绍了使用有界链表方式实现的阻塞队列LinkedBlockingQueue,这里我们继续来研究使用有界数组方式实现的阻塞队列ArrayBlockingQueue的原理。
类图结构
由该图可以看出,ArrayBlockingQueue
- 内部有一个数组items,用来存放队列元素
- putindex变量表示入队元素下标
- takeIndex是出队下标
- count统计队列元素个数
从定义可知,这些变量并没有使用volatile修饰,这是因为访问这些变量都是在锁块内,而加锁已经保证了锁块内变量的内存可见性了。
另外有个独占锁lock用来保证出、入队操作的原子性,这保证了同时只有一个线程可以进行入队、出队操作。
另外,notEmpty、notFull条件变量用来进行出、入队的同步。
构造函数
ArrayBlockingQueue是有界队列,所以构造函数必须传入队列大小参数。
构造函数的代码如下。
public ArrayBlockingQueue(int capacity) {this(capacity, false);}public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition();}public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {this(capacity, fair);final ReentrantLock lock = this.lock;lock.lock(); // Lock only for visibility, not mutual exclusiontry {int i = 0;try {for (E e : c) {checkNotNull(e);items[i++] = e;}} catch (ArrayIndexOutOfBoundsException ex) {throw new IllegalArgumentException();}count = i;putIndex = (i == capacity) ? 0 : i;} finally {lock.unlock();}}
由以上代码可知,在默认情况下使用ReentrantLock提供的非公平独占锁进行出、入队操作的同步。
主要方法源码解析
研究过LinkedBlockingQueue的实现后再看ArrayBlockingQueue的实现会感觉后者简单了很多
offer操作
向队列尾部插入一个元素,如果队列有空闲空间则插入成功后返回true,如果队列已满则丢弃当前元素然后返回false。
如果e元素为null则抛出NullPointerException异常。
另外,该方法是不阻塞的。
public boolean offer(E e) {// 1 checkNotNull(e);// 2 final ReentrantLock lock = this.lock;lock.lock();try {// 3 if (count == items.length)return false;else {// 4 enqueue(e);return true;}} finally {lock.unlock();}}
- 代码(1) 如果e元素为null则抛出NullPointerException异常
- 代码(2)获取独占锁,当前线程获取该锁后,其他入队和出队操作的线程都会被阻塞挂起而后被放入lock锁的AQS阻塞队列。
- 代码(3)判断如果队列满则直接返回false,否则调用enqueue方法后返回true,enqueue的代码如下
/*** Inserts element at current put position, advances, and signals.* Call only when holding lock.*/private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;// 6 元素入队final Object[] items = this.items;items[putIndex] = x;// 7 计算下一个元素应该存放的下标位置if (++putIndex == items.length)putIndex = 0;count++;// 8 notEmpty.signal();}
如上代码首先把当前元素放入items数组,然后计算下一个元素应该存放的下标位置,并递增元素个数计数器,最后激活notEmpty的条件队列中因为调用take操作而被阻塞的一个线程。
这里由于在操作共享变量count前加了锁,所以不存在内存不可见问题,加过锁后获取的共享变量都是从主内存获取的,而不是从CPU缓存或者寄存器获取。
代码(5)释放锁,然后会把修改的共享变量值(比如count的值)刷新回主内存中,这样其他线程通过加锁再次读取这些共享变量时,就可以看到最新的值。
put操作
向队列尾部插入一个元素,如果队列有空闲则插入后直接返回true,如果队列已满则阻塞当前线程直到队列有空闲并插入成功后返回true,如果在阻塞时被其他线程设置了中断标志,则被阻塞线程会抛出InterruptedException异常而返回。
另外,如果e元素为null则抛出NullPointerException异常。
public void put(E e) throws InterruptedException {// 1 checkNotNull(e);final ReentrantLock lock = this.lock;// 2 获取锁 可被中断lock.lockInterruptibly();try {// 3 如果队列满,这把当前下层放入notFull管理的条件队列while (count == items.length)notFull.await();// 4 插入队列 enqueue(e);} finally {// 5 lock.unlock();}}
在代码(2)中,在获取锁的过程中当前线程被其他线程中断了,则当前线程会抛出InterruptedException异常而退出。
代码(3)判断如果当前队列已满,则把当前线程阻塞挂起后放入notFull的条件队列,注意这里也是使用了while循环而不是if语句。\
代码(4)判断如果队列不满则插入当前元素,此处不再赘述。
poll操作
从队列头部获取并移除一个元素,如果队列为空则返回null,该方法是不阻塞的。
public E poll() {// 1 final ReentrantLock lock = this.lock;lock.lock();try {// 2 return (count == 0) ? null : dequeue();} finally {lock.unlock();}}
代码(1)获取独占锁。
代码(2)判断如果队列为空则返回null,否则调用dequeue()方法
private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")// 4 获取元素E x = (E) items[takeIndex];// 5 数组中的值为null items[takeIndex] = null;// 6 对头指针计算,队列元素个数减一if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();// 7 发送信号激活notFull条件队列中的一个线程notFull.signal();return x;}
由以上代码可知,首先获取当前队头元素并将其保存到局部变量,然后重置队头元素为null,并重新设置队头下标,递减元素计数器,最后发送信号激活notFull的条件队列里面一个因为调用put方法而被阻塞的线程。
take操作
获取当前队列头部元素并从队列里面移除它。如果队列为空则阻塞当前线程直到队列不为空然后返回元素,如果在阻塞时被其他线程设置了中断标志,则被阻塞线程会抛出InterruptedException异常而返回。
public E take() throws InterruptedException {// 1 final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// 2 队列为空则等待,直到队列中有数据 while (count == 0)notEmpty.await();// 3 获取头部元素return dequeue();} finally {// 4 lock.unlock();}
take操作的代码也比较简单,与poll相比只是代码(2)不同。
在这里,如果队列为空则把当前线程挂起后放入notEmpty的条件队列,等其他线程调用notEmpty.signal()方法后再返回。
需要注意的是,这里也是使用while循环进行检测并等待而不是使用if语句。
peek操作
获取队列头部元素但是不从队列里面移除它,如果队列为空则返回null,该方法是不阻塞的。
public E peek() {// 1 final ReentrantLock lock = this.lock;lock.lock();try {// 2 return itemAt(takeIndex); // null when queue is empty} finally {// 3 lock.unlock();}}
final E itemAt(int i) {return (E) items[i];}
peek的实现更简单,首先获取独占锁,然后从数组items中获取当前队头下标的值并返回,在返回前释放获取的锁。
size
计算当前队列元素个数。
public int size() {final ReentrantLock lock = this.lock;lock.lock();try {return count;} finally {lock.unlock();}}
size操作比较简单,获取锁后直接返回count,并在返回前释放锁。
也许你会问,这里又没有修改count的值,只是简单地获取,为何要加锁呢?
其实如果count被声明为volatile的这里就不需要加锁了,因为volatile类型的变量保证了内存的可见性,而ArrayBlockingQueue中的count并没有被声明为volatile的,这是因为count操作都是在获取锁后进行的,
而获取锁的语义之一是,获取锁后访问的变量都是从主内存获取的,这保证了变量的内存可见性。
小结
ArrayBlockingQueue通过使用全局独占锁实现了同时只能有一个线程进行入队或者出队操作,这个锁的粒度比较大,有点类似于在方法上添加synchronized的意思。
其中offer和poll操作通过简单的加锁进行入队、出队操作,
而put、take操作则使用条件变量实现了,如果队列满则等待,如果队列空则等待,然后分别在出队和入队操作中发送信号激活等待线程实现同步。
另外,相比LinkedBlockingQueue,ArrayBlockingQueue的size操作的结果是精确的,因为计算前加了全局锁。
Java Review - 并发编程_ArrayBlockingQueue原理源码剖析相关推荐
- Java Review - 并发编程_ScheduledThreadPoolExecutor原理源码剖析
文章目录 概述 类结构 核心方法&源码解析 schedule(Runnable command, long delay,TimeUnit unit) scheduleWithFixedDela ...
- Java Review - 并发编程_LinkedBlockingQueue原理源码剖析
文章目录 概述 类图结构 主要方法 offer操作 概述 Java Review - 并发编程_ConcurrentLinkedQueue原理&源码剖析 介绍了使用CAS算法实现的非阻塞队列C ...
- Java Review - 并发编程_DelayQueue原理源码剖析
文章目录 概述 类图结构 小Demo 核心方法&源码解读 offer操作 take操作 poll操作 size操作 小结 概述 DelayQueue并发队列是一个无界阻塞延迟队列,队列中的每个 ...
- Java Review - 并发编程_ThreadPoolExecutor原理源码剖析
文章目录 线程池主要解决两个问题 类关系图 ctl 含义 ---- 记录线程池状态和线程池中线程个数 线程池状态 及转换 线程池参数 线程池类型 mainLock & termination ...
- Java Review - 并发编程_ConcurrentLinkedQueue原理源码剖析
文章目录 概述 ConcurrentLinkedQueue 核心方法&源码解读 offer add poll peek size remove contains 总结 概述 JDK中提供了一系 ...
- Java Review - 并发编程_PriorityBlockingQueue原理源码剖析
文章目录 概述 类图结构 小Demo 核心方法&源码解析 offer poll put take size 概述 PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都 ...
- Java Review - 并发编程_原子操作类原理剖析
文章目录 概述 原子变量操作类 主要方法 incrementAndGet .decrementAndGet .getAndIncrement.getAndDecrement boolean compa ...
- Java Review - 并发编程_原子操作类LongAdder LongAccumulator剖析
文章目录 概述 小Demo 源码分析 重要的方法 long sum() reset sumThenReset longValue() add(long x) longAccumulate(long x ...
- Java Review - 并发编程_ 回环屏障CyclicBarrier原理源码剖析
文章目录 Pre 小Demo 类图结构 CyclicBarrier核心方法源码解读 int await() int await(long timeout, TimeUnit unit) int dow ...
最新文章
- c#devexpress GridContorl添加进度条
- linux 做双机热备
- python多级字典嵌套_python – 如何拆分字符串并形成多级嵌套字典?
- 多对多的属性对应表如何做按照类别的多属性匹配搜索
- 电脑格式化后需要重装系统吗_重装系统后c盘文件丢失,电脑重装系统后c盘文件能恢复吗...
- 少说话多写代码之Python学习009——字典的创建
- Linux设备树 .dtb文件,内核使用dtb文件的过程
- XML数据读取方式性能比较(一) (转)
- [CLR via C#]1.6 Framework类库~1.9与非托管代码的互操作性
- [转载]用SQL语句添加删除修改字段
- Outlook 2013中 IMAP配置
- C++简介(5)STL
- easyui模版html,EasyUI 模板(Template)_Vue EasyUI Demo
- 关于文本分类(情感分析)的英文数据集汇总
- Mac的游戏开发配置环境笔记
- int类型转换byte类型
- 微信小程序:文档下载功能
- 韩立刚《计算机网络》| 第7章 网络安全
- Jenkins使用问题记录
- python写入excel怎么跨列居中_python文件读写(三)-Excel表格三剑客xlwt,xlrd,xlutils...
热门文章
- bigquery sql 正则表达式
- Leetcode 38.外观数列 (每日一题 20210702)
- shell 指令集锦
- 【数学建模】MATLAB应用实战系列(九十一)-熵权法应用案例(附MATLAB和Python代码)
- 数据中台实战(六):交易分析
- 360手机麦克风测试软件,【奇酷小技巧】教你无需ROOT增大话筒、听筒和外放声音!...
- mysql设置数据库同步_MySQL数据库配置主从同步
- rnn词性标注算法_Python预测算法哪家强?权游龙妈是生还是凉凉?
- python赋值语句格式_Python中变量和变量赋值的几种形式
- #论文 《Deep Residual Learning for Image Recognition》