简介

LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。

LinkedTransferQueue采用一种预占模式。意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为null的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。我们称这种节点操作为“匹配”方式。

LinkedTransferQueue是ConcurrentLinkedQueue、SynchronousQueue(公平模式下转交元素)、LinkedBlockingQueue(阻塞Queue的基本方法)的超集。而且LinkedTransferQueue更好用,因为它不仅仅综合了这几个类的功能,同时也提供了更高效的实现。

LinkedTransferQueue只有两个构造方法,这里不再详细介绍:

public LinkedTransferQueue() { }public LinkedTransferQueue(Collection<? extends E> c) {this();addAll(c);
}

LinkedTransferQueue源码详解

LinkedTransferQueue类定义如下:

public class LinkedTransferQueue<E> extends AbstractQueue<E> implements TransferQueue<E>, java.io.Serializable

LinkedTransferQueue类继承自AbstractQueue抽象类,并且实现了TransferQueue接口:

public interface TransferQueue<E> extends BlockingQueue<E> {// 如果存在一个消费者已经等待接收它,则立即传送指定的元素,否则返回false,并且不进入队列。boolean tryTransfer(E e);// 如果存在一个消费者已经等待接收它,则立即传送指定的元素,否则等待直到元素被消费者接收。void transfer(E e) throws InterruptedException;// 在上述方法的基础上设置超时时间boolean tryTransfer(E e, long timeout, TimeUnit unit)throws InterruptedException;// 如果至少有一位消费者在等待,则返回trueboolean hasWaitingConsumer();// 获取所有等待获取元素的消费线程数量int getWaitingConsumerCount();
}

TransferQueue接口继承了BlockingQueue接口,并进行了扩充,自己又定义了一些LinkedTransferQueue类需要用到的方法。

TransferQueue队列中的节点都是Node类型:

static final class Node {// 如果是消费者请求的节点,则isData为false,否则该节点为生产(数据)节点为truefinal boolean isData;   // false if this is a request node// 数据节点的值,若是消费者节点,则item为nullvolatile Object item;   // initially non-null if isData; CASed to match// 指向下一个节点volatile Node next;// 等待线程volatile Thread waiter; // null until waiting// CAS设置nextfinal boolean casNext(Node cmp, Node val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);}// CAS设置itemfinal boolean casItem(Object cmp, Object val) {// assert cmp == null || cmp.getClass() != Node.class;return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);}// 构造方法Node(Object item, boolean isData) {UNSAFE.putObject(this, itemOffset, item); // relaxed writethis.isData = isData;}// 将next指向自己final void forgetNext() {UNSAFE.putObject(this, nextOffset, this);}// 匹配或者节点被取消的时候会调用,设置item自连接,waiter为nullfinal void forgetContents() {UNSAFE.putObject(this, itemOffset, this);UNSAFE.putObject(this, waiterOffset, null);}// 节点是否被匹配过了final boolean isMatched() {Object x = item;return (x == this) || ((x == null) == isData);}// 是否是一个未匹配的请求节点// 如果是的话,则isData为false,且item为null,因为如果被匹配过了,item就不再为null,而是指向自己final boolean isUnmatchedRequest() {return !isData && item == null;}// 如果给定节点不能连接在当前节点后则返回truefinal boolean cannotPrecede(boolean haveData) {boolean d = isData;Object x;return d != haveData && (x = item) != this && (x != null) == d;}// 匹配一个数据节点final boolean tryMatchData() {// assert isData;Object x = item;if (x != null && x != this && casItem(x, null)) {LockSupport.unpark(waiter);return true;}return false;}private static final long serialVersionUID = -3375979862319811754L;// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long itemOffset;private static final long nextOffset;private static final long waiterOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = Node.class;itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));waiterOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiter"));} catch (Exception e) {throw new Error(e);}}
}

匹配前后节点item的变化,其中node1为数据节点,node2为消费者请求的占位节点:

Node node1(isData-item) node2(isData-item)
匹配前 true-item false-null
匹配后 true-null false-this
  • 数据节点,则匹配前item不为null且不为自身,匹配后设置为null。
  • 占位请求节点,匹配前item为null,匹配后自连接。

LinkedTransferQueue类中的重要字段如下:

