Executor - Shutdown、ShutdownNow、awaitTermination 详解与实战
一.引言
使用 executor 线程池时经常用到 shutdown / shutdownNow + awaitTermination 方法关闭线程池,下面看下几种方法的定义与常见用法。
二.API 释义
1.shutdown
/*** Initiates an orderly shutdown in which previously submitted* tasks are executed, but no new tasks will be accepted.* Invocation has no additional effect if already shut down.** <p>This method does not wait for previously submitted tasks to* complete execution. Use {@link #awaitTermination awaitTermination}* to do that.** @throws SecurityException {@inheritDoc}*/public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();}
shutdown 主要工作如下:
Initiates an orderly shutdown in which previously submitted
tasks are executed - 之前提交的继续执行
but no new tasks will be accepted. - 不再接收新任务
This method does not wait for previously submitted tasks to
complete execution. - 该方法不会等待以前提交的任务完成,可以配合 awaitTermination 方法等待。
2.shutdownNow
/*** Attempts to stop all actively executing tasks, halts the* processing of waiting tasks, and returns a list of the tasks* that were awaiting execution. These tasks are drained (removed)* from the task queue upon return from this method.** <p>This method does not wait for actively executing tasks to* terminate. Use {@link #awaitTermination awaitTermination} to* do that.** <p>There are no guarantees beyond best-effort attempts to stop* processing actively executing tasks. This implementation* cancels tasks via {@link Thread#interrupt}, so any task that* fails to respond to interrupts may never terminate.** @throws SecurityException {@inheritDoc}*/public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(STOP);interruptWorkers();tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;}
shutdownNow 主要工作如下:
Attempts to stop all actively executing tasks - 尝试停止正在执行的任务
halts the processing of waiting tasks - 停止等待的任务
and returns a list of the tasks that were awaiting execution - 返回等待列表的任务
These tasks are drained (removed) from the task queue upon return from this method. - 从该方法返回时,将这些任务从队列中删除
Tips:
该任务不会等待主动执行的任务终止,可以配合 awaitTermination 方法等待。
该方法尽可能停止主动执行的任务,通过 Thread.interrupt 实现,未能响应中断的任务可能不会停止
该方法与 shutdown 差别在 interruptIdleWorkers 和 interruptWorkers,后者会调用 interrutp 方法到正在执行的 worker 上,而前者只会取消等待的任务。
3.awaitTermination
* Threads waiting in awaitTermination() will return when the* state reaches TERMINATED.public boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (;;) {if (runStateAtLeast(ctl.get(), TERMINATED))return true;if (nanos <= 0)return false;nanos = termination.awaitNanos(nanos);}} finally {mainLock.unlock();}}
awaitTermination 方法会等待线程到达 TERMINATED 即已终止的状态。如果线程池已经关闭,则直接返回 true;如果线程池未关闭,该方法会根据 Timeout + TimeUnit 的延时等待线程结束,并根据到期后的线程池状态返回 true 或者 false,注意该方法不会关闭线程池,只负责延时以及检测状态。
4.runState
A.状态
线程 runState 的几种状态与转换
RUNNING:接受新任务并处理排队的任务
SHUTDOWN:不接受新任务,但处理排队的任务
STOP:不接受新任务,不处理排队任务,并中断正在进行的任务
TIDYING:所有任务都已终止,workerCount 为零,线程转换到状态 TIDYING,将运行 terminate() Hook
TERMINATED:终止()已完成
B.转换
RUNNING -> SHUTDOWN : 在调用 shutdown() 时,可能隐含在 finalize() 中
(RUNNING or SHUTDOWN) -> STOP : 调用 shutdownNow()
SHUTDOWN -> TIDYING:当队列和池都为空时
STOP -> TIDYING:当池为空时
TIDYING -> TERMINATED:当 terminate() 钩子方法完成时
三.实践
1.shutdown + awaitTermination(500ms)
processNumBuffer 为 Runnable 内的逻辑,针对给定的一批数字求出最小,最大值并返回结果字符串保存,共 500000 个数字,每 50000 个数字生成一个 Runnable。执行逻辑后调用 shutdown + awaitTermination。getCurrentThread 方法负责打印当前的可用线程,用来观测调用 shutdown 和 awaitTermination 后线程池中线程的变化。
import java.util.concurrent.{CopyOnWriteArraySet, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}object ExecutorPoolShutdown {// Runnable 内执行逻辑,寻找上下界def processNumBuffer(nums: Array[Int], taskId: Long): String = {val maxNum = nums.maxval minNum = nums.minval log = s"TaskId: $taskId Length: ${nums.length} Min: $minNum Max: $maxNum"log}def main(args: Array[String]): Unit = {// 存储所有 Task 的日志val logAll = new StringBuffer()// 存储所有可用的 TaskIdval taskSet = new CopyOnWriteArraySet[Long]()// 初始化线程池val executor = new ThreadPoolExecutor(6, 10, 3000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())val numIterator = (0 until 500000).iterator// 每50000个数据生成一个 TasknumIterator.grouped(50000).foreach(group => {executor.execute(new Runnable() {override def run(): Unit = {val taskId = Thread.currentThread().getIdtaskSet.add(taskId) // 添加 taskIDval res = processNumBuffer(group.toArray, taskId)logAll.append(res + "\n") // 添加统计日志}})})// 调用 shutdownexecutor.shutdown()// 获取当前线程println("After shutdown...")getCurrentThread()val st = System.currentTimeMillis()// 调用 awaitTerminatioval state = executor.awaitTermination(500, TimeUnit.MILLISECONDS)val cost = System.currentTimeMillis() - stprintln(s"Executor State: $state Cost: $cost")// 再次获取当前线程println("After awaitTermination...")getCurrentThread()println(logAll.toString)println(taskSet.toArray().mkString(","))}
}
由于任务逻辑比较简单,调用 shutdow 和 awaitTermation 后线程数均为 5 且是和 pool 无关的线程,说明线程池的 Task 在 shutdow 后就已经全部运行完毕了,这点从 awaitTermation 返回的状态为 true 和等待时间为 0 也可以看出来。最后就是 500000 / 50000 = 10 共计10条 log,再次说明 task 都执行完毕,最后显示本次任务执行中共使用了 6 个 Task。
Tips:
shutdown 后如果线程池已经结束,则 awaitTermation 方法不会等待,直接返回 true。
2.任务延时 + shutdown + awaitTermination(500ms)
上面的任务执行速度求最大最小值,执行速度很快,所以不好看到 shutdown 和 awaitTerminatioN 的作用,下面模拟一个运行时间稍长的任务,在原始 Runnable 中加入 sleep(2000),延长程序2s的运行时间:
numIterator.grouped(50000).foreach(group => {executor.execute(new Runnable() {override def run(): Unit = {val taskId = Thread.currentThread().getIdtaskSet.add(taskId)Thread.sleep(2000)val res = processNumBuffer(group.toArray, taskId)logAll.append(res + "\n")}})})
再次运行:
这次的结果和上面完全不同,首先是不管是调用 shutdown 还是 awaitTermination,可以看到活跃线程中都包含 pool-1-thread-x,即这两个 API 调用后逻辑仍在运行没有结束;再看 awaitTermination 返回的状态为 false,代表线程池未完全关闭,cost=507ms,等待 500ms 后线程池仍未完全退出,但是主线程已经结束,所以 LogAll 里没有日志加入,最后只打印出了使用过的 TaskId。
Tips:
shutdown 后线程仍在继续运行,对应前面提到的 shutdown 之后当前运行的任务继续执行,只不过不会增加新任务,而 awaitTermination 后线程依然活跃,对应前面的 awaitTermination 方法只返回线程池关闭状态,不会关闭线程池。
3.任务延时 + shutdown + awaitTermination(2000ms)
调整 awaitTermination 的等待时间,从500ms提升至2000ms
val st = System.currentTimeMillis()val state = executor.awaitTermination(2000, TimeUnit.MILLISECONDS)val cost = System.currentTimeMillis() - stprintln(s"Executor State: $state Cost: $cost")
再次运行:
和上面相比,executor 的关闭状态返回的仍然是 false,但是等到的时间延迟至约 2000ms,可以看到随着等待时间增加,一部分 task已经完成了,但是并没有全部完成。将延时时间修改为 5000ms 再次运行:
等到 4000ms 时 executor 就结束了,所以 awaitTermination 返回为 true,后续也没有 pool-1-thread-x 相关的 task,最终的输出 log 也完整。
4.shutdownNow + awaitTermination(500ms)
shutdownNow 相比 shutdown 会多一个返回值,即等待列表的任务。
val tasks = executor.shutdownNow()tasks.asScala.foreach(task => {println(task)})
运行一下:
由于任务运行很快,所以快速任务下,shutdown 和 shutdownNow 结果相同,awaitTermination 返回为 ture 且未等待。
5.任务延时 + shutdownNow + awaitTermination(500ms)
numIterator.grouped(50000).foreach(group => {executor.execute(new Runnable() {override def run(): Unit = {val taskId = Thread.currentThread().getIdtaskSet.add(taskId)Thread.sleep(2000)val res = processNumBuffer(group.toArray, taskId)logAll.append(res + "\n")}})})
任务增加 sleep 2000ms 后再运行一下:
最上面的任务显示 :
Caused by: java.lang.InterruptedException: sleep interrupted
即任务 sleep 期间被 interrupt 了,所以执行的 task 结束,和 shutdown 相比,正在执行的线程并不会被 interrupt;下面打印出来4个 Runnable,因为这四个 Task 还在等待队列中,shutdownNow 直接把他们返回了;最后下面因为 executor 已经关闭,所以状态为 true,等待时间为0。
6.任务延时 + awaitTermination(xxxms) + shutdownNow
通过上面 shotdownNow + awaitTermination 的示例中可以看到,如果任务不能很快执行,那么调用 shotdownNow 的结果就是所有 task 都没结束,任务没有任何改动。如果希望对任务设定一定期间,能完成多少完成多少,可以调整顺序,修改为先 awaitTermination 再 shutdownNow:
val st = System.currentTimeMillis()val state = executor.awaitTermination(3000, TimeUnit.MILLISECONDS)val cost = System.currentTimeMillis() - stprintln(s"Executor State: $state Cost: $cost")println("After awaitTermination...")getCurrentThread()val tasks = executor.shutdownNow()tasks.asScala.foreach(task => {println(task)})println("After shutdown...")getCurrentThread()
调整完顺序后再次运行:
等待时间设置为 3000ms,相当于你对你的任务要求是: 3000ms 内能跑完多少算多少,没跑完就不要了;可以看到 awaitTermination 到期后返回状态为 false,说明线程内的任务还未全部结束;再看下面 shutdownNow 后,线程里已经不存在 pool-1-thread-x ,且打印出部分结果,共计6条;最下面是 interrupt 其他正在运行的 task 打印的异常栈,最终程序 exit(0) 正常退出。
四.总结
经过上面的代码分析,对几个方法进行一下总结:
shutdown : 等待执行的任务执行,不再添加新任务
shutdownNow:interrupt 当前执行的任务,不再添加新任务,返回等待的任务
awaitTermination:不影响线程池开关状态,只返回状态,可以堵塞线程等待一定时间
可以结合上面的6个例子以及自己任务的耗时和容忍度,决定怎么组合上面三个 API,如果一定要等到 executor 内的 task 都运行完毕再关闭 executor 且不好估算内部 task 运行时间,可以采用如下操作:
executor.shutdown()while (!executor.awaitTermination(500, TimeUnit.MILLISECONDS)) {println("Task is Running...")}println("Task is Finish!")
通过 while true 保证线程池内 task 都运行完毕才进行后续操作,不过需要注意 Task 内部不要有死循环,否则会导致无法跳出该 While 循环,整个程序堵塞在这里。
Executor - Shutdown、ShutdownNow、awaitTermination 详解与实战相关推荐
- (!详解 Pytorch实战:①)kaggle猫狗数据集二分类:加载(集成/自定义)数据集
这系列的文章是我对Pytorch入门之后的一个总结,特别是对数据集生成加载这一块加强学习 另外,这里有一些比较常用的数据集,大家可以进行下载: 需要注意的是,本篇文章使用的PyTorch的版本是v0. ...
- t检验特征筛选详解及实战
t检验特征筛选详解及实战 数据的种类 我们都知道,一般数据可以分为两类,即定量数据(数值型数据)和定性数据(非数值型数据),定性数据很好理解,例如人的性别,姓名这些都是定性数据.定量数据可以分为以下几 ...
- 使用pickle保存机器学习模型详解及实战(pickle、joblib)
使用pickle保存机器学习模型详解及实战 pickle模块实现了用于序列化和反序列化Python对象结构的二进制协议. "Pickling"是将Python对象层次结构转换为字节 ...
- 数据不平衡(class_weight、评估方法、上采样、下采样)、详解及实战
数据不平衡(class_weight.评估方法.上采样.下采样).详解及实战 核心学习函数方法: np.random.choice() np.where() np.concatenate() np.v ...
- 标称变量(Categorical Features)或者分类变量(Categorical Features)缺失值填补、详解及实战
标称变量(Categorical Features)或者分类变量(Categorical Features)缺失值填补.详解及实战 核心学习函数或者方法: KNeighborsClassifier() ...
- 编码字典类特征、使用sklearn的DictVectorizer方法将字典组成的列表转换成向量、详解及实战
编码字典类特征.使用sklearn的DictVectorizer方法将字典组成的列表转换成向量.详解及实战 sklearn.feature_extraction.DictVectorizer() 把字 ...
- AdaBoostClassifer详解及实战
AdaBoostClassifer详解及实战 AdaBoost算法是Adaptive Boost的简称,属于Boosting类算法. 随机森林是一种经典的Bagging算法. Boosting通过将一 ...
- 什么是奇异值?奇异值分解是什么?SVD分解详解及实战
什么是奇异值?奇异值分解是什么?SVD(Singular Value Decomposition)分解详解及实战 TSVD:Truncated Singular Value Decomposition ...
- pandas读写MySQL数据库详解及实战
pandas读写MySQL数据库详解及实战 SQLAlchemy是Python中最有名的ORM工具. 关于ORM: 全称Object Relational Mapping(对象关系映射). 特点是操纵 ...
最新文章
- 病理分析常见数据集及常用模型方法总结
- 分区操作后索引的状态
- Python爬虫项目--爱拍视频批量下载
- web工程was部署
- 央视影音 for Mac 1.2.1 中文版 – CCTV和地方卫视直播软件
- c语言99乘法表流程图表,C语言做99乘法表.doc
- 工业软件下载大全202108
- python阿拉伯数字转中文_阿拉伯数字转换成中文的python代码
- 计算机连接网络是飞行模式怎么办,电脑wifi界面只有飞行模式怎么办
- 做IT精英还是IT民工? 从事IT只是个体力活
- offsetX,offsetLeft,offsetWidth的区别详解
- #编写一个函数,实现接受一个字符串,分别统计大写字母、小写字母、数字、其他字符的个数,并且返回结果
- python安装报错,Windows 7 Service Pack 1 and all applicable updates
- 图像分割:DeepLabV3与DeepLabV3+介绍
- PCI Express学习篇---物理层电气特性(三)Transmitter Compliance Test
- 74HC1G66模拟开关,多路复用
- mysql中的mul
- 计算机网络授时设置,网络授时系统,网络校时系统
- VC++图片类型的识别(附源码)
- 前端学习之理解304过程
热门文章
- 鲲云科技助力双瑞风电数字化转型,开启安全生产“智慧眼”
- 小学生职业体验计算机拆装,小小飞行员职业体验——记小学部的花样探究
- 空间计量软件代码资源集锦(Matlab/R/Python/SAS/Stata)
- 南京信息工程大学计算机专业是几本,南京信息工程大学是几本?优势专业有哪些...
- 基于JAVA网上求职招聘系统计算机毕业设计源码+数据库+lw文档+系统+部署
- 数据结构初阶最终章------>经典八大排序(C语言实现)
- CSS中设置鼠标样式
- Java中接口的实现
- 解决JasperReport在Linux系统下中文字体无法显示的问题
- kindeditor在服务器上上传图片显示叉叉,什么原因?,kindeditor=4.1.5 文件上传漏洞利用...