SynchronousQueue原理详解-非公平模式

开篇

说明:本文分析采用的是jdk1.8

约定:下面内容中Ref-xxx代表的是引用地址,引用对应的节点

前面已经讲解了公平模式的内容,今天来讲解下关于非公平模式下的SynchronousQueue是如何进行工作的,在源码分析的时候,先来简单看一下非公平模式的简单原理,它采用的栈这种FILO先进后出的方式进行非公平处理,它内部有三种状态,分别是REQUEST,DATA,FULFILLING,其中REQUEST代表的数据请求的操作也就是take操作,而DATA表示的是数据也就是Put操作将数据存放到栈中,用于消费者进行获取操作,而FULFILLING代表的是可以进行互补操作的状态,其实和前面讲的公平模式也很类似。

当有相同模式情况下进行入栈操作,相同操作指的是REQUEST和DATA两种类型中任意一种进行操作时,模式相同则进行入栈操作,如下图所示:

同REQUEST进行获取数据时的入栈情况:

同样的put的操作,进行数据操作时为DATA类型的操作,此时队列情况为:

不同模式下又是如何进行操作的?当有不同模式进来的时候,他不是将当前的模式压入栈顶,而是将FullFill模式和当前模式进行按位或之后压入栈顶,也就是压入一个进行FullFill请求的模式进入栈顶,请求配对操作,如下图所示:

通过上图可见,本来栈中有一个DATA模式的数据等待消费者进行消费,这时候来了一个REQUEST模式的请求操作来进行消费数据,这时候并没有将REQUEST模式直接压入栈顶,而是将其转换为FULLFILLING模式,并且保留了原有的类型,这是进行FULLFILLING的请求,请求和栈顶下方元素进行匹配,当匹配成功后将栈顶和匹配元素同时进行出栈操作,详细请见下文分析:

TransferStack

字段信息

/** 消费者模式 */
static final int REQUEST    = 0;
/** 提供者模式 */
static final int DATA       = 1;
/** 互补模式 */
static final int FULFILLING = 2;
/** 栈顶指针 */
volatile SNode head;

方法

方法名 描述
isFulfilling 判断指定类型是否是互补模式
casHead 替换当前头结点
snode 生成SNode节点对象
transfer 主要处理逻辑
awaitFulfill 等待fulfill操作
shouldSpin 判断节点s是头结点或是fulfill节点则返回true

SNode内容

字段信息

volatile SNode next;        // 栈下一个元素
volatile SNode match;       // 匹配的节点
volatile Thread waiter;     // 控制park/unpark的线程
Object item;                // 数据或请求
int mode;                                       // 模式,上面介绍的三种模式

方法

方法名 描述
casNext 判断指定类型是否是互补模式
tryMatch 尝试匹配节点,如果存在匹配节点则判断是否是当前节点,直接返回判断结果,如果没有则替换match内容并且唤醒线程
tryCancel 取消当前节点,将当前节点的match节点设置为当前节点(this)
isCancelled 判断match节点是不是等于当前节点

经过上面内容的分析,接下来就进入正题,让我们整体先看一下下transfer都为我们做了些什么内容,下面是transfer源码内容:

E transfer(E e, boolean timed, long nanos) {/** Basic algorithm is to loop trying one of three actions:** 1. If apparently empty or already containing nodes of same*    mode, try to push node on stack and wait for a match,*    returning it, or null if cancelled.** 2. If apparently containing node of complementary mode,*    try to push a fulfilling node on to stack, match*    with corresponding waiting node, pop both from*    stack, and return matched item. The matching or*    unlinking might not actually be necessary because of*    other threads performing action 3:** 3. If top of stack already holds another fulfilling node,*    help it out by doing its match and/or pop*    operations, and then continue. The code for helping*    is essentially the same as for fulfilling, except*    that it doesn't return the item.*/SNode s = null; // constructed/reused as neededint mode = (e == null) ? REQUEST : DATA;for (;;) {SNode h = head;if (h == null || h.mode == mode) {  // 栈顶指针为空或者是模式相同if (timed && nanos <= 0) {      // 制定了timed并且时间小于等于0则取消操作。if (h != null && h.isCancelled())casHead(h, h.next);     // 判断头结点是否被取消了取消了就弹出队列,将头结点指向下一个节点elsereturn null;} else if (casHead(h, s = snode(s, e, h, mode))) {// 初始化新节点并且修改栈顶指针SNode m = awaitFulfill(s, timed, nanos);            // 进行等待操作if (m == s) {               // 返回内容是本身则进行清理操作clean(s);return null;}if ((h = head) != null && h.next == s)casHead(h, s.next);     // help s's fulfillerreturn (E) ((mode == REQUEST) ? m.item : s.item);}} else if (!isFulfilling(h.mode)) { // 尝试去匹配if (h.isCancelled())            // 判断是否已经被取消了casHead(h, h.next);         // 弹出取消的节点并且从新进入主循环else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {//新建一个Full节点压入栈顶for (;;) { // 循环直到匹配SNode m = s.next;       // s的下一个节点为匹配节点if (m == null) {        // 代表没有等待内容了casHead(s, null);   // 弹出full节点s = null;           // 设置为null用于下次生成新的节点break;              // 退回到主循环中}SNode mn = m.next;if (m.tryMatch(s)) {casHead(s, mn);     // 弹出s节点和m节点两个节点return (E) ((mode == REQUEST) ? m.item : s.item);} else                  // 如果失去了匹配s.casNext(m, mn);   // 帮助取消连接}}} else {                            // 这里是帮助进行fillullSNode m = h.next;               // m是头结点的匹配节点if (m == null)                  // 如果m不存在则直接将头节点赋值为nllcasHead(h, null);           // 弹出fulfill节点else {SNode mn = m.next;if (m.tryMatch(h))          // h节点尝试匹配m节点casHead(h, mn);         // 弹出h和m节点else                        // 丢失匹配则直接将头结点的下一个节点赋值为头结点的下下节点h.casNext(m, mn);       }}}
}
  1. 模式相同的时候则进行等待操作,入队等待操作
  2. 当模式不相同时,首先判断头结点是否是fulfill节点如果不是则进行匹配操作,如果是fulfill节点先帮助头结点的fulfill节点进行匹配操作

接下来再来看一下awaitFulfill方法内容

SNode awaitFulfill(SNode s, boolean timed, long nanos) {final long deadline = timed ? System.nanoTime() + nanos : 0L;// 等待线程Thread w = Thread.currentThread();// 等待时间设置int spins = (shouldSpin(s) ?(timed ? maxTimedSpins : maxUntimedSpins) : 0);for (;;) {if (w.isInterrupted())          // 判断当前线程是否被中断 s.tryCancel();                  // 尝试取消操作 SNode m = s.match;                  // 获取当前节点的匹配节点,如果节点不为null代表匹配或取消操作,则返回if (m != null)return m;if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {s.tryCancel();continue;}}if (spins > 0)spins = shouldSpin(s) ? (spins-1) : 0;else if (s.waiter == null)s.waiter = w; // establish waiter so can park next iterelse if (!timed)LockSupport.park(this);else if (nanos > spinForTimeoutThreshold)LockSupport.parkNanos(this, nanos);}
}

通过上面的源码,其实我们之前分析同步模式的时候差不太多,变化的地方其中包括返回内容判断这里判断的是match节点是否为null,还有就是spins时间设置这里发现了shoudSpin用来判断是否进行轮训,来看一下shouldSpin方法:

/*** 判断节点是否是fulfill节点,或者是头结点为空再或者是头结点和当前节点相等时则不需要进行轮训操作*/
boolean shouldSpin(SNode s) {SNode h = head;return (h == s || h == null || isFulfilling(h.mode));
}

