FutureTask的简单示例:

FutureTask的应用场景,如果在当前线程中需要执行比较耗时的操作,但又不想阻塞当前线程时,可以把这些作业交给FutureTask,另开一个线程在后台完成,当当前线程将来需要时,就可以通过FutureTask对象获得后台作业的计算结果或者执行状态。

public static void main(String[] args) throws InterruptedException{

FutureTask ft = new FutureTask<>(()->{//Callable接口的实现类

int num = new Random().nextInt(10);

TimeUnit.SECONDS.sleep(num);

return num;

});

Thread t = new Thread(ft);

t.start();

//模拟主线程做一些其他操作,跟futureTask任务并行

//等需要futureTask的运行结果时,可以调用get方法获取。

TimeUnit.SECONDS.sleep(2);

try {

//等待任务执行完成,获取返回值

Integer num = ft.get();

System.out.println(num);

} catch (Exception e) {

e.printStackTrace();

}

}

Callable接口

Java现在的多线程机制,核心方法run是没有返回值的;如果要保存run方法里面的计算结果,必须等待run方法计算完,无论计算过程多么耗时。而Callable接口可以看作是Runnable接口的补充,call方法带有返回值,并且可以抛出异常。

public interface Callable {

V call() throws Exception;

}

Future接口

当我们启动一个线程(线程t1)去完成一个计算耗时的操作(调用耗时方法method())时,如果另外一个线程(线程t2)一直等待线程t1的计算结果这显然是不明智的。但是,我们可以在调用method()的时候立即返回一个Future,而我们可以通过Feature获取method()的各种执行信息(计算是否取消,是否计算完成,获取执行结果)

public interface Future {

//还没计算完,可以取消计算过程

boolean cancel(boolean mayInterruptIfRunning);

//判断计算是否被取消

boolean isCancelled();

//判断是否计算完

boolean isDone();

//阻塞式获取任务执行结果

V get() throws InterruptedException, ExecutionException;

//在指定的时间内获取计算结果

V get(long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException;

}

RunnableFuture接口

RunnableFuture实现了Runnable和Future。因此FutureTask可以传递到线程对象Thread或Excutor(线程池)来执行。

public interface RunnableFuture extends Runnable, Future {

void run();

}

FutureTask实现类

FutureTask实现了RunnableFuture接口。

public class FutureTask implements RunnableFuture

FutureTask的源码分析 任务的执行状态, 当我们把FutureTask看作一个Future,那么它的作用就是控制Callable的call方法的执行过程,在执行的过程中自然会有状态的转换:

/**

* 通常一个FutureTask新建出来,state就是NEW状态;

* COMPETING和INTERRUPTING用的进行时,表示瞬时状态,存在时间极短;

* NORMAL代表顺利完成;

* EXCEPTIONAL代表执行过程出现异常;

* CANCELED代表执行过程被取消;

* INTERRUPTED被中断

*

* 可能的状态转移:

* (执行过程顺利完成)NEW -> COMPLETING -> NORMAL

* (执行过程出现异常)NEW -> COMPLETING -> EXCEPTIONAL

* (执行过程被取消)NEW -> CANCELLED

* (执行过程中,线程中断)NEW -> INTERRUPTING -> INTERRUPTED

*/

private volatile int state;

private static final int NEW = 0;

private static final int COMPLETING = 1;

private static final int NORMAL = 2;

private static final int EXCEPTIONAL = 3;

private static final int CANCELLED = 4;

private static final int INTERRUPTING = 5;

private static final int INTERRUPTED = 6;

//将要执行的任务

private Callable callable;

//用于get()返回的结果,也可能是用于get()方法抛出的异常

private Object outcome; // non-volatile, protected by state reads/writes

//执行callable的线程,调用FutureTask.run()方法通过CAS设置

private volatile Thread runner;

//栈结构的等待队列,该节点是栈中的最顶层节点。

private volatile WaitNode waiters;

FutureTask的run()方法

public void run() {

//保证callable任务只被运行一次,如果state状态不为New或者设置运行线程runner失败则直接返回false,说明线程已经启动过。

if (state != NEW ||

!UNSAFE.compareAndSwapObject(this, runnerOffset,

null, Thread.currentThread()))

return;

try {

Callable c = callable;//callable从构造方法中传入

if (c != null && state == NEW) {

V result;

boolean ran;

try {

result = c.call();//执行任务,并返回result

ran = true;

} catch (Throwable ex) {

result = null;

ran = false;

setException(ex);//保存call方法抛出的异常,将state状态设置成NORMAL

}

if (ran)

set(result);//保存call方法的执行结果,将state状态设置成EXCEPTIONAL

}

} finally {

runner = null;

int s = state;

//判断该任务是否正在响应中断,如果中断没有完成,则等待中断操作完成

if (s >= INTERRUPTING)

handlePossibleCancellationInterrupt(s);

}

}

FutureTask的set方法

protected void set(V v) {

//通过CAS把state的NEW状态修改成COMPLETING状态

if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

//修改成功则把v值赋给outcome变量。

outcome = v;

//然后再把state状态修改成NORMAL,表示现在可以获取返回值。

UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state

finishCompletion();//唤醒等待队列中的所有节点。

}

}

