本人是本文的作者,首发于ifeve(非阻塞同步算法实战(二)-BoundlessCyclicBarrier)


前言

相比上一 篇而言,本文不需要太多的准备知识,但技巧性更强一些。因为分析、设计的过程比较复杂繁琐,也限于篇幅,所以,主要展示如何解决这些需求,和讲解代码。另外,所讲的内容也是后一篇实战中需要用到的一个工具类。

注:该工具类已分享到https://github.com/trytocatch/concurrent-tools

需求介绍

我需要编写一个同步工具,它需要提供这样几个方法:await、pass、cancel。某个线程调用await时,会被阻塞;当调用pass方法时,之前因为await而阻塞的线程将全部被解除阻塞,之后调用await的线程继续被阻塞,直到下一次调用pass。

该工具同时还维护一个版本号,await方法可以带一个目标版本号,如果当前的版本号比目标版本号新或相同,则直接通过,否则,阻塞本线程,直到到达或超过目标版本。调用pass的时候,更新版本号。

如果停止了版本更新,可使用cancel方法来解除所有因await而阻塞的线程,包括指定版本号的。此方法用于避免无谓地等待。若await发生在cancel之后,则仍将被阻塞。

因为CountDownLatch不允许重复使用,CyclicBarrier只支持固定个数的线程,并且都没有维护一个版本号,所以没有已有的类能实现上面的需求,需要自己实现。

问题分析

简单分析可知,应该维护一个队列,来保存当前被阻塞的线程,用于在pass时对它们一一解除阻塞,pass时应该使用一个新的队列,否则不方便正确处理pass前和pass后调用await的线程。

至此,问题的关键就明了了:如何将队列的替换和版本号的更新这两个操作做成原子的。

解决方案

以前在《JAVA并发编程实践》曾看到过这样一个小技巧,如果要原子地更新两个变量,那么可以创建一个新的类将它们封装起来,将这两个变量当定义成类成员变量,更新时,用CAS更新这个类的引用即可。

因为较为复杂,下面先给出完整的代码,再讲解其中的关键。

注意:上面所说pass,在代码中的具体实现为nextCycle,有两个版本,一个自动维护版本号,一个由调用者维护版本号。