// 是否为多核
private static final boolean MP =
Runtime.getRuntime().availableProcessors() > 1;// 作为第一个等待节点在阻塞之前的自旋次数
private static final int FRONT_SPINS   = 1 << 7;// 前驱节点正在处理,当前节点在阻塞之前的自旋次数
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;// sweepVotes的阈值
static final int SWEEP_THRESHOLD = 32;// 队列首节点
transient volatile Node head;// 队列尾节点
private transient volatile Node tail;// 断开被删除节点失败的次数
private transient volatile int sweepVotes;// xfer方法的how参数的可能取值
// 用于无等待的poll、tryTransfer
private static final int NOW   = 0; // for untimed poll, tryTransfer
// 用于offer、put、add
private static final int ASYNC = 1; // for offer, put, add
// 用于无超时的阻塞transfer、take
private static final int SYNC  = 2; // for transfer, take
// 用于超时等待的poll、tryTransfer
private static final int TIMED = 3; // for timed poll, tryTransfer

我们看一看LinkedTransferQueue类的入队、出队方法:

// 入队方法
public void put(E e) {xfer(e, true, ASYNC, 0);
}public boolean offer(E e, long timeout, TimeUnit unit) {xfer(e, true, ASYNC, 0);return true;
}public boolean offer(E e) {xfer(e, true, ASYNC, 0);return true;
}public boolean add(E e) {xfer(e, true, ASYNC, 0);return true;
}// 出队方法
public E take() throws InterruptedException {E e = xfer(null, false, SYNC, 0);if (e != null)return e;Thread.interrupted();throw new InterruptedException();
}public E poll() {return xfer(null, false, NOW, 0);
}public E poll(long timeout, TimeUnit unit) throws InterruptedException {E e = xfer(null, false, TIMED, unit.toNanos(timeout));if (e != null || !Thread.interrupted())return e;throw new InterruptedException();
}

我们可以看到,这些出队、入队方法都会调用xfer方法,因为LinkedTransferQueue是无界的,入队操作都会成功,所以入队操作都是ASYNC的,而出队方法,则是根据不同的要求传入不同的值,比如需要阻塞的出队方法就传入SYNC,需要加入超时控制的就传入TIMED。

除了上述方法会调用xfer方法之外,TransferQueue接口定义的方法也会调用xfer方法:

public boolean tryTransfer(E e) {return xfer(e, true, NOW, 0) == null;
}public void transfer(E e) throws InterruptedException {if (xfer(e, true, SYNC, 0) != null) {Thread.interrupted(); // failure possible only due to interruptthrow new InterruptedException();}
}public boolean tryTransfer(E e, long timeout, TimeUnit unit)throws InterruptedException {if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)return true;if (!Thread.interrupted())return false;throw new InterruptedException();
}

可以看到,xfer方法是实现LinkedTransferQueue的关键方法,下面我们来仔细分析一下该方法:

xfer方法

private E xfer(E e, boolean haveData, int how, long nanos) {// 如果haveData但是e为null,则抛出NullPointerException异常if (haveData && (e == null))throw new NullPointerException();// s是将要被添加的节点,如果需要Node s = null;                        // the node to append, if neededretry:for (;;) {                            // restart on append race// 从首节点开始匹配for (Node h = head, p = h; p != null;) { // find & match first nodeboolean isData = p.isData;Object item = p.item;// 判断节点是否被匹配过// item != null有2种情况:一是put操作,二是take的item被修改了(匹配成功)// (itme != null) == isData 要么表示p是一个put操作,要么表示p是一个还没匹配成功的take操作if (item != p && (item != null) == isData) { // unmatched// 节点与此次操作模式一致,无法匹配if (isData == haveData)   // can't matchbreak;// 匹配成功if (p.casItem(item, e)) { // matchfor (Node q = p; q != h;) {Node n = q.next;  // update by 2 unless singleton// 更新head为匹配节点的next节点if (head == h && casHead(h, n == null ? q : n)) {// 将旧节点自连接h.forgetNext();break;}                 // advance and retryif ((h = head)   == null ||(q = h.next) == null || !q.isMatched())break;        // unless slack < 2}// 匹配成功,则唤醒阻塞的线程LockSupport.unpark(p.waiter);// 类型转换,返回匹配节点的元素return LinkedTransferQueue.<E>cast(item);}}// 若节点已经被匹配过了,则向后寻找下一个未被匹配的节点Node n = p.next;// 如果当前节点已经离队,则从head开始寻找p = (p != n) ? n : (h = head); // Use head if p offlist}// 若整个队列都遍历之后,还没有找到匹配的节点,则进行后续处理// 把当前节点加入到队列尾if (how != NOW) {                 // No matches availableif (s == null)s = new Node(e, haveData);// 将新节点s添加到队列尾并返回s的前驱节点Node pred = tryAppend(s, haveData);// 前驱节点为null,说明有其他线程竞争,并修改了队列,则从retry重新开始if (pred == null)continue retry;           // lost race vs opposite mode// 不为ASYNC方法,则同步阻塞等待if (how != ASYNC)return awaitMatch(s, pred, e, (how == TIMED), nanos);}// how == NOW,则立即返回return e; // not waiting}
}