FutureTask的setException 方法

protected void setException(Throwable t) {

//通过CAS把state的NEW状态修改成COMPLETING状态。

if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

outcome = t;//将异常值赋给outcome

//把state状态修改成EXCEPTIONAL,表示待返回的异常信息设置成功。

UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state

finishCompletion();//醒等待队列中的所有节点。

}

}

FutureTask的handlePossibleCancellationInterrupt方法

private void handlePossibleCancellationInterrupt(int s) {

if (s == INTERRUPTING)

while (state == INTERRUPTING)

Thread.yield(); // 如果正在响应中断,则调用Thread.yield()等待响应中断结束(INTERRUPTED)。

}

FutureTask的finishCompletion方法

private void finishCompletion() {

for (WaitNode q; (q = waiters) != null;) {

//通过CAS把栈顶的元素置为null,相当于弹出栈顶元素

if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {

for (;;) {

Thread t = q.thread;

if (t != null) {

q.thread = null;

LockSupport.unpark(t);//唤醒每一个节点,通知每个线程,该任务执行完成(可能是执行完成,也可能cancel,异常等)

}

WaitNode next = q.next;

if (next == null)

break;

q.next = null; // unlink to help gc

q = next;

}

break;

}

}

done();

callable = null; // to reduce footprint

}

FutureTask的runAndReset 方法 (可被子类重写,外部无法直接调用)

protected boolean runAndReset() {

if (state != NEW ||

!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))

return false;

boolean ran = false;

int s = state;

try {

Callable c = callable;

if (c != null && s == NEW) {

try {

c.call(); // don't set result

ran = true;

} catch (Throwable ex) {

setException(ex);

}

}

} finally {

runner = null;

s = state;

if (s >= INTERRUPTING)

handlePossibleCancellationInterrupt(s);

}

return ran && s == NEW;

}

该方法和run方法的区别是,run方法只能被运行一次任务,而该方法可以多次运行任务。而runAndReset这个方法不会设置任务的执行结果值,如果该任务成功执行完成后,不修改state的状态,还是可运行(NEW)状态,如果取消任务或出现异常,则不会再次执行。

FutureTask的get方法

public V get() throws InterruptedException, ExecutionException {

int s = state;

if (s <= COMPLETING)//如果state状态小于等于COMPLETING,说明任务还没开始执行或还未执行完成

s = awaitDone(false, 0L);//调用awaitDone方法阻塞该调用线程(将当前线程挂起等待)

return report(s);//如果state的状态大于COMPLETING,则说明任务执行完成,或发生异常、中断、取消状态。直接通过report方法返回执行结果。

}

FutureTask的get(long timeout, TimeUnit unit) 方法

