线程池ThreadPoolExecutor原理剖析


文章目录

  • 线程池ThreadPoolExecutor原理剖析
    • 1. 介绍
    • 2. 结构
      • (1). 线程池状态
      • (2). 线程转换
      • (3). 线程池参数
      • (4). 线程池类型
      • (5). 其他
    • 3. 源码分析
      • (1). public void execute(Runnable command)
      • (2). 工作线程Worker的执行
      • (3). shutdown操作
      • (4). shutdownNow操作
      • (5). awaitTermination操作
    • 4. 总结

文章创作参考于Java并发-----第8章 Java并发包中线程池ThreadPoolExecutor原理剖析

1. 介绍

线程池主要解决两个问题:

  1. 当执行大量异步任务是线程池能提供良好的性能,如果不使用线程池,每当需要执行异步任务时直接new一个线程来运行,需要很大的开销.
  2. 线程池提供了一种资源限制和管理的手段,比如限制线程的个数,动态新增线程等.

2. 结构

​  Executors实际上是一个工具类,里面提供了很多静态方法,这些方法根据用户选择返回不同的线程池实例.

​  成员变量ctl是一个Integer的原子变量,用来记录线程池状态和线程池中线程的个数(高3位用来表示线程池状态,低29位用来表示线程个数).

(1). 线程池状态

状态 含义
RUNNING(111) 接受新任务并处理阻塞队列里的任务
SHUTDOWN(000) 拒绝新任务,但是处理阻塞队列中的任务
STOP(001) 拒绝新任务并且抛弃阻塞队列里的任务,同时会中断正在处理的任务
TIDYING(010) 所有任务都执行完之后当前线程池活动线程数为0,将调用terminated()方法
TERMINATED(011) 终止状态,调用terminated()方法并完成后的状态

(2). 线程转换

状态转换 转换途径
RUNNING —> SHUTDOWN 调用shutdown()方法
RUNNING或SHUTDOWN —> STOP 调用shutdownNow()方法
SHUTDOWN —> TIDYING 线程池和任务队列都为空
TIDYING —> TERMINATED 当terminated()hook方法执行完成时

(3). 线程池参数

参数 含义
corePoolSize 线程池核心线程个数
workQueue 等待执行的任务的阻塞队列
maximunPoolSize 最大线程数量
ThreadFactory 创建线程的工厂
keeyAliveTime 存活时间

(4). 线程池类型

类型名称 类型特点
newFixedThreadPoll 带参数构造,参数为核心线程数和最大线程数,阻塞队列长度为Integer.MAX_VALUE
newSingleThreadExecutor 核心线程数和最大线程数都为1,但是阻塞队列长度为Integer.MAX_VALUE
newCachedThreadPoll 按需创建线程,初始个数为0,最多为Integer.MAX_VALUE,根据线程存活时间进行回收,特殊点在于加入同步队列的任务马上就会被执行,同步队列最多只能有一个元素

(5). 其他

  • mainLock是独占锁,用来空值Worker线程操作的原子性
  • termination是该锁对应的条件队列
  • Worker继承AQS和Runnable接口,是具体承载任务的对象.继承了AQS,内部实现了简单的不可重入锁,其中state=0表示锁空闲,state=1表示锁被占用.state=-1是创建时的默认状态,为了避免才运行runWorker()方法时被中断.

3. 源码分析

(1). public void execute(Runnable command)

​  提交任务command带线程池进行执行.

