【死磕 Java 集合】— LinkedTransferQueue源码分析

问题

(1)LinkedTransferQueue是什么东东?

(2)LinkedTransferQueue是怎么实现阻塞队列的?

(3)LinkedTransferQueue是怎么控制并发安全的?

(4)LinkedTransferQueue与SynchronousQueue有什么异同?

简介

LinkedTransferQueue是LinkedBlockingQueue、SynchronousQueue(公平模式)、ConcurrentLinkedQueue三者的集合体,它综合了这三者的方法,并且提供了更加高效的实现方式。

继承体系

LinkedTransferQueue实现了TransferQueue接口,而TransferQueue接口是继承自BlockingQueue的,所以LinkedTransferQueue也是一个阻塞队列。

TransferQueue接口中定义了以下几个方法:

// 尝试移交元素
boolean tryTransfer(E e);
// 移交元素
void transfer(E e) throws InterruptedException;
// 尝试移交元素(有超时时间)
boolean tryTransfer(E e, long timeout, TimeUnit unit)throws InterruptedException;
// 判断是否有消费者
boolean hasWaitingConsumer();
// 查看消费者的数量
int getWaitingConsumerCount();

主要是定义了三个移交元素的方法,有阻塞的,有不阻塞的,有超时的。

存储结构

LinkedTransferQueue使用了一个叫做dual data structure的数据结构,或者叫做dual queue,译为双重数据结构或者双重队列。

双重队列是什么意思呢?

放取元素使用同一个队列,队列中的节点具有两种模式,一种是数据节点,一种是非数据节点。

放元素时先跟队列头节点对比,如果头节点是非数据节点,就让他们匹配,如果头节点是数据节点,就生成一个数据节点放在队列尾端(入队)。

取元素时也是先跟队列头节点对比,如果头节点是数据节点,就让他们匹配,如果头节点是非数据节点,就生成一个非数据节点放在队列尾端(入队)。

用图形来表示就是下面这样:

不管是放元素还是取元素,都先跟头节点对比,如果二者模式不一样就匹配它们,如果二者模式一样,就入队。

源码分析

主要属性

// 头节点
transient volatile Node head;
// 尾节点
private transient volatile Node tail;
// 放取元素的几种方式:
// 立即返回,用于非超时的poll()和tryTransfer()方法中
private static final int NOW   = 0; // for untimed poll, tryTransfer
// 异步,不会阻塞,用于放元素时,因为内部使用无界单链表存储元素,不会阻塞放元素的过程
private static final int ASYNC = 1; // for offer, put, add
// 同步,调用的时候如果没有匹配到会阻塞直到匹配到为止
private static final int SYNC  = 2; // for transfer, take
// 超时,用于有超时的poll()和tryTransfer()方法中
private static final int TIMED = 3; // for timed poll, tryTransfer

主要内部类

static final class Node {// 是否是数据节点(也就标识了是生产者还是消费者)final boolean isData;   // false if this is a request node// 元素的值volatile Object item;   // initially non-null if isData; CASed to match// 下一个节点volatile Node next;// 持有元素的线程volatile Thread waiter; // null until waiting
}

典型的单链表结构,内部除了存储元素的值和下一个节点的指针外,还包含了是否为数据节点和持有元素的线程。

内部通过isData区分是生产者还是消费者。

主要构造方法

public LinkedTransferQueue() {
}public LinkedTransferQueue(Collection<? extends E> c) {this();addAll(c);
}

只有这两个构造方法,且没有初始容量,所以是无界的一个阻塞队列。

入队

四个方法都是一样的,使用异步的方式调用xfer()方法,传入的参数都一模一样。

public void put(E e) {// 异步模式,不会阻塞,不会超时// 因为是放元素,单链表存储,会一直往后加xfer(e, true, ASYNC, 0);
}public boolean offer(E e, long timeout, TimeUnit unit) {xfer(e, true, ASYNC, 0);return true;
}public boolean offer(E e) {xfer(e, true, ASYNC, 0);return true;
}public boolean add(E e) {xfer(e, true, ASYNC, 0);return true;
}

xfer(E e, boolean haveData, int how, long nanos)的参数分别是:

(1)e表示元素;

(2)haveData表示是否是数据节点,

(3)how表示放取元素的方式,上面提到的四种,NOW、ASYNC、SYNC、TIMED;

(4)nanos表示超时时间;

出队

出队的四个方法也是直接或间接的调用xfer()方法,放取元素的方式和超时规则略微不同,本质没有大的区别。

public E remove() {E x = poll();if (x != null)return x;elsethrow new NoSuchElementException();
}
public E take() throws InterruptedException {// 同步模式,会阻塞直到取到元素E e = xfer(null, false, SYNC, 0);if (e != null)return e;Thread.interrupted();throw new InterruptedException();
}public E poll(long timeout, TimeUnit unit) throws InterruptedException {// 有超时时间E e = xfer(null, false, TIMED, unit.toNanos(timeout));if (e != null || !Thread.interrupted())return e;throw new InterruptedException();
}public E poll() {// 立即返回,没取到元素返回nullreturn xfer(null, false, NOW, 0);
}

取元素就各有各的玩法了,有同步的,有超时的,有立即返回的。

移交元素的方法

public boolean tryTransfer(E e) {// 立即返回return xfer(e, true, NOW, 0) == null;
}public void transfer(E e) throws InterruptedException {// 同步模式if (xfer(e, true, SYNC, 0) != null) {Thread.interrupted(); // failure possible only due to interruptthrow new InterruptedException();}
}public boolean tryTransfer(E e, long timeout, TimeUnit unit)throws InterruptedException {// 有超时时间if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)return true;if (!Thread.interrupted())return false;throw new InterruptedException();
}

请注意第二个参数,都是true,也就是这三个方法其实也是放元素的方法。

这里xfer()方法的几种模式到底有什么区别呢?请看下面的分析。

神奇的xfer()方法

