Java中线程池ThreadPoolExecutor原理探究
一、 前言
线程池主要解决两个问题:一方面当执行大量异步任务时候线程池能够提供较好的性能,这是因为使用线程池可以使每个任务的调用开销减少(因为线程池线程是可以复用的)。另一方面线程池提供了一种资源限制和管理的手段,比如当执行一系列任务时候对线程的管理,每个ThreadPoolExecutor也保留了一些基本的统计数据,比如当前线程池完成的任务数目。
二、 类图结构
Executors其实是个工具类,里面提供了好多静态方法,根据用户选择返回不同的线程池实例。
ThreadPoolExecutor继承了AbstractExecutorService,成员变量ctl是个Integer的原子变量用来记录线程池状态 和 线程池线程个数,类似于ReentrantReadWriteLock使用一个变量存放两种信息。
Integer类型是32位二进制标示,其中高3位用来表示线程池状态,后面 29位用来记录线程池线程个数。
线程池状态含义:
RUNNING:接受新任务并且处理阻塞队列里的任务
SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务
STOP:拒绝新任务并且抛弃阻塞队列里的任务同时会中断正在处理的任务
TIDYING:所有任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为0,将要调用terminated方法
TERMINATED:终止状态。terminated方法调用完成以后的状态
线程池状态转换:
RUNNING -> SHUTDOWN
显式调用shutdown()方法,或者隐式调用了finalize(),它里面调用了shutdown()方法。
RUNNING or SHUTDOWN)-> STOP
显式 shutdownNow()方法
SHUTDOWN -> TIDYING
当线程池和任务队列都为空的时候
STOP -> TIDYING
当线程池为空的时候
TIDYING -> TERMINATED
当 terminated() hook 方法执行完成时候
线程池参数:
corePoolSize:线程池核心线程个数
workQueue:用于保存等待执行的任务的阻塞队列。
比如基于数组的有界ArrayBlockingQueue、,基于链表的无界LinkedBlockingQueue,最多只有一个元素的同步队列SynchronousQueue,优先级队列PriorityBlockingQueue,具体可参考 https://www.atatech.org/artic...
maximunPoolSize:线程池最大线程数量。
ThreadFactory:创建线程的工厂
RejectedExecutionHandler:饱和策略,当队列满了并且线程个数达到maximunPoolSize后采取的策略,比如AbortPolicy(抛出异常),CallerRunsPolicy(使用调用者所在线程来运行任务),DiscardOldestPolicy(调用poll丢弃一个任务,执行当前任务),DiscardPolicy(默默丢弃,不抛出异常)
keeyAliveTime:存活时间。如果当前线程池中的线程数量比基本数量要多,并且是闲置状态的话,这些闲置的线程能存活的最大时间
TimeUnit,存活时间的时间单位
线程池类型:
newFixedThreadPool
创建一个核心线程个数和最大线程个数都为nThreads的线程池,并且阻塞队列长度为Integer.MAX_VALUE,keeyAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收。
newSingleThreadExecutor
创建一个核心线程个数和最大线程个数都为1的线程池,并且阻塞队列长度为Integer.MAX_VALUE,keeyAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收。
newCachedThreadPool
创建一个按需创建线程的线程池,初始线程个数为0,最多线程个数为Integer.MAX_VALUE,并且阻塞队列为同步队列,keeyAliveTime=60说明只要当前线程60s内空闲则回收。这个特殊在于加入到同步队列的任务会被马上被执行,同步队列里面最多只有一个任务,并且存在后马上会拿出执行。
newSingleThreadScheduledExecutor
创建一个最小线程个数corePoolSize为1,最大为Integer.MAX_VALUE,阻塞队列为DelayedWorkQueue的线程池。
其中Worker继承AQS和Runnable是具体承载任务的对象,Worker继承了AQS自己实现了简单的不可重入独占锁,其中status=0标示锁未被获取状态也就是未被锁住的状态,state=1标示锁已经被获取的状态也就是锁住的状态。
DefaultThreadFactory是线程工厂,newThread方法是对线程的一个分组包裹,其中poolNumber是个静态的原子变量,用来统计线程工厂的个数,threadNumber用来记录每个线程工厂创建了多少线程。
三、 源码分析
3.1 添加任务到线程池exectue方法
如果当前线程池线程个数小于corePoolSize则开启新线程
否则添加任务到任务队列
如果任务队列满了,则尝试新开启线程执行任务,如果线程个数>maximumPoolSize则执行拒绝策略。
重点看addWorkder方法:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {int c = ctl.get();int rs = runStateOf(c);// 检查队列是否只在必要时为空.(1)if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;//循环cas增加线程个数for (;;) {int wc = workerCountOf(c);//如果线程个数超限则返回falseif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//cas增加线程个数,同时只有一个线程成功if (compareAndIncrementWorkerCount(c))break retry;//cas失败了,则看线程池状态是否变化了,变化则跳到外层循环重试重新获取线程池状态,否者内层循环重新cas。c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;}
}//到这里说明cas成功了,(2)
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {//创建workerfinal ReentrantLock mainLock = this.mainLock;w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {//加独占锁,为了workers同步,因为可能多个线程调用了线程池的execute方法。mainLock.lock();try {//重新检查线程池状态,为了避免在获取锁前调用了shutdown接口(3)int c = ctl.get();int rs = runStateOf(c);if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();//添加任务workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}//添加成功则启动任务if (workerAdded) {t.start();workerStarted = true;}}
} finally {if (! workerStarted)addWorkerFailed(w);
}
return workerStarted;}
代码比较长,主要分两部分,第一部分双重循环目的是通过cas增加线程池线程个数,第二部分主要是并发安全的把任务添加到workers里面,并且启动任务执行。
先看第一部分的(1)
展开!运算后等价于
也就是说下面几种情况下会返回false:
当前线程池状态为STOP,TIDYING,TERMINATED
当前线程池状态为SHUTDOWN并且已经有了第一个任务
当前线程池状态为SHUTDOWN并且任务队列为空
内层循环作用是使用cas增加线程个数,如果线程个数超限则返回false,否者进行cas,cas成功则退出双循环,否者cas失败了,要看当前线程池的状态是否变化了,如果变了,则重新进入外层循环重新获取线程池状态,否者进入内层循环继续进行cas尝试。
到了第二部分说明CAS成功了,也就是说线程个数加一了,但是现在任务还没开始执行,这里使用全局的独占锁来控制workers里面添加任务,其实也可以使用并发安全的set,但是性能没有独占锁好(这个从注释中知道的)。这里需要注意的是要在获取锁后重新检查线程池的状态,这是因为其他线程可可能在本方法获取锁前改变了线程池的状态,比如调用了shutdown方法。添加成功则启动任务执行。
3.2 工作线程Worker的执行
先看下构造函数:
这里添加一个新状态-1是为了避免当前线程worker线程被中断,比如调用了线程池的shutdownNow,如果当前worker状态>=0则会设置该线程的中断标志。这里设置了-1所以条件不满足就不会中断该线程了。运行runWorker时候会调用unlock方法,该方法吧status变为了0,所以这时候调用shutdownNow会中断worker线程。
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // status设置为0,允许中断boolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// 如果线程池当前状态至少是stop,则设置中断标志;// 如果线程池当前状态是RUNNININ,则重置中断标志,重置后需要重新//检查下线程池状态,因为当重置中断标志时候,可能调用了线程池的shutdown方法//改变了线程池状态。if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//任务执行前干一些事情beforeExecute(wt, task);Throwable thrown = null;try {task.run();//执行任务} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//任务执行完毕后干一些事情afterExecute(task, thrown);}} finally {task = null;//统计当前worker完成了多少个任务w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {//执行清了工作processWorkerExit(w, completedAbruptly);}
}如果当前task为空,则直接执行,否者调用getTask从任务队列获取一个任务执行,如果任务队列为空,则worker退出。private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?retry:
for (;;) {int c = ctl.get();int rs = runStateOf(c);// 如果当前线程池状态>=STOP 或者线程池状态为shutdown并且工作队列为空则,减少工作线程个数if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}boolean timed; // Are workers subject to culling?for (;;) {int wc = workerCountOf(c);timed = allowCoreThreadTimeOut || wc > corePoolSize;if (wc <= maximumPoolSize && ! (timedOut && timed))break;if (compareAndDecrementWorkerCount(c))return null;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}try {//根据timed选择调用poll还是阻塞的takeRunnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}
}}private void processWorkerExit(Worker w, boolean completedAbruptly){
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();//统计整个线程池完成的任务个数
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {completedTaskCount += w.completedTasks;workers.remove(w);
} finally {mainLock.unlock();
}//尝试设置线程池状态为TERMINATED,如果当前是shutdonw状态并且工作队列为空
//或者当前是stop状态当前线程池里面没有活动线程
tryTerminate();//如果当前线程个数小于核心个数,则增加
int c = ctl.get();
if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);
}}
3.3 shutdown操作
调用shutdown后,线程池就不会在接受新的任务了,但是工作队列里面的任务还是要执行的,但是该方法立刻返回的,并不等待队列任务完成在返回。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {//权限检查checkShutdownAccess();//设置当前线程池状态为SHUTDOWN,如果已经是SHUTDOWN则直接返回advanceRunState(SHUTDOWN);//设置中断标志interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {mainLock.unlock();
}
//尝试状态变为TERMINATED
tryTerminate();
}
如果当前状态>=targetState则直接返回,否者设置当前状态为targetState
private void advanceRunState(int targetState) {
for (;;) {int c = ctl.get();if (runStateAtLeast(c, targetState) ||ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))break;
}
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
设置所有线程的中断标志,主要这里首先加了全局锁,同时只有一个线程可以调用shutdown时候设置中断标志,然后尝试获取worker自己的锁,获取成功则设置中断标示
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {for (Worker w : workers) {Thread t = w.thread;if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}
} finally {mainLock.unlock();
}}
3.4 shutdownNow操作
调用shutdown后,线程池就不会在接受新的任务了,并且丢弃工作队列里面里面的任务,正在执行的任务会被中断,但是该方法立刻返回的,并不等待激活的任务执行完成在返回。返回队列里面的任务列表。
调用队列的drainTo一次当前队列的元素到taskList,
可能失败,如果调用drainTo后队列海不为空,则循环删除,并添加到taskList
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {checkShutdownAccess();//权限检查advanceRunState(STOP);// 设置线程池状态为stopinterruptWorkers();//中断线程tasks = drainQueue();//移动队列任务到tasks
} finally {mainLock.unlock();
}
tryTerminate();
return tasks;
}
调用队列的drainTo一次当前队列的元素到taskList,
可能失败,如果调用drainTo后队列海不为空,则循环删除,并添加到taskList
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
List<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {for (Runnable r : q.toArray(new Runnable[0])) {if (q.remove(r))taskList.add(r);}
}
return taskList;
}
3.5 awaitTermination操作
等待线程池状态变为TERMINATED则返回,或者时间超时。由于整个过程独占锁,所以一般调用shutdown或者shutdownNow后使用。
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (;;) {if (runStateAtLeast(ctl.get(), TERMINATED))return true;if (nanos <= 0)return false;nanos = termination.awaitNanos(nanos);}} finally {mainLock.unlock();}
}
四、总结
线程池巧妙的使用一个Integer类型原子变量来记录线程池状态和线程池线程个数,设计时候考虑到未来(2^29)-1个线程可能不够用,到时只需要把原子变量变为Long类型,然后掩码位数变下就可以了,但是为啥现在不一劳永逸的定义为Long那,主要是考虑到使用int类型操作时候速度上比Long类型快些。
通过线程池状态来控制任务的执行,每个worker线程可以处理多个任务,线程池通过线程的复用减少了线程创建和销毁的开销,通过使用任务队列避免了线程的阻塞从而避免了线程调度和线程上下文切换的开销。
另外需要注意的是调用shutdown方法作用仅仅是修改线程池状态让现在任务失败并中断当前线程,这个中断并不是让正在运行的线程终止,而是仅仅设置下线程的中断标志,如果线程内没有使用中断标志做一些事情,那么这个对线程没有影响。
Java中线程池ThreadPoolExecutor原理探究相关推荐
- java excutorthread_JAVA 线程池ThreadPoolExcutor原理探究
概论 线程池(英语:thread pool):一种线程使用模式.线程过多会带来调度开销,进而影响缓存局部性和整体性能.而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务.这避免了在处理短时间 ...
- Java 线程池(ThreadPoolExecutor)原理分析与使用 – 码农网
线程池的详解 Java 线程池(ThreadPoolExecutor)原理分析与使用 – 码农网 http://www.codeceo.com/article/java-threadpool-exec ...
- Java中线程池,你真的会用吗
转载自 Java中线程池,你真的会用吗 在<深入源码分析Java线程池的实现原理>这篇文章中,我们介绍过了Java中线程池的常见用法以及基本原理. 在文中有这样一段描述: 可以通过Ex ...
- Java中线程池,你真的会用吗?
在<深入源码分析Java线程池的实现原理>这篇文章中,我们介绍过了Java中线程池的常见用法以及基本原理. 在文中有这样一段描述: 可以通过Executors静态工厂构建线程池,但一般不建 ...
- 并发编程五:java并发线程池底层原理详解和源码分析
文章目录 java并发线程池底层原理详解和源码分析 线程和线程池性能对比 Executors创建的三种线程池分析 自定义线程池分析 线程池源码分析 继承关系 ThreadPoolExecutor源码分 ...
- [Java高并发系列(5)][详细]Java中线程池(1)--基本概念介绍
1 Java中线程池概述 1.1 什么是线程池? 在一个应用当中, 我们往往需要多次使用线程, 这意味着我们需要多次创建和销毁线程.那么为什么不提供一个机制或概念来管理这些线程呢? 该创建的时候创建, ...
- java中线程池的使用_Java中线程池的简单使用
什么是线程池? 顾名思义线程池就是线程的容器 举个例子:在没有共享电源的年代,车站有5个人手机都没电且都没有带电源,这五个人想要给手机充电只能去车站的售货亭各花100块钱买一个移动电源:但是现在共享电 ...
- Java 线程池(ThreadPoolExecutor)原理分析与使用
ThreadPoolExecutor原理概述 在我们的开发中"池"的概念并不罕见,有数据库连接池.线程池.对象池.常量池等等.下面我们主要针对线程池来一步一步揭开线程池的面纱. 使 ...
- 一文弄懂Java中线程池原理
在工作中,我们经常使用线程池,但是你真的了解线程池的原理吗?同时,线程池工作原理和底层实现原理也是面试经常问的考题,所以,今天我们一起聊聊线程池的原理吧. 为什么要用线程池 使用线程池主要有以下三个原 ...
最新文章
- 导入导出 SAPSCRIPT 的程序
- SPF Tarjan算法求无向图割点(关节点)入门题
- Java Error(三)
- 图片与Byte相互转换,文件和字节流的转换方法
- JSP和HTML中实现字符串换行
- 解密Arm Neoverse V1 和 Neoverse N2 平台 为下一代基础设施带来计算变革
- Jquery调用ajax参数说明
- java 柯里化_函数式编程(Java描述)——Java中的函数及其柯里化
- Linux下的进程池(1)
- 麻省计算机音乐博士,MIT又一突破!用AI过滤音源,让音乐更悦耳
- 感染暴风一号u盘病毒的解决办法
- Autodesk 3dsMax 2019安装注册教程
- cs1.6国内正版服务器,2021最新CS1.6 HLDS 8684 纯净比赛服务端(Win版)
- FastDFS文件存储系统
- ls 命令显示的total是什么意思
- SQL Server卸载不干净和重新安装问题
- List of file signatures
- Exchange控制台错误:WinRM客户端已将请求发送到HTTP服务器
- Golang处理excel用流式写入,追加行数据
- 介绍18650锂离子电池的命名规则