public void execute(Runnable command) {// 如果传入的任务为空,抛出异常if (command == null)throw new NullPointerException();// 获取线程池的状态和线程数量int c = ctl.get();// 线程池未满,则添加新线程并更新.addWorker()方法在后面讲解.if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 如果线程处于RUNNING状态,将任务添加到阻塞队列if (isRunning(c) && workQueue.offer(command)) {// 检查任务添加到阻塞队列后的线程池状态int recheck = ctl.get();// 如果这时线程池状态改变了,删除刚刚添加的任务,并执行拒绝策略if (! isRunning(recheck) && remove(command))reject(command);// 如果线程池状态没有改变,但是为空,添加一个线程else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 如果线程不是RUNNING状态或者添加任务失败,都说明阻塞队列已满// 尝试开启一个新线程,如果失败就执行拒绝策略else if (!addWorker(command, false))reject(command);
}// core如果为true,请使用corePoolSize(核心线程数)作为绑定,否则使用maximumPoolSize(最大线程数)。
// 代码很长,其实就做了两件事。
//     1)才用循环CAS操作来将线程数加1;
//     2)新建一个线程并启用。
private boolean addWorker(Runnable firstTask, boolean core) {retry:// 第一部分 循环CAS操作,将线程池中的线程数+1.for (;;) {int c = ctl.get();int rs = runStateOf(c);// 检查队列的状态是否是非RUNNING状态// 是SHUTDOWN状态时  传入的任务不为空 或 工作队列为空// 如果符合以上描述,返回false// 因为SHUTDOWN状态下不接受新任务,且当工作队列为空时说明阻塞队列也为空了,这是会转换成TIDYING状态if (rs >= SHUTDOWN &&!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))return false;// 自旋增加线程个数for (;;) {int wc = workerCountOf(c);// 线程个数超限返回false// CAPACITY最大线程个数,极限值,非用户定义if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// CAS增加线程个数,只能有一个线程成功,成功的线程进入下一块内容if (compareAndIncrementWorkerCount(c))break retry;// 没成功的线程查看线程池状态是否发生变化// 如果发生变化,调到外层循环重新获取线程状态// 如果没有变化,内层自旋c = ctl.get();if (runStateOf(c) != rs)continue retry;}}// 第二部分 新建线程,并加入到线程池workers中。// 能运行到这里说明CAS操作成功了,boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 封装任务为Worker对象,并抽取出线程w = new Worker(firstTask);final Thread t = w.thread;// 上一步成功执行if块内的代码// 否则返回falseif (t != null) {// 上锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 重新获取线程池状态int rs = runStateOf(ctl.get());// 如果当前线程是RUNNABLE状态  或者  是SHUTDOWN状态但传入的任务为空if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {// 判断添加的任务状态,如果已经开始丢出异常(外部手动启动线程)if (t.isAlive())throw new IllegalThreadStateException();// 将新建的线程加入到线程池中workers.add(w);int s = workers.size();// 修正最大池深度的值if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {// 解锁mainLock.unlock();}// 添加任务成功就启动任务if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}


通过excute()提交任务至线程池时

  1. 当前线程数量<线程池基本大小(corepoolsize)时,即使有空闲进程,也要新建线程执行新任务
  2. 当前线程数量>=线程池基本大小(corepoolsize)时
    1. 任务队列(workQueue)未满时,将任务放入任务队列
    2. 任务队列已满时
      1. 当前线程数量<最大线程数量(maximunpoolsize)时,新建线程,处理任务
      2. 当前线程数量=最大线程数量时,通过拒绝策略(handle)处理该任务

用户线程提交任务到线程池的模型图如下

  • 用户线程:相当于生产者
  • workers:相当于消费者,保存线程的容器,里面存放了worker线程,HashSet类型
  • 任务队列(workQueue):被提交,但尚未被执行

(2). 工作线程Worker的执行

​  用户提交任务到线程池之后,是由Worker来执行的.

​  Worker相当于一个自己带锁的线程.

// Worker的构造方法
Worker(Runnable firstTask) {// 现将state设置为-1是防止在运行前被中断(shutdownNow方法中断所有线程).setState(-1);this.firstTask = firstTask;// 由工厂生产线程this.thread = getThreadFactory().newThread(this);
}// 执行任务
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;// 将state设置为0,允许中断w.unlock();boolean completedAbruptly = true;try {// 当前Worker的调度任务不为空或者能获取到任务while (task != null || (task = getTask()) != null) {w.lock();// 线程池处于STOP状态或者当线程被中断时处于STOP状态// 但是现在并没有被中断// 处于以上情景时,发出中断请求if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))&& !wt.isInterrupted())wt.interrupt();// 执行扩展接口代码try {// 开始执行任务前的Hook(钩子)// 在本类中为空实现,可以在子类中加入诸如事务控制之类的动作beforeExecute(wt, task);Throwable thrown = null;try {task.run();// 发现异常就向上一层抛出} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 同上afterExecute(task, thrown);}} finally {// 每次运行完任务,将任务标记为null,并计数task = null;w.completedTasks++;w.unlock();// 解锁}}completedAbruptly = false;} finally {// completedAbruptly为true,说明之前抛出异常了,会进行清理工作processWorkerExit(w, completedAbruptly);}
}
// 该方法不断的(从等待队列中)向Worker输入任务
private Runnable getTask() {boolean timedOut = false;for (;;) {// 获取线程池状态int c = ctl.get();int rs = runStateOf(c);// 如果线程池状态为SHUTDOWN时队列为空 或者 为STOP或TERMINATE状态if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {// 回收线程,线程数量-1decrementWorkerCount();return null;}// 重新得到线程数int wc = workerCountOf(c);// 标识,含义为当前线程空闲,应该回收// allowCoreThreadTimeOut含义://该值为true,则线程池数量最后销毁到0个。//该值为false,超过核心线程数时,而且(超过最大值或者timeout过),就会销毁。// 当线程数超过核心线程池大小或者销毁机制为销毁到0(可能跟STOP状态有关)时,回收该线程boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 如果线程数目大于最大线程数目,或者允许超时回收或者超时,则跳出循环,继续去阻塞队列中取任务// (如果线程数超过线程池大小 || (标识为应该回收&&还有work可以获取)) && (有线程||work队列非空)if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 注:poll不阻塞,take阻塞// 当标记为清除时,尝试获取一个Worker,否则阻塞获取一个WorkerRunnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();// r!=null 说明标记为不清除,而且得到了workif (r != null)return r;// r==null 说明没有获得work   队列为空了  没有work可以获取了timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}// 将w清理出worker队列(线程池)
// Worker w:要执行退出的Worker对象
// boolean completedAbruptly:是否用户异常退出,true为异常退出。
private void processWorkerExit(Worker w, boolean completedAbruptly) {// 是否是意外退出,如果是,将WorkerCount--if (completedAbruptly)decrementWorkerCount();// 加锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 给总完成任务数计数,然后将worke移除线程completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();// 解锁}// 调用tryTemiate,进行判断当前的线程池是否处于SHUTDOWN状态,如果是终止线程tryTerminate();// 这一整块的含义是判断当前线程数是否小于核心线程个数,如果小于则添加新的线程.int c = ctl.get();// 判断当前的线程池状态,如果当前线程池状态比STOP大的话,就不处理if (runStateLessThan(c, STOP)) {// 判断是否是意外退出,如果不是意外退出的话,那么就会判断最少要保留的核心线程数if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;// 如果最少保留的Worker数为0的话,那么就会判断当前的任务队列是否为空,如果任务队列不为空的话而且线程池没有停止,那么说明至少还需要1个线程继续将任务完成。if (min == 0 && ! workQueue.isEmpty())min = 1;// 如果当前运行的Worker数比当前所需要的Worker数少的话,那么就会调用addWorker,添加新的Workerif (workerCountOf(c) >= min)return;}addWorker(null, false);}
}

(3). shutdown操作

​  调用shutdown方法后,线程池就不会再接受新任务了,但是继续执行之前添加到任务队列中的任务.

public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 检查当前调用shutdown命令的线程是否有关闭线程的权限checkShutdownAccess();// 设置线程状态为SHUTDOWNadvanceRunState(SHUTDOWN);// 设置中断interruptIdleWorkers();onShutdown();} finally {mainLock.unlock();}// 判断当前的线程池是否处于SHUTDOWN状态,如果是终止线程tryTerminate();
}// 多个线程一起设置时,一个线程设置成功退出,其他线程自旋一次,在下一次判断时状态已经被修改,也退出
private void advanceRunState(int targetState) {for (;;) {int c = ctl.get();if (runStateAtLeast(c, targetState) ||ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))break;}
}// 中断空闲的线程
private void interruptIdleWorkers() {interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 遍历每一个工作线程for (Worker w : workers) {Thread t = w.thread;// 如果工作线程没有被中断,且没有正在运行,那么补充一个中断标志// 正在获取任务的线程没有锁,tryLock会返回falseif (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}
}// 判断当前的线程池是否处于SHUTDOWN状态,如果是终止线程
final void tryTerminate() {// 自旋执行for (;;) {// 首先获取线程状态int c = ctl.get();// 如果是RUNNING和TIDYING状态 或者 是SHUTDOWN状态但队列不为空// 说明当前不应该被终止if (isRunning(c) || runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()) )return;// 如果线程数不为0,中断每一个线程(关闭所有空闲线程)if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}// 加锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 使用CAS设置状态为SHUTDOWN(000)if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {// 钩子方法terminated();} finally {//钩子方法执行完毕将线程池状态设置为TERMINATED,释放所有条件等待的线程ctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS}
}

(4). shutdownNow操作

​  调用shutdownNow()方法后,线程池就不会再接受新的任务了,并且会丢弃工作队列中的任务,正在执行的任务会被中断,该方法会立刻返回,返回值为这时候队列中被对其的任务列表.

public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 权限检查checkShutdownAccess();// 修改线程池状态为STOPadvanceRunState(STOP);// 中断所有线程interruptWorkers();// 将中断的任务队列移动到tasks中,稍后返回tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;
}
private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers)w.interruptIfStarted();} finally {mainLock.unlock();}
}
private List<Runnable> drainQueue() {// 获取工作队列BlockingQueue<Runnable> q = workQueue;ArrayList<Runnable> taskList = new ArrayList<Runnable>();// 移动队列内节点q.drainTo(taskList);// 如果有剩余节点,if (!q.isEmpty()) {// 手动一个一个移动for (Runnable r : q.toArray(new Runnable[0])) {if (q.remove(r))taskList.add(r);}}return taskList;
}