/*** @author trytocatch@163.com* @time 2013-1-31*/
public class BoundlessCyclicBarrier {protected final AtomicReference<VersionQueue> waitQueueRef;public BoundlessCyclicBarrier() {this(0);}public BoundlessCyclicBarrier(int startVersion) {waitQueueRef = new AtomicReference<VersionQueue>(new VersionQueue(startVersion));}public final void awaitWithAssignedVersion(int myVersion)throws InterruptedException {awaitImpl(true, myVersion, 0);}/**** @param myVersion* @param nanosTimeout* @return if timeout, or be canceled and doesn't reach myVersion, returns false* @throws InterruptedException*/public final boolean awaitWithAssignedVersion(int myVersion, long nanosTimeout) throws InterruptedException {return awaitImpl(true, myVersion, nanosTimeout);}public final void await() throws InterruptedException {awaitImpl(false, 0, 0);}/**** @param nanosTimeout* @return if and only if timeout, returns false* @throws InterruptedException*/public final boolean await(long nanosTimeout)throws InterruptedException {return awaitImpl(false, 0, nanosTimeout);}/*** pass and version++(some threads may not be unparked when awaitImpl is in process, but it's OK in this Barrier)* @return old queue version*/public int nextCycle() {VersionQueue oldQueue = waitQueueRef.get();VersionQueue newQueue = new VersionQueue(oldQueue.version + 1);for(;;){if (waitQueueRef.compareAndSet(oldQueue, newQueue)) {for (Thread t : oldQueue.queue)LockSupport.unpark(t);break;}oldQueue = waitQueueRef.get();newQueue.version = oldQueue.version + 1;}return oldQueue.version;}/*** pass and assign the next cycle version(caller should make sure that the newAssignVersion is right)* @param newAssignVersion*/public void nextCycle(int newAssignVersion) {VersionQueue oldQueue = waitQueueRef.getAndSet(new VersionQueue(newAssignVersion));for (Thread t : oldQueue.queue)LockSupport.unpark(t);}/*** if version update has stopped, invoke this to awake all threads*/public void cancel(){VersionQueue oldQueue = waitQueueRef.get();if (waitQueueRef.compareAndSet(oldQueue, new VersionQueue(oldQueue.version, true))) {for (Thread t : oldQueue.queue)LockSupport.unpark(t);}public final int getVersion() {return waitQueueRef.get().version;}private static final class VersionQueue {final private ConcurrentLinkedQueue queue;int version;final boolean isCancelQueue;VersionQueue(int curVersion){this(curVersion, false);}VersionQueue(int curVersion, boolean isCancelQueue) {this.version = curVersion;this.isCancelQueue = isCancelQueue;queue = new ConcurrentLinkedQueue();}}/**** @param assignVersion is myVersion available* @param myVersion wait for this version* @param nanosTimeout wait time(nanosTimeout <=0 means that nanosTimeout is invalid) * @return if timeout, or be canceled and doesn't reach myVersion, returns false * @throws InterruptedException */ protected boolean awaitImpl(boolean assignVersion, int myVersion, long nanosTimeout) throws InterruptedException { boolean timeOutEnable = nanosTimeout > 0;long lastTime = System.nanoTime();VersionQueue newQueue = waitQueueRef.get();//Aif (assignVersion && newQueue.version - myVersion >= 0)return true;while (true) {VersionQueue submitQueue = newQueue;//BsubmitQueue.queue.add(Thread.currentThread());//Cwhile (true) {newQueue = waitQueueRef.get();//Dif (newQueue != submitQueue){//E: it's a new cycleif(assignVersion == false)return true;else if(newQueue.version - myVersion >= 0)return true;else if (newQueue.isCancelQueue)//F: be canceledreturn false;else//just like invoking awaitImpl againbreak;}if (timeOutEnable) {if (nanosTimeout <= 0)return false;LockSupport.parkNanos(this, nanosTimeout);long now = System.nanoTime();nanosTimeout -= now - lastTime;lastTime = now;} elseLockSupport.park(this);if (Thread.interrupted())throw new InterruptedException();}}}
}

代码分析

先分析一下awaitImpl方法,A和D是该方法的关键点,决定着它属于哪一个批次,对应哪一个版本。这里有个小细节,在nexeCycle,cancel解除阻塞时,该线程可能并不在队列中,因为插入队列发生在C处,这在A和D之后(虽然看起来C在D之前,但D取到的queue要在下一次循环时才被当作submitQueue),所以,在E处再进行了一次判断,开始解除阻塞时,旧队列肯定被新队列所替换,newQueue != submitQueue一定为真,就会不调用park进行阻塞了,也就不需要解除阻塞,所以即使解除阻塞时,该线程不在队列中也是没问题的。

再看E处,当进入一个新的cycle时(当前队列与提交的队列不同),a)如果没指定版本,或者到达或超过了指定版本,则返回true;b)如果当前调用了cancel,则当前队列的isCancelQueue将为true,则不继续傻等,返回false;c)或者还未到达指定版本,break,插入到当前队列中,继续等待指定版本的到达。

如果没有进入E处的IF内,则当前线程会被阻塞,直到超时,然后返回false;或被中断,然后抛出InterruptedException;或被解除阻塞,重新进行E处的判定。

这里还有个小细节,既然cancel时,把当前的队列设置了isCancelQueue,那么之后指定版本的await会不会也直接返回了呢?其实不会的,因为它若要执行F处的判断,则先必需通过E处的判定,这意味着,当前队列已经不是提交时的那个设置了isCancelQueue的队列了。

代码中对于cancel的处理,其实并不保证cancel后,之前的await都会被解除阻塞并返回,如果cancel后,紧接着又调用了nextCycle,那么可能某线程感知不到cancel的调用,唤醒后又继续等待指定的版本。cancel的目的是在于不让线程傻等,既然恢复版本更新了,那就继续等待吧。

如果自己维护版本号,则应该保证递增。另外,版本号的设计,考虑到了int溢出的情况,版本的前后判断,我不是使用newVersion>=oldVersion,而是newVersion-oldVersion>=0,这样,版本号就相当于循环使用了,只要两个比较的版本号的差不超过int的最大值,那么都是正确的,int的最大值可是20多亿,几乎不可能出现跨度这么大的两个版本号的比较,所以,认为它是正确的。

小结

本文讲到了一个非阻塞同步算法设计时的小技巧,如果多个变量之间要维护某种特定关系,那么可以将它们封装到一个类中,再用CAS更新这个类的引用,这样就达到了:要么都被更新,要么都没被更新,保持了多个变量之间的一致性。同时需要注意的是,每次更新都必需创建新的包装对象,假如有其它更好的办法,应该避免使用该方法。

转载于:https://www.cnblogs.com/trytocatch/p/boundlesscyclicbarrier.html

