我们在关闭线程池的时候会使用shutdown()和shutdownNow(),那么问题来了:

这两个方法又什么区别呢?

他们背后的原理是什么呢?

线程池中线程超过了coresize后会怎么操作呢?

为了解决这些疑问我们需要分析java线程池的原理。

1 基本使用

1.1 继承关系

平常我们在创建线程池经常使用的方式如下:

ExecutorService executorService = Executors.newFixedThreadPool(5);

看下newFixedThreadPool源码, 其实Executors是个工厂类,内部是new了一个ThreadPoolExecuto:

public static ExecutorService newFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue());

}

参数的意义就不介绍了,网上有很多内容,看源码注释也可以明白。

线程池中类的继承关系如下:

2 源码分析

2.1 入口

将一个Runnable放到线程池执行有两种方式,一个是调用ThreadPoolExecutor#submit,一个是调用ThreadPoolExecutor#execute。其实submit是将Runnable封装成了一个RunnableFuture,然后再调用execute,最终调用的还是execute,所以我们这里就只从ThreadPoolExecutor#execute开始分析。

2.2 ctl和线程池状态

ThreadPoolExecutor中有个重要的属性是ctl

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 高3位表示状态,低29位表示线程池中线程的多少

private static final int COUNT_BITS = Integer.SIZE - 3; // 32-3 = 29

private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 左移29为减1,即最终得到为高3位为0,低29位为1的数字,作为掩码,是二进制运算中常用的方法

private static final int RUNNING = -1 << COUNT_BITS; // 高三位111

private static final int SHUTDOWN = 0 << COUNT_BITS; // 高三位000

private static final int STOP = 1 << COUNT_BITS; // 高三位001

private static final int TIDYING = 2 << COUNT_BITS; // 高三位010

private static final int TERMINATED = 3 << COUNT_BITS; // 高三位011

// Packing and unpacking ctl

private static int runStateOf(int c) { return c & ~CAPACITY; } // 保留高3位,即计算线程池状态

private static int workerCountOf(int c) { return c & CAPACITY; } // 保留低29位, 即计算线程数量

private static int ctlOf(int rs, int wc) { return rs | wc; } // 求ctl

ThreadPoolExecutor中使用32位Integer来表示线程池的状态和线程的数量,其中高3位表示状态,低29位表示数量。如果对二进制运行不熟悉可以参考:二进制运算。从上也可以看出线程池有五种状态,我们关心前3中状态

RUNNING 接收task和处理queue中的task

SHUTDOWN 不再接收新的task,但是会处理完正在运行的task和queue中的task,不会interrupt正在执行的task,其实调用shutdown后线程池处于该状态

STOP 不再接收新的task,也不处理queue中的task,同时正在运行的线程会被interrupt。调用shutdownNow后线程池会处于该状态。

2.3 execute

明白了ctl和线程池的状态后我们来具体看下execute的处理逻辑

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

int c = ctl.get();

if (workerCountOf(c) < corePoolSize) { // 线程数量小于coresize,那么就调用addWorker

if (addWorker(command, true)) // 这里知道,返回true就不往下走了

return;

c = ctl.get();

}

// 不满足上述条件,即线程数量 >= coreSize,或者addWorker返回fasle,那么走下面的逻辑