(5). awaitTermination操作

​  调用该方法之后,当前线程会被阻塞,知道线程池转台变为TERMINATED或者等待超时才返回.(等同于终止这个线程直到线程池被终止)

public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {// 包装参数,加锁long nanos = unit.toNanos(timeout);final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 自旋for (;;) {// 是TERMINATED状态,则不需要进行任何操作了if (runStateAtLeast(ctl.get(), TERMINATED))return true;// 如果传入时间为无效时间,返回falseif (nanos <= 0)return false;// 如果时间有效,条件阻塞nanos微秒,唤醒后重新判断当前线程池状态nanos = termination.awaitNanos(nanos);}} finally {mainLock.unlock();}
}

4. 总结

Java并发包中的线程池工作原理如下:

​  类中有一个任务队列,用于存放当前线程都被占用,但是仍然要进行的任务.线程被Worker执行,并且自旋式的不断从任务队列获取新的任务,直到任务队列为空,并且当前Worker空闲,就会被移出线程池.

​  调用shutdown()方法后,会停止接收任务,并中断空闲的线程(当前正在获取任务)

​  调用shutdownNow()方法后,停止接收任务,并中断所有线程.将任务队列返回.

​  调用awaitTermination()方法后,将当前线程挂起直至超时或者线程池被销毁.

