点击上方 好好学java ,选择 星标 公众号

重磅资讯、干货,第一时间送达
今日推荐:SQL 语法速成手册文末点击阅读原文,去B站看视频,别忘记关注哦
个人原创100W+访问量博客:点击前往,查看更多

来源:jianshu.com/p/704a6c5d337c


一、概述

线程池,顾名思义就是存放线程的池子,池子里存放了很多可以复用的线程。

如果不用类似线程池的容器,每当我们需要执行用户任务的时候都去创建新的线程,任务执行完之后线程就被回收了,这样频繁地创建和销毁线程会浪费大量的系统资源。

因此,线程池通过线程复用机制,并对线程进行统一管理,具有以下优点:

  • 降低系统资源消耗。通过复用已存在的线程,降低线程创建和销毁造成的消耗;

  • 提高响应速度。当有任务到达时,无需等待新线程的创建便能立即执行;

  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗大量系统资源,还会降低系统的稳定性,使用线程池可以进行对线程进行统一的分配、调优和监控。

ThreadPoolExecutor是线程池框架的一个核心类,本文通过对ThreadPoolExecutor源码的分析(基于JDK 1.8),来深入分析线程池的实现原理。

二、ThreadPoolExecutor类的属性

先从ThreadPoolExecutor类中的字段开始:

附上我历时三个月总结的 Java 面试 + Java 后端技术学习指南,笔者这几年及春招的总结,github 1.4k star,拿去不谢!

下载方式

1. 首先扫描下方二维码

2. 后台回复「Java面试」即可获取

// 线程池的控制状态,用高3位来表示线程池的运行状态,低29位来表示线程池中工作线程的数量private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//值为29,用来表示偏移量private static final int COUNT_BITS = Integer.SIZE - 3;//线程池的最大容量,其值的二进制为:00011111111111111111111111111111(29个1)private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// 线程池的运行状态,总共有5个状态,用高3位来表示private static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;//任务缓存队列,用来存放等待执行的任务private final BlockingQueue<Runnable> workQueue;//全局锁,对线程池状态等属性修改时需要使用这个锁private final ReentrantLock mainLock = new ReentrantLock();//线程池中工作线程的集合,访问和修改需要持有全局锁private final HashSet<Worker> workers = new HashSet<Worker>();// 终止条件private final Condition termination = mainLock.newCondition();//线程池中曾经出现过的最大线程数private int largestPoolSize;//已完成任务的数量private long completedTaskCount;//线程工厂private volatile ThreadFactory threadFactory;//任务拒绝策略private volatile RejectedExecutionHandler handler;//线程存活时间private volatile long keepAliveTime;//是否允许核心线程超时private volatile boolean allowCoreThreadTimeOut;//核心池大小,若allowCoreThreadTimeOut被设置,核心线程全部空闲超时被回收的情况下会为0private volatile int corePoolSize;//最大池大小,不得超过CAPACITYprivate volatile int maximumPoolSize;//默认的任务拒绝策略private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();private static final RuntimePermission shutdownPerm =new RuntimePermission("modifyThread");private final AccessControlContext acc;

在ThreadPoolExecutor类的这些属性中,线程池状态是控制线程池生命周期至关重要的属性,这里就以线程池状态为出发点进行研究。

通过上面的源码可知,线程池的运行状态总共有5种,其值和含义分别如下:

  • RUNNING:  高3位为111,接受新任务并处理阻塞队列中的任务

  • SHUTDOWN: 高3位为000,不接受新任务但会处理阻塞队列中的任务

  • STOP:高3位为001,不会接受新任务,也不会处理阻塞队列中的任务,并且中断正在运行的任务

  • TIDYING:  高3位为010,所有任务都已终止,工作线程数量为0,线程池将转化到TIDYING状态,即将要执行terminated()钩子方法

  • TERMINATED: 高3位为011,terminated()方法已经执行结束

然而,线程池中并没有使用单独的变量来表示线程池的运行状态,而是使用一个AtomicInteger类型的变量ctl来表示线程池的控制状态,其将线程池运行状态与工作线程的数量打包在一个整型中,用高3位来表示线程池的运行状态,低29位来表示线程池中工作线程的数量,对ctl的操作主要参考以下几个函数:

// 通过与的方式,获取ctl的高3位,也就是线程池的运行状态private static int runStateOf(int c)     { return c & ~CAPACITY; }//通过与的方式,获取ctl的低29位,也就是线程池中工作线程的数量private static int workerCountOf(int c)  { return c & CAPACITY; }//通过或的方式,将线程池状态和线程池中工作线程的数量打包成ctlprivate static int ctlOf(int rs, int wc) { return rs | wc; }//SHUTDOWN状态的值是0,比它大的均是线程池停止或清理状态,比它小的是运行状态private static boolean isRunning(int c) {return c < SHUTDOWN;}

接下来,我们看一下线程池状态的所有转换情况,如下:

  • RUNNING -> SHUTDOWN:调用shutdown(),可能在finalize()中隐式调用

  • (RUNNING or SHUTDOWN) -> STOP:调用shutdownNow()

  • SHUTDOWN -> TIDYING:当缓存队列和线程池都为空时

  • STOP -> TIDYING:当线程池为空时

  • TIDYING -> TERMINATED:当terminated()方法执行结束时

通常情况下,线程池有如下两种状态转换流程:

  1. RUNNING -> SHUTDOWN -> TIDYING -> TERMINATED

  2. RUNNING -> STOP -> TIDYING -> TERMINATED

三、ThreadPoolExecutor类的构造方法

通常情况下,我们使用线程池的方式就是new一个ThreadPoolExecutor对象来生成一个线程池。接下来,先看ThreadPoolExecutor类的构造函数:

//间接调用最后一个构造函数,采用默认的任务拒绝策略AbortPolicy和默认的线程工厂public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue);//间接调用最后一个构造函数,采用默认的任务拒绝策略AbortPolicypublic ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);//间接调用最后一个构造函数,采用默认的默认的线程工厂public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);//前面三个分别调用了最后一个,主要的构造函数public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);

接下来,看下最后一个构造函数的具体实现:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {//参数合法性校验if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();//参数合法性校验if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();//初始化对应的属性this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}

下面解释下一下构造器中各个参数的含义:

1. corePoolSize

线程池中的核心线程数。当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行。

2. maximumPoolSize

线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize。

3. keepAliveTime

线程空闲时的存活时间。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,keepAliveTime参数也会起作用,直到线程池中的线程数为0。

4. unit

keepAliveTime参数的时间单位。

5. workQueue

任务缓存队列,用来存放等待执行的任务。如果当前线程数为corePoolSize,继续提交的任务就会被保存到任务缓存队列中,等待被执行。

一般来说,这里的BlockingQueue有以下三种选择:

  • SynchronousQueue:

    一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。

    因此,如果线程池中始终没有空闲线程(任务提交的平均速度快于被处理的速度),可能出现无限制的线程增长。

  • LinkedBlockingQueue:

    基于链表结构的阻塞队列,如果不设置初始化容量,其容量为Integer.MAX_VALUE,即为无界队列。

    因此,如果线程池中线程数达到了corePoolSize,且始终没有空闲线程(任务提交的平均速度快于被处理的速度),任务缓存队列可能出现无限制的增长。

  • ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务。

6. threadFactory

线程工厂,创建新线程时使用的线程工厂。

7. handler

任务拒绝策略,当阻塞队列满了,且线程池中的线程数达到maximumPoolSize,如果继续提交任务,就会采取任务拒绝策略处理该任务,线程池提供了4种任务拒绝策略:

  • AbortPolicy:丢弃任务并抛出RejectedExecutionException异常,默认策略;

  • CallerRunsPolicy:由调用execute方法的线程执行该任务;

  • DiscardPolicy:丢弃任务,但是不抛出异常;

  • DiscardOldestPolicy:丢弃阻塞队列最前面的任务,然后重新尝试执行任务(重复此过程)。

    当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

四、线程池的实现原理

1. 提交任务

线程池框架提供了两种方式提交任务,submit()和execute(),通过submit()方法提交的任务可以返回任务执行的结果,通过execute()方法提交的任务不能获取任务执行的结果。

submit()方法的实现有以下三种:

public Future<?> submit(Runnable task);public <T> Future<T> submit(Runnable task, T result);public <T> Future<T> submit(Callable<T> task);

下面以第一个方法为例简单看一下submit()方法的实现:

public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}