private E xfer(E e, boolean haveData, int how, long nanos) {// 不允许放入空元素if (haveData && (e == null))throw new NullPointerException();Node s = null;                        // the node to append, if needed// 外层循环,自旋,失败就重试retry:for (;;) {                            // restart on append race// 下面这个for循环用于控制匹配的过程// 同一时刻队列中只会存储一种类型的节点// 从头节点开始尝试匹配,如果头节点被其它线程先一步匹配了// 就再尝试其下一个,直到匹配到为止,或者到队列中没有元素为止for (Node h = head, p = h; p != null;) { // find & match first node// p节点的模式boolean isData = p.isData;// p节点的值Object item = p.item;// p没有被匹配到if (item != p && (item != null) == isData) { // unmatched// 如果两者模式一样,则不能匹配,跳出循环后尝试入队if (isData == haveData)   // can't matchbreak;// 如果两者模式不一样,则尝试匹配// 把p的值设置为e(如果是取元素则e是null,如果是放元素则e是元素值)if (p.casItem(item, e)) { // match// 匹配成功// for里面的逻辑比较复杂,用于控制多线程同时放取元素时出现竞争的情况的// 看不懂可以直接跳过for (Node q = p; q != h;) {// 进入到这里可能是头节点已经被匹配,然后p会变成h的下一个节点Node n = q.next;  // update by 2 unless singleton// 如果head还没变,就把它更新成新的节点// 并把它删除(forgetNext()会把它的next设为自己,也就是从单链表中删除了)// 这时为什么要把head设为n呢?因为到这里了,肯定head本身已经被匹配掉了// 而上面的p.casItem()又成功了,说明p也被当前这个元素给匹配掉了// 所以需要把它们俩都出队列,让其它线程可以从真正的头开始,不用重复检查了if (head == h && casHead(h, n == null ? q : n)) {h.forgetNext();break;}                 // advance and retry// 如果新的头节点为空,或者其next为空,或者其next未匹配,就重试if ((h = head)   == null ||(q = h.next) == null || !q.isMatched())break;        // unless slack < 2}// 唤醒p中等待的线程LockSupport.unpark(p.waiter);// 并返回匹配到的元素return LinkedTransferQueue.<E>cast(item);}}// p已经被匹配了或者尝试匹配的时候失败了// 也就是其它线程先一步匹配了p// 这时候又分两种情况,p的next还没来得及修改,p的next指向了自己// 如果p的next已经指向了自己,就重新取head重试,否则就取其next重试Node n = p.next;p = (p != n) ? n : (h = head); // Use head if p offlist}// 到这里肯定是队列中存储的节点类型和自己一样// 或者队列中没有元素了// 就入队(不管放元素还是取元素都得入队)// 入队又分成四种情况:// NOW,立即返回,没有匹配到立即返回,不做入队操作// ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)// SYNC,同步,元素入队后当前线程阻塞,等待被匹配到// TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身// 如果不是立即返回if (how != NOW) {                 // No matches available// 新建s节点if (s == null)s = new Node(e, haveData);// 尝试入队Node pred = tryAppend(s, haveData);// 入队失败,重试if (pred == null)continue retry;           // lost race vs opposite mode// 如果不是异步(同步或者有超时)// 就等待被匹配if (how != ASYNC)return awaitMatch(s, pred, e, (how == TIMED), nanos);}return e; // not waiting}
}private Node tryAppend(Node s, boolean haveData) {// 从tail开始遍历,把s放到链表尾端for (Node t = tail, p = t;;) {        // move p to last node and appendNode n, u;                        // temps for reads of next & tail// 如果首尾都是null,说明链表中还没有元素if (p == null && (p = head) == null) {// 就让首节点指向s// 注意,这里插入第一个元素的时候tail指针并没有指向sif (casHead(null, s))return s;                 // initialize}else if (p.cannotPrecede(haveData))// 如果p无法处理,则返回null// 这里无法处理的意思是,p和s节点的类型不一样,不允许s入队// 比如,其它线程先入队了一个数据节点,这时候要入队一个非数据节点,就不允许,// 队列中所有的元素都要保证是同一种类型的节点// 返回null后外面的方法会重新尝试匹配重新入队等return null;                  // lost race vs opposite modeelse if ((n = p.next) != null)    // not last; keep traversing// 如果p的next不为空,说明不是最后一个节点// 则让p重新指向最后一个节点p = p != t && t != (u = tail) ? (t = u) : // stale tail(p != n) ? n : null;      // restart if off listelse if (!p.casNext(null, s))// 如果CAS更新s为p的next失败// 则说明有其它线程先一步更新到p的next了// 就让p指向p的next,重新尝试让s入队p = p.next;                   // re-read on CAS failureelse {// 到这里说明s成功入队了// 如果p不等于t,就更新tail指针// 还记得上面插入第一个元素时tail指针并没有指向新元素吗?// 这里就是用来更新tail指针的if (p != t) {                 // update if slack now >= 2while ((tail != t || !casTail(t, s)) &&(t = tail)   != null &&(s = t.next) != null && // advance and retry(s = s.next) != null && s != t);}// 返回p,即s的前一个元素return p;}}
}private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {// 如果是有超时的,计算其超时时间final long deadline = timed ? System.nanoTime() + nanos : 0L;// 当前线程Thread w = Thread.currentThread();// 自旋次数int spins = -1; // initialized after first item and cancel checks// 随机数,随机让一些自旋的线程让出CPUThreadLocalRandom randomYields = null; // bound if neededfor (;;) {Object item = s.item;// 如果s元素的值不等于e,说明它被匹配到了if (item != e) {                  // matched// assert item != s;// 把s的item更新为s本身// 并把s中的waiter置为空s.forgetContents();           // avoid garbage// 返回匹配到的元素return LinkedTransferQueue.<E>cast(item);}// 如果当前线程中断了,或者有超时的到期了// 就更新s的元素值指向s本身if ((w.isInterrupted() || (timed && nanos <= 0)) &&s.casItem(e, s)) {        // cancel// 尝试解除s与其前一个节点的关系// 也就是删除s节点unsplice(pred, s);// 返回元素的值本身,说明没匹配到return e;}// 如果自旋次数小于0,就计算自旋次数if (spins < 0) {                  // establish spins at/near front// spinsFor()计算自旋次数// 如果前面有节点未被匹配就返回0// 如果前面有节点且正在匹配中就返回一定的次数,等待if ((spins = spinsFor(pred, s.isData)) > 0)// 初始化随机数randomYields = ThreadLocalRandom.current();}else if (spins > 0) {             // spin// 还有自旋次数就减1--spins;// 并随机让出CPUif (randomYields.nextInt(CHAINED_SPINS) == 0)Thread.yield();           // occasionally yield}else if (s.waiter == null) {// 更新s的waiter为当前线程s.waiter = w;                 // request unpark then recheck}else if (timed) {// 如果有超时,计算超时时间,并阻塞一定时间nanos = deadline - System.nanoTime();if (nanos > 0L)LockSupport.parkNanos(this, nanos);}else {// 不是超时的,直接阻塞,等待被唤醒// 唤醒后进入下一次循环,走第一个if的逻辑就返回匹配的元素了LockSupport.park(this);}}
}

