(接上文《源码阅读(33):Java中线程安全的Queue、Deque结构——ArrayBlockingQueue(3)》)

2.3.3.3、forEachRemaining() 方法

forEachRemaining(Consumer<? super E> action) 方法是JDK 1.8+之后的版本新增的一个方法,是java.util.Iterator接口中定义的一个新方法,该方法类似于java.lang.Iterable接口定义的forEach(Consumer<? super T> action)方法。但是两者是有区别的:

  • forEach(Consumer<? super T> action)方法在java.lang.Iterable接口中被定义,表示一个可迭代的java Collection Framework具体集合类(实际上java.lang.Iterable接口比java.util.Collection接口层次更低)。调用者对具体集合类的forEach方法调用多少次,后者就会执行多少次
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(20);
queue.add("1");
// ......
queue.add("6");
queue.add("7");
// 第一次执行forEach,并且会执行
queue.forEach(item -> {// ......
});
// 第二次执行forEach,并且会执行
queue.forEach(item -> {// ......
});
  • forEachRemaining(Consumer<? super E> action) 方法在java.util.Iterator接口中被定义,后者表示一个具体迭代器的实现。无论调用者对具体迭代的的forEachRemaining(Consumer<? super E> action) 方法调用多少次,后者只会执行一次
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(20);
queue.add("1");
// ......
queue.add("7");
// 创建一个迭代器
Iterator<String> itr = queue.iterator();
// 第一次执行forEachRemaining,并且会执行
itr.forEachRemaining(item -> {// ......
});
// 第二次执行forEachRemaining,但是会执行
itr.forEachRemaining(item -> {// ......
});

究其原因,是因为forEachRemaining(Consumer<? super E> action) 方法默认基于迭代器的hasNext()方法和next()方法配合进行工作,在迭代器对象完成所有的数据的遍历后,第二次调用同一个迭代器对象的forEachRemaining(Consumer<? super E> action) 方法,当然就不会执行了。以下是forEachRemaining方法的默认实现:

public interface Iterator<E> {// ......default void forEachRemaining(Consumer<? super E> action) {// Consumer过程必须定义,否则就抛出异常Objects.requireNonNull(action);// 这里就是hasNext()方法和next()方法的配合工作while (hasNext()) {action.accept(next());} } // ......
}

由于ArrayBlockingQueue内部的循环数组结构和多线程场景下的工作要求,所以ArrayBlockingQueue队列的迭代器中,对forEachRemaining方法的定义进行了调整,如下所示:

private class Itr implements Iterator<E> {// ......public void forEachRemaining(Consumer<? super E> action) {Objects.requireNonNull(action);final ReentrantLock lock = ArrayBlockingQueue.this.lock;lock.lock();try {final E e = nextItem;if (e == null) return;if (!isDetached()) {incorporateDequeues();}// 以上代码片段是操作方法获取到了ArrayBlockingQueue队列的操作权后的规范化处理过程// 这里就不再进行赘述。我们主要从action.accept(e);这句代码开始介绍// 该语句将上次next()方法记录的nextItem数据输出给下一消费者(Consumer)action.accept(e);// 这里还要再进行一次是否过期的判定,因为incorporateDequeues()方法运行后,当前Itr迭代器可能已经无法取数// 具体的场景可参见上一篇文章的描述if (isDetached() || cursor < 0) {return;}final Object[] items = ArrayBlockingQueue.this.items;// 接着基于当前有效的cursor游标位置,开始对ArrayBlockingQueue队列中还没有遍历(且可遍历)的数据进行遍历for (int i = cursor, end = putIndex, to = (i < end) ? end : items.length; ; i = 0, to = end) {for (; i < to; i++) {action.accept(itemAt(items, i));}if (to == end) break;}} finally {// 当完成所有数据的遍历后(无论成功还是失败)// 将当前迭代器设置为“独立/无效”工作模式cursor = nextIndex = lastRet = NONE;nextItem = lastItem = null;detach();lock.unlock();}}// ......
}

2.3.4、Itrs迭代器组的清理过程

