JDK源码系列:Future是如何实现的?
大家好,我们在异步编程时向线程池提交(submit)一个任务后会得到一个 Future对象,通过 future.get() 方法可以堵塞等待结果的完成,例如:
public static void main(String[] args) throws ExecutionException, InterruptedException {//准备一个futureTask//FutureTask 是一个实现了 RunnableFuture接口(Runnable+Future 接口)的类RunnableFuture<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {@Overridepublic Boolean call() throws Exception {System.out.println(Thread.currentThread().getId()+"do something");return true;}});//提交到一个线程上或者线程池上执行Executors.newSingleThreadExecutor().submit(futureTask);//获取执行结果futureTask.get();futureTask.get(1,TimeUnit.SECONDS);
}
通过对submit方法源码的查看可以知道,无论是你提交的这个任务 是 Runnable 对象 还是Callable对象 或者是 RunnableFuture对象,都会返回一个Future对象,实际上这个对象是一个RunnableFuture对象(FutureTask)。如下:
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;
}
public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {//返回的都是FutureTaskreturn new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {//返回的都是FutureTaskreturn new FutureTask<T>(callable);
}
通过上面的分析可知,返回的Future对象的本质是FutureTask类,今天就分析下 FutureTask 类的实现原理。
任务submit(futureTask)之后会被线程池中某个线程获取执行,关于线程池的执行原理与过程可以看下老吕之前对线程池的分析文章(线程池源码分析之ThreadPoolExecutor)。
今天主要看下 futureTask.get()和futureTask.get(1,TimeUnit.SECONDS)的背后发生了什么?
FutureTask类代码阅读并注释
public class FutureTask<V> implements RunnableFuture<V> {/*** 可能的状态变迁* NEW -> COMPLETING -> NORMAL* NEW -> COMPLETING -> EXCEPTIONAL* NEW -> CANCELLED* NEW -> INTERRUPTING -> INTERRUPTED*///state 代表了当前任务的状态private volatile int state;private static final int NEW = 0;//还没被调度,只有这时候可以被取消private static final int COMPLETING = 1;//已被调度,还未执行完毕private static final int NORMAL = 2;//执行完毕,并且正常private static final int EXCEPTIONAL = 3;//执行完毕,有异常private static final int CANCELLED = 4;//任务被取消private static final int INTERRUPTING = 5;//任务中断中private static final int INTERRUPTED = 6;//任务已被中断//这个就是要执行的任务private Callable<V> callable;//无论是正常结果还是异常 都会附到 outcome上private Object outcome; //用来执行任务的线程private volatile Thread runner;/** Treiber stack of waiting threads *///等待结果的线程集合(单向链表,Treiber stack 特赖贝尔 设计,//链表节点的追加是通过CAS方式进行的,可以解决 多线程并发追加的安全问题//(比如多个线程同时调用了futureTask.get() 来获取结果,都会被堵塞住,做一个排队))private volatile WaitNode waiters;//返回结果或者抛出异常@SuppressWarnings("unchecked")private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);}//构造函数public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW; }//构造函数public FutureTask(Runnable runnable, V result) {//将Runnable 做了一个适配,转换为 Callable对象this.callable = Executors.callable(runnable, result);this.state = NEW; }//任务是否被取消了public boolean isCancelled() {return state >= CANCELLED;}//任务是否已经开始执行public boolean isDone() {return state != NEW;}//取消任务的执行public boolean cancel(boolean mayInterruptIfRunning) {if (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))//任务状态为NEW并且通过CAS替换成INTERRUPTING或CANCELLED 失败//则返回失败return false;try { if (mayInterruptIfRunning) {try {Thread t = runner;if (t != null)//中断正在执行任务的线程t.interrupt();} finally { // final stateUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {//通过所有等待结果的线程finishCompletion();}return true;}//堵塞获取结果,关键方法,主要关注下 awaitDone 方法 public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)//如果未完成,则进入等待s = awaitDone(false, 0L);return report(s);}//带超时时间的堵塞获取结果,关键方法,主要关注下 awaitDone 方法 public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();return report(s);}protected void done() { }//关键方法,回写正常执行结果,并唤醒所有等待的线程protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//CAS加锁成功outcome = v;//正常结果UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // CAS改变任务最终状态:正常完成//唤醒所有等待的线程finishCompletion();}}//异常结果回写protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//CAS加锁成功outcome = t;//异常信息UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // CAS改变任务最终状态:异常//唤醒所有等待的线程finishCompletion();}}//关键方法:任务执行后,会唤醒所有等待的线程 LockSupport.unpark(thread);public void run() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))//防止多个线程执行同一个futureTask任务对象return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {//执行核心任务逻辑result = c.call();ran = true;} catch (Throwable ex) {//有异常result = null;ran = false;//回写异常并唤醒所有等待的线程setException(ex);}if (ran) //回写正常结果并唤醒所有等待的线程set(result);}} finally {runner = null;int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}//特殊用途,先不关注protected boolean runAndReset() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return false;boolean ran = false;int s = state;try {Callable<V> c = callable;if (c != null && s == NEW) {try {c.call(); // don't set resultran = true;} catch (Throwable ex) {setException(ex);}}} finally {runner = null;s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}return ran && s == NEW;}private void handlePossibleCancellationInterrupt(int s) {if (s == INTERRUPTING)while (state == INTERRUPTING)Thread.yield(); }//使用了简单链表来记录等待执行结果的线程static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }}//移除并唤醒所有等待结果的线程private void finishCompletion() {// assert state > COMPLETING;for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;//关键代码,唤醒执行通过 LockSupport.park 进入等待的线程//与awaitDone 方法呼应LockSupport.unpark(t);}WaitNode next = q.next;if (next == null)break;q.next = null; // unlink to help gcq = next;}break;}}done();callable = null; // to reduce footprint}//等待任务完成,关键方法,//没有计时的用 LockSupport.park(this);//有计时的用 LockSupport.parkNanos(this, nanos);//这里与 finishCompletion 中的 LockSupport.unpark 呼应private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;if (s > COMPLETING) {if (q != null)q.thread = null;return s;}else if (s == COMPLETING) // cannot time out yetThread.yield();else if (q == null)q = new WaitNode();else if (!queued)//排队,通过cas追加链表节点queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {//移除超时的节点removeWaiter(q);return state;}//关键代码 限时等待LockSupport.parkNanos(this, nanos);}else//关键代码 无限期等待LockSupport.park(this);}}
//移除某个等待的节点private void removeWaiter(WaitNode node) {if (node != null) {node.thread = null;retry:for (;;) { // restart on removeWaiter racefor (WaitNode pred = null, q = waiters, s; q != null; q = s) {s = q.next;if (q.thread != null)pred = q;else if (pred != null) {pred.next = s;if (pred.thread == null) // check for racecontinue retry;}else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s))continue retry;}break;}}}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long stateOffset;//记录 state 变量的内存地址偏移量(相对与futureTask对象起始地址)private static final long runnerOffset;//记录 执行任务的线程 变量的内存地址偏移量private static final long waitersOffset;//记录 等待链表的头节点 变量的内存地址偏移量static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = FutureTask.class;stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));} catch (Exception e) {throw new Error(e);}}}
总结:
通过代码分析可以了解到
1)当某个线程调用 get()或者 get(long timeout, TimeUnit unit)方法获取结果时,如果任务还没有完成,则会通过CAS方式将调用线程加入到一个单向链表中,并且通过 LockSupport.park(this);
或者 LockSupport.parkNanos(this, nanos)实现线程自身的堵塞
2)当线程池中线程调度任务后会执行FutureTask类的run()方法,在run方法中会调用 Callable接口的call方法执行真正的任务代码,执行完成后 回写结果或异常到 outcome对象中,并且唤醒所有等待的线程,使用了 LockSupport.unpark(thread); 方法,这与 get中的 LockSupport.park 相呼应
3)为了解决线程安全问题,大量使用了CAS算法,大量应用 Unsafe 类中的CAS方法
4)示意图
今天就到这里,晚安!
JDK源码系列:Future是如何实现的?相关推荐
- HashSet源码分析:JDK源码系列
1.简介 继续分析源码,上一篇文章把HashMap的分析完毕.本文开始分析HashSet简单的介绍一下. HashSet是一个无重复元素集合,内部使用HashMap实现,所以HashMap的特征耶继承 ...
- JDK源码系列(2)-Object类
引言 我们都知道,在Java中,Object是所有类的超类,所有的类其实都是隐含继承自Object类的,所以extends Object默认是不用写的,当然你写了也不会错.所有的类都可以使用Objec ...
- 大白话讲解JDK源码系列:从头到尾再讲一遍ThreadLocal
引言 其实网上有很多关于ThreadLocal的文章了,有不少文章也已经写的非常好了.但是很多同学反应还有一些部分没有讲解的十分清楚,还是有一定的疑惑没有想的十分清楚.因此本文主要结合常见的一些疑问. ...
- JDK源码系列:子线程如何继承父线程上通过ThreadLocal绑定的数据
上一篇中老吕介绍了ThreadLocal线程数据绑定的原理,今天聊聊父子线程之间如何继承ThreadLocal上维护的数据. 开发过程中异步执行任务有两种情况,第一种情况是 主线程 通过 new Th ...
- JDK源码系列:ThreadLocal弱引用真的是过度设计吗?
在<码处高效:Java开发手册>这本书上详细描述了ThreadLocal的原理,也有过度设计的说法, 难道弱引用设计真的没必要吗?对此老吕要仔细分析分析,ThreadLocal到底该不该使 ...
- JDK源码系列:synchronized与wait、notify、notifyAll
大家好,今天聊一聊synchronized与obj.wait().obj.notify().obj.notifyAll() 之间的关系以及它们的实现原理. 我们今天采用边写demo边分析的方式来进行. ...
- JDK源码系列(3)-String
在JDK中,String的使用频率和被研究的程度都非常高,所以接下来我只说一些比较重要的内容. 一.String类的概述 String类的声明如下: public final class String ...
- JDK源码系列:AQS(队列同步器)原理
大家好,好久不见,今天看下JDK中的JUC包中AQS(AbstractQueuedSynchronizer 队列同步器)的实现原理. JUCL下的锁和synchronized提供的锁的区别 1.锁的获 ...
- JDK源码系列(6)-StringBuilder
一.概述 StringBuilder是一个可变的字符串序列,这个类被设计去兼容StringBuffer类的API,但不保证线程安全性,是StringBuffer单线程情况下的一个替代实现.在可能的情况 ...
最新文章
- java hashcode返回1_java – 为什么hashCode()在所有连续执行中为对象返回相同的值?...
- 固态硬盘市场或将提前爆发
- thinkphp memcache mysql_thinkphp中memcache的用法实例
- 第三十三期:对于人工智能的恐惧及其5个解决方法
- Windows Phone 7 Developer Tools amp; Training Kit 正式版发布!
- 400集python入门到精通_2020年最强Python学习路线+教程,400集带你从入门到精通
- AC日记——Sagheer, the Hausmeister codeforces 812b
- 华为交换机命令 端口速率_华为交换机看端口速率
- PDA开发从入门到精通
- 基于STM32的物联网语音控制智能家居
- Windows之API集合
- 为什么有机棉这么贵,还深受欢迎?
- Android OpenCV应用篇三:提取图片中的文字
- 关于Docker入门的一些事(4)
- 概率论与数理统计学习笔记——第三十五讲——依概率收敛,切比雪夫不等式
- 医疗大数据模型:医疗保险欺诈发现大数据模型
- 2022第四届智慧健康城市国际研讨会成功召开
- 武警二路擒敌拳16式——第一式动作分解
- 如何设计出性能更优的MySQL数据库schema?
- 笔试题13——击鼓传花
热门文章
- Niuke 练习赛19 托米航空公司 搜索
- IDEA换主题白色,调整字体样式大小,设置背景豆沙绿
- 查询期刊的ISO版缩写的巧妙方法
- Softing dataFEED OPC Suite将西门子PLC数据存储到Oracle数据库中
- AVR ATMEGA8的初次使用
- unity vr 粒子系统_Unity粒子系统插件,酷炫特效唾手可得
- OneAPI 编译cp2k 9.1和cp2k 2023.1
- asp毕业设计——基于asp+access的在线人才招聘网设计与实现(毕业论文+程序源码)——人才招聘网
- 搜索引擎优化的未来趋势
- MinGW-w32下载、安装和配置环境教程