ThreadPoolExecutor线程池


说起线程池大家一定都不陌生,其实在很多地方都有应用。
当我们解释为什么使用线程池的时候,我们都会说线程池可以减少线程创建的开销,节省系统的资源。
但其实线程池真正做到我们说的那样,还和我们设置的参数有关。我们都知道在我们使用线程池的时候,开头要搞很多参数。

 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue){}

这些个参数,相信你们一定不会陌生,在使用的时候还要考虑半天,这参数我该怎么设置好呢?

那我们首先来了解这些个参数的意义,这些参数大多对应的也是线程池中的成员,既然线程池的源码中对于往线程池中添加一个任务采用的是这样 addWorker , Worker, workerCountOf.

看到Woker这样的字眼,我觉得采用工人这个词来可能会更加的清晰。

先说一个误区:在我们new ThreadPoolExecutor()的时候设置核心线程数,不是说我们new 完之后这些个线程就已经存在,而是之后我们向线程池采用execute添加任务(Runnable)时才会创建线程,核心线程的数量只能说明是一个数量上的限制,当超过这个数量就是另当别论了。

其实采用execute添加Runnable任务的时候,是将任务Runnable交给一个Worker,然后启动该worker,进行任务的执行。
所以线程池中的线程,我下面都采用工人(Worker)这个字眼来代替.

1. private volatile int corePoolSize;

例如 corePoolSize = 5
线程池始终保持有5个工人,不管这些个工人是在核心工人数没有到达5这个阀值时创建的工人,还是由于已经有5个工人在执行任务,已经创建的5人都在忙的状态,而此时阻塞队列也满了,线程池会创建新的工人(也即后创建的工人)。

不管是先创建的还是后创建的,不会说等所有工人都没有任务执行的时候保留先创建的5个,而解除后创建的工人这一说。而是始终保持所有创建的工人中的5个。

2. private volatile int maximumPoolSize;
线程池能容纳的最多的工人数量,

3. private volatile long keepAliveTime;
当工人的数量大于核心工人的数量的时候,不管是哪个工人执行完自己手头的任务,去阻塞队列进行取任务的时候,都会等待这个时间,如果阻塞队列中有任务,那就万事大吉,这个工人还能用,如果阻塞队列中迟迟没有任务,那么该工人就采用这个时间等等,时间过了,没有任务,那么该工人就该下岗了。

4. TimeUnit unit 这个不是成员,
例如unit = TimeUnit.MILLISECONDS,是采用毫秒进行计算
就是由用户来设置,对于 keepAliveTime采用怎样的单位进行计算,计算出最终的等待时间,

5. private final BlockingQueue workQueue;
用于存储任务的阻塞队列,当核心数量的工人都在执行任务的时候,再来新的任务,就会添加到这个阻塞队列中。

正儿八经线程池工作原理

情况1 : 当工人的数量没有超过核心工人数量的这个阀值的时候

来一个任务新建一个工人,等待工人去执行,那么此时来说,阻塞队列肯定为空,如果此时没有任何的新任务进来,那么工人执行完自己的任务也不闲着,会去阻塞队列中去看一眼,看有没有任务等待执行,那么此时肯定是没有的,所以当前执行完自己任务的工人就会阻塞起来。

情况2 : 当工人数量达到核心工人数量的时候,但阻塞队列没有满的时候

此时是不会针对新来任务创建工人的,而是将任务加到阻塞队列中,所以针对 情况1 的工人阻塞就可以处理了,因为 情况2 是不会创建新的工人,只是将任务加入到阻塞队列,所以如果在 情况1 由于工人没有任务可执行,工人阻塞起来,那么在此时,由于阻塞队列有任务,就会唤醒那些阻塞起来的工人,阻塞队列中加一个任务,唤醒一个工人。那么工人就有任务可执行了,(这也就是我们为什么说线程池节省系统资源的原因,因为工人没闲着,只要有活就干,相当于一个线程执行完一个任务,线程不会消亡而是在任务队里中再拿任务执行,可以做到线程的循环使用)。

情况3:当工人达到核心工人数量的时候,阻塞队列也已满,而且核心数量的这哥几个工人还都手头有任务没有执行完。