if (isRunning(c) && workQueue.offer(command)) { // 可以看到是往blockingqueue中放task

int recheck = ctl.get();

if (! isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

// 如果不满足上述条件,即blockingqueue也放不进去,那么就走下面的逻辑

else if (!addWorker(command, false))

reject(command);

}

从上面的代码我们可以看到线程池处理线程的基本思路是: 如果线程数量小于coresize那么就执行task,否则就放到queue中,如果queue也放不下就走下面addWorker,如果也失败了,那么就调用reject策略。当然还涉及一些细节,需要进一步分析。

2.4 addWorker

execute中反复调用的是addWorker

private boolean addWorker(Runnable firstTask, boolean core) {

retry:

for (;;) {

int c = ctl.get();

int rs = runStateOf(c); // 计算线程池状态

// Check if queue empty only if necessary.

if (rs >= SHUTDOWN && // 先忽略

! (rs == SHUTDOWN &&

firstTask == null &&

! workQueue.isEmpty()))

return false;

for (;;) {

int wc = workerCountOf(c); // 线程数量

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize)) // 可见如果超过了运行的最大线程数量则返回false

return false;

if (compareAndIncrementWorkerCount(c)) // 如果成功,线程数量肯定加1

break retry;

c = ctl.get(); // Re-read ctl

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

}

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

final ReentrantLock mainLock = this.mainLock;

w = new Worker(firstTask); // 将task封装成了Worker

final Thread t = w.thread; // 来获取worker的thread

if (t != null) {

mainLock.lock();

try {

// Recheck while holding lock.

// Back out on ThreadFactory failure or if

// shut down before lock acquired.

int c = ctl.get();

int rs = runStateOf(c);

if (rs < SHUTDOWN ||

(rs == SHUTDOWN && firstTask == null)) {

if (t.isAlive()) // precheck that t is startable

throw new IllegalThreadStateException();

workers.add(w); // 将worker添加到hashset中报存,关闭的时候要使用

int s = workers.size();

if (s > largestPoolSize)

largestPoolSize = s;

workerAdded = true;

}

} finally {

mainLock.unlock();

}

if (workerAdded) { // 经过一些检查, 启动了work的thread

t.start();

workerStarted = true;

}

}

} finally {

if (! workerStarted)

addWorkerFailed(w); // 如果线程启动失败,则将线程数减1

}

return workerStarted;

}

上面的代码看起来比较复杂,但是如果我们忽略具体的细节,从大致思路上看,其实也比较简单。上面代码的主要思路就是:除了一些状态检查外,首先将线程数量加1,然后将runnable分装成一个worker,去启动worker线程,如果启动失败则再将线程数量减1。返回false的原因可能是线程数量大于允许的数量。所以addWorker调用成功,则会启动一个work线程,且线程池中线程数量加1

2.5 worker

woker是线程池中真正的线程实体。线程池中的线程不是自定义的Runnable实现的线程,而是woker线程,worker在run方法里调用了自定义的Runnable的run方法。

Worker继承了AQS,并实现了runnable接口:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

Worker(Runnable firstTask) {

setState(-1); // inhibit interrupts until runWorker

this.firstTask = firstTask;

this.thread = getThreadFactory().newThread(this); // 这个时候回头看看addWorker中t.start(), 就明白了启动的实际是一个Woker线程,而不是用户定义的Runnable

}

public void run() {

runWorker(this);

}

}

Worker中firstTask存储了用户定义的Runnable,thread是以他自身为参数的Thread对象。getThreadFactory()默认返回是Executors#DefaultThreadFactory,用来新建线程,并定义了线程名称的前缀等:

static class DefaultThreadFactory implements ThreadFactory {

private static final AtomicInteger poolNumber = new AtomicInteger(1);

private final ThreadGroup group;

private final AtomicInteger threadNumber = new AtomicInteger(1);

private final String namePrefix;

DefaultThreadFactory() {

SecurityManager s = System.getSecurityManager();

group = (s != null) ? s.getThreadGroup() :

Thread.currentThread().getThreadGroup();

namePrefix = "pool-" +

poolNumber.getAndIncrement() +

"-thread-"; //

}

public Thread newThread(Runnable r) { // 调用后新建一个线程

Thread t = new Thread(group, r,

namePrefix + threadNumber.getAndIncrement(),

0);

if (t.isDaemon())

t.setDaemon(false);

if (t.getPriority() != Thread.NORM_PRIORITY)

t.setPriority(Thread.NORM_PRIORITY);

return t;

}

}

2.6 runWoker

Worker的run方法调用了runWorker,并将自身作为参数传了进去,下面看看问题的关键:runWorker:

final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

w.unlock(); // allow interrupts

boolean completedAbruptly = true;