本小节我们来详细讲解一下Itrs迭代器组的清理过程。上文已经提到,ArrayBlockingQueue队列集合中所有的迭代器都在Itrs迭代器组中进行管理,这些迭代器将在Itrs迭代器组中以单向链表的方式进行排列。所以ArrayBlockingQueue队列需要在特定的场景下,对已经失效、甚至已经被垃圾回收的迭代器管理节点进行清理。

例如,当ArrayBlockingQueue队列有新的迭代器被创建时(并为非独立/无效工作模式),Itrs迭代器组就会尝试清理那些无效的迭代器,其工作逻辑主要由Itrs.doSomeSweeping(boolean)方法进行实现,代码片段如下所示:

/*** 该方法负责对迭代器管理组Itrs进行清理。如果清理过程中发现了某个迭代器管理节点Itrs.Node* 需要被清理,则扫除过程会更努力的打扫后续的节点——“小扫除”变“大扫除”,当前扫除次数也会被重置* @param tryHarder 该方法采用“大扫除”还是“小扫除”方式进行清理。为true的时候,表示使用“大扫除”模式*/
void doSomeSweeping(boolean tryHarder) {// assert lock.isHeldByCurrentThread();// assert head != null;// 小扫除只会对单向链表清理4次,大扫除会至少清理16次int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;Node o, p;// 从这个变量的单词就可以知道,这是一台扫除车// 这台扫除车,将顺着迭代器组中的单向队列“向前开”final Node sweeper = this.sweeper;// 这是一个标记,最直白的理解就是“扫除车”是否开车boolean passedGo; // sweeper(垃圾车)为null的情况,主要的场景是Itrs迭代器组中没有迭代器对象// 也可能是上次的清理操作已经将单向链表中所有的Node节点扫除完毕if (sweeper == null) {// 从当前单向链表的第一个Node节点开始,顺着链表向后打扫o = null;p = head;passedGo = true;} else {// 从上次扫除结束的Node节点开始,继续顺着单向链表向后打扫o = sweeper;p = o.next;passedGo = false;}// ============ 以上过程决定“扫除车”从哪个地方开始打扫。以下就开始进行具体的打扫了。// 首先根据之前已经确认的扫除次数,决定是“大扫除”还是“小扫除”,16次清扫(循环)还是4次清扫(循环)// 注意,如果在清扫过程中,发现已经被回收或者已经“无效”的迭代器对象,则“小扫除”会变成“大扫除”for (; probes > 0; probes--) {// 如果条件成立,说明当前迭代器组Itrs集合中没有任何迭代器对象,不需要进行扫除if (p == null) {if (passedGo) {break;}// 否则当p(开始扫除的Itrs.Node位置)为null时,就从单向链表的头节点,开始扫除o = null;p = head;passedGo = true;}// ==========   每一次扫除处理,都会做以下操作:// 取得当前这个Itrs.Node位置上所关联的迭代器(注意这里是一个弱引用)final Itr it = p.get();// 取得当前Itrs.Node位置的下一个Itrs.Node位置final Node next = p.next;// 如果条件成立,说明当前Itrs.Node需要被清理// 那么就是用if块中的代码,进行清理,并且如果发现了这种场景,扫除车就会努力的进行后续的清理// 这就是该方法在最开始处提到的“更加努力的做后续清理”。否则,这个节点就不需要被清理if (it == null || it.isDetached()) {// found a discarded/exhausted iterator// 更加努力的做后续清理,“小扫除”会变成“大扫除”,且扫除次数会被重置probes = LONG_SWEEP_PROBES; // "try harder"// unlink p// 在it.isDetached()成立的场景下,主动清理这个弱引用p.clear();// 断开p节点和单向链表后续节点的连接关系p.next = null;// 如果条件成立,则说明当前清扫的节点是单向链表的头节点// 那么需要重新设定头节点为当前被清扫节点的后续结点。// 如果都没有后续结点了,那么说明迭代器管理组中就没有任何节点了,也就没有必要继续清扫下去(return退出)if (o == null) {head = next;if (next == null) {// We've run out of iterators to track; retireitrs = null;return;}}// 其它场景下,则通过该语句将当前被清扫的p节点彻底断开和单向链表各节点的关系else {o.next = next;}} else {o = p;}p = next;}// 扫除结束后,根据最后p节点的引用情况,决定扫除车是停留在扫除结束节点上// 还是设置为null。核心思路是,如果当前单向链表的所有Itrs.Node都扫除了一次,则扫除车没有存在的必要了// 否则让扫除车停留在扫除结束的位置上,以便下一次清扫请求被触发时,继续向后进行打扫this.sweeper = (p == null) ? null : o;
}

