Guarded Suspension模式 等待唤醒机制的规范

  • 1. Guarded Suspension 模式
  • 2. 扩展 Guarded Suspension 模式
  • 3. Dubbo 源码分析
  • 4. 总结

  前不久,同事小灰工作中遇到一个问题,他开发了一个 Web 项目:Web 版的文件浏览器,通过它用户可以在浏览器里查看服务器上的目录和文件。这个项目依赖运维部门提供的文件浏览服务,而这个文件浏览服务只支持消息队列(MQ)方式接入。消息队列在互联网大厂中用的非常多,主要用作流量削峰和系统解耦。在这种接入方式中,发送消息和消费结果这两个操作之间是异步的,你可以参考下面的示意图来理解。

  在小灰的这个 Web 项目中,用户通过浏览器发过来一个请求,会被转换成一个异步消息发送给 MQ,等 MQ 返回结果后,再将这个结果返回至浏览器。小灰同学的问题是:给 MQ 发送消息的线程是处理 Web 请求的线程 T1,但消费 MQ 结果的线程并不是线程 T1,那线程 T1 如何等待 MQ 的返回结果呢?为了便于你理解这个场景,我将其代码化了,示例代码如下:

public class ViewFile {/*** 发送消息* @param message*/void sendMsg(Message message) {}/*** MQ消息返回后会调用该方法, 该方法的执行线程不同于发送消息的线程* @param message*/void onMessage(Message message) {}/*** 处理浏览器发来的请求*/void handWebReq() {Message message = new Message(1L, "...");sendMsg(message);String result = "..."; // 处理浏览器发来的请求}static class Message {private Long id;private String content;public Message() {}public Message(Long id, String content) {this.id = id;this.content = content;}}
}

  

1. Guarded Suspension 模式

  上面小灰遇到的问题,在现实世界里比比皆是,只是我们一不小心就忽略了。比如,项目组团建要外出聚餐,我们提前预订了一个包间,然后兴冲冲地奔过去,到那儿后大堂经理看了一眼包间,发现服务员正在收拾,就会告诉我们:“您预订的包间服务员正在收拾,请您稍等片刻。”过了一会,大堂经理发现包间已经收拾完了,于是马上带我们去包间就餐。

  我们等待包间收拾完的这个过程和小灰遇到的等待 MQ 返回消息本质上是一样的,都是等待一个条件满足:就餐需要等待包间收拾完,小灰的程序里要等待 MQ 返回消息。

  那我们来看看现实世界里是如何解决这类问题的呢?现实世界里大堂经理这个角色很重要,我们是否等待,完全是由他来协调的。通过类比,相信你也一定有思路了:我们的程序里,也需要这样一个大堂经理。的确是这样,那程序世界里的大堂经理该如何设计呢?其实设计方案前人早就搞定了,而且还将其总结成了一个设计模式:Guarded Suspension。所谓 Guarded Suspension,直译过来就是“保护性地暂停”。那下面我们就来看看,Guarded Suspension 模式是如何模拟大堂经理进行保护性地暂停的。

  下图就是 Guarded Suspension 模式的结构图,非常简单,一个对象 GuardedObject,内部有一个成员变量——受保护的对象,以及两个成员方法——get(Predicate p)和onChanged(T obj)方法。其中,对象 GuardedObject 就是我们前面提到的大堂经理,受保护对象就是餐厅里面的包间;受保护对象的 get() 方法对应的是我们的就餐,就餐的前提条件是包间已经收拾好了,参数 p 就是用来描述这个前提条件的;受保护对象的 onChanged() 方法对应的是服务员把包间收拾好了,通过 onChanged() 方法可以 fire 一个事件,而这个事件往往能改变前提条件 p 的计算结果。下图中,左侧的绿色线程就是需要就餐的顾客,而右侧的蓝色线程就是收拾包间的服务员。

