PriorityBlockingQueue源码
介绍
一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。队列中的元素必须是可比较的,即实现Comparable接口,或者在构建函数时提供可对队列元素进行比较的Comparator对象。不可以放null,会报空指针异常。
数据结构
PriorityBlockingQueue内部使用heap做为存储结构,如下图:
二叉树存入数组的方式很简单,就是从上到下,从左到右。PriorityQueue的是一个有特点的完全二叉树,且不允许出现null节点,其父节点都比叶子节点小,这个是堆排序中的小顶堆。如果按数组顺序我们可以得到如下结论:
- 左叶子节点=父节点下标*2+1
- 右叶子节点=父节点下标*2+2
- 父节点=(叶子节点-1)/2
加入节点:
新加入的元素x
可能会破坏小顶堆的性质,因此需要进行调整。调整的过程为:从k
指定的位置开始,将x
逐层与当前点的parent
进行比较并交换,直到满足x >= queue[parent]
为止
获取元素
由于堆用数组表示,根据下标关系,0
下标处的那个元素既是堆顶元素。所以直接返回数组0
下标处的那个元素即可。
删除第一个元素
从k
指定的位置开始,将x
逐层向下与当前点的左右孩子中较小的那个交换,直到x
小于或等于左右孩子中的任何一个为止。
删除任意一个元素
由于删除操作会改变队列结构,所以要进行调整;又由于删除元素的位置可能是任意的,所以调整过程比其它函数稍加繁琐。具体来说,remove(Object o)
可以分为2种情况:1. 删除的是最后一个元素。直接删除即可,不需要调整。2. 删除的不是最后一个元素,从删除点开始以最后一个元素为参照调用一次siftDown()
即可
主要属性
/*** 空间大小默认值:11.*/private static final int DEFAULT_INITIAL_CAPACITY = 11;/*** 空间大小最大值:Integer.MAX_VALUE - 8.*/private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;/*** 队列元素数组。平衡二叉堆实现,父节点下标是n,左节点则是2n+1,右节点是2n+2。最小的元素在最前面,元素通过comparator比较。*/private transient Object[] queue;/*** 入队元素个数*/private transient int size;/*** The comparator, or null 表示自然排序*/private transient Comparator<? super E> comparator;/*** Lock used for all public operations*/private final ReentrantLock lock;/*** Condition for blocking when empty*/private final Condition notEmpty;/***扩容数组分配资源时的自旋锁,CAS需要*/private transient volatile int allocationSpinLock;/***只用于序列化的时候,为了兼容之前的版本。只有在序列化和反序列化的时候不为null。*/private PriorityQueue<E> q;
方法实现
offer,poll,peek
public boolean offer(E e) {if (e == null)throw new NullPointerException();final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;//如果入队数量大于或者等于heap大小,则扩容while ((n = size) >= (cap = (array = queue).length))tryGrow(array, cap);try {Comparator<? super E> cmp = comparator;if (cmp == null)siftUpComparable(n, e, array);elsesiftUpUsingComparator(n, e, array, cmp);size = n + 1;notEmpty.signal();} finally {lock.unlock();}return true;}public boolean offer(E e, long timeout, TimeUnit unit) {return offer(e); // never need to block}public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {return dequeue();} finally {lock.unlock();}}public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while ( (result = dequeue()) == null && nanos > 0)nanos = notEmpty.awaitNanos(nanos);} finally {lock.unlock();}return result;}public E peek() {final ReentrantLock lock = this.lock;lock.lock();try {return (size == 0) ? null : (E) queue[0];} finally {lock.unlock();}}
put,take
public void put(E e) {offer(e); // never need to block}public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while ( (result = dequeue()) == null)notEmpty.await();} finally {lock.unlock();}return result;}
enqueue,dequeue
private E dequeue() {int n = size - 1;if (n < 0)return null;else {Object[] array = queue;//array[0]即为队首。E result = (E) array[0];//最后一个元素E x = (E) array[n];array[n] = null;Comparator<? super E> cmp = comparator;//把最后一个元素放置在0位置。并进行下沉。if (cmp == null)siftDownComparable(0, x, array, n);elsesiftDownUsingComparator(0, x, array, n, cmp);size = n;return result;}}
private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (key.compareTo((T) e) >= 0)break;array[k] = e;k = parent;}array[k] = key;}private static <T> void siftUpUsingComparator(int k, T x, Object[] array,Comparator<? super T> cmp) {while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (cmp.compare(x, (T) e) >= 0)break;array[k] = e;k = parent;}array[k] = x;}
up,down
/*** 在位置k处插入x。一直向root方向up,直到大于等于等于它的父亲* @param k the position to fill* @param x the item to insert* @param array the heap array*/ private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;while (k > 0) {//获取父亲元素下标。int parent = (k - 1) >>> 1;Object e = array[parent];//不比父亲元素小,则退出if (key.compareTo((T) e) >= 0)break;array[k] = e;k = parent;}//在位置k处插入xarray[k] = key;}private static <T> void siftUpUsingComparator(int k, T x, Object[] array,Comparator<? super T> cmp) {while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (cmp.compare(x, (T) e) >= 0)break;array[k] = e;k = parent;}array[k] = x;}
/*** 在k处插入x,将x逐层向下与当前点的左右孩子中较小的那个交换,直到x小于或等于左右孩子中的任何一个为止* @param k the position to fill* @param x the item to insert* @param array the heap array* @param n heap size*/private static <T> void siftDownComparable(int k, T x, Object[] array,int n) {if (n > 0) {Comparable<? super T> key = (Comparable<? super T>)x;int half = n >>> 1; // k <half,才可能有子节点。while (k < half) {int child = (k << 1) + 1; // assume left child is leastObject c = array[child];int right = child + 1;//有右孩子,并且右孩子值小于或者右孩子,则与右孩子进行交换if (right < n &&((Comparable<? super T>) c).compareTo((T) array[right]) > 0)c = array[child = right];//如果值小于孩子,直接退出if (key.compareTo((T) c) <= 0)break;//否则,k处放置孩子的值,k设置为孩子的位置array[k] = c;k = child;}array[k] = key;}}private static <T> void siftDownUsingComparator(int k, T x, Object[] array,int n,Comparator<? super T> cmp) {if (n > 0) {int half = n >>> 1;while (k < half) {int child = (k << 1) + 1;Object c = array[child];int right = child + 1;if (right < n && cmp.compare((T) c, (T) array[right]) > 0)c = array[child = right];if (cmp.compare(x, (T) c) <= 0)break;array[k] = c;k = child;}array[k] = x;}}
private void removeAt(int i) {Object[] array = queue;int n = size - 1;if (n == i) // 最后一个元素array[i] = null;else {E moved = (E) array[n];array[n] = null;Comparator<? super E> cmp = comparator;if (cmp == null)siftDownComparable(i, moved, array, n);elsesiftDownUsingComparator(i, moved, array, n, cmp);//如果是最后一个元素移动到i,说明未下层,则UPif (array[i] == moved) {if (cmp == null)siftUpComparable(i, moved, array);elsesiftUpUsingComparator(i, moved, array, cmp);}}size = n;}
注意
- 所有入库操作,例如offer,put等都不会阻塞,因为队列是无界的。
参考
- ReentrantLock源码
- Java并发包--阻塞队列(BlockingQueue)
PriorityBlockingQueue源码相关推荐
- Java8 PriorityBlockingQueue源码分析
在看这篇总结之前,建议大家先熟悉一下 PriorityQueue,这里主要介绍 PriorityBlockingQueue 一些特殊的性质,关于优先级队列的知识不作着重介绍,因为过程与 Priorit ...
- DelayQueue源码
介绍 一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素.只有延时期满后才能从队列中获取元素.(DelayQueue可以运用在 ...
- Netty源码解析之内存管理-PooledByteBufAllocator-PoolArena
PooledByteBufAllocator是Netty中比较复杂的一种ByteBufAllocator , 因为他涉及到对内存的缓存,分配和释放策略,PooledByteBufAllocator ...
- 深读源码-java集合类总结篇
概览 我们先来看一看java中所有集合的类关系图. 这里面的类太多了,请放大看,如果放大还看不清,请再放大看,如果还是看不清,请放弃. 我们下面主要分成五个部分来逐个击破. List List中的元素 ...
- Java Executor源码解析(7)—Executors线程池工厂以及四大内置线程池
详细介绍了Executors线程池工具类的使用,以及四大内置线程池. 系列文章: Java Executor源码解析(1)-Executor执行框架的概述 Java Executor源码解析(2)-T ...
- Android之图片加载框架Picasso源码解析
转载请标明出处: http://blog.csdn.net/hai_qing_xu_kong/article/details/76645535 本文出自:[顾林海的博客] 个人开发的微信小程序,目前功 ...
- Java线程池 源码分析
1.个人总结及想法: (1)ThreadPoolExecutor的继承关系? ThreadPoolExecutor继承AbstractExectorService,AbstractExecutorSe ...
- 2017-10-9(Volley使用范例源码分析)
Volley应该是比较久远的产物了.google在2013 IO发布,但也可以借鉴学习毕竟是google工程师的AOSP产物. 下面从范例代码分析Volley的结构和核心源码. //创建Request ...
- 阻塞队列和ArrayBlockingQueue源码解析
什么是阻塞队列 当队列中为空时,从队列总获取元素的操作将被阻塞,当队列满时,向队列中添加元素的操作将被阻塞.试图从空的阻塞队列中获取元素的线程将会被阻塞,知道其它的线程往队列中插入新的元素.同样,试图 ...
最新文章
- java B2B2C源码电子商城系统:服务消费(基础)
- layui中laydate兼容ie_layui菜鸟教程--乐字节前端
- C#中委托与事件的使用-以Winform中跨窗体传值为例
- Lambda 表达式详解~Stream Pipelines
- [软件工程学习笔记]个人java小程序---词频统计(二)
- js中filter函数
- 查看 PCD 点云 windows
- python ttk.notebook_Ttk Notebook and PNotebook
- Java字节简单介绍
- 微信分享网页时自定义标题、描述和图片
- 支持邮件群发功能的邮箱有哪些?邮箱如何群发邮件,邮件群发怎么发呢?
- VR虚拟现实心理脱敏训练系统整体解决方案
- matlab 批量修改文件名常见错误
- 502问题怎么排查?
- 二维码生成 API数据接口
- 号)、sex(性别)、birthday(出生日期)、id(身份证号)等等。其中“出生日期”定义为一个“日期”类内嵌子对象。用成员函数实现对人员信息的录入和显示。要求包括:构造函数和析构函数、拷贝构造函
- 徐松亮常用开发软件与网站
- GSM/GPRS+GPS模块SIM808
- 胡伟立-孤独[影视配乐扒曲]
- 使用C语言编写一个两个数的加减乘除程序