futureTask的超时原理解析
序
本文主要解析一下futureTask的超时原理。
实例
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<?> future = executor.submit(new Callable<Void>() {public Void call() throws Exception {//do somethingreturn null;}});
future.get(500, TimeUnit.MILLISECONDS);复制代码
里头构造的是java/util/concurrent/ThreadPoolExecutor.java
submit
java/util/concurrent/AbstractExecutorService.java
public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);
}复制代码
execute
java/util/concurrent/ThreadPoolExecutor.java
public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}复制代码
这里只是放入workQueue,然后判断是否需要添加线程
runWorker
java/util/concurrent/ThreadPoolExecutor.java
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}复制代码
这里循环从workQueue取出task,然后调用task.run()
futureTask.run
java/util/concurrent/FutureTask.java
public void run() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}复制代码
这里如果执行完成的话,会调用set(result),而异常的话,会调用setException(ex)
protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final statefinishCompletion();}
}
protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}
}复制代码
都把状态从NEW设置为COMPLETING
future.get(long)
public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();return report(s);}复制代码
这里看awaitDone,等待指定的时候,发现状态不是COMPLETING,则抛出TimeoutException,让调用线程返回。
private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;if (s > COMPLETING) {if (q != null)q.thread = null;return s;}else if (s == COMPLETING) // cannot time out yetThread.yield();else if (q == null)q = new WaitNode();else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}elseLockSupport.park(this);}}复制代码
这里等待超时,然后返回原始状态
小结
由此可见,超时的机制其实不能中断callable里头实际执行的动作,超时只是让调用线程能够在指定时间返回而已,而底层调用的方法,实际还在执行。这里是需要额外注意的。
futureTask的超时原理解析相关推荐
- easy excel date 类型解析报错_ptarchiver原理解析
pt-archiver原理解析 作为MySQL DBA,可以说应该没有不知道pt-archiver了,作为pt-toolkit套件中的重要成员,往往能够轻松帮助DBA解决数据归档的问题.例如线上一个流 ...
- Nacos-注册中心原理解析
Nacos-注册中心原理解析 一.注册中心 二.Nacos注册中心原理解析 2.1 NamingService 2.2 NacosNamingService 2.2.1 NamingProxy 2.2 ...
- 分布式缓存系统Redis原理解析
Redis作为内存数据库已经广泛应用于大数据领域,已经成为分布式架构下的基础组件.本文主要介绍了Redis内部的实现原理包括IO模型.内存管理.数据持久化等以及三种集群架构,旨在了解其中的实现机制. ...
- Elasticsearch大数据量写入调优和原理解析
前言 千万.亿级别数据批量写入ES的调优和原理解析 Elasticsearch version (bin/elasticsearch --version): 7.8 Plugins installed ...
- 秋色园QBlog技术原理解析:性能优化篇:access的并发极限及超级分库分散并发方案(十六)...
上节回顾: 上节 秋色园QBlog技术原理解析:性能优化篇:数据库文章表分表及分库减压方案(十五) 中, 介绍了 秋色园QBlog 在性能优化方面,从技术的优化手段,开始步入数据库设计优化,并从数据的 ...
- 微服务精通之Hystrix原理解析
前言 经过微服务精通之Ribbon原理解析的学习,我们了解到了服务消费者获取服务提供者实例的过程,在这之后,服务消费者会调用服务提供者的接口.但是在调用接口的过程中,我们经常会遇见服务之间的延迟和通信 ...
- Golang-Context扫盲与原理解析
Golang-Context扫盲与原理解析 一.什么是Context? context是一个包,是Go1.7引入的标注库,中文译做上下文,准确的说是goroutine的上下文,包含goroutine的 ...
- OkHttp原理解析(二)
前言 上一篇我们学习了OKHttp的请求执行流程,知道了最终请求流程都会交给getResponseWithInterceptorChain方法来执行,接下来我们就详细分析执行getResponseWi ...
- AsyncTask机制原理解析
AsyncTask机制原理解析 Android为我们提供了2种方便的异步处理方案,Handler和AsyncTask,两种方式适合的场景网上一搜就知道了,但是为什么呢?这篇分析将为你揭晓答案.前面分析 ...
最新文章
- Caddy-基于go的微型serve用来做反向代理和Gateway
- sqlserver 字符转数值_PLC根据寄存器数值查询MySQL/SQLServer数据库,将数据到寄存器...
- hibernate联合主键
- 监控系统选型,这篇不可不读
- ip地址怎么设置才有效_如果想减肥,怎么拆解目标才是有效的?
- 对多线程程序,单核cpu与多核cpu如何工作相关的探讨
- IE浏览器提示对象不支持“append”属性或方法
- M1支持 Accusonus ERA Bundle for mac(音频降噪消除去混音插件包)
- 高通QFIL9008端口刷机报错问题
- 无头浏览器 html5定位,PhantomJS-无头浏览器的妙用
- win10自带的删除电脑流氓弹窗软件工具怎么用
- ERROR 2002 HY000 Can't connect to local MySQL server thro
- termux最新安装kali
- 【算法百题之四十二】罗马数字转整数
- 一分钟明白 VS manifest 原理
- 怎样使用PS制作木刻效果图片?添加木刻特效原来这么简单!
- 彭明盛,Samuel J Palmisano,2010年的工资单
- 假定某计算机的CPU主频为80 MHz,CPI为4,并且平均每条指令访存1.5
- 中华英才网走到这个地步,失败,技术哪去了?
- 大流行清楚地表明,我们需要完全自动化的豪华共产主义