介绍

一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。队列中的元素必须是可比较的,即实现Comparable接口,或者在构建函数时提供可对队列元素进行比较的Comparator对象。不可以放null,会报空指针异常。

数据结构

PriorityBlockingQueue内部使用heap做为存储结构,如下图:

二叉树存入数组的方式很简单,就是从上到下,从左到右。PriorityQueue的是一个有特点的完全二叉树,且不允许出现null节点,其父节点都比叶子节点小,这个是堆排序中的小顶堆。如果按数组顺序我们可以得到如下结论:

  • 左叶子节点=父节点下标*2+1
  • 右叶子节点=父节点下标*2+2
  • 父节点=(叶子节点-1)/2

加入节点:

新加入的元素x可能会破坏小顶堆的性质,因此需要进行调整。调整的过程为:k指定的位置开始,将x逐层与当前点的parent进行比较并交换,直到满足x >= queue[parent]为止

获取元素

由于堆用数组表示,根据下标关系,0下标处的那个元素既是堆顶元素。所以直接返回数组0下标处的那个元素即可

删除第一个元素

k指定的位置开始,将x逐层向下与当前点的左右孩子中较小的那个交换,直到x小于或等于左右孩子中的任何一个为止

删除任意一个元素

由于删除操作会改变队列结构,所以要进行调整;又由于删除元素的位置可能是任意的,所以调整过程比其它函数稍加繁琐。具体来说,remove(Object o)可以分为2种情况:1. 删除的是最后一个元素。直接删除即可,不需要调整。2. 删除的不是最后一个元素,从删除点开始以最后一个元素为参照调用一次siftDown()即可

主要属性

/*** 空间大小默认值:11.*/private static final int DEFAULT_INITIAL_CAPACITY = 11;/*** 空间大小最大值:Integer.MAX_VALUE - 8.*/private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;/*** 队列元素数组。平衡二叉堆实现,父节点下标是n,左节点则是2n+1,右节点是2n+2。最小的元素在最前面,元素通过comparator比较。*/private transient Object[] queue;/*** 入队元素个数*/private transient int size;/*** The comparator, or null 表示自然排序*/private transient Comparator<? super E> comparator;/*** Lock used for all public operations*/private final ReentrantLock lock;/*** Condition for blocking when empty*/private final Condition notEmpty;/***扩容数组分配资源时的自旋锁,CAS需要*/private transient volatile int allocationSpinLock;/***只用于序列化的时候,为了兼容之前的版本。只有在序列化和反序列化的时候不为null。*/private PriorityQueue<E> q;

方法实现

