Future模式详解
1 Future详解
1.1 Future模式
Future模式是多线程开发中常见的设计模式,它的核心思想是异步调用。对于Future模式来说,它无法立即返回你需要的数据,但是它会返回一个契约,将来你可以凭借这个契约去获取你需要的信息。
这是传统的同步方法,调用一段耗时的程序。当客户端发出call请求,这个请求需要很长的一段时间才会返回,客户端一直在等待直到数据返回随后再进行其他任务的处理。
而使用Future模式:从Data_Future对象可以看到虽然call
本身任然需要一段很长时间处理程序。但是服务程序并不等数据处理完成便立即返回客户端一个伪造的数据(如:商品的订单,而不是商品本身),实现了Future模式的客户端在得到这个返回结果后并不急于对其进行处理而是调用其他业务逻辑,充分利用等待时间,这就是Future模式的核心所在。在完成其他业务处理后,最后再使用返回比较慢的Future
数据。这样在整个调用中就不存在无谓的等待,充分利用所有的时间片,从而提高了系统响应速度。
1.2 Future模式的主要角色
一个非常简单的 Future 模式的实现,它的主要参与者如表所示。
参与者 | 作用 |
---|---|
Main | 系统启动,调用Client发出请求 |
Client | 返回Data对象,立即返回FutureData,并开启ClientThread线程装配RealData |
Data | 返回数据的接口 |
FutureData | FutrueData是一个虚拟的数据,构造很快,需要装配RealData |
RealData | 真实数据,构造比较慢 |
1.3 Future模式的简单实现
- 传参获取执行任务,获取到执行的任务后
- 新起一个线程执行任务,并把任务线程方法的线程阻塞住
- 当任务执行完成,唤醒任务线程,回调返回方法,获取执行结果
Future
模式的实现中,有一个核心接口Data
,这就是客户端希望获取的数据。这个Data
接口有两个重要的实现,一个是RealData
,也就是真实数据,这就是最终需要获得的、有价值的信息。另外一个就是FutureData
,它是用来提取RealData
的一个“凭据”,FutureData
可以立即返回。
下面是Data接口:
FutureData
实现了一个快速返回的RealData
包装。它只是一个包装,或者说是一个RealData
的虚拟实现。因此,它可以很快被构造并返回。
FutureData
是Future模式的关键。它实际上是真实数据RealData
的代理,封装了获取RealData
的等待过程。
当使用FutrueData
的getResult()
方法时,如果实际的数据没有准备好,那么程序就会阻塞,等RealData
准备好并注入FutureData
中才最终返回数据。
RealData
是最终需要使用的数据模型,它的构造很慢。用sleep()
函数模拟这个过程,简单地模拟一个字符串的构造。
接下来就是客户端程序,Client
主要实现了获取FutureData
,开启构造RealData
的线程,并在接受请求后,很快返回FutureData
。
注意,它不会等待数据真的构造完毕再返回,而是立即返回
FutureData
,即使这个时候FutureData
内并没有真实数据。
最后,就是主函数Main
,它主要负责调用Client
发起请求,并消费返回的数据。
执行结果:
1.4 JDK中的Future模式
Future模式很常用,在JDK中Future模式有一套完整的实现。Future
接口类似于上面的“凭据”接口或者说契约。通过它可以得到真实的数据。RunnableFuture
实现了Future
和Runnable
接口,其中run
方法用于构造真实的数据。它有个具体的实现类:FuntureTask
类。
Future
接口提供了一些简单的控制功能:
public boolean cancel(boolean mayInterruptIfRunning) ;//取消任务
public boolean isCancelled();//是否已经取消
public boolean isDone();//是否已经完成
public V get() throws InterruptedException, ExecutionException;//取得返回对象
public V get(long timeout, TimeUnit unit);//取得返回对,可以设置超时时间
FutureTask源码分析:
- 成员变量:
state | 含义 |
---|---|
NEW | 表示新创建状态,任务尚未执行 |
COMPLETING | 表示当前任务即将结束,但是还未完全结束,返回值还未写入,处于一种临界状态 |
NORMAL | 表示当前任务处于正常结束状态(没有发生异常,中断,取消) |
EXCEPTIONAL | 表示当前任务因为出现异常而中断,处于非正常结束状态 |
CANCELLED | 表示当前任务因调用cancel而处于被取消状态 |
INTERRUPTING | 表示当前任务处于中断中,但是还没有完全中断的阶段 |
INTERRUPTED | 表示当前任务处于已完全中断的阶段 |
// 表示当前任务的状态
private volatile int state;
// 表示当前任务的状态是新创建的,尚未执行
private static final int NEW = 0;
// 表示当前任务即将结束,还未完全结束,值还未写,一种临界状态
private static final int COMPLETING = 1;
// 表示当前任务正常结束
private static final int NORMAL = 2;
// 表示当前任务执行过程中出现了异常,内部封装的callable.call()向上抛出异常了
private static final int EXCEPTIONAL = 3;
// 表示当前任务被取消
private static final int CANCELLED = 4;
// 表示当前任务中断中
private static final int INTERRUPTING = 5;
// 表示当前任务已中断
private static final int INTERRUPTED = 6;// 我们在使用FutureTask对象的时候,会传入一个Callable实现类或Runnable实现类,这个callable存储的就是
// 传入的Callable实现类或Runnable实现类(Runnable会被使用修饰者设计模式伪装为)callable
//submit(callable/runnable):其中runnable使用了装饰者设计模式伪装成了callable
private Callable<V> callable;// 正常情况下,outcome保存的是任务的返回结果
// 不正常情况下,outcome保存的是任务抛出的异常
private Object outcome; // non-volatile, protected by state reads/writes// 保存的是当前任务执行期间,执行任务的线程的引用
private volatile Thread runner;// 因为会有很多线程去get结果,这里把线程封装成WaitNode,一种数据结构:栈,头插头取
private volatile WaitNode waiters;static final class WaitNode {// 线程对象volatile Thread thread;// 下一个WaitNode结点volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }
}
- 构造方法
构造方法有两种,一种是只传入一个Callable
对象,Callable
对象返回的结果就是FutureTask
对象返回的结果;另一种是传入一个Runnable
对象和一个泛型变量,其中Runnable
对象会被伪装成Callable
对象、传入的泛型变量就是FutureTask
执行任务后返回的结果。
// 这个构造方法传入一个callable,调用get返回的结果就是callable返回的结果
public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;// 设置状态为新创建this.state = NEW; // ensure visibility of callable
}
// 这个构造方法传入一个runnable,一个result变量,调用get方法返回的结果就是传入的result变量
public FutureTask(Runnable runnable, V result) {// 装饰者模式,将runnable转化为callable接口this.callable = Executors.callable(runnable, result);// 设置状态为新创建this.state = NEW; // ensure visibility of callable,
}
成员方法
run()
方法及与其相关的方法run()
里面执行的是当前任务的具体逻辑。里面涉及了setException
方法、set
方法、handlePossibleCancellationInterrupt
方法、finishCompletion
方法。run()
方法:里面会调用Callable对象的**run()
方法任务的具体业务逻辑和一些关于任务状态**、返回结果的逻辑。public void run() {// 当前任务状态不为new直接结束//CAS操作把本线程写入为runner,若runner旧值不为null,说明已经启动过了,直接返回,这里也说明了run()里面的具体逻辑只会被执行一次。//obj :包含要修改的字段对象;//offset :字段在对象内的偏移量;//expect : 字段的期望值;//update :如果该字段的值等于字段的期望值,用于更新字段的新值;if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;// 只有当任务状态为new并且runner旧值为null才会执行到这里try {// 传入的callable任务Callable<V> c = callable;// 当任务不为null并且当前任务状态为新建时才会往下执行// 条件1:防止空指针异常// 条件2:防止外部线程cacle掉当前任务if (c != null && state == NEW) {// 储存任务的返回结果V result;// 储存执行是否成功boolean ran;try {// 调用callable.run()并返回结果result = c.call();// 正常执行设置ran为trueran = true;} catch (Throwable ex) {// callable的run()方法抛出了异常// 设置结果为nullresult = null;// 执行失败设置ran为falseran = false;// 内部设置outcome为抛出的异常,//并且更新任务状态为EXCEPTIONAL(执行过程中出现了异常)并且唤醒阻塞的线程setException(ex);}// 如果执行成功(正常执行)if (ran)// 内部设置outcome为callable执行的结果,并且更新任务的状态为NORMAL(任务正常执行)并且唤醒阻塞的线程set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()// 将当前任务的线程设置为nullrunner = null;// state must be re-read after nulling runner to prevent// leaked interrupts// 当前任务的状态int s = state;// 如果state>=INTERRUPTING,说明当前任务处于中断中或已中断状态if (s >= INTERRUPTING)// 如果当前任务处于中,则执行这个方法线程会不断让出cpu直到任务处于已中断状态handlePossibleCancellationInterrupt(s);}}
setException(Throwable t)
方法如果在执行
run()
方法的过程中出现了异常会执行这个方法,里面具体的逻辑是:- 设置任务的状态为
EXCEPTIONAL
(表示因为出现异常而非正常完成); - 设置
outcome
(返回结果)为Callable
对象的run()
方法抛出的异常; - 执行
finishCompletion()
方法唤醒因为调用get()
方法而陷入阻塞的线程。
protected void setException(Throwable t) {// 如果当前任务的状态是新建状态,则设置任务状态为临界状态(即将完成)if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 设置outcome(结果)为callable.run()抛出的异常outcome = t;// 设置当前任务的状态为出现中断异常UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state// 唤醒调用get()的所有等待的线程并清空栈finishCompletion();}}
set(V v)
方法如果执行
run()
方法正常结束(没有出现异常)会执行这个方法,里面的具体逻辑是:- 设置任务的状态为**
NORMAL
**(表示正常结束) - 设置
outcome
(返回结果)为Callable对象调用run()
方法返回的结果 - 唤醒因为调用
get()
方法而陷入阻塞的线程。
protected void set(V v) {// 如果当前任务的状态为新建状态,则设置当前任务的状态为临界状态(即将完成)if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 设置outcome(结果)为callable.run()返回的结果outcome = v;// 设置当前任务的状态为正常结束UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state// 唤醒调用get()的所有等待的线程并清空栈finishCompletion();}}
handlePossibleCancellationInterrupt(int s)
方法这个方法在
run()
方法里可能会执行,当任务的状态为中断中时,抢到cpu的线程会释放cpu资源,直到任务状态更新为已中断状态。private void handlePossibleCancellationInterrupt(int s) {if (s == INTERRUPTING)while (state == INTERRUPTING)Thread.yield(); // wait out pending interrupt}
finishCompletion()
方法任务执行完成(正常结束和非正常结束都代表任务执行完成)会调用这个方法来唤醒所有因调用
get()
方法而陷入阻塞的线程。private void finishCompletion() {// assert state > COMPLETING;// 如果条件成立,说明当前有陷入阻塞的线程for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {// 获取当前节点封装的threadThread t = q.thread;// 防止空指针异常if (t != null) {// 设置q.thread为null,方便GCq.thread = null;// 唤醒当前节点封装的线程LockSupport.unpark(t);}// 获取下一个WaitNode节点WaitNode next = q.next;// 如果next为空,说明栈现在已经为空,调用get()陷入阻塞的线程已经全部唤醒,直接breakif (next == null)break;// 执行到这里说明还有因调用get()而陷入阻塞的线程,自旋接着唤醒// 这里q.next设置为null帮助GC(垃圾回收)q.next = null; // unlink to help gc// 下一个WaitNode节点q = next;}// 中断break;}}// 空方法,子类可以重写done();// 将callable设置为null,方便GCcallable = null; // to reduce footprint}
- 设置任务的状态为
get()
方法及与其相关的方法
get()
方法获取的是任务执行完后返回的结果**。对于空参的get()
方法来说,如果任务还没有执行完就有线程调用**get()
方法获取结果,则该线程会进入阻塞状态,阻塞的具体方法是**awaitDone
**方法,我们下面会学习。
public V get() throws InterruptedException, ExecutionException {int s = state;// 判断当前任务的状态是否小于COMPLETING,如果成立说明当前任务的状态要么为新建状态要么为临界状态if (s <= COMPLETING)// 条件成立会调用awaitDone方法自旋等待直到任务完成s = awaitDone(false, 0L);return report(s);}
对于指定时间含参的**get
方法来说,如果在指定时间内没有返回结果**,则会抛出时间超时异常TimeoutException
public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();return report(s);
}
awaitDone(boolean timed, long nanos)
方法
这个方法是用来阻塞所有因调用get()
方法获取结果但是FutureTask
任务还没有执行完的线程。awaitDone
用来阻塞线程时需要满足的条件:任务还没有执行完;线程调用了get()
方法
// 这个方法的作用是等待任务被完成(正常完成或出现异常完成都算完成),被中断,或是被超时
private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;// 这个WaitNode其实存储的是当前线程WaitNode q = null;// 表示当前线程代表的WaitNode对象是否入栈成功boolean queued = false;for (;;) {// 如果当前线程出现中断异常,则将该线程代表的WaitNode结点移出栈并抛出中断异常if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}// 获取当前任务的状态int s = state;// 如果当前任务状态大于COMPLETING,说明当前任务已经有结果了(任务完成、中断、取消),直接返回任务状态if (s > COMPLETING) {if (q != null)// 设置q.thread为null,方便GCq.thread = null;return s;}// 当前任务处于临界状态,即将完成,则当前线程释放cpuelse if (s == COMPLETING) // cannot time out yetThread.yield();// 第一次自旋,如果当前WitNode为null,new一个WaitNode结点else if (q == null)q = new WaitNode();// 第二次自旋,如果当前WaitNode节点没有入队,则尝试入队else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);// 第三次自旋,到这里表示是否定义了超时时间else if (timed) {nanos = deadline - System.nanoTime();// 超出了指定时间,就移除当前节点并返回任务状态if (nanos <= 0L) {removeWaiter(q);return state;}// 未超出时间,挂起当前线程一定时间LockSupport.parkNanos(this, nanos);}else// 挂起当前线程,该线程会休眠(什么时候该线程会继续执行呢?除非有其他线程调用unpark()或者中断该线程)LockSupport.park(this);}}
removeWaiter
方法:出现中断时,清空栈中的结点。
private void removeWaiter(WaitNode node) {if (node != null) {// 帮助GCnode.thread = null;retry:for (;;) { // restart on removeWaiter racefor (WaitNode pred = null, q = waiters, s; q != null; q = s) {s = q.next;// 后继节点不为空if (q.thread != null)pred = q;// 前驱结点不为空else if (pred != null) {// 前驱结点的后继结点指向当前结点的后继结点,就相当于将当前节点删去pred.next = s;if (pred.thread == null) // check for racecontinue retry;}else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s))continue retry;}break;}}}
report(int s)
方法这个方法是真正用来获取任务的返回结果的,这个方法在get()
方法里面会被调用,如果该方法被调用,说明任务已经执行完了。
private V report(int s) throws ExecutionException {// 获取outcome的值Object x = outcome;// 如果当前任务的状态为正常结束,则返回outcome的值if (s == NORMAL)return (V)x;// 如果当前任务的状态 >= CANCELLED 说明当前任务状态为被取消、或是在中断中、或是已经中断完成if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);}
3.cancel(boolean mayInterruptIfRunning)
方法
public boolean cancel(boolean mayInterruptIfRunning) {// 【1】判断当前任务状态,若state == NEW时根据mayInterruptIfRunning参数值给当前任务状态赋值为INTERRUPTING或CANCELLED// a)当任务状态不为NEW时,说明异步任务已经完成,或抛出异常,或已经被取消,此时直接返回false。// b)当前仅当任务状态为NEW时,此时若mayInterruptIfRunning为true,此时任务状态赋值为INTERRUPTING;否则赋值为CANCELLED。if (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try { // in case call to interrupt throws exception// 【2】如果mayInterruptIfRunning为true,此时中断执行异步任务的线程runner(执行异步任务时就把执行异步任务的线程就赋值给了runner成员变量)if (mayInterruptIfRunning) {try {//获取当前正在执行的线程,可能是null,是null的情况代表当前任务正在队列中,线程还没有获取到它Thread t = runner;//给Thread一个中断信号,如果程序是响应中断的,则走中断逻辑;如果程序不是响应中断的,则什么也不做if (t != null)// 中断执行异步任务的线程runnert.interrupt();} finally { // final state// 最后任务状态赋值为INTERRUPTEDUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}// 【3】不管mayInterruptIfRunning为true还是false,此时都要调用finishCompletion方法唤醒阻塞的获取异步任务结果的线程并移除线程等待链表节点} finally {finishCompletion();}// 返回truereturn true;
}
- 总结
- FutureTask采用的是异步的执行方式。
- FutureTask对象可以使用**
get()
方法返回执行的结果,如果任务还没有执行完,则调用get()
的线程会陷入**阻塞,直到任务执行完。- FutureTask对象可以调用**
cancel
方法取消任务的执行**。
下面演示这个内置的Future模式的使用:
public class FutureTaskTest1 {public static void main(String[] args) throws InterruptedException, ExecutionException {FutureTask<String> cookTask = new FutureTask<>(new Callable<String>() {@Overridepublic String call() throws Exception {Thread.sleep(1000);return "5斤小龙虾";}});long startTime = System.currentTimeMillis();System.out.println("我点了5斤龙虾");new Thread(cookTask).start();System.out.println("我去买奶茶");TimeUnit.SECONDS.sleep(2);System.out.println("我买到奶茶了");String s = cookTask.get();System.out.println("我的"+s+"已经做好了");System.out.println("我一共用了"+(System.currentTimeMillis() - startTime)/1000+"秒,买了午餐和奶茶");}
}
Future模式详解相关推荐
- Future 模式详解(并发使用)
我觉得很多讲Future模式的文章并没有深刻理解Future模式,其实Future模式只是生产者-消费者模型的扩展.经典"生产者-消费者"模型中消息的生产者不关心消费者何时处理完该 ...
- Future 用法详解
Future 用法详解 前言 其他知识点 Java 多线程基础 深入理解aqs ReentrantLock用法详解 深入理解信号量Semaphore 深入理解并发三大特性 并发编程之深入理解CAS 深 ...
- HTTP协议头部与Keep-Alive模式详解
HTTP协议头部与Keep-Alive模式详解 HTTP协议头部与Keep-Alive模式详解 - 玩命写博客 - 博客频道 - CSDN.NET HTTP协议头部与Keep-Alive模式详解 20 ...
- java并发编程Future类详解
作用和举例 future类的作用就是为了调用其他线程完成好后的结果,再返回到当前线程中,如上图举例: 小王自己是主线程,叫外卖等于使用future类,叫好外卖后小王就接着干自己的事去了,当外卖到了的时 ...
- python贪婪匹配_python re模块匹配贪婪和非贪婪模式详解
python re模块匹配贪婪和非贪婪模式详解 这篇文章主要介绍了python re模块匹配贪婪和非贪婪模式详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友 ...
- getinstance方法详解_二、设计模式总览及工厂模式详解
二.架构师内功心法之设计模式 2.架构师内功心法之设计模式 2.1.课程目标 1.通过对本章内容的学习,了解设计模式的由来. 2.介绍设计模式能帮我们解决哪些问题. 3.剖析工厂模式的历史由来及应用场 ...
- Spotify敏捷模式详解三部曲第二篇:研发过程
本文转自:Scrum 中文网 引言 在本系列文章的第一篇,我们介绍了Spotify的敏捷研发团队,以及它独特的组织架构.Spotify的研发团队采用的是一种非常独特的组织架构,如下图所示: 整个研发组 ...
- Spotify敏捷模式详解三部曲第一篇:研发团队
本文转自:Scrum中文网 引言 2018年4月,来自北欧瑞典的音乐流媒体公司.百亿美元独角兽Spotify创造了历史,它成为了当代上市公司当中,第一家通过"直接上市"的方式在美国 ...
- Docker(十四):Docker:网络模式详解
Docker作为目前最火的轻量级容器技术,牛逼的功能,如Docker的镜像管理,不足的地方网络方面. Docker自身的4种网络工作方式,和一些自定义网络模式 安装Docker时,它会自动创建三个网络 ...
最新文章
- pip 或者conda 下载安装torch-{cluster,geometric,scatter,sparse,spline-conv}的时候报错
- AIX系统日志学习笔记之三
- scrapy中的request对象
- MQTT 5.0 新特性(三)| 有效载荷标识与内容类型
- import tensorflow 报错 ImportError: DLL load failed: 找不到指定的模块。
- 嵌入式OS入门笔记-以RTX为案例:三.初探进程
- Python基础知识4: while循环基本使用
- [置顶] android 自定义圆角ImageView以及锯齿的处理
- 和菜鸟一起学android4.0.3源码之lcd屏幕背光调节
- 动手学深度学习Pytorch Task01
- 如何打开KML和KMZ文件并与卫星影像叠加
- www.jb51.com脚本之家漂亮菜单
- mysql 上下文切换_线程上下文切换
- 云控系统php源码,xrkmontor字符云监控系统php源码 v2.5
- 华为交换机配置NQA实现动态链路检测
- oracle utl inaddr,Oracle包utl_inaddr
- VSCode:删除SSH远程连接
- 一种基于人脸追踪和特征分析的疲劳驾驶预警平台
- 灰灰教你学python ~小黄鸡自动回复
- 集成学习研究现状及展望