通过以上方法的详细解读,我们知道了迭代器管理组Itrs对其中迭代器对象的清理,主要包括以下关键点:

  • 首先根据当前sweeper扫除车的状态,决定本次扫除的开始位置——在单向链表中扫除的开始位置
  • 在确认完扫除开始位置后,再依次进行扫除。扫除模式分为“小扫除”和“大扫除”。
  • “小扫除”的定义是从扫除开始的位置,向后扫描最多4个节点。“小扫除”的目的在于节约操作步骤的同时,校验链表中的迭代器部分正确。
  • “大扫除”的定义是从扫除开始的位置,至少向后扫描16个节点。“大扫除”的目的是保证链表中依然有效的迭代器的管理准确性。
  • “小扫除”模式可以转变为“大扫除”模式,既是“小扫除”过程中,发现其中一个Itrs.Node节点已经无效(为null或者isDetached()为true)

可以用下图来表示doSomeSweeping(boolean)方法的主要清扫场景:

3、ArrayBlockingQueue队列的主要构造函数

ArrayBlockingQueue队列中一共有三个构造函数,如下所示:

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {// ....../*** 该构造函数给定一个capacity大小,以便设定ArrayBlockingQueue队列中环形数组的最大容量* (也就是)ArrayBlockingQueue队列的最大容量* 注意:如果capacity < 1,则会抛出异常*/public ArrayBlockingQueue(int capacity) {this(capacity, false);}/*** 该构造函数给定两个值,进行ArrayBlockingQueue队列的实例化* @param capacity 当前ArrayBlockingQueue队列的最大容量* @param fair 是否启用公平锁方式,默认情况下不启用* @throws IllegalArgumentException if {@code capacity < 1}*/public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0) {throw new IllegalArgumentException();}this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull =  lock.newCondition();}/*** 该构造函数给定三个值,进行ArrayBlockingQueue队列的实例化* @param capacity 当前ArrayBlockingQueue队列的最大容量* @param fair 是否启用公平锁方式,默认情况下不启用* @param c 这是一个外部集合,这个集合不能为null否则要报错。* 这些集合中的数据将会按照特定的顺序被复制到ArrayBlockingQueue队列中*/public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {this(capacity, fair);final ReentrantLock lock = this.lock;lock.lock(); // Lock only for visibility, not mutual exclusiontry {final Object[] items = this.items;int i = 0;try {for (E e : c)items[i++] = Objects.requireNonNull(e);} catch (ArrayIndexOutOfBoundsException ex) {throw new IllegalArgumentException();}count = i;putIndex = (i == capacity) ? 0 : i;} finally {lock.unlock();}}// ......
}

ArrayBlockingQueue队列自身的方法,相较于其下的Itr迭代器和Itrs迭代器分组而言,就要简单许多了。例如其三个构造函数,不需要进行逐行注释说明,读者就能看懂其中的意义了。

这里只需要特别注意的是构造函数中创建的两个condition对象,关于condition对象的详细介绍已经在讲解AQS的章节中进行了详细说明,这里notEmpty对象负责在ArrayBlockingQueue队列至少有一个数据的场景下,通知可能处于阻塞状态的消费者线程结束阻塞状态;notFull对象作用正好相反,它负责在ArrayBlockingQueue队列至少有一个空余的索引位可以放入新的数据时,通知可能处于阻塞状态的生产者线程结束阻塞状态。(后文讲解ArrayBlockingQueue队列的具体方法时,会涉及这些过程的详细介绍)

