在看这篇总结之前,建议大家先熟悉一下 PriorityQueue,这里主要介绍 PriorityBlockingQueue 一些特殊的性质,关于优先级队列的知识不作着重介绍,因为过程与 PriorityQueue 都是一致的。

关于 PriorityQueue 的文章,你可以参考这里->点击前往~

PriorityBlockingQueue 相关源码分析

add 方法

    public boolean add(E e) {return offer(e);}

add 方法主要调用的是 offer 方法,下面我们来看 offer 方法。

    public boolean offer(E e) {// 队列所有的元素不允许为 nullif (e == null)throw new NullPointerException();final ReentrantLock lock = this.lock;// 加锁lock.lock();int n, cap;Object[] array;// 判断是否需要扩容while ((n = size) >= (cap = (array = queue).length))tryGrow(array, cap);try {Comparator<? super E> cmp = comparator;// 如果不自定义比较器,则默认为一个小顶堆,从下往上判断进行调整if (cmp == null)siftUpComparable(n, e, array);elsesiftUpUsingComparator(n, e, array, cmp);size = n + 1;// 唤醒非空条件对象notEmpty.signal();} finally {// 释放锁lock.unlock();}return true;}

offer 方法整体的流程并不是复杂,首先加锁,然后判断是否需要扩容,接着添加元素,添加元素也分了两种情况,一种是没有自定义比较器,默认是小顶堆,如果初始化了自定义比较器,则按照自定义比较器的逻辑添加元素,因为添加了元素,队列肯定不为空,因此要唤醒 notEmpty 条件。

我们以不自定义比较器为例,看一下 siftUpComparable 方法是如何调整堆结构的。

    private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (key.compareTo((T) e) >= 0)break;array[k] = e;k = parent;}array[k] = key;}

siftUpComparable 方法与 PriorityQueue 中对应的方法简直是一模一样,放一张图在这,就不具体介绍了。

其中比较有意思的是 tryGrow 扩容方法,我们接下来看一下这个方法。