Java并发编程--线程池ThreadPollExecutor原理探究相关推荐

  1. Java 并发编程 -- 线程池源码实战

    一.概述 小编在网上看了好多的关于线程池原理.源码分析相关的文章,但是说实话,没有一篇让我觉得读完之后豁然开朗,完完全全的明白线程池,要么写的太简单,只写了一点皮毛,要么就是是晦涩难懂,看完之后几乎都 ...

  2. Java并发编程——线程池的使用

    在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统 ...

  3. java workerdone_【架构】Java并发编程——线程池的使用

    前言 如果我们要使用线程的时候就去创建一个,这样虽然非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为 ...

  4. java并发编程——线程池的工作原理与源码解读

    2019独角兽企业重金招聘Python工程师标准>>> 线程池的简单介绍 基于多核CPU的发展,使得多线程开发日趋流行.然而线程的创建和销毁,都涉及到系统调用,比较消耗系统资源,所以 ...

  5. Java并发编程-线程池底层工作原理

    线程池底层工作原理 1.线程池的底层工作流程 1.1.线程池的底层工作原理图 1.2.银行办理业务案例 1.3.线程池的底层工作流程总结 2.线程池用哪个?生产中如何设置合理参数 2.1.在工作中单一 ...

  6. Java并发编程——线程池初步

    概述: 线程池机制是事先创建一些线程等待服务端程序的调用,这些线程保存在一个数组结构中,称为"线程池".当服务器有任务执行时,就从线程池中取出一个线程并给其分配任务,当线程任务执行 ...

  7. java并发测试 线程池,Java并发编程——线程池

    1.任务与执行策略间的隐性耦合 一些任务具有这样的特征:需要或者排斥某种特定的执行策略.对其他任务具有依赖性的任务,就会要求线程池足够大,来保证它锁依赖任务不必排队或者不被拒绝:采用线程限制的任务需要 ...

  8. java线程池_Java 并发编程 线程池源码实战

    作者 | 马启航 杏仁后端工程师.「我头发还多,你们呢?」 一.概述 笔者在网上看了好多的关于线程池原理.源码分析相关的文章,但是说实话,没有一篇让我觉得读完之后豁然开朗,完完全全的明白线程池,要么写 ...

  9. java excutorthread_JAVA 线程池ThreadPoolExcutor原理探究

    概论 线程池(英语:thread pool):一种线程使用模式.线程过多会带来调度开销,进而影响缓存局部性和整体性能.而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务.这避免了在处理短时间 ...

  10. Java并发编程—定时器Timer底层原理

    原文作者:妮蔻 原文地址:Java并发编程笔记之Timer源码分析 目录 一.timer问题复现 二.Timer 实现原理分析 timer在JDK里面,是很早的一个API了.具有延时的,并具有周期性的 ...