这三个方法里的内容特别复杂,很大一部分代码都是在控制线程安全,各种CAS,我们这里简单描述一下大致的逻辑:

(1)来了一个元素,我们先查看队列头的节点,是否与这个元素的模式一样;

(2)如果模式不一样,就尝试让他们匹配,如果头节点被别的线程先匹配走了,就尝试与头节点的下一个节点匹配,如此一直往后,直到匹配到或到链表尾为止;

(3)如果模式一样,或者到链表尾了,就尝试入队;

(4)入队的时候有可能链表尾修改了,那就尾指针后移,再重新尝试入队,依此往复;

(5)入队成功了,就自旋或阻塞,阻塞了就等待被其它线程匹配到并唤醒;

(6)唤醒之后进入下一次循环就匹配到元素了,返回匹配到的元素;

(7)是否需要入队及阻塞有四种情况:

a)NOW,立即返回,没有匹配到立即返回,不做入队操作对应的方法有:poll()、tryTransfer(e)b)ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)对应的方法有:add(e)、offer(e)、put(e)、offer(e, timeout, unit)c)SYNC,同步,元素入队后当前线程阻塞,等待被匹配到对应的方法有:take()、transfer(e)d)TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身对应的方法有:poll(timeout, unit)、tryTransfer(e, timeout, unit)

总结

(1)LinkedTransferQueue可以看作LinkedBlockingQueue、SynchronousQueue(公平模式)、ConcurrentLinkedQueue三者的集合体;

(2)LinkedTransferQueue的实现方式是使用一种叫做双重队列的数据结构;

(3)不管是取元素还是放元素都会入队;

(4)先尝试跟头节点比较,如果二者模式不一样,就匹配它们,组成CP,然后返回对方的值;

(5)如果二者模式一样,就入队,并自旋或阻塞等待被唤醒;

(6)至于是否入队及阻塞有四种模式,NOW、ASYNC、SYNC、TIMED;

(7)LinkedTransferQueue全程都没有使用synchronized、重入锁等比较重的锁,基本是通过 自旋+CAS 实现;

(8)对于入队之后,先自旋一定次数后再调用LockSupport.park()或LockSupport.parkNanos阻塞;

彩蛋

LinkedTransferQueue与SynchronousQueue(公平模式)有什么异同呢?

(1)在java8中两者的实现方式基本一致,都是使用的双重队列;

(2)前者完全实现了后者,但比后者更灵活;

(3)后者不管放元素还是取元素,如果没有可匹配的元素,所在的线程都会阻塞;

(4)前者可以自己控制放元素是否需要阻塞线程,比如使用四个添加元素的方法就不会阻塞线程,只入队元素,使用transfer()会阻塞线程;

(5)取元素两者基本一样,都会阻塞等待有新的元素进入被匹配到;

