FutureTask源码学习
基本属性
/* Possible state transitions:* NEW -> COMPLETING -> NORMAL* NEW -> COMPLETING -> EXCEPTIONAL* NEW -> CANCELLED* NEW -> INTERRUPTING -> INTERRUPTED*/private volatile int state; //线程状态private static final int NEW = 0; // 进来未执行的状态private static final int COMPLETING = 1; // 表示正在结束,临时状态,用于结束前cas操作;异常和有结果前都会有private static final int NORMAL = 2; // 正常结束private static final int EXCEPTIONAL = 3; // 异常结束private static final int CANCELLED = 4; // 任务被取消private static final int INTERRUPTING = 5; // 中断中 也是临时状态private static final int INTERRUPTED = 6; // 已中断// 运行任务private Callable<V> callable; // 整个任务周期中 存放值的对象,会有异常或者正常的值private Object outcome; // 正在执行任务的线程 private volatile Thread runner;// get任务阻塞的线程 会存在这个队列里面private volatile WaitNode waiters;
构造方法
public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();//callable就是程序员自己实现的业务类this.callable = callable;//设置当前任务状态为 NEWthis.state = NEW; // ensure visibility of callable}public FutureTask(Runnable runnable, V result) {// 利用适配器模式 把runnable 换成Callablethis.callable = Executors.callable(runnable, result);this.state = NEW; // ensure visibility of callable}
状态相关方法
// 是否中断
public boolean isCancelled() {return state >= CANCELLED;
}
// 只要是不等于new 都属于线程已经开始执行的标记
public boolean isDone() {return state != NEW;
}
取消任务
public boolean cancel(boolean mayInterruptIfRunning) {// 如果状态等于new 并且 修改状态成功 说明可以取消 否则返回false 取消是失败// mayInterruptIfRunning 是否会发生中断情况 说白了就是以中断抛异常结束,if (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try { // in case call to interrupt throws exceptionif (mayInterruptIfRunning) {try { Thread t = runner;if (t != null)// 加中断标记,具体看线程是否响应中断,捕获异常 自己处理t.interrupt();} finally { // final state//设置任务状态为 中断完成。UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {// 使阻塞队列的线程全部唤醒退出finishCompletion();}return true;}
执行任务方法
public void run() {// 判断线程状态 是否已经执行过或 cas失败 说明runnerOffset 已经被别的线程修改了// runnerOffset 时futuretask的局部变量if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;// 判断任务是否存在 状态是否是新状态 if (c != null && state == NEW) {V result;boolean ran;try {// 执行任务result = c.call();ran = true;} catch (Throwable ex) {// 抛异常result = null;ran = false;// 设置异常值setException(ex);}if (ran)// 设置返回值set(result);}} finally {//设置当前线程引用为空runner = null;int s = state;// 如果正在中断中if (s >= INTERRUPTING)// 直到中断结束 或者改变成其他状态handlePossibleCancellationInterrupt(s);}}
设置返回值
protected void setException(Throwable t) {//使用CAS方式设置当前任务状态为 完成中..//失败的话 外部线程等不及了,直接在set执行CAS之前 将 task取消了。 很小概率事件。if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//引用的是 callable 向上层抛出来的异常。outcome = t;//将当前任务的状态 修改为 EXCEPTIONALUNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state// 唤醒获取数据的节点finishCompletion();}}
protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;//将结果赋值给 outcome之后,马上会将当前任务状态修改为 NORMAL 正常结束状态。UNSAFE.putOrderedInt(this, stateOffset, NORMAL); finishCompletion();}}
唤醒所有节点 注意 done();方法
private void finishCompletion() {//q指向waiters 链表的头结点。for (WaitNode q; (q = waiters) != null;) {//使用cas设置 waiters 为 null 为了保证只有一个线程在循环这个唤醒操作 因为外部方法cancel也会触发唤醒节点操作if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//自循环 唤醒所有获取数据的线程for (;;) {//获取当前node节点封装的 threadThread t = q.thread;//条件成立:说明当前线程不为nullif (t != null) {q.thread = null;//help GC//唤醒当前节点对应的线程LockSupport.unpark(t);}//next 当前节点的下一个节点 继续唤醒WaitNode next = q.next;// 当下一个几点为空时 说明是最后节点咯if (next == null)break;q.next = null; // unlink to help gcq = next;}break;}}// 这个方法 就是我们执行完task任务的回调方法 一般我们都会重写逻辑在里面 done();//将callable 设置为null helpGCcallable = null; }
get 方法阻塞队列
// 可以带有超时时间
public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state// 只有小于COMPLETING 正在结束 状态的线程才会进行阻塞 直到唤醒线程 如果唤醒后状态还是没变 就超时if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();// 上面执行完 就能返回值return report(s);}
report 返回值
private V report(int s) throws ExecutionException {//正常情况下,outcome 保存的是callable运行结束的结果//非正常,保存的是 callable 抛出的异常。Object x = outcome;//条件成立:当前任务状态正常结束if (s == NORMAL)//直接返回callable运算结果return (V)x;//被取消状态if (s >= CANCELLED)throw new CancellationException();//执行到这,说明callable接口实现中,是有bug的...throw new ExecutionException((Throwable)x);}
awaitDone 阻塞分析
private int awaitDone(boolean timed, long nanos)throws InterruptedException {// 超时时间final long deadline = timed ? System.nanoTime() + nanos : 0L;//引用当前线程 封装成 WaitNode 对象WaitNode q = null;//表示当前线程 waitNode对象 有没有 入队/压栈boolean queued = false;//自旋for (;;) {//条件成立:说明当前线程唤醒 是被其它线程使用中断这种方式喊醒的。interrupted()//返回true 后会将 Thread的中断标记重置回false.if (Thread.interrupted()) {//当前线程node出队removeWaiter(q);//抛出中断异常throw new InterruptedException();}//假设当前线程是被其它线程 使用unpark(thread) 唤醒的话。会正常自旋,走下面逻辑。int s = state;//条件成立:说明当前任务 已经有结果了.. 可能有结果 可能有异常if (s > COMPLETING) {// 这里正常情况下都是外面自循环已经创建好了节点才会出现//条件成立:说明已经为当前线程创建过node了,此时需要将 node.thread = null helpGCif (q != null)q.thread = null;//直接返回当前状态.return s;}//条件成立:说明当前任务接近完成状态...这里让当前线程再释放cpu ,进行下一次抢占cpu。else if (s == COMPLETING) // cannot time out yetThread.yield();//条件成立:第一次自旋,当前线程还未创建 WaitNode 对象,此时为当前线程创建 WaitNode对象else if (q == null)q = new WaitNode();//条件成立:第二次自旋,当前线程已经创建 WaitNode对象了,但是node对象还未入队else if (!queued){//当前线程node节点 next 指向 原 队列的头节点 waiters 一直指向队列的头!q.next = waiters;//cas方式设置waiters引用指向 当前线程node, 成功的话 queued == true 否则,可能其它线程先你一步入队了。queued = UNSAFE.compareAndSwapObject(this, waitersOffset, waiters, q);}//第三次自旋,会到这里。else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}// 阻塞 等待唤醒或中断LockSupport.parkNanos(this, nanos);}else//当前get操作的线程就会被park了。 线程状态会变为 WAITING状态,相当于休眠了..//除非有其它线程将你唤醒 或者 将当前线程 中断。LockSupport.park(this);}}
FutureTask源码学习相关推荐
- FutureTask源码解析二
本篇主要介绍FutureTask源码 我们知道FutureTask实现了RunnableFuture接口,即Runnable接口和Future接口,Runable可以对应FutureTask的task ...
- JDK源码分析 FutureTask源码分析
文章目录 前言 一.Callable接口 二.Future接口 三.FutureTask源码分析 3.1 Future继承结构图 3.2 参数介绍 3.3 构造函数 3.4. FutureTask的A ...
- Shiro源码学习之二
接上一篇 Shiro源码学习之一 3.subject.login 进入login public void login(AuthenticationToken token) throws Authent ...
- Shiro源码学习之一
一.最基本的使用 1.Maven依赖 <dependency><groupId>org.apache.shiro</groupId><artifactId&g ...
- mutations vuex 调用_Vuex源码学习(六)action和mutation如何被调用的(前置准备篇)...
前言 Vuex源码系列不知不觉已经到了第六篇.前置的五篇分别如下: 长篇连载:Vuex源码学习(一)功能梳理 长篇连载:Vuex源码学习(二)脉络梳理 作为一个Web前端,你知道Vuex的instal ...
- vue实例没有挂载到html上,vue 源码学习 - 实例挂载
前言 在学习vue源码之前需要先了解源码目录设计(了解各个模块的功能)丶Flow语法. src ├── compiler # 把模板解析成 ast 语法树,ast 语法树优化,代码生成等功能. ├── ...
- 2021-03-19Tomcat源码学习--WebAppClassLoader类加载机制
Tomcat源码学习--WebAppClassLoader类加载机制 在WebappClassLoaderBase中重写了ClassLoader的loadClass方法,在这个实现方法中我们可以一窥t ...
- jQuery源码学习之Callbacks
jQuery源码学习之Callbacks jQuery的ajax.deferred通过回调实现异步,其实现核心是Callbacks. 使用方法 使用首先要先新建一个实例对象.创建时可以传入参数flag ...
- JDK源码学习笔记——Integer
一.类定义 public final class Integer extends Number implements Comparable<Integer> 二.属性 private fi ...
最新文章
- lua 多条件_【LUA】只需花费你半天时间
- 共享打印机,解决驱动检测失败无法连接共享打印机问题
- 修改CMD设置使其支持鼠标选择复制
- pytorch 1.7.x训练保存的模型在1.4低版本无法加载
- matlab中的end
- 优酷 米兔机器人_米兔机器人如何发豆芽?
- Go 语言为Fibonacci函数实现Read方法
- Mysql优化(出自官方文档) - 第二篇
- BestCoder Round #66 (div.2)B GTW likes gt
- 如何把Mysql卸载干净?(亲测有效)
- 树莓派(Raspberry Pi) 命令行下如何配置wifi(wlan)
- excel如何根据身份证批量提取员工年龄?
- 精选1000个机械原理动图
- 商务个人邮箱,vip邮箱哪个最好用?外贸邮箱哪个是安全邮箱?
- C# 学习笔记04-15
- 一篇关于数学建模美赛论文撰写的心得
- VPX高速信号处理板设计资料第240篇:4C6678_K7_DDR3_VPX高速信号处理板
- 稠密的无人机激光雷达点云数据处理与分析方法与工具科普系列(一)
- WAF是干什么的 有哪些功能
- Redis 根据IPv6地址查询全球国家、省、市位置信息方案