2019独角兽企业重金招聘Python工程师标准>>>

概念

Callable类的定义

@FunctionalInterface
public interface Callable<V> {V call() throws Exception;
}

Runnable类的定义

@FunctionalInterface
public interface Runnable {public abstract void run();
}

Future类的定义

public interface Future<V> {boolean cancel(boolean mayInterruptIfRunning);boolean isCancelled();boolean isDone();V get() throws InterruptedException, ExecutionException;V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

Callable Runnable Future 都是为异步执行设计的接口类。Callable与Runnable接口的区别是Callable有返回值,并且会抛出异常信息,Runnable没有返回值,也不允许抛出异常。Future则可以判断任务是否执行完,是否取消,以及取消当前任务和获取结果。

使用实例

Runnable实例

Runnable runnable = new Runnable() {@Overridepublic void run() {// do business jobSystem.out.println("thread run = " + Math.random());}
};new Thread(runnable).start();
//  new Thread(runnable).run();System.out.println("done");

定义了一个runnable实例放入Thread并且调用start()方法就可以启动一个线程来执行。这里注意是调用Thread.start()方法,不能调用Thread.run()方法。run()其实是串行执行的,start()才会启动线程异步执行。

start()和run()区别

/* What will be run. */
private Runnable target;public Thread(Runnable target) {init(null, target, "Thread-" + nextThreadNum(), 0);
}@Override
public void run() {if (target != null) {target.run();}
}/*** Causes this thread to begin execution; the Java Virtual Machine* calls the <code>run</code> method of this thread.*/
public synchronized void start() {if (threadStatus != 0)throw new IllegalThreadStateException();group.add(this);boolean started = false;try {start0();started = true;} finally {try {if (!started) {group.threadStartFailed(this);}} catch (Throwable ignore) {}}
}private native void start0();

阅读Thread类的源代码,构造函数将runnable对象赋值给内部的target变量。调用run()就是直接调用target对象的run()。调用start()其实是调用start0(),而start0()是一个native本地方法,由JVM调用操作系统类库来启动线程。

Callable+Future实例

Callable<Double> callable = new Callable<Double>() {@Overridepublic Double call() throws Exception {// do business jobreturn Math.random();}
};FutureTask<Double> future = new FutureTask<>(callable);new Thread(future).start();// do business jobSystem.out.println("future result = " + future.get());
System.out.println("future result = " + future.get(100, TimeUnit.MILLISECONDS));

Callable必须要结合Future来一起使用,声明一个callable实例,通过这个实例再生成一个FutureTask类型的实例放入Thread执行。当调用future.get()的时候,如果future task已经执行完毕则可以获得结果,否则堵塞当前线程直到线程执行完并且返回结果,future.get(long, TimeUnit)支持获取执行结果超时限制。

为什么一定要生成这个FutureTask实例?原因是Thread的构造方法只接受Runnable类型的变量

Thread(Runnable target) {...}
Thread(Runnable target, AccessControlContext acc) {...}
Thread(Runnable target, String name) {...}

再看一下FutureTask的定义

public class FutureTask<V> implements RunnableFuture<V> {...public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW;}...
}public interface RunnableFuture<V> extends Runnable, Future<V> {void run();
}

FutureTask继承自RunnableFuture,而RunnableFuture同时继承了Runnable和Future。FutureTask是Future的实现类又是Runnable的实现类,可以得出的结论是Future提供的方法都是基于Callable接口实现的。

Future 源码阅读

Future线程状态

Future内部定义了一组线程的运行状态

/*** The run state of this task, initially NEW.  The run state* transitions to a terminal state only in methods set,* setException, and cancel.  During completion, state may take on* transient values of COMPLETING (while outcome is being set) or* INTERRUPTING (only while interrupting the runner to satisfy a* cancel(true)). Transitions from these intermediate to final* states use cheaper ordered/lazy writes because values are unique* and cannot be further modified.** Possible state transitions:* NEW -> COMPLETING -> NORMAL* NEW -> COMPLETING -> EXCEPTIONAL* NEW -> CANCELLED* NEW -> INTERRUPTING -> INTERRUPTED*/
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

从注释里面看出来一共有4种状态的变化

  1. NEW(初始化)-> COMPLETING(运行中)-> NORMAL(完成状态)
  2. NEW(初始化)-> COMPLETING(运行中)-> EXCEPTIONAL(运行发生错误)
  3. NEW(初始化)-> CANCELLED(还未运行已经被取消)
  4. NEW(初始化)-> INTERRUPTING(运行中被取消)-> INTERRUPTED(被取消状态)

Future内部变量

/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }
}