此时如果有任务来,线程池就不得不创建工人了,没办法,任务都堆积如山了,那么此时新创建的该工人就执行当前任务。
如果此时由任务来,会出现两种情况:
3.1 :该新建工人还在执行任务,之前的工人也还都在执行任务,阻塞队列还是满的,那么线程池就再创建一个工人执行新任务,如果一直是我说的这种情况,(不管是之前的工人还是现在的工人都在执行任务,并且阻塞队列里的任务还都是满的话),那么对于新任务就创建一个工人,直到工人的数量大于线程池容纳最多工人的数量,此时线程池就不接任务了,同时也会执行拒绝策略 (但是一般这种情况很少出现)。

3.2:新创建的工人或者之前的工人有一个或多个已经将手头的任务执行完了,那么此时阻塞队列就会有空位,因为那些执行完任务的工人会去阻塞队列去拿任务执行,那么此时来的新任务就进入阻塞队列,而不是重新创建工人执行了。

对于上边的 情况3 来说,不管是情况3 下两种情况的哪一种情况,都说明已经建立了新的工人(比核心数量多的工人),顾名思议就是,此时的工人的数量已经超过核心工人数量。

对于 情况3 的第一种情况来说属于极端情况,那当然极端情况就是线程池不再接受新的任务了,就算不接受新的任务,那么线程池也会面临下面的问题。
迟早工人们(工人数量大于核心工人的数量)会完成自己的手头工作,去阻塞队列中拿任务执行,阻塞队列任务也会被拿完的情况(这种情况当然是我们的正常情况,哪有说工人执行任务的速度还赶不上添加任务的速度,那这效率也太慢了).

此时分为两种情况,只要有工人完成手中的任务,去阻塞队列中拿任务,拿到任务执行就完事了,随着所有工人渐渐的完成自己手中任务都挨个去阻塞队列中拿任务的时候,总有人会因为阻塞队列为空而拿不到任务,此时我们说的那个线程池中的变量keepAliveTime派上用场了,如果某个工人等了keepAliveTime时间,阻塞队列还为空,线程池就会让该工人下岗了,但对于这个下岗来说,有一个限制就是corePoolSize,所以这还得看那些工人的命,要是某一个工人去阻塞队列中拿任务,没有拿到,并且此时总的工人数量大于和核心工人数量,就让该工人下岗,如果小于核心工人的数量,那算这个工人命好没有被迫下岗(这也就是我前面说的,corepollSize的作用就是用来维持工人的数量的,没有什么核心工人这一说,也就是核心线程。只是说线程池要留够corepollSize这个数量的工人,至于留谁那看工人的命了,也就是刚才说过的。).
那么现在的局面也就回到了我们 情况2 的局面。

这就是整个线程池的工作原理,也是一个不断轮回的过程,直到线程池关闭,线程池关闭也讲究善后工作,那就是对这些工作的工人和任务的处理了。善后工作这篇文章不做说明,可能会在之后的博文中进行讲解,拒绝策略方面的内容。

对于我上面说的那几种情况,你们可以对照源码多走几遍,就意思自己采用手工过程,验证一下上边的情况。
对于线程池工作的原理,上边就是我看源码总结出来的,下面我们跟着源码再大致看看。

源码解析;