非阻塞同步算法实战(二)-BoundlessCyclicBarrier相关推荐

  1. linux驱动 阻塞和非阻塞IO 篇二

    @上一篇介绍了linux阻塞与非阻塞的基本概念,以及应用程序的小demo和kernel层对应的api函数.那接下来就以实例来分析,如何在linux驱动层添加等待队列和轮询的方法,以及区别. ** 一: ...

  2. Linux非阻塞IO(二)网络编程中非阻塞IO与IO复用模型结合

    上文描述了最简易的非阻塞IO,采用的是轮询的方式,这节我们使用IO复用模型. 阻塞IO 过去我们使用IO复用与阻塞IO结合的时候,IO复用模型起到的作用是并发监听多个fd. 以简单的回射服务器为例,我 ...

  3. 非阻塞同步算法与CAS(Compare and Swap)无锁算法

    锁(lock)的代价 锁是用来做并发最简单的方式,当然其代价也是最高的.内核态的锁的时候需要操作系统进行一次上下文切换,加锁.释放锁会导致比较多的上下文切换和调度延时,等待锁的线程会被挂起直至锁释放. ...

  4. Verilog之非阻塞赋值(二)——赋值延后一个周期

    阻塞与非阻塞赋值,当在always块中的每一个条件分支中,仅有一条赋值语句(不管是阻塞与非阻塞,且要满足条件中的条件判断式不含有在本模块中定义并赋值的reg变量(采用非阻塞赋值)),那么阻塞和非阻塞都 ...

  5. [Python]再学 socket 之非阻塞 Server

    再学 socket 之非阻塞 Server 本文是基于 python2.7 实现,运行于 Mac 系统下 本篇文章是上一篇初探 socket 的续集, 上一篇文章介绍了:如何建立起一个基本的 sock ...

  6. tornado异步请求非阻塞

    前言也许有同学很迷惑:tornado不是标榜异步非阻塞解决10K问题的嘛?但是我却发现不是torando不好,而是你用错了 比如最近发现一个事情:某网 前言 也许有同学很迷惑:tornado不是标榜异 ...

  7. 使用tornado让你的请求异步非阻塞

    2019独角兽企业重金招聘Python工程师标准>>> 前言 也许有同学很迷惑:tornado不是标榜异步非阻塞解决10K问题的嘛?但是我却发现不是torando不好,而是你用错了. ...

  8. Verilog之非阻塞赋值(三)—— 赋值延后一个周期

    总结:(一.二为一组,不延后:三.四为一组,延后1周期) 在Verilog之非阻塞赋值(二)中,相关说法不全面,因为文本编辑器不支持更改,故完善之后,将此文作为第三部分 前提:always块描述的时序 ...

  9. 驱动程序开发:阻塞与非阻塞IO

    这里写自定义目录标题 一.关于阻塞与非阻塞IO的基础 Ⅰ.阻塞IO访问 Ⅱ.非阻塞IO访问 二.实验(根据上一篇按键中断实验改) Ⅰ.阻塞方式实验 Ⅱ.非阻塞方式实验 1.驱动程序 2.APP应用程序 ...

最新文章

  1. 0x22.搜索 - 深度优先搜索
  2. python常见异常
  3. 计算机辅助语言和计算语言学关系,建构主义理论视角下计算机辅助语言学习环境对留学生学习汉语动机的影响——以广西大学泰国留学生为例-语言学及应用语言学专业论文.docx...
  4. linux笔记_文件搜索命令
  5. query builder python-elasticsearch返回指定字段
  6. WIN7打补丁后VS2012出现版本不兼容
  7. python与中文的那点事
  8. 括号匹配问题(九度教程第 26 题)
  9. C++操作符的优先级
  10. sp2 xp 英文版序列号_64位 Windows XP SP2 VOL+简体中文语言包+序列号
  11. 超频到3200最佳时序_ddr4内存时序多少为好
  12. python 线程锁_Python线程锁的实现
  13. [笔记]光照系统 实时GI、烘焙GI
  14. 经典书籍《征服市场的人》阅读心得
  15. android磁场传感器页面布局在哪,教程:Android传感器—传感器查询demo
  16. 解决安装软件出现错误Error 1935安装程序集
  17. html a去掉下划线
  18. 计算机视觉领域的杰出人物,计算机视觉领域的大牛们
  19. 互联网时代,学什么专业就业好?
  20. 饥荒创建账号服务器,饥荒服务器搭建详细图文教程 饥荒怎么创建服务器

热门文章

  1. 常见面试算法:Logistic回归、树回归
  2. php服务器怎么保活,think-queue消息队列
  3. 计算机应用理论题计算机系统,计算机应用理论题.doc
  4. c语言printf打印字符串,puts()vs printf()用于以C语言打印字符串
  5. 安装pattern出错mysql_config not found
  6. ci php做记录删除,PHP CI APC 使用记录
  7. 聊聊Election Algorithms
  8. 原生Servlet 上传文件
  9. 深度学习(deep learning)优化调参细节(trick)
  10. Java 使用Commons-fileupload组件实现上传