  GuardedObject 的内部实现非常简单,是管程的一个经典用法,你可以参考下面的示例代码,核心是:get() 方法通过条件变量的 await() 方法实现等待,onChanged() 方法通过条件变量的 signalAll() 方法实现唤醒功能。逻辑还是很简单的,所以这里就不再详细介绍了。

public class GuardedObject<T> {private T obj;private final Lock lock = new ReentrantLock();private final Condition done = lock.newCondition();private final long timeOut = 1;/*** 获取受保护对象* @param p* @return*/T get(Predicate<T> p) {lock.lock();try {while (p.test(obj)) {done.await(timeOut, TimeUnit.SECONDS);}} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}return obj;}/*** 事件通知方法* @param obj*/void onChange(T obj) {lock.lock();try {this.obj = obj;done.signalAll();} finally {lock.unlock();}}
}

  

2. 扩展 Guarded Suspension 模式

  上面我们介绍了 Guarded Suspension 模式及其实现,这个模式能够模拟现实世界里大堂经理的角色,那现在我们再来看看这个“大堂经理”能否解决小灰同学遇到的问题。

  Guarded Suspension 模式里 GuardedObject 有两个核心方法,一个是 get() 方法,一个是 onChanged() 方法。很显然,在处理 Web 请求的方法 handleWebReq() 中,可以调用 GuardedObject 的 get() 方法来实现等待;在 MQ 消息的消费方法 onMessage() 中,可以调用 GuardedObject 的 onChanged() 方法来实现唤醒。

public class ViewFile {/*** 发送消息* @param message*/void sendMsg(Message message) {}/*** MQ消息返回后会调用该方法, 该方法的执行线程不同于发送消息的线程* @param message*/void onMessage(Message message) {GuardedObject<Message> go = null; // 如何找到匹配的go?go.onChange(message);}/*** 处理浏览器发来的请求*/void handWebReq() {Message message = new Message(1L, "...");sendMsg(message);GuardedObject<Message> go = new GuardedObject<>();Message msm = go.get(Objects::nonNull);onMessage(msm);}
}

  但是在实现的时候会遇到一个问题,handleWebReq() 里面创建了 GuardedObject 对象的实例 go,并调用其 get() 方等待结果,那在 onMessage() 方法中,如何才能够找到匹配的 GuardedObject 对象呢?这个过程类似服务员告诉大堂经理某某包间已经收拾好了,大堂经理如何根据包间找到就餐的人。现实世界里,大堂经理的头脑中,有包间和就餐人之间的关系图,所以服务员说完之后大堂经理立刻就能把就餐人找出来。

  我们可以参考大堂经理识别就餐人的办法,来扩展一下 Guarded Suspension 模式,从而使它能够很方便地解决小灰同学的问题。在小灰的程序中,每个发送到 MQ 的消息,都有一个唯一性的属性 id,所以我们可以维护一个 MQ 消息 id 和 GuardedObject 对象实例的关系,这个关系可以类比大堂经理大脑里维护的包间和就餐人的关系。

  有了这个关系,我们来看看具体如何实现。下面的示例代码是扩展 Guarded Suspension 模式的实现,扩展后的 GuardedObject 内部维护了一个 Map,其 Key 是 MQ 消息 id,而 Value 是 GuardedObject 对象实例,同时增加了静态方法 create() 和 fireEvent();create() 方法用来创建一个 GuardedObject 对象实例,并根据 key 值将其加入到 Map 中,而 fireEvent() 方法则是模拟的大堂经理根据包间找就餐人的逻辑。

public class GuardedObject<T> {private T obj;private final Lock lock = new ReentrantLock();private final Condition done = lock.newCondition();private final long timeOut = 1;private static final ConcurrentHashMap<Object, GuardedObject> map = new ConcurrentHashMap();/*** 静态方法创建GuardedObject* @param key* @param <K>* @return*/static <K> GuardedObject create(K key) {GuardedObject go = new GuardedObject();map.put(key, go);return go;}/*** 点燃事件* @param key* @param obj* @param <K>* @param <T>*/static <K, T> void fireEvent(K key, T obj) {GuardedObject go = map.remove(key);if (go != null) {go.onChange(obj);}}/*** 获取受保护对象* @param p* @return*/T get(Predicate<T> p) {lock.lock();try {while (p.test(obj)) {done.await(timeOut, TimeUnit.SECONDS);}} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}return obj;}/*** 事件通知方法* @param obj*/void onChange(T obj) {lock.lock();try {this.obj = obj;done.signalAll();} finally {lock.unlock();}}
}

  这样利用扩展后的 GuardedObject 来解决小灰同学的问题就很简单了,具体代码如下所示。

