Java8 PriorityBlockingQueue源码分析
在看这篇总结之前,建议大家先熟悉一下 PriorityQueue
,这里主要介绍 PriorityBlockingQueue
一些特殊的性质,关于优先级队列的知识不作着重介绍,因为过程与 PriorityQueue
都是一致的。
关于 PriorityQueue
的文章,你可以参考这里->点击前往~
PriorityBlockingQueue 相关源码分析
add 方法
public boolean add(E e) {return offer(e);}
add
方法主要调用的是 offer
方法,下面我们来看 offer
方法。
public boolean offer(E e) {// 队列所有的元素不允许为 nullif (e == null)throw new NullPointerException();final ReentrantLock lock = this.lock;// 加锁lock.lock();int n, cap;Object[] array;// 判断是否需要扩容while ((n = size) >= (cap = (array = queue).length))tryGrow(array, cap);try {Comparator<? super E> cmp = comparator;// 如果不自定义比较器,则默认为一个小顶堆,从下往上判断进行调整if (cmp == null)siftUpComparable(n, e, array);elsesiftUpUsingComparator(n, e, array, cmp);size = n + 1;// 唤醒非空条件对象notEmpty.signal();} finally {// 释放锁lock.unlock();}return true;}
offer
方法整体的流程并不是复杂,首先加锁,然后判断是否需要扩容,接着添加元素,添加元素也分了两种情况,一种是没有自定义比较器,默认是小顶堆,如果初始化了自定义比较器,则按照自定义比较器的逻辑添加元素,因为添加了元素,队列肯定不为空,因此要唤醒 notEmpty
条件。
我们以不自定义比较器为例,看一下 siftUpComparable
方法是如何调整堆结构的。
private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (key.compareTo((T) e) >= 0)break;array[k] = e;k = parent;}array[k] = key;}
siftUpComparable
方法与 PriorityQueue
中对应的方法简直是一模一样,放一张图在这,就不具体介绍了。
其中比较有意思的是 tryGrow
扩容方法,我们接下来看一下这个方法。
/*** Q:扩容操作为什么要允许多个线程进来呢?* A:如果整个扩容过程还加锁的话,其他线程是不能修改队列的,* 只能等待扩容完后才能继续执行,并发效率比较低*/
private void tryGrow(Object[] array, int oldCap) {// 释放锁lock.unlock(); // must release and then re-acquire main lockObject[] newArray = null;/*** compareAndSwapInt:** this:当前对象的引用* allocationSpinLockOffset:allocationSpinLock 在内存中的偏移量* 0:allocationSpinLock 的预期值* 1:更新值*/if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {try {// 当容量小于 64 时容量为原来的两倍 + 2,如果大于等于 64 时扩容为原来的 1.5 倍// 与 PriorityQueue 一致int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : // grow faster if small(oldCap >> 1));if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflowint minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE)throw new OutOfMemoryError();newCap = MAX_ARRAY_SIZE;}// 初始化新的数组if (newCap > oldCap && queue == array)newArray = new Object[newCap];} finally {// allocationSpinLock 置 0// 因此后面的线程获取锁也可能会尝试 CAS 成功,然后初始化新数组allocationSpinLock = 0;}}/*** 如果当前线程尝试 CAS 失败,则尝试让步* Q:这里为什么要让步?* A:因为自己不是成功初始化新数组的线程,就算获取到了线程也不能正确扩容,* 因此让步尽量让成功扩容的线程获取锁*/if (newArray == null) // back off if another thread is allocatingThread.yield();/*** Q:在加锁之前,可能由多个数组尝试 CAS 成功,且成功的初始化了新的数组,* 那么是不是后面的新数组会覆盖前面的数组呢?* A:当然答案肯定是不会的,那么是如何保证正确性的呢?关键在于 queue == array 判断,* 因此只有第一个判断成功的线程能正确扩容,其他非第一个线程再进行判断的时候会返回 false,* 自然不会进行数组元素拷贝*/lock.lock();if (newArray != null && queue == array) {// 重置队列内部数组queue = newArray;// 元素拷贝,同 PriorityQueueSystem.arraycopy(array, 0, newArray, 0, oldCap);}}
这个方法比较特殊的地方在于先释放了锁,然后通过 CAS 操作判断是否需要初始化新数组,尝试 CAS 失败的线程,会做出一个让步,放弃 CPU 时间片,然后与其他线程一同竞争。这个过程我们可以思考以下几个问题:
- 为什么不直接加锁而是通过 CAS 加判断操作完成扩容步骤
- 为什么尝试 CAS 失败的线程需要让步
- 在多线程情况下可能会有多个线程初始化新数组,那如何保证操作一致性
这些问题在上面的方法里都总结了一些自己的想法,如果大家有不同的见解可以留言交流。
take 方法
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;// 加锁(可响应中断)lock.lockInterruptibly();E result;try {// 如果队列为空,take 方法会阻塞出队线程while ( (result = dequeue()) == null)/*** 如果队列中没有元素,会阻塞后续调用 take 方法出队的线程* 直到队列添加了元素后唤醒 notEmpty,才可以继续执行*/notEmpty.await();} finally {// 释放锁lock.unlock();}return result;}
take
方法中调用了 dequeue
方法,如下:
private E dequeue() {int n = size - 1;if (n < 0)return null;else {Object[] array = queue;// 堆顶的元素E result = (E) array[0];// 堆最底层的元素(最后一个)E x = (E) array[n];// 把最后一个元素置 null,因为要把它放到堆顶,向下逐步调整堆结构,与 PriorityQueue 一致array[n] = null;Comparator<? super E> cmp = comparator;if (cmp == null)siftDownComparable(0, x, array, n);elsesiftDownUsingComparator(0, x, array, n, cmp);size = n;return result;}}
我们还以不自定义比较器为例,看下 siftDownComparable
方法。
private static <T> void siftDownComparable(int k, T x, Object[] array,int n) {if (n > 0) {Comparable<? super T> key = (Comparable<? super T>)x;int half = n >>> 1; // loop while a non-leafwhile (k < half) {int child = (k << 1) + 1; // assume left child is leastObject c = array[child];int right = child + 1;if (right < n &&((Comparable<? super T>) c).compareTo((T) array[right]) > 0)c = array[child = right];if (key.compareTo((T) c) <= 0)break;array[k] = c;k = child;}array[k] = key;}}
过程与 ProrityQueue
还是一样的,就不分析了,放一张图帮助大家理解吧。
(完)
Java8 PriorityBlockingQueue源码分析相关推荐
- Java8 ThreadLocal 源码分析
可参考文章: Java8 IdentityhashMap 源码分析 IdentityhashMap 与 ThreadLocalMap 一样都是采用线性探测法解决哈希冲突,有兴趣的可以先了解下 Iden ...
- Java8 HashMap源码分析
前言 今天,我们主要来研究一下在Java8中HashMap的数据结构及一些重要方法的具体实现. 研究HashMap的源代码之前,我们首先来研究一下常用的三种数据结构:数组.链表和红黑树. ...
- Java8 EnumMap 源码分析
一.EnumMap 概述 EnumMap 是一个用于存储 key 为枚举类型的 map,底层使用数组实现(K,V 双数组).下面是其继承结构: public class EnumMap<K ex ...
- Java8 IdentityHashMap 源码分析
在讲这个数据结构之前,我们先来看一段代码: public static void main(String[] args) {IdentityHashMap<String, Integer> ...
- Java8 ReentrantLock 源码分析
一.ReentrantLock 概述 1.1 ReentrantLock 简介 故名思义,ReentrantLock 意为可重入锁,那么什么是可重入锁呢?可重入意为一个持有锁的线程可以对资源重复加锁而 ...
- Java8 CopyOnWriteArrayList 源码分析
一.CopyOnWriteArrayList 概述 1.1 概念概述 CopyOnWriteArrayList 是 juc 包下一个线程安全的并发容器,底层使用数组实现.CopyOnWrite 顾名思 ...
- Java8 CountDownLatch 源码分析
一.CountDownLatch 概述 1.1 什么是 CountDLatch 闭锁(CountDownLatch)是 java.util.concurrent 包下的一种同步工具类.闭锁可以用来确保 ...
- 【Java源码分析】Java8的ArrayList源码分析
Java8的ArrayList源码分析 源码分析 ArrayList类的定义 字段属性 构造函数 trimToSize()函数 Capacity容量相关的函数,比如扩容 List大小和是否为空 con ...
- 【Java源码分析】Java8的HashMap源码分析
Java8中的HashMap源码分析 源码分析 HashMap的定义 字段属性 构造函数 hash函数 comparableClassFor,compareComparables函数 tableSiz ...
最新文章
- 用Python编写代码分析《英雄联盟》游戏胜利的最重要因素
- HTML 各种鼠标手势
- 云原生生态周报 Vol. 19 | Helm 推荐用户转向 V3
- 如何在企业内部实现云信私有化
- oracle查看和替换含不可见字符(空白)
- idea新建module 后 mapper老是说mapper和xml没有绑定
- js ajax java传参_js使用ajax传值给后台,后台返回字符串处理方法
- 医保费用监控指标体系建立(八)医生指标分析
- 16进制颜色与UIColor互转
- 今天来看一下云测平台的测试实验
- 《中国传统文化》慕课期末试题及答案
- 解决非root用户没有权限运行docker命令的问题
- 阿里云达摩院视觉AI介绍
- springboot中使用thymeleaf片段引入出现500错误(易错)
- 计算机机房防火门,弱电机房门为防火门吗
- 怎么对接个人收款支付接口(扫码支付)
- 说大数据杀熟,这锅可不背!
- 攻防世界Misc高手进阶区第一页WriteUp
- 【C语言】猜随机数小游戏(知识点:如何产生一个随机值)
- Flask框架基础Jinja2模板
热门文章
- spring-xml实现aop-通知的种类
- idea 编译显示source1.3不支持泛型(请使用source5或更高版本)
- SpirngBoot整合MyBatis出现“SAXParseException”和“文件提前结束”异常解决办法
- Spring Security——自定义认证错误提示信息及自适应返回格式解决方案
- Longest Common Substring
- Array K-Coloring
- Make It Connected
- java源码导入eclipse_spring5源码如何导入eclipse
- 使用分页插件PageHelper
- 360安全卫士 导致MySQL 5.0.24 自动关闭