4、主要方法

ArrayBlockingQueue队列实现了java.util.concurrent.BlockingQueue接口,总的来说ArrayBlockingQueue队列中的常用方法遵循相同的处理逻辑,区别点主要在于不能正常操作时的处理方式。这里我们选择几个具有代表性的操作方法进行介绍:

4.1、offer(E e) 方法

根据官方的描述,offer(E e) 方法的主要工作过程是将特定的数据添加到队列尾部,这个数据不能为null。如果添加操作成功,则返回true,其他情况(添加失败)则返回false。

public boolean offer(E e) {// 进行添加的数据对象,不能为nullObjects.requireNonNull(e);// 获取队列集合的操作权限final ReentrantLock lock = this.lock;lock.lock();try {// 如果条件成立,说明ArrayBlockingQueue队列集合// 已经没有多余的空间进行添加操作,则返回falseif (count == items.length) {return false;}// 这个else多余了else {// 使用enqueue方法进行新的数据对象添加// 最后返回trueenqueue(e);return true;}} finally {lock.unlock();}
}

4.2、put(E e) 方法

和offer(E e) 方法类似的,还有put(E e) 方法,两者的区别是:如果ArrayBlockingQueue队列集合不能(已经没有多余的空间)进行添加操作,那么put(E e) 方法将进入阻塞状态,直到被唤醒并能够进行添加操作为止。代码片段如下:

/*** Inserts the specified element at the tail of this queue, waiting* for space to become available if the queue is full.*/
public void put(E e) throws InterruptedException {Objects.requireNonNull(e);final ReentrantLock lock = this.lock;// 获取到操作权后,才能进行后续操作lock.lockInterruptibly();try {// 如果条件成立,说明当前队列中已经没有空间进行添加操作// 那么进入阻塞状态while (count == items.length) {notFull.await();}// 通过enqueue方法进行添加操作enqueue(e);} finally {lock.unlock();}
}

lock.lock()方法和lock.lockInterruptibly()方法的区别,在之前介绍AQS的文章中已经进行了说明。这里再做一次简单说明:lockInterruptibly()方法在获取锁之前会确认线程中断信号(Thread.interrupted()),如果收到线程中断信号,则会抛出InterruptedException 异常;而lock()方法不会考虑线程中断信号的问题。

4.3、E take() 方法

take()方法可以从ArrayBlockingQueue队列头部获取一个数据对象,如果当前ArrayBlockingQueue队列已经没有数据对象可以获取,则进入阻塞状态。该方法中实际获取数据对象的方法,是前文中已经介绍过的dequeue()方法。

public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0) {notEmpty.await();}return dequeue();} finally {lock.unlock();}
}

========
(ArrayBlockingQueue完,接后文《源码阅读(34):Java中线程安全的Queue、Deque结构——ArrayBlockingQueue(4)》)

