生产者-消费者模型之集合SynchronousQueue源码解读
目录
- `SynchronousQueue` 简述
- `SynchronousQueue` 源码
- `SynchronousQueue` 属性
- `SynchronousQueue` 内部类
- `TransferStack` 类(非公平策略,单链表构成栈)
- `TransferQueue` 类(公平策略,单链表构成队列)
- `SynchronousQueue` 构造函数
- `TransferStack` 类(非公平策略)原理
- `transfer()` 方法
- `TransferQueue` 类(公平策略)原理
- `transfer()` 方法
- 入队或入栈操作
- `put(E e)` 方法
- `offer(E e)` 方法
- `add(E e)` 方法
- 出队或出栈操作
- `take()` 方法
- `poll()` 方法
- `remove()` 方法
- `SynchronousQueue` 总结
- `SynchronousQueue` 示例
- 非公平模式
- 公平模式
SynchronousQueue
简述
SynchronousQueue
来自于jdk 1.5
的JUC
包,是一个线程安全的阻塞队列SynchronousQueue
不能简单的使用有界或无界来形容,因为它的内部根本就没有容量SynchronousQueue
中每个插入操作必须等待另一个线程的对应移除操作,反之亦然;两个操作是同步等待的,即一个先到达的操作必须等待另一个匹配的操作出现,两个操作才能成功匹配并传递数据之后返回SynchronousQueue
支持公平策略和非公平策略(默认),所以底层有两种数据结构:队列(实现公平策略,先进先出,单链表实现)和栈(实现非公平策略,先进后出,单链表实现)- 实现了
Serializable
接口,支持序列化;不支持null
元素的传递
SynchronousQueue
源码
SynchronousQueue
属性
/*** CPU中通常一个内核一个线程,后来有了超线程技术,可以把一个物理核心,模拟成两个逻辑核心,线程量增加一倍* 因此这里获取的是CPU的实际可用线程数量,比如 i7-8750H 它具有6核心12线程,因此获取的就是12而不是6* 通常CPU的实际可用线程数量越高,运行并发的程序的效率也越高*/
static final int NCPUS = Runtime.getRuntime().availableProcessors();/*** 超时等待的线程在阻塞之前应该自旋的次数* 该值是经验推导的 ——它适用于各种处理器和 OS。从经验上看,最佳值似乎不会随 CPU 数量(超过 2)而变化,因此只是一个常数(0或32)。*/
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;/*** 非超时等待的线程在阻塞之前应该自旋的次数* 通常大于超时等待的线程的旋转次数,因为不需要检查每次旋转的之后的剩余超时时间,它们旋转的更快(0或512)*/
static final int maxUntimedSpins = maxTimedSpins * 16;/*** 采用自旋而不是使用park阻塞的超时时间边界纳秒数,这是也是一个估计值* 即超时时间大于1000L,那么使用parkNanos阻塞当前线程,否则采用快速的自旋等待即可* 原因在于,非常短的超时等待无法做到十分精确,如果这时再进行超时等待,相反会让nanosTimeout的超时从整体上表现得反而不精确* 因此,在超时非常短的场景下,AQS会进入无条件的快速自旋而不是挂起线程* <p>* 这个参数在AQS的超时获取锁,Condition的超时等待中也被使用*/
static final long spinForTimeoutThreshold = 1000L;/*** Transferer 实例引用,用于在传输操作无法正常完成时存储阻塞的线程以及元素* 只会初始化某一个公平或者非公平模式的子类实例*/
private transient volatile Transferer<E> transferer;/*** 下面的字段只是为了兼容JDK1.5的SynchronousQueue的序列化策略* 只有在序列化或者反序列化时才会初始化,在高版本的SynchronousQueue中永远不会使用*/
private ReentrantLock qlock;
private WaitQueue waitingProducers;
private WaitQueue waitingConsumers;
SynchronousQueue
内部类
TransferStack
类(非公平策略,单链表构成栈)
- 底层是一个单链表的结构,结点类型是
SNode
类型 TransferStack
持有一个单链表头部head
作为栈顶- 每一个入栈的线程遵循先进后出的原则
- 属性的赋值都是采用
CAS
操作来完成的
abstract static class Transferer<E> {abstract E transfer(E e, boolean timed, long nanos);
}static final class TransferStack<E> extends Transferer<E> {// 表示消费数据的出栈线程static final int REQUEST = 0;// 表示生产数据的入栈线程static final int DATA = 1;// 表示匹配另一个入栈线程或出栈线程static final int FULFILLING = 2;// 栈的头部元素,即栈顶volatile SNode head;// TransferStack的数据结构static final class SNode {volatile SNode next;// 相匹配的节点volatile SNode match; // 等待的线程volatile Thread waiter; Object item; // 模式: REQUEST 或者 DATA 或者 FULFILLINGint mode;SNode(Object item) {this.item = item;}// CAS操作都是使用 UNSAFE来完成的private static final sun.misc.Unsafe UNSAFE;private static final long matchOffset;private static final long nextOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = SNode.class;matchOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("match"));nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));} catch (Exception e) {throw new Error(e);}}}// ......
}
通过 SNode
的 mode
属性可以确定三种模式的线程节点
REQUEST
:表示出栈的线程在等待消费数据DATA
:表示入栈的线程在等待生产数据FULFILLING
:表示某个请求模式的结点正在与栈中的另一个等待的结点完成匹配
TransferQueue
类(公平策略,单链表构成队列)
- 底层是一个单链表的结构,结点类型是
Qnode
类型 TransferQueue
持有一个双链表头部head
和尾部tail
- 每一个入队的线程遵循先进先出的原则
abstract static class Transferer<E> {abstract E transfer(E e, boolean timed, long nanos);
}static final class TransferQueue<E> extends Transferer<E> {// 队列头结点transient volatile QNode head;// 队列尾结点transient volatile QNode tail;// 中断或超时结点的前继结点,用于移除的结点属于尾结点时使用transient volatile QNode cleanMe;TransferQueue() {// 初始化一个哨兵结点,item为null,isData为falseQNode h = new QNode(null, false);head = h;tail = h;}static final class QNode {volatile QNode next; volatile Object item; // 当前结点所属的线程,用于控制 park/unparkvolatile Thread waiter; // 是否存放了数据 true 是 false 否final boolean isData;QNode(Object item, boolean isData) {this.item = item;this.isData = isData;}// CAS操作都是使用 UNSAFE来完成的private static final sun.misc.Unsafe UNSAFE;private static final long itemOffset;private static final long nextOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = QNode.class;itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));} catch (Exception e) {throw new Error(e);}}}// ......
}
SynchronousQueue
构造函数
public SynchronousQueue() {// 调用另一个构造器,默认传入falsethis(false);
}public SynchronousQueue(boolean fair) {// true -> TransferQueue// false -> TransferStacktransferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
TransferStack
类(非公平策略)原理
head
指针指向栈顶节点。新来的线程如果不能和栈顶节点匹配,会被构造成REQUEST
或者DATA
模式的节点压入栈顶等待;新来的线程如果可以和栈顶节点匹配,那么也会构造成一个FULFILLING
节点压入栈顶,随后与下面的节点匹配,匹配成功两个节点一定是相邻的节点,成功之后都会出栈- 由于节点的入栈都是压入栈顶,这样的话
TransferStack
中等待的节点被匹配的优先级并不是入栈的先后顺序原则,而是越晚入栈的线程反而会被越先匹配,这就是非公平模式的由来
transfer()
方法
/**- 生产或消费一个元素- @param e 如果非空,则表示生产者要交给消费者的数据; 如果为null,则表示消费者请求获取生产者的数据- @param timed 如果是超时操作,那么为 true- @return 如果返回的结果不为 null,那么表示数据被转移了或获取了;如果为 null,这个操作可能因为超时或者被中断而失败了*/
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {SNode s = null; // 如果 e 为 null 则 mode 初始化为 REQUEST,表示出栈线程在等待消费数据// 否则 mode 初始化为 DATA,表示入栈线程在等待生产数据int mode = (e == null) ? REQUEST : DATA;for (; ; ) {// 获取栈顶SNode h = head;// 栈顶为 null 或 栈顶的模式与此次操作数据的模式相同if (h == null || h.mode == mode) {// 设置了 timed 为 true && 等待时间 <= 0,表示不能等待,需要立即操作 if (timed && nanos <= 0) { // 栈顶不为 null && 栈顶已被取消if (h != null && h.isCancelled())// 重新设置头结点(弹出之前的头结点)casHead(h, h.next); elsereturn null;}// 否则,说明不是超时操作,或是超时时间还 > 0,还可以等待后来的节点匹配// 生成一个 SNode 结点;将原来的 head 头节点设置为该结点的 next 节点;将head 头节点设置为该节点else if (casHead(h, s = snode(s, e, h, mode))) {// 空旋或者阻塞直到 s 结点被 awaitFulfill 操作所匹配SNode m = awaitFulfill(s, timed, nanos);// m 有可能 s 匹配的节点,也有可能是 s 自己,即等待被中断而被取消了 if (m == s) {// 如果 m == s 为 true,即 s 被取消了 clean(s);// 那么从栈中清除 s 节点return null;}// h 赋值为最新的 head,如果 h 不为 null,并且 h.next == s,表明 s 被后来的节点匹配了if ((h = head) != null && h.next == s)// 比较并替换 head 域(移除插入在 s 之前的结点和 s 结点)casHead(h, s.next); // 无论是消费元素还是生产元素,都会返回它们之间匹配传递的数据 return (E) ((mode == REQUEST) ? m.item : s.item);}}// 否则,表示 h 不为 null 并 h 的模式 != mode,那么调用 isFulfilling 判断 h 是否被匹配 // 如果还没匹配,那么说明 s 可以与 h 尝试匹配 else if (!isFulfilling(h.mode)) { // 如果 h 被取消了if (h.isCancelled()) // 比较并替换 head 域(h 头结点出栈)casHead(h, h.next); /** 如果没被取消,生成一个 SNode 结点* s.next=head,即 h 成为了 s 的后继,s 成为新 head 结点,即栈顶结点* 可能是有两个线程同时匹配一个等待的结点,其中一个CAS成功,那么另一个自然失败了,继续下一次循环*/else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) {// CAS成功之后,表示s匹配h成功,开启一个循环,尝试h匹配s直到成功或者被匹配者取消for (; ; ) {// 获取s的后继m,第一次循环时m即上面CAS操作中的h,后续循环时m为新的后继SNode m = s.next; // 如果此时m为null,表示m结点被取消了,并且由clean方法可知后面也没有等待的结点了。或者循环到了栈底部,没有后继结点了if (m == null) { // 尝试CAS的将head从s指向null,如果成功,表示没有等待结点了casHead(s, null); // 也置为null,下一次使用snode新建结点,重新匹配,目的是可以使得这个s结点被回收s = null; break; }SNode mn = m.next;// m 匹配 s,注意最上面是s匹配m,这里是m匹配s的操作// 简单的说就是 tryMatch会尝试将m.match设置为sif (m.tryMatch(s)) {// 到这里的表示,两个结点彻底匹配成功// 成功之后,尝试CAS的将head从s指向mn,即移除两个已被匹配的结点casHead(s, mn); // 无论是消费元素还是生产元素,都会返回它们之间匹配传递的数据return (E) ((mode == REQUEST) ? m.item : s.item);} else /** 匹配失败,表示m被取消了* 尝试CAS的将s.next从m指向mn,即帮助移除m结点,继续下一次内层循环* 下一次将会由新后继mn去匹配s,直到匹配成功,或者循环到了栈底部,此时后继mn为null* 然后由于后继null,自然会设置head指向null,那么这个请求结点将进入第二大步*/s.casNext(m, mn); }}}// 否则,表示h被匹配了,那么帮助后续匹配的步骤else { SNode m = h.next; // 如果此时m为null,表示m结点被取消了,并且由clean方法可知后面也没有等待的结点了if (m == null) // 尝试CAS的将head从h指向null,如果成功,表示没有等待结点了,这个h结点也被清理了// 如果失败,表示又有另一个新的结点进来,并且另一个线程这一步的CAS操作成功了casHead(h, null); else {SNode mn = m.next;// m调用tryMatch尝试匹配s,简单的说就是 tryMatch会尝试将m.match设置为sif (m.tryMatch(h)) // 到这里的表示,两个结点彻底匹配成功,成功之后,尝试CAS的将head从h指向mn,即移除两个已被匹配的结点casHead(h, mn); else /** 匹配失败,表示m被取消了* 尝试CAS的将h.next从m指向mn,即帮助移除m结点,继续下一次内层循环* 下一次将会由新后继mn去匹配h,直到匹配成功,或者循环到了栈底部,此时后继mn为null* 然后由于后继null,自然会设置head指向null,将可能进入第一大步*/h.casNext(m, mn); }}}
}
如果栈为空或者栈顶元素的模式与当前线程的模式一致,则当前线程模式不能与栈顶节点的模式匹配。则要么返回、要么等待
1.1.如果是超时操作,并且超时时间
< = 0
。那么如果栈顶节点此时被取消了,那么尝试CAS
的将head
指向原head
的后继,将其移除,继续下一次循环;否则直接返回null
1.2. 如果不是超时操作,或是剩余超时间
> 0
,表示还可以被动等待后来的节点匹配,注意在等待期间不会将其从栈中移除,除非匹配到的线程是自己。如果等待过程中被匹配成功,则移除栈里面的匹配线程(消费和生产),并返回传递的数据;如果等待被取消了,则返回null
否则,就是栈顶元素的模式与当前线程的模式不一致,那么判断栈顶节点是否已经被匹配了(是否是匹配节点),如果没有匹配(不是匹配节点),表示当前线程可以主动与栈顶节点尝试匹配
2.1. 如果栈顶元素被取消了,则尝试
CAS
将head
指向原head
的后继,继续下一次循环2.2. 如果栈顶元素没有被取消。尝试构建一个匹配节点,
mode = mode | FULFILLING(2)
,然后将匹配节点压入栈顶,原栈顶节点成为该节点的后继,表示该匹配节点尝试与后继节点匹配。随后内层循环进行匹配,成功之后两个匹配的节点一起出栈,并返回传递的数据;如果最终匹配失败,那么尝试将head
指向null
,继续下一次外层循环否则,表示栈顶节点已经被匹配了(是匹配节点),那么当前线程尝试帮助栈顶节点和其后继节点完成后续的匹配,即出栈过程,类似于第二种行为,之后继续下一次循环
TransferQueue
类(公平策略)原理
- 公平模式和非公平模式不同的是,初始化一个
TransferQueue
对象,会创建一个Qnode
哨兵结点,item
为null
,isData
为false
。head
和tail
都指向该结点 head
永远都指向一个哨兵节点(或者称作已经匹配成功的节点)。==进入队列的线程都是从队列尾部进入的,head
的后继将会被作为新进来的线程匹配的节点,后来尝试匹配的线程不会像TransferStack
那样构造一个节点,匹配成功之后原来等待的节点出队列- 如果当前新来的线程模式不能与
head.next
节点匹配,则进入入队操作;如果能与之匹配,则进行匹配操作,而这个新来的线程是不入队的,只是在队列中移除与这个新来的线程匹配的线程而已 TransferQueue
中等待的节点被匹配的优先级就是入队的先后顺序原则,先入队的先被匹配,这就是公平模式的由来
transfer()
方法
/**- 生产或消费一个元素- @param e 如果非空,则表示生产者要交给消费者的数据; 如果为null,则表示消费者请求获取生产者的数据- @param timed 如果是超时操作,那么为true- @return 如果返回的结果不为null,那么表示数据被转移了或获取了;如果为null,这个操作可能因为超时或者被中断而失败了*/
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {QNode s = null;// 如果e为null那么isDate初始化为false,表示消费线程在等待消费数据// 否则isDate初始化为true,表示生产线程在等待生产数据boolean isData = (e != null);for (; ; ) {QNode t = tail;// 获取尾结点QNode h = head;// 获取头结点// 如果t或者h为null,那么说明双重队列没有初始化,那么结束本次循环,继续下一次循环if (t == null || h == null) continue; // 如果队列为空,或者尾节点的模式和该线程的模式一致,这表示当前线程请求模式不能与head.next节点匹配if (h == t || t.isData == isData) { QNode tn = t.next;// 如果t不为tail,说明尾节点发生了变化,那么结束本次循环,继续下一次循环if (t != tail) continue;// 如果t为tail,但是后继tn不为null,说明其他线程节点入队,但是还没有来得及改变tail的引用指向if (tn != null) { // 向前推进tail,就是尝试CAS的帮助tail从t指向tn,随后结束本次循环,继续下一次循环advanceTail(t, tn);continue;}// 到这里表示t还是此时的tail,并且还没有新结点入队,此时可以构造新结点入队了// 设置了timed并且等待时间 <= 0,表示不能等待,需要立即操作if (timed && nanos <= 0) return null;if (s == null)// 那么初始化一个QNode结点赋给s,这是将要入队的结点s = new QNode(e, isData);// 完整的入队成功分为两步,第一步是CAS的将s结点加入到队列尾部,第二步是CAS的将tail指向s结点// 尝试CAS的将t的next从null指向s,即尝试将s入队if (!t.casNext(null, s)) // 如果CAS入队失败,表示存在竞争,那么结束本次循环,继续下一次循环continue;// CAS将s入队成功之后,继续尝试CAS的将tail从t指向s,即改变队尾属性的指向advanceTail(t, s); // 调用awaitFulfill方法用于节点s的自旋或者阻塞,直到s被匹配,或者s被取消了Object x = awaitFulfill(s, e, timed, nanos);// 如果x==s 为true,即s被取消了if (x == s) { // 调用clean尝试从队列中移除s结点clean(t, s); return null;}// 到这里,表示s结点被后来的请求成功匹配了。调用isOffList判断s结点是否还未出队if (!s.isOffList()) { // 在公平模式下,如果一个结点被成功匹配,那么该结点一定是 head 的后继// 那么尝试CAS的将head从t指向s,推进head的指向,将s变成headadvanceHead(t, s); // 如果x不为null,表示s结点代表一个消费者请求,此时item指向传递的数据if (x != null) // s的item指向自己,释放传递的数据的引用s.item = s;// s的waiter清空,释放线程引用s.waiter = null;}// 如果x不为null,表示s结点代表一个消费者请求,x就是接收的数据,那么返回x// 如果x为null,表示s结点代表一个生产者请求,e就是传递的数据,那么返回ereturn (x != null) ? (E) x : e;}// 如果队列不为空并且尾结点的模式和该请求的模式不一致,这表示当前线程请求模式可以尝试与head.next结点的匹配else { QNode m = h.next; // 如果此时队列结构被改变了,那么结束本次循环,继续下一次循环if (t != tail || m == null || h != head)continue; Object x = m.item;if (isData == (x != null) || // m结点被匹配 x == m || // m结点被取消 !m.casItem(x, e)) {// CAS操作失败 // 队列头结点出队列,并重试advanceHead(h, m); continue;}// 匹配成功,设置新的头结点advanceHead(h, m); // 唤醒m内部保存的线程LockSupport.unpark(m.waiter);// 如果x不为null,表示m结点代表一个生产者请求,x就是传递的数据,那么返回x// 如果x为null,表示m结点代表一个消费者请求,e就是接收的数据,那么返回ereturn (x != null) ? (E) x : e;}}
}
- 如果队列为空或者队列元素的模式
isData
与当前线程的模式一致,表示当前线程模式不能与栈顶节点的模式匹配。那么将会构造一个节点添加到队尾,并向后推进tail
引用指向,等待被后来的线程匹配,直到等待超时或者等待时被中断,将会返回null
。如果被匹配成功,将会返回传递的数据。最后都会从队列中移除匹配的节点线程 - 如果队列不为空并且尾节点的模式和该线程的模式不一致,表示可以尝试匹配
head
的后继m
。如果m
被匹配了后者被取消了,那么尝试将m
移除队列,随后继续循环与后面的节点匹配,匹配成功之后将会尝试推进head
引用指向,并唤醒被匹配节点内部的线程,最后返回传递的数据。如果后面的匹配过程中队列变成了空或者尾节点的模式和该请求的模式一致,那么将会进入第一种情况
入队或入栈操作
put(E e)
方法
// 尝试生产指定元素,如果此时另一个线程正在等待接收元素,那么传递成功之后返回,否则阻塞该线程,直到另一个线程接收这个元素
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();// 调用transfer方法,公平或非公平模式调用自己的实现,传递 e、false、0if (transferer.transfer(e, false, 0) == null) {// transfer的返回值如果为 null,表示因为被中断而返回// 那么调用interrupted静态方法重置当前线程的中断状态为falseThread.interrupted();throw new InterruptedException();}
}
offer(E e)
方法
// 尝试生产指定元素
public boolean offer(E e) {if (e == null) throw new NullPointerException();// 调用transfer方法,公平或非公平模式调用自己的实现,传递 e、false、0return transferer.transfer(e, true, 0) != null;
}
add(E e)
方法
public boolean add(E e) {// 实际上调用的 offer 操作if (offer(e))return true;elsethrow new IllegalStateException("Queue full");
}
出队或出栈操作
take()
方法
// 消费元素,如果此时另一个线程正在等待传递元素,那么接收成功之后返回,否则阻塞该线程,直到另一个传递元素线程的线程来匹配
public E take() throws InterruptedException {// 调用transfer方法,公平或非公平模式调用自己的实现E e = transferer.transfer(null, false, 0);if (e != null)return e;Thread.interrupted();throw new InterruptedException();
}
poll()
方法
// 尝试消费元素
public E poll() {// 调用transfer方法,公平或非公平模式调用自己的实现return transferer.transfer(null, true, 0);
}
remove()
方法
// 尝试消费元素
public E remove() {// 直接调用poll方法,获取返回值xE x = poll();if (x != null)return x;elsethrow new NoSuchElementException();
}
SynchronousQueue
总结
SynchronousQueue
中并没有专门用来存储元素的容器,内部的双重栈或双重队列被用来存储等待状态的线程。公平模式将会使用双重队列结构(单向链表实现),遵循先进先出的顺序;而非公平模式则使用双重栈结构(单向链表实现),遵循先进后出的顺序SynchronousQueue
内部使用了volatile + CAS
操作来保证线程安全,以及LockSupport
的park()
和unpark()
方法控制线程的组合和唤醒,并没有使用锁,因此SynchronousQueue
非常适合高并发的环境SynchronousQueue
的生产和消费线程都必须要等待一个匹配的线程才能返回,这适用于需要同步回调机制的接口,即如果另一请求传递或者接收了数据,那么另外一个等待的请求也同时返回
SynchronousQueue
示例
非公平模式
线程
put1
执行put(1)
操作,由于当前没有配对的消费线程,所以put1
线程入栈,自旋一小会后睡眠等待,这时栈状态如下
接着,线程
put2
再次执行了put(2)
操作,跟前面一样,put2
线程入栈,自旋一小会后睡眠等待。这时栈状态如下
这时候,来了一个线程
take1
,执行了take
操作,这时候发现栈顶为put2
线程,匹配成功,但是实现会先把take1
线程入栈,然后take1
线程循环执行匹配put2
线程逻辑,一旦发现没有并发冲突,就会把栈顶指针直接指向put1
线程
最后,再来一个线程
take2
,执行take
操作,这跟步骤3
的逻辑基本是一致的,take2
线程入栈,然后在循环中匹配put1
线程,最终全部匹配完毕,栈变为空,恢复初始状态,如下图所示
可以从上面流程看出,虽然
put1
线程先入栈了,但是却是后匹配,这就是非公平的由来
公平模式
初始化时,
TransferQueue
的状态如下
线程
put1
执行put(1)
操作,由于当前没有配对的消费线程,所以put1
线程入队列,自旋一小会后睡眠等待,这时队列状态如下
接着,线程
put2
执行了put(2)
操作,跟前面一样,put2
线程入队列,自旋一小会后睡眠等待,这时队列状态如下
这时候,来了一个线程
take1
,执行了take
操作,由于head.next
指向put1
线程,put1
线程跟take1
线程配对了,这时take1
线程是不需要入队的执行后
put1
线程被唤醒,take1
线程的take()
方法返回了1
(put1
线程的数据),这样就实现了线程间的一对一通信,这时候内部状态如下
最后,再来一个线程
take2
,执行take
操作,这时候只有put2
线程在等候,而且两个线程匹配上了,线程put2
被唤醒,take2
线程take
操作返回了2
(线程put2
的数据),这时候队列又回到了起点,如下所示
以上便是公平模式下,
SynchronousQueue
的实现模型。总结下来就是:队尾匹配队头出队,先进先出,体现公平原则
生产者-消费者模型之集合SynchronousQueue源码解读相关推荐
- Pseudo-document-based Topic Model(基于伪文档的主题模型)的理解以及源码解读
本文作者:合肥工业大学 管理学院 钱洋 email:1563178220@qq.com 内容可能有不到之处,欢迎交流. 未经本人允许禁止转载. 论文来源 Zuo Y, Wu J, Zhang H, e ...
- 从 Netpoll 中寻找 BIO/NIO 编程模型的对比 | Netpoll 源码解读
前言 最近在阅读<Go 组件设计与实现>这本小册,其中让我很感兴趣的一点是为什么在字节开源中间件团队 CloudWeGo 所开发的网络库 Netpoll 中使用了 NIO 模型,而没有使用 ...
- 判别模型的玻尔兹曼机论文源码解读
前言 三号要去参加CAD/CG会议,投了一篇关于使用生成模型和判别模型的RBM做运动捕捉数据风格识别的论文.这段时间一直搞卷积RBM了,差点把原来的实验内容都忘记了,这里复习一下判别式玻尔兹曼机的训练 ...
- java多线程抽奖_java 线程池、多线程并发实战(生产者消费者模型 1 vs 10) 附案例源码...
导读 前二天写了一篇<Java 多线程并发编程>点我直达,放国庆,在家闲着没事,继续写剩下的东西,开干! 线程池 为什么要使用线程池 例如web服务器.数据库服务器.文件服务器或邮件服务器 ...
- mysql服务器多线程模型_java 线程池、多线程并发实战(生产者消费者模型 1 vs 10) 附案例源码 - 陈彦斌 - 博客园...
导读 前二天写了一篇<Java 多线程并发编程>点我直达,放国庆,在家闲着没事,继续写剩下的东西,开干! 线程池 为什么要使用线程池 例如web服务器.数据库服务器.文件服务器或邮件服务器 ...
- 互斥锁、共享内存方式以及生产者消费者模型
守护进程 1.守护进程的概念 进程指的是一个正在运行的程序,守护进程也是一个普通进程 意思就是一个进程可以守护另一个进程 import time from multiprocessing import ...
- 【Java 并发编程】多线程、线程同步、死锁、线程间通信(生产者消费者模型)、可重入锁、线程池
并发编程(Concurrent Programming) 进程(Process).线程(Thread).线程的串行 多线程 多线程的原理 多线程的优缺点 Java并发编程 默认线程 开启新线程 `Ru ...
- 爬虫--05:多线程与生产者消费者模型
Crawler - 05: Multithreading- und Produzenten-Verbrauchermodell 多线程 一.多线程的基本介绍 1.介绍 2.程序中模拟多任务 二.创建多 ...
- 多线程-生产者-消费者模型
一.前言 生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例.该问题描 ...
- Python网络爬虫3 - 生产者消费者模型爬取某金融网站数据
博客首发于www.litreily.top 应一位金融圈的朋友所托,帮忙写个爬虫,帮他爬取中国期货行业协议网站中所有金融机构的从业人员信息.网站数据的获取本身比较简单,但是为了学习一些新的爬虫方法和技 ...
最新文章
- golang database/sql包 简介
- 【*2000】【2018-2019 ICPC, NEERC, Southern Subregional Contest C 】Cloud Computing
- Flask实战2问答平台--导航条
- LeetCode 1248. 统计「优美子数组」(要复习)
- 一个实时精准触达系统的自我修养
- 技术专家:为什么我们最终选择Apache Pulsar替代Kafka?
- java的dom4j怎么调_dom4j.jar 的调试方法
- devc中文注释显示问号_Python零基础入门-(如何让人读懂你的代码)文档注释
- 黄聪:VS2008的动、静态编译[转]
- Java并发编程(三)什么是线程池
- 1043 Is It a Binary Search Tree (25 分) BST反转?不反转 遍历+vector
- vue2使用脚手架配置prettier报错:‘prettier/prettier‘: context.getPhysicalFilename is not a function
- LinuxCentOS 7镜像下载
- 免费备案查询API,支持通过主办单位名称查询备案信息
- 2022年新型智慧城市整体规划建设方案
- 【web渗透思路】任意账号的注册、登录、重置、查看
- 如何用Python写一个安卓APP
- Java写计算器自闭了
- 个人总结对团队的贡献Android方面
- 使用了flink官方示例,尽然提交任务后报错了
热门文章
- 翻译:使用 AWS Deep Racer 的日志分析工具
- 算法:Reverse Linked List
- 多个 本地仓库_老板逼我用 Git,本地指令介绍
- 编程计算二叉树中某结点的层数
- Ubuntu16.04实现定时免密远程拷贝脚本
- 继承、关联、聚合、组合的代码表示
- linux的abrt目录满了,linux:abrt-cli list
- WDSR:Wide Activation for Efficient and Accurate Image Super-Resolution
- 现代通信原理3.2:线性系统的时域与频域特性
- Spring Cloud随记----远程配置文件资源库的建立-涉及一些简单的git操作