并发编程5:Java 阻塞队列源码分析(下)
上一篇 并发编程4:Java 阻塞队列源码分析(上) 我们了解了 ArrayBlockingQueue
, LinkedBlockingQueue
和 PriorityBlockingQueue
,这篇文章来了解剩下的四种阻塞队列。
读完本文你将了解:
- 七种阻塞队列的后四种
- DelayQueue
- DelayQueue 的关键属性
- 实现 Delayed 接口
- 延时阻塞队列如何实现
- DelayQueue 使用场景
- SynchronousQueue
- LinkedTransferQueue
- TransferQueue
- tryTransfer 和 transfer
- LinkedBlockingDeque
- 关键属性
- DelayQueue
- 种阻塞队列的特点
- 总结
- Thanks
七种阻塞队列的后四种
DelayQueue
DelayQueue
是一个支持延时获取元素的、无界阻塞队列。
队列使用 PriorityQueue
实现,队列中的元素必须实现 Delayed
接口:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {...}
Delayed
接口:
public interface Delayed extends Comparable<Delayed> {//返回当前对象的剩余执行时间long getDelay(TimeUnit unit);
}
可以看到,实现 Delayed
的类也需要实现 Comparable
接口,即实现 compareTo()
方法,保证集合中元素的顺序和 getDelay()
一致。
因此创建元素时可以指定多久才能从队列中获取当前元素。
DelayQueue 的关键属性
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader;/*** Condition signalled when a newer element becomes available* at the head of the queue or a new thread may need to* become leader.*/
private final Condition available = lock.newCondition();
可以看到,DelayQueue
的属性只有四个,却都不简单:
- ReentrantLock lock
- 读写锁
- PriorityQueue q
- 无界的、优先级队列
- Thread leader
- Leader-Follower 模型中的 leader
- Condition available
- 队首有新元素可用或者有新线程成为 leader 时触发的 condition
简单介绍下关键属性。
1 PriorityQueue
是一个用数组实现的,基于二叉堆(元素[n] 的子孩子是 元素[2*n+1] 和元素[2*(n+1)] )数据结构的集合。
/*** Priority queue represented as a balanced binary heap: the two* children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The* priority queue is ordered by comparator, or by the elements'* natural ordering, if comparator is null: For each node n in the* heap and each descendant d of n, n <= d. The element with the* lowest value is in queue[0], assuming the queue is nonempty.*/
transient Object[] queue;
在添加元素时如果超出限制也会扩容:
public boolean offer(E e) {if (e == null)throw new NullPointerException();modCount++;int i = size;if (i >= queue.length)grow(i + 1);size = i + 1;if (i == 0)queue[0] = e;elsesiftUp(i, e);return true;
}
所以是无界的。
2.Leader-Follower 模型
这种模型中所有线程会有三种身份中的一种:leader、follower,以及一个干活中的状态:proccesser。
它的基本原则就是,永远最多只有一个 leader。而所有 follower 都在等待成为 leader。
线程池启动时会自动产生一个 Leader 负责等待事件,当有一个事件产生时,Leader 线程首先通知一个 Follower 线程将其提拔为新的 Leader,然后自己就去干活了,去处理这个事件。处理完毕后加入 Follower 线程等待队列,等待下次成为 Leader。
这种方法可以增强 CPU 高速缓存相似性,及消除动态内存分配和线程间的数据交换。这种模式是为了最小化任务等待时间,当一个线程成为 leader 后,它只需要等待下一个可执行任务的出现,而其他线程要无限制地等待。
这部分摘自:http://blog.csdn.net/goldlevi/article/details/7705180
实现 Delayed
接口
前面提到了,DelayQueue
的元素必须实现 Delayed
接口,我们以 JDK 中的 ScheduledFutureTask
为例,看下如何实现:
private class ScheduledFutureTask<V>extends FutureTask<V> implements RunnableScheduledFuture<V> {//1.初始化ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);this.time = ns;this.period = period;this.sequenceNumber = sequencer.getAndIncrement();}//...//2.
public long getDelay(TimeUnit unit) {return unit.convert(time - now(), NANOSECONDS);
}//3.
public int compareTo(Delayed other) {if (other == this) // compare zero if same objectreturn 0;if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;long diff = time - x.time;if (diff < 0)return -1;else if (diff > 0)return 1;else if (sequenceNumber < x.sequenceNumber)return -1;elsereturn 1;}long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
//...
}
可以看到,实现 Delayed
接口大概有三步:
- 构造函数中初始化基本数据,比如执行时间等数据
- 实现
getDelay()
方法,返回当前元素还需要延时多久执行 - 实现
compareTo()
方法,指定不同元素如何比较谁先执行
延时阻塞队列如何实现
DelayQueue
中只有延迟时间到了才能从队列中取出元素。
那这个是怎么实现的呢?我们看一下获取元素的实现,以 take()
为例:
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek(); //先获取队首元素,不删除if (first == null) //如果为空就阻塞等待available.await();else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0L) //比较元素延时时间是否到达return q.poll(); //如果是就移除并返回first = null; // don't retain ref while waitingif (leader != null) //如果有 leader 线程,依然阻塞等待available.await();else { //如果没有 leader 线程,指定当前线程,然后等待任务的待执行时间Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally { //最后等待时间到了后,就通知阻塞的线程if (leader == null && q.peek() != null)available.signal();lock.unlock();}
}//PriorityQueue.peek()
public E peek() {return (size == 0) ? null : (E) queue[0];
}
可以看到,在取元素时,会根据元素的延时执行时间是否为 0 进行判断,如果延时执行时间已经没有了,就直接返回;否则就要等待执行时间到达后再返回。其中的 Leader-Follower 模型的调度过程这里就不分析了,越分析内容越多 - -。
DelayQueue
使用场景:
- 缓存系统的设计
- 用
DelayQueue
保存元素的有效期,用一个线程来循环查询DelayQueue
,能查到元素,就说明缓存的有效期到了
- 用
- 定时任务调度
- 用
DelayQueue
保存定时执行的任务和执行时间,同样有一个循环查询线程,获取到任务就执行 TimerQueue
就是使用DelayQueue
实现的
- 用
SynchronousQueue
SynchronousQueue
支持公平访问队列,根据构造函数的参数不同,有两种实现方式:TransferQueue
和 TransferStack
,默认情况下是 false:
private transient volatile Transferer<E> transferer;public SynchronousQueue() {this(false);
}public SynchronousQueue(boolean fair) {transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
SynchronousQueue
是一个不存储元素的阻塞队列。
这里的“不存储元素”指的是,SynchronousQueue
容量为 0,每添加一个元素必须等待被取走后才能继续添加元素。
我们看下它的 put()
的实现:
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();if (transferer.transfer(e, false, 0) == null) { Thread.interrupted();throw new InterruptedException();}
}
可以看到,它的添加是调用的 transferer.transfer()
,如果返回 null 就调用 Thread.interrupted()
将中断标志位复位(设为 false),然后抛出异常。
看下 TransferStack.transfer()
:
/*** Puts or takes an item.*/
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {SNode s = null; int mode = (e == null) ? REQUEST : DATA; //判断是添加还是获取for (;;) {SNode h = head; //获取栈顶节点 if (h == null || h.mode == mode) { // empty or same-modeif (timed && nanos <= 0) { // can't waitif (h != null && h.isCancelled()) //如果头节点无法获取,就去获取下一个casHead(h, h.next); // pop cancelled nodeelsereturn null;} else if (casHead(h, s = snode(s, e, h, mode))) {//设置头节点SNode m = awaitFulfill(s, timed, nanos);if (m == s) { // wait was cancelledclean(s);return null;}if ((h = head) != null && h.next == s)casHead(h, s.next); // help s's fulfillerreturn (E) ((mode == REQUEST) ? m.item : s.item);}} else if (!isFulfilling(h.mode)) { // try to fulfillif (h.isCancelled()) // already cancelledcasHead(h, h.next); // pop and retryelse if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {for (;;) { // loop until matched or waiters disappearSNode m = s.next; // m is s's matchif (m == null) { // all waiters are gonecasHead(s, null); // pop fulfill nodes = null; // use new node next timebreak; // restart main loop}SNode mn = m.next;if (m.tryMatch(s)) {casHead(s, mn); // pop both s and mreturn (E) ((mode == REQUEST) ? m.item : s.item);} else // lost matchs.casNext(m, mn); // help unlink}}} else { // help a fulfillerSNode m = h.next; // m is h's matchif (m == null) // waiter is gonecasHead(h, null); // pop fulfilling nodeelse {SNode mn = m.next;if (m.tryMatch(h)) // help matchcasHead(h, mn); // pop both h and melse // lost matchh.casNext(m, mn); // help unlink}}}
}
逻辑比较复杂,主要就是三步:
- 栈是空的或者栈顶元素的模式和当前要进行的操作一致
- 将节点推到堆栈上并等待匹配
- 等待参数中的时间后返回
- 如果取消就返回 null
- 如果栈不为空且栈顶元素模式与当前要进行的操作不一致,如果这个元素的模式是相反的模式(取对应放)
- 尝试将栈中一个模式匹配要求的节点推到堆栈上,与相应的等待节点匹配并返回
- 如果栈顶已经拥有另一个模式 匹配的节点
- 通过执行 POP 操作来找到匹配的元素,然后继续
看着有点晕,简单概括就是一个添加操作后必须等待一个获取操作才可以继续添加。
SynchronousQueue
的吞吐量高于 LinkedBlockingQueue
和 ArrayBlockingQueue
,有位前辈做了测试,可以点击 这篇文章 查看。这里引用一下结论:
LinkedBlockingQueue 性能表现远超 ArrayBlcokingQueue,不管线程多少,不管 Queue 长短,LinkedBlockingQueue 都胜过 ArrayBlockingQueue。
SynchronousQueue 表现很稳定,而且在 20 个线程之内不管 Queue 长短,SynchronousQueue 性能表现是最好的,(其实SynchronousQueue 跟 Queue 长短没有关系),如果 Queue 的 capability 只能是 1,那么毫无疑问选择 SynchronousQueue,这也是设计 SynchronousQueue 的目的吧。
但大家也可以看到当超过 1000 个线程时,SynchronousQueue 性能就直线下降了,只有最高峰的一半左右,而且当 Queue 大于 30 时,LinkedBlockingQueue 性能就超过 SynchronousQueue。
相较于其他队列有缓存的作用,SynchronousQueue
适用于单线程同步传递性场景,比如:消费者没拿走当前的产品,生产者是不能再给产品的,这样可以控制生产者生产的速率和消费者一致。
LinkedTransferQueue
LinkedTransferQueue
实现了 TransferQueue
接口, 是一个由链表组成的、无界阻塞队列。
public class LinkedTransferQueue<E> extends AbstractQueue<E>implements TransferQueue<E>, java.io.Serializable {...}
TransferQueue
TransferQueue
也是一种阻塞队列,它用于生产者需要等待消费者消费事件的场景,与前面一节的 SynchronousQueue
有相似之处。它定义的方法如下:
public interface TransferQueue<E> extends BlockingQueue<E> {//尽可能快地转移元素给一个等待的消费者//如果在这之前有其他线程调用了 taked() 或者 poll(long,TimeUnit) 方法,就返回 true//否则返回 falseboolean tryTransfer(E e);//转移元素给一个消费者,在有的情况下会等待直到被取走//void transfer(E e) throws InterruptedException;//在 timeout 时间内将元素转移给一个消费者,如果这段时间内传递出去了就返回 true//否则返回 falseboolean tryTransfer(E e, long timeout, TimeUnit unit)throws InterruptedException;//如果至少有一个等待的消费者,就返回 trueboolean hasWaitingConsumer();//返回等待获取元素的消费者个数//这个值用于监控int getWaitingConsumerCount();
}
tryTransfer() 和 transfer()
相对于其他阻塞队列,LinkedTransferQueue
多了两个关键地方法:tryTransfer()
和 transfer()
。
分别来看看它是如何实现的。
1.transfer()
transfer()
方法的作用是:如果有等待接收元素的消费者线程,直接把生产者传入的元素 transfer 给消费者;如果没有消费者线程,transfer()
会将元素存放到队列尾部,并等待元素被消费者取走才返回:
public void transfer(E e) throws InterruptedException {if (xfer(e, true, SYNC, 0) != null) {Thread.interrupted(); // failure possible only due to interruptthrow new InterruptedException();}
}
private E xfer(E e, boolean haveData, int how, long nanos) {if (haveData && (e == null))throw new NullPointerException();Node s = null; // the node to append, if neededretry:for (;;) { // restart on append racefor (Node h = head, p = h; p != null;) { // find & match first nodeboolean isData = p.isData;Object item = p.item;if (item != p && (item != null) == isData) { // unmatchedif (isData == haveData) // can't matchbreak;if (p.casItem(item, e)) { // matchfor (Node q = p; q != h;) {Node n = q.next; // update by 2 unless singletonif (head == h && casHead(h, n == null ? q : n)) {h.forgetNext();break;} // advance and retryif ((h = head) == null ||(q = h.next) == null || !q.isMatched())break; // unless slack < 2}LockSupport.unpark(p.waiter);@SuppressWarnings("unchecked") E itemE = (E) item;return itemE;}}Node n = p.next;p = (p != n) ? n : (h = head); // Use head if p offlist}if (how != NOW) { // No matches availableif (s == null)s = new Node(e, haveData);Node pred = tryAppend(s, haveData); //尝试添加到队尾if (pred == null)continue retry; // lost race vs opposite modeif (how != ASYNC)return awaitMatch(s, pred, e, (how == TIMED), nanos);}return e; // not waiting}
}
awaitMatch()
方法的作用是:CPU 自旋等待消费者取走元素,为了避免长时间消耗 CPU,在自旋一定次数后会调用 Thread.yield()
暂停当前正在执行的线程,改为执行其他线程。
2.tryTransfer()
tryTransfer()
的作用是:试探生产者传入的元素是否能 直接传递给消费者。
- 如果有等待接收的消费者,返回 true
- 没有则返回 false
public boolean tryTransfer(E e, long timeout, TimeUnit unit)throws InterruptedException {if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)return true;if (!Thread.interrupted())return false;throw new InterruptedException();
}
可以看到,和 transfer()
必须等到消费者取出元素才返回不同的是,tryTransfer()
无论是否有消费者接收都会立即返回。
LinkedBlockingDeque
LinkedBlockingDeque
是一个由链表组成的、双向阻塞队列。
关键属性
static final class Node<E> {E item;Node<E> prev;Node<E> next;Node(E x) {item = x;}
}transient Node<E> first;
transient Node<E> last;
private transient int count;
private final int capacity;
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
可以看到,LinkedBlockingDeque
中持有队列首部和尾部节点,每个节点也是双向的。
双向的作用是:可以从队列两端插入和移除元素。多了一个操作队列的方向,在多线程同时入队时,可以减少一半的竞争。
除了 remove(Object)
等移除操作,LinkedBlockingDeque
的大多数操作的时间复杂度都是 O(n)。
LinkedBlockingDeque
多了获取和查询的 XXXFirst
和 XXXLast
的方法。
7 种阻塞队列的特点
这篇文章介绍的 4 种加上上一篇 细说并发4:Java 阻塞队列源码分析(上) 中 3 种,总共 7 种阻塞队列,这么多队列看的眼都花了。
这里简单总结下 Java 中 7 种阻塞队列的特点:
- ArrayBlockingQueue
- 环形数组实现的、有界的队列,一旦创建后,容量不可变
- 基于数组,在添加删除上性能还是不如链表
- LinkedBlockingQueue:
- 基于链表、有界阻塞队列
- 添加和获取是两个不同的锁,所以并发添加/获取效率更高些
Executors.newFixedThreadPool()
使用了这个队列
- PriorityBlockingQueue
- 基于数组的、支持优先级的、无界阻塞队列
- 使用自然排序或者定制排序指定排序规则
- 添加元素时,当数组中元素大于等于容量时,会扩容(当前队列中元素个数小于 64 个,数组容量就乘 3;否则就乘 2 加 2),拷贝数组
- DelayQueue
- 支持延时获取元素的、无界阻塞队列
- 添加元素时如果超出限制也会扩容
- Leader-Follower 模型
- SynchronousQueue
- 容量为 0
- 一个添加操作后必须等待一个获取操作才可以继续添加
- 吞吐量高于
LinkedBlockingQueue
和ArrayBlockingQueue
- LinkedTransferQueue
- 由链表组成的、无界阻塞队列
- 实现了
TransferQueue
接口 - CPU 自旋等待消费者取走元素,自旋一定次数后结束
- LinkedBlockingDeque
- 由双向链表组成的、双向阻塞队列
- 可以从队列两端插入和移除元素
- 多了一个操作队列的方向,在多线程同时入队时,可以减少一半的竞争
总结
在实际开发中可能接触不到阻塞队列,线程池或者其他池都将这些细节封装好了,但是在看一些开源框架的时候经常看到有使用它们,因此如果想要自己写牛逼的框架,这些底层的东西还是需要了解的。
我们结合源码和《Java 并发编程的艺术》相关章节分两篇文章介绍了 Java 中的阻塞队列,了解了 7 种阻塞队列的大致源码实现,后面遇到需要使用阻塞队列时心里应该有些底了。
学基础就是这样,不能指望立即有用,古话说得好:无用之用是为大用,不一定哪天就派上用场了!
Thanks
《Java 并发编程的艺术》
http://blog.csdn.net/goldlevi/article/details/7705180
http://stevex.blog.51cto.com/4300375/1287085/
并发编程5:Java 阻塞队列源码分析(下)相关推荐
- 并发-阻塞队列源码分析
阻塞队列 参考: http://www.cnblogs.com/dolphin0520/p/3932906.html http://endual.iteye.com/blog/1412212 http ...
- 【Java并发编程】16、ReentrantReadWriteLock源码分析
一.前言 在分析了锁框架的其他类之后,下面进入锁框架中最后一个类ReentrantReadWriteLock的分析,它表示可重入读写锁,ReentrantReadWriteLock中包含了两种锁,读锁 ...
- Java并发编程(十六):CyclicBarrier源码分析
前言 CyclicBarrier可以建立一个屏障,这个屏障可以阻塞一个线程直到指定的所有线程都达到屏障.就像团队聚餐,等所有人都到齐了再一起动筷子.根据Cyclic就可以发现CyclicBarri ...
- c++ 线程池_JAVA并发编程:线程池ThreadPoolExecutor源码分析
前面的文章已经详细分析了线程池的工作原理及其基本应用,接下来本文将从底层源码分析一下线程池的执行过程.在看源码的时候,首先带着以下两个问题去仔细阅读.一是线程池如何保证核心线程数不会被销毁,空闲线程数 ...
- Java并发编程笔记之Semaphore信号量源码分析
JUC 中 Semaphore 的使用与原理分析,Semaphore 也是 Java 中的一个同步器,与 CountDownLatch 和 CycleBarrier 不同在于它内部的计数器是递增的,那 ...
- java并发编程基础-ReentrantLock及LinkedBlockingQueue源码分析
ReentrantLock是一个较为常用的锁对象.在上次分析的uil开源项目中也多次被用到,下面谈谈其概念和基本使用. 概念 一个可重入的互斥锁定 Lock,它具有与使用 synchronized 相 ...
- 高并发编程-Thread#interrupt用法及源码分析
文章目录 官网 方法&源码 void interrupt() ` boolean isInterrupted()` vs `static boolean interrupted()` 方法&a ...
- java.util.ServiceLoader源码分析
java.util.ServiceLoader源码分析 回顾: ServiceLoader类的使用(具体参考博客http://blog.csdn.net/liangyihuai/article/det ...
- Java集合类框架源码分析 之 LinkedList源码解析 【4】
上一篇介绍了ArrayList的源码分析[点击看文章],既然ArrayList都已经做了介绍,那么作为他同胞兄弟的LinkedList,当然必须也配拥有姓名! Talk is cheap,show m ...
最新文章
- linux编程之GDB调试
- android清空frame,android – GLSurfaceView onDrawFrame清除行为
- linux 写脚本登录ftp,Linux使用Shell脚本实现ftp的自动上传下载-Go语言中文社区
- scss怎么引入到html,Sass 导入指令
- 定向输出命令_网络工程师之linux重定向命令和管道命令详解
- mysql添加索引后查询先用索引吗_mysql 添加索引后 在查询的时候是mysql就自动从索引里面查询了。还是查询的时候有单 独的参数查询索引?...
- numpy.random随机数模块常用函数总结
- Linux中路径的组成部分
- matlab 读取mdf文件路径,通过 MDF 数据存储使用 MDF 文件
- Android apk签名
- 我的世界只支持java8_我的世界minecraft 1.8以上版本forge安装支持哪一种java?7还是8?...
- sgsn与ggsn的区别与联系
- 安卓手机怎么录屏?精心挑选这几款录屏软件,个个好用
- 安徽大学在校生如何校外访问图书馆资源
- 历代iOS设备屏幕分辨率
- 程序人生(一) 初生牛犊
- 静观花开花落,笑看云卷云舒
- 二层广播风暴(产生原因+判断+解决)
- 后疫情时代,藏在同程艺龙财报里的“增长密码”
- 【新书推荐】【2018.05】电磁兼容性的计算方法