六、阻塞队列与源码分析(上)
一、阻塞队列BlockingQueue
1、先理解Queue、Deque
1、Queue(队列):用于保存一组元素,不过
在存取元素的时候必须遵循先进先出原则
。队列是一种特殊的线性表,它只允许在表的前端(front)进行删除操作,在表的后端(rear)进行插入操作。进行插入操作的端称为队尾,进行删除操作的端称为队头。队列中没有元素时称为空队列。在队列这种数据结构中,最先插入的元素将是最先被删除的元素;反之最后插入的元素将是最后被删除的元素,因此队列又称为“先进先出”(FIFO——First In First Out)的线性表。2、Deque(双端队列):两端都可以进出的队列。当约束从队列的一端进出队列时,就形成了另外一种存取模式,它遵循先进后出原则,这就是栈结构。双端队列主要用于栈操作。使用栈结构让操作有可追溯性。
2、阻塞队列概述
1、阻塞队列(BlockingQueue)是一个支持两个附加操作的队列,两个附加操作是:
当阻塞队列为空时,获取元素的线程会等待队列变为非空(获取元素被阻塞)。
当阻塞队列满时,存储元素的线程会等待队列可用(添加元素被阻塞)。
2、应用场景:常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里取元素。
3、架构关系:
3、BlockingQueue核心方法
1、BlockingQueue具有4组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。如下表
方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出 插入方法
add(e)
offer(e)
put(e)
offer(e,time,unit)
移除方法
remove()
poll()
take()
poll(time,unit)
检查方法
element()
peek()
不可用
不可用
2、对4组不同的表现解释:
抛出异常
:是指当阻塞队列满时,再往队列里插入元素,会抛出IllegalStateException: Queue full
异常;当队列为空时,从队列里获取元素时会抛出NoSuchElementException
异常。
返回特殊值
:插入方法会返回是否成功,成功返回true,失败返回false
;移除方法,是从队列里拿出一个元素,如果没有则返回null
。
一直阻塞
:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据或者响应中断退出;当队列为空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
超时退出
:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。
/*** @Date: 2022/6/13* 阻塞队列:第一组*/
public class BlockingQueueTest {public static void main(String[] args) {BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);System.out.println(queue.add("a"));System.out.println(queue.add("b"));System.out.println(queue.add("c"));// System.out.println(queue.add("d"));//会抛出java.lang.IllegalStateException: Queue full异常//检索但不删除此队列的头部。此方法与peek的不同之处仅在于如果此队列为空,它将抛出NoSuchElementException异常。System.out.println(queue.element());//aSystem.out.println(queue.remove());System.out.println(queue.remove());System.out.println(queue.remove());// System.out.println(queue.remove());//会抛出java.util.NoSuchElementException异常}
}
/*** @Date: 2022/6/13* 阻塞队列:第二组*/
public class BlockingQueueTest {public static void main(String[] args) {BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);System.out.println(queue.offer("a"));//trueSystem.out.println(queue.offer("b"));//trueSystem.out.println(queue.offer("c"));//trueSystem.out.println(queue.offer("d"));//false//检索但不删除此队列的头部,如果此队列为空,则返回nullSystem.out.println(queue.peek());//aSystem.out.println(queue.poll());//aSystem.out.println(queue.poll());//bSystem.out.println(queue.poll());//cSystem.out.println(queue.poll());//null}
}
/*** @Date: 2022/6/13* 阻塞队列:第三组*/
public class BlockingQueueTest {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);queue.put("a");queue.put("b");queue.put("c");// queue.put("d");//队列满时,一直阻塞queue.take();queue.take();queue.take();// queue.take();//队列为空时,一直阻塞}
}
/*** @Date: 2022/6/13* 阻塞队列:第四组*/
@Slf4j
public class BlockingQueueTest {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);log.info("插入状态:" + queue.offer("a", 2L, TimeUnit.SECONDS));log.info("插入状态:" + queue.offer("b", 2L, TimeUnit.SECONDS));log.info("插入状态:" + queue.offer("c", 2L, TimeUnit.SECONDS));log.info("插入状态:" + queue.offer("d", 2L, TimeUnit.SECONDS));//会阻塞2秒,然后退出}
}
/*** 00:29:58.918 [main] INFO com.itan.queue.BlockingQueueTest - 插入状态:true* 00:29:58.923 [main] INFO com.itan.queue.BlockingQueueTest - 插入状态:true* 00:29:58.923 [main] INFO com.itan.queue.BlockingQueueTest - 插入状态:true* 00:30:00.929 [main] INFO com.itan.queue.BlockingQueueTest - 插入状态:false*/
二、数组阻塞队列ArrayBlockingQueue
1、概述
1、ArrayBlockingQueue是一个
有界的阻塞队列
,其内部实现是将对象放到一个数组里。有界也就意味着,它不能存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。2、ArrayBlockingQueue内部以FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。新元素插入队列的尾部,队列获取操作则是从队列头部开始获取元素。如果向已满的队列继续塞入元素,将导致当前的线程阻塞。如果向空队列获取元素,那么也将导致当前线程阻塞。
3、在创建ArrayBlockingQueue时,可以控制对象内部是否采用公平锁,默认采用非公平锁。
2、ArrayBlockingQueue原理(部分源码)
public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {/** 底层存放元素的数组 */final Object[] items;/** 将取元素的下标索引,代表逻辑头部 */int takeIndex;/** 将存元素的下标索引,代表队列逻辑尾部 */int putIndex;/** 队列中元素的数量 */int count;/** 可重入锁lock */final ReentrantLock lock;/** 存放等待消费的消费者的条件队列 */private final Condition notEmpty;/** 存放等待生产的生产者的条件队列 */private final Condition notFull;/*** 创建一个带有指定容量和默认非公平访问策略的ArrayBlockingQueue* @param capacity 指定容量*/public ArrayBlockingQueue(int capacity) {//内部调用另一个构造器,公平策略为false:非公平模式this(capacity, false);}/*** 创建一个具有指定容量和指定访问策略的ArrayBlockingQueue* @param capacity 指定容量* @param fair 如果为true,则按照FIFO顺序访问插入或移除时受阻塞线程的队列;如果为false则访问顺序是不确定的*/public ArrayBlockingQueue(int capacity, boolean fair) {//容量校验if (capacity <= 0)throw new IllegalArgumentException();//初始化数组this.items = new Object[capacity];//初始化lock锁lock = new ReentrantLock(fair);//初始化两个条件队列notEmpty = lock.newCondition();notFull = lock.newCondition();}/*** 创建一个具有指定容量和指定访问策略的ArrayBlockingQueue* 它最初包含给定collection的元素,并以collection迭代器的遍历顺序添加元素。* @param capacity 指定容量* @param fair 如果为true,则按照FIFO顺序访问插入或移除时受阻塞线程的队列;如果为false则访问顺序是不确定的* @param c 指定集合* @throws IllegalArgumentException 如果 capacity 小于 c.size(),或者小于 1* @throws NullPointerException 如果指定 collection 或任何内部元素为null*/public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {//调用两个参数的构造器初始化全局属性this(capacity, fair);final ReentrantLock lock = this.lock;//加锁,这个加锁的操作并不是为了互斥操作,而是保证其可见性。比如count、putIndexlock.lock(); // Lock only for visibility, not mutual exclusiontry {int i = 0;try {//遍历指定集合for (E e : c) {//元素null检测checkNotNull(e);//存放数据items[i++] = e;}} catch (ArrayIndexOutOfBoundsException ex) {//如果容量超过capacitythrow new IllegalArgumentException();}//计数器count = i;//下一次要存放元素的索引,如果等于capacity,那么置为0,回到数组头部putIndex = (i == capacity) ? 0 : i;} finally {//解锁lock.unlock();}}/*** 将指定的元素插入到此队列的尾部,成功返回true,如果此队列已满,则立即返回false*/public boolean offer(E e) {//e的检测,为空则抛出NullPointerException异常checkNotNull(e);//获取锁实例,使用ReentrantLock锁机制final ReentrantLock lock = this.lock;//加锁lock.lock();try {//如果count等于容量,说明队列满了,直接返回falseif (count == items.length)return false;else {//调用enqueue存放元素enqueue(e);return true;}} finally {//释放锁lock.unlock();}}/*** 将指定的元素插入此队列的尾部,如果该队列已满,则线程等待*/public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;//可中断的等待获取锁,即响应中断lock.lockInterruptibly();try {//如果count等于容量,说明队列满了while (count == items.length)//那么该线程在notFull条件队列中等待notFull.await();//队列不满,调用enqueue存放元素enqueue(e);} finally {//释放锁lock.unlock();}}/*** 插入元素到下一个索引位置*/private void enqueue(E x) {//获取当前数组final Object[] items = this.items;//将当前值x插入到数组的putIndex索引位置items[putIndex] = x;//如果putIndex+1等于数组的长度,说明此时存放元素的下标走到了数组的末尾if (++putIndex == items.length)//则初始化putIndex=0,回到数组的开头putIndex = 0;//上面的if可以看出这个数组是个逻辑环形数组,这样每次插入、移除的时候不需要复制移动数组中的元素,同时能更加有效的利用空间count++;//计数器自增1//唤醒在notEmpty等待的消费线程notEmpty.signal();}/*** 获取并移除此队列的头部元素*/public E take() throws InterruptedException {//获取锁实例,使用ReentrantLock锁机制final ReentrantLock lock = this.lock;//可中断的等待获取锁,即响应中断lock.lockInterruptibly();try {//如果count等于0,说明队列空了while (count == 0)//那么该线程在notEmpty条件队列中等待,被唤醒之后会继续循环notEmpty.await();//队列不为空,调用dequeue取出元素return dequeue();} finally {//释放锁lock.unlock();}}/*** 获取并移除此队列的头,如果此队列为空,则返回null* 相比于take方法,如果因为获取不到锁而在同步队列中等待的时候被中断也会继续等待获取锁,即不响应中断*/public E poll() {//获取锁实例final ReentrantLock lock = this.lock;//不可中断的等待获取锁,即不响应中断lock.lock();try {//如果count等于0,说明队列空了,那么直接返回null,否则调用dequeue用于获取并移除下一个索引位置的元素return (count == 0) ? null : dequeue();} finally {//释放锁lock.unlock();}}/*** 获取并移除此队列的头部,仅在获取锁之后调用*/private E dequeue() {final Object[] items = this.items;@SuppressWarnings("unchecked")//获取takeIndex索引处的元素E x = (E) items[takeIndex];//并将该位置的元素置空items[takeIndex] = null;//如果takeIndex+1等于数组的长度,说明此时取出元素的下标走到了数组的末尾if (++takeIndex == items.length)//则初始化takeIndex=0,回到数组的开头takeIndex = 0;//上面的if可以看出这个数组是个逻辑环形数组,这样每次插入、移除的时候不需要复制移动数组中的元素,同时能更加有效的利用空间count--;//计数器自减1//如果itrs不为null,说明此前获取过迭代器if (itrs != null)//更新迭代器中的元素数据itrs.elementDequeued();//唤醒在notFull等待的消费线程notFull.signal();//返回元素return x;}
}
三、链表阻塞队列LinkedBlockingQueue
1、概述
1、LinkedBlockingQueue是
基于链表的有界阻塞队列
,底层数据结构是一个单链表。2、作为有界队列,容量范围是
[1, Integer.MAX_VALUE]
,也可以指定容量,若没有指定容量,则默认等于Integer.MAX_VALUE
(即最大容量)。3、由于消费线程只操作队头,而生产线程只操作队尾,这里巧妙地采用了两把锁,对插入数据采用putLock,对移除数据采用takeLock,即生产者锁和消费者锁,这样避免了生产线程和消费线程竞争同一把锁的现象(
比如ArrayBlockingQueue就只用同一把锁
),因此LinkedBlockingQueue在高并发的情况下,性能会比ArrayBlockingQueue好很多,但是在需要遍历整个队列的情况下则要把两把锁都锁住(比如clear、contains操作)。4、LinkedBlockingQueue的工作模式都是非公平的,也不能手动指定为公平模式,即获取锁的实际线程顺序不能保证是等待获取锁的线程顺序,好处是可以提升并发量。
5、如果在构建一个LinkedBlockingQueue对象,没有指定容量大小,默认是最大容量,如果生产者的速度一旦大于消费者的速度,也许还没有等待队列满阻塞产生,系统内存就可能被消耗完了(即OOM)。
2、LinkedBlockingQueue原理(部分源码)
public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {/*** 链表的结点内部类,用于存储数据*/static class Node<E> {//数据域E item;//后继引用Node<E> next;//构造器Node(E x) { item = x; }}/** 阻塞队列的容量,默认为Integer.MAX_VALUE,最大为Integer.MAX_VALUE */private final int capacity;/** 阻塞队列的元素个数,原子变量 */private final AtomicInteger count = new AtomicInteger();/*** 阻塞队列的头结点,并不是真正的头结点*/transient Node<E> head;/*** 阻塞队列的尾结点*/private transient Node<E> last;/** 消费线程使用的锁,take, poll操作 */private final ReentrantLock takeLock = new ReentrantLock();/** notEmpty条件对象,当队列为空时用于挂起消费线程 */private final Condition notEmpty = takeLock.newCondition();/** 生产线程使用的锁,put、offer操作 */private final ReentrantLock putLock = new ReentrantLock();/** notFull条件对象,当队列已满时用于挂起生产线程 */private final Condition notFull = putLock.newCondition();/*** 创建一个容量为Integer.MAX_VALUE的LinkedBlockingQueue*/public LinkedBlockingQueue() {//内部调用另一个构造器this(Integer.MAX_VALUE);}*** 创建一个具有指定容量的LinkedBlockingQueue* @param capacity 指定容量* @throws IllegalArgumentException 如果capacity小于1*/public LinkedBlockingQueue(int capacity) {//capacity大小校验if (capacity <= 0) throw new IllegalArgumentException();//capacity赋值this.capacity = capacity;//初始化头结点和尾节点,指向同一个值为null的哨兵结点last = head = new Node<E>(null);}/*** 创建一个容量是Integer.MAX_VALUE的LinkedBlockingQueue,最初包含指定集合的全部元素,元素按该集合迭代器的遍历顺序添加* @param c 指定集合* @throws NullPointerException 如果指定集合或任意元素为null*/public LinkedBlockingQueue(Collection<? extends E> c) {//调用另一个构造器,初始化队列,容量为Integer.MAX_VALUEthis(Integer.MAX_VALUE);//和ArrayBlockingQueue是一样的,需要加锁来保证数据的可见性,因为头、尾结点没有使用volatile修饰final ReentrantLock putLock = this.putLock;//获取生产者锁,仅用于保证可见性putLock.lock(); // Never contended, but necessary for visibilitytry {//n作为计数器int n = 0;//遍历指定集合for (E e : c) {//null校验if (e == null)throw new NullPointerException();//容量校验if (n == capacity)throw new IllegalStateException("Queue full");//调用enqueue方法插入新结点到队列尾部enqueue(new Node<E>(e));//计数器自增1++n;}//设置队列的元素数量count.set(n);} finally {//释放生产者锁putLock.unlock();}}/*** 指定结点链接到队列尾部成为新的尾结点,在获取锁之后才会调用该方法* @param node 指定结点*/private void enqueue(Node<E> node) {//原尾结点的next引用指向node结点,然后last指向最新node结点last = last.next = node;}/*** 将指定的元素插入此队列的尾部,如果该队列已满,则线程等待* @param e 指定元素*/public void put(E e) throws InterruptedException {//e的null校验if (e == null) throw new NullPointerException();//初始化c为-1,表示存放元素失败int c = -1;//新建一个节点Node<E> node = new Node<E>(e);//获取生产者锁final ReentrantLock putLock = this.putLock;//获取阻塞队列的元素个数final AtomicInteger count = this.count;//可中断的等待获取生产者锁,即响应中断putLock.lockInterruptibly();try {//循环判断此时结点数量是否等于容量,即队列是否满了while (count.get() == capacity) {//如果满了,那么该线程在notFull条件队列中等待并释放锁,被唤醒之后会继续尝试获取锁、并循环判断notFull.await();}//队列没有满,调用enqueue方法插入新结点到队列尾部enqueue(node);//获取此时计数器的值赋给c,并且计数器值自增1c = count.getAndIncrement();//如果c+1小于阻塞队列的容量(capacity),说明还可以入队if (c + 1 < capacity)//唤醒一个在notFull条件队列中等待的生产线程notFull.signal();} finally {//释放生产者锁putLock.unlock();}//如果前面没有抛出异常,那么在finally之后会执行下面的代码//如果c为0,那么此时队列中还可能有存在1条数据,即刚放进去的//那么由于刚才队列没有数据,可能此时有消费者线程在等待,这里需要唤醒一个消费者线程//如果此前队列中就有数据没有消费完毕,那么也不必唤醒唤醒消费者if (c == 0)//获取消费者锁并且尝试唤醒一个消费者线程signalNotEmpty();}/*** 将指定的元素插入到此队列的尾部* @param e 指定元素* @return 在成功时返回 true,如果此队列已满,则不阻塞,立即返回 false。*/public boolean offer(E e) {//e的null校验if (e == null) throw new NullPointerException();//获取阻塞队列的元素个数final AtomicInteger count = this.count;//在获取锁之前就判断一次,如果容量满了if (count.get() == capacity)return false;//初始化c为-1,表示存放元素失败int c = -1;//新建一个节点Node<E> node = new Node<E>(e);//获取生产者锁final ReentrantLock putLock = this.putLock;//不可中断的等待获取生产者锁,即不响应中断putLock.lock();try {//如果队列未满if (count.get() < capacity) {//调用enqueue方法插入新结点到队列尾部enqueue(node);//获取此时计数器的值赋给c,并且计数器值自增1,这里的c一定是大于等于0的值c = count.getAndIncrement();//如果c+1小于阻塞队列的容量(capacity),说明还可以入队if (c + 1 < capacity)//唤醒一个在notFull条件队列中等待的生产线程notFull.signal();}} finally {//释放生产者锁putLock.unlock();}//如果前面没有抛出异常,那么在finally之后会执行下面的代码//如果c为0,那么此时队列中还可能有存在1条数据,刚放进去的//那么由于刚才队列没有数据,可能此时有消费者线程在等待,这里需要唤醒一个消费者线程//如果此前队列中就有数据没有消费完毕,那么也不必唤醒唤醒消费者if (c == 0)//获取消费者锁并且尝试唤醒一个消费者线程signalNotEmpty();//如果c>=0,表示该元素已添加到此队列,则返回true;否则返回falsereturn c >= 0;}/*** 唤醒一个在notEmpty条件队列中等待的消费线程,需要先获取消费者锁,只会在put/offer方法中被调用*/private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;//阻塞式的获取消费者锁,即不响应中断takeLock.lock();try {//唤醒一个在notEmpty条件队列中等待的消费线程//要想调用Condition对象的方法,必须先要获取该Condition对象对应的lock锁notEmpty.signal();} finally {//释放消费者锁takeLock.unlock();}}/*** 获取并移除此队列的头部,在元素变得可用(队列非空)之前一直等待*/public E take() throws InterruptedException {//元素E x;//初始化c为-1int c = -1;//获取阻塞队列的元素个数final AtomicInteger count = this.count;//获取消费者锁final ReentrantLock takeLock = this.takeLock;//可中断的等待获取消费者锁,即响应中断takeLock.lockInterruptibly();try {//循环判断此时结点数量是否等于0,即队列是否空了while (count.get() == 0) {//如果空了,那么该线程在notEmpty条件队列中等待并释放锁,被唤醒之后会继续尝试获取锁、并循环判断notEmpty.await();}//队列不为空,获取并移除此队列的头部x = dequeue();//获取此时计数器的值赋给c,并且计数器值自减1c = count.getAndDecrement();//如果c大于1,说明还可以出队列,继续消费if (c > 1)//唤醒一个在notEmpty条件队列中等待的消费线程notEmpty.signal();} finally {//释放消费者锁takeLock.unlock();}//如果前面没有抛出异常,那么在finally之后会执行下面的代码//如果c为capacity,那么此前队列中可能具有满的数据,可能此时有生产者线程在等待,//这里需要唤醒一个生产者线程//如果此前队列中的数据没有满,那么也不必唤醒唤醒生产者if (c == capacity)//获取生产者锁并且尝试唤醒一个生产者线程signalNotFull();return x;}/*** 获取并移除此队列的头,如果此队列为空,则直接返回null*/public E poll() {//获取阻塞队列的元素个数final AtomicInteger count = this.count;//在获取锁之前就判断一次,如果队列空了,直接返回nullif (count.get() == 0)return null;//表示要移除的队列头部元素,默认为nullE x = null;//初始化c为-1int c = -1;//获取消费者锁final ReentrantLock takeLock = this.takeLock;//不可中断的等待获取消费者锁,即不响应中断takeLock.lock();try {//判断此时结点数量是否大于0,即队列是否不为空if (count.get() > 0) {//队列不为空,获取并移除此队列的头部x = dequeue();//获取此时计数器的值赋给c,并且计数器值自减1c = count.getAndDecrement();//如果c大于1,说明还可以出队列if (c > 1)//唤醒一个在notEmpty条件队列中等待的消费线程notEmpty.signal();}} finally {//释放消费者锁takeLock.unlock();}//如果前面没有抛出异常,那么在finally之后会执行下面的代码//如果c为capacity,那么此前队列中可能具有满的数据,可能此时有生产者线程在等待,//这里需要唤醒一个生产者线程//如果此前队列中的数据没有满,那么也不必唤醒唤醒生产者(消费一个生产一个)if (c == capacity)//取生产者锁并且尝试唤醒一个生产者线程signalNotFull();return x;}/*** 获取并移除此队列的头部,这里面的新的头部元素会变成哨兵结点,即item置为null** @return the node*/private E dequeue() {//获取此时头部元素Node<E> h = head;//first指向下一个元素Node<E> first = h.next;//原头结点的next指向自己,为什么不指向null呢?//因为在LinkedBlockingQueue中,一个结点的next为null的话//那么表示遍历到了队列末尾,这在迭代器的时候会用到,如果指向自己则表示头结点出了队列h.next = h; // help GC//head指向此时的头部元素head = first;//获取此时头部元素的值E x = first.item;//此时头部元素的值置空,即变成哨兵结点first.item = null;//返回头部元素的值return x;}/*** 唤醒一个生产者线程,只会在take/poll方法中被调用*/private void signalNotFull() {final ReentrantLock putLock = this.putLock;//阻塞式的获取生产者锁,即不响应中断putLock.lock();try {//唤醒一个在notFull条件队列中等待的生产线程notFull.signal();} finally {//释放生产者锁putLock.unlock();}}
}
3、与ArrayBlockingQueue相比较
1、
底层结构实现
:
ArrayBlockingQueue底层采用
数组结构
来实现阻塞队列,内部没有数据结点,数组位置直接存放的元素值,对内存空间的质量要求更高一些(要求连续的内存空间
,数组的性质)。LinkedBlockingQueue底层采用
单链表
来实现阻塞队列,内部具有结点的实现类Node,每一个元素值都有一个Node结点对象来保存,使得LinkedBlockingQueue占用的内存空间要更多一些。2、
初始容量
:
ArrayBlockingQueue在初始化的时候
必须指定容量
,最大为Integer.MAX_VALUE
LinkedBlockingQueue则可以不指定容量,
默认就是最大容量Integer.MAX_VALUE
,如果但是容量过大、元素过大,并且生产线程速度快于消费线程,则可能造成内存溢出。3、
底层实现
:
ArrayBlockingQueue内部采用一个lock锁来控制同步,生产者线程和消费者线程甚至size计数线程都必须获取该锁,使用条件队列notEmpty用于消费线程的阻塞和唤醒,使用条件队列notFull条件变量用于生产线程的阻塞和唤醒,并发效率很低。
LinkedBlockingQueue采用了锁分离技术,具有两把锁takeLock、putLock;takeLock作为消费线程获取的锁,同时有个对应的notEmpty条件变量用于消费线程的阻塞和唤醒。putLock作为生产线程获取的锁,同时有个对应的notFull条件变量用于生产线程的阻塞和唤醒。避免了生产线程和消费线程竞争同一把锁的现象,因此LinkedBlockingQueue在高并发的情况下,性能会比ArrayBlockingQueue好很多,但是在需要遍历整个队列的情况下则要把两把锁都锁住(比如clear、contains、remove(o)、迭代器等方法)。
4、
公平性
:
ArrayBlockingQueue的工作模式可以自己指定公平模式或者非公平模式。
LinkedBlockingQueue的工作模式都是非公平的,也不能手动指定为公平模式,即获取锁的实际线程顺序不能保证是等待获取锁的线程顺序,这样的好处是可以提升并发量。
四、优先级阻塞队列PriorityBlockingQueue
1、概述
1、PriorityBlockingQueue是一个支持优先级排序的无界阻塞队列(底层是数组),优先级的判断通过构造函数传入的Compator对象来决定,每次出队列的元素都是优先级最高的元素。
2、内部只有一个锁lock和一个条件队列notEmpty,生产和消费线程都需要获取lock锁,而notEmpty用于消费线程的等待和唤醒(没有可消费的数据时,阻塞消费线程),因为是无界队列,生产线程不需要等待和唤醒,因此需要注意:
生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则会OOM
。
2、PriorityBlockingQueue原理(部分源码)
public class PriorityBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {/** 默认数组容量11 */private static final int DEFAULT_INITIAL_CAPACITY = 11;/*** 要分配的数组的最大大小。尝试分配较大的数组可能会导致内存错误OutOfMemoryError:请求的数组大小超过VM限制* 实际最大数组长度可以超过该值,没有特别实际的意义*/private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;/*** 优先级队列表示为平衡二叉堆:queue[n]的两个孩子是queue[2*n+1]和queue[2*(n+1)]*/private transient Object[] queue;/** 元素数量 */private transient int size;/** 自定义比较器,如果为null,则使用元素的自然顺序比较 */private transient Comparator<? super E> comparator;/** 锁实例,生产和消费线程都需要获取该lock锁 */private final ReentrantLock lock;/** 条件变量实例,消费线程的等待和唤醒 */private final Condition notEmpty;/** 用于手动实现自旋锁的标志位,在tryGrow扩容方法中会用到 */private transient volatile int allocationSpinLock;/** 仅用于序列化和反序列化操作,为了兼容老版本 */private PriorityQueue<E> q;/*** 用默认的初始容量11创建一个PriorityBlockingQueue,并根据元素的自然顺序对其元素进行排序*/public PriorityBlockingQueue() {//调用另一个构造器this(DEFAULT_INITIAL_CAPACITY, null);}/*** 使用指定的初始容量创建一个PriorityBlockingQueue,并根据元素的自然顺序对其元素进行排序* @param initialCapacity 指定初始容量*/public PriorityBlockingQueue(int initialCapacity) {//调用另一个构造器this(initialCapacity, null);}/*** 使用指定的初始容量创建一个PriorityBlockingQueue,并根据指定的比较器对其元素进行排序* 可以看到这里的initialCapacity并没有判断是否超过了MAX_ARRAY_SIZE,因此即使超过了MAX_ARRAY_SIZE也会进行初始化的,这就是最大容量有可能比MAX_ARRAY_SIZE更大的原因!* @param initialCapacity 指定初始容量* @param comparator 指定比较器*/public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {//initialCapacity的校验if (initialCapacity < 1)throw new IllegalArgumentException();//初始化lock锁this.lock = new ReentrantLock();//初始化条件变量this.notEmpty = lock.newCondition();//初始化比较器this.comparator = comparator;//使用initialCapacity初始化底层数组this.queue = new Object[initialCapacity];}/*** 将指定元素e插入此优先级队列。该队列是无界的,所以此方法不会阻塞,一定会返回true*/public boolean offer(E e) {if (e == null)throw new NullPointerException();//获取锁final ReentrantLock lock = this.lock;//不可中断的等待获取lock锁,即不响应中断lock.lock();int n, cap;Object[] array;/** n=size,array = queue,cap=array.length* 循环,如果n大于等于数组长度cap,表示数组容量已满,需要扩容;* 否则结束循环,表示扩容完毕或者不需要扩容*/while ((n = size) >= (cap = (array = queue).length))//调用tryGrow方法扩容,只有在tryGrow方法中被调用,即只有生产者线程会调用tryGrow方法tryGrow(array, cap);//到这一步,一定是不需要扩容或者扩容完毕了,一定是获取到了锁try {//获取指定比较器cmpComparator<? super E> cmp = comparator;if (cmp == null)//如果cmp为null,调用siftUpComparable根据新结点构建小顶堆,使用自然排序siftUpComparable(n, e, array);else//如果cmp不为null,调用siftUpComparable根据新结点构建小顶堆,使用指定比较器排序siftUpUsingComparator(n, e, array, cmp);//size自增1size = n + 1;//尝试唤醒一个在notEmpty中等待的消费线程notEmpty.signal();} finally {//释放锁lock.unlock();}return true;}/*** 尝试增加足够多的数组容量,扩容时没有加锁扩,实际上是采用的一个CAS锁控制只有一个线程能够成功* 并且CAS方法并没有采用循环重试机制,但是没关系,因为如果CAS失败导致扩容失败,那么在外面的offer方法的下一次while循环中* 会判断此时size可能还会大于等于底层数组的容量,如果是然后又会进入该方法,这就相当于一个循环了,如果不是说明其他线程已经扩容成功* @param array 原数组* @param oldCap 原数组大小*/private void tryGrow(Object[] array, int oldCap) {//首先释放获取到的锁lock.unlock(); // must release and then re-acquire main lockObject[] newArray = null;/** 下面是第一步扩容的逻辑,用于计算新数组,这里i没有加锁实际上是采用的一个CAS锁* 如果allocationSpinLock为0并且尝试CAS的将allocationSpinLock的值从0变成1成功,那么可以进入if代码块中* 在扩容活动的过程中,如果有多条线程扩容,这里的CAS操作并不能保证最终只有一条线程能够进入if代码块* 但是能保证同时只有一个线程能够进入if代码块中,失败的线程进入下一步,或者某条线程的if代码块执行完毕之后,后续线程再进入if代码块* 这里的if并不能保证,最终扩容的安全,是在if下面重新获取锁之后的赋值时才保证的*/if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {try {/** 计算新容量newCap,原容量越小增长的越快,这样可以降低扩容次数,因此有两种情况:* 1、如果oldCap小于64,那么newCap = oldCap + oldCap + 2,即扩容增量为oldCap + 2* 2、如果oldCap大于等于64,那么newCap = oldCap + oldCap >> 1 ,即扩容增量为oldCap >> 1(老容量的一半)*/int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : // grow faster if small(oldCap >> 1));//当oldCap位于[1431655760, 2147483647]区间时,计算出来的newCap - MAX_ARRAY_SIZE将会大于0//如果newCap减去MAX_ARRAY_SIZE大于0,表示容量可能溢出了if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow//计算最小容量minCap = 旧容量+1int minCap = oldCap + 1;//如果minCap小于0,这表示此前的oldCap就是Integer.MAX_VALUE//或者 minCap大于MAX_ARRAY_SIZE,这表示此前的oldCap范围是[Integer.MAX_VALUE-8,Integer.MAX_VALUE-1]//这两种情况就是容量溢出的情况,满足一种即抛出异常if (minCap < 0 || minCap > MAX_ARRAY_SIZE)throw new OutOfMemoryError();//到这一步,说明此前oldCap范围是[1431655760,Integer.MAX_VALUE-9]//这时newCap直接赋值为MAX_ARRAY_SIZE,即最大容量newCap = MAX_ARRAY_SIZE;}/** 如果新容量newCap大于旧容量oldCap* 并且如果底层数组queue还是目前的数组array,说明还没有线程成功扩容过,那么新建数组* 这里的底层数组queue不是目前的数组array的情况是完全可能存在的:* 第一个线程A在执行到tryGrow之中的if条件之前由于时间片到了而释放CPU的执行权,然后第二个线程B扩容成功,此时在queue被设置为线程B的newArray* 随后第程A重新获取到CPU执行权之后,此时allocationSpinLock为0,尽管此时第二个线程已经完成扩容了,但是第一个线程仍然会执行CAS操作并且可以成功* 因此线程A会进入if代码块,执行到这一步会发现此时底层数组queue不是目前的数组array,说明其他线程已经扩容了,那么线程A放弃操作* 在下面的判断中将释放CPU的执行权,随后获取锁,并且由于newArray为null而直接返回*/if (newCap > oldCap && queue == array)//新建数组,长度为新容量,然后赋给newArraynewArray = new Object[newCap];} finally {//无论此前是否发生异常,将标志位allocationSpinLock重置为0,同时只有一个线程执行到这里//因此不需要CAS操作就是线程安全的,后续的线程在CAS时就可能成功进入if代码块allocationSpinLock = 0;}}//如果newArray为null,说明是没有执行if代码块的线程if (newArray == null) // back off if another thread is allocating//没有争取到扩容的线程尽量让出CPU的执行权,回到RUNNABLE状态,让自己和其他多个线程重新争夺cpu执行权Thread.yield();//重新获取锁lock.lock();//如果newArray不为null,说明成功执行了扩容了//并且底层数组queue还是目前的数组array,说明没有其他线程成功扩容过,将queue赋值,并复制数据if (newArray != null && queue == array) {//底层数组queue赋值为新数组,queue = newArray;//将老数组中的数据复制到新数组对应的索引位置中System.arraycopy(array, 0, newArray, 0, oldCap);}}/*** 每添加一个元素,则将其与父结点进行比较,如果新添加结点大于等于父结点,则添加元素到该位置;* 否则,继续向上寻找父结点,直到找到某个位置,使得位于该位置的新元素的值大于等于对应父结点的元素的值,并且将原位置上的元素一一向后(下层)挪动。* @param k 存放元素的索引位置* @param x 指定元素* @param array 数组*/private static <T> void siftUpComparable(int k, T x, Object[] array) {//x强转为Comparable类型,使用key保存Comparable<? super T> key = (Comparable<? super T>) x;/** 循环,如果k大于0,表示还存在父结点* 寻找合适的位置k:在某个插入的位置的新结点大于等于对应的父结点的值*/while (k > 0) {//根据完全二叉树规律,获取k结点的父结点索引int parent = (k - 1) >>> 1;//获取父结点值eObject e = array[parent];//如果key大于等于父结点e,那么结束循环if (key.compareTo((T) e) >= 0)break;//如果key小于父结点e,那么不符合小顶堆的规律//k的位置置为e,即原父结点e降低一层array[k] = e;//k设置为parent,即向上递归,下一次将会用 这一次的parent 和 parent的parent 作比较k = parent;}//到这里,可能是://1 k=0,即第一次添加元素//2 找到了真正的位置k,在该位置插入的新结点e大于等于对应的父结点的值,然后插入元素keyarray[k] = key;}/*** 每添加一个元素,则将其与父结点进行比较,如果新添加结点大于等于父结点,则添加元素到该位置;* 否则,继续向上寻找父结点,直到找到某个位置,使得位于该位置的新元素的值大于等于对应父结点的元素的值,并且将原位置上的元素一一向后(下层)挪动。* @param k 存放元素的索引位置* @param x 指定元素* @param array 数组* @param cmp 指定比较器*/private static <T> void siftUpUsingComparator(int k, T x, Object[] array,Comparator<? super T> cmp) {/** 循环,如果k大于0,表示还存在父结点* 寻找合适的位置k:在某个插入的位置的新结点大于等于对应的父结点的值*/while (k > 0) {//根据完全二叉树规律,获取k结点的父结点索引int parent = (k - 1) >>> 1;//获取父结点值eObject e = array[parent];//如果x大于等于父结点e,那么结束循环if (cmp.compare(x, (T) e) >= 0)break;//如果key小于父结点e,那么不符合小顶堆的规律//k的位置置为e,即原父结点e降低一层array[k] = e;//k设置为parent,即向上递归,下一次将会用 这一次的parent 和 parent的parent 作比较k = parent;}//到这里,可能是://1 k=0,即第一次添加元素//2 找到了真正的位置k,在该位置插入的新结点e大于等于对应的父结点的值,然后插入元素keyarray[k] = x;}/*** 获取并移除此队列的头部,在元素变得可用(队列非空)之前一直等待*/public E take() throws InterruptedException {final ReentrantLock lock = this.lock;//可中断的等待获取lock锁,即响应中断lock.lockInterruptibly();//保存被移除的移除此队列的头部元素E result;try {/** 开启一个循环* 调用dequeue方法,获取并移除此队列的头部,并重构小顶堆,队列为空就返回null* 如果返回的值为null,那么说明队列空了,需要在notEmpty上等待;被唤醒之后继续循环*/while ( (result = dequeue()) == null)notEmpty.await();} finally {//释放锁lock.unlock();}return result;}/*** 获取并移除此队列的头部,并重构小顶堆,队列为空就返回null*/private E dequeue() {//n为size-1 ,表示移除头部之后的队列(堆)大小int n = size - 1;//如果n小于0,表示此时队列(堆)没有元素if (n < 0)return null;else {//获取底层数组Object[] array = queue;//获取数组头部元素,这就是需要被移除的队列头,也是小顶堆的根结点E result = (E) array[0];//获取真正被移除元素x,就是队列尾部(堆尾),同时保存尾部元素xE x = (E) array[n];//索引n的位置置空array[n] = null;//获取比较器Comparator<? super E> cmp = comparator;if (cmp == null)/** 调用siftDownComparable对某结点向下构建部分小顶堆,使用自然排序* 前两个参数传入0,x,表示将堆的根结点看作x;后面传入的堆大小为n,即size-1,说明堆减少了一个元素,就是尾部* 这里的意思是将x的元素逻辑移动至队列头部,暂时成为根结点,然后由根结点向下构建小顶堆* 在构造器部分就是调用这个方法*/siftDownComparable(0, x, array, n);else/** 调用siftDownUsingComparator对某结点向下构建部分小顶堆,使用自然排序* 前两个参数传入0,x,表示将堆的根结点看作x;后面传入的堆大小为n,即size-1,说明堆减少了一个元素,就是尾部* 这里的意思是将x的元素逻辑移动至队列头部,暂时成为根结点,然后由根结点向下构建小顶堆* 在构造器部分就是调用这个方法*/siftDownUsingComparator(0, x, array, n, cmp);//size置为n,减少了1size = n;return result;}}
}
六、阻塞队列与源码分析(上)相关推荐
- 阻塞队列 — DelayQueue源码分析
点赞再看,养成习惯,公众号搜一搜[一角钱技术]关注更多原创技术文章. 本文 GitHub org_hejianhui/JavaStudy 已收录,有我的系列文章. 前言 DelayQueue 由优先级 ...
- 生产-消费模型之阻塞队列的源码分析
文章目录 前言 阻塞队列API 存放元素 boolean add(E e) boolean offer(E e) boolean offer(E e, long timeout, TimeUnit u ...
- Linux驱动修炼之道-SPI驱动框架源码分析(上)
Linux驱动修炼之道-SPI驱动框架源码分析(上) SPI协议是一种同步的串行数据连接标准,由摩托罗拉公司命名,可工作于全双工模式.相关通讯设备可工作于m/s模式.主设备发起数据帧,允许多个从设 ...
- 阻塞队列BlockingQueue源码
JAVA阻塞队列 在学习线程池框架ThreadPoolExecutor时发现线程池的实现依赖到了阻塞队列BlockingQueue,在队列为空时take方法会阻塞当前线程,因此这里以ThreadPoo ...
- Tomcat 处理 HTTP 请求源码分析(上)【转】
原文地址:https://www.infoq.cn/article/zh-tomcat-http-request-1 很多开源应用服务器都是集成 tomcat 作为 web container 的,而 ...
- Java分布式跟踪系统Zipkin(六):Brave源码分析-Brave和SpringBoot整合
所有博文均在个人独立博客http://blog.mozhu.org首发,欢迎访问! Zipkin是用当下最流行的SpringBoot开发的,SpringBoot将Spring项目的开发过程大大简化,一 ...
- 阻塞队列 java 源码_Java源码解析阻塞队列ArrayBlockingQueue常用方法
本文基于jdk1.8进行分析 首先看一下ArrayBlockingQueue的成员变量.如下图.最主要的成员变量是items,它是一个Object类型的数组用于保存阻塞队列中的元素.其次是takeIn ...
- SpringCloud Gateway微服务网关实战与源码分析-上
概述 定义 Spring Cloud Gateway 官网地址 https://spring.io/projects/spring-cloud-gateway/ 最新版本3.1.3 Spring Cl ...
- Laravel Database——查询构造器与语法编译器源码分析 (上)
前言 在前两个文章中,我们分析了数据库的连接启动与数据库底层 CRUD 的原理,底层数据库服务支持原生 sql 的运行.本文以 mysql 为例,向大家讲述支持 Fluent 的查询构造器 query ...
最新文章
- 【camera】基于YOLO的车辆多维特征识别系统(车色,车品牌,车标,车型)与PYQT实现(课程设计)
- 简述在虚拟机中安装 centos 的过程_从零构建Fabric开发运行环境手册(一):安装OS虚拟机(CentOS)...
- 基于openfire源码开发插件
- LeetCode 650. 只有两个键的键盘(DP)
- LeetCode 80. 删除排序数组中的重复项 II
- vue数据改变了,视图不更新不刷新问题
- day22:更换yum源及源码包安装
- linux命令 renice,Linux命令之nice和renice
- (C语言)猴子选大王
- asp.net扩展Forms验证
- 【C#网络编程系列】专题十:实现简单的邮件收发器
- python微信群管理开禁言_微信群主怎么禁言别人?微信群怎么让群员禁言?
- python执行excel公式 语法_Python读取excel文件中带公式的值的实现
- idea右键新建(new) 但是没有Scala class选项
- android数据库工具 SQLiteSpy下载 sharePlus.Sqlite下载 DB.Browser.for.SQLite下载
- 如何设计群发系统消息表
- pcb 受潮_硬盘SATA接口断裂及PCB板受潮_希捷 Barracuda 3TB 7200转 64MB_固态硬盘评测-中关村在线...
- 正则表达式切掉log日志前面不需要的内容
- 刮刮彩票 (20 分)
- Android TV APPs 的介绍与创建