实际上就是判断节点是否是fulfill节点,或者是头结点为空再或者是头结点和当前节点相等时则不需要进行轮训操作,如果满足上述条件就不小进行轮训等到操作了直接进行等待就行了。

接下来我们来用例子一点点解析原理:

首先先进行一个put操作,这样可以简单分析下内部信息。

/*** SynchronousQueue原理内容** @author battleheart*/
public class SynchronousQueueDemo1 {public static void main(String[] args) throws Exception {SynchronousQueue<Integer> queue = new SynchronousQueue<>();Thread thread1 = new Thread(() -> {try {queue.put(1);} catch (InterruptedException e) {e.printStackTrace();}});thread1.start();}
}

首先它会进入到transfer方法中,进行第一步的判断他的类型信息,如下所示:

SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;

通过上面代码可以看到e=1所以是DATA类型,接下来进行判断是如何进行操作,当前堆栈是空的,如何判断堆栈为空呢?上面也讲到了head节点为空时则代表堆栈为空,接下来就要判断如果head节点为空或head指向的节点和当前操作内容模式相同,则进行等待操作,如下代码所示:

SNode h = head;
if (h == null || h.mode == mode) {  // empty or same-modeif (timed && nanos <= 0) {      // can't waitif (h != null && h.isCancelled())casHead(h, h.next);     // pop cancelled nodeelsereturn null;} else if (casHead(h, s = snode(s, e, h, mode))) {SNode m = awaitFulfill(s, timed, nanos);if (m == s) {               // wait was cancelledclean(s);return null;}if ((h = head) != null && h.next == s)casHead(h, s.next);     // help s's fulfillerreturn (E) ((mode == REQUEST) ? m.item : s.item);}
} 

显然头结点是空的,所以进入到第一个fi语句中执行等待操作,如果指定了timed则判断时间是否小于0,如果小于0则直接null,反之判断当前节点是否不是头结点以及头结点是否取消,潘祖条件弹出头结点,并将下一个节点设置为头结点,上述条件在当前例子中都不满足,所以要进入到下面这段代码中,首先进行对s进行初始化值,并且进行入栈操作,casHead(h, s = snode(s, e, h, mode)),下面看一下栈中的情况如下图所示:

当执行完了入栈操作之后接下来要执行awaitFulfill这里的操作就是轮训以及将当前节点的线程赋值,并且挂起当前线程。此时的栈的情况如下图所示:

当有同样的模式进行操作时候也是重复上述的操作内容,我们这里模拟两次put操作,让让我们看一下栈中的情况如下图所示:

通过上图可以看到,其实就是将头结点移动到了新的节点上,然后新节点的next节点维护这下一个节点的引用,好了,上述内容分析是同模式的操作,接下来我们试着进行take操作时,这时候会发什么内容呢?

/*** SynchronousQueue例子二进行两次put操作和一次take操作** @author battleheart*/
public class SynchronousQueueDemo1 {public static void main(String[] args) throws Exception {SynchronousQueue<Integer> queue = new SynchronousQueue<>();Thread thread1 = new Thread(() -> {try {queue.put(1);} catch (InterruptedException e) {e.printStackTrace();}});thread1.start();Thread.sleep(2000);Thread thread2 = new Thread(() -> {try {queue.put(2);} catch (InterruptedException e) {e.printStackTrace();}});thread2.start();Thread.sleep(2000);Thread thread6 = new Thread(() -> {try {queue.take();} catch (InterruptedException e) {e.printStackTrace();}});thread6.start();}
}