try {

while (task != null || (task = getTask()) != null) { // 注意这里的while循环,这里很关键。这里注意,如果两个条件都满足了,那么线程就结束了

w.lock(); // 注意worker继承了AQS,相当于自己实现了锁,这个在关闭线程的时候有用

if ((runStateAtLeast(ctl.get(), STOP) ||

(Thread.interrupted() &&

runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())

wt.interrupt();

try {

beforeExecute(wt, task);

Throwable thrown = null;

try {

task.run(); // 仅仅是回调了Runnable的run方法

} catch (RuntimeException x) {

thrown = x; throw x;

} catch (Error x) {

thrown = x; throw x;

} catch (Throwable x) {

thrown = x; throw new Error(x);

} finally {

afterExecute(task, thrown);

}

} finally {

task = null; // 重点,task执行完后就被置位null

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {

processWorkerExit(w, completedAbruptly); // 注意while循环结束后worker线程就结束了

}

}

runWorker中有个while循环,while中判断条件为(task != null || (task = getTask()) != null)。假设我们按照正常的逻辑,即task != null,则会调用task.run方法,执行完run方法后然后在finally中task被置为null;接着又进入while循环判断,这次task == null,所以不符合第一个判断条件,则会继续判断 task == getTask()) != null。我们来看下getTask做了什么。

2.7 getTask

private Runnable getTask() {

boolean timedOut = false; // Did the last poll() time out?

retry:

for (;;) {

int c = ctl.get();

int rs = runStateOf(c);

// Check if queue empty only if necessary.

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 当调用shutdown()方法的时候,线程状态就为shutdown了; 当调用shutdownow()的时候,线程状态就为stop了

decrementWorkerCount();

return null;

}

boolean timed; // Are workers subject to culling?

for (;;) { // 通过死循环设置状态

int wc = workerCountOf(c);

// 设置允许core线程timeout或者线程数量大于coresize,则允许线程超时

timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 如果线程数量 <= 最大线程数 且 没有超时和允许超时 则跳出死循环

if (wc <= maximumPoolSize && ! (timedOut && timed))

break;

if (compareAndDecrementWorkerCount(c))

return null;

c = ctl.get(); // Re-read ctl

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

try {

// 这里是关键,如果允许超时则调用poll从queue中取出task,否则就调用take可阻塞的获取task

Runnable r = timed ?

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

workQueue.take();

if (r != null) // 获取到task则返回,然后runWorker的while循环就继续执行,并调用task的run方法

return r;

timedOut = true; // 否则设置为timeOut,继续循环,但是下次循环会走到if (compareAndDecrementWorkerCount(c)) 处,并返回null。

} catch (InterruptedException retry) {

timedOut = false;

}

}

}

忽略掉具体细节,getTask的整体思路是: 从blockqueue中拿去task,如果queue中没有task则分两种情况:

如果允许超时则调用poll(keepAliveTime, TimeUnit.NANOSECONDS),在规定时间没有返回了则getTask返回null,runWorker结束while循环,work线程结束。当线程数量大于coresize且blockqueue满的时候且小于maxsize的时候,新创建的线程便是走这个逻辑;或者允许core线程超时的时候也是走这个逻辑

如果不允许超时,则会一直阻塞直到blockqueue中有了新的task。take方法阻塞则表示worker线程也阻塞,也就是在没有task执行的情况下,worker线程便会阻塞等待。core线程走的就是这个逻辑。

这个时候回头再看下runWorker,如果task != null,那么就会执行task的run方法,执行完后task就会为被置为null,再次进入while循环执行getTask阻塞在这里了。通过这种方式保留住了线程。如果while循环结束了,那么worker线程也就结束了。

2.8 再看addWorker

分析到这里我们再来看下addWoker。addWorker可以将第一个参数设置为null。例如ThreadPoolExecutor#prestartAllCoreThreads:

public int prestartAllCoreThreads() {

int n = 0;

while (addWorker(null, true)) // addWorker第一个参数是null

++n;

return n;

}

经过前面的分析,我们知道addWoker用来启动一个worker线程,worker线程调用runWorker来执行,而runWorker中有个while循环,判断条件是(task != null || (task = getTask()) != null)。因为我们传入的task为null,所以就会判断task = getTask()) != null,而getTask就是去blockqueue中拿去数据,如果没有任务就会阻塞住。这个时候就是一个阻塞的线程在等待task的到来了。所以传入参数为null表示创建一个空的线程,什么都不执行。

2.9 再看execute

已经知道了线程池内部的大概工作情况,我们再来看下如果所有core线程都创建好了且处于空置状态,这个时候新放入一个线程的执行流程。

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

int c = ctl.get();

if (workerCountOf(c) < corePoolSize) { // core线程都创建好了,所以判断条件不满足

if (addWorker(command, true))

return;

c = ctl.get();

}

// 会走到这里,会通过offer往blockingqueue里放置一个task。这个时候阻塞的core线程会通过blockingqueue的take拿到task执行,类似一个生产者消费者的情况

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();

if (! isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

// 如果blockingqueue添加失败,则创建线程直到maxsize

else if (!addWorker(command, false))

reject(command);

}

可见,线程和execute通过blockingqueue来通信,而不是其他方式,execute往blockingqueue中放置task,线程通过take来获取。整体线程池的逻辑如下图

2.10 shutdown

这个时候我们终于可以来看看shutdown和shutdownNow了

看下shutdown

public void shutdown() {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

checkShutdownAccess();

advanceRunState(SHUTDOWN); // 重点,将线程状态置为shutdown,这样getTask等workqueue为空后就返回null了

interruptIdleWorkers(); // 重点

onShutdown(); // 什么都没做

} finally {

mainLock.unlock();

}

tryTerminate();

}

private void interruptIdleWorkers() {

interruptIdleWorkers(false);

}

private void interruptIdleWorkers(boolean onlyOne) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

for (Worker w : workers) {

Thread t = w.thread;

// 线程没有中断 且 获取到worker的锁

if (!t.isInterrupted() && w.tryLock()) {

try {

t.interrupt(); // 调用interrup,中断线程

} catch (SecurityException ignore) {

} finally {

w.unlock();

}

}

if (onlyOne)

break;

}

} finally {

mainLock.unlock();

}

}

