ArrayBlockingQueue

有界的阻塞队列,内部是一个数组,有边界的意思是:容量是有限的,必须进行初始化,指定它的容量大小,以先进先出的方式存储数据,最新插入的在对尾,最先移除的对象在头部。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {/** 队列元素 */final Object[] items;/** 下一次读取操作的位置, poll, peek or remove */int takeIndex;/** 下一次写入操作的位置, offer, or add */int putIndex;/** 元素数量 */int count;/** Concurrency control uses the classic two-condition algorithm* found in any textbook.* 它采用一个 ReentrantLock 和相应的两个 Condition 来实现。*//** Main lock guarding all access */final ReentrantLock lock;/** Condition for waiting takes */private final Condition notEmpty;/** Condition for waiting puts */private final Condition notFull;/** 指定大小 */public ArrayBlockingQueue(int capacity) {this(capacity, false);}/** * 指定容量大小与指定访问策略 * @param fair 指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久的线程获取到锁;*/public ArrayBlockingQueue(int capacity, boolean fair) {}/** * 指定容量大小、指定访问策略与最初包含给定集合中的元素 * @param c 将此集合中的元素在构造方法期间就先添加到队列中 */public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {}
}
  • ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用一个锁对象,由此也意味着两者无法真正并行运行。按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。然而事实上并没有如此,因为ArrayBlockingQueue的数据写入已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。

  • 通过构造函数得知,参数fair控制对象内部是否采用公平锁,默认采用非公平锁。

  • items、takeIndex、putIndex、count等属性并没有使用volatile修饰,这是因为访问这些变量(通过方法获取)使用都在锁内,并不存在可见性问题,如size()

  • 另外有个独占锁lock用来对出入对操作加锁,这导致同时只有一个线程可以访问入队出队。

Put源码分析

/** 进行入队操作 */
public void put(E e) throws InterruptedException {//e为null,则抛出NullPointerException异常checkNotNull(e);//获取独占锁final ReentrantLock lock = this.lock;/*** lockInterruptibly()* 获取锁定,除非当前线程为interrupted* 如果锁没有被另一个线程占用并且立即返回,则将锁定计数设置为1。* 如果当前线程已经保存此锁,则保持计数将递增1,该方法立即返回。* 如果锁被另一个线程保持,则当前线程将被禁用以进行线程调度,并且处于休眠状态* */lock.lockInterruptibly();try {//空队列while (count == items.length)//进行条件等待处理notFull.await();//入队操作enqueue(e);} finally {//释放锁lock.unlock();}}/** 真正的入队 */private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;//获取当前元素final Object[] items = this.items;//按下一个插入索引进行元素添加items[putIndex] = x;// 计算下一个元素应该存放的下标,可以理解为循环队列if (++putIndex == items.length)putIndex = 0;count++;//唤起消费者notEmpty.signal();
}

这里由于在操作共享变量前加了锁,所以不存在内存不可见问题,加锁后获取的共享变量都是从主内存中获取的,而不是在CPU缓存或者寄存器里面的值,释放锁后修改的共享变量值会刷新到主内存。

另外这个队列使用循环数组实现,所以在计算下一个元素存放下标时候有些特殊。另外insert后调用notEmpty.signal();是为了激活调用notEmpty.await();阻塞后放入notEmpty条件队列的线程。

Take源码分析

public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}}private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;//这里有些特殊if (itrs != null)//保持队列中的元素和迭代器的元素一致itrs.elementDequeued();notFull.signal();return x;
}

Take操作和Put操作很类似

