阻塞队列(二):PriorityBlockingQueue
阻塞队列(二):PriorityBlockingQueue
我们知道线程Thread可以调用setPriority(int newPriority)来设置优先级的,线程优先级高的线程先执行,优先级低的后执行。而前面介绍的ArrayBlockingQueue、LinkedBlockingQueue都是采用FIFO原则来确定线程执行的先后顺序,那么有没有一个队列可以支持优先级呢? PriorityBlockingQueue 。
PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序升序排序,当然我们也可以通过构造函数来指定Comparator来对元素进行排序。需要注意的是PriorityBlockingQueue不能保证同优先级元素的顺序。
二叉堆
由于PriorityBlockingQueue底层采用二叉堆来实现的,所以有必要先介绍下二叉堆。
二叉堆是一种特殊的堆,就结构性而言就是完全二叉树或者是近似完全二叉树,满足树结构性和堆序性。树机构特性就是完全二叉树应该有的结构,堆序性则是:父节点的键值总是保持固定的序关系于任何一个子节点的键值,且每个节点的左子树和右子树都是一个二叉堆。它有两种表现形式:最大堆、最小堆。
最大堆:父节点的键值总是大于或等于任何一个子节点的键值(下右图)
最小堆:父节点的键值总是小于或等于任何一个子节点的键值(下走图)
二叉堆一般用数组表示,如果父节点的节点位置在n处,那么其左孩子节点为:2 *n + 1 ,其右孩子节点为2 *(n + 1),其父节点为(n - 1) / 2 处。上左图的数组表现形式为:
二叉堆的基本结构了解了,下面来看看二叉堆的添加和删除节点。二叉堆的添加和删除相对于二叉树来说会简单很多。
添加元素
首先将要添加的元素N插添加到堆的末尾位置(在二叉堆中我们称之为空穴)。如果元素N放入空穴中而不破坏堆的序(其值大于跟父节点值(最大堆是小于父节点)),那么插入完成。否则,我们则将该元素N的节点与其父节点进行交换,然后与其新父节点进行比较直到它的父节点不在比它小(最大堆是大)或者到达根节点。
假如有如下一个二叉堆
这是一个最小堆,其父节点总是小于等于任一一个子节点。现在我们添加一个元素2。
第一步:在末尾添加一个元素2,如下:
第二步:元素2比其父节点6小,进行替换,如下:
第三步:继续与其父节点5比较,小于,替换:
第四步:继续比较其跟节点1,发现跟节点比自己小,则完成,到这里元素2插入完毕。所以整个添加元素过程可以概括为:在元素末尾插入元素,然后不断比较替换直到不能移动为止。
复杂度:Ο(logn)
删除元素
删除元素与增加元素一样,需要维护整个二叉堆的序。删除位置1的元素(数组下标0),则把最后一个元素空出来移到最前边,然后和它的两个子节点比较,如果两个子节点中较小的节点小于该节点,就将他们交换,知道两个子节点都比该元素大为止。
就上面二叉堆而言,删除的元素为元素1。
第一步:删掉元素1,元素6空出来,如下:
第二步:与其两个子节点(元素2、元素3)比较,都小,将其中较小的元素(元素2)放入到该空穴中:
第三步:继续比较两个子节点(元素5、元素7),还是都小,则将较小的元素(元素5)放入到该空穴中:
第四步:比较其子节点(元素8),比该节点小,则元素6放入该空穴位置不会影响二叉堆的树结构,放入:
到这里整个删除操作就已经完成了。
二叉堆的添加、删除操作还是比较简单的,很容易就理解了。下面我们就参考该内容来开启PriorityBlockingQueue的源代码研究。
PriorityBlockingQueue
PriorityBlockingQueue继承AbstractQueue,实现BlockingQueue接口。
public class PriorityBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable
定义了一些属性
// 默认容量private static final int DEFAULT_INITIAL_CAPACITY = 11;// 最大容量private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;// 二叉堆数组private transient Object[] queue;// 队列元素的个数private transient int size;// 比较器,如果为空,则为自然顺序private transient Comparator<? super E> comparator;// 内部锁private final ReentrantLock lock;private final Condition notEmpty;//private transient volatile int allocationSpinLock;// 优先队列:主要用于序列化,这是为了兼容之前的版本。只有在序列化和反序列化才非空private PriorityQueue<E> q;
内部仍然采用可重入锁ReentrantLock来实现同步机制,但是这里只有一个notEmpty的Condition,了解了ArrayBlockingQueue我们知道它定义了两个Condition,之类为何只有一个呢?原因就在于PriorityBlockingQueue是一个无界队列,插入总是会成功,除非消耗尽了资源导致服务器挂。
入列
PriorityBlockingQueue提供put()、add()、offer()方法向队列中加入元素。我们这里从put()入手:put(E e) :将指定元素插入此优先级队列。
public void put(E e) {offer(e); // never need to block
}
PriorityBlockingQueue是无界的,所以不可能会阻塞。内部调用offer(E e):
public boolean offer(E e) {// 不能为nullif (e == null)throw new NullPointerException();// 获取锁final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;// 扩容while ((n = size) >= (cap = (array = queue).length))tryGrow(array, cap);try {Comparator<? super E> cmp = comparator;// 根据比较器是否为null,做不同的处理if (cmp == null)siftUpComparable(n, e, array);elsesiftUpUsingComparator(n, e, array, cmp);size = n + 1;// 唤醒正在等待的消费者线程notEmpty.signal();} finally {lock.unlock();}return true;}
siftUpComparable
当比较器comparator为null时,采用自然排序,调用siftUpComparable方法:
private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;// “上冒”过程while (k > 0) {// 父级节点 (n - ) / 2int parent = (k - 1) >>> 1;Object e = array[parent];// key >= parent 完成(最大堆)if (key.compareTo((T) e) >= 0)break;// key < parant 替换array[k] = e;k = parent;}array[k] = key;}
这段代码所表示的意思:将元素X插入到数组中,然后进行调整以保持二叉堆的特性。
siftUpUsingComparator
当比较器不为null时,采用所指定的比较器,调用siftUpUsingComparator方法:
private static <T> void siftUpUsingComparator(int k, T x, Object[] array,Comparator<? super T> cmp) {while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (cmp.compare(x, (T) e) >= 0)break;array[k] = e;k = parent;}array[k] = x;}
扩容:tryGrow
private void tryGrow(Object[] array, int oldCap) {lock.unlock(); // 扩容操作使用自旋,不需要锁主锁,释放Object[] newArray = null;// CAS 占用if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {try {// 新容量 最小翻倍int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));// 超过if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflowint minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE)throw new OutOfMemoryError();newCap = MAX_ARRAY_SIZE; // 最大容量}if (newCap > oldCap && queue == array)newArray = new Object[newCap];} finally {allocationSpinLock = 0; // 扩容后allocationSpinLock = 0 代表释放了自旋锁}}// 到这里如果是本线程扩容newArray肯定是不为null,为null就是其他线程在处理扩容,那就让给别的线程处理if (newArray == null)Thread.yield();// 主锁获取锁lock.lock();// 数组复制if (newArray != null && queue == array) {queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);}
}
整个添加元素的过程和上面二叉堆一模一样:先将元素添加到数组末尾,然后采用“上冒”的方式将该元素尽量往上冒。
出列
PriorityBlockingQueue提供poll()、remove()方法来执行出对操作。出对的永远都是第一个元素:array[0]。
public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {return dequeue();} finally {lock.unlock();}}
先获取锁,然后调用dequeue()方法:
private E dequeue() {// 没有元素 返回nullint n = size - 1;if (n < 0)return null;else {Object[] array = queue;// 出对元素E result = (E) array[0];// 最后一个元素(也就是插入到空穴中的元素)E x = (E) array[n];array[n] = null;// 根据比较器释放为null,来执行不同的处理Comparator<? super E> cmp = comparator;if (cmp == null)siftDownComparable(0, x, array, n);elsesiftDownUsingComparator(0, x, array, n, cmp);size = n;return result;}}
siftDownComparable
如果比较器为null,则调用siftDownComparable来进行自然排序处理:
private static <T> void siftDownComparable(int k, T x, Object[] array,int n) {if (n > 0) {Comparable<? super T> key = (Comparable<? super T>)x;// 最后一个叶子节点的父节点位置int half = n >>> 1;while (k < half) {int child = (k << 1) + 1; // 待调整位置左节点位置Object c = array[child]; //左节点int right = child + 1; //右节点//左右节点比较,取较小的if (right < n &&((Comparable<? super T>) c).compareTo((T) array[right]) > 0)c = array[child = right];//如果待调整key最小,那就退出,直接赋值if (key.compareTo((T) c) <= 0)break;//如果key不是最小,那就取左右节点小的那个放到调整位置,然后小的那个节点位置开始再继续调整array[k] = c;k = child;}array[k] = key;}}
处理思路和二叉堆删除节点的逻辑一样:就第一个元素定义为空穴,然后把最后一个元素取出来,尝试插入到空穴位置,并与两个子节点值进行比较,如果不符合,则与其中较小的子节点进行替换,然后继续比较调整。
siftDownUsingComparator
如果指定了比较器,则采用比较器来进行调整:
private static <T> void siftDownUsingComparator(int k, T x, Object[] array,int n,Comparator<? super T> cmp) {if (n > 0) {int half = n >>> 1;while (k < half) {int child = (k << 1) + 1;Object c = array[child];int right = child + 1;if (right < n && cmp.compare((T) c, (T) array[right]) > 0)c = array[child = right];if (cmp.compare(x, (T) c) <= 0)break;array[k] = c;k = child;}array[k] = x;}
}
PriorityBlockingQueue采用二叉堆来维护,所以整个处理过程不是很复杂,添加操作则是不断“上冒”,而删除操作则是不断“下掉”。掌握二叉堆就掌握了PriorityBlockingQueue,无论怎么变还是不离其宗。对于PriorityBlockingQueue需要注意的是他是一个无界队列,所以添加操作是不会失败的,除非资源耗尽。
阻塞队列(二):PriorityBlockingQueue相关推荐
- 死磕Java并发:J.U.C之阻塞队列:PriorityBlockingQueue
作者:chenssy 来源:Java技术驿站 我们知道线程Thread可以调用setPriority(int newPriority)来设置优先级的,线程优先级高的线程先执行,优先级低的后执行.而前面 ...
- 【Java 并发编程】线程池机制 ( 线程池阻塞队列 | 线程池拒绝策略 | 使用 ThreadPoolExecutor 自定义线程池参数 )
文章目录 一.线程池阻塞队列 二.拒绝策略 三.使用 ThreadPoolExecutor 自定义线程池参数 一.线程池阻塞队列 线程池阻塞队列是线程池创建的第 555 个参数 : BlockingQ ...
- 并发-阻塞队列源码分析
阻塞队列 参考: http://www.cnblogs.com/dolphin0520/p/3932906.html http://endual.iteye.com/blog/1412212 http ...
- Java多线程(六)——多线程的阻塞队列
目录 一.引言 二.阻塞队列种类 三.阻塞队列使用 四.阻塞队列实现原理 五.总结 一.引言 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取 ...
- 阻塞队列(BlockingQueue)
目录 一.队列(Queue) 1.Queue接口 2.Deque接口 3.ArrayDeque类 二.非阻塞队列(AbstractQueue) 1.优先级队列(priority queue) 2.Co ...
- java之阻塞队列和非阻塞队列
目录 一.什么是阻塞队列 二.阻塞队列种类分析 三.非阻塞队列 Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子 ...
- 阻塞队列,有界队列,无界队列
在并发编程中,有时候需要使用线程安全的队列.如果要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法. 使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和 ...
- 锁与并发工具包与线程池与LockSupport与Fork/Join框架与并行流串行流与阻塞队列与JPS,jstack命令查看死锁查看线程状态与AQS个人笔记九
朝闻道,夕死可矣 本文共计 86564字,估计阅读时长1小时 点击进入->Thread源码万字逐行解析 文章目录 本文共计 86564字,估计阅读时长1小时 一锁 二Java中13个原子操作类 ...
- 阻塞队列与非阻塞队列
阻塞队列 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用.阻塞队列常用 ...
- 聊聊并发(七)——Java中的阻塞队列
1. 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用 ...
最新文章
- 用弹性振子力学系统方法计算一组反对角矩阵的质量和频率n+m=8
- 电脑桌面的东西突然不显示了
- Tomcat中组件的生命周期管理(三)
- 4.由键盘任意输入10个数据,分别统计其中的正数个数、正数之和、负数个数、负数之和。
- 《犯罪心理学》读书笔记(part11)--犯罪心理的性别差异(中)
- Linux视频 pad,Wine 1.9.10 发布下载, 改善视频输出
- LeetCode:砖墙【554】
- 使用Docker高效搭建开发环境
- Postgresql之split_part()切割函数,取最后一部分
- vue循环渲染本地图片不显示?
- linux 软硬连接
- django 轮播图上传_拼多多规则更新:关于【商品轮播图】你所不知道的秘密!...
- cors java 安全问题_使用CORS实现JavaWeb跨域请求问题的方法
- 安装vbb的一些经验(包括mysql,apache)
- KEIL中无IAP或者STC芯片型号怎么办
- win7计算机管理没有用户模块,Win7系统安装“ipx协议”提示“找不到相应的模块”如何解决...
- java 长方形正方形_面向对象的疑惑,java设计正方形,长方形
- 心知天气api PHP,php调用心知天气API,实现微信自动回复
- AndroidWear官方文档总结01 - 简介
- AC_PosControl.cpp的AC_PosControl::set_alt_target_with_slew函数代码分析