public class ViewFile {/*** 发送消息* @param message*/void sendMsg(Message message) {}/*** MQ消息返回后会调用该方法, 该方法的执行线程不同于发送消息的线程* @param message*/void onMessage(Message message) {GuardedObject.fireEvent(message.id, message);}/*** 处理浏览器发来的请求*/void handWebReq() {Long id = 1L; // id 生成器Message message = new Message(id, "...");GuardedObject<Message> go = GuardedObject.create(id);sendMsg(message);Message msm = go.get(Objects::nonNull);onMessage(msm);}static class Message {private Long id;private String content;public Message() {}public Message(Long id, String content) {this.id = id;this.content = content;}}
}

  

3. Dubbo 源码分析

  其实在编程领域,异步的场景还是挺多的,比如 TCP 协议本身就是异步的,我们工作中经常用到的 RPC 调用,在 TCP 协议层面,发送完 RPC 请求后,线程是不会等待 RPC 的响应结果的。可能你会觉得奇怪,平时工作中的 RPC 调用大多数都是同步的啊?这是怎么回事呢?

  其实很简单,一定是有人帮你做了异步转同步的事情。例如目前知名的 RPC 框架 Dubbo 就给我们做了异步转同步的事情,那它是怎么做的呢?下面我们就来分析一下 Dubbo 的相关源码。

  于下面一个简单的 RPC 调用,默认情况下 sayHello() 方法,是个同步方法,也就是说,执行 service.sayHello(“dubbo”) 的时候,线程会停下来等结果。


DemoService service = 初始化部分省略
String message = service.sayHello("dubbo");
System.out.println(message);

  如果此时你将调用线程 dump 出来的话,会是下图这个样子,你会发现调用线程阻塞了,线程状态是 TIMED_WAITING。本来发送请求是异步的,但是调用线程却阻塞了,说明 Dubbo 帮我们做了异步转同步的事情。通过调用栈,你能看到线程是阻塞在 DefaultFuture.get() 方法上,所以可以推断:Dubbo 异步转同步的功能应该是通过 DefaultFuture 这个类实现的。


  不过为了理清前后关系,还是有必要分析一下调用 DefaultFuture.get() 之前发生了什么。DubboInvoker 的 108 行调用了 DefaultFuture.get(),这一行很关键,我稍微修改了一下列在了下面。这一行先调用了 request(inv, timeout) 方法,这个方法其实就是发送 RPC 请求,之后通过调用 get() 方法等待 RPC 返回结果。


public class DubboInvoker{Result doInvoke(Invocation inv){// 下面这行就是源码中108行// 为了便于展示,做了修改return currentClient .request(inv, timeout).get();}
}