//该类的迭代器,所有的迭代器共享数据,队列改变会影响所有的迭代器transient Itrs itrs = null; //其存放了目前所创建的所有迭代器。/**
* 迭代器和它们的队列之间的共享数据,允许队列元素被删除时更新迭代器的修改。
*/
class Itrs {void elementDequeued() {// assert lock.getHoldCount() == 1;if (count == 0)//队列中数量为0的时候,队列就是空的,会将所有迭代器进行清理并移除queueIsEmpty();//takeIndex的下标是0,意味着队列从尾中取完了,又回到头部获取else if (takeIndex == 0)takeIndexWrapped();}/*** 当队列为空的时候做的事情* 1. 通知所有迭代器队列已经为空* 2. 清空所有的弱引用,并且将迭代器置空*/void queueIsEmpty() {}/*** 将takeIndex包装成0* 并且通知所有的迭代器,并且删除已经过期的任何对象(个人理解是置空对象)* 也直接的说就是在Blocking队列进行出队的时候,进行迭代器中的数据同步,保持队列中的元素和迭代器的元素是一致的。*/void takeIndexWrapped() {}
}

Itrs迭代器创建的时机

//从这里知道,在ArrayBlockingQueue对象中调用此方法,才会生成这个对象
//那么就可以理解为,只要并未调用此方法,则ArrayBlockingQueue对象中的Itrs对象则为空
public Iterator<E> iterator() {return new Itr();}private class Itr implements Iterator<E> {Itr() {//这里就是生产它的地方//count等于0的时候,创建的这个迭代器是个无用的迭代器,可以直接移除,进入detach模式。//否则就把当前队列的读取位置给迭代器当做下一个元素,cursor存储下个元素的位置。if (count == 0) {// assert itrs == null;cursor = NONE;nextIndex = NONE;prevTakeIndex = DETACHED;} else {final int takeIndex = ArrayBlockingQueue.this.takeIndex;prevTakeIndex = takeIndex;nextItem = itemAt(nextIndex = takeIndex);cursor = incCursor(takeIndex);if (itrs == null) {itrs = new Itrs(this);} else {itrs.register(this); // in this orderitrs.doSomeSweeping(false);}prevCycles = itrs.cycles;// assert takeIndex >= 0;// assert prevTakeIndex == takeIndex;// assert nextIndex >= 0;// assert nextItem != null;}}
}

代码演示

package com.rumenz.task;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** @className: BlockingQuqueExample* @description: TODO 类描述* @author: mac* @date: 2021/1/20**/
public class BlockingQueueExample {private static volatile   Boolean flag=false;public static void main(String[] args) {BlockingQueue blockingQueue=new ArrayBlockingQueue(1024);ExecutorService executorService = Executors.newFixedThreadPool(2);executorService.execute(()->{try{blockingQueue.put(1);Thread.sleep(2000);blockingQueue.put(3);flag=true;}catch (Exception e){e.printStackTrace();}});executorService.execute(()->{try {while (!flag){Integer i = (Integer) blockingQueue.take();System.out.println(i);}}catch (Exception e){e.printStackTrace();}});executorService.shutdown();}
}

LinkedBlockingQueue

基于链表的阻塞队列,通ArrayBlockingQueue类似,其内部也维护这一个数据缓冲队列(该队列由一个链表构成),当生产者往队列放入一个数据时,队列会从生产者手上获取数据,并缓存在队列的内部,而生产者立即返回,只有当队列缓冲区到达最大值容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞队列,直到消费者从队列中消费掉一份数据,生产者会被唤醒,反之对于消费者这端的处理也基于同样的原理。

LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行的操作队列中的数据,以调高整个队列的并发能力。

如果构造一个LinkedBlockingQueue对象,而没有指定容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量Integer.MAX_VALUE,这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已经被消耗殆尽了。

LinkedBlockingQueue是一个使用链表完成队列操作的阻塞队列。链表是单向链表,而不是双向链表。

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {//队列的容量,指定大小或为默认值Integer.MAX_VALUEprivate final int capacity;//元素的数量private final AtomicInteger count = new AtomicInteger();//队列头节点,始终满足head.item==nulltransient Node<E> head;//队列的尾节点,始终满足last.next==nullprivate transient Node<E> last;/** Lock held by take, poll, etc *///出队的锁:take, poll, peek 等读操作的方法需要获取到这个锁private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes *///当队列为空时,保存执行出队的线程:如果读操作的时候队列是空的,那么等待 notEmpty 条件private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc *///入队的锁:put, offer 等写操作的方法需要获取到这个锁private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts *///当队列满时,保存执行入队的线程:如果写操作的时候队列是满的,那么等待 notFull 条件private final Condition notFull = putLock.newCondition();//传说中的无界队列public LinkedBlockingQueue() {}//传说中的有界队列public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}//传说中的无界队列public LinkedBlockingQueue(Collection<? extends E> c){}/*** 链表节点类*/static class Node<E> {E item;/*** One of:* - 真正的继任者节点* - 这个节点,意味着继任者是head.next* - 空,意味着没有后继者(这是最后一个节点)*/Node<E> next;Node(E x) { item = x; }}
}

通过其构造函数,得知其可以当做无界队列也可以当做有界队列来使用。

这里用了两把锁分别是takeLockputLock,而Condition分别是notEmptynotFull,它们是这样搭配的。

  • 如果需要获取(take)一个元素,需要获取takeLock锁,但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)。
  • 如果要插入(put)一个元素,需要获取putLock锁,但是获取了锁还不够,如果队列此时已满,还是需要队列不满(notFull)的这个条件(Condition)。

从上面的构造函数中可以看到,这里会初始化一个空的头结点,那么第一个元素入队的时候,队列中就会有两个元素。读取元素时,也是获取头结点后面的一个元素。count的计数值不包含这个头结点。

Put源码分析

public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {    /*** 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。*/public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();// 如果你纠结这里为什么是 -1,可以看看 offer 方法。这就是个标识成功、失败的标志而已。int c = -1;//包装成node节点Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;//获取锁定putLock.lockInterruptibly();try {/** 如果队列满,等待 notFull 的条件满足。 */while (count.get() == capacity) {notFull.await();}//入队enqueue(node);//原子性自增c = count.getAndIncrement();// 如果这个元素入队后,还有至少一个槽可以使用,调用 notFull.signal() 唤醒等待线程。// 哪些线程会等待在 notFull 这个 Condition 上呢?if (c + 1 < capacity)notFull.signal();} finally {//解锁putLock.unlock();}// 如果 c == 0,那么代表队列在这个元素入队前是空的(不包括head空节点),// 那么所有的读线程都在等待 notEmpty 这个条件,等待唤醒,这里做一次唤醒操作if (c == 0)signalNotEmpty();}/** 链接节点在队列末尾 */private void enqueue(Node<E> node) {// assert putLock.isHeldByCurrentThread();// assert last.next == null;// 入队的代码非常简单,就是将 last 属性指向这个新元素,并且让原队尾的 next 指向这个元素//last.next = node;//last = node;// 这里入队没有并发问题,因为只有获取到 putLock 独占锁以后,才可以进行此操作last = last.next = node;}/*** 等待PUT信号* 仅在 take/poll 中调用* 也就是说:元素入队后,如果需要,则会调用这个方法唤醒读线程来读*/private void signalNotFull() {final ReentrantLock putLock = this.putLock;putLock.lock();try {notFull.signal();//唤醒} finally {putLock.unlock();}}
}

Take源码分析

public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {   public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;//首先,需要获取到 takeLock 才能进行出队操作takeLock.lockInterruptibly();try {// 如果队列为空,等待 notEmpty 这个条件满足再继续执行while (count.get() == 0) {notEmpty.await();}出队x = dequeue();//count 进行原子减 1c = count.getAndDecrement();// 如果这次出队后,队列中至少还有一个元素,那么调用 notEmpty.signal() 唤醒其他的读线程if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}/*** 出队*/private E dequeue() {// assert takeLock.isHeldByCurrentThread();// assert head.item == null;Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first;E x = first.item;first.item = null;return x;}/*** Signals a waiting put. Called only from take/poll.*/private void signalNotFull() {final ReentrantLock putLock = this.putLock;putLock.lock();try {notFull.signal();} finally {putLock.unlock();}}
}

ArrayBlockingQueue对比

ArrayBlockingQueue是共享锁,粒度大,入队与出队的时候只能有1个被执行,不允许并行执行。LinkedBlockingQueue是独占锁,入队与出队是可以并行进行的。当然这里说的是读和写进行并行,两者的读读与写写是不能并行的。总结就是LinkedBlockingQueue可以并发读写。

ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。

LinkedBlockingQueue实现一个线程添加文件对象,四个线程读取文件对象

package concurrent;
import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;public class TestBlockingQueue {static long randomTime() {return (long) (Math.random() * 1000);}public static void main(String[] args) {// 能容纳100个文件final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100);// 线程池final ExecutorService exec = Executors.newFixedThreadPool(5);final File root = new File("F:\\JavaLib");// 完成标志final File exitFile = new File("");// 读个数final AtomicInteger rc = new AtomicInteger();// 写个数final AtomicInteger wc = new AtomicInteger();// 读线程Runnable read = new Runnable() {public void run() {scanFile(root);scanFile(exitFile);}public void scanFile(File file) {if (file.isDirectory()) {File[] files = file.listFiles(new FileFilter() {public boolean accept(File pathname) {return pathname.isDirectory()|| pathname.getPath().endsWith(".java");}});for (File one : files)scanFile(one);} else {try {int index = rc.incrementAndGet();System.out.println("Read0: " + index + " "+ file.getPath());queue.put(file);} catch (InterruptedException e) {}}}};exec.submit(read);// 四个写线程for (int index = 0; index < 4; index++) {// write threadfinal int NO = index;Runnable write = new Runnable() {String threadName = "Write" + NO;public void run() {while (true) {try {Thread.sleep(randomTime());int index = wc.incrementAndGet();File file = queue.take();// 队列已经无对象if (file == exitFile) {// 再次添加"标志",以让其他线程正常退出queue.put(exitFile);break;}System.out.println(threadName + ": " + index + " "+ file.getPath());} catch (InterruptedException e) {}}}};exec.submit(write);}exec.shutdown();}
}

关注微信公众号:【入门小站】,解锁更多知识点。

Java高并发BlockingQueue重要的实现类相关推荐

  1. Java高并发BlockingQueue重要的实现类二

    DelayQueue DelayQueue是一个无界阻塞队列,只有在延迟期满时才能从中提取元素.该队列的头部是延迟期满后保存时间最长的Delayed元素. 存放到DelayDeque的元素必须继承De ...

  2. Java高并发编程:同步工具类

    内容摘要 这里主要介绍了java5中线程锁技术以外的其他同步工具,首先介绍Semaphore:一个计数信号量.用于控制同时访问资源的线程个数,CyclicBarrier同步辅助类:从字面意思看是路障, ...

  3. java queue 线程安全_详解Java高并发——设计线程安全的类

    前言: 将现有的线程安全的组件组合为更大规模的组件或程序. 通过使用封装技术可以使得在不对整个程序进行分析的情况下就可以判断一个类是否是线程安全的. 一. 基本要素 1. 找出对象状态的所有变量 如果 ...

  4. 【高并发】JUC底层工具类Unsafe

    1.概述 转载:添加链接描述 参考:[java]java的unsafe 参考:JUC原子类: CAS, Unsafe和原子类详解 1.1 本文主要内容 Unsafe基本介绍 获取Unsafe实例 Un ...

  5. java unsafe获取指针_【实战Java高并发程序设计 1】Java中的指针:Unsafe类

    是<实战Java高并发程序设计>第4章的几点. 如果你对技术有着不折不挠的追求,应该还会特别在意incrementAndGet() 方法中compareAndSet()的实现.现在,就让我 ...

  6. JAVA高并发程序设计(葛一鸣著)读书笔记

    本文为JAVA高并发程序设计(葛一鸣著)读书笔记.这本书对于刚刚入门的童鞋来讲可能有点深,我推荐可以先看看Java多线程编程核心技术(高洪岩著)一书. 第一章 走入并行世界 什么是同步和异步? 同步就 ...

  7. [Java高并发系列(5)][详细]Java中线程池(1)--基本概念介绍

    1 Java中线程池概述 1.1 什么是线程池? 在一个应用当中, 我们往往需要多次使用线程, 这意味着我们需要多次创建和销毁线程.那么为什么不提供一个机制或概念来管理这些线程呢? 该创建的时候创建, ...

  8. Java高并发程序设计入门

    转自:http://blog.csdn.net/johnstrive/article/details/50667557 说在前面 本文绝大部分参考<JAVA高并发程序设计>,类似读书笔记和 ...

  9. Java高并发系列5-线程池

    Java高并发系列5-线程池 接上一篇Java并发系列4-并发容器我们继续 在编程中经常会使用线程来异步处理任务,但是每个线程的创建和销毁都需要一定的开销.如果每次执行一个任务都需要开个新线程去执行, ...

最新文章

  1. 千万级饿了么交易系统架构 5 年演化史!
  2. qt 实现窗口局部镂空,并截图显示。
  3. 寒假每日一题(入门组)【week2 完结】
  4. python 廖雪峰_廖雪峰的Python系列教程(20)——高级特性之生成器
  5. 腾讯专家教你如何保证应用开发安全
  6. 洛谷3195(HNOI2008)玩具装箱
  7. 为什么转换到Visual Studio 2017如此 “容易”
  8. 关系与普通表的术语比较
  9. HBase BlockCache系列 - 探求BlockCache实现机制
  10. 还在迷茫于前端如何入门和进阶?万字指南让你不再迷茫!
  11. Java面试问题 021-030
  12. html文件怎么兼容浏览器,如何扫描HTML和跨浏览器兼容的JavaScript文件?
  13. 计算机里保存文件时没有桌面,电脑在保存文件时桌面怎么不见了怎么办
  14. 神秘美女接机刘谦 网友见证奇迹时刻:女子像舒淇
  15. 无线WiFi漫游的基本原理及搭建
  16. 北大青鸟所有学习资料下载地址
  17. 【开源WebGIS】07-Openlayers+Vue 测量功能-02
  18. Chrome 76 新特性
  19. 智慧环保大数据可视化系统建设
  20. 网络安全等级保护测评高风险判定-安全通信网络

热门文章

  1. winlogon病毒清除
  2. Python 处理各种编码的字符串
  3. python socket 实现的简单http服务器
  4. red linux 9 中文,Red Hat Linux 9 命令行中文显示问题
  5. JBoss的部署机制
  6. perl 操作redis 数据库 带用户名和密码验证,选择数据库0.1.2
  7. 21- vue django restful framework 打造生鲜超市 -首页商品分类显示功能
  8. 租客如何玩转物联网,打造智能新生活
  9. CSS的块级元素和内联元素,以及float
  10. 利用stack结构,将中缀表达式转换为后缀表达式并求值的算法实现