shutdown的核心方法在interruptIdleWorkers里,这里可以看到在t.interrupt的时候有个判断添加,一个是线程没有设置中断标记,第二个是获取到worker的锁,我们注意下第二个条件。回头看下runWorker,while中执行task的run方法的时候,会先获取到worker线程的锁,所以如果线程正在执行task的run方法,则shutdown的时候会获取锁失败,也就不会中断线程了。这里可以得出结论:shutdown不会中断正在执行的线程。

如果blockingqueu中有task还没执行完呢? 这个时候while中的take并不会阻塞,也不会被中断,shutdown中也没有清空blockingqueue的操作。所以可以得出结论:shutdown会等blockingqueue中的task执行完成再关闭。可以说shutdown是一种比较温柔的关闭方式了。

如果core线程都阻塞在take方法上了,即没有正在执行的task了,那么这个时候 t.interrupt则会中断take方法,worker线程的while循环结束,worker线程结束。当所有的worker线程都结束后线程池就关闭了

总结下就是: shutdown会把它被调用前放到线程池中的task全部执行完。

2.11 shutdownNow

再来看下shutdownNow

public List shutdownNow() {

List tasks;

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

checkShutdownAccess();

advanceRunState(STOP); // 重点,将线程状态置为stop

interruptWorkers(); // 重点

tasks = drainQueue(); // 重点

} finally {

mainLock.unlock();

}

tryTerminate();

return tasks;

}

private void interruptWorkers() {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

for (Worker w : workers)

w.interruptIfStarted();

} finally {

mainLock.unlock();

}

}

void interruptIfStarted() {

Thread t;

if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { // 没有去获取woker的锁

try {

t.interrupt();

} catch (SecurityException ignore) {

}

}

}

private List drainQueue() {

BlockingQueue q = workQueue;

List taskList = new ArrayList();

q.drainTo(taskList); // 将blockingqueue中的task清空

if (!q.isEmpty()) {

for (Runnable r : q.toArray(new Runnable[0])) {

if (q.remove(r))

taskList.add(r);

}

}

return taskList;

}

从上面的代码可以看出:

shutdownNow不会去获取worker的锁,所以shutdownNow会导致正在运行的task也被中断

shutdownNow会将blockingqueue中的task清空,所以在blockingqueue中的task也不会被执行

总结就是shutdownNow比较粗暴,调用他后,他会将所有之前提交的任务都interrupt,且将blockingqueue中的task清空

另外就是不论是shutdown还是shutdownNow都是调用Thread的interrupt()方法。如果task不响应中断或者忽略中断标记,那么这个线程就不会被终止。例如在run中执行以下逻辑

poolExecutor.execute(new Runnable() {

@Override

public void run() {

while (true) {

System.out.println("b");

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

System.out.printf("不处理"); // 忽略中断

}

}

}

});

运行结果是,即使调用了shutdownNow也终止不了线程运行

b

0

不处理b

b

b

b

b

....

3 总结

线程通过while循环不停的从blockingqueue中获取task来保留线程,避免重复重建线程

4 参考