入口方法execute(Runnable)

 public void execute(Runnable command) {if (command == null)//这判断不用多说,你们肯定都晓得,throw new NullPointerException();int c = ctl.get();
//这个ctl是 AtomicInteger这个类的一个对象,这个类中有一个int value的成员,用这个AtomicInteger其实想用value,
//ctl.get()返回的就是value值,同时这个值在我们new ThreadPoolExecutor的时候默认值为-1 << 29
//采用这个值,一是想采用value的前三位表示线程池的状态,后29位表示线程池中工人的个数。if (workerCountOf(c) < corePoolSize) {//workerCountOf(c)就是去除value就是取出value的后29 判断工人数量。//当前的工人的数量还没有到核心工人数量这个阀值的时候//采用addWorker创建一个工人,来执行任务。 //下面会说到这个addworker方法if (addWorker(command, true))//return;c = ctl.get();}//当目前工人的数量大于核心工人数量的时候,直接到这个判断// isRunning(c)通过value判断线程池的状态,是否在运行状态,在运行转状态才有后话//线程池处于运行状态,将新任务直接加入到阻塞队列中      if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//这里会出现一个情况就是,当加入新任务成功之后,此时线程池的状态不再为运行状态,那就意味值线程池要关闭。//那就试图将新添加的这个任务从阻塞队列中移除,如果删除成功,就执行拒绝策略(线程池关闭的善后工作)//如果删除失败,也没关系,反正线程池已经不工作,对于阻塞队列中的任务也会进行善后处理的。if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}//当线程池中工人的数量大于核心工人数量,并且,阻塞队列也满的情况下//创建新的工人执行任务,addWorker(command, false)返回true//如果当前总的工人数量大于线程池最大工人的数量,那么此时创建工人就会失败,addWorker(command, false)返回false,则执行拒绝策略。       else if (!addWorker(command, false))reject(command);}

addworker()

private boolean addWorker(Runnable firstTask, boolean core) {retry://这个外层for循环就是用来判断当前是否满足创建工人的条件for (;;) {int c = ctl.get();int rs = runStateOf(c);//rs为当前线程池的状态//如果当前线程池的状态为关闭状态,并且阻塞队列为空的话,就不满足创建一个新工人的条件,//显然只要线程池的状态为Running状态,也就是运行态,当然不会进入到这个判断,也就离创建工人的可能性更进了一步if (rs >= SHUTDOWN && ! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {//判断当前线程池中的工人的数量int wc = workerCountOf(c);//对于下面这个判断是针对不同的情况,当大于核心工人数量,返回false,表明当前添加的任务不再创建工人执行,而是加入阻塞队列汇中//当大于最大工人的数量的时候,返回false,那时候说明要执行拒绝策略 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))return false;//程序到这里表明可以创建工人的条件满足,将ctl中的value值加一,if (compareAndIncrementWorkerCount(c))break retry;//直接跳出外层的forc = ctl.get();  if (runStateOf(c) != rs)continue retry;}//外层for循环结束//程序到这里表明创新工人的条件成立,可以创建工人了boolean workerStarted = false;                boolean workerAdded = false;Worker w = null;                       try {//将任务交给工人,并且创建一个工人,Worker类继承了Runnable//将任务交个工人,那么会将firstTask赋值为Worker中的 Runnable firstTask成员。//由于worker本身是一个Runnable所以,在创建工人的时候,工人将自己本身赋值给Worker中的Thread thread成员,//worker中的run方法调用了runWorker(this),               w = new Worker(firstTask);final Thread t = w.thread;//w.thread,就是工人本身。在之后你们看到的他 t.start//就说明工人等待机会执行任务。那么t.start 会有一个线程的开启,并且之后会执行t中的run方法,因为上边说了t就是worker对象本身//所以会执行runworker(this)方法,表明工人开始执行任务了,而this就是worker对象//而在runworker(this)方法中就执行了this中的firsttask的run方法,也就是该woker对象所拥有的任务firstTask的run方法,//也就是真正执行了任务firstTask。if (t != null) {final ReentrantLock mainLock = this.mainLock;//这个锁不用说了,因为线程中有一个变量, HashSet<Worker> workers  //如果工人创建成功的话,需要将工人加入到这个workers中,为了多线程安全的问题,采用mainLock.lock(); 保证在添加工人和删除工人的同步。  mainLock.lock();try {            int rs = runStateOf(ctl.get());//获取当前的线程池的状态,if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)){if (t.isAlive())throw new IllegalThreadStateException();//将工人添加到workers中               workers.add(w);int s = workers.size();if (s > largestPoolSize)//largestPoolSize默认为零,添加工人的时候,对其值进行更改记录。largestPoolSize = s;workerAdded = true;}} finally {//释放锁的过程,让其他想要添加或者删除工人的线程可以持有锁进行添加或者删除。mainLock.unlock();}//当工人添加好之后//t.start就是创建一个线程,在上面 new woker的时候已经说过了//等待线程运行的时候,会执行woker中的firstTask的run方法。相当于真正执行任务if (workerAdded) {t.start();//不是立马执行任务,先创键线程,等待cpu调度线程执行任务,我也比作是工人执行任务。workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

runWoker()
runwoker在前面创建工人的时候,已经提到过了。当创建完工人之后有一个t.start 的过程,那么这个 t 就是Woker对象,t.start在之后创建的线程会在之后运行会调用 t 中run方法,也就是Woker中的run方法,也就会运行runwoker()并且参数就是Woker对象

private final class Worker extends AbstractQueuedSynchronizer  implements Runnable{public void run() {runWorker(this);}
}
final void runWorker(Worker w) {Thread wt = Thread.currentThread();//w.firstTask就是woker中的任务,也就是工人携带的任务,//当一个线程运行runworker的时候,就会执行工人携带的任务,也相当于工人执行了任务。Runnable task = w.firstTask;//先将工人携带的任务设置为null,然后执行任务w.firstTask = null;w.unlock();boolean completedAbruptly = true;try {//当前线程执行完一个task,会在循环采用getTask在阻塞队列中拿任务执行,//也相当于一个工人执行完任务,再拿任务执行。while (task != null || (task = getTask()) != null) {w.lock();if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())wt.interrupt();try {//在线程中此方法并没有具体的实现,空方法,不必管beforeExecute(wt, task);Throwable thrown = null;try {//执行task.run方法,也是真正执行任务了task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//线程池并没有实现,空方法afterExecute(task, thrown);}} finally {task = null;//当前线程执行完一个任务,也相当于一个工人执行完任务//工人记录自己完成的任务数量。w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {//当上边的while循环结束,程序就会运行到这里//让工人下岗的操作,相当于将工人在workers中删除processWorkerExit(w, completedAbruptly);//while循环结束相当于是,在取任务的时候没取到,返回了null//上边的while循环才会结束//下面我们就来看看getTask方法。   }}

getTask();

 private Runnable getTask() {boolean timedOut = false;for (;;) {int c = ctl.get();int rs = runStateOf(c);//获取线程池的状态   if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {//当线程池为关闭状态的时候 先通过decrementWorkerCount();将工人数量减一//然后返回null,那么runwoker中的while循环就会结束,并且删除工人decrementWorkerCount();return null;}// 获取线程池中工人的数量。  int wc = workerCountOf(c);//allowCoreThreadTimeOut默认是false,表示不允许核心数量的工人等待超时而进行下岗。//所以当工人的数量小于核心工人的数量的时候,timed = false//当工人的数量大于核心工人的数量的时候,timed = trueboolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//在这里就是决定工人是否下岗的条件//当此时工人总数量大于核心工人的数量的时候,并且由于工人在阻塞队列中没有取到任何的任务,//那么此时满足这个if的判断,就会将工人的数量减一,并且返回null//那么在runwoker的while循环就会结束,就会让工人下岗if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}//当timed为true的时候,表明此时的工人数量大于核心工人的数量,//此时对于任何工人来说采用poll,原因是如果当阻塞队列中没有任务的时候,这些取任务的工人还有等待的时机//当等待时间到了,还没有获取到任务的话,将返回null,这样timedOut = true;那么在次循环的时候,就会将获取任务的工人删除,//因为当工人数量大于核心工人数量,并且阻塞队列中没有任务,线程池没有必要维持那么多工人,维持核心数量个数的工人就好。//当timed为false的时候表明此时的工人数量小于核心工人的数量,//采用take的原因是,如果此时由工人去阻塞队列中拿任务,但是阻塞对列中没有任务的时候,会将当前线程阻塞,//也就是是说工人只是暂停工作,当阻塞队列中有任务加入的话,当前线程接着运行从阻塞队列中拿任务执行//因为线程池要维持核心数量的工人数,此时工人的数量小于核心工人数量,所以不会像poll一样返回null,然后剔除这个工人,只是将其暂停。try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;}catch (InterruptedException retry) {timedOut = false;}}}

ThreadPoolExecutor线程池 —————— 开开开山怪相关推荐

  1. ThreadPoolExecutor线程池,shutdown和shutdownNow关闭线程池方式对比,以及确保线程池能够彻底关闭的一种方式

    1. ThreadPoolExecutor线程池 1.1 创建线程池,构造方法的几个参数说明及创建如下. 1.2 shutdown方式关闭线程池 a. 空闲且能interrupt表示该线程处于阻塞等待 ...

  2. 13.ThreadPoolExecutor线程池之submit方法

    jdk1.7.0_79  在上一篇<ThreadPoolExecutor线程池原理及其execute方法>中提到了线程池ThreadPoolExecutor的原理以及它的execute方法 ...

  3. Java Executor源码解析(3)—ThreadPoolExecutor线程池execute核心方法源码【一万字】

    基于JDK1.8详细介绍了ThreadPoolExecutor线程池的execute方法源码! 上一篇文章中,我们介绍了:Java Executor源码解析(2)-ThreadPoolExecutor ...

  4. ThreadPoolExecutor线程池核心参数详解

    理解ThreadPoolExecutor线程池的corePoolSize.maximumPoolSize和poolSize 我们知道,受限于硬件.内存和性能,我们不可能无限制的创建任意数量的线程,因为 ...

  5. 手写一个线程池,带你学习ThreadPoolExecutor线程池实现原理

    摘要:从手写线程池开始,逐步的分析这些代码在Java的线程池中是如何实现的. 本文分享自华为云社区<手写线程池,对照学习ThreadPoolExecutor线程池实现原理!>,作者:小傅哥 ...

  6. ThreadPoolExecutor线程池原理

    ThreadPoolExecutor线程池原理 线程池原理 1. 线程池的简单介绍 1.1 线程池是什么 1.2 线程池解决的核心问题是什么 2. 线程池的实现原理 2.1 线程池的执行流程 2.2 ...

  7. ThreadPoolExecutor 线程池和redisson加上手动事务踩的坑

    ThreadPoolExecutor 线程池和redisson加上手动事务踩的坑 一.具体活动 0.线程池 1.redisson锁 依赖 2.redisson锁 config文件 3.redisson ...

  8. ThreadPoolExecutor(线程池)的参数

    构造函数 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit u ...

  9. 【Java 并发编程】线程池机制 ( ThreadPoolExecutor 线程池构造参数分析 | 核心线程数 | 最大线程数 | 非核心线程存活时间 | 任务阻塞队列 )

    文章目录 前言 一.ThreadPoolExecutor 构造参数 二.newCachedThreadPool 参数分析 三.newFixedThreadPool 参数分析 四.newSingleTh ...

  10. ThreadPoolExecutor线程池的理解与应用

    一.线程池与进程池的区别 两个关键词:线程,进程/池 池:可以将其理解为一种容器,有其固定的大小 什么时候用线程池/进程池,分两个问题讨论 1.什么时候用池:当程序中的任务并发数远远大于计算机的承受能 ...

最新文章

  1. 将pdf转为html,使用pdfdom将pdf转为html
  2. 无线AP如何区分来宾(流动)用户和正常用户?
  3. kaggle中的Two-Stage比赛规则以及metadata数据的使用规定
  4. 【TypeScript系列教程07】变量声明
  5. 斗鱼Q3财报:移动端季度平均MAU再创新高至6190万,付费用户720万
  6. 如何弄ad装配图_[分享][BCW]上海西康路189弄——“世界最美购物中心”幕墙的诞生...
  7. 中国人终于开始排队了
  8. 疫情下的国内云市场,正是大展拳脚的好时机!
  9. 算法笔记_什么是数据结构_向量vector
  10. 机器学习(一)绪论、算法总结
  11. SQL基础语句汇总-学习
  12. Linux BT下载(8)-种子解析模块设计与实现2
  13. 基于51单片机的智能温控风扇设计
  14. Python爬虫实战之爬取链家广州房价_02把小爬虫变大
  15. 网页音乐播放器 音乐播放器 html+css+js
  16. 【企业】掌握理查德·费曼学习法,提高学习效率
  17. 怎么在html中把3个单元格合并成2个,Excel表格怎么将一个单元格拆分成2个?将多个单元合并成一个的方法...
  18. SAP MM 条件技术透析
  19. 速写中的颈部肌肉怎么表现?详细画法看这里~
  20. IOS不兼容超出部分省略号 且页面显示不起作用 行数限定无作用

热门文章

  1. SCARA——OpenGL入门学习五六(三维变换、动画)
  2. 最优传输论文(二十六):Sliced Wasserstein Discrepancy for Unsupervised Domain Adaptation论文原理
  3. 一张图看晕人民的名义
  4. 【Linux / 数据库】项目实战:tpshop项目在Linux系统环境搭建
  5. 六年工作经验总结分享,希望可以帮到你
  6. 微信表情符号写入案件判决
  7. 0基础运营小白如何写出10W+,六招搞定!
  8. 时间刻度线css,纯CSS时间轴列表
  9. rosetta_ddg 使用-rosetta 2020版
  10. Android电源键亮灭屏流程