ThreadPoolExecutor源码解析及工作机制

首先介绍前提条件及本文用词说明

线程中断 interrupt()只是设置线程中断位,线程并没有被真正被中断,还是RUNNABLE状态

线程池中线程 == Worker线程(实际是Worker.thread)

线程池执行的任务 == task(也就是调用execute(Runnable r)传入的r)

线程池在调用shutDown()方法以后会拒绝接受新task,队列里的task会继续执行完,调用shutDownNow()方法后已经getTask()成功的task会运行完,然后剩下的task都将不会执行,task有wait,sleep,park的都会抛出InterruptedException,take()阻塞的线程也会唤醒,最后全部结束线程池为TERMINATED状态,返回未执行的任务集合。

private static final int COUNT_BITS = Integer.SIZE - 3;         // 后面低29位来源
// ctl贯穿全文标志,32位,高三位记录状态低29位记录WorkerCount数量,初始状态位RUNNING,Worker数量为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;        // Worker线程数容量,2^29 - 1
5种状态,RUNNING < SHUTDOWN < STOP<TIDYING < TERMINATED
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
后面常用到的基础方法,runStateOf(int c)获取当前线程池状态,workerCountOf(int c)获取当前Worker数量
// 返回线程池状态,ctl高3位存储线程池状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }  // ~CAPACITY:11100000000000000000000000000000
// 返回Worker数量,ctl低29位存储Worker数量
private static int workerCountOf(int c)  { return c & CAPACITY; }   //  CAPACITY:00011111111111111111111111111111
// 任何数与0 | 运算就等于本身不改变值,所以这里暂时没什么意义
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 存放Worker
private final HashSet<Worker> workers = new HashSet<Worker>();
// 运行过程中最大的线程数
private int largestPoolSize;
// 完成的task总数
private long completedTaskCount;
// 默认的RejectedExecutionHandler,它的做法是超出队列而无法新增Worker就抛异常
private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {/*** Creates an {@code AbortPolicy}.*/public AbortPolicy() { }/*** Always throws RejectedExecutionException.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task* @throws RejectedExecutionException always*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}}

worker类

private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{/** Thread this worker is running in.  Null if factory fails. */// 真正运行的线程final Thread thread;/** Initial task to run.  Possibly null. */// 真正执行的任务Runnable firstTask;/** Per-thread task counter */// 该Worker完成的总任务数volatile long completedTasks;Worker(Runnable firstTask) {// 防止被interrupts,interrupts时采用CAS将state从0改为1setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker  */public void run() {runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}// 采用独占锁,且不可重入public void lock()        { acquire(1); }public boolean tryLock()  { return tryAcquire(1); }public void unlock()      { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}

1.构造方法

各个参数具体使用场景后面会写到

public ThreadPoolExecutor(int corePoolSize,                             // 核心线程数int maximumPoolSize,                      // 最大线程数long keepAliveTime,                       // 保持存活时间(这里不管设置什么默认用的纳秒,因为workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS))TimeUnit unit,                            // 时间单位BlockingQueue<Runnable> workQueue,        // 阻塞队列ThreadFactory threadFactory,              // 线程构造工厂RejectedExecutionHandler handler) {       // 拒绝策略if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}

2.execute()方法

public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {// 新增一个Worker,新增成功则直接返回execute方法结束,这里core为true(worker数量上限为corePoolSize)if (addWorker(command, true))return;// 新增失败则再次获取ctlc = ctl.get();}// RUNING状态 且 offer成功(workQueue满了offer返回false)if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}// workQueue满了继续增加Worker,以maximumPoolSize为上限,若失败采取拒绝策略else if (!addWorker(command, false))reject(command);}

3.addWorker(boolean core)

// core为true时,Worker数量上限为corePoolSize// core为false时,Worker数量上限为maximumPoolSizeprivate boolean addWorker(Runnable firstTask, boolean core) {// 循环直到采用CAS将ctl+1成功,或者满足下面【1】、【2】条件退出方法retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.// 【1】非RUNING的时候,状态为SHUTDOWN且firstTask为空(执行task发生用户异常processWorkerExit()方法补偿的Worker线程),workQueue不为空才可以添加worker,否则返回false,添加失败if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {// 获取worker数量int wc = workerCountOf(c);// 【2】如果wc大于等于CAPACITY(2^30-1)或者大于等于指定的最大数量,返回false,添加失败if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 采用CAS将ctl+1,操作成功则退出循环retryif (compareAndIncrementWorkerCount(c))break retry;// 上面加一操作失败,重新获取ctl值c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}// worker是否已启动boolean workerStarted = false;// worker是否已添加到队列boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {// 线程池ThreadPoolExecutor对象锁final ReentrantLock mainLock = this.mainLock;// 上锁目的://          1)添加worker到workers(HashSet线程不安全)//          2)保证在添加worker到workers的过程中runState不会被其他线程改变(shutDown()方法),mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());// RUNNING状态 或者 SHUTDOWN状态且firstTask为空if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 检查worker.thread是否已启动if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {// 启动worker线程t.start();workerStarted = true;}}} finally {// worker Start失败则进行失败处理if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

4.addWorkerFailed(Worker w)  Worker启动失败回滚处理方法

// Worker启动失败回滚处理方法
private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;// 上锁目的:// 1)从workers(HashSet线程不安全)移除worker,// 2)workers移除和ctl - 1,原子操作mainLock.lock();try {if (w != null)workers.remove(w);decrementWorkerCount();// 这里调用tryTerminate()的原因// A线程addWorker()的时候将Worker添加到workers,这时候另一个线程B调用了shutDown()方法,由于此时workers不为空,所以tryTerminate()不做任何操作// A线程接着start Worker线程,但是失败了所以无法进入runWorker()方法结束时调用的processWorkerExit()方法,也就无法调用tryTerminate()来terminatetryTerminate();} finally {mainLock.unlock();}}

5.runWorker(Worker w) 线程池中线程真正run()真正运行的逻辑

// 线程池中线程真正run()真正运行的逻辑
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;// 构造函数初始化的时候Worker.state == -1,不允许interrupts,现在改为0,允许被interruptsw.unlock(); // allow interrupts// if the worker died due to user exception// 当前Worker是否非正常结束(循环结束了会赋值为false)boolean completedAbruptly = true;try {// task==null说明是Worker运行task发生用户异常进入processWorkerExit方法,补进来的Worker线程while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interrupt// 检查状态,如下情况都满足(状态大于SHUTDOWN)则中断Worker线程// 1)ctl >= STOP (STOP、TIDYING、TERMINATED) 或者 Worker线程被中断,立即重置线程状态(停止中断),停止中断后 ctl >= STOP// 2)Worker线程没被中断if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {// Worker线程真正执行firstTask业务逻辑task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}// Worker线程正常结束completedAbruptly = false;} finally {// Worker线程结束处理 processWorkerExit(w, completedAbruptly);}}

6.getTask() Worker线程运行在循环体中开始位置调用getTask()获取任务

private Runnable getTask() {// 获取task是否超时标志,初始为false(还没取肯定为false)boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.// 一下条件均满足则返回空// 1)状态 >= SHUTDOWN// 2)状态 >= STOP 或者workQueue为空// 上面条件 等价与//                  SHUTDOWN状态且workQueue为空(队列都为空了getTask()肯定返回空啦) 或者 > SHUTDOWN状态(以上状态肯定不能继续执行任务了,所以getTask()返回空)if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {// WorkerCount数量 - 1(为什么要 - 1,因为getTask()方法返回空,调用这个方法的Worker线程就会退出循环结束线程了)decrementWorkerCount();return null;}// *** 能走到这里说明要么(1)、RUNING状态(2)、SHUTDOWN状态但是workQueue不为空,还有任务没有执行完 ***int wc = workerCountOf(c);// Are workers subject to culling?// 从workQueue获取任务是否有时间限制(allowCoreThreadTimeOut默认false)boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 以下条件均满足则执行WorkerCount - 1// 1)wc > maximumPoolSize 或者 timed && timedOut// 2)wc > 1 或者 workQueue为空// 这一步目的://    减少多余的Worker线程(满足下面2个条件任意一个就是多余Worker线程),必须满足wc > 1 || workQueue.isEmpty()(因为Worker至少要留一个执行task,所以要 > 1(maximumPoolSize可能为0),或者workQueue为空了,这个时候==1也可以)//          1)大于maximumPoolSize//          2)timedOut == true(上次获取超时了) 且 allowCoreThreadTimeOut == true ,即使wc <= maximumPoolSizeif ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// allowCoreThreadTimeOut == false 且 wc <= corePoolSize时才采用一直阻塞获取Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;// 走到这里说明肯定调用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),没取到值超时了返回空(因为take()会一直阻塞下去)timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

7.processWorkerExit(Worker w, boolean completedAbruptly)

private void processWorkerExit(Worker w, boolean completedAbruptly) {// 正常结束WorkerCount不减一(正常结束因为getTask()方法返回空,在返回空之前会进行workerCount的减一。我们这里只用执行WorkerExit退出就行了,workerCount与本方法无关,异常的时候修改是为了保证workerCount与Worker数一直,因为后面会补一个Worker进来)// 用户异常减一(后面会补一个空firstTask的Worker进来),否则数量不一致if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}// 尝试终止线程池tryTerminate();int c = ctl.get();// RUNNING 或者 SHUTDOWN状态,执行下面处理,否则直接返回if (runStateLessThan(c, STOP)) {// 如果不是用户异常导致进入processWorkerExitif (!completedAbruptly) {// 最小必须Worker线程数(allowCoreThreadTimeOut == true时,Worker线程可以减为0)int min = allowCoreThreadTimeOut ? 0 : corePoolSize;// 如果workQueue不为空,还有任务没完成,min至少为1if (min == 0 && ! workQueue.isEmpty())min = 1;// 当前Worker线程数  >= min不做处理,直接返回;如果小执行后面的addWorker(null, false) 一般不会小于;if (workerCountOf(c) >= min)return; // replacement not needed}// 添加一个空Worker线程,且此次添加Worker线程上线为maximumPoolSizeaddWorker(null, false);}}

8.tryTerminate()

final void tryTerminate() {for (;;) {int c = ctl.get();// 满足任意条件则返回,结束方法// 1)RUNNING状态// 2)大于等于TIDYING状态(TIDYING、TERMINATE)// 3)SHUTDOWN状态且workQueue不为空if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;// *下面的只有STOP状态 或者 SHUTDOWN状态且workQueue为空*// workerCount不为0,也就是idle Worker,可以terminate掉if (workerCountOf(c) != 0) { // Eligible to terminate// 中断所有Workers线程interruptIdleWorkers(ONLY_ONE);return;}final ReentrantLock mainLock = this.mainLock;// 上锁,防止其他线程修改ctlmainLock.lock();try {// 采用CAS将ctl设置成TIDYING状态,修改失败则重新进入循环if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated();} finally {// 将ctl设置成TERMINATED状态ctl.set(ctlOf(TERMINATED, 0));// 唤醒termination Conditiontermination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS}}

9.shutdown()

public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 将ctl高3位设置成SHUTDOWN对应的值,低29位设置成workerCount对应的值advanceRunState(SHUTDOWN);// 中断一个非中断且没锁的Worker线程interruptIdleWorkers();// ScheduledThreadPoolExecutor的钩子方法onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();
}

10.shutdownNow()

public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 将ctl高3位设置成STOP对应的值,低29位设置成workerCount对应的值// STOP状态,runWorker()方法循环执行真正task前都会检查状态,如果是STOP状态不会执行task,循环下次getTask()的时候,getTask()会返回空,结束Worker线程advanceRunState(STOP);// 中断所有Worker线程interruptWorkers();tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;
}

ThreadPoolExecutor源码解析相关推荐

  1. 多线程与高并发(八):ThreadPoolExecutor源码解析, SingleThreadPool,CachedPool,FixedThreadPool,ForkJoinPoll 等

    线程池 今天我们来看看JDK给我们提供的默认的线程池的实现. ThreadPoolExecutor:我们通常所说的线程池.多个线程共享同一个任务队列. SingleThreadPool CachedP ...

  2. 面试官系统精讲Java源码及大厂真题 - 37 ThreadPoolExecutor 源码解析

    37 ThreadPoolExecutor 源码解析 当你做成功一件事,千万不要等待着享受荣誉,应该再做那些需要的事. -- 巴斯德 引导语 线程池我们在工作中经常会用到.在请求量大时,使用线程池,可 ...

  3. Java 1.7 ThreadPoolExecutor源码解析

    Java中使用线程池技术一般都是使用Executors这个工厂类,它提供了非常简单方法来创建各种类型的线程池: public static ExecutorService newFixedThread ...

  4. ThreadPoolExecutor源码解析(二)

    1.ThreadPoolExcuter运行实例 首先我们先看如何新建一个ThreadPoolExecutor去运行线程.然后深入到源码中去看ThreadPoolExecutor里面使如何运作的. pu ...

  5. ThreadPoolExecutor源码解析(一)

    1.ThreadPoolExcuter原理说明 首先我们要知道为什么要使用ThreadPoolExcuter,具体可以看看文档中的说明: 线程池可以解决两个不同问题:由于减少了每个任务的调用开销,在执 ...

  6. python threading ThreadPoolExecutor源码解析

    future: 未来对象,或task的返回容器 1. 当submit后: def submit(self, fn, *args, **kwargs):with self._shutdown_lock: ...

  7. Java 线程池ThreadPoolExecutor的应用与源码解析

    ThreadPoolExecutor 工作原理 假设corePool=5,队列大小为100,maxnumPoolSize为10 向线程池新提交一个任务,会根据ThreadFactory创建一个新的线程 ...

  8. Java Executor源码解析(3)—ThreadPoolExecutor线程池execute核心方法源码【一万字】

    基于JDK1.8详细介绍了ThreadPoolExecutor线程池的execute方法源码! 上一篇文章中,我们介绍了:Java Executor源码解析(2)-ThreadPoolExecutor ...

  9. 【多线程】ThreadPoolExecutor类源码解析----续(二进制相关运算)

    前言 在之前阅读 ThreadPoolExecutor 源码的时候,发现代码里用到了一些二进制相关的位运算之类的代码,看起来有些费劲了,所以现在大概总结了一些笔记,二进制这东西吧,不难,就跟数学一样, ...

最新文章

  1. python keyerror(0)
  2. grep与正则表达式基础
  3. 虚拟交换机软件_H3C交换机IRF配置介绍
  4. [Python] Tkinter的食用方法_02_LabelFrame RadioButton CheckButton
  5. Java中多线程的性能比较
  6. 假如把支付宝存储服务器炸了,里面的钱还在么?
  7. UIWebView捕获内部web点击事件
  8. Redis内存缓存系统入门
  9. 代码评审常见问题总结【持续更新】
  10. 20145209 2016-2017-2 《Java程序设计》第4周学习总结
  11. 学术分享 | 没有导师的指导,研究生如何阅读文献、提出创见、写论文?
  12. [故障解决]Could not get a resource from the pool。
  13. 交换机cad图例_各种弱电系统的CAD图纸,包含图例、大样图、系统图及原理图等...
  14. Java游戏神秘岛,【寻找高玩】服务端整合问题
  15. 使用Google Analytics(分析)进行用户体验研究的5种方法
  16. jsp微信二维码收款_java实现微信支付之扫码支付
  17. 使用邮箱教学|邮箱是什么?办公中是怎么使用的
  18. VR和AR将如何发展下去?哪个更有前景?
  19. 15 条实用 Linux/Unix 磁带管理命令
  20. Win10中找不到gpedit.msc

热门文章

  1. 如何正确使用机器学习中的训练集、验证集和测试集?
  2. css浮动float:left|right
  3. 传统企业的移动电商平台实践
  4. 量化金融分析师学习笔记——中篇完结
  5. python print()方法基本用法,print()格式化输出
  6. Eureka如何实现自我保护机制
  7. WordPress多功能新闻积分商城主题LensNews (免费分享)
  8. 国家网络安全工程师、大数据网络运维工程师就业前景好
  9. 视觉神经网络模型优秀开源工作:PyTorch Image Models(timm)库
  10. protobuf初识