java 线程池 源码_java线程池源码分析相关推荐

  1. java低层源码_Java线程池及其底层源码实现分析

    */ Callable接口 && Runnable接口 callable调用call方法 runnable调用run方法 都可以被线程调用,但callable的call方法具有返回值( ...

  2. java executor 源码_Java线程池ThreadPoolExecutor深度探索及源码解析

    我们的程序里,时常要使用多线程.因此多线程的管理变的尤为重要.ThreadPoolExecutor很好的解决了这一点.本篇文章主要从源码入手,分析ThreadPoolExecutor的原理. 1.标记 ...

  3. java set和get原理_Java线程池的实现原理和使用

    为什么用线程池 在我们进行开发的时候,为了充分利用系统资源,我们通常会进行多线程开发,实现起来非常简单,需要使用线程的时候就去创建一个线程(继承Thread类.实现Runnable接口.使用Calla ...

  4. java线程池的应用_Java线程池的使用

    Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了, ...

  5. java线程池存在时间_Java线程池基础

    目录: 一.线程池概述 1.线程池类 目前线程池类一般有两个,一个来自于Spring,一个来自于JDK: 来自Spring的线程池:org.springframework.scheduling.con ...

  6. java 线程池的理解_JAVA线程池原理的理解

    线程池原理基础理解: 线程池初始化规定个数的线程,然后这些线程一直运行,并且监控线程队列,只要线程队列被添加进线程,那么线程池不断从队列中取出线程运行.直到队列中的线程为空.实例代码如下: packa ...

  7. java线程池 的方法_JAVA线程池的实现方法

    我们大家都知道,在处理多线程服务并发时,由于创建线程需要占用很多的系统资源,所以为了避免这些不必要的损耗,通常我们采用线程池来解决这些问题. 线程池的基本原理是,首先创建并保持一定数量的线程,当需要使 ...

  8. java线程池多线程优先级_Java线程优先级

    java线程池多线程优先级 Priority of a thread describes how early it gets execution and selected by the thread ...

  9. java线程池的概念_Java线程池的基本概念以及生命周期

    一.为什么要实现线程池? 线程的创建与销毁对于CPU而言开销较大,通过池化技术可避免重复的创建与销毁线程. 方便与线程资源统一管理. 二.几种常见的线程池以及核心参数 不推荐使用Executor创建线 ...

最新文章

  1. 职责链模式(Chain of Responsibility)(对象行为型)
  2. VUE的本地应用-V- if
  3. TP3.2.3 页面跳转后 Cookie 失效 —— 参考解决方案
  4. linux内存占用过高原因
  5. Druid配置文件demo
  6. 语法之知识点的改进(Func/Action)
  7. “约见”面试官系列之常见面试题之第九十三篇之vue获取数据在哪个周期函数(建议收藏)
  8. linux下C语言简单实现线程池
  9. BetterFE 前端技术周刊 - 2019/03/11
  10. Qt 互斥量 QMutex
  11. 小D课堂-SpringBoot 2.x微信支付在线教育网站项目实战_5-6.微信扫码登录回调本地域名映射工具Ngrock...
  12. DB2 导入CSV文件
  13. Web 前端:知道这些,至少有底气去面试了
  14. PC式硬盘录像机常见故障剖析,监控卡常见问题(一)
  15. html投影电脑,如何将电脑内容显示到投影仪或电视上?详细教程奉上
  16. ssl免费证书获取,并在nginx服务器上安装ssl证书,以及docker安装nginx需注意的细节。
  17. ArcGIS 矢量编辑札记(一):Field Calculator 篇
  18. 云服务器哪家比较好呢?
  19. SQL Server安装中错误该性能计数器注册表配置单元已损坏。若要继续,必须修复该性能计数器注册表配置单元的解决
  20. Cloudera Manager安装之利用parcels方式安装3或4节点集群(包含最新稳定版本或指定版本的安装)(添加服务)(CentOS6.5)(五)...

热门文章

  1. Python center 用法
  2. NLPCC:预训练在小米的推理优化落地
  3. 听说你想去大厂看妹子,带你看看腾讯产品运营实习面经
  4. 从C语言的角度重构数据结构系列(八)-数据结构堆知识超级丑数
  5. Keras版Sequence2Sequence对对联实战——自然语言处理技术
  6. api如何使用_什么是API, API是如何工作的?
  7. 结构损伤检测与智能诊断 陈长征_阜康危房检测价格
  8. Python入门100题 | 第072题
  9. Python入门100题 | 第059题
  10. 深度学习100例 | 第24天-卷积神经网络(Xception):动物识别