解析 Callable Runnable Future 使用和原理
2019独角兽企业重金招聘Python工程师标准>>>
概念
Callable类的定义
@FunctionalInterface
public interface Callable<V> {V call() throws Exception;
}
Runnable类的定义
@FunctionalInterface
public interface Runnable {public abstract void run();
}
Future类的定义
public interface Future<V> {boolean cancel(boolean mayInterruptIfRunning);boolean isCancelled();boolean isDone();V get() throws InterruptedException, ExecutionException;V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
Callable Runnable Future 都是为异步执行设计的接口类。Callable与Runnable接口的区别是Callable有返回值,并且会抛出异常信息,Runnable没有返回值,也不允许抛出异常。Future则可以判断任务是否执行完,是否取消,以及取消当前任务和获取结果。
使用实例
Runnable实例
Runnable runnable = new Runnable() {@Overridepublic void run() {// do business jobSystem.out.println("thread run = " + Math.random());}
};new Thread(runnable).start();
// new Thread(runnable).run();System.out.println("done");
定义了一个runnable实例放入Thread并且调用start()方法就可以启动一个线程来执行。这里注意是调用Thread.start()方法,不能调用Thread.run()方法。run()其实是串行执行的,start()才会启动线程异步执行。
start()和run()区别
/* What will be run. */
private Runnable target;public Thread(Runnable target) {init(null, target, "Thread-" + nextThreadNum(), 0);
}@Override
public void run() {if (target != null) {target.run();}
}/*** Causes this thread to begin execution; the Java Virtual Machine* calls the <code>run</code> method of this thread.*/
public synchronized void start() {if (threadStatus != 0)throw new IllegalThreadStateException();group.add(this);boolean started = false;try {start0();started = true;} finally {try {if (!started) {group.threadStartFailed(this);}} catch (Throwable ignore) {}}
}private native void start0();
阅读Thread类的源代码,构造函数将runnable对象赋值给内部的target变量。调用run()就是直接调用target对象的run()。调用start()其实是调用start0(),而start0()是一个native本地方法,由JVM调用操作系统类库来启动线程。
Callable+Future实例
Callable<Double> callable = new Callable<Double>() {@Overridepublic Double call() throws Exception {// do business jobreturn Math.random();}
};FutureTask<Double> future = new FutureTask<>(callable);new Thread(future).start();// do business jobSystem.out.println("future result = " + future.get());
System.out.println("future result = " + future.get(100, TimeUnit.MILLISECONDS));
Callable必须要结合Future来一起使用,声明一个callable实例,通过这个实例再生成一个FutureTask类型的实例放入Thread执行。当调用future.get()的时候,如果future task已经执行完毕则可以获得结果,否则堵塞当前线程直到线程执行完并且返回结果,future.get(long, TimeUnit)支持获取执行结果超时限制。
为什么一定要生成这个FutureTask实例?原因是Thread的构造方法只接受Runnable类型的变量
Thread(Runnable target) {...}
Thread(Runnable target, AccessControlContext acc) {...}
Thread(Runnable target, String name) {...}
再看一下FutureTask的定义
public class FutureTask<V> implements RunnableFuture<V> {...public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW;}...
}public interface RunnableFuture<V> extends Runnable, Future<V> {void run();
}
FutureTask继承自RunnableFuture,而RunnableFuture同时继承了Runnable和Future。FutureTask是Future的实现类又是Runnable的实现类,可以得出的结论是Future提供的方法都是基于Callable接口实现的。
Future 源码阅读
Future线程状态
Future内部定义了一组线程的运行状态
/*** The run state of this task, initially NEW. The run state* transitions to a terminal state only in methods set,* setException, and cancel. During completion, state may take on* transient values of COMPLETING (while outcome is being set) or* INTERRUPTING (only while interrupting the runner to satisfy a* cancel(true)). Transitions from these intermediate to final* states use cheaper ordered/lazy writes because values are unique* and cannot be further modified.** Possible state transitions:* NEW -> COMPLETING -> NORMAL* NEW -> COMPLETING -> EXCEPTIONAL* NEW -> CANCELLED* NEW -> INTERRUPTING -> INTERRUPTED*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
从注释里面看出来一共有4种状态的变化
- NEW(初始化)-> COMPLETING(运行中)-> NORMAL(完成状态)
- NEW(初始化)-> COMPLETING(运行中)-> EXCEPTIONAL(运行发生错误)
- NEW(初始化)-> CANCELLED(还未运行已经被取消)
- NEW(初始化)-> INTERRUPTING(运行中被取消)-> INTERRUPTED(被取消状态)
Future内部变量
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }
}
callable即实际运行的callable对象,outcome是运行结果存储的变量,waiters是一个链表结构的东西,其实是一个对象拥有下面一个对象的指针,后面会解释(注释上说这个叫Treiber stack)。
Future.run()
线程启动之后实际调用的是run()方法
public void run() {if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {runner = null;int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}
}
run()方法实际调用的是callble.call()方法,获取到返回值之后调用set()方法,set()方法通过CAS将线程状态从NEW设置为COMPLETING,再将返回值设置到outcome变量,然后将线程状态设置为NORMAL完成的状态。最后的finishCompletion()方法下面再讲解。
Future.get()
public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);
}private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);
}
get()方法判断线程状态,如果线程状态不是小于等于COMPLETING的状态调用report(),report()方法判断线程状态为NORMAL就直接返回outcome的值,如果线程状态为CANCELLED就抛出CancellationException异常。
如果线程状态小于等于COMPLETING,调用awaitDone方法
private int awaitDone(boolean timed, long nanos) throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;if (s > COMPLETING) {if (q != null)q.thread = null;return s;}else if (s == COMPLETING) // cannot time out yetThread.yield();else if (q == null)q = new WaitNode();else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}elseLockSupport.park(this);}
}static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }
}
awaitDone方法内有一个循环,循环内一串判断条件
- 如果线程状态大于COMPLETING,将q(waitNode)变量的thread设置为null,然后把线程状态返回出去
- 如果线程状态等于COMPLETING,调用Thread.yield()让出当前线程的CPU使用时间
- 如果q==null,创建一个新的WaitNode节点
- 如果queued==false(还未被加入等待队列),使用CAS操作将上一步创建的waitNode设置为waiters链表的表头
- 如果有超时限制,判断是否超时,如果超时,将waiters链表的节点移除,如果未超时,调用LockSupport.parkNanos()阻塞线程
- 以上都不满足,调用LockSupport.park()阻塞线程
循环内的判断条件都是排他的,这个循环一般会循环三次。
- 第一次循环执行q==null的条件,创建WaitNode节点。
- 第二次循环执行!queued条件,将刚才创建的waitNode节点设置为waiters链表的表头。WaitNode类存了一个线程的引用以及下一个WaitNode节点的引用,这是一个单向链表的数据结构。
- 第三次循环执行到LockSupport.park*()阻塞线程
为什么需要一个链表?
我能想到的是在多个线程一起调用get()/get(timeout)方法的时候才需要这个链表,因为get()/get(timeout)在task处于非完成状态时是调用LockSupport.park*()阻塞线程的,在多个线程进行get操作,需要一个链表来维护这些线程,一会在task执行完或者出现异常的时候,会在这个链表中找到正在被堵塞的线程调用LockSupport.unpark()来解除堵塞。因为这样的机制才需要一个链表。
再回顾刚才的FutureTask.run()方法,出现异常的时候会调用setException(ex),线程执行完之后会执行set(result)。
protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final statefinishCompletion();}
}protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}
}
出现异常的时候将线程的最终状态设置为EXCEPTIONAL,正常结束的时候将线程的最终状态设置为NORMAL,然后都会调用finishCompletion()。
private void finishCompletion() {// assert state > COMPLETING;for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);}WaitNode next = q.next;if (next == null)break;q.next = null; // unlink to help gcq = next;}break;}}done();callable = null; // to reduce footprint
}protected void done() { }
这个方法会循环这个waiters链表,取出里面正在等待的线程逐个调用LockSupport.unpark(t)来解除堵塞。通过CAS操作将waiters设置为null。
这里还有一个done()方法,方法体是空的。可以被子类重写,做一些线程执行完成之后的操作。这个也可以会称为回调函数。
转载于:https://my.oschina.net/u/232911/blog/3020028
解析 Callable Runnable Future 使用和原理相关推荐
- terminated 线程_Java【多线程系列】JUC线程池—2. 原理(二)、Callable和Future
在"Java多线程系列--"基础篇"01之 基本概念"中,我们介绍过,线程有5种状态:新建状态,就绪状态,运行状态,阻塞状态,死亡状态.线程池也有5种状态:然而 ...
- 【Java并发】Runnable、Callable、Future、FutureTask
创建线程的两种方式 直接继承 Thread 实现 Runnable 接口 这两种方式都有一个缺点:在执行完成任务之后,无法直接获取到最后的执行结果.如果需要获取执行结果,就必须通过共享变量或线程通信的 ...
- futuretask java 并发请求_【Java并发】Runnable、Callable、Future、FutureTask
创建线程的两种方式 直接继承 Thread 实现 Runnable 接口 这两种方式都有一个缺点:在执行完成任务之后,无法直接获取到最后的执行结果.如果需要获取执行结果,就必须通过共享变量或线程通信的 ...
- 一次搞懂 Runnable、Callable、Future、FutureTask,不懂不要钱!
点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 一般创建线程只有两种方式,一种是继承Thread,一种是实 ...
- Java中的Runnable、Callable、Future、FutureTask
Java中存在Runnable.Callable.Future.FutureTask这几个与线程相关的类或者接口,在Java中也是比较重要的几个概念,我们通过下面的简单示例来了解一下它们的作用于区别. ...
- Java中的Runnable、Callable、Future、FutureTask的区别与示例
原文地址:http://blog.csdn.net/bboyfeiyu/article/details/24851847 --------------------------------------- ...
- Runnable、Callable、Future、RunnableFuture 和 FuturTask 到底是些啥,到底有啥关系?
直接上来先说结论(我比较喜欢这种方式,毕竟有的文章一开始就给你讲论证,讲了半天都不知道论题的结果是什么) {@interface Runnable} :他的表示是希望被另一个线程所执行,有点抽象,说白 ...
- Callable和Future、FutureTask的使用
http://www.silencedut.com/2016/06/15/Callable%E5%92%8CFuture%E3%80%81FutureTask%E7%9A%84%E4%BD%BF%E7 ...
- Java多线程系列--“JUC线程池”06之 Callable和Future
转载自 Java多线程系列--"JUC线程池"06之 Callable和Future Callable 和 Future 简介 Callable 和 Future 是比较有趣的一 ...
最新文章
- Using platform encoding (GBK actually) to copy filtered resources, i.e. build is platform dependent!
- 常见的面试题(整理)
- linux ls的所有参数,Linux ls命令参数详解
- 关于手机端CSS Sprite图标定位的一些领悟
- 2013杭州网赛 1001 hdu 4738 Caocao's Bridges(双连通分量割边/桥)
- 怎么将ts文件快速合成一个文件
- Catfish(鲶鱼) CMS博客 php源码超级简洁!可塑性强,体积小省流,三分钟下载安装
- 计算机相关知识抢答题题库,计算机基础知识抢答赛题库
- echarts 柱状图设置边框_echarts柱状图
- 公司英文名称及部门大全
- poco 连接mysql_[Poco]数据库操作简介
- 导航中的常用坐标系解析
- centos mysql部署_CentOS下MySQL 8.0安装部署,超详细!
- gitweb 搭建教程
- 阿里大力押注的淘宝心选,还赶不上网易严选们吗?
- doc转docx文件会乱吗_Word中doc和docx,到底有什么区别
- python循环语句打印三角形_python循环输出三角形图案的例子
- 卫星图瓦片爬取之google卫星图偏移的问题
- Ubuntu中安装Python h5py
- php extract 字符串,php extract 函数
热门文章
- SD模块的几个增强(VA01-VA03,VA41-VA43)
- SAP SD 客户信贷管理解析
- AI模糊测试:下一个重大网络安全威胁
- 极大似然估计的理解与应用
- (如何从一个列表中随机抽样)np.random.choice(),random.sample()
- (linux) Firefox is already running, but is not responding解决方法
- ICLR认知科学@AI workshop一览
- 总经费8.4亿的上海市脑科学重大专项进展如何?且看2019年度工作汇报会
- 中国AI芯片产业发展白皮书:未来三年年均增长率超50%
- 科技/IT:2019 年 Q3 表现最佳和最差的企业