offer,poll,peek

    public boolean offer(E e) {if (e == null)throw new NullPointerException();final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;//如果入队数量大于或者等于heap大小,则扩容while ((n = size) >= (cap = (array = queue).length))tryGrow(array, cap);try {Comparator<? super E> cmp = comparator;if (cmp == null)siftUpComparable(n, e, array);elsesiftUpUsingComparator(n, e, array, cmp);size = n + 1;notEmpty.signal();} finally {lock.unlock();}return true;}public boolean offer(E e, long timeout, TimeUnit unit) {return offer(e); // never need to block}public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {return dequeue();} finally {lock.unlock();}}public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while ( (result = dequeue()) == null && nanos > 0)nanos = notEmpty.awaitNanos(nanos);} finally {lock.unlock();}return result;}public E peek() {final ReentrantLock lock = this.lock;lock.lock();try {return (size == 0) ? null : (E) queue[0];} finally {lock.unlock();}}

put,take

    public void put(E e) {offer(e); // never need to block}public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while ( (result = dequeue()) == null)notEmpty.await();} finally {lock.unlock();}return result;}

enqueue,dequeue

    private E dequeue() {int n = size - 1;if (n < 0)return null;else {Object[] array = queue;//array[0]即为队首。E result = (E) array[0];//最后一个元素E x = (E) array[n];array[n] = null;Comparator<? super E> cmp = comparator;//把最后一个元素放置在0位置。并进行下沉。if (cmp == null)siftDownComparable(0, x, array, n);elsesiftDownUsingComparator(0, x, array, n, cmp);size = n;return result;}}
    private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (key.compareTo((T) e) >= 0)break;array[k] = e;k = parent;}array[k] = key;}private static <T> void siftUpUsingComparator(int k, T x, Object[] array,Comparator<? super T> cmp) {while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (cmp.compare(x, (T) e) >= 0)break;array[k] = e;k = parent;}array[k] = x;}

up,down

    /*** 在位置k处插入x。一直向root方向up,直到大于等于等于它的父亲* @param k the position to fill* @param x the item to insert* @param array the heap array*/   private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;while (k > 0) {//获取父亲元素下标。int parent = (k - 1) >>> 1;Object e = array[parent];//不比父亲元素小,则退出if (key.compareTo((T) e) >= 0)break;array[k] = e;k = parent;}//在位置k处插入xarray[k] = key;}private static <T> void siftUpUsingComparator(int k, T x, Object[] array,Comparator<? super T> cmp) {while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (cmp.compare(x, (T) e) >= 0)break;array[k] = e;k = parent;}array[k] = x;}
   /*** 在k处插入x,将x逐层向下与当前点的左右孩子中较小的那个交换,直到x小于或等于左右孩子中的任何一个为止* @param k the position to fill* @param x the item to insert* @param array the heap array* @param n heap size*/private static <T> void siftDownComparable(int k, T x, Object[] array,int n) {if (n > 0) {Comparable<? super T> key = (Comparable<? super T>)x;int half = n >>> 1;           // k <half,才可能有子节点。while (k < half) {int child = (k << 1) + 1; // assume left child is leastObject c = array[child];int right = child + 1;//有右孩子,并且右孩子值小于或者右孩子,则与右孩子进行交换if (right < n &&((Comparable<? super T>) c).compareTo((T) array[right]) > 0)c = array[child = right];//如果值小于孩子,直接退出if (key.compareTo((T) c) <= 0)break;//否则,k处放置孩子的值,k设置为孩子的位置array[k] = c;k = child;}array[k] = key;}}private static <T> void siftDownUsingComparator(int k, T x, Object[] array,int n,Comparator<? super T> cmp) {if (n > 0) {int half = n >>> 1;while (k < half) {int child = (k << 1) + 1;Object c = array[child];int right = child + 1;if (right < n && cmp.compare((T) c, (T) array[right]) > 0)c = array[child = right];if (cmp.compare(x, (T) c) <= 0)break;array[k] = c;k = child;}array[k] = x;}}
    private void removeAt(int i) {Object[] array = queue;int n = size - 1;if (n == i) // 最后一个元素array[i] = null;else {E moved = (E) array[n];array[n] = null;Comparator<? super E> cmp = comparator;if (cmp == null)siftDownComparable(i, moved, array, n);elsesiftDownUsingComparator(i, moved, array, n, cmp);//如果是最后一个元素移动到i,说明未下层,则UPif (array[i] == moved) {if (cmp == null)siftUpComparable(i, moved, array);elsesiftUpUsingComparator(i, moved, array, cmp);}}size = n;}

注意

  • 所有入库操作,例如offer,put等都不会阻塞,因为队列是无界的。

参考

  • ReentrantLock源码
  • Java并发包--阻塞队列(BlockingQueue)

PriorityBlockingQueue源码相关推荐

  1. Java8 PriorityBlockingQueue源码分析

    在看这篇总结之前,建议大家先熟悉一下 PriorityQueue,这里主要介绍 PriorityBlockingQueue 一些特殊的性质,关于优先级队列的知识不作着重介绍,因为过程与 Priorit ...

  2. DelayQueue源码

    介绍 一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素.只有延时期满后才能从队列中获取元素.(DelayQueue可以运用在 ...

  3. Netty源码解析之内存管理-PooledByteBufAllocator-PoolArena

      PooledByteBufAllocator是Netty中比较复杂的一种ByteBufAllocator , 因为他涉及到对内存的缓存,分配和释放策略,PooledByteBufAllocator ...

  4. 深读源码-java集合类总结篇

    概览 我们先来看一看java中所有集合的类关系图. 这里面的类太多了,请放大看,如果放大还看不清,请再放大看,如果还是看不清,请放弃. 我们下面主要分成五个部分来逐个击破. List List中的元素 ...

  5. Java Executor源码解析(7)—Executors线程池工厂以及四大内置线程池

    详细介绍了Executors线程池工具类的使用,以及四大内置线程池. 系列文章: Java Executor源码解析(1)-Executor执行框架的概述 Java Executor源码解析(2)-T ...

  6. Android之图片加载框架Picasso源码解析

    转载请标明出处: http://blog.csdn.net/hai_qing_xu_kong/article/details/76645535 本文出自:[顾林海的博客] 个人开发的微信小程序,目前功 ...

  7. Java线程池 源码分析

    1.个人总结及想法: (1)ThreadPoolExecutor的继承关系? ThreadPoolExecutor继承AbstractExectorService,AbstractExecutorSe ...

  8. 2017-10-9(Volley使用范例源码分析)

    Volley应该是比较久远的产物了.google在2013 IO发布,但也可以借鉴学习毕竟是google工程师的AOSP产物. 下面从范例代码分析Volley的结构和核心源码. //创建Request ...

  9. 阻塞队列和ArrayBlockingQueue源码解析

    什么是阻塞队列 当队列中为空时,从队列总获取元素的操作将被阻塞,当队列满时,向队列中添加元素的操作将被阻塞.试图从空的阻塞队列中获取元素的线程将会被阻塞,知道其它的线程往队列中插入新的元素.同样,试图 ...

最新文章

  1. java B2B2C源码电子商城系统:服务消费(基础)
  2. layui中laydate兼容ie_layui菜鸟教程--乐字节前端
  3. C#中委托与事件的使用-以Winform中跨窗体传值为例
  4. Lambda 表达式详解~Stream Pipelines
  5. [软件工程学习笔记]个人java小程序---词频统计(二)
  6. js中filter函数
  7. 查看 PCD 点云 windows
  8. python ttk.notebook_Ttk Notebook and PNotebook
  9. Java字节简单介绍
  10. 微信分享网页时自定义标题、描述和图片
  11. 支持邮件群发功能的邮箱有哪些?邮箱如何群发邮件,邮件群发怎么发呢?
  12. VR虚拟现实心理脱敏训练系统整体解决方案
  13. matlab 批量修改文件名常见错误
  14. 502问题怎么排查?
  15. 二维码生成 API数据接口
  16. 号)、sex(性别)、birthday(出生日期)、id(身份证号)等等。其中“出生日期”定义为一个“日期”类内嵌子对象。用成员函数实现对人员信息的录入和显示。要求包括:构造函数和析构函数、拷贝构造函
  17. 徐松亮常用开发软件与网站
  18. GSM/GPRS+GPS模块SIM808
  19. 胡伟立-孤独[影视配乐扒曲]
  20. 使用C语言编写一个两个数的加减乘除程序

热门文章

  1. 关于mobile中datagrid的使用
  2. 应用分类练手项目计划
  3. MIT JOS学习笔记01:环境配置、Boot Loader(2016.10.22)
  4. 面向对象和面向过程连接数据库
  5. 【HDU 5366】The mook jong 详解
  6. windows下git命令的使用
  7. sublime插件调用第三方程序
  8. 网页上有错误(类不能支持 Automation 操作)解决方法
  9. GoAhead 2.5 Web Server 网页ROM化的改进
  10. 一些非常有用的备忘录文档