public V get(long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException {

if (unit == null)

throw new NullPointerException();

int s = state;

if (s <= COMPLETING &&

(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)

throw new TimeoutException();

return report(s);

}

同get方法,该get方法支持阻塞等待多长时间,如果超时直接抛出TimeoutException异常。

FutureTask的report方法

private V report(int s) throws ExecutionException {

Object x = outcome;

//如果state的状态为NORMAL,说明任务正确执行完成,直接返回计算后的值。

if (s == NORMAL)

return (V)x;

if (s >= CANCELLED)//state的状态大于等于CANCELLED,说明任务被成功取消执行、或响应中断,直接返回CancellationException异常

throw new CancellationException();

throw new ExecutionException((Throwable)x);//否则返回ExecutionException异常。

}

FutureTask的awaitDone 方法

构建栈链表的节点元素,并将该节点入栈,同时阻塞当前线程等待运行主任务的线程唤醒该节点。

JDK1.7版本是使用AQS的双向链表队列实现的。

private int awaitDone(boolean timed, long nanos)

throws InterruptedException {

final long deadline = timed ? System.nanoTime() + nanos : 0L;

WaitNode q = null;

boolean queued = false;

for (;;) {

//如果该线程执行interrupt()方法,则从队列中移除该节点,并抛出异常

if (Thread.interrupted()) {

removeWaiter(q);

throw new InterruptedException();

}

int s = state;

//如果state状态大于COMPLETING 则说明任务执行完成,或取消

if (s > COMPLETING) {

if (q != null)

q.thread = null;

return s;

}

//如果state=COMPLETING,则使用yield,因为此状态的时间特别短,通过yield比挂起响应更快。

else if (s == COMPLETING) // cannot time out yet

Thread.yield();

//构建节点

else if (q == null)

q = new WaitNode();

//把当前节点入栈

else if (!queued)

queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);

//如果需要阻塞指定时间,则使用LockSupport.parkNanos阻塞指定时间

//如果到指定时间还没执行完,则从队列中移除该节点,并返回当前状态

else if (timed) {

nanos = deadline - System.nanoTime();

if (nanos <= 0L) {

removeWaiter(q);

return state;

}

LockSupport.parkNanos(this, nanos);

}

//阻塞当前线程

else

LockSupport.park(this);

}

}

FutureTask的removeWaiter 方法

移除栈中的节点元素,需要使用CAS自旋来保障移除成功。

private void removeWaiter(WaitNode node) {

if (node != null) {

node.thread = null;

retry:

for (;;) { // restart on removeWaiter race

for (WaitNode pred = null, q = waiters, s; q != null; q = s) {

s = q.next;

if (q.thread != null)

pred = q;

else if (pred != null) {

pred.next = s;

if (pred.thread == null) // check for race

continue retry;

}

else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))

continue retry;

}

break;

}

}

}

FutureTask的cancel方法

public boolean cancel(boolean mayInterruptIfRunning) {

if (!(state == NEW &&

UNSAFE.compareAndSwapInt(this, stateOffset, NEW,

mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))

return false;

try { // in case call to interrupt throws exception

if (mayInterruptIfRunning) {

//调用runner.interupt(),设置状态为INTERRUPTED.唤醒所有在get()方法等待的线程

try {

Thread t = runner;

if (t != null)

t.interrupt();

} finally { // final state

UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);

}

}

} finally {

finishCompletion();

}

return true;

}

总结: 任务开始运行后,不能在次运行,保证只运行一次(runAndReset 方法除外) 任务还未开始,或者任务已被运行,但未结束,这两种情况下都可以取消; 如果任务已经结束,则不可以被取消 。

参考地址:

java futuretask 源码_java并发编程——FutureTask源码分析相关推荐

  1. java 并发框架源码_Java并发编程高阶技术-高性能并发框架源码解析与实战

    Java并发编程高阶技术-高性能并发框架源码解析与实战 1 _0 Z' @+ l: s3 f6 r% t|____资料3 Z9 P- I2 x8 T6 ^ |____coding-275-master ...

  2. java 共享锁 独占锁_Java并发编程锁之独占公平锁与非公平锁比较

    Java并发编程锁之独占公平锁与非公平锁比较 公平锁和非公平锁理解: 在上一篇文章中,我们知道了非公平锁.其实Java中还存在着公平锁呢.公平二字怎么理解呢?和我们现实理解是一样的.大家取排队本着先来 ...

  3. java中解决脏读_java并发编程学习之脏读代码示例及处理

    使用interrupt()中断线程     当一个线程运行时,另一个线程可以调用对应的Thread对象的interrupt()方法来中断它,该方法只是在目标线程中设置一个标志,表示它已经被中断,并立即 ...

  4. java 关闭守护线程_Java并发编程之线程生命周期、守护线程、优先级、关闭和join、sleep、yield、interrupt...

    Java并发编程中,其中一个难点是对线程生命周期的理解,和多种线程控制方法.线程沟通方法的灵活运用.这些方法和概念之间彼此联系紧密,共同构成了Java并发编程基石之一. Java线程的生命周期 Jav ...

  5. java线程本地变量_Java并发编程示例(九):本地线程变量的使用

    这篇文章主要介绍了Java并发编程示例(九):本地线程变量的使用,有时,我们更希望能在线程内单独使用,而不和其他使用同一对象启动的线程共享,Java并发接口提供了一种很清晰的机制来满足此需求,该机制称 ...

  6. java程序使用异步总线_JAVA并发编程基础

    CPU核心 核心(Die)又称为内核,是CPU最重要的组成部分.CPU中心那块隆起的芯片就是核心,是由单晶硅以一定的生产工艺制造出来的,CPU所有的计算.接受/存储命令.处理数据都由核心执行.各种CP ...

  7. java aqs源码_Java并发系列-AQS源码学习

    AQS框架学习 Node节点 状态表示 cancelled:表明当前线程已经放弃锁 signal:表明当前线程正在运行,它后面的线程等着被它唤醒 condition:表明当前线程正在有条件的等待 pr ...

  8. java queue 线程安全_java并发编程之线程安全方法

    线程安全的实现方法,包含如下方式 一, 互斥同步 使用互斥锁的方式. 举个栗子 synchronized,最常用的同步实现方案, ReentrantLock,java并发包中工具,后续介绍. 互斥同步 ...

  9. java 等待几秒_Java并发编程synchronized相关面试题总结

    说说自己对于synchronized关键字的了解 synchronized关键字用于解决多个线程之间访问资源的同步性,synchronized关键字可以保证被它修饰的方法或者代码块在任意时刻只能有一个 ...

最新文章

  1. 比较2个DataTable中的内容是否相同的方法
  2. Java虚拟机常量池和本地变量表、自己定义的数值自动装入常量池
  3. Android:Margin和Padding
  4. pixhawk PX4FMU和PX4IO最底层启动过程分析
  5. Java中的命令设计模式
  6. 隐藏鼠标指针_Mac鼠标光标消失怎么办?苹果电脑鼠标指针不显示的解决方法
  7. 软件测试也需要推广?!
  8. How to create a hyperlink in SQL Server Reporting Services
  9. TaskTracker执行map或reduce任务的过程(二)
  10. 蓝桥杯 C语言 试题 历届试题 高僧斗法
  11. 数码相片冲印尺寸对照表
  12. 配置交叉编译工具链和环境变量
  13. 【进阶版递归】获取指定目录下的所有后缀为.java的文件
  14. android数据线接口定义,数据线接口种类(手机数据线原来还有这几种!)
  15. linux 删除开机密码,6种清除开机密码方法,总有一个适合你
  16. 谷歌命名工具_Google地图正在重命名整个社区
  17. android logo制作教程视频,Android的APP怎样制作LOGO的尺寸
  18. Win 10 下无法安装.net framework 3.5,错误代码0x800F081F的解决方案
  19. 作为程序员的硬实力是什么 ?
  20. win10开机显示无网络连接服务器,Win10开机无法连接网络

热门文章

  1. WebGoat攻略 for Mac(3)
  2. vue element ui 时间选择器 设置两个时间一前一后
  3. 基于SpringCloud微服务架构的直播平台的设计与实现(前端小程序+后端Java IDEA) 文档+项目源码
  4. Git —— 版本控制
  5. 计算机基本配置要求,Windows10系统对电脑的标准配置要求
  6. 问剑java_中秋国庆佳节 十大Java手机网游推荐
  7. 诺基亚进入NM时代,手机系统三足鼎立!
  8. oracle使用cmd命令导入数据库、删除用户、删除表空间
  9. HTML基础(一)--HTML5新特性和语义化
  10. 会员管理小程序实战开发11-获取会员手机号