最新文章

  1. mysql mac客户端: sequel,mysql-workbench
  2. MySQL Percona Toolkit--pt-osc与online DDL选择
  3. 云炬Android开发笔记 5-9,10拦截器功能设计与实现
  4. 看了《隐秘的角落》才知道,掉头发有多可怕!10个掉头发最快的专业!快看看你中枪了没有!...
  5. 从零开始学Pytorch(三)之多层感知机的实现
  6. 鹅厂优文 | ReactJS一点通
  7. git clone 出现fatal: unable to access ‘https://github 类错误解决方法
  8. 关于原型污染漏洞的完整指南
  9. html中隐式转换成数字,详解JS中的隐式类型转换
  10. 项目集成sentry
  11. Word文档《Document SAFER 2》
  12. 网站内链外链批量抓取工具
  13. im即时通讯源码+软件+app附详细封装视频搭建教程
  14. icom对讲机写频线定义_ICOM对讲机的常见故障和使用中的问题
  15. inkscape裁剪
  16. Sap hana 升级思路
  17. 动环监控设备维护与故障处理,动环监控系统调试
  18. python 正则表达式 前瞻_【正则表达式】前瞻,后顾,负前瞻,负后顾
  19. VScode 常用必备插件
  20. javascript-解析xml文件-在html中实现二级联动分析及案例

热门文章

  1. kali钓鱼(超详细)
  2. html如何实现雪花飘落,如何使用HTML5 canvas实现雪花飘落
  3. Sharepoint 2010 学习资源总结
  4. 1小时学会通过Java Swing Design设计java图形化
  5. v-if和v-show的使用和特点
  6. prusai3打印机使用教程_【打印虎原创】Prusa_i3_3D打印机校准图解教程-基础篇
  7. 明朝崇祯十年丁丑科状元刘同升后裔在松滋
  8. win7 64 下Vim与Vundle,pathogen的安装【转】
  9. Android.mk编译错误 FAILED: ninja: unknown target ‘MODULES-IN-packages-apps-XXXX‘
  10. 内存泄漏工具asan