/*** Q:扩容操作为什么要允许多个线程进来呢?* A:如果整个扩容过程还加锁的话,其他线程是不能修改队列的,* 只能等待扩容完后才能继续执行,并发效率比较低*/
private void tryGrow(Object[] array, int oldCap) {// 释放锁lock.unlock(); // must release and then re-acquire main lockObject[] newArray = null;/*** compareAndSwapInt:** this:当前对象的引用* allocationSpinLockOffset:allocationSpinLock 在内存中的偏移量* 0:allocationSpinLock 的预期值* 1:更新值*/if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {try {// 当容量小于 64 时容量为原来的两倍 + 2,如果大于等于 64 时扩容为原来的 1.5 倍// 与 PriorityQueue 一致int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : // grow faster if small(oldCap >> 1));if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflowint minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE)throw new OutOfMemoryError();newCap = MAX_ARRAY_SIZE;}// 初始化新的数组if (newCap > oldCap && queue == array)newArray = new Object[newCap];} finally {// allocationSpinLock 置 0// 因此后面的线程获取锁也可能会尝试 CAS 成功,然后初始化新数组allocationSpinLock = 0;}}/*** 如果当前线程尝试 CAS 失败,则尝试让步* Q:这里为什么要让步?* A:因为自己不是成功初始化新数组的线程,就算获取到了线程也不能正确扩容,* 因此让步尽量让成功扩容的线程获取锁*/if (newArray == null) // back off if another thread is allocatingThread.yield();/*** Q:在加锁之前,可能由多个数组尝试 CAS 成功,且成功的初始化了新的数组,* 那么是不是后面的新数组会覆盖前面的数组呢?* A:当然答案肯定是不会的,那么是如何保证正确性的呢?关键在于 queue == array 判断,* 因此只有第一个判断成功的线程能正确扩容,其他非第一个线程再进行判断的时候会返回 false,* 自然不会进行数组元素拷贝*/lock.lock();if (newArray != null && queue == array) {// 重置队列内部数组queue = newArray;// 元素拷贝,同 PriorityQueueSystem.arraycopy(array, 0, newArray, 0, oldCap);}}

这个方法比较特殊的地方在于先释放了锁,然后通过 CAS 操作判断是否需要初始化新数组,尝试 CAS 失败的线程,会做出一个让步,放弃 CPU 时间片,然后与其他线程一同竞争。这个过程我们可以思考以下几个问题:

  • 为什么不直接加锁而是通过 CAS 加判断操作完成扩容步骤
  • 为什么尝试 CAS 失败的线程需要让步
  • 在多线程情况下可能会有多个线程初始化新数组,那如何保证操作一致性

这些问题在上面的方法里都总结了一些自己的想法,如果大家有不同的见解可以留言交流。

take 方法

    public E take() throws InterruptedException {final ReentrantLock lock = this.lock;// 加锁(可响应中断)lock.lockInterruptibly();E result;try {// 如果队列为空,take 方法会阻塞出队线程while ( (result = dequeue()) == null)/*** 如果队列中没有元素,会阻塞后续调用 take 方法出队的线程* 直到队列添加了元素后唤醒 notEmpty,才可以继续执行*/notEmpty.await();} finally {// 释放锁lock.unlock();}return result;}

take 方法中调用了 dequeue 方法,如下:

    private E dequeue() {int n = size - 1;if (n < 0)return null;else {Object[] array = queue;// 堆顶的元素E result = (E) array[0];// 堆最底层的元素(最后一个)E x = (E) array[n];// 把最后一个元素置 null,因为要把它放到堆顶,向下逐步调整堆结构,与 PriorityQueue 一致array[n] = null;Comparator<? super E> cmp = comparator;if (cmp == null)siftDownComparable(0, x, array, n);elsesiftDownUsingComparator(0, x, array, n, cmp);size = n;return result;}}

我们还以不自定义比较器为例,看下 siftDownComparable 方法。

    private static <T> void siftDownComparable(int k, T x, Object[] array,int n) {if (n > 0) {Comparable<? super T> key = (Comparable<? super T>)x;int half = n >>> 1;           // loop while a non-leafwhile (k < half) {int child = (k << 1) + 1; // assume left child is leastObject c = array[child];int right = child + 1;if (right < n &&((Comparable<? super T>) c).compareTo((T) array[right]) > 0)c = array[child = right];if (key.compareTo((T) c) <= 0)break;array[k] = c;k = child;}array[k] = key;}}

过程与 ProrityQueue 还是一样的,就不分析了,放一张图帮助大家理解吧。


(完)

Java8 PriorityBlockingQueue源码分析相关推荐

  1. Java8 ThreadLocal 源码分析

    可参考文章: Java8 IdentityhashMap 源码分析 IdentityhashMap 与 ThreadLocalMap 一样都是采用线性探测法解决哈希冲突,有兴趣的可以先了解下 Iden ...

  2. Java8 HashMap源码分析

    前言 今天,我们主要来研究一下在Java8中HashMap的数据结构及一些重要方法的具体实现.       研究HashMap的源代码之前,我们首先来研究一下常用的三种数据结构:数组.链表和红黑树. ...

  3. Java8 EnumMap 源码分析

    一.EnumMap 概述 EnumMap 是一个用于存储 key 为枚举类型的 map,底层使用数组实现(K,V 双数组).下面是其继承结构: public class EnumMap<K ex ...

  4. Java8 IdentityHashMap 源码分析

    在讲这个数据结构之前,我们先来看一段代码: public static void main(String[] args) {IdentityHashMap<String, Integer> ...

  5. Java8 ReentrantLock 源码分析

    一.ReentrantLock 概述 1.1 ReentrantLock 简介 故名思义,ReentrantLock 意为可重入锁,那么什么是可重入锁呢?可重入意为一个持有锁的线程可以对资源重复加锁而 ...

  6. Java8 CopyOnWriteArrayList 源码分析

    一.CopyOnWriteArrayList 概述 1.1 概念概述 CopyOnWriteArrayList 是 juc 包下一个线程安全的并发容器,底层使用数组实现.CopyOnWrite 顾名思 ...

  7. Java8 CountDownLatch 源码分析

    一.CountDownLatch 概述 1.1 什么是 CountDLatch 闭锁(CountDownLatch)是 java.util.concurrent 包下的一种同步工具类.闭锁可以用来确保 ...

  8. 【Java源码分析】Java8的ArrayList源码分析

    Java8的ArrayList源码分析 源码分析 ArrayList类的定义 字段属性 构造函数 trimToSize()函数 Capacity容量相关的函数,比如扩容 List大小和是否为空 con ...

  9. 【Java源码分析】Java8的HashMap源码分析

    Java8中的HashMap源码分析 源码分析 HashMap的定义 字段属性 构造函数 hash函数 comparableClassFor,compareComparables函数 tableSiz ...

最新文章

  1. 用Python编写代码分析《英雄联盟》游戏胜利的最重要因素
  2. HTML 各种鼠标手势
  3. 云原生生态周报 Vol. 19 | Helm 推荐用户转向 V3
  4. 如何在企业内部实现云信私有化
  5. oracle查看和替换含不可见字符(空白)
  6. idea新建module 后 mapper老是说mapper和xml没有绑定
  7. js ajax java传参_js使用ajax传值给后台,后台返回字符串处理方法
  8. 医保费用监控指标体系建立(八)医生指标分析
  9. 16进制颜色与UIColor互转
  10. 今天来看一下云测平台的测试实验
  11. 《中国传统文化》慕课期末试题及答案
  12. 解决非root用户没有权限运行docker命令的问题
  13. 阿里云达摩院视觉AI介绍
  14. springboot中使用thymeleaf片段引入出现500错误(易错)
  15. 计算机机房防火门,弱电机房门为防火门吗
  16. 怎么对接个人收款支付接口(扫码支付)
  17. 说大数据杀熟,这锅可不背!
  18. 攻防世界Misc高手进阶区第一页WriteUp
  19. 【C语言】猜随机数小游戏(知识点:如何产生一个随机值)
  20. Flask框架基础Jinja2模板

热门文章

  1. spring-xml实现aop-通知的种类
  2. idea 编译显示source1.3不支持泛型(请使用source5或更高版本)
  3. SpirngBoot整合MyBatis出现“SAXParseException”和“文件提前结束”异常解决办法
  4. Spring Security——自定义认证错误提示信息及自适应返回格式解决方案
  5. Longest Common Substring
  6. Array K-Coloring
  7. Make It Connected
  8. java源码导入eclipse_spring5源码如何导入eclipse
  9. 使用分页插件PageHelper
  10. 360安全卫士 导致MySQL 5.0.24 自动关闭