上面例子正好符合上面例子两次put操作的截图,进行两次put操作过后再进行take操作,接下来我们来看一下take操作是如何进行操作的,换句话说当有不同模式的操作时又是如何进行处理呢?上面分析的内容是同种操作模式下的,当有不同操作则会走下面内容:

 else if (!isFulfilling(h.mode)) { // try to fulfillif (h.isCancelled())            // already cancelledcasHead(h, h.next);         // pop and retryelse if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {for (;;) { // loop until matched or waiters disappearSNode m = s.next;       // m is s's matchif (m == null) {        // all waiters are gonecasHead(s, null);   // pop fulfill nodes = null;           // use new node next timebreak;              // restart main loop}SNode mn = m.next;if (m.tryMatch(s)) {casHead(s, mn);     // pop both s and mreturn (E) ((mode == REQUEST) ? m.item : s.item);} else                  // lost matchs.casNext(m, mn);   // help unlink}}
} else {                            // help a fulfillerSNode m = h.next;               // m is h's matchif (m == null)                  // waiter is gonecasHead(h, null);           // pop fulfilling nodeelse {SNode mn = m.next;if (m.tryMatch(h))          // help matchcasHead(h, mn);         // pop both h and melse                        // lost matchh.casNext(m, mn);       // help unlink}
}