xfer方法的整个操作流程如下所示:

1、寻找和操作匹配的节点

从head开始向后遍历寻找未被匹配的节点,找到一个未被匹配并且和本次操作的模式不同的节点,匹配节点成功就通过CAS 操作将匹配节点的item字段设置为e,若修改失败,则继续向后寻找节点。

通过CAS操作更新head节点为匹配节点的next节点,旧head节点进行自连接,唤醒匹配节点的等待线程waiter,返回匹配的 item。如果CAS失败,并且松弛度大于等于2,就需要重新获取head重试。

2、如果在上述操作中没有找到匹配节点,则根据参数how做不同的处理:

  • NOW:立即返回,也不会插入节点
  • SYNC:插入一个item为e(isData = haveData)到队列的尾部,然后自旋或阻塞当前线程直到节点被匹配或者取消。
  • ASYNC:插入一个item为e(isData = haveData)到队列的尾部,不阻塞直接返回。
  • TIMED:插入一个item为e(isData = haveData)到队列的尾部,然后自旋或阻塞当前线程直到节点被匹配或者取消或者超时。

上面提到了一个松弛度的概念,它是什么作用呢?

在节点被匹配(被删除)之后,不会立即更新head/tail,而是当 head/tail 节点和最近一个未匹配的节点之间的距离超过一个“松弛阀值”之后才会更新(在LinkedTransferQueue中,这个值为 2)。这个“松弛阀值”一般为1-3,如果太大会降低缓存命中率,并且会增加遍历链的长度;太小会增加 CAS 的开销。

入队操作则是调用了tryAppend方法:

private Node tryAppend(Node s, boolean haveData) {// 从尾节点开始for (Node t = tail, p = t;;) {        // move p to last node and appendNode n, u;                        // temps for reads of next & tail// 队列为空,则将s设置为head并返回sif (p == null && (p = head) == null) {if (casHead(null, s))return s;                 // initialize}else if (p.cannotPrecede(haveData))return null;                  // lost race vs opposite mode// 不是最后一个节点else if ((n = p.next) != null)    // not last; keep traversingp = p != t && t != (u = tail) ? (t = u) : // stale tail(p != n) ? n : null;      // restart if off list// CAS失败else if (!p.casNext(null, s))p = p.next;                   // re-read on CAS failureelse {// 更新tailif (p != t) {                 // update if slack now >= 2while ((tail != t || !casTail(t, s)) &&(t = tail)   != null &&(s = t.next) != null && // advance and retry(s = s.next) != null && s != t);}return p;}}
}

该方法主要逻辑为:添加节点s到队列尾并返回s的前继节点,失败时(与其他不同模式线程竞争失败)返回null,没有前继节点则返回自身。

加入队列后,如果how还不是ASYNC则调用awaitMatch()方法阻塞等待:

private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {// 计算超时时间点final long deadline = timed ? System.nanoTime() + nanos : 0L;// 获取当前线程对象Thread w = Thread.currentThread();// 自旋次数int spins = -1; // initialized after first item and cancel checks// 随机数ThreadLocalRandom randomYields = null; // bound if neededfor (;;) {Object item = s.item;// 若有其它线程匹配了该节点if (item != e) {                  // matched// assert item != s;// 撤销该节点,并返回匹配值s.forgetContents();           // avoid garbagereturn LinkedTransferQueue.<E>cast(item);}// 线程中断或者超时,则将s的节点item设置为sif ((w.isInterrupted() || (timed && nanos <= 0)) &&s.casItem(e, s)) {        // cancel// 断开节点unsplice(pred, s);return e;}// 自旋if (spins < 0) {                  // establish spins at/near front// 计算自旋次数if ((spins = spinsFor(pred, s.isData)) > 0)randomYields = ThreadLocalRandom.current();}else if (spins > 0) {             // spin--spins;// 生成随机数来让出CPU时间if (randomYields.nextInt(CHAINED_SPINS) == 0)Thread.yield();           // occasionally yield}// 将s的waiter设置为当前线程else if (s.waiter == null) {s.waiter = w;                 // request unpark then recheck}// 超时阻塞else if (timed) {nanos = deadline - System.nanoTime();if (nanos > 0L)LockSupport.parkNanos(this, nanos);}// 非超时阻塞else {LockSupport.park(this);}}
}

当前操作为同步操作时,会调用awaitMatch方法阻塞等待匹配,成功返回匹配节点 item,失败返回给定参数e(s.item)。在等待期间如果线程被中断或等待超时,则取消匹配,并调用unsplice方法解除节点s和其前继节点的链接。

final void unsplice(Node pred, Node s) {// 设置item自连接,waiter为nulls.forgetContents(); // forget unneeded fieldsif (pred != null && pred != s && pred.next == s) {// 获取s的后继节点Node n = s.next;// s的后继节点为null,或不为null,就将s的前驱节点的后继节点设置为nif (n == null ||(n != s && pred.casNext(s, n) && pred.isMatched())) {for (;;) {               // check if at, or could be, headNode h = head;if (h == pred || h == s || h == null)return;          // at head or list emptyif (!h.isMatched())break;Node hn = h.next;if (hn == null)return;          // now emptyif (hn != h && casHead(h, hn))h.forgetNext();  // advance head}if (pred.next != pred && s.next != s) { // recheck if offlistfor (;;) {           // sweep now if enough votesint v = sweepVotes;if (v < SWEEP_THRESHOLD) {if (casSweepVotes(v, v + 1))break;}// 达到阀值,进行“大扫除”,清除队列中的无效节点else if (casSweepVotes(v, 0)) {sweep();break;}}}}}
}

如果s的前继节点pred还是指向s(pred.next == s),尝试解除s的链接,若s不是自连接节点,就把pred的next引用指向s的next节点。如果s不能被解除(由于它是尾节点或者pred可能被解除链接,并且pred和s都不是head节点或已经出列),则添加到sweepVotes,sweepVotes累计到阀值SWEEP_THRESHOLD之后就调用sweep()对队列进行一次“大扫除”,清除队列中所有的无效节点:

private void sweep() {for (Node p = head, s, n; p != null && (s = p.next) != null; ) {if (!s.isMatched())// Unmatched nodes are never self-linkedp = s;else if ((n = s.next) == null) // trailing node is pinnedbreak;else if (s == n)    // stale// No need to also check for p == s, since that implies s == np = head;elsep.casNext(s, n);}
}

xfer的主要过程如下图所示:

和SynchronousQueue相比,LinkedTransferQueue多了一个可以存储的队列,与LinkedBlockingQueue相比,LinkedTransferQueue多了直接传递元素,少了用锁来同步。

参考资料

方腾飞:《Java并发编程的艺术》

并发编程—— LinkedTransferQueue

JUC源码分析-集合篇(六):LinkedTransferQueue

Java并发编程之LinkedTransferQueue阻塞队列详解相关推荐

  1. Java并发编程之CountDownLatch/CyclicBarrierDemo/SemaphoreDemo详解

    CountDownLatch详解 什么是CountDownLatch? 代码说明一 :班长锁门 代码说明二:秦国统一六国 什么是CyclicBarrierDemo? 代码说明一:集齐7个龙珠,召唤神龙 ...

  2. Java并发编程之CountDownLatch(闭锁)使用详解

    package com.zhangxueliang.day_20191108;import java.util.concurrent.CountDownLatch;/*** 计算多线程的运行时间* @ ...

  3. java并发编程之Thread.sleep方法详解

    Thread.sleep方法的作用: 使当前线程暂停执行一段时间,交出cpu的执行时间片,并且在暂停期间不会参与cpu时间片的获取.直到等待时间结束恢复到就绪状态,是否执行还要看OS的调度,或者在这段 ...

  4. java并发编程之thread.join()方法详解

    thread.join()方法的作用:保证线程的执行结果的可见性.原理是通过阻塞主线程实现的. 代码Demo如下: public class ThreadJoinDemo {public static ...

  5. Java网络编程之Socket和ServerSocket详解

    Socket Socket是实现客户端套接字的类,套接字是两台计算机之间进行通信的端点. Socket的实际工作由SocketImpl类的实例执行 .通过更改用于创建套接字实现的套接字工厂,应用程序可 ...

  6. Java并发编程之CyclicBarrier详解

    简介 栅栏类似于闭锁,它能阻塞一组线程直到某个事件的发生.栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行.闭锁用于等待事件,而栅栏用于等待其他线程. CyclicBarrier ...

  7. 转:Java 7 种阻塞队列详解

    转自: Java 7 种阻塞队列详解 - 云+社区 - 腾讯云队列(Queue)是一种经常使用的集合.Queue 实际上是实现了一个先进先出(FIFO:First In First Out)的有序表. ...

  8. zbb20180929 thread java并发编程之Condition

    java并发编程之Condition 引言 在java中,对于任意一个java对象,它都拥有一组定义在java.lang.Object上监视器方法,包括wait(),wait(long timeout ...

  9. java并发编程之4——Java锁分解锁分段技术

    转载自 java并发编程之4--Java锁分解锁分段技术 并发编程的所有问题,最后都转换成了,"有状态bean"的状态的同步与互斥修改问题.而最后提出的解决"有状态bea ...

最新文章

  1. 36氪研究 | 智慧零售行业研究报告
  2. CTF web题总结--绕过正则表达式
  3. 简单的Postman,还能玩出花?
  4. 华为旗下首款弹出式前置摄像头新机发布:或归属荣耀旗下...
  5. ie6,ie7兼容性总结(转)
  6. .NET Framework 4.5的C#中的对话框消息
  7. 中文和全角检测 两种写法
  8. Ancient Knight(打造Windows Mobile平台最专业的游戏修改器)
  9. C#弹出窗体、C#导出Excel、C#数据展示框、C#弹出框
  10. Java基础知识强化之集合框架笔记55:Map集合之HashMap集合(HashMapInteger,String)的案例...
  11. Mybatis中文文档下载地址分享
  12. 校招行测笔试-言语理解与表达
  13. java空气质量指数AQI换算
  14. vb.net 同时给多个属性赋值_C++程序入门之——赋值操作符
  15. pde与波长 sipm 关系_基于SiPM和TCMPC的时间分辨拉曼散射测量技术研究
  16. jdk1.8的下载与安装教程
  17. 文献导读 - Machine Learning Identifies Stemness Features Associated with Oncogenic Dedifferentiation...
  18. 1080p60Hz需要传多少数据,怎么计算显示器带宽?(二)
  19. oracle查看列属性,oracle查询列属性
  20. 人生的悲哀莫过于:求而不得,舍而不能,得而不惜

热门文章

  1. 达梦数据库查看版本号方法
  2. 我的世界服务器怎么添加信息框,我的世界服务器怎么添加指定建筑
  3. 模式识别分类器评价指标之CMC曲线
  4. 2021-2027全球与中国光储系统市场现状及未来发展趋势
  5. 软件测试/测试开发丨免安装免配置环境的免费 ios 调试工具 sib 来啦
  6. js_RPC,sekiro框架,实现HTTPS通讯
  7. 史上最全 | fNIRS相关软件推荐(含下载资源)
  8. 消灭老鼠 有一只老鼠,在一个环形的田埂上挖了n个老鼠洞,这些洞也是连接为一个环状,我们要用泥土填满这些鼠洞,老鼠从第0号洞开始出现(第0号洞不填),然后依次按每间隔m个洞出现一次。我们要跟在老鼠后
  9. 单表查询和多表连接查询哪个效率更快?
  10. 热敏打印机在小票上打印条码和开钱箱打印小票