源码阅读(34):Java中线程安全的Queue、Deque结构——ArrayBlockingQueue(4)相关推荐

  1. java中arraycopy的用法_[jdk源码阅读系列]Java中System.arraycopy()的用法

    本文转载,原文链接: 3分钟了解Java中System.arraycopy的用法 - 伊万夫斯基 - 博客园  https://www.cnblogs.com/benjieqiang/p/114288 ...

  2. 源码阅读(32):Java中线程安全的Queue、Deque结构——ArrayBlockingQueue(2)

    (接上文<源码阅读(31):Java中线程安全的Queue.Deque结构--ArrayBlockingQueue(1)>) 本篇内容我们专门分析ArrayBlockingQueue中迭代 ...

  3. 【源码阅读】Java集合之一 - ArrayList源码深度解读

    Java 源码阅读的第一步是Collection框架源码,这也是面试基础中的基础: 针对Collection的源码阅读写一个系列的文章,从ArrayList开始第一篇. ---@pdai JDK版本 ...

  4. mysql thread conn_MySQL源码阅读2-连接与线程管理

    本篇是第二篇,MySQL初始化完成之后,便进入一个死循环中,接受客户端请求,并完成客户端的命令(如果在window下启动多个listener,则分别启动线程监听).该篇介绍MySQL服务中的连接与线程 ...

  5. 通过跟踪源码证明在Java中通过执行Start()方法创建线程

    /**** 线程创建跟踪*/ public class ThreadCreate {public static void main(String[] args) {new Thread(()-> ...

  6. 源码阅读之Java栈的实现

    0x00 栈 栈是 Last-In-First-Out (后进先出)的线性表.对栈的操作主要有两个:入栈(push)和出栈(pop).因此它也是一种操作受限的线性表.尽管如此,它在计算机中应用非常广泛 ...

  7. 【一起读源码】1. Java 中元组 Tuple

    1.1 问题描述 使用 Java 做数据分析.机器学习的时候,常常需要对批量的数据进行处理,如果需要处理的数据的维度不超过10时,可以考虑使用 org.javatuples 提供的 Tuple 类工具 ...

  8. Tomcat源码阅读---ServletContext.java(小白写作,持续更新)

    everybody,想我没,刚考完操作系统,我胡汉三又回来啦!!!掌声!!!鲜花!!!接着奏乐,接着舞!!!来个大的!不要说我不够意思,这个ServletContext读完的确是让我对于web程序有了 ...

  9. 数据结构中缀表达式转后缀表达式与后缀表达式的求值实训报告_动图+源码,演示 Java 中常用数据结构执行过程及原理...

    程序员的成长之路互联网/程序员/成长/职场 关注 阅读本文大概需要 3.7 分钟. 作者:大道方圆cnblogs.com/xdecode/p/9321848.html 最近在整理数据结构方面的知识, ...

最新文章

  1. 从客户端(...)中检测到有潜在危险的Request.Form 值的处理办法
  2. js判断字符串包含某个字符_python判断字符串以什么开始
  3. IOI 2007 Sail (线段树+贪心)
  4. SVG技术入门:线条动画实现原理
  5. RS485串口光端机产品功能特点介绍
  6. 一个逼格很低的appium自动化测试框架
  7. [常用知识]如何在Eclipse、myEclipse中分别配置Tomcat和JBoss应用服务器
  8. 产品经理必备利器:UML
  9. 2020年领导最满意的可视化工具!分分钟吊打python
  10. 罗永浩直播首秀开卖小米 10;微信能转账 QQ;Ruby 2.7.1 发布 | 极客头条
  11. java 月度相减_java根据日期获取月龄,按照减法原理,先day相减,不够向month借;然后month相减,不够向year借;最后year相减。...
  12. (转载)RESTORE DATABASE命令还原SQLServer 2005 数据库
  13. 阶段2 JavaWeb+黑马旅游网_15-Maven基础_第5节 使用骨架创建maven的java工程_18maven的java工程取mysql数据库...
  14. GRE阅读高频机经原文及答案之鸟叫研究
  15. 共轭梯度法python实现
  16. java-->if顺序结构-->骰子游戏(小案例)
  17. android电视无线同屏,手机连接电视同屏操作方法详解
  18. 在linux服务器上部署禅道环境
  19. html做出来发给别人链接,FINEBI仪表板的公共链接分享后,将分享的链接发给别人后无法在另外一台电脑上打开该链接网址。...
  20. 用python提取字符串的中英文——建议收藏反复观看

热门文章

  1. vue中获取组件的位置
  2. 使用ScrollView实现上下联动(标题栏与内容)
  3. 全球与中国振动监测与诊断系统市场行业供需现状及投资策略分析报告2022-2028年
  4. 【PyCharm】Couldn‘t refresh skeletons for remote interpreter: Can‘t get remote credentials for server
  5. 为何鲁大师为变成一个流氓软件?
  6. AI人工智能发展的经典算法
  7. 免费文字转语音软件有哪些?这几款宝藏工具你值得拥有
  8. kafka的auto.offset.reset详解与测试
  9. ODOO15中如何让销售工作流程全自动完成?
  10. PADS Layout VX.2.2 - 导出 DXF 文件