概述

  • BlockingQueue:阻塞FIFO队列,在接口设计层面,对于从队列尾部添加元素,从队列头部获取并删除元素的方法,在队列满时添加元素或者队列空时获取元素,则提供了四个版本:分别是:抛异常,直接返回一个特殊值null或者false,无限阻塞直到队列不满或者队列不空,阻塞指定的时间。实现类包括:ArrayBlockingQueue(基于数组),LinkedBlockingQueue(基于单向链表)

  • BlockingDeque:阻塞双端队列,继承于BlockingQueue,线程安全,在BlockingQueue的继承上增加了在队列两端均可添加和获取并删除元素的功能,即在add,offer,poll等方法的基础上,增加了addFirst,addLast,offerFirst,offerLast等方法。实现类包括:LinkedBlockingDeque(基于双向链表)

数据增删读

  • 数据写入(队列尾):如果队列满了,无法添加数据

    • add:抛异常,
    • put:无限阻塞当前线程直到队列不满,
    • offer:非阻塞直接返回false,
    • offer timeout:阻塞当前线程指定的时间
  • 数据读取(队列头):队列为空
    • element:抛异常,(不删除)
    • take:无限阻塞当前线程直到队列存在元素,
    • poll:非阻塞直接返回null,
    • peek:非阻塞,(不删除)
    • poll timeout:阻塞指定的时间,
  • 数据删除(队列头):队列为空
    • remove:抛异常,
    • poll:非阻塞直接返回,
    • take:无限阻塞当前线程
    • poll timeout:阻塞当前线程指定时间

