ThreadPoolExecutor 应用 & 源码解析

文章目录

  • ThreadPoolExecutor 应用 & 源码解析
    • 一、线程池相关介绍
      • 1.1 为什么有了JDK提供的现有的创建线程池的方法(Executors类中的方法),然而还需要自定义线程池
        • ThreadPoolExecutor 提供的七个核心参数大致了解
        • JDK提供的几种拒绝策略
    • 二、ThreadPoolExecutor的应用
    • 三、ThreadPoolExecutor源码解析
      • 3.1 线程池的核心属性
      • 3.2 线程池的有参构造
      • 3.3 execute 方法
        • 3.3.1 execute方法本体
      • 3.3.2 execute 方法的执行流程图
        • 3.3.3 addWorker方法
          • 3.3.3.1 addWorkerFailed , 启动工作线程失败执行方法
      • 3.4 Worker工作线程类
        • 3.4.1 runWorker
        • 3.4.2 getTask()
      • 3.5 关闭线程池方法
        • 3.5.1 shutdownNow()
          • advanceRunState
          • interruptWorkers
        • 3.5.2 shutdown();
          • interruptIdleWorkers()
        • 3.5.3 tryTerminate
    • 四、线程池整体流程图

一、线程池相关介绍

1.1 为什么有了JDK提供的现有的创建线程池的方法(Executors类中的方法),然而还需要自定义线程池

前面演示的Executors中的构建线程池的方式(newXXX一类的方法),大多数都是基于ThreadPoolExecutor new来创建出来的

ThreadPollExecutor的构造器中 一共提供了七个参数 , 每个参数都是非常核心的参数 , 在线程池去执行任务时每个参数都有对任务决定性的作用
如果直接使用JDK提供的Executors中的方法来创建线程池,其中可以自定义设置的核心参数只有两个,这样的话会导致线程池的控制粒度很粗。在阿里规范中也推荐自己去自定义线程池,手动new线程池 指定参数
自定义线程池,可以细粒度的控制线程池,针对一些参数的设置可以在后期有效的帮助我们排查问题(ThreadFactory)

ThreadPoolExecutor 提供的七个核心参数大致了解

public ThreadPoolExecutor(// 1. 核心工作线程个数(当前任务执行结束后,这个线程不会被销毁 , 结束后这个线程会执行take方法死等未来要处理的任务)int corePoolSize,// 2. 最大工作线程个数(池中一共可以有多少个工作线程=核心数+非核心数) , 非核心工作线程执行完任务后会执行 poll(time,unit) 方法过段时间自动销毁// 例如 corePollSize = 2,maximumPoolSize=5,那么就表示线程池中最多有5个工作线程存在 其中2个是核心线程 , 剩下3个是非核心线程int maximumPoolSize,// 3. 非核心工作线程,在阻塞队列中等待的时间 keepAliveTimelong keepAliveTime,// 4. 上边keepAliveTime时间的单位TimeUnit unit,// 5. 在没有核心线程处理任务时,会把任务扔到此阻塞队列中等待非核心工作线程处理,具体选择哪种阻塞队列需要根据业务来选择BlockingQueue<Runnable> workQueue,// 6. 创建线程的工厂 , 一般用来指定线程池中线程的名称 (只用来构建Thread对象 而不是核心线程Worker对象)ThreadFactory threadFactory,// 7. 当线程池无法处理加入的任务时,执行拒绝策略RejectedExecutionHandler handler){}

JDK提供的几种拒绝策略

  • AbortPolicy: 这个拒绝策略会在线程池无法处理任务时,直接抛出一个异常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());
}
  • CallerRunsPolicy: 这个拒绝策略会在线程池无法处理任务时, 会将当前提交过来的任务让调用线程池的线程去处理该任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {// 让提交给线程池任务的线程去处理该任务 , 这种则是同步的方式r.run();}
}
  • DiscardPolicy: 这个拒绝策略会在线程池无法处理任务时,会直接丢弃掉这个任务(不做任何处理)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {// 空空如也、不做任何处理
}
  • DiscardOldestPolicy: 这个拒绝策略会在线程池无法处理任务时,会将当前阻塞队列中 最早加入的任务丢弃掉然后把本次提交的任务放进去
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {// 将阻塞队列中最早加入的任务移除e.getQueue().poll();// 再次将本次提交的任务提交到线程池中e.execute(r);}
}
  • 自定义线程池的拒绝策略

    • 根据自己的业务,可以将任务存储到数据库中,也可以做其他操作

二、ThreadPoolExecutor的应用