最下面的else我们等会来进行分析,我们看到如果不是同模式的话,则会先判断是否是fulfill模式,如果不是fulfill模式,则进入到第一个if语句中,显然通过图示6可以得出,头结点head模式并不是fillfull模式,则进入到该if语句中,上来首先判断当前头结点是否被取消了,如果被取消则将头结点移动到栈顶下一个节点,反之则将s节点赋值为fulfill模式按位或当前节点模式,个人认为目的是既保留了原有模式也变成了fulfill模式,我们开篇就讲到了,REQUEST=0,二进制则是00,而DATA=1,其二进制为01,而FULFILLING=2,其二进制表示10,也就是说如果当前节点是REQUEST的话那么节点的内容值时00|10=10,如果节点是DATA模式则s节点的模式时01|10=11,这样的话11既保留了原有模式也是FULFILLING模式,然后将头节点移动到当前s节点,也就是将FULFILLING模式节点入栈操作,目前分析到这里时casHead(h, s=snode(s, e, h, FULFILLING|mode),栈的情况如下图所示:

接下来运行for循环里面内容,先运行如下内容:

SNode m = s.next;       // m is s's match
if (m == null) {        // all waiters are gonecasHead(s, null);   // pop fulfill nodes = null;           // use new node next timebreak;              // restart main loop
}

先判断当前节点也就是头结点s的下一个节点上图中head=s节点,所以s.next节点代表的是Ref-750,判断当前节点是否为空,如果为空的话代表没有可匹配的节点,先对head进行替换为null代表堆栈为空,然后将当前s节点设置为null,退出fulfill匹配模式进入到主循环中,会重新进行对当前节点进行操作,是消费还是匹配,显然本例子中m节点是不为空的,所以这里不会运行,跳过之后运行下面内容:

SNode mn = m.next;
if (m.tryMatch(s)) {casHead(s, mn);     // pop both s and mreturn (E) ((mode == REQUEST) ? m.item : s.item);
} else                  // lost matchs.casNext(m, mn);   // help unlink

mn节点在上图中对应的是Ref-681,这里是重点,m.tryMatch(s),m节点尝试匹配s节点,进入到方法里,到这一步是我们再来看一下头结点的元素的内容:

并且唤醒m节点的,告诉m节点,你现在有匹配的对象了你可以被唤醒了,这里唤醒之后就会进入到awaitFulfill下面的操作

SNode m = awaitFulfill(s, timed, nanos);
if (m == s) {               // wait was cancelledclean(s);return null;
}
if ((h = head) != null && h.next == s)casHead(h, s.next);     // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);

运行这里的线程显然是上图中的m节点,因为m节点被唤醒了,m==s代表的是取消了节点,显然没有进行该操作,然后就是帮助头结点进行fulfill操作,这里重点说一下这段代码:

if ((h = head) != null && h.next == s)casHead(h, s.next);  

获取当前头结点,也就是上图中的头结点如果不为空而且h.next节点为m节点正好是m节点进行操作时的s节点,也就是说这个语句是成立的,直接将头节点指向了上图的mn节点,这里的操作和take中的下面操作是一样的,也就是帮助fulfill操作弹出栈顶和栈顶匹配的节点内容,下面代码:

SNode mn = m.next;
if (m.tryMatch(s)) {casHead(s, mn);     // pop both s and mreturn (E) ((mode == REQUEST) ? m.item : s.item);
} else                  // lost matchs.casNext(m, mn);   // help unlink

重点是casHead的代码,弹出s和m两个节点,此时栈中内容如下图所示:

主要的流程分析完毕了,但是细心的朋友会发现,最后面还有一个帮助fulfill的操作,(transfer中)代码如下所示:

else {                            // help a fulfillerSNode m = h.next;               // m is h's matchif (m == null)                  // waiter is gonecasHead(h, null);           // pop fulfilling nodeelse {SNode mn = m.next;if (m.tryMatch(h))          // help matchcasHead(h, mn);         // pop both h and melse                        // lost matchh.casNext(m, mn);       // help unlink}
}

个人理解是这样的,我们上面也分析到了如果模式是相同模式情况和如果是不同模式且模式不为匹配模式的情况,但是还会有另外一种情况就是如果是不同模式并且头结点是匹配模式的就会进入到帮助去fullfill的情况,我来画图说明一下该情况:

如上图所示,上一个匹配操作没有进行完然后又来了一个请求操作,他就会帮助head进行匹配操作,也就是运行上面的代码逻辑,逻辑和匹配内容是一样的。

接下来让我们看一下取消的clean方法内容:

void clean(SNode s) {s.item = null;   // 将item值设置为nulls.waiter = null; // 将线程设置为nullSNode past = s.next;   // s节点下一个节点如果不为空,并且节点是取消节点则指向下下个节点,这里是结束的标识,代表没有了。if (past != null && past.isCancelled())past = past.next;// 如果取消的是头节点则运行下面的清理操作,操作逻辑很简单就是判断头结点是不是取消节点,如果是则将节点一定到下一个节点SNode p;while ((p = head) != null && p != past && p.isCancelled())casHead(p, p.next);// 取消不是头结点的嵌套节点。while (p != null && p != past) {SNode n = p.next;if (n != null && n.isCancelled())p.casNext(n, n.next);elsep = n;}
}

通过源码可以看到首先是先找到一个可以结束的标识past,也就说到这里就结束了,判断是否不是头节点被取消了,如果是头节点被取消了则进行第一个while语句,操作也很简单就是将头节点替换头结点的下一个节点,如果不是头节点被取消了则进行下面的while语句操作,其实就是将取消的上一个节点的下一个节点指定为被取消节点的下一个节点,到此分析完毕了。

结束语

如果有分析不正确的请各位指正,我这边改正~

转载于:https://www.cnblogs.com/dwlsxj/p/synchronousqueue-unfair-pattern.html

图解SynchronousQueue原理详解-非公平模式相关推荐

  1. 图解SynchronousQueue原理-公平模式

    SynchronousQueue原理详解-公平模式 一.介绍 SynchronousQueue是一个双栈双队列算法,无空间的队列或栈,任何一个对SynchronousQueue写需要等到一个对Sync ...

  2. 深入剖析Redis系列(三) - Redis集群模式搭建与原理详解

    前言 在 Redis 3.0 之前,使用 哨兵(sentinel)机制来监控各个节点之间的状态.Redis Cluster 是 Redis 的 分布式解决方案,在 3.0 版本正式推出,有效地解决了 ...

  3. 【Android架构师java原理详解】二;反射原理及动态代理模式

    前言: 本篇为Android架构师java原理专题二:反射原理及动态代理模式 大公司面试都要求我们有扎实的Java语言基础.而很多Android开发朋友这一块并不是很熟练,甚至半路初级底子很薄,这给我 ...

  4. 详解非局部均值滤波原理以及用MATLAB源码实现

    详解非局部均值滤波原理以及用MATLAB源码实现 序言 均值滤波.中值滤波.高斯滤波在滤除噪声的过程中,无可避免的使图像的边缘细节和纹理信息所被滤除.针对此问题,Buades[1]等人提出了非局部均值 ...

  5. LVS原理详解(3种工作模式及8种调度算法)

    2017年1月12日, 星期四 LVS原理详解(3种工作模式及8种调度算法) LVS原理详解及部署之二:LVS原理详解(3种工作方式8种调度算法) 作者:woshiliwentong  发布日期:20 ...

  6. 动态复权(真实价格)模式原理详解!

    动态复权(真实价格)模式原理详解! 如果没有意外,你之前一直在使用前复权价格做回测,使用前复权价格回测存在未来函数(未卜先知,提前使用未来的数据),因此你的回测结果都是错的. Tell me why? ...

  7. Java集合迭代器原理图解_Java Iterator接口遍历单列集合迭代器原理详解

    这篇文章主要介绍了Java Iterator接口遍历单列集合迭代器原理详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 Iterator接口概述 ...

  8. 图解Semaphore信号量之AQS共享锁-非公平模式

    介绍 之前我们已经讲解过关于AQS的独占锁,这一章节主要讲解AQS的共享锁,以Semaphore信号量来进行讲解,相信通过看了本章节内容的同学可以对AQS的共享模式有一个了解,Semaphore信号量 ...

  9. AQS抽象队列同步器原理详解

    系列文章目录 第一节 synchronized关键字详解-偏向锁.轻量级锁.偏向锁.重量级锁.自旋.锁粗化.锁消除 AQS抽象队列同步器原理详解 系列文章目录 前言 一.AQS特性 二.AQS原理 1 ...

  10. 晶体三极管结构及其工作原理详解

    晶体三极管基本概述 晶体管是一种与其他电路元件结合使用时可产生电流增益.电压增益和信号功率增益的多结半导体器件.因此,晶体管称为有源器件,而二极管称为无源器件.晶体管的基本工作方式是在其两端施加电压时 ...

最新文章

  1. pytorch遇见RuntimeError: CUDA out of memory的解决
  2. 员工因公司而加入,却因主管而离开
  3. 网站优化数据分析不建议你遗落这三点
  4. BugkuCTF–flag在index里
  5. 前端学习(3028):vue+element今日头条管理-使用icon图标的处理
  6. mysql数学函数名_MYSQL 常见数学函数说明
  7. 多项式辗转相除法求最大公约数_多项式的一些性质
  8. 为什么Windows的兼容性这么强大,到底用了什么技术?
  9. 收到阿里年终奖后,我感觉穷的睡不着,网友:贫穷限制了我的想象力
  10. javascript用DOM解释XML
  11. ***WIN2003 PHP服务器的另类技术
  12. FLV视频合并-JAVA代码
  13. CSS3实现骗人版无缝轮播图
  14. 迅雷11下载报错:下载引擎未启动
  15. 地铁译:Spark for python developers ---Spark的数据戏法
  16. 计算机里FC方式,谁知道头文字D里提到的FD,FR,FC,FF指的是什么驱动方式的车?...
  17. python数据可视化案例 淘宝粽子_Python可视化对比分析淘宝低价人群和匿名用户的淘宝连衣裙数据...
  18. flash传值php乱码,Flash中出现中文乱码的解决办法,网页模板帮助中心
  19. Windows记录ping时间戳
  20. 瞬时: lnstant

热门文章

  1. ValueError: threshold must be numeric and non-NAN, try sys.maxsize for untruncated representation
  2. cuda对应pytorh安装
  3. ipython notebook使用教程
  4. 基于Caffe的人脸检测实现
  5. c++ 之 template函数模板
  6. python反转数字_[蓝桥杯]使用列表反转的回文数(Python代码),数字,利用,取反
  7. oracle java 映射_java程序访问映射后的oracle
  8. MySQL破log_MySQL中的binlog相关命令和恢复技巧
  9. linux shell 将命令行终端输出结果写入保存到文件中
  10. php报错 Function name must be a string in xxxx