DelayQueue浮光掠影
现在是2020年8月,正式从大学毕业也已经第三个年头了,现在想想还是挺快。
通过最近这段时间对公司项目的改造,在使用spring cloud、spring cloud kuberntes、kubernetes、istio、skywalking、helm这些技术与框架后,个人感觉对于上面这些技术虽然精通远远谈不上,但对于上面这些框架的基本使用和操作还是达到了的,也很感谢公司和领导能让我有机会去进行尝试。
由于最近的项目有时忙有时不忙,所以在空余时间又看了关于分布式事务的常用解决方案和同事分享给我的《MySQL 是怎样运行的:从根儿上理解 MySQL》电子书后对数据库的知识也有了更进一步的了解,同时也有了一些来自内心的惊讶。
啥是惊讶?惊讶就是比如:原来这就是所谓的CAP和BASE理论呀! 又比如:原来索引是一棵那样的B+树呀! 还可比如:原来所谓的MVCC就是那么一回事呀! 当然惊讶归惊讶,惊喜归惊喜。
啥是惊喜?惊喜就是让我今后再也不像以前那么惧怕sql优化、索引的设置、事务的隔离级别和分布式事务的常见解决方案了。当然也还有其它感慨,那就是想成为一名优秀的软件工程师需要学习和掌握的技术还TM多呀!
8月成都的夜晚很是闷热,每次到家做饭洗碗后感觉像是又免费蒸了一把桑拿。虽然很闷热,不过好在俺还有电风扇。神奇的电风扇,赐予我继续学习的力量吧!
最近这段时间在熟悉了一些应用框架(外功)后,接下来的一段时间我决定还是多从应用的原理层(内功)入手,再多了解与回顾一些基础知识(以防走火入魔)。
前言
主角登场:DelayQueue
说起毕业又让我想起了我大四时(又岔开话题了),给前公司做了一个移动充电的小项目,从项目计划需求文档+axure原型+sketch效果图设计+小程序前端界面+服务端api+web管理后台+商户端h5+整体项目发布到上线(现在那小软件还仍然在运行着的),一个项目一个人,上线后我还差点以为自己成了一名全栈了,可怕。
当时项目里为了确保服务端业务的正确性,需要向设备下发倒计时指令后让业务端也要跟着进行倒计时,当时为了方便采取了java自带的DelayQueue来实现倒计时的功能。
当时用了DelayQueue后感觉那个类还是挺神奇的,自动根据数据的优先级执行相应的任务。神奇归神奇,不过今天我决定来揭开DelayQueue的神秘面纱,看看java框架究竟是如何实现它的。
DelayQueue源码分析
测试代码
先写一个空方法,直接查看其源码
import java.util.Arrays;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;public class DelayQueueStu {public static void main(String[] args) throws Exception {Worker[] workers = new Worker[]{};DelayQueue<Worker> delayQueue = new DelayQueue<Worker>(Arrays.asList(workers));delayQueue.add(new Worker());Worker poll = delayQueue.poll();Worker worker = delayQueue.take();}public static class Worker implements Delayed {@Overridepublic long getDelay(TimeUnit unit) {return 0;}@Overridepublic int compareTo(Delayed o) {return 0;}}
}
DelayQueue的add方法
/*** Inserts the specified element into this delay queue.** @param e the element to add* @return {@code true} (as specified by {@link Collection#add})* @throws NullPointerException if the specified element is null*/public boolean add(E e) {return offer(e);}/*** Inserts the specified element into this delay queue.** @param e the element to add* @return {@code true}* @throws NullPointerException if the specified element is null*/public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {leader = null;available.signal();}return true;} finally {lock.unlock();}}/*** Inserts the specified element into this priority queue.** @return {@code true} (as specified by {@link Queue#offer})* @throws ClassCastException if the specified element cannot be* compared with elements currently in this priority queue* according to the priority queue's ordering* @throws NullPointerException if the specified element is null*/public boolean offer(E e) {if (e == null)throw new NullPointerException();modCount++;int i = size;if (i >= queue.length)//将数组扩容grow(i + 1);//先将原来的size加一个size = i + 1;if (i == 0)queue[0] = e;else//上移i这个数组位置的变量,siftUp(i, e);return true;}/*** Inserts item x at position k, maintaining heap invariant by* promoting x up the tree until it is greater than or equal to* its parent, or is the root.** To simplify and speed up coercions and comparisons. the* Comparable and Comparator versions are separated into different* methods that are otherwise identical. (Similarly for siftDown.)** @param k the position to fill* @param x the item to insert*/private void siftUp(int k, E x) {if (comparator != null)siftUpUsingComparator(k, x);elsesiftUpComparable(k, x);}@SuppressWarnings("unchecked")private void siftUpComparable(int k, E x) {//因为x是实现了delay这个接口的,所以可以直接转为ComparableComparable<? super E> key = (Comparable<? super E>) x;while (k > 0) {//当k大于0,说明还没有到根节点,就一直遍历着吧int parent = (k - 1) >>> 1;//(k-1)>>>1代表无符号右移1位,相当于除以2;也就是(k-1)/2,这个代表当前当点的父节点Object e = queue[parent];if (key.compareTo((E) e) >= 0)//如果当前节点的值大于等于父节点,则break;break后会执行queue[k] = key,也就是最直接进行赋值了。由此可见优先队列构建的是小顶堆break;//key的值比父节点的值小,让queue[k]=e就是让父节点的值到此节点来,让k=parent就是下次从当前节点的父节点来始循环queue[k] = e;k = parent;}//都执行到这里了,就说明节点找到自己的位置了queue[k] = key;}
对于add方法,主要关注siftUpComparable方法;其主要流程见代码里的注释里,其主要就是堆的siftUp操作
DelayQueue的构造方法
/*** Creates a {@code DelayQueue} initially containing the elements of the* given collection of {@link Delayed} instances.** @param c the collection of elements to initially contain* @throws NullPointerException if the specified collection or any* of its elements are null*/public DelayQueue(Collection<? extends E> c) {//如果是一开始构建一个数组进来,也是一个一个添加到堆中的this.addAll(c);}/*** Adds all of the elements in the specified collection to this* queue. Attempts to addAll of a queue to itself result in* <tt>IllegalArgumentException</tt>. Further, the behavior of* this operation is undefined if the specified collection is* modified while the operation is in progress.** <p>This implementation iterates over the specified collection,* and adds each element returned by the iterator to this* queue, in turn. A runtime exception encountered while* trying to add an element (including, in particular, a* <tt>null</tt> element) may result in only some of the elements* having been successfully added when the associated exception is* thrown.** @param c collection containing elements to be added to this queue* @return <tt>true</tt> if this queue changed as a result of the call* @throws ClassCastException if the class of an element of the specified* collection prevents it from being added to this queue* @throws NullPointerException if the specified collection contains a* null element and this queue does not permit null elements,* or if the specified collection is null* @throws IllegalArgumentException if some property of an element of the* specified collection prevents it from being added to this* queue, or if the specified collection is this queue* @throws IllegalStateException if not all the elements can be added at* this time due to insertion restrictions* @see #add(Object)*/public boolean addAll(Collection<? extends E> c) {if (c == null)throw new NullPointerException();if (c == this)throw new IllegalArgumentException();boolean modified = false;for (E e : c)if (add(e))modified = true;return modified;}
原来以为DelayQueue的有参构造方法会进行一个堆化,结果发现并没有,直接遍历数组调用add方法即可。
java.util.PriorityQueue#heapify(堆化)方法
不过为了学习,还是顺便看一下优先队列中的堆化方法吧
/*** Establishes the heap invariant (described above) in the entire tree,* assuming nothing about the order of the elements prior to the call.*/@SuppressWarnings("unchecked")private void heapify() {//size/2-1为最后一个非叶子节点,下面的代码也就是从最后一个非叶子节点开始进行堆化for (int i = (size >>> 1) - 1; i >= 0; i--)siftDown(i, (E) queue[i]);}/*** Inserts item x at position k, maintaining heap invariant by* demoting x down the tree repeatedly until it is less than or* equal to its children or is a leaf.** @param k the position to fill* @param x the item to insert*/private void siftDown(int k, E x) {if (comparator != null)siftDownUsingComparator(k, x);elsesiftDownComparable(k, x);}@SuppressWarnings("unchecked")private void siftDownComparable(int k, E x) {//k为某一个非叶子节点Comparable<? super E> key = (Comparable<? super E>)x;int half = size >>> 1; // loop while a non-leaf//size>>>1就是size/2,第一个非叶子节点的下一个节点while (k < half) {//k<<1代表左移一位,也就是k*2,k*2+1就是这个节点的左节点,k*2+2就是这个节点的右节点int child = (k << 1) + 1; // assume left child is leastObject c = queue[child];//c为左节点的值int right = child + 1;//right这个是右节点的索引if (right < size &&((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)//right<size的意思是右节点还没有越界,让左节点和右节点比较一下,如果左节点大于了右节点,就让左右节点索引变量交换一下c = queue[child = right];if (key.compareTo((E) c) <= 0)//x比c小,就跳出循环;已经局部小顶堆了本轮堆化完毕break;//将k位置让x的子节点放进去,子节点上位了queue[k] = c;//k变为刚刚上位那个节点的索引k = child;}//key找到自己的位置了queue[k] = key;}
PriorityQueue的堆化的构造方法主要是从最后一个非叶子节点开始调用堆化方法,其主要代码为siftDown方法
DelayQueue的poll方法
/*** Retrieves and removes the head of this queue, or returns {@code null}* if this queue has no elements with an expired delay.** @return the head of this queue, or {@code null} if this* queue has no elements with an expired delay*/public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {E first = q.peek();if (first == null || first.getDelay(NANOSECONDS) > 0)return null;elsereturn q.poll();} finally {lock.unlock();}}@SuppressWarnings("unchecked")public E poll() {if (size == 0)return null;int s = --size;//size自减modCount++;E result = (E) queue[0];//将result取出,也就是将根节点取出E x = (E) queue[s];//x=queue[s],此刻s为完全二叉树中最后一个节点的值queue[s] = null;//queue[s]=null,差不多就是把最后一个节点给删了,不要了if (s != 0)//s!=0代表二叉树现在还不是空树;那么接下来对x进行下移吧siftDown(0, x);return result;}/*** Inserts item x at position k, maintaining heap invariant by* demoting x down the tree repeatedly until it is less than or* equal to its children or is a leaf.** @param k the position to fill* @param x the item to insert*/private void siftDown(int k, E x) {if (comparator != null)siftDownUsingComparator(k, x);elsesiftDownComparable(k, x);}@SuppressWarnings("unchecked")private void siftDownComparable(int k, E x) {//k为0的话就是根节点Comparable<? super E> key = (Comparable<? super E>)x;int half = size >>> 1; // loop while a non-leaf//size>>>1就是size/2,不知道干啥的还while (k < half) {//当half大于0,就一直执行;k<<1代表左移一位,也就是k*2,k*2+1就是这个节点的左节点,k*2+2就是这个节点的右节点int child = (k << 1) + 1; // assume left child is leastObject c = queue[child];int right = child + 1;//right这个是右节点的索引,c是左节点的值if (right < size &&((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)//right<size的意思是右节点还没有越界,让左节点和右节点比较一下,如果左节点大于了右节点,就让c = queue[child = right];if (key.compareTo((E) c) <= 0)//x比c小,就跳出循环;已经局部小顶堆了本轮堆化完毕break;//将k位置让x的子节点放进去,子节点上位了queue[k] = c;//k变为刚刚上位那个节点的索引k = child;}//key找到自己的位置了queue[k] = key;}
主要关注siftDown方法
DelayQueue的take方法
/*** Retrieves and removes the head of this queue, waiting if necessary* until an element with an expired delay is available on this queue.** @return the head of this queue* @throws InterruptedException {@inheritDoc}*/public E take() throws InterruptedException {final ReentrantLock lock = this.lock;//加锁lock.lockInterruptibly();try {for (;;) {//从队列中获取了头节点E first = q.peek();if (first == null)//如果头节点没有数据,那么队列也是空的,就让当前线程挂起available.await();else {//判断头节点的时间是否满足出队条件了long delay = first.getDelay(NANOSECONDS);if (delay <= 0)//满足出队条件了,出队并将头节点移出队列return q.poll();//让first设为空,因为还没有到达出队时间first = null; // don't retain ref while waitingif (leader != null)//leader线程不为空,就挂起来available.await();else {//leader线程是空的,就让本线程成为leaderThread thisThread = Thread.currentThread();leader = thisThread;try {//让本线程等到头节点满足条件后再激活available.awaitNanos(delay);} finally {//激活后如果leader线程是当前线程,就让leader变为空if (leader == thisThread)leader = null;}}}}} finally {//如果leader线程是空的 且 头节点有值if (leader == null && q.peek() != null)//激活等待的线程available.signal();//解锁lock.unlock();}}
其主要是condition的运用实现线程挂起和激活
DelayQueue的remove方法
/*** Removes a single instance of the specified element from this* queue, if it is present, whether or not it has expired.*/public boolean remove(Object o) {final ReentrantLock lock = this.lock;lock.lock();try {return q.remove(o);} finally {lock.unlock();}}/*** Removes a single instance of the specified element from this queue,* if it is present. More formally, removes an element {@code e} such* that {@code o.equals(e)}, if this queue contains one or more such* elements. Returns {@code true} if and only if this queue contained* the specified element (or equivalently, if this queue changed as a* result of the call).** @param o element to be removed from this queue, if present* @return {@code true} if this queue changed as a result of the call*/public boolean remove(Object o) {int i = indexOf(o);if (i == -1)return false;else {//找到将要移除元素的在数组removeAt(i);return true;}}private int indexOf(Object o) {if (o != null) {for (int i = 0; i < size; i++)if (o.equals(queue[i]))return i;}return -1;}/*** Removes the ith element from queue.** Normally this method leaves the elements at up to i-1,* inclusive, untouched. Under these circumstances, it returns* null. Occasionally, in order to maintain the heap invariant,* it must swap a later element of the list with one earlier than* i. Under these circumstances, this method returns the element* that was previously at the end of the list and is now at some* position before i. This fact is used by iterator.remove so as to* avoid missing traversing elements.*/@SuppressWarnings("unchecked")private E removeAt(int i) {// assert i >= 0 && i < size;modCount++;//modCount++,其他线程用以确保没有并其他线程修改int s = --size;//先将数组减-1if (s == i) // removed last element//i是从0开始的,size是从1开始的;s==i那么i就是最后一个叶子节点queue[i] = null;else {//将最后一个节点的元素取出赋值为movedE moved = (E) queue[s];//先将原树的最后一个叶子节点设为空queue[s] = null;//将最后一个节点补到i位置位置去,从那里开始用siftDown构建堆siftDown(i, moved);//siftDown了一圈后发现最后一节点到了i位置if (queue[i] == moved) {//用siftUp继续构建堆siftUp(i, moved);//如果原来的位置上的节点if (queue[i] != moved)return moved;}}return null;}/*** Inserts item x at position k, maintaining heap invariant by* demoting x down the tree repeatedly until it is less than or* equal to its children or is a leaf.** @param k the position to fill* @param x the item to insert*/private void siftDown(int k, E x) {if (comparator != null)siftDownUsingComparator(k, x);elsesiftDownComparable(k, x);}@SuppressWarnings("unchecked")private void siftDownComparable(int k, E x) {//k为将要移除节点的索引值,x为入堆的数据Comparable<? super E> key = (Comparable<? super E>)x;int half = size >>> 1; // loop while a non-leaf//size>>>1就是size/2,不知道干啥的还while (k < half) {//当half大于0,就一直执行;k<<1代表左移一位,也就是k*2,k*2+1就是这个节点的左节点,k*2+2就是这个节点的右节点int child = (k << 1) + 1; // assume left child is leastObject c = queue[child];int right = child + 1;//right这个是右节点的索引,c是左节点的值if (right < size &&((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)//right<size的意思是右节点还没有越界,让左节点和右节点比较一下,如果左节点大于了右节点,就让c = queue[child = right];if (key.compareTo((E) c) <= 0)//x比c小,就跳出循环;已经局部小顶堆了本轮堆化完毕break;//将k位置让x的子节点放进去,子节点上位了queue[k] = c;//k变为刚刚上位那个节点的索引k = child;}//key找到自己的位置了queue[k] = key;}
总结
在看了DelayQueue的源码后对它的认识是,它是基于PriorityQueue和ReentrantLock实现的一种阻塞队列,因为其基与PriorityQueue实现的,其核心点在于理解堆排序。
个人感觉想更好的看懂堆排序算法,得要做到如下点:
心中有数(数组) 和 心中有树(完全二叉树)
浮光掠影,又如微风拂面
DelayQueue浮光掠影相关推荐
- 【Java并发编程】20、DelayQueue实现订单的定时取消
当订单定时取消需要修改数据库订单状态,但是怎么确定订单什么时候应该改变状态,解决方案有下面两种: 第一种,写个定时器去每分钟扫描数据库,这样更新及时,但是如果数据库数据量大的话,会对数据库造成很大的 ...
- java加载c库阻塞_【死磕Java並發】-----J.U.C之阻塞隊列:DelayQueue
DelayQueue是一個支持延時獲取元素的無界阻塞隊列.里面的元素全部都是"可延期"的元素,列頭的元素是最先"到期"的元素,如果隊列里面沒有元素到期,是不能從 ...
- 每日一博 - DelayQueue阻塞队列源码解读
文章目录 Pre DelayQueue特征 Leader/Followers模式 DelayQueue源码分析 类继承关系 核心方法 成员变量 构造函数 入队方法 offer(E e) 出队方法 po ...
- Java DelayQueue延迟队列的使用和源码分析
文章目录 概述 示例 原理分析 概述 DelayQueue 是JAVA提供的延时队列,队列内部的对象必须实现 Delayed 接口,该接口只有一个 getDelay 方法,返回延迟执行的时长. pub ...
- 使用DelayQueue 和 FutureTask 实现java中的缓存
使用DelayQueue.ConcurrentHashMap.FutureTask实现的缓存工具类. DelayQueue 简介 DelayQueue是一个支持延时获取元素的无界阻塞队列.DelayQ ...
- DelayQueue源码
介绍 一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素.只有延时期满后才能从队列中获取元素.(DelayQueue可以运用在 ...
- DelayQueue详解
一.DelayQueue是什么 DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走.这种队列是有序的,即队头对 ...
- java中DelayQueue的使用
文章目录 简介 DelayQueue DelayQueue的应用 总结 java中DelayQueue的使用 简介 今天给大家介绍一下DelayQueue,DelayQueue是BlockingQue ...
- 阻塞队列之七:DelayQueue延时队列
一.DelayQueue简介 是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走.这种队列是有序的(PriorityQueue实际 ...
- delayqueue_在DelayQueue中更改延迟,从而更改顺序
delayqueue 因此,我正在考虑构建一个简单的对象缓存,该缓存在给定时间后会使对象过期. 显而易见的机制是使用Java并发包中的DelayedQueue类. 但是我想知道是否有可能在将对象添加到 ...
最新文章
- python动态规划详解_经典动态规划例题整理(Python版)
- boost::function/bind
- XiaoKL学Python(C)__future__
- ThreadPoolExecutor源码学习(2)-- 在thrift中的应用
- JavaApplet 绘制火柴棒和轮播图片
- Centos下安装Gcc和Qt
- CocoPods原理
- groovy 兼容 java_java – eclipse插件和maven依赖项中存在的’groovy-all’jar之间的兼容性问题...
- iptables基础概念
- Linux搭建Git服务器教程
- 二重指针、二维数组及二者如何进行赋值
- 『词向量』用Word2Vec训练中文词向量(一)—— 采用搜狗新闻数据集
- SYSAUX表空间占用过大情况下的处理(AWR信息过多)
- 虾皮有哪些站点?各站点有什么特色
- C# 使用DataMatrix.net.dll进行二维码打印
- 百行代码构建神经网络黑白图片自动上色系统
- 2021澳洲大学计算机专业排名,澳洲纽卡斯尔大学UoN计算机科学Computer Science专业排名第201-250位(2021年THE世界大学商科排名)...
- 学术不端网查重靠谱吗_学术不端网知网查重万方哪一个权威
- MP6050使用DMP库获取计步数
- h3c如何配置acl命令