【死磕 Java 集合】— LinkedTransferQueue源码分析相关推荐

  1. JAVA集合专题+源码分析

    文章目录 Java集合专题 集合和数组的区别 数组 集合 区别 集合体系结构介绍 单列集合 [Collection ] Collection接口 迭代器 迭代器原理 增强for循环 List接口 对集 ...

  2. 死磕 java集合之ArrayDeque源码分析

    问题 (1)什么是双端队列? (2)ArrayDeque是怎么实现双端队列的? (3)ArrayDeque是线程安全的吗? (4)ArrayDeque是有界的吗? 简介 双端队列是一种特殊的队列,它的 ...

  3. java arraydeque_死磕 java集合之ArrayDeque源码分析

    问题 (1)什么是双端队列? (2)ArrayDeque是怎么实现双端队列的? (3)ArrayDeque是线程安全的吗? (4)ArrayDeque是有界的吗? 简介 双端队列是一种特殊的队列,它的 ...

  4. 死磕Java集合之BitSet源码分析(JDK18)

    死磕Java集合之BitSet源码分析(JDK18) 文章目录 死磕Java集合之BitSet源码分析(JDK18) 简介 继承体系 存储结构 源码解析 属性 构造方法 set(int bitInde ...

  5. 死磕 java集合之终结篇

    概览 我们先来看一看java中所有集合的类关系图. 这里面的类太多了,请放大看,如果放大还看不清,请再放大看,如果还是看不清,请放弃. 我们下面主要分成五个部分来逐个击破. List List中的元素 ...

  6. 并发编程5:Java 阻塞队列源码分析(下)

    上一篇 并发编程4:Java 阻塞队列源码分析(上) 我们了解了 ArrayBlockingQueue, LinkedBlockingQueue 和 PriorityBlockingQueue,这篇文 ...

  7. java.util.ServiceLoader源码分析

    java.util.ServiceLoader源码分析 回顾: ServiceLoader类的使用(具体参考博客http://blog.csdn.net/liangyihuai/article/det ...

  8. Java集合类框架源码分析 之 LinkedList源码解析 【4】

    上一篇介绍了ArrayList的源码分析[点击看文章],既然ArrayList都已经做了介绍,那么作为他同胞兄弟的LinkedList,当然必须也配拥有姓名! Talk is cheap,show m ...

  9. 【java集合框架源码剖析系列】java源码剖析之ArrayList

    注:博主java集合框架源码剖析系列的源码全部基于JDK1.8.0版本. 本博客将从源码角度带领大家学习关于ArrayList的知识. 一ArrayList类的定义: public class Arr ...

最新文章

  1. 用TCP/IP进行网际互联一
  2. json qbytearray 串 转_如何通过QByteArray在JSON中存储QPixmap?
  3. Swift 4官方文档中文版 The Basic(上)
  4. Java迭代器ListIterator
  5. Docker Compose部署Nexus3时的docker-compose.yml代码
  6. 【bzoj3280】小R的烦恼 费用流
  7. TensorFlow学习笔记之四(MNIST数字识别)
  8. 泛型数组列表ArrayList
  9. windows版一键绕id工具_Windows免费版一键绕过IOS13.6激活锁工具XgRiNdA,完美重启!...
  10. NLP任务增强:通过引入外部知识来提供额外信息
  11. 【NetApp】可以使用查设备备件型号的链接
  12. WPF--ContextMenu绑定命令的一个问题
  13. Facebook 开源 AI 所使用的硬件平台 'Big Sur'
  14. Go语言基础之10--面向对象编程2之方法
  15. 《WF编程》系列之15 - 顺序工作流与SequenceActivity 3 顺序工作流
  16. 如何用C语言打印出ASCII码表
  17. 全球机场三字码,对应的城市三字码
  18. matlab生成39码,LaTeX技巧357:MATLAB如何直接生成latex代码?
  19. php6简介,[PHP框架] ThinkPHP6 介绍、安装及配置
  20. Web前端--HTML+CSS+JavaScript酷炫游戏动漫网页设计

热门文章

  1. 基于Matlab的1/3倍频程计算
  2. how2heap2.31学习(2)
  3. 小程序生成分享海报php配置,小程序生成海报保存分享图片完全指南(包括:头像,文字)...
  4. 计算机售票作业6字法,铁路客运计算机售票具体操作(28页)-原创力文档
  5. 外刊阅读——英国女王新冠病毒检测呈阳性
  6. 数字图像处理1.3数字图像处理系统
  7. 单表查询和多表连接查询哪个效率更快?
  8. Redis学习笔记2
  9. 华为数通HCIE面试项目题——网关是放在接入还是汇聚?
  10. java实现导出Excel多行表头复杂模板