JCU-futuretask如何实现
一. Future是什么?
1. Future是什么?
JDK 的 Future 就类似于我们网购买东西的订单号,当我们执行某一耗时的任务时,我们可以另起一个线程异步去执行这个耗时的任务,同时我们可以干点其他事情。当事情干完后我们再根据 future 这个"订单号"去提取耗时任务的执行结果即可。因此 Future 也是多线程中的一种应用模式。
扩展: 说起多线程,那么 Future 又与 Thread 有什么区别呢?最重要的区别就是 Thread 是没有返回结果的,而 Future 模式是有返回结果的。
2. 如何使用Future
前面搞明白了什么是Future,下面我们再来举个简单的例子看看如何使用Future。
假如现在我们要打火锅,首先我们要准备两样东西:把水烧开和准备食材。因为烧开水是一个比较漫长的过程(相当于耗时的业务逻辑),因此我们可以一边烧开水(相当于另起一个线程),一边准备火锅食材(主线程),等两者都准备好了我们就可以开始打火锅了。
public class DaHuoGuo {public static void main(String[] args) throws Exception {FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {@Overridepublic String call() throws Exception {println(Thread.currentThread().getName() + ":" + "开始烧开水...");// 模拟烧开水耗时Thread.sleep(2000);println(Thread.currentThread().getName() + ":" + "开水已经烧好了...");return "开水";}});Thread thread = new Thread(futureTask);thread.start();// do other thingprintln(Thread.currentThread().getName() + ":" + " 此时开启了一个线程执行future的逻辑(烧开水),此时我们可以干点别的事情(比如准备火锅食材)...");// 模拟准备火锅食材耗时Thread.sleep(3000);println(Thread.currentThread().getName() + ":" + "火锅食材准备好了");String shicai = "火锅食材";// 开水已经稍好,我们取得烧好的开水String boilWater = futureTask.get();println(Thread.currentThread().getName() + ":" + boilWater + "和" + shicai + "已经准备好,我们可以开始打火锅啦");}public static void println(String content){SimpleDateFormat sdf = new SimpleDateFormat();// 格式化时间 sdf.applyPattern("HH:mm:ss");// a为am/pm的标记Date date = new Date();// 获取当前时间 System.out.println("["+sdf.format(date)+"] "+content);}
}// [14:46:51] main: 此时开启了一个线程执行future的逻辑(烧开水),此时我们可以干点别的事情(比如准备火锅食材)...
// [14:46:51] Thread-0:开始烧开水...
// [14:46:53] Thread-0:开水已经烧好了...
// [14:46:54] main:火锅食材准备好了
// [14:46:54] main:开水和火锅食材已经准备好,我们可以开始打火锅啦
从以上代码中可以看到,我们使用Future主要有以下步骤:
- 新建一个
Callable
匿名函数实现类对象,我们的业务逻辑在Callable
的call
方法中实现,其中Callable
的泛型是返回结果类型; - 然后把
Callable
匿名函数对象作为FutureTask
的构造参数传入,构建一个futureTask
对象; - 然后再把
futureTask
对象作为Thread
构造参数传入并开启这个线程执行去执行业务逻辑; - 最后我们调用
futureTask
对象的get
方法得到业务逻辑执行结果。
可以看到跟 Future
使用有关的 JDK 类主要有 FutureTask
和 Callable
两个,下面主要对 FutureTask
进行源码分析。
扩展:还有一种使用 Future 的方式是将 Callable 实现类提交给线程池执行的方式,这里不再介绍,自行百度即可。
二. FutureTask源码分析
(一) FutureTask的成员变量和成员方法
- 我们先来看下FutureTask的类结构:
可以看到 FutureTask 实现了 RunnableFuture
接口,而RunnableFuture接口又继承了 Future
和 Runnable
接口。因为FutureTask间接实现了Runnable接口,因此可以作为任务被线程Thread执行;此外,最重要的一点就是FutureTask还间接实现了Future接口,因此还可以获得任务执行的结果。
- 成员变量
我们首先来看下FutureTask的成员变量有哪些,理解这些成员变量对后面的源码分析非常重要。/** 封装的Callable对象,其call方法用来执行异步任务 */ private Callable<V> callable; /** 用来装异步任务的执行结果 */ private Object outcome; /** 执行callable任务的线程 */ private volatile Thread runner; /** 线程等待节点,reiber stack的一种实现 */ private volatile WaitNode waiters; /** 任务执行状态 */ private volatile int state;private static final sun.misc.Unsafe UNSAFE; // 使用 Unsafe 执行 cas 修改成员变量时, 用到的字段偏移量 private static final long stateOffset; private static final long runnerOffset; private static final long waitersOffset;// 静态块 static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = FutureTask.class;stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));} catch (Exception e) {throw new Error(e);} }
(二) FutureTask的状态变化
前面讲了FutureTask的成员变量,有一个表示状态的成员变量state我们要重点关注下,state变量表示任务执行的状态。
private volatile int state;
/** 任务新建状态 */
private static final int NEW = 0;
/** 任务正在完成状态,是一个瞬间过渡状态 */
private static final int COMPLETING = 1;
/** 任务正常结束状态 */
private static final int NORMAL = 2;
/** 任务执行异常状态 */
private static final int EXCEPTIONAL = 3;
/** 任务被取消状态,对应cancel(false) */
private static final int CANCELLED = 4;
/** 任务中断状态,是一个瞬间过渡状态 */
private static final int INTERRUPTING = 5;
/** 任务被中断状态,对应cancel(true) */
private static final int INTERRUPTED = 6;
可以看到任务状态变量state有以上7种状态,0-6分别对应着每一种状态。任务状态一开始是NEW,然后由FutureTask的三个方法set
,setException
和cancel
来设置状态的变化,其中状态变化有以下四种情况:
- NEW -> COMPLETING -> NORMAL:
这个状态变化表示异步任务的正常结束,其中COMPLETING是一个瞬间临时的过渡状态,由set方法设置状态的变化; - NEW -> COMPLETING -> EXCEPTIONAL:
这个状态变化表示异步任务执行过程中抛出异常,由setException方法设置状态的变化; - NEW -> CANCELLED:
这个状态变化表示被取消,即调用了cancel(false),由cancel方法来设置状态变化; - NEW -> INTERRUPTING -> INTERRUPTED:
这个状态变化表示被中断,即调用了cancel(true),由cancel方法来设置状态变化。
(三) run()
方法
public void run() {// 为了确保只有1个线程在执行futureTask, 需要确保两个提交同时满足, 否则直接从run()方法返回// (1) futureTask 的状态是 new// (2) futureTask 此时的执行线程为 null, 即还没有线程执行该 futureTask// 什么样的调用方式会让多个线程执行痛经一个 futureTask 呢? // 答: 实例化了一个 futureTask 对象, 然后调用了多次 new Thread(futureTask).start()if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {// 代码执行到这里, 已经确保只有1个线程可以执行 futureTask, // 所以直接在当前线程中调用 callable.call() 即可; 调用中: // (1) 如果发生异常: 更新状态为 EXCEPTIONAL , 通过方法 setException()?// (2) 如果没有发生异常, 更新状态为 NORMAL, 通过方法 set()Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {// 代码执行到这里, 还是已经确保了只有1个线程可以执行 futureTask// 无论当前线程执行是否抛出异常, 执行后都应该把 futureTask 的 runner 属性置 null// 表示当前线程已执行完毕runner = null;// 后面3行是在处理执行过程中被 interrupt 的情况, 因为 run() 方法并不能实时响应中断,// 只是通过代码逻辑检测中断(参考while(!Thread.currentThread.isInterrupted())循环),// 因此, 在代码执行后响应中断, s >= INTERRUPTING 的情形, 处理方法为: // private void handlePossibleCancellationInterrupt(int s) {// if (s == INTERRUPTING)// while (state == INTERRUPTING)// Thread.yield();// }if (s == INTERRUPTING)while (state == INTERRUPTING)Thread.yield();int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}
这里值得注意的是判断线程满不满足执行异步任务条件时, runner 是否为 null 是调用 UNSAFE 的 CAS 方法 compareAndSwapObject
来判断和设置的,同时 compareAndSwapObject
是通过成员变量 runner 的偏移地址 runnerOffset 来给 runner 赋值的,此外,成员变量 runner 被修饰为 volatile
是在多线程的情况下, 一个线程的 volatile 修饰变量的设值能够立即刷进主存,因此值便可被其他线程可见。
(四)FutureTask的状态更改方法: set()
和 setException()
protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 将run()最后的执行结果保存到 outcome 成员outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state NORMALfinishCompletion();}
}protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 将run()最后的执行结果保存到 outcome 成员outcome = t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state EXCEPTIONALfinishCompletion();}
}
(五)FutureTask的唤醒等待线程方法
因为 set(V v)
和 setException(Throwable t)
方法最后都调用了 finishCompletion()
, 就是表示异步任务不管正常还是异常结束, 都要执行一部分统一的操作, 这些操作主要是来唤醒所有因为 “调用 get()
方法时因异步任务还未执行完而阻塞” 的线程. 这些阻塞线程会被包装成 WaitNode
类形成栈存储. 因此唤醒(移除)的顺序是"后进先出"即后面先来的线程先被先唤醒(移除),关于这个线程等待链表是如何成链的,后面再继续分析。
private void finishCompletion() {// waiters 是 FutureTask 的成员变量, 每个因调用 get() 而阻塞的线程, 都会被// 包装为 WaitNode 对象(定义见下方), 所有的阻塞线程会组成一个链表存储. 首先看到的这个外层// for 循环其实是一种 "彻底清空所有WaitNode" 的保证, 真正遍历链表进行唤醒的是// 内部的 for (;;) 循环; 需要这个保证是因为动作的起点是: // if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) // 判断, 这个判断只能确保当时没有新的线程因get()被加入等待队列, 所以需要加上外层的for检测for (WaitNode q; (q = waiters) != null;) {// 判断没有新线程加入get()的等待队列if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {// 如下所有代码都是普通的遍历链表, 执行唤醒 WaitNode 内部线程的操作for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);}WaitNode next = q.next;if (next == null)break;q.next = null; // unlink to help gcq = next;}break;}}// 无意义, done()的方法体1.8版本中为空 {}done(); // 因为异步任务已经执行完且结果已经保存到outcome中,因此此时可以将callable对象置空了callable = null;
}
[注]: WaitNode 定义:
static final class WaitNode {volatile Thread thread; // 包装线程// 成链表的标志 (实际为栈, 用栈顶元素执行cas判断, 确定是否有新线程加入get()等待队列)volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); }
}
(六)FutureTask.get方法,获取任务执行结果
前面我们起一个线程在其run
方法中执行异步任务后,此时我们可以调用FutureTask.get
方法来获取异步任务执行的结果。
public V get() throws InterruptedException, ExecutionException {int s = state;// (1) 如果任务状态state<=COMPLETING,说明异步任务正在执行过程中,// 此时会调用awaitDone方法阻塞等待if (s <= COMPLETING)s = awaitDone(false, 0L);// (2) 代码执行到这里, 说明等待的线程已被唤醒, 任务执行完毕: // 任务可能执行成功也可能执行失败, report() 会根据执行的状态// 选择正常返回还是抛异常. 定义详见下面return report(s);
}
- awaitDone( )方法
// 2个参数出现的原因是: 有的线程指调用 get() 只想等待有限时间
// 等到任务结束返回的普通 get(), timed 参数为false
private int awaitDone(boolean timed, long nanos)throws InterruptedException {// 计算最大等待的时间点. 不限制等待时长的时间点取0final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;// 还未入栈boolean queued = false;for (;;) {// (1) 等待线程被执行中断, 抛异常退出if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}// 任务执行状态int s = state;// (2) s > COMPLETING 表示任务执行完毕, 返回最终状态退出. // 任务可能正常结束(NORMAL),可能抛出异常(EXCEPTIONAL) ,// 或任务被取消(CANCELLED,INTERRUPTING或INTERRUPTED状态的一种)if (s > COMPLETING) {// 【问】run()方法在任务结束时,也会调用finishCompletion(), 诸个将等待栈中的// WaitNode节点的thread置空,这里为什么又要再调用一次 q.thread = null 清空呢?// 【答】因为若很多线程来获取任务执行结果,在任务执行完的那一刻,此时获取任务的线程// 要么已经在线程等待链表中; 要么此时还是一个孤立的WaitNode节点。// (1)在线程等待链表中的的所有WaitNode节点将由finishCompletion来移除(同时唤醒)所有// 等待的WaitNode节点,以便垃圾回收;// (2)而孤立的线程WaitNode节点此时还未阻塞,因此不需要被唤醒,此时只要把其属性置为// null,然后其有没有被谁引用,因此可以被GC。if (q != null)q.thread = null;return s;}// 任务还在执行中, 继续等待else if (s == COMPLETING) Thread.yield();// 如果节点还未构造, 构造节点else if (q == null)q = new WaitNode();// 将构造的节点加入该线程等待栈的头部// [问]: 为什么节点加入栈的动作要写在循环内呢? // [答]: 这是多线程下cas节点入栈的标准写法. 因为入栈动作可能失败, 所以写在// 死循环内持续入栈; 这也是循环内判断 else if (q == null) 分支的// 原因: 这个分支是保证节点只构造一次, 但入栈动作可执行无数次知道成功else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);// 处理get()线程限时等待的情况: else if (timed) {nanos = deadline - System.nanoTime();// 等待已超时if (nanos <= 0L) {removeWaiter(q);return state;}// 等待未超时, 继续等待预期时间LockSupport.parkNanos(this, nanos);}// 处理不限时get()的情况: else// 线程进入阻塞等待状态LockSupport.park(this);}
}
总的来说, 将本来可以写在一起的代码逻辑, 比如构造节点后入栈, 然后将节点中的线程阻塞这3个先后动作, 拉平成同一等级的分支写在死循环里的做法, 是一种兼顾 cas 操作失败的写法. 即保证无限次 cas 尝试, 又保证无需 cas 的连贯动作可以在下一次 for 循环中like执行.
- report( )方法
private V report(int s) throws ExecutionException {// 执行结果Object x = outcome;// (1) 正常返回if (s == NORMAL)return (V)x;// (2) 因取消任务而抛异常退出if (s >= CANCELLED)throw new CancellationException();// (3) 任务失败退出throw new ExecutionException((Throwable)x);
}
(七) FutureTask.cancel方法,取消执行任务
下面可以看到, 只有当执行cancel动作时, 还没有线程执行任务时才能执行取消
public boolean cancel(boolean mayInterruptIfRunning) {// 状态 != NEW, 则已有现成在执行任务, 不能取消// cas修改状态时发现状态不是 NEW 了, 说明有新线程执行任务了, 也不能取消任务if (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try { // 如果润徐中断的话, 对线程中断if (mayInterruptIfRunning) {try {Thread t = runner;if (t != null)t.interrupt();} finally { // final stateUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {// 最终唤醒阻塞栈中的等待线程finishCompletion();}return true;
}
三. 总结
总的来说, 最简单的实现 future 模式, 只要:
- ( 1 ) 声明一个
volatile
的标记变量, 标记任务是否执行完毕 - ( 2 ) 未执行完毕时, 调用
get()
的线程执行flag.wait()
即可. 利用了jvm内部的条件等待队列 - ( 3 ) 用线程执行run()方法
反观 javaSE 的实现, 有几方面扩展:
- ( 1 ) 标记变量不止是
true/false
,取而代之的是一系列状态: new, completing, NORMAL, EXCEPTIONAL等. 这主要是为了配合run(),get(),cancel()在多线程下的逻辑 - ( 2 )
get()
线程阻塞的问题上, javaSE没有使用synchronized的条件等待队列, 而是用 cas 操作等待栈的方法. 当新线程执行get()阻塞时, 其它线程感值到新线程是通过 cas 查看栈顶节点是否发生变化得来的 - ( 3 ) 对于
run()
方法上, 通过javaSE的实现通过设置成员变量volatile Thread runner
, 来限制同一时刻最多只有一个线程执行run()方法 - ( 4 ) 除此之外, javaSE版本还实现了
cancel
等方法
JCU-futuretask如何实现相关推荐
- FutureTask源码分析
2019独角兽企业重金招聘Python工程师标准>>> 在JCU中,FutureTask是Future的具体实现,且实现了Runnable接口,即FutureTask满足了Task的 ...
- FutureTask中Treiber堆的实现
2019独角兽企业重金招聘Python工程师标准>>> 在文章FutureTask源码分析中简单说明了FutureTask中使用Treiber堆栈来保存等待结果的线程,本文将详细分析 ...
- 线上接口经常超时,我用线程池+ FutureTask解决了,YYDS
欢迎关注方志朋的博客,回复"666"获面试宝典 之前红包权益领取查询的接口超时了,因为有用户订购的权益有点多 解决方案 用线程池+ FutureTask将1个查询拆分成多个小查询 ...
- 一次搞懂 Runnable、Callable、Future、FutureTask,不懂不要钱!
点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 一般创建线程只有两种方式,一种是继承Thread,一种是实 ...
- FutureTask demo
package com.xinwei.order.entity;import java.util.concurrent.ExecutorService; import java.util.concur ...
- futuretask使用_JDK源码分析-FutureTask
1. 概述 FutureTask 是一个可取消的.异步执行任务的类,它的继承结构如下: 它实现了 RunnableFuture 接口,而该接口又继承了 Runnable 接口和 Future 接口,因 ...
- Callable、Future和FutureTask
2019独角兽企业重金招聘Python工程师标准>>> 创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口. 这2种方式都有一个缺陷就是:在执行完任 ...
- FutureTask 实现预加载数据 在线看电子书、浏览器浏览网页等
FutureTask 有点类似Runnable,都可以通过Thread来启动,不过FutureTask可以返回执行完毕的数据,并且FutureTask的get方法支持阻塞. 由于:FutureTask ...
- 接口经常超时?线程池+ FutureTask来解决!
之前红包权益领取查询的接口超时了,因为有用户订购的权益有点多 解决方案 用线程池+ FutureTask将1个查询拆分成多个小查询 选择FutureTask是因为它具有仅执行1次run()方法的特性( ...
- Java--FutureTask原理与使用(FutureTask可以被Thread执行,可以被线程池submit方法执行,并且可以监控线程与获取返回值)...
package com;import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; im ...
最新文章
- 【Python】常用包整理,包括Numpy、Pandas、sklearn、url、pymysql、Wxpy、Xlwt、Pyecharts等
- tableau中文版教程pdf_PDF 文件
- boost::geometry::default_distance_result用法的测试程序
- 结构化项目管理:十步法 [摘自:成功的软件项目管理]
- 第一章导言的笔记与思考
- [译作]Class in Jscript Part I
- partition分区(左小右大)
- Eclipse插件安装之,使用(已经下载的zip)安装包直接安装插件(例:glassfish 插件 plugin)
- 嵌入式系统——文件系统
- Worksheet Crafter Premium Edition for Mac(教学工作表制作工具)
- 学习数据结构与算法分析如何帮助您成为更优秀的开发人员
- Zepto:基础学习
- 模式识别和机器学习--- 2.3高斯分布
- FreeSWITCH第三方库(音频)的简单介绍(一)
- java 大字符集_JAVA语言之java 乱码 字符集编码
- 索尼Xperia X Performance解锁、刷TWRP、刷原厂固件、刷安卓8.1AOSP
- 【学点心理学】八本值得反复阅读的心理类书籍推荐
- 03_CSS字符属性
- 关于项目运行或者打包出现“primordials is not defined”导致运行或打包失败问题
- 关于电脑壁纸分辨率低的原因及解决办法(个人感悟,并非系统性的盘点)
热门文章
- 实现数字到Excel中列序号的转换
- Win10 U盘插入无法识别,显示(脱机,签名冲突),解决办法
- 华为手机系统升级后锁屏界面显示波斯历的解决方法
- 转载……经典面试题目
- Vue Element UI 组件化 之 背景图组件
- 2岁男孩抽300毫升骨髓救姐姐 父亲起名“博生”
- android 修改 dpi_红米2WC|魔趣100|安卓10.0|10.10定制|归属地|机型修改|性能调整|稳定流畅...
- CloudComparePCL RANSAC提取圆柱体
- 使用Pytorch自带模型预测图片
- abap 多线程-员工花名册