  DefaultFuture 这个类是很关键,我把相关的代码精简之后,列到了下面。不过在看代码之前,你还是有必要重复一下我们的需求:当 RPC 返回结果之前,阻塞调用线程,让调用线程等待;当 RPC 返回结果后,唤醒调用线程,让调用线程重新执行。不知道你有没有似曾相识的感觉,这不就是经典的等待 - 通知机制吗?这个时候想必你的脑海里应该能够浮现出管程的解决方案了。有了自己的方案之后,我们再来看看 Dubbo 是怎么实现的。

// 创建锁与条件变量
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();// 调用方通过该方法等待结果
Object get(int timeout) {long start = System.nanoTime();lock.lock();try {while (!isDone()) {done.await(timeout);long cur = System.nanoTime();if (isDone() || cur - start > timeout) {break;}}} finally {lock.unlock();}if (!isDone()) {throw new TimeoutException();}return returnFromResponse();
}// RPC结果是否已经返回
boolean isDone() {return response != null;
}// RPC结果返回时调用该方法
private void doReceived(Response res) {lock.lock();try {response = res;if (done != null) {done.signalAll();}} finally {lock.unlock();}
}

  调用线程通过调用 get() 方法等待 RPC 返回结果,这个方法里面,你看到的都是熟悉的“面孔”:调用 lock() 获取锁,在 finally 里面调用 unlock() 释放锁;获取锁后,通过经典的在循环中调用 await() 方法来实现等待。

  当 RPC 结果返回时,会调用 doReceived() 方法,这个方法里面,调用 lock() 获取锁,在 finally 里面调用 unlock() 释放锁,获取锁后通过调用 signal() 来通知调用线程,结果已经返回,不用继续等待了。

  

4. 总结

  Guarded Suspension 模式本质上是一种等待唤醒机制的实现,只不过 Guarded Suspension 模式将其规范化了。规范化的好处是你无需重头思考如何实现,也无需担心实现程序的可理解性问题,同时也能避免一不小心写出个 Bug 来。但 Guarded Suspension 模式在解决实际问题的时候,往往还是需要扩展的,扩展的方式有很多,本篇文章就直接对 GuardedObject 的功能进行了增强,Dubbo 中 DefaultFuture 这个类也是采用的这种方式,你可以对比着来看,相信对 DefaultFuture 的实现原理会理解得更透彻。当然,你也可以创建新的类来实现对 Guarded Suspension 模式的扩展。

  Guarded Suspension 模式也常被称作 Guarded Wait 模式、Spin Lock 模式(因为使用了 while 循环去等待),这些名字都很形象,不过它还有一个更形象的非官方名字:多线程版本的 if。单线程场景中,if 语句是不需要等待的,因为在只有一个线程的条件下,如果这个线程被阻塞,那就没有其他活动线程了,这意味着 if 判断条件的结果也不会发生变化了。但是多线程场景中,等待就变得有意义了,这种场景下,if 判断条件的结果是可能发生变化的。所以,用“多线程版本的 if”来理解这个模式会更简单。

33 - Guarded Suspension模式 等待唤醒机制的规范相关推荐

  1. 实战并发编程 - 08基于Guarded Suspension模式优化轮询while(true)

    文章目录 Guarded Suspension模式简介 看牙医的就诊流程 代码举例 总结与拓展 Guarded Suspension模式简介 guarded在这里是"保护"的意思: ...

  2. Java——线程锁,死锁,等待唤醒机制

    一.线程锁 线程安全问题 其实,线程安全问题都是由全局变量及静态变量引起的.若每个线程中对全局变量.静态变量只有读操作,而无写操作,一般来说,这个全局变量是线程安全的:若有多个线程同时执行写操作,一般 ...

  3. Java并发编程实战~Guarded Suspension模式

    Guarded Suspension 模式 比如,项目组团建要外出聚餐,我们提前预订了一个包间,然后兴冲冲地奔过去,到那儿后大堂经理看了一眼包间,发现服务员正在收拾,就会告诉我们:"您预订的 ...

  4. java基础提升(二):多线程、线程安全、线程状态、等待唤醒机制、线程池

    目录 一. 多线程 1.1并发与并行 1.2 线程与进程 1.3 创建线程类 1.3.1 方式一:继承Thread类 1.3.2 方式二:实现Runnable接口 1.3.3 Thread和Runna ...

  5. Java线程等待唤醒机制(加深理解)