submit()方法是在ThreadPoolExecutor的父类AbstractExecutorService类实现的,最终还是调用的ThreadPoolExecutor类的execute()方法,下面着重看一下execute()方法的实现。

public void execute(Runnable command) {if (command == null)throw new NullPointerException();//获取线程池控制状态int c = ctl.get();// (1)//worker数量小于corePoolSizeif (workerCountOf(c) < corePoolSize) {//创建worker,addWorker方法boolean参数用来判断是否创建核心线程if (addWorker(command, true))//成功则返回return;//失败则再次获取线程池控制状态c = ctl.get();}//(2)//线程池处于RUNNING状态,将任务加入workQueue任务缓存队列if (isRunning(c) && workQueue.offer(command)) {// 再次检查,获取线程池控制状态,防止在任务入队的过程中线程池关闭了或者线程池中没有线程了int recheck = ctl.get();//线程池不处于RUNNING状态,且将任务从workQueue移除成功if (! isRunning(recheck) && remove(command))//采取任务拒绝策略reject(command);//worker数量等于0else if (workerCountOf(recheck) == 0)//创建workeraddWorker(null, false);}//(3)else if (!addWorker(command, false))  //创建workerreject(command);  //如果创建worker失败,采取任务拒绝策略}

execute()方法的执行流程可以总结如下:

  • 若线程池工作线程数量小于corePoolSize,则创建新线程来执行任务

  • 若工作线程数量大于或等于corePoolSize,则将任务加入BlockingQueue

  • 若无法将任务加入BlockingQueue(BlockingQueue已满),且工作线程数量小于maximumPoolSize,则创建新的线程来执行任务

  • 若工作线程数量达到maximumPoolSize,则创建线程失败,采取任务拒绝策略

可以结合下面的两张图来理解线程池提交任务的执行流程。

2. 创建线程

从execute()方法的实现可以看出,addWorker()方法主要负责创建新的线程并执行任务,代码实现如下:

//addWorker有两个参数:Runnable类型的firstTask,用于指定新增的线程执行的第一个任务;boolean类型的core,表示是否创建核心线程
//该方法的返回值代表是否成功新增一个线程private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// (1)if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);//线程数超标,不能再创建线程,直接返回if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//CAS操作递增workCount//如果成功,那么创建线程前的所有条件校验都满足了,准备创建线程执行任务,退出retry循环//如果失败,说明有其他线程也在尝试往线程池中创建线程(往线程池提交任务可以是并发的),则继续往下执行if (compareAndIncrementWorkerCount(c))break retry;//重新获取线程池控制状态c = ctl.get();// 如果线程池的状态发生了变更,如有其他线程关闭了这个线程池,那么需要回到外层的for循环if (runStateOf(c) != rs)continue retry;//如果只是CAS操作失败的话,进入内层的for循环就可以了}}//到这里,创建线程前的所有条件校验都满足了,可以开始创建线程来执行任务//worker是否已经启动boolean workerStarted = false;//是否已将这个worker添加到workers这个HashSet中boolean workerAdded = false;Worker w = null;try {//创建一个worker,从这里可以看出对线程的包装w = new Worker(firstTask);//取出worker中的线程对象,Worker的构造方法会调用ThreadFactory来创建一个新的线程final Thread t = w.thread;if (t != null) {//获取全局锁, 并发的访问线程池workers对象必须加锁,持有锁的期间线程池也不会被关闭final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//重新获取线程池的运行状态int rs = runStateOf(ctl.get());//小于SHUTTDOWN即RUNNING//等于SHUTDOWN并且firstTask为null,不接受新的任务,但是会继续执行等待队列中的任务if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {//worker里面的thread不能是已启动的if (t.isAlive())throw new IllegalThreadStateException();//将新创建的线程加入到线程池中workers.add(w);int s = workers.size();// 更新largestPoolSizeif (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}//线程添加线程池成功,则启动新创建的线程if (workerAdded) {t.start();workerStarted = true;}}} finally {//若线程启动失败,做一些清理工作,例如从workers中移除新添加的worker并递减wokerCountif (! workerStarted)addWorkerFailed(w);}//返回线程是否启动成功return workerStarted;}

因为代码(1)处的逻辑不利于理解,我们通过(1)的等价实现来理解:

if (rs>=SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
//等价实现
rs>=SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())

其含义为,满足下列条件之一则直接返回false,线程创建失败:

  • rs > SHUTDOWN,也就是STOP,TIDYING或TERMINATED,此时不再接受新的任务,且中断正在执行的任务

  • rs = SHUTDOWN且firstTask != null,此时不再接受任务,但是仍会处理任务缓存队列中的任务

  • rs = SHUTDOWN,队列为空

多说一句,若线程池处于 SHUTDOWN, firstTask 为 null,且 workQueue 非空,那么还得创建线程继续处理任务缓存队列中的任务。

总结一下,addWorker()方法完成了如下几件任务:

1. 原子性的增加workerCount

2. 将用户给定的任务封装成为一个worker,并将此worker添加进workers集合中

3. 启动worker对应的线程

4. 若线程启动失败,回滚worker的创建动作,即从workers中移除新添加的worker,并原子性的减少workerCount

3. 工作线程的实现

从addWorker()方法的实现可以看出,工作线程的创建和启动都跟ThreadPoolExecutor中的内部类Worker有关。下面我们分析Worker类来看一下工作线程的实现。

Worker类继承自AQS类,具有锁的功能;实现了Runable接口,可以将自身作为一个任务在线程中执行。

private final class Workerextends AbstractQueuedSynchronizerimplements Runnable

Worker的主要字段就下面三个,代码也比较简单。

//用来封装worker的线程,线程池中真正运行的线程,通过线程工厂创建而来final Thread thread;//worker所对应的第一个任务,可能为空Runnable firstTask;//记录当前线程完成的任务数volatile long completedTasks;

Worker的构造函数如下。

Worker(Runnable firstTask) {//设置AQS的state为-1,在执行runWorker()方法之前阻止线程中断setState(-1);//初始化第一个任务this.firstTask = firstTask;//利用指定的线程工厂创建一个线程,注意,参数是Worker实例本身this//也就是当执行start方法启动线程thread时,真正执行的是Worker类的run方法this.thread = getThreadFactory().newThread(this);}

Worker类继承了AQS类,重写了其相应的方法,实现了一个自定义的同步器,实现了不可重入锁。

//是否持有独占锁protected boolean isHeldExclusively() {return getState() != 0;}//尝试获取锁protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {//设置独占线程setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}//尝试释放锁protected boolean tryRelease(int unused) {//设置独占线程为nullsetExclusiveOwnerThread(null);setState(0);return true;}//获取锁public void lock()        { acquire(1); }//尝试获取锁public boolean tryLock()  { return tryAcquire(1); }//释放锁public void unlock()      { release(1); }//是否持有锁public boolean isLocked() { return isHeldExclusively(); }

Worker类还提供了一个中断线程thread的方法。

void interruptIfStarted() {Thread t;//AQS状态大于等于0,worker对应的线程不为null,且该线程没有被中断if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}

再来看一下Worker类的run()方法的实现,会发现run()方法最终调用了ThreadPoolExecutor类的runWorker()方法。

public void run() {runWorker(this);}

4. 线程复用机制

通过上文可以知道,worker中的线程start 后,执行的是worker的run()方法,而run()方法最终会调用ThreadPoolExecutor类的runWorker()方法,runWorker()方法实现了线程池中的线程复用机制。下面我们来看一下runWorker()方法的实现。

final void runWorker(Worker w) {//获取当前线程Thread wt = Thread.currentThread();//获取w的firstTaskRunnable task = w.firstTask;//设置w的firstTask为nullw.firstTask = null;// 释放锁,设置AQS的state为0,允许中断w.unlock();//用于标识线程是否异常终止,finally中processWorkerExit()方法会有不同逻辑boolean completedAbruptly = true;try {//循环调用getTask()获取任务,不断从任务缓存队列获取任务并执行while (task != null || (task = getTask()) != null) {//进入循环内部,代表已经获取到可执行的任务,则对worker对象加锁,保证线程在执行任务过程中不会被中断w.lock();if ((runStateAtLeast(ctl.get(), STOP) ||  //若线程池状态大于等于STOP,那么意味着该线程要中断(Thread.interrupted() &&      //线程被中断runStateAtLeast(ctl.get(), STOP))) &&  //且是因为线程池内部状态变化而被中断!wt.isInterrupted())           //确保该线程未被中断//发出中断请求wt.interrupt();try {//开始执行任务前的Hook方法beforeExecute(wt, task);Throwable thrown = null;try {//到这里正式开始执行任务task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//执行任务后的Hook方法afterExecute(task, thrown);}} finally {//置空task,准备通过getTask()获取下一个任务task = null;//completedTasks递增w.completedTasks++;//释放掉worker持有的独占锁w.unlock();}}completedAbruptly = false;} finally {//到这里,线程执行结束,需要执行结束线程的一些清理工作//线程执行结束可能有两种情况://1.getTask()返回null,也就是说,这个worker的使命结束了,线程执行结束//2.任务执行过程中发生了异常//第一种情况,getTask()返回null,那么getTask()中会将workerCount递减//第二种情况,workerCount没有进行处理,这个递减操作会在processWorkerExit()中处理processWorkerExit(w, completedAbruptly);}}

runWorker()方法是线程池的核心,实现了线程池中的线程复用机制,来看一下

runWorker()方法都做了哪些工作:

1. 运行第一个任务firstTask之后,循环调用getTask()方法获取任务,不断从任务缓存队列获取任务并执行;

2. 获取到任务之后就对worker对象加锁,保证线程在执行任务的过程中不会被中断,任务执行完会释放锁;

3. 在执行任务的前后,可以根据业务场景重写beforeExecute()和afterExecute()等Hook方法;

4. 执行通过getTask()方法获取到的任务

5. 线程执行结束后,调用processWorkerExit()方法执行结束线程的一些清理工作

从runWorker()方法的实现可以看出,runWorker()方法中主要调用了getTask()方法和processWorkerExit()方法,下面分别看一下这两个方法的实现。

getTask()的实现

getTask()方法用来不断地从任务缓存队列获取任务并交给线程执行,下面分析一下其实现。

private Runnable getTask() {//标识当前线程是否超时未能获取到task对象boolean timedOut = false;for (;;) {//获取线程池的控制状态int c = ctl.get();//获取线程池的运行状态int rs = runStateOf(c);//如果线程池状态大于等于STOP,或者处于SHUTDOWN状态,并且阻塞队列为空,线程池工作线程数量递减,方法返回null,回收线程if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}//获取worker数量int wc = workerCountOf(c);//标识当前线程在空闲时,是否应该超时回收// 如果allowCoreThreadTimeOut为ture,或当前线程数大于核心池大小,则需要超时回收boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//如果worker数量大于maximumPoolSize(有可能调用了 setMaximumPoolSize(),导致worker数量大于maximumPoolSize)if ((wc > maximumPoolSize || (timed && timedOut))  //或者获取任务超时&& (wc > 1 || workQueue.isEmpty())) {  //workerCount大于1或者阻塞队列为空(在阻塞队列不为空时,需要保证至少有一个工作线程)if (compareAndDecrementWorkerCount(c))//线程池工作线程数量递减,方法返回null,回收线程return null;//线程池工作线程数量递减失败,跳过剩余部分,继续循环continue;}try {//如果允许超时回收,则调用阻塞队列的poll(),只在keepAliveTime时间内等待获取任务,一旦超过则返回null//否则调用take(),如果队列为空,线程进入阻塞状态,无限时等待任务,直到队列中有可取任务或者响应中断信号退出Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();//若task不为null,则返回成功获取的task对象if (r != null)return r;// 若返回task为null,表示线程空闲时间超时,则设置timeOut为truetimedOut = true;} catch (InterruptedException retry) {//如果此worker发生了中断,采取的方案是重试,没有超时//在哪些情况下会发生中断?调用setMaximumPoolSize(),shutDown(),shutDownNow()timedOut = false;}}}

接下来总结一下getTask()方法会在哪些情况下返回:

1. 线程池处于RUNNING状态,阻塞队列不为空,返回成功获取的task对象

2. 线程池处于SHUTDOWN状态,阻塞队列不为空,返回成功获取的task对象

3. 线程池状态大于等于STOP,返回null,回收线程

4. 线程池处于SHUTDOWN状态,并且阻塞队列为空,返回null,回收线程

5. worker数量大于maximumPoolSize,返回null,回收线程

6. 线程空闲时间超时,返回null,回收线程

processWorkerExit()的实现

processWorkerExit()方法负责执行结束线程的一些清理工作,下面分析一下其实现。

private void processWorkerExit(Worker w, boolean completedAbruptly) {//如果用户任务执行过程中发生了异常,则需要递减workerCountif (completedAbruptly)decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;//获取全局锁mainLock.lock();try {//将worker完成任务的数量累加到总的完成任务数中completedTaskCount += w.completedTasks;//从workers集合中移除该workerworkers.remove(w);} finally {//释放锁mainLock.unlock();}//尝试终止线程池tryTerminate();//获取线程池控制状态int c = ctl.get();if (runStateLessThan(c, STOP)) {  //线程池运行状态小于STOPif (!completedAbruptly) {  //如果用户任务执行过程中发生了异常,则直接调用addWorker()方法创建线程//是否允许核心线程超时int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//允许核心超时并且workQueue阻塞队列不为空,那线程池中至少有一个工作线程if (min == 0 && ! workQueue.isEmpty())min = 1;//如果工作线程数量workerCount大于等于核心池大小corePoolSize,//或者允许核心超时并且workQueue阻塞队列不为空时,线程池中至少有一个工作线程,直接返回if (workerCountOf(c) >= min)return;//若不满足上述条件,则调用addWorker()方法创建线程}//创建新的线程取代当前线程addWorker(null, false);}}

processWorkerExit()方法中主要调用了tryTerminate()方法,下面看一下tryTerminate()方法的实现。

final void tryTerminate() {for (;;) {//获取线程池控制状态int c = ctl.get();if (isRunning(c) ||    //线程池的运行状态为RUNNINGrunStateAtLeast(c, TIDYING) ||    //线程池的运行状态大于等于TIDYING(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))  //线程池的运行状态为SHUTDOWN且阻塞队列不为空//不能终止,直接返回return;//只有当线程池的运行状态为STOP,或线程池运行状态为SHUTDOWN且阻塞队列为空时,可以执行到这里//如果线程池工作线程的数量不为0if (workerCountOf(c) != 0) {//仅仅中断一个空闲的workerinterruptIdleWorkers(ONLY_ONE);return;}//只有当线程池工作线程的数量为0时可以执行到这里final ReentrantLock mainLock = this.mainLock;//获取全局锁mainLock.lock();try {if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {  //CAS操作设置线程池运行状态为TIDYING,工作线程数量为0try {//执行terminated()钩子方法terminated();} finally {//设置线程池运行状态为TERMINATED,工作线程数量为0ctl.set(ctlOf(TERMINATED, 0));//唤醒在termination条件上等待的所有线程termination.signalAll();}return;}} finally {//释放锁mainLock.unlock();}//若CAS操作失败则重试}}

tryTerminate()方法的作用是尝试终止线程池,它会在所有可能终止线程池的地方被调用,满足终止线程池的条件有两个:首先,线程池状态为STOP,或者为SHUTDOWN且任务缓存队列为空;其次,工作线程数量为0。

满足了上述两个条件之后,tryTerminate()方法获取全局锁,设置线程池运行状态为TIDYING,之后执行terminated()钩子方法,最后设置线程池状态为TERMINATED。

至此,线程池运行状态变为TERMINATED,工作线程数量为0,workers已清空,且workQueue也已清空,所有线程都执行结束,线程池的生命周期到此结束。

5. 关闭线程池

关闭线程池有两个方法,shutdown()和shutdownNow(),下面分别看一下这两个方法的实现。

shutdown()的实现

shutdown()方法将线程池运行状态设置为SHUTDOWN,此时线程池不会接受新的任务,但会处理阻塞队列中的任务。

public void shutdown() {final ReentrantLock mainLock = this.mainLock;//获取全局锁mainLock.lock();try {//检查shutdown权限checkShutdownAccess();//设置线程池运行状态为SHUTDOWNadvanceRunState(SHUTDOWN);//中断所有空闲workerinterruptIdleWorkers();//用onShutdown()钩子方法onShutdown();} finally {//释放锁mainLock.unlock();}//尝试终止线程池tryTerminate();}

shutdown()方法首先会检查是否具有shutdown的权限,然后设置线程池的运行状态为SHUTDOWN,之后中断所有空闲的worker,再调用onShutdown()钩子方法,最后尝试终止线程池。

shutdown()方法调用了interruptIdleWorkers()方法中断所有空闲的worker,其实现如下。

private void interruptIdleWorkers() {interruptIdleWorkers(false);}//onlyOne标识是否只中断一个线程private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;//获取全局锁mainLock.lock();try {//遍历workers集合for (Worker w : workers) {//worker对应的线程Thread t = w.thread;//线程未被中断且成功获得锁if (!t.isInterrupted() && w.tryLock()) {try {//发出中断请求t.interrupt();} catch (SecurityException ignore) {} finally {//释放锁w.unlock();}}//若只中断一个线程,则跳出循环if (onlyOne)break;}} finally {//释放锁mainLock.unlock();}}

shutdownNow()的实现

shutdownNow()方法将线程池运行状态设置为STOP,此时线程池不会接受新任务,也不会处理阻塞队列中的任务,并且中断正在运行的任务。

public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;//获取全局锁mainLock.lock();try {//检查shutdown权限checkShutdownAccess();//设置线程池运行状态为STOPadvanceRunState(STOP);//中断所有workerinterruptWorkers();//将任务缓存队列中等待执行的任务取出并放到list中tasks = drainQueue();} finally {//释放锁mainLock.unlock();}//尝试终止线程池tryTerminate();//返回任务缓存队列中等待执行的任务列表return tasks;}

shutdownNow()方法与shutdown()方法相似,不同之处在于,前者设置线程池的运行状态为STOP,之后中断所有的worker(并非只是空闲的worker),尝试终止线程池之后,返回任务缓存队列中等待执行的任务列表。

shutdownNow()方法调用了interruptWorkers()方法中断所有的worker(并非只是空闲的worker),其实现如下。

private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;//获取全局锁mainLock.lock();try {//遍历workers集合for (Worker w : workers)//调用Worker类的interruptIfStarted()方法中断线程w.interruptIfStarted();} finally {//释放锁mainLock.unlock();}}

五、总结

至此,我们已经阅读了线程池框架的核心类ThreadPoolExecutor类的大部分源码,由衷地赞叹这个类很多地方设计的巧妙之处:

  • 将线程池的运行状态和工作线程数量打包在一起,并使用了大量的位运算

  • 使用CAS操作更新线程控制状态ctl,确保对ctl的更新是原子操作

  • 内部类Worker类继承了AQS,实现了一个自定义的同步器,实现了不可重入锁

  • 使用while循环自旋地从任务缓存队列中获取任务并执行,实现了线程复用机制

  • 调用interrupt()方法中断线程,但注意该方法并不能直接中断线程的运行,只是发出了中断信号,配合BlockingQueue的take(),poll()方法的使用,打断线程的阻塞状态

其实,线程池的本质就是生产者消费者模式,线程池的调用者不断向线程池提交任务,线程池里面的工作线程不断获取这些任务并执行(从任务缓存队列获取任务或者直接执行任务)。

读完本文,相信大家对线程池的实现原理有了深刻的认识,比如向线程池提交一个任务之后线程池的执行流程,一个任务从被提交到被执行会经历哪些过程,一个工作线程从被创建到正常执行到执行结束的执行过程,等等。

最后,再附上我历时三个月总结的 Java 面试 + Java 后端技术学习指南,笔者这几年及春招的总结,github 1.4k star,拿去不谢!

下载方式

1. 首先扫描下方二维码

2. 后台回复「Java面试」即可获取

深入分析线程池的实现原理相关推荐

  1. java set和get原理_Java线程池的实现原理和使用

    为什么用线程池 在我们进行开发的时候,为了充分利用系统资源,我们通常会进行多线程开发,实现起来非常简单,需要使用线程的时候就去创建一个线程(继承Thread类.实现Runnable接口.使用Calla ...

  2. future java 原理_Java线程池FutureTask实现原理详解

    前言 线程池可以并发执行多个任务,有些时候,我们可能想要跟踪任务的执行结果,甚至在一定时间内,如果任务没有执行完成,我们可能还想要取消任务的执行,为了支持这一特性,ThreadPoolExecutor ...

  3. 深入源码分析Java线程池的实现原理

    转载自   深入源码分析Java线程池的实现原理 程序的运行,其本质上,是对系统资源(CPU.内存.磁盘.网络等等)的使用.如何高效的使用这些资源是我们编程优化演进的一个方向.今天说的线程池就是一种对 ...

  4. Java多线程系列(五):线程池的实现原理、优点与风险、以及四种线程池实现

    为什么需要线程池 我们有两种常见的创建线程的方法,一种是继承Thread类,一种是实现Runnable的接口,Thread类其实也是实现了Runnable接口.但是我们创建这两种线程在运行结束后都会被 ...

  5. java并发编程——线程池的工作原理与源码解读

    2019独角兽企业重金招聘Python工程师标准>>> 线程池的简单介绍 基于多核CPU的发展,使得多线程开发日趋流行.然而线程的创建和销毁,都涉及到系统调用,比较消耗系统资源,所以 ...

  6. 深入源码,深度解析Java 线程池的实现原理

    java 系统的运行归根到底是程序的运行,程序的运行归根到底是代码的执行,代码的执行归根到底是虚拟机的执行,虚拟机的执行其实就是操作系统的线程在执行,并且会占用一定的系统资源,如CPU.内存.磁盘.网 ...

  7. 基于java洗浴中心管理系统_Java小白也能听懂的线程池的内部原理:老王的洗浴中心...

    餐厅的约会 餐盘在灯光的照耀下格外晶莹洁白,女朋友拿起红酒杯轻轻地抿了一小口,对我说:"经常听你说线程池,到底线程池到底是个什么原理?"我楞了一下,心里想女朋友今天是怎么了,怎么突 ...

  8. Java线程池的实现原理,你清楚么?

    点击关注公众号,实用技术文章及时了解 来源:blog.csdn.net/u013332124/article/details/79587436 原理概述 其实java线程池的实现原理很简单,说白了就是 ...

  9. Java并发编程-线程池底层工作原理

    线程池底层工作原理 1.线程池的底层工作流程 1.1.线程池的底层工作原理图 1.2.银行办理业务案例 1.3.线程池的底层工作流程总结 2.线程池用哪个?生产中如何设置合理参数 2.1.在工作中单一 ...

最新文章

  1. android id 重名_Android App 自定义权限重名不能安装解决办法
  2. 过拟合、欠拟合与正则化
  3. Struts2配置时报struts-default.xml:65:72异常的解决方法
  4. 解除计算机软件开发协议书,计算机软件著作权转让的协议书
  5. ubantu mysql允许外部链接_ubuntu 下mysql 设置允许远程连接
  6. 检测对抗样本_避免使用对抗性T恤进行检测
  7. ASP.NET Core 5.0 Web API 自动集成Swashbuckle
  8. loadrunner11完整卸载
  9. mac os cmake安装
  10. 第 14 章 享元模式
  11. 编译OpenCV:cv2.cpp:23:33: fatal error: numpy/ndarrayobject.h: 没有那个文件或目录
  12. 客户端提示“使用代理软件”并断网怎么解决?
  13. 如何去除win 10右键菜单的“使用skype共享”
  14. ca 手机抓包_抓包安卓7以上ca证书安装方法
  15. Hibernate的Disjunction和Conjunction
  16. [VUE3]vue2.x中slot-scope插槽在vue3.x中的写法(以elementPlus和AntDesign为例)
  17. BUUCTF [安洵杯 2019]easy_serialize_php
  18. 模型预测控制系列讲解(二):模型预测控制算法发展进程
  19. Java中求集合交集、并集、差集
  20. ue编辑C语言灰色,UltraEdit如何实行列模式编辑

热门文章

  1. 使用 Android 实现联网
  2. 用VS2010调试微软开放的部分源码
  3. VB.NET(2005)中关于dll调用的错误信息(转)
  4. 求助啊,被STM32的CAN折磨的疯了
  5. nrf51822笔记之密码配对过程梳理
  6. Qt之debug和写log文件
  7. class_create
  8. C++ Primer 5th笔记(chap 17 标准库特殊设施)未格式化的输入/输出操作
  9. Hyperledger Fabric 链码(2) 接口
  10. TLSNotary中心化预言机(3) 下一代技术----PADVA