callable即实际运行的callable对象,outcome是运行结果存储的变量,waiters是一个链表结构的东西,其实是一个对象拥有下面一个对象的指针,后面会解释(注释上说这个叫Treiber stack)。

Future.run()

线程启动之后实际调用的是run()方法

public void run() {if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {runner = null;int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}
}

run()方法实际调用的是callble.call()方法,获取到返回值之后调用set()方法,set()方法通过CAS将线程状态从NEW设置为COMPLETING,再将返回值设置到outcome变量,然后将线程状态设置为NORMAL完成的状态。最后的finishCompletion()方法下面再讲解。

Future.get()

public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);
}private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);
}

get()方法判断线程状态,如果线程状态不是小于等于COMPLETING的状态调用report(),report()方法判断线程状态为NORMAL就直接返回outcome的值,如果线程状态为CANCELLED就抛出CancellationException异常。

如果线程状态小于等于COMPLETING,调用awaitDone方法

private int awaitDone(boolean timed, long nanos) throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;if (s > COMPLETING) {if (q != null)q.thread = null;return s;}else if (s == COMPLETING) // cannot time out yetThread.yield();else if (q == null)q = new WaitNode();else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}elseLockSupport.park(this);}
}static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }
}

awaitDone方法内有一个循环,循环内一串判断条件

  1. 如果线程状态大于COMPLETING,将q(waitNode)变量的thread设置为null,然后把线程状态返回出去
  2. 如果线程状态等于COMPLETING,调用Thread.yield()让出当前线程的CPU使用时间
  3. 如果q==null,创建一个新的WaitNode节点
  4. 如果queued==false(还未被加入等待队列),使用CAS操作将上一步创建的waitNode设置为waiters链表的表头
  5. 如果有超时限制,判断是否超时,如果超时,将waiters链表的节点移除,如果未超时,调用LockSupport.parkNanos()阻塞线程
  6. 以上都不满足,调用LockSupport.park()阻塞线程

循环内的判断条件都是排他的,这个循环一般会循环三次。

  • 第一次循环执行q==null的条件,创建WaitNode节点。
  • 第二次循环执行!queued条件,将刚才创建的waitNode节点设置为waiters链表的表头。WaitNode类存了一个线程的引用以及下一个WaitNode节点的引用,这是一个单向链表的数据结构。
  • 第三次循环执行到LockSupport.park*()阻塞线程

为什么需要一个链表?

我能想到的是在多个线程一起调用get()/get(timeout)方法的时候才需要这个链表,因为get()/get(timeout)在task处于非完成状态时是调用LockSupport.park*()阻塞线程的,在多个线程进行get操作,需要一个链表来维护这些线程,一会在task执行完或者出现异常的时候,会在这个链表中找到正在被堵塞的线程调用LockSupport.unpark()来解除堵塞。因为这样的机制才需要一个链表。

再回顾刚才的FutureTask.run()方法,出现异常的时候会调用setException(ex),线程执行完之后会执行set(result)。

protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final statefinishCompletion();}
}protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}
}

出现异常的时候将线程的最终状态设置为EXCEPTIONAL,正常结束的时候将线程的最终状态设置为NORMAL,然后都会调用finishCompletion()。

private void finishCompletion() {// assert state > COMPLETING;for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);}WaitNode next = q.next;if (next == null)break;q.next = null; // unlink to help gcq = next;}break;}}done();callable = null;        // to reduce footprint
}protected void done() { }

这个方法会循环这个waiters链表,取出里面正在等待的线程逐个调用LockSupport.unpark(t)来解除堵塞。通过CAS操作将waiters设置为null。

这里还有一个done()方法,方法体是空的。可以被子类重写,做一些线程执行完成之后的操作。这个也可以会称为回调函数。

转载于:https://my.oschina.net/u/232911/blog/3020028