特性

  • null值:在add,put,offer方法中不能存储null值,否则抛NullPointerException。
  • 容量:BlockingQueue如果在创建对象实例时不指定最大容量,则默认最大容量为Integer.MAX_VALUE。
  • 线程安全:
    • BlockingQueue是线程安全的,遵循内存可见性的happend-before原则(即往队列写入数据的线程优先于从队列读取或删除数据的线程,从而保证一个线程的写对其他线程可见),在内部通过使用ReentrantLock和Condition来实现增删改的原子操作和线程之间的交互。

    • 通常用在多线程的生产者和消费者模型中,即多个生产者线程和多个消费者线程共享一个BlockingQueue,生产者线程往队列尾部追加数据,消费者线程从队列头部获取数据并从队列中删除。以下为一个生产者和消费者的示例:

      class Producer implements Runnable {private final BlockingQueue queue;Producer(BlockingQueue q) { queue = q; }public void run() {try {while (true) { // put:如果队列满了,则阻塞queue.put(produce()); }} catch (InterruptedException ex) { ... handle ...}}Object produce() { ... }
      }class Consumer implements Runnable {private final BlockingQueue queue;Consumer(BlockingQueue q) { queue = q; }public void run() {try {while (true) { // take:如果队列空,则阻塞consume(queue.take()); }} catch (InterruptedException ex) { ... handle ...}}void consume(Object x) { ... }
      }class Setup {void main() {// 生产者和消费者共享同一个队列BlockingQueue q = new SomeQueueImplementation();// 将q作为参数创建生产者和消费者Producer p = new Producer(q);Consumer c1 = new Consumer(q);Consumer c2 = new Consumer(q);new Thread(p).start();new Thread(c1).start();new Thread(c2).start();}
      }
      

实现

BlockingQueue和BlockingDeque接口主要包括以下三个实现类,在实现类内部主要通过ReentrantLock和Condition来实现多个生产者线程和消费者线程对内部数据存储的数组或链表进行线程安全访问,即多个生产者线程和多个消费者线程可以共享一个实现类队列对象实例,可以通过Condition来在往队列填充了数据或者从队列取出了数据时,通知其他线程。

ArrayBlockingQueue

  • ArrayBlockingQueue为一个有界队列实现,在内部使用一个有界数组(即数组大小是固定的,在创建该队列实例后不能改变)来进行数据存储,通常可以作为一个有界缓冲区来使用,类定义如下:

    public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {/*** Serialization ID. This class relies on default serialization* even for the items array, which is default-serialized, even if* it is empty. Otherwise it could not be declared final, which is* necessary here.*/private static final long serialVersionUID = -817911632652898426L;// 数据存储数组/** The queued items */final Object[] items;// 队列头索引(第一个可读)/** items index for next take, poll, peek or remove */int takeIndex;// 队列尾所以(第一个可写)/** items index for next put, offer, or add */int putIndex;// 当前在队列中的元素个数/** Number of elements in the queue */int count;/** Concurrency control uses the classic two-condition algorithm* found in any textbook.*/// 线程同步锁/** Main lock guarding all access */final ReentrantLock lock;// 非空,通常是生产者线程调用来通知消费者线程/** Condition for waiting takes */private final Condition notEmpty;// 非满,通常是消费者线程调用来通知生产者线程/** Condition for waiting puts */private final Condition notFull;/*** Shared state for currently active iterators, or null if there* are known not to be any.  Allows queue operations to update* iterator state.*/transient Itrs itrs = null;...// 在构造函数中需要指定数组的固定容量// fair默认为false,即使用非公平锁public ArrayBlockingQueue(int capacity) {this(capacity, false);}public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull =  lock.newCondition();}public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {this(capacity, fair);...}
    }
    
    • 内部是使用一个数组,数组读下标为takeIndex,写下标为putIndex,takeIndex和putIndex之间为可读数据,该数组是一个环形数组,putIndex一直在takeIndex前面,这个前面的定义是:环形的轮次 + 数组下标,其中环形轮次是指当前是第几轮了,如刚开始是0,当一直写,不读的时候,当满的时候,putIndex到了数组末尾,此时不能再写了;当某个线程读了一个数据后,则不满了,则putIndex会重置为0,此时写是第2轮了,而读还是第1轮,然后通过count来控制避免putIndex和takeIndex重叠;
    • 使用一个ReentrantLock,即读写线程共享该lock来执行数据读写。在LinkedBlockingQueue中可以看到是使用了两个lock,即读写线程各一个,所以LinkedBlockingQueue吞吐量相对较高。
生产者写和消费者读
  • 在ArrayBlockingQueue内部通过一个count变量来记录当前队列中存在多少个元素,如果count与队列的capacity相等则说明队列满了,否则队列还可以添加数据。

  • 生产者写:需要先获取lock锁,然后再看队列是否满了来决定是否将数据追加到队列。

    // 非阻塞,如果队列满,则直接返回
    public boolean offer(E e) {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lock();try {if (count == items.length)return false;else {enqueue(e);return true;}} finally {lock.unlock();}
    }// 阻塞,如果队列满,则等到队列非满
    public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// 等待,直到其他线程调用notFull.signalwhile (count == items.length)notFull.await();// 入队enqueue(e);} finally {lock.unlock();}
    }private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;// 递增putIndex,等于items.length之后,// 置为0,重头开始存放,实现了一个环形数组if (++putIndex == items.length)putIndex = 0;// 递增countcount++;notEmpty.signal();
    }
    

    其中追加操作在enqueue方法实现,在enqueue方法中,可以看到当++putIndex==items.length时,将putIndex重置为0,这个实现基础是,在put或者offer当中已经判断过当前队列还没满(count需要小于items.length),这也是实现了一个环形数组。

  • 消费者读:在poll和take方法中,先判断了count > 0的时候,即当前队列存在可读元素,才调用dequeue来读取takeIndex指定的数据。

    public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {// 直接返回return (count == 0) ? null : dequeue();} finally {lock.unlock();}
    }public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// count等于0,表示当前队列为空,等待直到非空while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}
    }private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];// 置为null,方便gcitems[takeIndex] = null;// 递增takeIndex,实现环形数组if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();// 通知生产者线程,当前队列非满notFull.signal();return x;
    }
    

LinkedBlockingQueue

  • LinkedBlockingQueue在内部基于一个单向链表来实现,定义了链表头指针head和链表尾指针last,以及两个lock来实现同时读写,提高吞吐量。如果不指定容量,则链表可以不断添加节点,直到Integer.MAX_VALUE个。

    public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {// 单向链表节点/*** Linked list node class*/static class Node<E> {E item;/*** One of:* - the real successor Node* - this Node, meaning the successor is head.next* - null, meaning there is no successor (this is the last node)*/Node<E> next;Node(E x) { item = x; }}// 队列容量,默认为Integer.MAX_VALUE/** The capacity bound, or Integer.MAX_VALUE if none */private final int capacity;// 当前链表元素个数/** Current number of elements */private final AtomicInteger count = new AtomicInteger();// 链表头指针,读操作从该指针往后读/*** Head of linked list.* Invariant: head.item == null*/transient Node<E> head;// 链表尾指针,写操作从该指针往后写/*** Tail of linked list.* Invariant: last.next == null*/private transient Node<E> last;// 使用两个锁分别用于控制读写,提高吞吐量// 即读写操作可以同时进行/** Lock held by take, poll, etc */private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes */private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc */private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts */private final Condition notFull = putLock.newCondition();...}
    
读写同时进行的实现基础:AtomicInteger count原子变量
  • 在ArrayBlockingQueue当中,是读写线程是使用一个lock来进行同步的,即任何时候只能存在一个线程在读或者写,而在LinkedBlockingQueue当中,读写线程各使用一个lock,读写之间不存在lock锁的竞争,而如何解决数据并发问题?

  • 实现这个的方法主要是通过定义一个AtomicInteger类型的count来实现的,如下:count记录当前链表中存在的元素个数。

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();
    
  • AtomicInteger是线程安全的Integer,读写线程共享该count变量,在写线程中递增count,在读线程中递减count,而递增递减对共享的其他线程是可见的,所以在读写方法中,在获取自身相关的lock之后,需要判断count的时,如下:

    • 写操作offer为例:先获取putLock,然后判断count是否小于链表容量capacity,是则将当前数据入队,并且递增count,此时对读线程是可见的。

      // 非阻塞写版本
      public boolean offer(E e) {if (e == null) throw new NullPointerException();final AtomicInteger count = this.count;if (count.get() == capacity)return false;int c = -1;Node<E> node = new Node<E>(e);// 获取写锁final ReentrantLock putLock = this.putLock;putLock.lock();try {// 非满则入队if (count.get() < capacity) {enqueue(node);// 递增countc = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();}} finally {putLock.unlock();}// 等于0,表示队列从空的到存在一个元素,// 此时可能存在等待的读线程,则通知等待的读线程if (c == 0)signalNotEmpty();return c >= 0;
      }// 尾部入队
      private void enqueue(Node<E> node) {// assert putLock.isHeldByCurrentThread();// assert last.next == null;last = last.next = node;
      }// 阻塞写版本
      public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();// Note: convention in all put/take/etc is to preset local var// holding count negative to indicate failure unless set.int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {/** Note that count is used in wait guard even though it is* not protected by lock. This works because count can* only decrease at this point (all other puts are shut* out by lock), and we (or some other waiting put) are* signalled if it ever changes from capacity. Similarly* for all other uses of count in other wait guards.*/while (count.get() == capacity) {notFull.await();}enqueue(node);c = count.getAndIncrement();// 可能同时存在多个生产者在写,故通知下一个生产者继续写if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}// 可能之前为空,故写数据进去之后分空了,故通知等待非空的消费者来读取if (c == 0)signalNotEmpty();
      }
      
    • 读操作poll为例:先获取读锁takeLock,然后判断count是否大于0,大于则说明队列存在元素,数据出队,递减count,count的递减对写线程可见。

      //  非阻塞读版本
      public E poll() {final AtomicInteger count = this.count;if (count.get() == 0)return null;E x = null;int c = -1;final ReentrantLock takeLock = this.takeLock;// 获取读lock锁takeLock.lock();try {// count大于0,表示队列存在数据,非空if (count.get() > 0) {x = dequeue();// 递减count,该递减对写线程可见c = count.getAndDecrement();if (c > 1)notEmpty.signal();}} finally {takeLock.unlock();}// c等于队列容量capacity,说明之前队列为满,// 可能存在其他等待队列非满的写线程,故通知if (c == capacity)signalNotFull();return x;
      }// 头部出队
      private E dequeue() {// assert takeLock.isHeldByCurrentThread();// assert head.item == null;Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first;E x = first.item;first.item = null;return x;
      }// 阻塞读版本
      public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {notEmpty.await();}x = dequeue();c = count.getAndDecrement();// 可能存在其他消费者也在等待(由空变为非空)读取,故通知下一个消费者进行读取if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}// 原来是满的的,现在不满了,则通知生产者进行写数据进来if (c == capacity)signalNotFull();return x;
      }
      

LinkedBlockingDeque

  • LinkedBlockingDeque基于双向链表实现,默认容量也是Integer.MAX_INTEGER。

  • 与ArrayBlockingQueue一样,也是使用一个ReentrantLock,而不是两个,来实现读写线程的同步。

  • 提供了在队列头部和尾部进行节点增删的功能。

    public class LinkedBlockingDeque<E>extends AbstractQueue<E>implements BlockingDeque<E>, java.io.Serializable {/** Doubly-linked list node class */static final class Node<E> {E item;Node<E> prev;Node<E> next;Node(E x) {item = x;}}/*** Pointer to first node.* Invariant: (first == null && last == null) ||*            (first.prev == null && first.item != null)*/transient Node<E> first;/*** Pointer to last node.* Invariant: (first == null && last == null) ||*            (last.next == null && last.item != null)*/transient Node<E> last;/** Number of items in the deque */private transient int count;/** Maximum number of items in the deque */private final int capacity;/** Main lock guarding all access */final ReentrantLock lock = new ReentrantLock();/** Condition for waiting takes */private final Condition notEmpty = lock.newCondition();/** Condition for waiting puts */private final Condition notFull = lock.newCondition();...}
    

拓展思考

  • 阻塞队列通常用于生产者和消费者模型的需求当中,多个生产者线程和消费者线程可以通过阻塞队列来进行线程安全的数据交互。
  • 阻塞队列也提供Collection的功能,如普通的remove(E e),删除某个元素,但是在阻塞队列的实现中并不是很高效,所以如果只是需要线程安全队列,而没有生产者消费者模型方面的需要,则可以考虑使用ConcurrentLinkedQueue,ConcurrentLinkedDeque等。

JDK1.8源码分析:阻塞队列LinkedBlockingQueue与BlockingDeque(双端)的设计与实现相关推荐

  1. JDK1.8源码分析:可重入锁ReentrantLock和Condition的实现原理

    synchronized的用法和实现原理 synchronized实现线程同步的用法和实现原理 不足 synchronized在线程同步的使用方面,优点是使用简单,可以自动加锁和解锁,但是也存在一些不 ...

  2. 【JUC】JDK1.8源码分析之ArrayBlockingQueue(三)

    一.前言 在完成Map下的并发集合后,现在来分析ArrayBlockingQueue,ArrayBlockingQueue可以用作一个阻塞型队列,支持多任务并发操作,有了之前看源码的积累,再看Arra ...

  3. 【集合框架】JDK1.8源码分析之HashMap(一)

    转载自  [集合框架]JDK1.8源码分析之HashMap(一) 一.前言 在分析jdk1.8后的HashMap源码时,发现网上好多分析都是基于之前的jdk,而Java8的HashMap对之前做了较大 ...

  4. 【集合框架】JDK1.8源码分析HashSet LinkedHashSet(八)

    一.前言 分析完了List的两个主要类之后,我们来分析Set接口下的类,HashSet和LinkedHashSet,其实,在分析完HashMap与LinkedHashMap之后,再来分析HashSet ...

  5. synchronousqueue场景_【JUC】JDK1.8源码分析之SynchronousQueue(九)

    一.前言 本篇是在分析Executors源码时,发现JUC集合框架中的一个重要类没有分析,SynchronousQueue,该类在线程池中的作用是非常明显的,所以很有必要单独拿出来分析一番,这对于之后 ...

  6. 【JUC】JDK1.8源码分析之ConcurrentLinkedQueue(五)

    一.前言 接着前面的分析,接下来分析ConcurrentLinkedQueue,ConcurerntLinkedQueue一个基于链接节点的无界线程安全队列.此队列按照 FIFO(先进先出)原则对元素 ...

  7. 【JUC】JDK1.8源码分析之AbstractQueuedSynchronizer

    一.前言 在锁框架中,AbstractQueuedSynchronizer抽象类可以毫不夸张的说,占据着核心地位,它提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架.所以很有必 ...

  8. Java中ConcurrentHashMap底层实现原理(JDK1.8)源码分析2

    https://blog.csdn.net/programmer_at/article/details/79715177 https://blog.csdn.net/qq_41737716/categ ...

  9. 基于JDK1.8---HashMap源码分析

    基于JDK1.8-HashMap源码简要分析 HashMap继承关系 HashMap:根据键的 hashCode 值存储数据,大多数情况下可以直接定位到它的值,因而具有很快的访问速度,但遍历顺序却是不 ...

  10. JDK1.8源码分析之HashMap(一) (转)

    一.前言 在分析jdk1.8后的HashMap源码时,发现网上好多分析都是基于之前的jdk,而Java8的HashMap对之前做了较大的优化,其中最重要的一个优化就是桶中的元素不再唯一按照链表组合,也 ...

最新文章

  1. Chapter 1 Securing Your Server and Network(9):使用Kerberos用于身份验证
  2. 【JUC】JDK1.8源码分析之ArrayBlockingQueue(三)
  3. JVM内存结构|程序计数器
  4. 调用其他app 的lib_ButterKnife执行效率为什么比其他注入框架高?它的原理是什么...
  5. RTT的IPC机制篇——消息队列
  6. java工作笔记019---java8新特性判断非null
  7. codevs 2928 你缺什么
  8. 使用layui的laypage完成分页
  9. BaiduPan百度网盘不限速教程
  10. OpenStack部署(未完成)
  11. 信道检测手机软件 ios_3.2、《无线通信基础》--点对点通信:检测、分集与信道的不确定性--时间分集...
  12. 机房里的未卜先知!PAKDD2021 第二届阿里云智能运维算法大赛启动
  13. 红米5无线网连接上但是没有网络连接服务器,红米路由器wifi已连接但无法访问互联网怎么办 | 192路由网...
  14. Ansible批量管理Windows服务器,winrm配置
  15. Pytorch-模型参数:named_parameters()、parameters()、state_dict()区别
  16. WPF学习之深入浅出话属性
  17. HP Smart Tank 518 在 macOS 下如何进行手动双面打印
  18. uniapp图片或文件的预览和下载,兼容ios+安卓+浏览器
  19. 黑客黑掉15万台打印机,可打印任意文档
  20. 前端实现图片快速反转替换_HTML5开发之canvas实现元素图片镜像翻转动画效果的方法...

热门文章

  1. 品牌LOGO设计丨商业实践设计思路大揭秘 难怪他接单不断
  2. 常用增强学习实验环境 I (MuJoCo, OpenAI Gym, rllab, DeepMind Lab, TORCS, PySC2)
  3. Linux 三剑客(grep、sed、awk)
  4. 悟空CRM java版(基于jfinal+vue+ElementUI的前后端分离CRM系统)
  5. 关于Windows10服务中没有SNMP Service问题以及SNMP没有安全选项的问题
  6. Python decode()方法
  7. 【实习_面试全程辅导分享】简历篇
  8. vue 网格组件_简单的Vue组件可显示带有事件的月网格日历
  9. 【大数据】数据驱动的大数据金融应用-2017CCTC大会-专题视频课程
  10. Python爬虫——豆瓣评分8分以上电影爬取-存储-可视化分析