    今天看源码的时候遇到这样一个场景,某线程里面的逻辑需要等待异步处理结果返回后才能继续执行.或者说想要把一个异步的操作封装成一个同步的过程.这里就用到了线程等待唤醒机制,下面具体看一下. 等待唤醒机制示 ...

  6. 24.多线程(等待唤醒机制,volatile,CAS 算法,线程池,定时器,设计模式)

    1.线程间的等待唤醒机制 Object 类中   void wait ()  在其他线程调用此对象的 notify () 方法或 notifyAll () 方法前,导致当前线程等待.         ...

  7. java sleep唤醒_Java中的等待唤醒机制—至少50%的工程师还没掌握!

    Java中的等待唤醒机制-至少50%的工程师还没掌握! 发布时间:2019-12-14 01:53, 浏览次数:222 , 标签: Java 这是一篇走心的填坑笔记,自学Java的几年总是在不断学习新 ...

  8. Java多线程02(线程安全、线程同步、等待唤醒机制)

    Java多线程2(线程安全.线程同步.等待唤醒机制.单例设计模式) 1.线程安全 如果有多个线程在同时运行,而这些线程可能会同时运行这段代码.程序每次运行结果和单线程运行的结果是一样的,而且其他的变量 ...

  9. 主线程 唤醒_Java等待唤醒机制统计子线程运行时间的方式及其疑问

    我想在主线程中获取子线程运行的时间,一种方式是使用join()方法,经验证是可行的: 但是我想试试等待唤醒机制,思路是:子线程启动后主线程等待,子线程结束后唤醒主线程,但是不太清楚为什么会报错,从运行 ...

  10. 27_多线程_第27天(线程安全、线程同步、等待唤醒机制、单例设计模式)_讲义...

    今日内容介绍 1.多线程安全问题 2.等待唤醒机制 01线程操作共享数据的安全问题 *A:线程操作共享数据的安全问题如果有多个线程在同时运行,而这些线程可能会同时运行这段代码.程序每次运行结果和单线程 ...

最新文章

  1. Mysql 索引原理及优化
  2. BC547 晶体管初步测试
  3. 从阿里云下载图片到本地
  4. 在centos下报错:-bash: apt-get: 未找到命令
  5. Android持久化存储(3)SQLite数据库的使用
  6. oracle JOB 查询 添加 修改 删除
  7. full outer join 与full join的区别_基础小白的SQL的JOIN语法解析
  8. ubuntu安装ffmpeg_手把手教你利用ffmpeg制作一个好用灵活的图片、视频压缩工具(再不需要去别的网站和用别的软件啦)...
  9. PHP获取汉字笔画数功能
  10. Python每日一练-----快乐数
  11. 魔改一波合成大西瓜!代码已开源~
  12. 京东上什么卖得最好?
  13. SyntaxError: can't assign to operator
  14. BUUCTF MISC刷题笔记(三)
  15. matplotlib 绘图 中文乱码 0.5
  16. A later version of Node.js is already installed. Setup willnow exit.
  17. 一款vista边栏Gadgets汉英翻译(翻译14种语言的边栏工具下载)
  18. Android Gradle权威指南
  19. 【项目笔记】布局文件报错Suspicious size: this will make the view invisible, probably intended for layout_width
  20. Python sqlalchemy 连接常用的数据库

热门文章

  1. Golang面试题整理
  2. 程序员必备网络基础知识清单,简单易懂
  3. 没有 本地计算机策略组,本地组策略编辑器没有mmc
  4. GD32F130之LVD低压检测
  5. 一个是阆苑仙葩,一个是美玉无瑕
  6. 9月25日百度大脑开放日人像特效专场火热报名中!
  7. 百度大脑大升级:各种算法并驾齐驱
  8. 山无棱-天地合-乃敢与君绝
  9. Beaglebone Black–智能家居控制系统 LAS - 用 UART 连接 ESP8266 (ESP-01 版)
  10. CodeVs 3315 时空跳跃者的魔法(最终版本)