解析 Callable Runnable Future 使用和原理相关推荐

  1. terminated 线程_Java【多线程系列】JUC线程池—2. 原理(二)、Callable和Future

    在"Java多线程系列--"基础篇"01之 基本概念"中,我们介绍过,线程有5种状态:新建状态,就绪状态,运行状态,阻塞状态,死亡状态.线程池也有5种状态:然而 ...

  2. 【Java并发】Runnable、Callable、Future、FutureTask

    创建线程的两种方式 直接继承 Thread 实现 Runnable 接口 这两种方式都有一个缺点:在执行完成任务之后,无法直接获取到最后的执行结果.如果需要获取执行结果,就必须通过共享变量或线程通信的 ...

  3. futuretask java 并发请求_【Java并发】Runnable、Callable、Future、FutureTask

    创建线程的两种方式 直接继承 Thread 实现 Runnable 接口 这两种方式都有一个缺点:在执行完成任务之后,无法直接获取到最后的执行结果.如果需要获取执行结果,就必须通过共享变量或线程通信的 ...

  4. 一次搞懂 Runnable、Callable、Future、FutureTask,不懂不要钱!

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 一般创建线程只有两种方式,一种是继承Thread,一种是实 ...

  5. Java中的Runnable、Callable、Future、FutureTask

    Java中存在Runnable.Callable.Future.FutureTask这几个与线程相关的类或者接口,在Java中也是比较重要的几个概念,我们通过下面的简单示例来了解一下它们的作用于区别. ...

  6. Java中的Runnable、Callable、Future、FutureTask的区别与示例

    原文地址:http://blog.csdn.net/bboyfeiyu/article/details/24851847 --------------------------------------- ...

  7. Runnable、Callable、Future、RunnableFuture 和 FuturTask 到底是些啥,到底有啥关系?

    直接上来先说结论(我比较喜欢这种方式,毕竟有的文章一开始就给你讲论证,讲了半天都不知道论题的结果是什么) {@interface Runnable} :他的表示是希望被另一个线程所执行,有点抽象,说白 ...

  8. Callable和Future、FutureTask的使用

    http://www.silencedut.com/2016/06/15/Callable%E5%92%8CFuture%E3%80%81FutureTask%E7%9A%84%E4%BD%BF%E7 ...

  9. Java多线程系列--“JUC线程池”06之 Callable和Future

    转载自  Java多线程系列--"JUC线程池"06之 Callable和Future Callable 和 Future 简介 Callable 和 Future 是比较有趣的一 ...

最新文章

  1. Using platform encoding (GBK actually) to copy filtered resources, i.e. build is platform dependent!
  2. 常见的面试题(整理)
  3. linux ls的所有参数,Linux ls命令参数详解
  4. 关于手机端CSS Sprite图标定位的一些领悟
  5. 2013杭州网赛 1001 hdu 4738 Caocao's Bridges(双连通分量割边/桥)
  6. 怎么将ts文件快速合成一个文件
  7. Catfish(鲶鱼) CMS博客 php源码超级简洁!可塑性强,体积小省流,三分钟下载安装
  8. 计算机相关知识抢答题题库,计算机基础知识抢答赛题库
  9. echarts 柱状图设置边框_echarts柱状图
  10. 公司英文名称及部门大全
  11. poco 连接mysql_[Poco]数据库操作简介
  12. 导航中的常用坐标系解析
  13. centos mysql部署_CentOS下MySQL 8.0安装部署,超详细!
  14. gitweb 搭建教程
  15. 阿里大力押注的淘宝心选,还赶不上网易严选们吗?
  16. doc转docx文件会乱吗_Word中doc和docx,到底有什么区别
  17. python循环语句打印三角形_python循环输出三角形图案的例子
  18. 卫星图瓦片爬取之google卫星图偏移的问题
  19. Ubuntu中安装Python h5py
  20. php extract 字符串,php extract 函数

热门文章

  1. SD模块的几个增强(VA01-VA03,VA41-VA43)
  2. SAP SD 客户信贷管理解析
  3. AI模糊测试:下一个重大网络安全威胁
  4. 极大似然估计的理解与应用
  5. (如何从一个列表中随机抽样)np.random.choice(),random.sample()
  6. (linux) Firefox is already running, but is not responding解决方法
  7. ICLR认知科学@AI workshop一览
  8. 总经费8.4亿的上海市脑科学重大专项进展如何?且看2019年度工作汇报会
  9. 中国AI芯片产业发展白皮书:未来三年年均增长率超50%
  10. 科技/IT:2019 年 Q3 表现最佳和最差的企业