public class TestThreadPollExecutor {private static final AtomicInteger counter = new AtomicInteger(1);public static void main(String[] args) {// 1. 构建自定义线程池ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,5,10,TimeUnit.SECONDS,new ArrayBlockingQueue<>(5),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread returnThread = new Thread(r);// 这里可以设置线程的一些属性returnThread.setName("Test-ThreadPoolExecutor-" + counter.getAndIncrement());return returnThread;}},new MyRejectExecution());// ===============// 2. 让线程池处理任务// ===============// 2.1 没有返回结果的任务threadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + "-没有返回结果的处理方式-Runnable");});// 2.2 有返回结果的任务Future<String> handleResultFuture = threadPool.submit(() -> {System.out.println(Thread.currentThread().getName() + "-有返回结果的处理方式-Callable");return "处理的返回结果!";});try {System.out.println(Thread.currentThread().getName() + "-拿到返回结果=>" + handleResultFuture.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}// 3. 局部变量的线程池 用完之后需要 shutdownthreadPool.shutdown();/*输出结果:Test-ThreadPoolExecutor-1-没有返回结果的处理方式-RunnableTest-ThreadPoolExecutor-2-有返回结果的处理方式-Callablemain-拿到返回结果=>处理的返回结果!*/}/*** 自定义拒绝策略*/private static class MyRejectExecution implements RejectedExecutionHandler{@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println("自定义拒绝策略,根据自己的业务逻辑来实现具体的拒绝策略功能!");}}}

三、ThreadPoolExecutor源码解析

3.1 线程池的核心属性

/** *  1. ctl 这个就是线程池最核心的属性,其实就是一个int类型的数值,只是套了一层AtomicInteger,在进行运算时是原子性的,下方所有的内容都是在对ctl这个属性做操作*  2. ctl 表示的是线程池的两个核心状态: <1> 线程池的状态  <2> 工作线程的个数(不区分核心数和非核心数)*     <1.> 线程池状态:   ctl代表的int类型值的高3位 , 表示当前线程池的状态*     <2.> 工作线程数量:  ctl的低29位表示,工作线程个数*     因此需要进行位运算来操作这两个状态,下方就提供了一些位运算的属性和计算这两种状态的位运算方法*  ctl的默认值通过 ctlOf(RUNNING,0)返回的结果作为默认值 , 即 (111 | 00000000 00000000 00000000 00000000) ==> 11100000 00000000 00000000 00000000*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 常量COUNT_BITS 一般表示的值就是 29 为了更方便的操作 int 的低29位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 00000000 00000000 00000000 00000001 => 1
// 00100000 00000000 00000000 00000000 => 1 << 29
// 00011111 11111111 11111111 11111111 => 1 << 29 - 1
// CAPACITY 表示当前线程池中 可以记录到的工作线程的做大值
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// =============== 线程池状态相关属性 ==================// 可以看到线程池的状态有5种状态 , 当且仅当线程池状态为-1时 表示当前线程池没有问题 可以处理任务 其他都是有问题的状态// -1二进制表示 : 11111111 11111111 11111111 11111111
// <1> RUNNING(-1): 高3位是 111
// 此时可以处理任务,也可以操作阻塞队列种的任务
private static final int RUNNING    = -1 << COUNT_BITS;
// <2> SHUTDOWN(0): 高3位是 000
// **此时不会接收新的任务,但是正在处理的任务和阻塞队列中的任务都会执行完毕**
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// <3> STOP(1) : 高3位是 001
// **此时不接收新的任务,而且会调用正在处理的任务线程的中断方法,阻塞队列种的任务一个不管**
// 中断方法:(前面直到中断并不会直接结束线程而是由线程自己决定什么时候中断,只是告诉线程一声(改变中断标志位))
private static final int STOP       =  1 << COUNT_BITS;
// <4> TIDYING(2) : 高3位是 010
// 这个状态是从SHUTDOWN或者STOP状态转变过来的一个过度状态 , 表示当前线程池即将要关闭了,但是还未关闭
private static final int TIDYING    =  2 << COUNT_BITS;
// <5> TERMINATED(3) : 高3位是 011
// 线程池关闭的真正的状态 , 这个状态是由 TIDYING 转换过来的,转换过来之后为了执行一个钩子方法terminated()
private static final int TERMINATED =  3 << COUNT_BITS;// 根据传入的ctl值 拿到ctl高3位的值 - 也即当前运行状态
// ~CAPACITY ==> 11100000 00000000 00000000 00000000
// ctl与此值执行&运算会拿到当前ctl所代表的线程池的运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 同理 拿到当前ctl低29位的值 代表当前线程池中的工作线程个数
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 传入运行状态rs , 和工作线程个数ws , 让这两个值组成一个新的ctl值
// 两个值经过 | 运算会把他们的二进制位拼起来 组成一个新的ctl值
private static int ctlOf(int rs, int wc) { return rs | wc; }
  • 线程池状态及其转换流程图

    • 线程池状态及其转换流程图

3.2 线程池的有参构造

/*** 无论调用ThreadPollExecutor的哪个有参构造 最终都会调用这个构造器* * 重点: 核心线程数是允许为0的* */
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {// corePoolSize可以 == 0// 最大工作线程数 必须 >  0// 最大工作线程数 必须 >= 核心线程数// 非核心线程的空闲时间 必须 >= 0if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();// 校验阻塞独立额、线程工厂、拒绝策略 均不允许为nullif (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();// 系统资源访问相关内容,和线程池核心业务无关this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();// 各种赋值 , 赋值给成员变量// 后边会大量出现 Doug Lea的编码习惯 将这些成员变量作为局部变量进行操作this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;// 转换为纳秒,为了更精确this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}

3.3 execute 方法

1.execute方法是提交任务到线程池的核心方法,很重要
2.线程池的执行流程就是execute方法 内部做了哪些判断

3.3.1 execute方法本体

// command 就是提交过来的任务
public void execute(Runnable command) {// 提交的任务不能为nullif (command == null)throw new NullPointerException();// 拿到当前线程池 ctl属性 , 为了后续判断线程池状态判断int c = ctl.get();/***  workerCountOf(c) : 拿到当前线程池的工作线程个数*  线程池的核心线程是懒加载的 , 只有用到了才会创建*  拿到工作线程个数 < 核心线程数 表示线程池还可以添加核心线程 那么就添加核心线程来处理此任务*/if (workerCountOf(c) < corePoolSize) {// addWorker(任务,是否为核心线程)// addWorker 返回true 表示添加工作线程成功 , 返回false 则添加工作线程失败// 添加失败的原因比如 addWorker 执行时线程池执行了 shutdownNow()方法 中断所有线程此时就不能再添加工作线程了返回false ||| 或者并发判断工作线程数量时产生并发问题// addWorker 会基于线程池的状态、以及工作线程个数来判断能否添加工作线程if (addWorker(command, true))// 工作线程构建出来、并且任务也交给工作线程处理了 本次提交任务直接返回truereturn;// ================ 到这里说明添加工作线程失败 (线程池状态或者工作线程个数发生了变化导致添加失败) ===============// 此时ctl已经由于并发原因被改变了 所以重新获取ctlc = ctl.get();}// ====== 不能添加核心线程 或者 添加核心线程失败了 继续走下边的逻辑 ============/*** isRunning(c) : 判断当前线程池的状态是不是RUNNING状态(前边讲过线程池只有是RUNNING状态时才可以添加任务)*                如果线程池状态是RUNNING状态:则可以添加当前任务到阻塞队列中 , 否则继续执行下边else的逻辑* workQueue.offer(command): 只要这段代码执行成功,那么线程池内正在等待任务的工作线程(getTask方法内poll、take方法阻塞着的线程),就会拿到任务然后去执行*                           因此这一步不会添加新的工作线程而是使用池内有的工作线程去执行任务(除非是第一次添加任务还没有工作线程会添加一个非核心的空任务工作线程去执行任务)*/if (isRunning(c) && workQueue.offer(command)) {// 添加任务到阻塞队列中成功则走if里边的代码// 由于这个if逻辑并没有加锁,所以ctl可能会出现并发修改问题 因此 当任务添加到阻塞队列之后,重新检查线程池的状态是否发生了改变// 重新获取ctlint recheck = ctl.get();/**如果线程池的状态不是 RUNNING 状态了 , 那么就将本次添加的任务从阻塞队列中移除*/if (!isRunning(recheck) && remove(command))// 执行拒绝策略reject(command);// =========== 上边的if判断没有执行 程序运行到这里说明 此时阻塞队列中已经存在了本次添加的任务command =================// 如果工作线程是0个,那么就需要添加一个工作线程去处理这个任务// 发生这种情况有两种情况: // 1.构建线程池时 核心线程数为0  // 2.设置核心线程允许超时allowCoreThreadTimeOut属性设置为trueelse if (workerCountOf(recheck) == 0)// 添加一个 不需要任务且是非核心的工作线程 , 目的是处理在阻塞队列中无法被处理的任务addWorker(null, false);}// 线程池的状态不是RUNNING状态 或者 状态是RUNNING但是将任务添加到阻塞队列中 失败了走这个逻辑/*** 到这里 可能由于调用了shutdown、或者shutdownNow方法改变了线程池状态,或者是阻塞队列满了无法添加此任务* 如果是shutdown状态*/// 添加非工作线程成功 直接结束本次execute方法else if (!addWorker(command, false))// 添加失败,执行拒绝策略reject(command);
}

3.3.2 execute 方法的执行流程图

  • 1.找的图片
  • 2.画的流程图 执行流程图

3.3.3 addWorker方法

/*** firstTask: 就是传入过来待执行的任务* core: true则添加的是核心线程,false则添加非核心线程* addWorker方法 可以分为两个模块去看 *          第一个模块就是对线程池状态、工作线程数量做校验模块*          第二个模块就是添加工作线程模块* */
private boolean addWorker(Runnable firstTask, boolean core) {// ================================================================================// ==================== addWorker第一块 对线程池的状态信息做校验 =======================// ===============================================================================// 为了从内层for循环,跳出外层for循环 使用标签语法retry:// =======================================================// **外层在校验线程池的状态信息 , 内层for循环在校验线程池工作线程数**// =======================================================for (;;) {// 拿到ctl、根据ctl线程池状态rsint c = ctl.get();int rs = runStateOf(c);// rs >= SHUTDOWN 说明线程池状态不是RUNNING,即当前线程池的状态不是正常的 , 当前不能添加新的任务// 如果线程池状态为SHUTDOWN,并且阻塞队列中有任务,那么就需要添加一个工作线程去处理阻塞队列中的任务 ( 满足前边的addWorker(null,false) )if (rs >= SHUTDOWN &&// 到这里状态已经不是RUNNING状态了,只有SHUTDOWN状态才可以继续处理阻塞队列中的任务// 如果三个条件有任意一个不满足返回false,配合! 代表不需要添加工作线程的// 1.不是SHUTDOWN状态(而是STOP状态),2. 任务不是空的,3. 阻塞队列是空的 |||| 满足这三个条件 则不添加工作线程! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))// 由于线程池状态的原因 本次添加工作线程失败return false;// =======================================================// ** 内层循环做工作线程数量的判断操作**// =======================================================for (;;) {// 根据ctl获取当前 工作线程个数 wcint wc = workerCountOf(c);// 如果工作线程数量 >= 最大容量 if (wc >= CAPACITY ||// 基于core判断能否添加 核心线程或非核心线程// 如果要添加核心线程 工作线程 >= 核心线程数量 则不可以再添加核心线程 直接返回false// 如果要添加非核心线程那么 当前工作线程数 >= 设置的最大工作线程数 那么也不可以添加 直接返回falsewc >= (core ? corePoolSize : maximumPoolSize))// 由于工作线程个数的问题 本次添加工作线程失败return false;// ===== 上边数量校验通过 证明本次addWorker是可以添加工作线程 =====// CAS自增ctl , 然后break跳出第一模块校验参数模块if (compareAndIncrementWorkerCount(c))// 跳出两层for循环 进入addWorker第二模块(添加工作线程模块)break retry;// ============== 到这说明 CAS自增ctl属性失败 ctl发生了并发现象 =============// 再次拿到ctl , 此时拿到其他线程更改过后的ctlc = ctl.get();// 根据新的ctl获取线程池状态 如果发现状态和进入此方法时状态不同 那么就重新执行 第一模块校验参数模块// 如果状态没变 那么重新执行内层for循环即可 重新基于CAS自增ctlif (runStateOf(c) != rs)// 重新执行第一模块校验模块continue retry;}}// ================================================================================// ======================= addWorker第二块,添加工作线程、启动工作线程 ====================// ================================================================================// 工作线程启动成功标志位 默认false未启动boolean workerStarted = false;// 工作线程添加成功标志位 默认false未添加boolean workerAdded = false;// 工作线程对象,默认为nullWorker w = null;try {// 创建工作线程对象并且把任务传递进去 , 如果传过来的firstTask不是空的 那么就会先处理带过来的任务 , 如果没有那么才回去阻塞队列中查找任务w = new Worker(firstTask);/*** Worker()构造器内部  this.thread = getThreadFactory().newThread(this);* 获取工作线程对象里边由ThreadFactory创建出来的 Thread对象*/final Thread t = w.thread;// 这里Thread如果是null,那么可能是 new Worker() 时内部使用提供的ThreadFactory对象创建出来的Thread是nullif (t != null) {// 加锁 , 因为下方会操作成员变量可能会出现线程安全问题// 下方会操作成员变量 workers、largestPoolSizefinal ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 再次获取当前线程池状态 rsint rs = runStateOf(ctl.get());// rs < SHUTDOWN : 说明线程池只能是RUNNING状态 , 状态正常执行if逻辑// rs == SHUTDOWN && firstTask == null , 只能是前边的 addWorker(null,false) 用来添加一个非核心工作线程来处理阻塞队列中的任务if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 到这里边说明可以添加工作线程// 如果线程已经启动过了 那么抛出异常// 这里只可能是ThreadFactory里边生成Thread是已经启动了的线程if (t.isAlive())throw new IllegalThreadStateException();// private final HashSet<Worker> workers = new HashSet<Worker>();// 校验线程没有问题 , 把当前工作线程添加到 workes 工作线程集合中workers.add(w);// 获取 工作线程数量int s = workers.size();// private int largestPoolSize; 表示线程池最大线程个数记录// 当前工作线程个数 > 原来的最大线程个数if (s > largestPoolSize)// 替换最大线程个数largestPoolSize = s;// 添加工作线程成功workerAdded = true;}} finally {mainLock.unlock();}// 添加成功 启动工作线程if (workerAdded) {// 启动工作线程t.start();// 启动成功标志workerStarted = true;}}} finally {// 如果工作线程启动失败 , 那么这个工作线程也就没有意义了 执行addWorkerFailed方法if (! workerStarted)addWorkerFailed(w);}// 返回工作线程是否启动成功return workerStarted;
}
3.3.3.1 addWorkerFailed , 启动工作线程失败执行方法
// 启动工作线程失败 做补偿操作
private void addWorkerFailed(Worker w) {// 需要操作workes 所以需要加锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (w != null)// 将这个工作线程从 workes移除workers.remove(w);// 将 ctl -1 , 代表去掉了一个工作线程个数 (addWorker对ctl+1操作是在 第一模块校验时已经+1了)decrementWorkerCount();// 工作线程启动失败 可能是因为状态改变了 判断是不是可以走 TIDYING -> TERMINATED 结束线程池tryTerminate();} finally {mainLock.unlock();}
}

3.4 Worker工作线程类

/*** 1. Worker继承了AQS,可以实现一些锁的机制,这里的目的就是为了控制工作线程的中断* 2. Worker实现了Runnable,内部的Thread对象在执行start时,会执行Worker中的run方法逻辑* */
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{// ====================================================================// ================================ Worker 管理任务模块 =================// ====================================================================// 由线程工厂构建出来的线程对象final Thread thread;// 当前Worker要执行的任务Runnable firstTask;// 记录当前工作线程处理了多少个任务volatile long completedTasks;// 工作线程只有一个创建方式 就是此有参构造器Worker(Runnable firstTask) {// 将state值设置为 -1 , 表示当前Worker不允许被执行中断方法setState(-1);// 赋值任务this.firstTask = firstTask;// 利用线程工厂创建Thread对象// 看到传入的Runnable是当前Worker对象 , 所以Worker里边的thread线程启动后实际执行的是 Worker的run方法逻辑this.thread = getThreadFactory().newThread(this);}// 上边可知 Worker里边的thread启动后 实际执行的就是这个run方法的逻辑 , 执行的就是runWorker()方法@Overridepublic void run() {runWorker(this);}// ====================================================================// =========================== Worker 管理线程中断模块(AQS) ==============// ====================================================================protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock()        { acquire(1); }public boolean tryLock()  { return tryAcquire(1); }public void unlock()      { release(1); }public boolean isLocked() { return isHeldExclusively(); }/***  这个方法是中断工作线程时,执行的方法*/void interruptIfStarted() {Thread t;/*** getState() >= 0 , 上边新建Worker时state设置为-1 因此-1表示当前Worker不允许执行中断操作* thread 不为空 , 并且 t 未中断 则可以执行中断线程* */if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {// state不是-1 , t不为空,t未中断 , 则可以执行中断线程t.interrupt();} catch (SecurityException ignore) {}}}
}

3.4.1 runWorker

// 能够运行这个方法的唯一方式就是: Worker的run方法 , 因此运行这个方法的一定是线程池的工作线程
final void runWorker(Worker w) {// 拿到当前线程 wtThread wt = Thread.currentThread();Runnable task = w.firstTask;// 已经拿到了task 将工作线程中存储任务的字段设置为nullw.firstTask = null;// 将Worker中的 state设置为0 , 代表当前工作线程给可以被中断w.unlock(); // allow interruptsboolean completedAbruptly = true;try {/** * 这个while循环 就是要让这个工作线程一直处于工作状态 拿到任务->处理任务 , 当结束了while循环则考虑干掉当前工作线程了*  * 如果传入的工作线程Worker 带的有任务 那么就执行带过来的任务 * 如果传入的工作线程Worker 没有带任务 那么就去阻塞队列中拿取任务**/while (task != null || (task = getTask()) != null) {// 执行当前Worker的lock方法, 表示当前工作线程已经开始工作了 不允许中断当前线程// 使用shutdown()关闭线程池时 会检查Worker是否正在运行 正在运行的工作线程则不会被中断w.lock();// 比较ctl>=STOP,如果满足这个条件 则说明线程池已经到了STOP甚至已经终结了// 1. 线程池到了STOP状态,并且当前线程还未中断 , 则需要中断当前线程(if里的逻辑)// 2. 线程池状态不是STOP,线程已经中断并且再次查看线程池状态变为了STOP 那么就再次中断线程// 下面的判断其实就是表达了,如果线程池是 >= STOP状态 那么就必须确保这个线程是处于中断状态的if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted()){wt.interrupt();}try {// 前置钩子函数beforeExecute(wt, task);Throwable thrown = null;try {// 执行任务task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 后置勾子函数afterExecute(task, thrown);}} finally {// 任务执行完成 , 丢掉任务task = null;// 当前工作线程处理任务数量++操作w.completedTasks++;// 执行unlock 将state改为0 , shutdown方法才可以中断这个工作线程w.unlock();}}completedAbruptly = false;} finally {// 结束掉当前工作线程processWorkerExit(w, completedAbruptly);}
}
  • processWorkerExit 结束工作线程方法
/*** @param w , 当前要执行结束的工作线程对象* @param completedAbruptly 突然完成标志位,如果runWorker中判定是突然完成那么就会传入true*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {// 如果是突然完成的,表示是异常结束工作线程if (completedAbruptly)// 减少工作线程数量decrementWorkerCount();// 下方需要操作共享变量 所以拿到锁并加锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 将当前工作线程的 完成任务数量合并到线程池的完成任务数completedTaskCount += w.completedTasks;// 将当前工作线程从workes中移除掉workers.remove(w);} finally {mainLock.unlock();}// 工作线程结束 查看是否是线程池状态改变了tryTerminate();// 拿到ctlint c = ctl.get();// 这个判断表示 如果当前线程池状态 < STOP , 即只能为 RUNNING、SHUTDOWN状态时if (runStateLessThan(c, STOP)) {// 不是突然完成的(正常结束工作线程) 可以进入这个ifif (!completedAbruptly) {// 拿到当前线程池的 可能的最小核心线程数量 , 如果允许核心线程超时 那么最小的核心线程就是0 否则永远为corePoolSizeint min = allowCoreThreadTimeOut ? 0 : corePoolSize;// 如果可能的核心线程数最小为0个 并且队列中不是空的if (min == 0 && ! workQueue.isEmpty())// 设置最小核心线程为1个 用来处理阻塞队列中的任务min = 1;// 比较 工作线程数量 >= 当前可能的最小核心线程数量 说明有线程处理任务 正常returnif (workerCountOf(c) >= min)return; // replacement not needed}// 异常结束工作线程,为了避免出现问题 添加一个空任务的非核心线程来填补上刚刚异常结束的工作线程addWorker(null, false);}
}

3.4.2 getTask()

/*** 这个方法就是从线程池的阻塞队列中获取要执行的任务 * *    第一块就是 检查线程池状态和工作线程数量的校验 , 如果不满足直接返回null则拿取任务失败上层runWorker方法的while结束*    第一块校验通过了 才能执行第二块 从阻塞队列中拿取任务*/
private Runnable getTask() {// 默认为false,默认没有超时boolean timedOut = false;for (;;) {// 拿到ctl 、根据ctl获取线程池状态int c = ctl.get();int rs = runStateOf(c);// 如果线程池状态是STOP,那么不需要再处理阻塞队列中的任务了,直接返回null// 如果线程池状态是SHUTDOWN,并且阻塞队列是空的 那么也不需要再处理阻塞队列任务了if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {// 扣减工作线程个数decrementWorkerCount();return null;}// 根据ctl拿到工作线程个数int wc = workerCountOf(c);// 1. 核心线程允许超时 timed为true// 2. 当前工作线程数已经大于了核心线程数 timed为trueboolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if (// 工作线程>最大线程数(一般不会满足 看为false即可)// 工作线程<=核心线程 那么一定是false// 第一次进入getTask方法的for循环一定不会走这个if里边,第二次就有可能了(wc > maximumPoolSize || (timed && timedOut))&& // 在满足上边条件的前提下 , 当前执行的线程是非核心线程并且timeOut超时标记位是true// 确保工作线程除了自己还有至少一个 , 或者阻塞队列是空的 , 那么就可以尝试减少工作线程数量 返回null由上层方法结束掉当前工作线程了(wc > 1 || workQueue.isEmpty())) {// CAS减少工作线程数if (compareAndDecrementWorkerCount(c))return null;// 若CAS失败再次执行for循环continue;}// 工作线程从阻塞队列中拿任务try {// 是核心线程timed是false , 非核心是trueRunnable r = timed ?// 非核心,等待一会 keepAliveTimeworkQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :// 核心线程,使用take方法死等workQueue.take();// 从阻塞队列中拿到任务不为null,拿到任务返回任务然后去执行即可if (r != null)return r;// 到这,没拿到任务 timeOut设置为true , 所以再次经过for循环就可以退出这个方法返回null结束当前线程了timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}

3.5 关闭线程池方法

3.5.1 shutdownNow()

  • 该方法会将线程池状态从 RUNNING -> STOP , 立即中断所有的工作线程 , 并且不会处理阻塞队列中的任务
// shutdownNow 不会处理阻塞队列的任务所以会将任务返回给客户端
public List<Runnable> shutdownNow() {// 返回的任务List<Runnable> tasks;// 加锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 将线程池状态修改为 STOP advanceRunState(STOP);// shutdownNow不管工作线程是否再执行任务都会执行 中断工作线程interruptWorkers();// 将阻塞队列中的任务放到tasks中tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;
}
advanceRunState
private void advanceRunState(int targetState) {// 死循环必然会修改状态for (;;) {int c = ctl.get();// 如果当前线程池状态已经 >= 将要修改的状态 则不需要修改了if (runStateAtLeast(c, targetState) ||// 将targetState重新和工作线程数量组成新的ctl 并将ctl修改为这个新组装的值ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))break;}
}
interruptWorkers
// shutdownNow方法 立即执行中断工作线程操作
private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 执行所有Worker的interruptIfStarted方法// 所以shutdownNow会立即中断正在运行的工作线程for (Worker w : workers)w.interruptIfStarted();} finally {mainLock.unlock();}
}

3.5.2 shutdown();

  • 该方法会将线程池状态从 RUNNING -> SHUTDOWN , 不会立即中断正在干活的工作线程 , 并且会等待阻塞队列中的任务处理完成后再关闭线程池
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 将线程池状态 变为 SHUTDOWN状态advanceRunState(SHUTDOWN);// 中断处于空闲状态的工作线程interruptIdleWorkers();// 勾子函数 自己实现onShutdown();} finally {mainLock.unlock();}// 尝试结束线程池tryTerminate();
}
interruptIdleWorkers()
private void interruptIdleWorkers() {interruptIdleWorkers(false);
}
// 中断空闲的工作线程
private void interruptIdleWorkers(boolean onlyOne) {// 加锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;// 线程目前没有中断 , 那么就去获取Worker的锁资源 都满足的话才能够执行线程中断操作// 工作线程在执行任务时会先把当前工作线程的 state变为1 所以这里tryLock正在运行的工作线程是失败的(所以这里只会中断空闲的工作线程)if (!t.isInterrupted() && w.tryLock()) {// 进到这里 说明当前工作线程是空闲线程try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}
}

3.5.3 tryTerminate

// 查看当前线程池是否可以变为TERMINATED状态
final void tryTerminate() {for (;;) {int c = ctl.get();// 如果是RUNNING状态 直接return// 如果现在的状态 >= TIDYING 说明即将变为TERMINATED状态 那么也可以return结束方法// 如果是SHUTDOWN状态并且阻塞队列还有任务,那么也不可以变为TERMINATED状态 return结束if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;// 如果还有工作线程 if (workerCountOf(c) != 0) {// 中断空闲的工作线程 只中断一个interruptIdleWorkers(ONLY_ONE);// 本次尝试结束 , 等下次工作线程为0个之后 再进来尝试改变状态return;}// 加锁 , 可以执行Condition的释放操作final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 将线程池状态修改为 TIDYINGif (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {// 执行勾子函数// 可以在线程池结束时做一些额外操作 自己实现terminated();} finally {// 修改状态为TERMINATED状态ctl.set(ctlOf(TERMINATED, 0));// 唤醒正在等待线程池结束的线程termination.signalAll();}return;}} finally {mainLock.unlock();}}
}

四、线程池整体流程图

线程池整体流程

JUC(十)-线程池-ThreadPoolExecutor分析相关推荐

  1. Java线程池ThreadPoolExecutor使用和分析

    Java线程池ThreadPoolExecutor使用和分析(一) Java线程池ThreadPoolExecutor使用和分析(二) Java线程池ThreadPoolExecutor使用和分析(三 ...

  2. c++ 线程池_JAVA并发编程:线程池ThreadPoolExecutor源码分析

    前面的文章已经详细分析了线程池的工作原理及其基本应用,接下来本文将从底层源码分析一下线程池的执行过程.在看源码的时候,首先带着以下两个问题去仔细阅读.一是线程池如何保证核心线程数不会被销毁,空闲线程数 ...

  3. Java线程池ThreadPoolExecutor使用和分析(三) - 终止线程池原理

    相关文章目录: Java线程池ThreadPoolExecutor使用和分析(一) Java线程池ThreadPoolExecutor使用和分析(二) - execute()原理 Java线程池Thr ...

  4. Java 线程池(ThreadPoolExecutor)原理分析与使用 – 码农网

    线程池的详解 Java 线程池(ThreadPoolExecutor)原理分析与使用 – 码农网 http://www.codeceo.com/article/java-threadpool-exec ...

  5. Tomcat线程池监控及线程池原理分析

      目录         一.背景         二.tomcat线程池监控         三.tomcat线程池原理         四.总结 一.背景 我们都知道稳定性.高可用对于一个系统来讲 ...

  6. JAVA线程池的分析和使用

    1. 引言 合理利用线程池能够带来三个好处.第一:降低资源消耗.通过重复利用已创建的线程降低线程创建和销毁造成的消耗.第二:提高响应速度.当任务到达时,任务可以不需要等到线程创建就能立即执行.第三:提 ...

  7. java 线程池原理分析

    一.为什么使用线程池 1.降低资源消耗,减少线程创建和销毁次数,每个工作线程可以重复利用,执行多个任务 2.可根据系统承受能力,调整工作线程的数目,防止消耗过多的内存 二.java 线程池使用 Exe ...

  8. 聊聊并发(三)——JAVA线程池的分析和使用

    1. 引言 合理利用线程池能够带来三个好处.第一:降低资源消耗.通过重复利用已创建的线程降低线程创建和销毁造成的消耗.第二:提高响应速度.当任务到达时,任务可以不需要等到线程创建就能立即执行.第三:提 ...

  9. Java并发—线程池ThreadPoolExecutor基本总结

    原文作者:Matrix海子 原文地址:Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线 ...

最新文章

  1. 【iOS】sqlite3的使用(増删改查)
  2. 二叉树剪枝_决策树,生成剪枝,CART算法
  3. python 程序停止打印日志_优雅停止 SpringBoot 服务,拒绝 kill -9 暴力停止!
  4. Oracle数据库,当DML操作时执行触发器记录日志
  5. 提高网站性能的常见方法
  6. mysql教程排序_MySQL中的排序函数field()实例详解
  7. Java内部类实例测试及总结
  8. list numpy array tensor转换
  9. SQL Server用户权限详解
  10. JS实现数据库连接并查询
  11. 自动升级WordPress失败解决方法
  12. 利用递归层次遍历句法结构树(Stanfordcorenlp及nltk)
  13. 小米4android8.0root,小米8青春版获取root权限的教程
  14. html修改网站图标,分享内容,分享图标等
  15. 【计算机网络】思科实验(3):使用三层交换机实现跨VLAN间的通信
  16. 同步时序逻辑电路分析——数电第六章学习
  17. Python---统计《三国演义》中出现次数较高的人物
  18. mysql 模糊查找表名
  19. python读写xls
  20. matlab直流输电,基于MATLAB/Simulink的高压直流输电系统的仿真研究

热门文章

  1. Anonympy——使用Python进行数据匿名化
  2. 导出富文本格式word
  3. constantlayout布局
  4. android 动态替换logo
  5. [附源码]java毕业设计校园超市进销存管理系统
  6. C语言实现9宫格数独
  7. Failed to determine a suitable driver class
  8. 在Controller注入Service报错的解决方法
  9. DHTMLXGantt in Flutter DHTMLXGantt
  10. 一个软件网络连接异常_拥有苹果电脑后,最应该预装的7款Mac应用软件