Java Executor源码解析(3)—ThreadPoolExecutor线程池execute核心方法源码【一万字】
基于JDK1.8详细介绍了ThreadPoolExecutor线程池的execute方法源码!
上一篇文章中,我们介绍了:Java Executor源码解析(2)—ThreadPoolExecutor线程池的介绍和基本属性。这一篇文章,我们将会介绍ThreadPoolExecutor线程池的execute方法源码,该方法是线程池的核心方法,非常重要。
系列文章:
- Java Executor源码解析(1)—Executor执行框架的概述
- Java Executor源码解析(2)—ThreadPoolExecutor线程池的介绍和基本属性【一万字】
- Java Executor源码解析(3)—ThreadPoolExecutor线程池execute核心方法源码【一万字】
- Java Executor源码解析(4)—ThreadPoolExecutor线程池submit方法以及FutureTask源码【一万字】
- Java Executor源码解析(5)—ThreadPoolExecutor线程池其他方法的源码
- Java Executor源码解析(6)—ScheduledThreadPoolExecutor调度线程池源码解析【一万字】
- Java Executor源码解析(7)—Executors线程池工厂以及四大内置线程池
文章目录
- 1 execute核心提交方法
- 2 addWorker尝试添加新线程
- 2.1 addWorkerFailed添加Worker失败处理
- 2.1.1 tryTerminate尝试终止线程池
- 2.1.1.1 interruptIdleWorkers中断空闲线程
- 3 Worker线程包装类
- 3.1 runWorker执行工作
- 3.1.1 getTask拉取任务
- 3.1.2 processWorkerExit清理/补充Worker
1 execute核心提交方法
public void execute(Runnable command)
传递一个Runnable任务对象,然后由线程池对它进行异步执行。没有办法检查Runnable是否执行完毕。如果无法接收要执行的任务,则由 RejectedExecutionHandler 决定是否抛出RejectedExecutionException;如果任务为null,那么抛出NullPointerException。
execute方法是线程池的绝对核心方法,很多其他内部方法都是为该方法服务的,涉及到的流程和代码非常复杂。
execute方法的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
- 首先就是command任务的null校验,如果为null直接抛出NullPointerException;
- 调用workerCountOf计算运行线程数量,如果小于corePoolSize,即目前在行线程数小于核心线程数:
- 调用addWorker方法尝试启动新线程去执行command任务,使用corePoolSize作为线程数量上限。如果任务提交给新线程成功,那么直接返回true。
- 到这一步,肯定是addWorker方法返回false。表示可能线程池被关闭了,或者线程数量达到了corePoolSize,或者任务队列满了且工作线程数量达到了最大值maximumPoolSize等等情况。
- 到这一步,可能线程池被关闭了,或者线程数量达到了corePoolSize,或者任务队列满了且工作线程数量达到了最大值maximumPoolSize等等情况。判断线程池是否是RUNNING状态,以及尝试加入任务队列。如果线程池还是RUNNING状态并且成功加入任务队列:
- 再次检查线程池是否不是RUNNING状态,以及尝试将任务移除任务队列。若果线程池不是RUNNING状态,并且将任务移除任务队列成功,那么对任务执行拒绝策略。
- 否则,表示线程池还是RUNNING状态,或者线程池不是RUNNING状态,但是任务移除队列失败。判断当前工作线程数是否为0,如果是,尝试启动新线程,从队列中获取任务,使用maximumPoolSize作为线程数量上限,这一步为了维持线程池的活性。 因为有可能在新任务offer加入队列的时候,其他工作线程都因为超时或者SHOTDOWN而被清理,此时仍然需要新开线程对加入进来的任务进行完成。
- 否则,表示线程池不是RUNNING状态,或者线程池是RUNNING状态但是加入任务队列失败,即任务队列满了。尝试启动新线程去执行command任务,使用maximumPoolSize作为线程数量上限。如果失败,则表示线程池不是RUNNING状态,或者线程池是RUNNING状态但是任务队列满了且工作线程数量达到了最大值maximumPoolSize,那么执行拒绝策略。
上面的判断都没有加锁,因此状态可能都是瞬时性的判断,不保证一直有效。
/*** 传递一个Runnable任务对象,然后由线程池对它进行异步执行。没有办法检查Runnable是否执行完毕。* <p>* 如果无法接收要执行的任务,则由 RejectedExecutionHandler 决定是否抛出 RejectedExecutionException;* 如果任务为null,那么抛出NullPointerException。** @param command 要执行的任务* @throws RejectedExecutionException 如果无法接收要执行的任务,则由 RejectedExecutionHandler 决定是否抛出 RejectedExecutionException* @throws NullPointerException 如果任务为null*/
public void execute(Runnable command) {//首先就是command任务的null校验,如果为null直接抛出NullPointerExceptionif (command == null)throw new NullPointerException();//获取ctl的值cint c = ctl.get();/*1 调用workerCountOf计算线程数量,如果小于corePoolSize*/if (workerCountOf(c) < corePoolSize) {//尝试启动新线程去执行command任务,使用corePoolSize作为线程数量上限if (addWorker(command, true))return;//到这里还没有返回,那么表示addWorker失败。//可能线程池被关闭了,或者线程数量达到了corePoolSize,或者任务队列满了且工作线程数量达到了最大值maximumPoolSize等情况//重新获取ctl的值cc = ctl.get();}/** 到这一步,可能线程池被关闭了,或者线程数量达到了corePoolSize,* 或者任务队列满了且工作线程数量达到了最大值maximumPoolSize等等情况。** 2 如果线程池还是RUNNING状态,并且成功加入任务队列*/if (isRunning(c) && workQueue.offer(command)) {//获取ctl的值cint recheck = ctl.get();/*再次检查,如果线程池不是RUNNING状态,并且将任务移除队列成功*/if (!isRunning(recheck) && remove(command))//执行拒绝策略reject(command);/** 否则,表示线程池还是RUNNING状态,或者线程池不是RUNNING状态,但是任务移除队列失败。* 判断如果当前工作线程数为0*/else if (workerCountOf(recheck) == 0)//尝试启动新线程,从队列中获取任务,使用maximumPoolSize作为线程数量上限,无论成功还是失败//这一步为了维持线程池的活性。addWorker(null, false);}/** 3 否则,表示线程池不是RUNNING状态,或者线程池是RUNNING状态但是加入任务队列失败,即任务队列满了** 尝试启动新线程去执行command任务,使用maximumPoolSize作为线程数量上限* 如果失败,则表示线程池不是RUNNING状态,或者线程池是RUNNING状态但是任务队列满了且工作线程数量达到了最大值maximumPoolSize*/else if (!addWorker(command, false))//执行拒绝策略reject(command);
}/**1. 该方法用于执行拒绝策略*/
final void reject(Runnable command) {handler.rejectedExecution(command, this);
}
2 addWorker尝试添加新线程
addWorker尝试新建一个工作线程(Worker)并启动去执行任务,同样作为线程池的核心方法。大概步骤为:检查当前线程池状态是否为运行态RUNNING以及线程数小于给定的最大线程数边界值,如果不满足,那么返回false,如果满足,那么表示可以新增Worker,首先CAS的增加ctl的线程计数。随后尝试新增一个Worker,通过线程工厂创建一个线程,将参数中的任务作为第一个任务执行并返回true。如果线程工厂无法创建线程或者返回null,那么将会回滚此前的操作,比如减少线程计数,移除创建的Worker等,最后返回false。
addWorker的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
- 开启一个死循环,相当于自旋,使用retry标记。检查当前线程池状态是否为运行态RUNNING以及线程数小于给定的最大线程数边界值,如果不满足,那么返回false,如果满足,那么表示可以新增Worker,那么首先CAS的增加ctl的线程计数,成功后退出循环。
- 获取此时最新的ctl值c,通过c获取此时线程池的状态rs。
- 线程池状态校验:如果线程状态的值大于等于SHUTDOWN,那么表示线程池不是运行状态,并且(rs等于SHUTDOWN 并且 firstTask为null 并且 workQueue不为空)这三个条件有一个成立,那么直接返回false,adderWorker失败。即只有RUNNING状态,或者SHUTDOWN状态并且不是新添加任务的请求并且任务队列不为空,满足这两种情况的一种,才进行下一步;
- 到这一步,说明通过判断线程池状态校验通过。内部再开启一个死循环:首先校验线程数量,如果不符合规则,那么直接返回false。随后尝试预先CAS的将WorkerCount线程数量自增1,成功之后退出整个双层大循环,继续下一步。失败之后会重新获取ctl的值,然后判断线程状态是否改变,如果状态改变了那么进入下一次外层循环,再次进行状态校验如果状态没有改变,那么那么进入下一次内层循环,不断地循环重试CAS操作即可。
- 获取此时最新的线程数量wc。
- 线程数量校验:如果wc大于等于CAPACITY(最大线程数),或者wc大于等于匹配最大值边界(如果core参数为true就是corePoolSize,否则就是maximumPoolSize)。满足两种条件的一个,那么直接返回false,表示线程数量不符合要求。
- 到这一步,表示满足线程数量要求。那么尝试预先CAS的将WorkerCount线程数量自增1,即ctl值自增1,CAS操作成功之后,直接break retry跳出外层循环,这一步就算完成了,进入下一步尝试新增Worker。
- 到这一步返回跳出循环,表示CAS失败,重新获取最新的ctl值c。
- 继续判断如果线程池运行状态不等于rs,线程池的状态改变了,我们知道线程池的状态是不可逆的,那么continue retry结束内层循环,结束本次外层循环,继续下一次外层循环,将可能在下一次的外层循环中因为线程状态的原因而返回false 并退出。
- 到这一步还没跳出循环,表示CAS失败,并且线程池状态也没有改变,那么重新开始下一次内层循环重试CAS即可。
- 到这一步,表示线程池状态和线程数量校验通过,并且已经预先新增了WorkerCount线程数量的值,下面是尝试新增Worker并启动线程的逻辑。
- workerStarted表示新增的工作线程是否已启动的标志位,初始化为false,表示未启动。workerAdded表示新增的工作线程是否已加入workers集合,初始化为false,表示未加入。w表示需要新增的Worker对象,初始化为null。
- 开启一个try代码块:
- 新建一个Worker赋给w,传入firstTask参数,作为将要执行的第一个任务,在构造器中还会通过线程工厂初始化一个新线程。
- 获取w内部的新线程t。如果t不为null:
- 下面的步骤涉及到workers集合的改动以及新线程的执行,甚至其他参数比如largestPoolSize的改动,需要获取mainLock锁,成功之后进行下一步并开启一个try代码块:
- 获取此时的线程池运行状态值rs。
- 首先进行的是再次的检查线程池状态,因为可能在最上面的循环之后-获取锁之前,线程池的状态发生改变,比如被停止。,如果rs小于SHUTDOWN,即属于RUNNING状态,或者rs属于SHUTDOWN状态,并且firstTask为null(不是新增任务的请求)满足这两个条件中的一个,才可以真正的开启线程:
- 继续校验线程t是否是活动状态,因为如果线程已经处于活动状态,表示已经执行了start()方法,即已经开始执行了run方法。那么这个线程就不能再执行新任务,不符合要求,由调用方直接抛出IllegalThreadStateException异常。
- 上面的校验通过,那么将新建的Worker加入workers的set集合,这是blockingQueue自己的方法。
- 获取workers的数量s,就表示目前工作线程数量,如果s大于largestPoolSize,即大于历史最大线程数量,那么largestPoolSize更新为s。
- workerAdded置为true,表示新增的工作线程已加入workers集合。
- 最终,无论上面的代码是成功了还是发生了异常,都需要在finally中解锁mainLock。
- 解锁成功之后的代码。如果workerAdded为true,即新增的工作线程已加入workers集合,说明操作完全没问题,一切正常:
- 那么启动线程t,线程t将会执行Worker中的run方法。workerStarted置为true,表示新增的工作线程已启动。
- 下面的步骤涉及到workers集合的改动以及新线程的执行,甚至其他参数比如largestPoolSize的改动,需要获取mainLock锁,成功之后进行下一步并开启一个try代码块:
- 最终,无论上面的try代码是成功了还是发生了异常,都走finally语句:
- 如果workerStarted为false,即新增的工作线程最终没能启动成功,那么调用addWorkerFailed对此前可能做出的改变进行“回滚”。否则finally中什么也不做。
- 返回新增的工作线程已启动的标志workerStarted,如果已启动线程,表示新增工作线程成功,否则表示新增工作线程失败。
/**1. 检查当前线程池状态是否为运行态RUNNING以及线程数小于给定的最大线程数边界值,如果不满足,那么返回false,如果满足,那么表示可以新增Worker,2. 首先CAS的增加ctl的线程计数。随后尝试新增一个Worker,通过线程工厂创建一个线程,将参数中的任务作为第一个任务执行并返回true。3. 如果线程工厂无法创建线程或者返回null,那么将会回滚此前的操作,比如减少线程计数,移除创建的Worker等,最后返回false。4. 5. @param firstTask 新线程应首先运行的任务,如果没有,则为null,将会从队列中获取任务6. 当线程数量少于corePoolSize时,总会启动一个线程来执行新添加的任务,或者当队列已满时,同样可能需要新启动要给线程来执行新添加的任务7. @param core 如果为true,那么使用corePoolSize作为线程数量边界大小,否则使用maximumPoolSize作为线程数量边界大小8. 这里不是用具体的参数值而是boolean类型,是因为,比较大小的时候需要随时获取最新值9. @return 添加Worker成功,返回true;否则返回false*/
private boolean addWorker(Runnable firstTask, boolean core) {/** 1 开启一个死循环,相当于自旋* 检查当前线程池状态是否为运行态RUNNING以及线程数小于给定的最大线程数边界值,* 如果不满足,那么返回false,如果满足,那么表示可以新增Worker,那么首先CAS的增加ctl的线程计数,成功后退出循环*/retry:for (; ; ) {//获取此时的ctl值int c = ctl.get();//通过ctl获取此时线程池的状态rsint rs = runStateOf(c);// Check if queue empty only if necessary./** 如果线程状态的值大于等于SHUTDOWN,那么表示线程池不是运行态* 并且 !(rs等于SHUTDOWN 并且 firstTask为null 并且 workQueue不为空),即这三个中至少有一个不成立* 那么直接返回false,adderWorker失败** 分析一下:* 1 首先是要线程状态不是RUNNING运行态了,那么就是后面的状态比如SHUTDOWN、STOP等,此时才有可能直接返回,运行态是不会在这里返回的* 2 随后,如果线程状态rs等于SHUTDOWN,那么不一定返回,因为此时任务队列可能存在任务,此时可以新增线程去执行;如果大于SHUTDOWN,那么一定返回,即所有放弃任务* 继续,如果firstTask为null,那么不一定返回,因为此时可能存在新添加到队列的任务,需要去执行;如果不为null,那么一定是新添加的任务,那么不接受该任务,直接返回* 继续,如果workQueue.isEmpty()返回false,那么不一定返回,因为此时可能存在新添加到队列的任务,需要去执行;如果为空,那么同样直接返回,因为线程池停止且没有任务可执行了** 即,如果非RUNNING状态,并且是SHUTDOWN状态且firstTask为null(不是新添的任务)并且任务队列不为空* 那么对于SHUTDOWN状态,此时需要线程去执行已经添加到任务队列中的任务,因此不会在这里返回;如果是其他状态,那么直接返回false即可*/if (rs >= SHUTDOWN &&!(rs == SHUTDOWN &&firstTask == null &&!workQueue.isEmpty()))return false;/** 到这一步,说明通过判断线程状态不需要返回,但是这里的判断没有加锁,因此这是不一定的,继续校验** 内部再开启一个循环:* 首先校验线程数量,如果不符合规则,那么直接返回false* 随后尝试预先CAS的将WorkerCount线程数量自增1,成功之后退出整个双层大循环,继续下一步* 失败之后会重新获取ctl的值,然后判断线程状态是否改变,如果状态改变了那么进入下一次外层循环,再次进行状态校验* 如果状态没有改变,那么那么进入下一次内层循环,不断地循环重试CAS操作即可*/for (; ; ) {//获取此时的线程数量wcint wc = workerCountOf(c);/** 如果wc大于等于CAPACITY最大线程数* 或者 wc大于等于匹配最大值边界(如果core参数为true就是corePoolSize,否则就是maximumPoolSize)* 那么直接返回false,表示线程数量不符合要求** 分析一下:* 1 如果线程数量大于等于最大线程数量,那么肯定直接返回false,这表示线程数量达到了最大值,不能够添加新线程了* 2 如果线程数量大于等于边界线程数量,那么同样直接返回false,这表示不符合最初调用addWorker的情景条件,不能够添加新线程了* 在调用addWorker方法新增线程之前,如果线程数小于corePoolSize,那么core参数为true;如果如果线程数大于等于corePoolSize,那么core参数为false* 如果core参数为true,通常希望在新增线程之后线程数要小于等于corePoolSize;如果core参数为false,那么要求在新增线程之后线程数要小于等于maximumPoolSize* 用于将两种情况区分开来,同时在其他地方,比如核心线程预启动方法中非常有用*/if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//到这一步,表示满足线程数量要求//那么尝试预先CAS的将WorkerCount线程数量自增1,即ctl值自增1if (compareAndIncrementWorkerCount(c))//CAS操作成功之后,直接break retry跳出外层循环,进入下一步,新增Workerbreak retry;//到这里,表示CAS失败,重新获取最新的ctl值cc = ctl.get(); // Re-read ctl//如果线程池运行状态不等于rs,线程池的状态改变了,我们直到线程池的状态是不可逆的,那么//continue retry结束内层循环,结束本次外层循环,继续下一次外层循环,将可能在下一次的外层循环中因为线程状态的原因而返回false 并退出if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop//到这里,表示CAS失败,并且线程池状态也没有改变,那么重新开始下一次内层循环重试CAS即可}}//到这一步,表示线程池状态和线程数量校验通过,并且已经预先新增了WorkerCount线程数量的值,下面是尝试新增Worker的逻辑//workerStarted表示新增的工作线程是否已启动的标志位,初始化为false,表示未启动boolean workerStarted = false;//workerAdded表示新增的工作线程是否已加入workers集合,初始化为false,表示未加入boolean workerAdded = false;//w表示需要新增的Worker对象,初始化为nullWorker w = null;try {//新建一个Worker,传入firstTask参数,作为将要执行的第一个任务//在构造器中还会通过线程工厂初始化一个新线程w = new Worker(firstTask);//获取w内部的新线程tfinal Thread t = w.thread;//如果t不为nullif (t != null) {final ReentrantLock mainLock = this.mainLock;//下面的步骤涉及到workers集合的改动以及新线程的执行,甚至其他参数比如largestPoolSize的改动,需要加锁//获取mainLock锁mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.//获取锁之后首先进行的是再次的检查,因为可能在最上面的循环之后-获取锁之前,线程池的状态发生改变,比如被停止//获取此时的线程池运行状态值rsint rs = runStateOf(ctl.get());/** 如果rs小于SHUTDOWN,即属于RUNNING状态* 或者 rs属于SHUTDOWN状态,并且firstTask为null(不是新增任务的请求)** 满足这两个条件中的一个,才可以真正的开启线程*/if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {/** 继续校验线程t是否是活动状态,因为如果线程已经处于活动状态,表示已经执行了start()方法,即已经开始执行了run方法* 那么这个线程就不能再执行新任务,不符合要求,由调用方直接抛出IllegalThreadStateException异常* 这里也要求线程工厂返回的线程,仅仅是一个Thread对象即可,不能够start启动而帮倒忙!*/if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();//上面的校验通过,那么将新建的Worker加入workers的set集合,这是blockingQueue自己的方法workers.add(w);//获取workers的数量s,就表示目前工作线程数量int s = workers.size();//如果s大于largestPoolSize,即大于历史最大线程数量if (s > largestPoolSize)//那么largestPoolSize更新为slargestPoolSize = s;//workerAdded置为true,表示新增的工作线程已加入workers集合workerAdded = true;}} finally {//最终,无论上面的代码是成功了还是发生了异常,都需要解锁mainLock.unlock();}//解锁成功之后,如果新增的工作线程已加入workers集合,说明操作完全没问题,一切正常if (workerAdded) {//那么启动线程t,这是线程t将会执行Worker中的run方法t.start();//workerStarted置为true,表示新增的工作线程已启动workerStarted = true;}}} finally {//最终,无论上面的代码是成功了还是发生了异常,都走finally语句块//如果workerStarted为false,即新增的工作线程最终没能启动成功if (!workerStarted)//那么调用addWorkerFailed对此前可能做出的改变进行“回滚”addWorkerFailed(w);}//返回新增的工作线程已启动的标志,如果已启动,表示新增工作线程成功,否则表示新增工作线程失败return workerStarted;
}
2.1 addWorkerFailed添加Worker失败处理
addWorkerFailed方法仅仅在addWorker方法中被调用,表示添加Worker失败时的回滚操作,传递的参数就是代表新增Worker的w变量。
addWorkerFailed的详细步骤:
- 获取mainLock锁;
- 如果w不为null,即Worker被添加到了workers中,那么从workers中移除w;
- 调用decrementWorkerCount,循环尝试CAS的将ctl的WorkerCount线程数量部分自减1,直到成功为止。
- 此时线程池可能不是RUNNING状态,那么调用tryTerminate方法尝试将线程池状态设置为TERMINATED彻底结束线程池。
- 最终解锁。
/**1. 仅仅在addWorker方法中被调用,处理添加Worker失败时的回滚:2. 1 如果Worker被添加到了workers中,那么移除Worker3. 2 workerCount自减14. 3 此时线程池可能不是RUNNING或者SHUTDOWN状态,那么尝试将线程池状态设置为TERMINATED彻底结束线程池5. 6. @param w 新建的Worker*/
private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;//获取mainLock锁mainLock.lock();try {//如果w不为nullif (w != null)//从workers中移除w,这是BlockingQueue的方法workers.remove(w);//循环尝试CAS的将ctl的WorkerCount线程数量部分自减1,直到成功为止。decrementWorkerCount();//此时线程池可能不是RUNNING状态,那么尝试将线程池状态设置为TERMINATED彻底结束线程池tryTerminate();} finally {mainLock.unlock();}
}
2.1.1 tryTerminate尝试终止线程池
tryTerminate方法在ThreadPoolExecutor的shutdown、shutdownNow、addWorkerFailed、remove、purge这几个方法中被调用,这几个方法被调用时,一般都是涉及到线程池状态改变、移除workers集合中的线程、移除任务队列中的任务的情况,该方法用于尝试终止线程池(将线程池状态将尝试转换为TERMINATED)。
tryTerminate的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
- 开启一个死循环,尝试终止线程池:
- 获取此时最新的ctl值c;
- 线程池状态校验:如果处于RUNNING状态,不能终止线程池;或者如果运行状态值大于等于TIDYING,不需要该请求去终止线程池;或者如果运行状态值等于SHUTDOWN,并且workQueue任务队列不为空,不能终止线程池。以上三个条件满足一种,即可立即返回。
- 到这里,表示:线程池位于STOP状态,或者线程池位于SHUTDOWN状态且workQueue任务队列为空,上面两种状态满足一种,表示都可以尝试关闭线程池,但是也不一定,需要继续判断。
- 如果workerCount线程计数不为0,那么不能终止线程池。此这里可能是SHUTDOWN状态,因为调用shutdown方法仅仅在interruptIdleWorkers方法中中断空闲的线程,可能存在正在运行的线程没有被中断;当然也有可能是STOP状态,即如果还有线程还没有启动(state为-1),那么该线程也不会在shutdownNow的interruptWorkers方法中被中断。
- 执行tryTerminate方法时,运行状态的线程(获取w锁)可能处于空闲状态了(释放w锁),此时主要是需要调用interruptIdleWorkers (ONLY_ONE)尝试对于空闲状态的线程传播SHUTDOWN、STOP状态,让它们都被中断并移除。随后返回。
- 到这里表示工作线程数为0,那么表示可以尝试更改线程池状态,进一步停止线程池。首先获取mainLock锁,因为下面需要唤醒在termination上等待的外部线程。
- 开启一个try代码块:
- 尝试将ctl的值CAS的从c更改为TIDYING状态的值,即转换为TERMINATED状态。如果CAS成功:
- 在另一个try块中执行terminated()钩子方法,该方法是空实现,可以由子类重写。
- 无论terminated是否抛出异常,最终都执行finally块:
- 将ctl的值CAS的从TIDYING更改为TERMINATED状态的值,即转换为TERMINATED状态,表示线程池被彻底停止。
- 唤醒所有调用awaitTermination并且还是处于等待状态的外部线程,通知线程池已经彻底关闭。
- 方法返回。
- 尝试将ctl的值CAS的从c更改为TIDYING状态的值,即转换为TERMINATED状态。如果CAS成功:
- 最终需要在finally块中解锁。
- 到这一步还没有返回,表示CAS操作更改TIDYING失败了,可能是其他线程操作成功了,因此继续下一次循环。如果其他线程操作成功,那么本次的tryTerminate操作将会在下一次循环中,因为判断线程池状态不满足而退出。这里我们能够看出来,线程池状态转换为TIDYING以及TERMINATED的过程是连续的,只会在一个线程的同一次tryTerminate方法调用中完成。
/*** 以下两种情况,线程池状态将尝试转换为TERMINATED* 1 SHUTDOWN状态的线程,当任务队列为空并且线程池工作线程数workerCount为0时* 2 STOP状态的线程池,线程池中工作线程数workerCount为0时* <p>* 该方法不是私有的,因为子类ScheduledThreadPoolExecutor也会调用该方法*/
final void tryTerminate() {/*开启一个死循环,尝试终止线程池*/for (; ; ) {//获取此时最新的ctl值cint c = ctl.get();/** 如果处于RUNNING状态,不能终止线程池* 或者 如果运行状态值大于等于TIDYING,不需要该请求去终止线程池* 或者 如果运行状态值等于SHUTDOWN,并且workQueue任务队列不为空,不能终止线程池** 以上三个条件满足一种,即可立即返回*/if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))return;/** 到这里,表示:* 1 线程池位于STOP状态* 2 或者线程池位于SHUTDOWN状态,且workQueue任务队列为空** 上面两种状态满足一种,表示都可以尝试关闭线程池,但是也不一定,需要继续判断** 如果线程数不为0,那么不能终止线程池* 这里可能是SHUTDOWN状态,因为调用shutdown方法仅仅在interruptIdleWorkers方法中中断空闲的线程,可能存在正在运行的线程没有被中断* 当然也有可能是STOP状态,即如果还有线程还没有启动(state为-1),那么该线程也不会在shutdownNow的interruptWorkers方法中被中断* 执行tryTerminate方法时,运行状态的线程(获取w锁)可能处于空闲状态了(释放w锁),此时主要是需要对于空闲状态的线程传播SHUTDOWN、STOP状态,让它们中断*/if (workerCountOf(c) != 0) { // Eligible to terminate/** 那么调用interruptIdleWorkers(ONLY_ONE)尝试中断最多一个线程** tryTerminate在ThreadPoolExecutor的shutdown、shutdownNow、addWorkerFailed、remove、purge这几个方法中被调用* 这几个方法被调用时,一般都是需要改变线程池状态、移除workers集合的线程、移除任务队列中的任务的情况* 这里是补偿式的尝试中断此时又处于空闲状态的线程,用来方便的传播SHUTDOWN、STOP信号,因为被中断的线程会在processWorkerExit方法中继续调用该方法* 防止当前所有线程都在等待任务,中断任何任意一个的线程就还可以确保自SHUTDOWN状态开始以来新到达的Worker最终也会退出。* 为了保证所有线程中断,仅需要中断一个空闲的线程即可,后续中断状态会进行传播式的调用*/interruptIdleWorkers(ONLY_ONE);//返回return;}final ReentrantLock mainLock = this.mainLock;//到这里表示工作线程数为0,那么表示可以更改线程池状态,进一步停止线程池。//首先获取mainLock锁,因为下面需要唤醒在termination上等待的外部线程mainLock.lock();try {//尝试将ctl的值CAS的从c更改为TIDYING状态的值//即删除全部wc工作线程计数部分的值,转换为TERMINATED状态if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {//如果成功try {//执行terminated()钩子方法,该方法是空实现,可以由子类重写terminated();} finally {//无论terminated是否抛出异常,最终,都将ctl的值CAS的从TIDYING更改为TERMINATED状态的值//即转换为TERMINATED状态ctl.set(ctlOf(TERMINATED, 0));//唤醒因为调用awaitTermination并且还是处于等待状态的外部线程,通知线程池已经彻底关闭termination.signalAll();}//返回return;}} finally {//解锁mainLock.unlock();}// else retry on failed CAS/** 到这一步,表示CAS操作ctl.compareAndSet(c, ctlOf(TIDYING, 0)失败,可能是其他线程操作成功了,因此继续下一次循环* 如果其他线程操作成功,那么本次的tryTerminate操作将会在下一次循环中,因为判断线程池状态不满足而退出* 这里我们能够看出来,线程池状态转换为TIDYING以及TERMINATED的过程是连续的,只会在一个线程的同一次tryTerminate方法调用中完成*/}
}
2.1.1.1 interruptIdleWorkers中断空闲线程
尝试中断空闲的线程,即没有获取Worker锁的线程,比如在等待任务的线程,以便它们在后续运行中可以检测到线程池停止状态(比如shutdown()或者shutdownNow()),或者配置被更改的信息(比如setMaximumPoolSize())。
shutdown()方法或者设置线程池信息的方法比如setCorePoolSize()、setMaximumPoolSize()或者tryTerminate等方法中会调用。
代码很简单,主要是理解onlyOne参数的意思,如果为true,那么最多中断一个Woker;如果为false,那么尝试中断所有空闲线程。代码中循环workers集合,如果当前Worker没有被中断,并且尝试获取Worker锁成功(此前没有获取Worker锁),那么就是中断该线程,随后根据onlyOne参数确定是否跳出集合。
/*** 尝试中断空闲的线程,即没有获取Worker锁的线程,比如在等待任务的线程* 以便它们在后续运行中可以检测到线程池停止状态(比如shutdown()或者shutdownNow())* 或者配置被更改的信息(比如setMaximumPoolSize())** @param onlyOne 如果为true,那么最多中断一个Woker。* 在tryTerminate方法中启动了终止状态但是还存在Worker的时候会调用并传递true* 补偿式的尝试中断此时处于空闲状态的线程,用来方便的传播SHUTDOWN、STOP信号,* 因为被中断的线程会在processWorkerExit方法中继续调用该方法* 防止当前所有线程都在等待任务,中断任何任意一个的线程就还可以确保自SHUTDOWN状态开始以来新到达的Worker最终也会退出。* 为了保证所有线程中断,仅需要中断一个空闲的线程即可,后续中断状态会进行传播式的调用* <p>* 如果为false,那么尝试中断所有空闲线程* 在shutdown()或者设置线程池信息的方法比如setCorePoolSize()、setMaximumPoolSize()等方法中会调用*/
private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;//获取mainLock锁mainLock.lock();try {/*遍历workers集合*/for (Worker w : workers) {//获取w的线程tThread t = w.thread;//如果t没有没中断,并且尝试获取Worker锁成功if (!t.isInterrupted() && w.tryLock()) {try {//如果获取锁成功,那么表示此线程就是空闲线程,那么线程t被中断t.interrupt();} catch (SecurityException ignore) {} finally {//释放Worker锁w.unlock();}}//如果onlyOne为true,那么就跳出循环了,否则就一直循环遍历所有Workerif (onlyOne)break;}} finally {//释放mainLock锁mainLock.unlock();}
}
3 Worker线程包装类
线程池中的一个工作线程就被包装为一个Worker的对象,实现了Runnable 接口,可以看作线程任务。
Worker还继承了AbstractQueuedSynchronizer(AQS),自己实现了简单的不可重入的独占锁,state=0 表示锁未被获取状态, state=l 表示锁己经被获取的状态。这里的实现的独占“锁”,实际上就是用来控制该线程是否可以被中断的,获取锁的线程可以看作是正在工作中的线程,没有获取到锁的线程可以看作空闲线程。
不可重入是为了保护正在执行任务的线程(已经获取到了锁)对应的的w锁不会被被其他比如setCorePoolSize、setMaximumPoolSize、shutdown等线程池控制方法再次获取并中断。
另外刚创建Worker时state状态则被设置为-l,这是为了避免该新增的Worker的线程在运行runWorker()方法前就被中断(包括shutdown和shutdownNow),即还没处理过任务是不允许被中断的。
它的构造器中会调用newThread方法,该方法需要传递一个线程任务给线程工厂,然后线程工厂将返回一个线程,该线程启动后会执行这个线程任务。可以看到这里传递的就是this,这个this代指该Worker对象本身,因为Worker实现了Runnable,因此实际上返回的thread在start启动之后,会执行对应Worker的run方法。
关于run方法以及与锁相关的方法介绍请看代码注释:
/**1. 线程池中的一个工作线程就被包装为一个Worker的对象2. 实现了Runnable 接口,可以看作线程任务。3. <p>4. 继承了AbstractQueuedSynchronizer(AQS),自己实现了简单的不可重入的独占锁,state=0 表示锁未被获取状态, state=l 表示锁己经被获取的状态5. 这里的实现的独占“锁”,实际上就是用来控制该线程是否可以被中断的,获取锁的线程可以看作是正在工作中的线程,没有获取到锁的线程可以看作空闲线程6. 不可重入是为了保护正在执行任务的线程(已经获取到了锁)对应的的w锁不会被被其他比如setCorePoolSize、setMaximumPoolSize、shutdown等线程池控制方法再次获取并中断7. 另外刚创建Worker时state状态则被设置为-l,这是为了避免该新增的Worker的线程在运行runWorker()方法前就被中断(包括shutdown和shutdownNow),即还没处理过任务是不允许被中断的*/
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable {/*** 此类永远不会序列化,提供的serialVersionUID是用来来抑制javac警告的。*/private static final long serialVersionUID = 6138294804551838833L;/*** 此Worker对应的工作线程,由线程工厂创建*/final Thread thread;/*** 要运行的初始任务,可能为null*/Runnable firstTask;/*** 用来统计该Worker对应的工作线程完成的任务数*/volatile long completedTasks;/*** 通过给定的第一个任务和线程工厂创建一个Worker** @param firstTask 第一个任务,如果没有就是null*/Worker(Runnable firstTask) {//首先将state的值设置为-1,用于禁止中断,直到runWorker调方法被调用(执行了unlock,将state变成)setState(-1); // inhibit interrupts until runWorker//firstTask赋值this.firstTask = firstTask;//从线程工厂获取工作线程//newThread方法需要传递一个线程任务给线程工厂,然后线程工厂将返回一个线程,该线程启动后会执行这个线程任务//可以看到这里传递的就是this,这个this代指某个Worker对象本身,因为Worker实现了Runnable,//因此实际上返回的thread在start启动之后,会执行对应Worker的run方法this.thread = getThreadFactory().newThread(this);}/*** 重写的run方法*/public void run() {//调用runWorker方法,参数传递Worker对象本身runWorker(this);}//与锁相关的方法/*** 解锁的方法* 仅仅是将获取锁的线程变量置为null,随后将state置为0而已,不会进行任何校验*/public void unlock() {//调用了release方法,这里传递的参数1实际上没什么用release(1);}/*** 重写的tryRelease方法,将会被release方法调用,获取锁* <p>* 和一般锁实现不一样,仅仅是将获取锁的线程变量置为null,随后将state置为0而已** @param unused 参数,这里没用到* @return 永远返回true*/protected boolean tryRelease(int unused) {//将获取锁的线程变量置为nullsetExclusiveOwnerThread(null);//将state置为0setState(0);//返回truereturn true;}/*** 是否被锁住了,state为-1或者1都会返回true** @return true 是 fale 否*/public boolean isLocked() {return isHeldExclusively();}/*** 是否被锁住了,state为-1或者1都会返回true** @return true 是 fale 否*/protected boolean isHeldExclusively() {return getState() != 0;}/*** 获取锁*/public void lock() {//调用acquireacquire(1);}/*** 尝试获取锁** @return true 成功 false 失败*/public boolean tryLock() {//调用tryAcquire尝试一次return tryAcquire(1);}/*** 重写的tryAcquire方法,将会被acquire方法调用,尝试获取锁* 尝试CAS的将state的值从0改为1,CAS成功表示获取到了锁** @param unused 参数,这里没用到* @return true 获取成功 false 获取失败*/protected boolean tryAcquire(int unused) {//尝试CAS的将state的值从0改为1,CAS成功表示获取到了锁if (compareAndSetState(0, 1)) {//成功之后将当前线程设置为获得锁的线程setExclusiveOwnerThread(Thread.currentThread());//返回turereturn true;}//返回faslereturn false;}/*** 仅仅被interruptWorkers->shutdownNow方法调用,用于中断线程* 如果当前Worker的线程t已经执行runWorker方法(即state不为-1),并且还没有没中断,那么中断t*/void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}
}
3.1 runWorker执行工作
runWorker方法是Worker的run方法中调用的方法,实际上就是由工作线程执行任务的核心逻辑,到这里,这个方法的执行者将从execute的调用线程变成工作线程。
大概逻辑如下:
- 工作线程将会从此前的firstTask任务开始执行,如果没有,那么从任务队列通过getTask拉取任务来执行。
- 如果firstTask和getTask都返回null,那么表示该任务线程将会退出,对应的Worker将被清理。
- 在执行每一个任务之前,会获取对应的Worker锁,获取了w锁就表示该线程处于工作状态,其他时候比如getTask就表示该线程处于空闲状态。此时该Worker锁不可以被比如setCorePoolSize、setMaximumPoolSize、shutdown等其他控制操作获取,即该工作线程此时不能被中断,但是shutdownNow方法除外。
- 每次都会调用任务前置方法beforeExecute,如果重写了该方法并且抛出了异常,那么任务不会再被执行,并且该线程及其Worker将被清理;每次任务被执行后,都会执行afterExecute后置方法,如果重写了该方法并且抛出了异常,那么该线程及其Worker将被清理。
runWorker的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
- 获取当前工作线程wt,获取第一个要执行任务w.firstTask赋给的task变量,随后w.firstTask置null,释放引用;
- 调用unlock“解锁”。这的解锁操作仅仅是将state设置为0,到此该线程将支持被interrupt中断;
- completedAbruptly表示线程执行过程中是否发生异常的标志位,初始化为true;
- 开启一个try代码块:
- 开启一个循环,下面就是线程不断循环处理任务的逻辑。循环条件是:task不为null,或者调用getTask方法,将的返回值赋给task,此task不为null,那么可以继续循环处理获取的任务;否则退出循环,该线程以及Worker将会被清理:
- 获取对应Worker锁,就是将state从0变成1,获取了锁就表示该线程处于工作状态,其他时候比如getTask就表示该线程处于空闲状态。此时该Worker锁不可以被比如setCorePoolSize、setMaximumPoolSize、shutdown等其他控制操作获取,即该工作线程此时不能被中断,但是shutdownNow方法除外。
- 进行一个判断的表达式操作。如果线程池状态值大于等于STOP,此时工作线程应该被无条件被中断,表达式结果为true;线程池状态值小于STOP,但是线程被中断了,那么恢复被中断的线程,将中断标志位改为false,表达式结果为false。
- 如果表达式为true,将当前线程中断,仅仅是设置中断标志位而已。
- 在一个try块中执行任务。首先执行前置方法beforeExecute,默认空实现,自定义的子类可以实现自己的逻辑。如果抛出了异常,那么该任务将不会被执行,,该线程以及Worker将会被清理。
- 使用thrown变量记录任务执行过程中抛出的异常。
- 在一个try块中, 调用task.run,到这里才是真正的执行任务。途中捕获各种异常赋给thrown。
- 无论task.run有没有抛出异常,都会在finally中执行后置方法afterExecute,默认空实现,自定义的子类可以实现自己的逻辑。如果抛出了异常,那么该任务将不会被执行,,该线程以及Worker将会被清理。
- 无论上面的代码有没有抛出异常,都会执行finally语句块:
- 将task变量置null,表示该任务执行完毕,无论是真的成功了还是失败了。
- 此Worker记录的完成任务数量completedTasks自增1
- 如果上面没有抛出异常,那么到此一个任务执行完毕,继续下一次循环,从任务队列中拉取任务执行!
- 到这一步跳出了循环,表示task为null,并且通过getTask重任务队列获取的任务也为null,即没有任务可执行了,循环结束,这是正常行为,将completedAbruptly置为false。
- 开启一个循环,下面就是线程不断循环处理任务的逻辑。循环条件是:task不为null,或者调用getTask方法,将的返回值赋给task,此task不为null,那么可以继续循环处理获取的任务;否则退出循环,该线程以及Worker将会被清理:
- 执行finally语句块的情况:可能是执行过程中抛出了异常,或者getTask返回null,而getTask返回null则可能是各种各样的情况,比如超时、线程池被关闭等。此时该线程和Worker将被清理!
- 调用processWorkerExit方法,传递Worker对象以及是否发生异常的标志位。processWorkerExit方法会将该Worker移除workers集合,并且根据completedAbruptly决定是否新建Worker添加到workers集合。
/*** 工作线程执行任务的核心逻辑,大概就是循环进行任务处理,以及Worker销毁的操作** @param w 工作线程对应的包装类*/
final void runWorker(Worker w) {//获取当前工作线程wtThread wt = Thread.currentThread();//获取第一个要执行的task记录下来Runnable task = w.firstTask;//w.firstTask置空,释放引用w.firstTask = null;//解锁,这的解锁操作仅仅是将state设置为0,到此该线程将支持被interrupt中断w.unlock(); // allow interrupts//线程执行过程中是否发生异常的标志位,初始化为trueboolean completedAbruptly = true;try {/** 开启一个循环,这就是线程处理任务的逻辑* 如果task不为null* 或者task为null,通过getTask从任务队列获取的任务不为null* 以上条件满足一个,即可继续循环,否则结束循环* */while (task != null || (task = getTask()) != null) {//获取对应Worker锁,就是将state从0变成1,此时该Worker锁不可以被比如setCorePoolSize、setMaximumPoolSize等其他控制操作获取//获取了w锁就表示该线程处于工作状态,其他时候比如getTask就表示该线程处于空闲状态w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interrupt/** 两个表达式:* 1 (runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))* 如果 此时运行状态值大于等于指定状态值STOP,那么第一个表达式满足,由于短路法不会执行那个第二个表达式* 或者 调用Thread.interrupted()判断线程确实被中断了,如果确实被中断了则该方法还会清除中断标志位,并且此时运行状态值大于等于指定状态值STOP* 2 !wt.isInterrupted()* 当前工作线程没有被中断** 上面两个表达式都满足,表示由于线程池状态值大于等于STOP,此时工作线程应该被无条件被中断,将当前线程中断* 另外,如果不满足无条件中断的要求(线程池状态值小于STOP),但是线程被中断了,还会恢复被中断的线程,将中断标志位改为false*/if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())//将当前线程中断,仅仅是设置中断标志位而已。wt.interrupt();try {//任务开始执行的前置方法,默认空实现,自定义的子类可以实现自己的逻辑//如果抛出了异常,那么该任务将不会被执行,该线程以及Worker将会被清理beforeExecute(wt, task);//thrown记录任务执行过程中抛出的异常Throwable thrown = null;try {//这里才是真正的执行任务task.run();} catch (RuntimeException x) {thrown = x;throw x;} catch (Error x) {thrown = x;throw x;} catch (Throwable x) {thrown = x;throw new Error(x);} finally {//任务执行完毕的后置方法,默认空实现,自定义的子类可以实现自己的逻辑//传递执行的任务以及抛出的异常,这个方法也可能抛出新的异常,这将导致该线程停止afterExecute(task, thrown);}} finally {//无论上面有没有抛出异常,都会执行finally语句块//将task变量置空,表示该任务执行完毕,无论是真的成功了还是失败了。task = null;//此Worker记录的完成任务数量自增1w.completedTasks++;//解锁,将state置为0w.unlock();}}//到这一步,表示task为null,并且通过getTask重任务队列获取的任务也为null,即没有任务可执行了//这是正常行为,completedAbruptly置为falsecompletedAbruptly = false;} finally {//无论上面有没有抛出异常,都会执行finally语句块。可能是执行过程中抛出了异常,或者getTask返回null//getTask返回null,则可能是各种各样的情况,比如超时、比如线程池被关闭等//调用processWorkerExit方法,传递Worker对象以及是否发生异常的标志位//processWorkerExit方法会将该Worker移除workers集合,并且根据completedAbruptly决定是否新建Worker添加到workers集合processWorkerExit(w, completedAbruptly);}
}
3.1.1 getTask拉取任务
getTask方法的主要作用是从阻塞队列中拉取任务,是由工作线程调用的。将会在一个循环中拉取并返回一个任务,由于是阻塞队列,在没有任务时该方法会可能会超时阻塞或者一直阻塞。以下情况将会直接返回null:
- 具有超过maximumPoolSize的线程数量,可能在运行时动态的设置了maximumPoolSize,将maximumPoolSize调小了,此时需要丢弃部分工作线程;
- 线程池处于STOP及其之后的状态,无论还有没有任务,无条件清除全部工作线程;
- 线程池处于SHUTDOWN状态,且 workQueue 为空,队列中的任务已执行完毕,清除工作线程;
- 如果线程数大于corePoolSize,则对超过的线程在keepAliveTime超时之后还没获取到任务就会返回null,如果设置了allowCoreThreadTimeOut为true(设置成功的要求是超时时间大于0),那么对全部线程应用超时时间,这里返回null用于清除多余的工作线程,控制线程数量。
getTask的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
- timedOut变量代表上一次的poll()操作是否超时的标志,初始化为fale,表示未超时;
- 开启一个死循环,相当于自旋,从任务队列中拉取任务:
- 获取此时的ctl值c,获取此时的运行状态rs;
- 检查线程池状态: 如果线程池状态值大于等于SHUTDOWN,并且(线程池状态值大于等于STOP,或者任务队列为空)。即如果线程池处于STOP及其之后的状态,或者线程池处于SHUTDOWN状态,且 workQueue 为空。那么表示线程池被关闭,且线程不可以继续执行下去。
- 循环尝试CAS的将ctl的WorkerCount线程数量部分自减1,直到成功为止。
- 返回null,getTask方法结束,随后该Worker会被清理。
- 到这一步,表示线程池状态符合要求,但是状态是可能随时变化的。获取线程数量wc。
- timed变量表示对工作线程是否应用超时等待的标志。如果allowCoreThreadTimeOut为true(设置成功的要求是超时时间大于0),表示所有线程都应用超时等待,或者wc大于核心线程数量,那么可以对超过corePoolSize的线程应用超时等待。以上情况满足一种,timed即为true,否则为false。
- 校验线程数量以及是否超时: (如果wc大于最大线程数,表示在线程池运行时动态的将maximumPoolSize调小了,或者上一次的poll()操作已经超时,并且可以对工作线程应用超时等待),并且(如果wc大于1,或者任务队列为空)。
- 这两种情况都满足,表示就可能会返回null。尝试CAS的将ctl的WorkerCount线程数量部分自减1;成功之后返回null,随后该Worker会被清理,getTask方法结束。
- CAS失败之后,继续下一次循环,即这种关闭不是需要立即完成的。因为没有使用锁,可能有多个线程同时超时,此时需要控制因为超时返回的线程数量满足要求,比如某一批CAS成功的超时线程返回之后,其他CAS失败的超时线程在下一次循环时就可能因为数量不达标不会应用超时了。这里也是为什么本次的超时操作要等待到下一循环次才可能返回的原因,因为每次循环都会获取最新的wc和timed值。
- 到这一步,表示线程池的状态、线程数量、以及超时时间满足要求,开始真正的拉取任务。开启一个try代码块:
- 判断timed是否为true,如果为true,那么需要对线程应用超时等待,调用workQueue的超时poll方法,在超时时间范围内等待获取并移除任务(队头),如果超时时间没有获取道任务,那么返回null;如果为false,那么不需要对线程应用超时等待,调用workQueue的take方法,获取并移除任务(队头),没有获取到任务将会一直等待。返回值使用r变量接收。
- 到这一步,表示拉取的方法返回了,如果返回值r不为null,表示拉取到了任务,那么返回该任务,getTask方法结束。
- 到这一步还没有返回,表示没有拉取到任务,属于等待超时,那么timedOut设置为true。继续下一次循环,下一次循环中将可能会返回null,也可能不会。
- 使用catch捕获try块中抛出的InterruptedException。如果捕获到了,那么timedOut设置为false,此时不算拉取超时,继续下一次循环。在shutdown或者其他调整核心参数的方法关闭空闲线程的时候,就是设置中断标志,会将等待的线程唤醒,下一次循环中该空闲线程可能会由于线程状态的原因而返回。如果发生了其他异常,那么由于不能捕获异常而,直接退出该方法,getTask方法结束,随后该Worker会被清理。
/*** 尝试循环从任务阻塞队列拉取新任务的方法,以下情况将会直接返回null:* 1 具有超过maximumPoolSize的线程数量,可能在运行时动态的设置了maximumPoolSize,将maximumPoolSize调小了,此时需要丢弃部分工作线程* 2 线程池处于STOP及其之后的状态,无论还有没有任务,无条件清除全部工作线程* 3 线程池处于SHUTDOWN状态,且 workQueue 为空,队列中的任务已执行完毕,清除工作线程* 4 如果线程数大于corePoolSize,则对超过的线程在keepAliveTime超时之后还没获取到任务就会返回null,如果设置了allowCoreThreadTimeOut为true(设置成功的要求是超时时间大于0),那么对全部线程应用超时时间* 用于清除多余的工作线程,控制线程数量。** @return 任务,或者null,表示该工作线程以及Worker都需要被清理,并且workerCount线程计数会自减1*/
private Runnable getTask() {//上一次的poll()操作是否超时的标志,初始化为fale,表示未超时boolean timedOut = false; // Did the last poll() time out?/*开启一个死循环,相当于自旋,从任务队列中拉取任务*/for (; ; ) {//获取此时的ctl值cint c = ctl.get();//获取此时的运行状态rsint rs = runStateOf(c);// Check if queue empty only if necessary./** 如果线程池状态值大于等于SHUTDOWN,并且(线程池状态值大于等于STOP,或者任务队列为空)* 即如果线程池处于STOP及其之后的状态,或者线程池处于SHUTDOWN状态,且 workQueue 为空* 那么表示线程池被关闭,且线程不可以继续执行下去*/if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {//循环尝试CAS的将ctl的WorkerCount线程数量部分自减1,直到成功为止。decrementWorkerCount();//返回null,随后该Worker会被清理return null;}//到这一步,表示线程池状态符合要求,但是状态是可能随时变化的//获取线程数量wcint wc = workerCountOf(c);// Are workers subject to culling?/** timed表示对工作线程是否应用超时等待的标志* 如果allowCoreThreadTimeOut为true,表示所有线程都应用超时等待* 或者 wc大于核心线程数量,那么可以对超过corePoolSize的线程应用超时等待* 以上情况满足一种,timed即为true,否则为false*/boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;/** 如果wc大于最大线程数,表示在线程池运行时动态的将maximumPoolSize调小了,或者上一次的poll()操作已经超时,并且可以对工作线程应用超时等待* 并且 wc大于1,或者任务队列为空** 这两种情况都满足,表示就可能会返回null** 这里也能看出来,如果此时wc为1,即当前线程是唯一的工作线程,并且此时任务队列还不为null* 那么当前线程不会因为超时而返回,因为还需要至少一条线程去执行任务队列中的任务*/if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {//尝试CAS的将ctl的WorkerCount线程数量部分自减1;if (compareAndDecrementWorkerCount(c))//成功之后返回null,随后该Worker会被清理return null;/** CAS失败之后,继续下一次循环,即这种关闭不是需要立即完成的。* 因为没有使用锁,可能有多个线程同时超时,此时需要控制因为超时返回的线程数量满足要求.* 比如某一批CAS成功的超时线程返回之后,其他CAS失败的超时线程在下一次循环时就可能因为数量不达标不会应用超时了。* 这里也是为什么本次的超时操作要等待到下一循环次才可能返回的原因,因为每次循环都会获取最新的wc和timed值.*/continue;}//到这一步,表示线程池的状态、线程数量、以及超时时间满足要求,开始拉取任务try {/** 判断timed是否为true* 如果为true,那么需要对线程应用超时等待,调用workQueue的超时poll方法,* 在超时时间范围内等待获取并移除任务(队头),如果超时时间没有获取道任务,那么返回null* 如果为false,那么不需要对线程应用超时等待,调用workQueue的take方法,* 获取并移除任务(队头),没有获取到任务将会一直等待** 返回值使用r变量接收*/Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();//到这一步,表示拉取的方法返回了,如果返回值r不为null,表示拉取到了任务,那么返回任务if (r != null)return r;//到这一步还没有返回,表示没有拉取到任务,属于等待超时,那么timedOut设置为true//继续下一次循环,下一次循环中将可能会返回null,也可能不会。timedOut = true;} catch (InterruptedException retry) {//如果在拉取过程中等待时被中断,那么会抛出InterruptedException异常,这里将捕获这个异常//此时不算拉取超时,timedOut设置为false,继续下一次循环。在shutdown方法关闭空闲线程的时候,就是设置中断标志,会将等待的线程唤醒,下一次循环中该空闲线程可能会由于线程状态的原因而返回timedOut = false;//发生的其他异常,则直接退出该方法}}
}
3.1.2 processWorkerExit清理/补充Worker
当在runWorker的任务执行过程中抛出异常,或者getTask返回null,getTask返回null,则可能是各种各样的情况,比如超时、比如线程池被关闭等,那么这个Worker被丢弃,此时需要调用processWorkerExit方法对于无用的Worker进行清理,该方法仅仅会被Worker对象的线程调用,即也是由工作线程调用的。
processWorkerExit方法需要传递Worker对象以及是否发生异常的标志位completedAbruptly。该方法会将参数Worker移除workers集合,并且根据线程池状态以及completedAbruptly决定是否补充新Worker。
在移除Worker之后,如果此时线程池状态为RUNNING或者SHUTDOWN状态,并且是因为抛出异常而被停止,或者不是应为抛出异常而被停止,但此时的线程数量小于最小线程数min。这两情况都会补充一个新Worker。 这就是我们常说的:线程池中的线程抛出异常之后,会自动补充开启一个线程的原理,现在我们也能明白不是无条件开启的。
processWorkerExit的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
- 如果completedAbruptly为true,表示因为异常才调用该方法,此时workerCount还没有来得及自减1,这里需要补上:
- 循环尝试CAS的将ctl的WorkerCount线程数量部分自减1,直到成功为止。
- 获取mainLock锁;
- 更新线程池已完成的任务数量,当前的值加上终止线程记录的任务执行数量,只有在某个Worker工作线程终止时才会更新。
- 从workers集合中移除该Worker。
- 最终解锁。
- 此时线程池可能不是处于RUNNING状态,调用tryTerminate尝试彻底终止线程池。
- 获取此时的ctl值c。如果线程池状态值小于等于STOP,即处于RUNNING或者SHUTDOWN状态,那么可能需要补充线程。
- 继续判断如果completedAbruptly为false,表示不是因为抛出异常被停止。
- 使用min变量表示线程池需要保持的最小线程数。如果设置了allowCoreThreadTimeOut为true(设置成功的要求是超时时间大于0),那么表示对所有线程应用超时,那么min=0;如果allowCoreThreadTimeOut为fasle,那么表示对超过核心线程数量的线程线程应用超时,那么min=corePoolSize。
- 如果min为0,并且任务队列不等于空,那么最小线程数应该为1,因为此时需要至少一个线程去执行任务队列中的任务。min设置为1。
- 如果线程数量大于等于最小线程数min,那么不需要补充线程,直接return结束方法;否则表示不需要补充线程,进入下一步。
- 到这一步,表示:处于RUNNING或者SHUTDOWN状态,并且是因为抛出异常而被停止,或者处于RUNNING或者SHUTDOWN状态,并且此时的线程数量小于最小线程数min。这两种情况满足一种,即需要补充一个Worker,那么调用addWorker方法尝试新增一个Worker。
- 继续判断如果completedAbruptly为false,表示不是因为抛出异常被停止。
/*** 对于无用的Worker进行清理,该方法仅仅会被Worker对象的线程调用* 将参数Worker移除workers集合,并且根据线程池状态以及completedAbruptly决定是否补充新Worker。** @param w Worker* @param completedAbruptly 如果此Worker因抛出异常而停止,则为true,否则为false*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {//如果completedAbruptly为true,表示因为异常才调用该方法// 此时workerCount还没有来得及自减1,这里需要补上if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted//循环尝试CAS的将ctl的WorkerCount线程数量部分自减1,直到成功为止。decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;//获取mainLock锁mainLock.lock();try {//更新线程池已完成的任务数量,当前的值加上终止线程记录的任务执行数量,只有在某个Worker工作线程终止时才会更新completedTaskCount += w.completedTasks;//从workers中移除该Workerworkers.remove(w);} finally {//最终解锁mainLock.unlock();}//此时线程池可能不是处于RUNNING状态,调用tryTerminate尝试彻底终止线程池tryTerminate();//获取此时的ctl值cint c = ctl.get();/** 如果c的状态值小于等于STOP,即处于RUNNING或者SHUTDOWN状态,那么可能需要补充线程。*/if (runStateLessThan(c, STOP)) {//如果completedAbruptly为false,表示不是因为抛出异常被停止if (!completedAbruptly) {//min表示线程池需要保持的最小线程数// 如果allowCoreThreadTimeOut为true(设置成功的要求是超时时间大于0),那么表示对所有线程应用超时,那么min=0// 如果allowCoreThreadTimeOut为fasle,那么表示对超过核心线程数量的线程线程应用超时,那么min=corePoolSizeint min = allowCoreThreadTimeOut ? 0 : corePoolSize;//如果min为0,并且任务队列不等于空,那么最小线程数应该为1,因为此时需要之手啊一个线程去执行任务队列中的任务if (min == 0 && !workQueue.isEmpty())//min设置为1min = 1;//如果此时的线程数量大于等于最小线程数min,那么不需要补充线程,直接return结束方法//否则表示不需要补充线程,进入下一步if (workerCountOf(c) >= min)return; // replacement not needed}/** 到这一步,表示:* 1 处于RUNNING或者SHUTDOWN状态,并且是因为抛出异常而被停止* 2 或者 处于RUNNING或者SHUTDOWN状态,并且此时的线程数量小于最小线程数min** 这两种情况满足一种,即需要补充一个Worker,那么调用addWorker尝试新增一个Worker*/addWorker(null, false);}
}
Java Executor源码解析(3)—ThreadPoolExecutor线程池execute核心方法源码【一万字】相关推荐
- Java Executor源码解析(7)—Executors线程池工厂以及四大内置线程池
详细介绍了Executors线程池工具类的使用,以及四大内置线程池. 系列文章: Java Executor源码解析(1)-Executor执行框架的概述 Java Executor源码解析(2)-T ...
- 13.ThreadPoolExecutor线程池之submit方法
jdk1.7.0_79 在上一篇<ThreadPoolExecutor线程池原理及其execute方法>中提到了线程池ThreadPoolExecutor的原理以及它的execute方法 ...
- ScheduledThreadPool 源码解析——定时类线程池是如何工作的
文章目录 引言 一.ScheduledThreadPool 使用示例 1. 延时类的定时任务 `schedule` 2. 延时类,固定周期执行任务 `scheduleAtFixedRate` 3. 延 ...
- 【EventBus】EventBus 源码解析 ( 事件发送 | 线程池中执行订阅方法 )
文章目录 一.EventBus 中主线程支持类 二.EventBus 中 AsyncPoster 分析 三.AsyncPoster 线程池 Runnable 任务类 一.EventBus 中主线程支持 ...
- 【Android 异步操作】线程池 ( 线程池 execute 方法源码解析 )
文章目录 一.线程池 execute 方法源码解析 二.线程池 execute 方法完整源码及注释 一.线程池 execute 方法源码解析 进入 ThreadPoolExecutor 中 , 查看线 ...
- mysql 线程池源码模块_易语言Mysql线程池2.0模块源码
易语言Mysql线程池2.0模块源码 易语言Mysql线程池2.0模块源码 系统结构:GetThis,初始化,关闭类线程,线程_测试,其他_附加文本,连接池初始化,取mysql句柄,释放mysql句柄 ...
- Java线程池execute()方法源码解析
先看作者给出的注释来理解线程池到底有什么作用 * Thread pools address two different problems: they usually * provide improve ...
- 一文带你熟透Java线程池的使用及源码
在单个线程使用过程中遇到的问题(new Thread().start): 线程的频繁创建与销毁 线程执行数据多且高频,频繁CPU上下文切换,造成CPU的资源浪费 以上种种问题,让我们不禁想到,怎么复用 ...
- ThreadPoolExecutor线程池相关源码分析
目录 前言 一.线程池状态 二.Worker类 三.线程池运行过程 1.execute 2.addWorker 3.runWorker 4.getTask 5.processWorkerExit 四. ...
最新文章
- php安卓传输图片到mysql_php – Android应用程序将图像发送到MySQL
- 李彦宏称AI可让人们获得永生
- django部署iiswin10_基于Windows平台的Django在本地部署和腾讯云服务器上部署的方法教程(一)...
- fs_struct和file_struct关系
- 使用javap分析Java的字符串操作
- qt 无法打开shell32_在Qt中用默认程序打开文件
- 百度链接解析_【集合】百度分享链接解析的方法总结
- ConcurrentHashMap 源码
- 手把手带你入门 Docker Compose
- PaddleOCR-release-2.3\deploy\cpp_infer\src识别中文时出现乱码
- C++基础——使用字符串作为函数模板的实参
- Leetcode207---课程表(逆拓扑排序)
- linux系统fsck.ext4,Ext4文件系统fsck后损坏修复过程一例
- AS更换背景主题以及背景图片
- 制作Excel图表背景
- 3029. 【NOIP2011DAY2】观光公交
- 滴滴开源Android插件框架
- 手动配置协议和服务器POP,在outlook上添加账户并介绍邮件协议相关知识
- sql导入数据以及列表编号设置自动填充
- 常用又有趣的网站大合集
热门文章
- 埃瓦里斯特·伽罗瓦Évariste Galois
- Power BI中如何处理相同名称的客户
- oracle 不使用结果缓存,为什么Oracle 12.1.0.2会跳过结果缓存表上的函数调用?
- 怎么确定电视吊架安装位置,电视支架安装讲解
- RJ-45双绞线的制作和测试-网络实验1
- 套接字、UDP通信、TCP通信、TCP\IP协议簇
- 文件的上传和下载(一)
- Java获取今天是几号
- 高德地图搜索,点击地图获取经纬度
- window10 安装语言包出现“很抱歉,我们无法安装此功能。你可以稍后重试。错误代码: 0x80070422”