前言:在我们日常的并发编程中,如果每一个请求都对应一个线程,这样不仅线程的创建和销毁都将很占用资源,也会给系统带来巨大的压力,很容易造成系统崩溃。鉴于此,线程池的出现就很好的化解了这一问题。线程的池化就是指创建指定个数的线程放到容器中,有任务的时候线程会去执行,任务处理完就会放回线程池等待其他执行其他任务。如此就可以做到线程的重复使用。接下来我们就开始来学习ThreadPoolExecutor源码。

一:ThreadPoolExecutor中重要的成员变量

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

具体解析:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0))

ctl 是一个原子类,利用高地位来维护了两个概念,线程数量(workerCount)和线程池的状态(runState),其中高 3 位表示线程池的状态(RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED),低 29 位维护了线程池中当前活动的线程数量。

private static final int COUNT_BITS = Integer.SIZE - 3;

COUNT_BITS 表示的是workerCount 所占用的位数,int 占用32位,那么COUNT_BITS 就占用 32-3 = 29 位。

private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

CAPACITY 表示线程数量(workerCount)的上限,1 左移 29 位后减 1,就是:1的二进制为 000 000 000 000 000 000 000 000 000 000 01 左移 29 位后得到 001 000 000 000 000 000 000 000 000 000 00 减 1 得到 000 111 111 111 111 1 11 111 111 111 111 11 这其中前 3 位表示线程池的运行状态,后 29 位才表示线程数量的最大值 29 个 1 ,即:536870911。

private static final int RUNNING  = -1 << COUNT_BITS;

RUNNING 的状态时能接受新提交的任务,并且也能处理阻塞队列中的任务。-1 的二进制为 111 111 111 111 111 111 111 111 111 111 11 左移 29 位后 111 000 000 000 000 000 000 000 000 000 00 即:高 3 位 都为 1 低 29 位都为 0 表示RUNNING状态。

private static final int SHUTDOWN  =  0 << COUNT_BITS;

SHUTDOWN 表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING状态时,调用shutdown() 方法会使线程池进入到该状态以及finalize()方法在执行过程中也会调用 shutdown()方法进入该状态。0 的二进制为 000 000 000 000 000 000 000 000 000 000 00 左移 29 位后还是 000 000 000 000 000 000 000 000 000 000 00 即:高 3 位 都为 0 低 29 位都为 0 表示SHUTDOWN状态。

private static final int STOP  =  1 << COUNT_BITS;

STOP不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNINGSHUTDOWN状态时,调用 shutdownNow()方法会使线程池进入到该状态。1 的二进制为 000 000 000 000 000 000 000 000 000 000 01 左移 29 位后得到 001 000 000 000 000 000 000 000 000 000 00 即:高 3 位为 001 低 29 位都为 0 表示 STOP状态。

private static final int TIDYING  =  2 << COUNT_BITS;

TIDYING当所有的任务都终止了,workerCount 为 0 时,线程池进入该状态后会调用 terminated()方法进入 TERMINATED状态。2 的二进制为 000 000 000 000 000 000 000 000 000 000 10 左移 29 位后得到 010 000 00 0 000 000 000 000 000 000 000 00 即:高 3 位为 010 低 29 位都为 0 表示 TIDYING 状态。

private static final int TERMINATED  =  3 << COUNT_BITS;

TERMINATEDterminated()方法执行完后进入该状态。3 的二进制为 000 000 000 000 000 000 000 000 000 000 11 左移 29 位后得到 011 000 000 000 000 000 000 000 000 000 00 即:高 3 位为 011 低 29 位都为 0 表示 TERMINATED状态。

状态之间的转化图示:

二:ThreadPoolExecutor中重要的成员方法

 private static int runStateOf(int c)     { return c & ~CAPACITY; }private static int workerCountOf(int c)  { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }

具体解析:

 private static int ctlOf(int rs, int wc) { return rs | wc; }

我们先看 ctlOf这个方法,rs 表示runState,从上面的分析我们可以看出 runState 都是高 3 位有值,低 29 位全部为 0 的intwc 表示 workerCount 都是高 3 位全部为 0 ,低 29 位有值得int。后对 rswc 进行 运算(两个都为 0 时才为 0,其他情况均为 1 )。其实就是获取runState 的高 3 位,workerCount 低29位填充的数。

private static int workerCountOf(int c)  { return c & CAPACITY; }

workerCountOf传入的 c 是上面变量 ctl的值,其中高 3 位为线程池运行状态 runState,低 29 位为线程池中当前活动的线程数量 workerCount然后与 CAPACITY( 000 111 111 111 111 1 11 111 111 111 111 11 )进行 &(两个都为 1 时才为 1,其他情况均为 0 ) 操作。得到的结果就是 高三位肯定 000 接下里就是 c 的 低 29 位与CAPACITY的低 29 位进行 & 运算 结果得到的还是 c 的低 29 位,因此也就拿到了 workerCount 的值。

 private static int runStateOf(int c)     { return c & ~CAPACITY; }

runStateOf传入的 c 是上面变量 ctl的值,其中高 3 位为线程池运行状态runState,低 29 位为线程池中当前活动的线程数量workerCount 然后与 CAPACITY 的取反进行 & 运算。 CAPACITY 的二进制为 000 111 111 111 111 1 11 111 111 111 111 11 取 ~ 后得到 111 000 000 000 000 000 000 000 000 000 00 与 c 进行 & 运算后得到前三位得到计算,后 29 位依然是 0 ,此时就拿到了线程池的运行状态 runState

三:ThreadPoolExecutor的构造方法

在学习ThreadPoolExecutor的构造方法之前,我们先了解一下几个重要的参数:

序号 名称 类型 含义
1 corePoolSize int 核心线程大小
2 maximumPoolSize int 最大线程大小
3 keepAliveTime long 超出核心线程数量以外的线程空余存活时间
4 unit TimeUnit 时间单位
5 workQueue BlockingQueue 线程等待队列
6 threadFactory ThreadFactory 线程创建工厂
7 handler RejectedExecutionHandler 拒绝策略
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {//检查参数的合理性if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}

参数的具体解析:

  • corePoolSize:指核心线程数,线程池在完成初始化后默认情况下,线程池中并没有任何线程,线程是会等待任务到来时才会去创建线程去执行任务。

  • maximumPoolSize:指最大线程数量,线程池也会在核心线程的基础之上去额外创建一些线程,但是新增加的线程数有一个上限,就是 maximumPoolSize 指定的数量。

  • keepAliveTime:如果线程池当前线程数多于corePoolSize,且多余的线程空闲时间超过keepAliveTime,他们就会被终止回收

  • workQueue:保存等待执行的任务的阻塞队列,一般会有三种常见的队列类型:

    1. 直接交接:SynchronousQueue 是一个内部只能包含一个元素的队列。插入元素到队列的线程被阻塞,直到另一个线程从队列中获取了队列中存储的元素。同样,如果线程尝试获取元素并且当前不存在任何元素,则该线程将被阻塞,直到线程将元素插入队列。所以只能做暂时交接,使用这个队列 maximumPoolSize 最好设置偏大一点,因为可能会导致线程创建的个数偏多。
    2. 有界队列:一般使用ArrayBlockingQueue,是一个有界缓存等待队列,可以指定缓存队列的大小,当线程数量大于corePoolSize时,多余的任务会缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执行;当ArrayBlockingQueue满时,则又会开启新的线程去执行,直到线程数量达到maximumPoolSize;当线程数已经达到最大的maximumPoolSize时,再有新的任务到达时会执行拒绝策略。
    3. 无界队列:一般使用LinkedBlockingQueue,是一个无界(没有大小限制)缓存等待队列。当前执行的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列里等待,在使用此阻塞队列时maximumPoolSizes就相当于无效了。
  • threadFactory:它是 ThreadFactory 类型的变量,用来创建新线程。默认使用 Executors.defaultThreadFactory() 来创建线程。使用默认的 ThreadFactory 来创建线程时,会使新创建的线程具有相同的 NORM_PRIORITY 优先级并且是非守护线程,同时也设置了线程的名称。

  • keepAliveTime :超出核心线程数量以外的线程空余存活时间。当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,当等待的时间超过了keepAliveTime就会销毁。

  • handler:表示线程池的拒绝策略。如果队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。线程池提供了4种策略:

    1. AbortPolicy:直接抛出异常,这是默认策略;
    2. CallerRunsPolicy:用调用者所在的线程来执行任务;
    3. DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    4. DiscardPolicy:直接丢弃任务;

corePoolSizemaximumPoolSize 之间的切换遵循以下规则:

  1. 如果线程数小于corePoolSize,即使有其他线程处于空闲状态,也会创建一个新线程去执行新的任务;
  2. 如果线程数等于或者大于 corePoolSize 但小于 maximumPoolSize ,则会把当前任务放到队列 workQueue
  3. 如果队列已满,并且线程数小于 maximumPoolSize 那么就会创建一个新线程去执行任务;
  4. 如果运行的线程数量大于等于maximumPoolSize,且队列 workQueue 已经满了,则通过handler所指定的策略来处理新任务;

具体流程图如下:

四:ThreadPoolExecutor中的execute()方法

public void execute(Runnable command) {// 校验参数if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/// 获取到 ctl 里的值 int c = ctl.get();// 获取当前活跃的线程数量 如果活跃度线程数量小于核心线程数 尝试添加新的worker线程去处理提交的任务// 如果添加线程去处理返回 true 则表示添加成功 否则 就重新获取当前活跃的线程数量if (workerCountOf(c) < corePoolSize) {/** addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断;* 如果为true,根据corePoolSize来判断;* 如果为false,则根据maximumPoolSize来判断*/if (addWorker(command, true))return;c = ctl.get();}// 判断线程池的状态是否是 RUNNING 状态,即 可以接受新任务,也可以处理队列任务的状态// 如果是 RUNNING 状态就尝试把任务提交到任务队列中去if (isRunning(c) && workQueue.offer(command)) {// 上述条件都成功// 再次获取 ctl 的值进行二次检查int recheck = ctl.get();// 如果当前线程池的状态不是 RUNNING 且从任务队队列中移除之前提交任务 成功后调用 reject 方法拒绝任务if (! isRunning(recheck) && remove(command))reject(command);// 如果当前工作线程为 0 那么就添加一个不携带任务的工作线程else if (workerCountOf(recheck) == 0)/** 这里传入的参数表示:* 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动;* 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,*    添加线程时根据 maximumPoolSize 来判断;* 如果判断 workerCount 大于0,则直接返回,在workQueue中新增的command会在将来的某个时刻被执行。*/addWorker(null, false);}/** 如果执行到这里,有两种情况:* 1. 线程池已经不是RUNNING状态;* 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。*/// 如果添加线程去处理任务失败 那么调用reject()方法拒绝任务command,// 线程数的判断利用maximumPoolSize作为边界约束条件else if (!addWorker(command, false))reject(command);
}

具体解析:如果线程池在启动后处于 RUNNING 状态:

  1. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;

  2. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;

  3. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;

  4. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

五:ThreadPoolExecutor中的addWork()方法

addWorker()方法是用来在线程池中添加新的线程去执行任务,该方法传入两个参数,firstTask 是指新增加的线程去执行的任务,core 表示为为 true 表示在新增线程时会判断当前活动线程数是否少于 corePoolSizefalse 表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize

private boolean addWorker(Runnable firstTask, boolean core) {// 避免死循环retry:for (;;) {// 获取 ctl 的值int c = ctl.get();// 获取运行状态int rs = runStateOf(c);// Check if queue empty only if necessary./** 如果线程处于非运行状态 且 rs 不等于 SHUTDOWN 且 firstTask 不等于空 且 workQueue 为空* 那么就返回 false 表示不能添加 work *  1. (rs >= SHUTDOWN)线程池已经 shutdown 后,还要添加新的任务,拒绝*  2. ( (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) 这个条件的意思是*      当进入 SHUTDOWN 状态,传进来的任务为空,且任务队列不为空的时候,此时是允许添加新线程的,但是*             对这个条件取反的时候 就不可以添加新线程了。*/if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {//获取工作的线程数int wc = workerCountOf(c);//如果工作线程数大于默认容量大小或者大于核心线程数大小,则返回 false 表示不可以添加新线程了。//core 是之前传入的参数 true表示根据corePoolSize比较,false根据maximumPoolSize比较if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//通过 cas 来增加工作线程数,如果 cas 成功,则跳出循环if (compareAndIncrementWorkerCount(c))break retry;//再次获取 ctl 的值c = ctl.get();  // Re-read ctl// 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 根据 firstTask 来创建一个 Workerw = new Worker(firstTask);// 每个 Worker 中都会根据线程工厂创建一个线程 final Thread t = w.thread;if (t != null) {// 重入锁,避免并发问题final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());// rs < SHUTDOWN表示是RUNNING状态 // 如果线程池处于RUNNING状态 或者 处于 SHUTDOWN 但是 firstTask 为空 才可以添加线程// 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {//这里没启动if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();//将新创建的 Worker 添加到 workers 集合中workers.add(w);int s = workers.size();//如果集合中的工作线程数大于最大线程数,这个最大线程数表示线程池曾经出现过的最大线程数if (s > largestPoolSize)//更新线程池出现过的最大线程数largestPoolSize = s;//创建成功workerAdded = true;}} finally {//释放锁mainLock.unlock();}if (workerAdded) {//启动线程t.start();workerStarted = true;}}} finally {//如果添加失败,要减掉实际工作线程数,因为我们最开始的时候增加了工作线程数if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

addWorker 方法主要包括两部分,第一部分主要是通过循环CAS来操作线程的个数,对其加 1,第二部分主要是新建一个线程并启用。至于内部类 Worker,上面t.start() 启动线程后,我们就把内部类 Worke对象传了进去,内部类 Worker 是实现了 runable 接口的,jvm 执行 run 方法的时候就会执行 Worker 中的 run 方法,也就启动了下面分析的 runWorker 方法。

private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID = 6138294804551838833L;/** Thread this worker is running in.  Null if factory fails. */final Thread thread;/** Initial task to run.  Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {/ // 禁止中断,直到runWorkersetState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;//维护的线程this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker  */public void run() {runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock()        { acquire(1); }public boolean tryLock()  { return tryAcquire(1); }public void unlock()      { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}

六:ThreadPoolExecutor中的addWorkerFailed()方法

如果添加 Worker 并且启动线程失败,则会做失败后的处理。

 private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (w != null)// 如果 worker 已经构造好了,则从 workers 集合中移除这个 workerworkers.remove(w);// 原子递减核心线程数decrementWorkerCount();// 尝试结束线程池tryTerminate();} finally {mainLock.unlock();}}

七:ThreadPoolExecutor中的runWorker()方法

前面看了 ThreadPoolExecutoraddWorker()方法,其主要是用来增加工作线程,而runWorker()方法是线程池中执行任务的真正处理逻辑。


final void runWorker(Worker w) {//获取线程Thread wt = Thread.currentThread();//获取任务Runnable task = w.firstTask;w.firstTask = null;/*** unclock 是表明 worker 线程是可以被中断的 * 在 new Worker 时有 setState(-1) * unclock 方法里面调用了内部类 Worker 中的 tryRelease() 方法,将 state 置为 0,* 在 Worker 中 interruptIfStarted() 方法中 state>=0 才允许调用中断* state = 0 时表示解锁状态 state = 1 时表示锁定状态*/w.unlock(); // allow interrupts// 是否因为异常退出循环boolean completedAbruptly = true;try {//  while 循环为了实现线程复用// 如果 task 为空,则通过getTask 来获取任务while (task != null || (task = getTask()) != null) {//加锁、防止在 shutdown() 时不终止正在运行的 workerw.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interrupt/*** 线程在 stop 状态的时候是不接受新任务,不执行已经加入任务队列的任务,还中断正在执行的任务* 所以 (runStateAtLeast(ctl.get(), STOP) 条件表示如果线程处于 stop 状态以上则中断线程* ( Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))* Thread.interrupted()来判断是否中断是为了确保在 RUNNING 或者 SHUTDOWN 状态时线程是非中* 断状态的,因为 Thread.interrupted() 方法会复位中断的状态* 此时确保线程中断标志位为 true 且是 stop 状态以上,接着清除了中断标志* !wt.isInterrupted()则再一次检查保证线程需要设置中断标志位*/if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//这里默认是个空实现,我们可以自己继承 ThreadpoolExecutor 去重写beforeExecute(wt, task);Throwable thrown = null;try {//执行任务的 run 方法task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {//这这里对任务进行置空,方便下次循环进来通过 getTask() 去获取任务task = null;//记录完成的任务数量w.completedTasks++;//解锁w.unlock();}}completedAbruptly = false;} finally {//用来销毁工作线程,下面会有具体解析processWorkerExit(w, completedAbruptly);}}

其实上面的代码主要干了四件事:

  1. 通过 while 循环的条件 getTask()去任务队列获取任务;
  2. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
  3. 然后通过task.run()执行任务;
  4. 直到 getTask()为 null 时表示任务执行完毕,则跳出循环,执行processWorkerExit()方法销毁线程。

八:ThreadPoolExecutor中的getTask()方法

worker 线程会通过getTask()方法从任务队列中获取需要执行的任务

private Runnable getTask() {// timeOut变量的值表示上次从阻塞队列中取任务时是否超时boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary./*** 当线程池状态rs >= SHUTDOWN,也就是非RUNNING状态 且 满足下面两个条件下面的任意一个* 则 将 workerCount 减 1 并返回 null* 1: 线程池状态为 stop( shutdownNow()会导致变成 STOP )* 2: workQueue 为空( shutdown 状态的线程池还是要执行 workQueue 中剩余的任务 )* * 原因在于当前线程池状态的值是SHUTDOWN或以上时,就不允许再向阻塞队列中添加任务*/if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?/*** timed 变量用于判断是否需要进行超时控制* allowCoreThreadTimeOut 默认是 false,也就是核心线程不允许进行超时* wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量* 对于超过核心线程数量的这些线程,需要进行超时控制*/boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;/*** 1: wc > maximumPoolSize 的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法*    否则已经 addWorker()成功不会超过 maximumPoolSize* 2: timed && timedOut 如果为 true,表示当前操作需要进行超时控制,并且上次从阻塞队列中*    获取任务发生了超时.其实就是体现了空闲线程的存活时间* 3: 有效线程数量大于 1 或者任务队列为空 ** 如果满足上面条件则将 workerCount 减 1**/if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {/*** 如果 timed = true * 则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null;* 如果 timed = false * 则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空**/Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();//如果拿到的任务不为空,则直接返回给 worker 进行处理if (r != null)return r;///如果 r==null,说明已经超时了,设置 timedOut=true,在下次自旋的时候进行回收timedOut = true;} catch (InterruptedException retry) {// 如果获取任务时当前线程发生了中断,则设置 timedOut 为false 并返回循环重试timedOut = false;}}}

从上面的代码中我们可以看到在第二个if语句中,对线程池的有效线程数量进行了控制,在上文分析中我们知道如果当前线程池的线程数量超过了 corePoolSize 且小于maximumPoolSize,并且 workQueue 已满时,则可以增加工作线程,但是如果获取任务超时的情况下没有拿到任务,此时timedOuttrue,说明workQueue已经为空了。此时,也就表明当前线程池中有的线程已经处于空闲的状态了,也就可以把多余的线程进行销毁,保持线程数量在 corePoolSize即可。销毁线程在runWorker方法执行完之后,也就是Worker 中的 run方法执行完,由 JVM 自动回收,而且当getTask 方法返回 null时,在 runWorker 方法中会跳出 while循环,然后会执行processWorkerExit方法。

九:ThreadPoolExecutor中的processWorkerExit()方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {// 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1;// 如果线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操作,这里就不必再减了。 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//统计完成的任务数completedTaskCount += w.completedTasks;// 从workers中移除,也就表示着从线程池中移除了一个工作线程workers.remove(w);} finally {mainLock.unlock();}// 根据线程池状态进行判断是否结束线程池tryTerminate();int c = ctl.get();/** 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker;* 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;* 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。*/if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);}
}

至此,至此,processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束,如图所示:

十:ThreadPoolExecutor中的shutdown()方法

shutdown方法要将线程池切换到SHUTDOWN状态,并调用interruptIdleWorkers()方法请求中断所有空闲的worker,最后调用tryTerminate()尝试结束线程池。

public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 安全策略判断checkShutdownAccess();// 切换状态为SHUTDOWNadvanceRunState(SHUTDOWN);// 中断空闲线程interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}// 尝试结束线程池 使线程池的状态设置为TERMINATED。tryTerminate();
}private void interruptIdleWorkers() {interruptIdleWorkers(false);}private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;//加锁 因为workers是HashSet类型的,不能保证线程安全mainLock.lock();try {//遍历workers中所有的工作线程for (Worker w : workers) {Thread t = w.thread;//如果线程没有被中断 且 tryLock 成功 那么就中断该线程if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}}final void tryTerminate() {for (;;) {int c = ctl.get();/** 当前线程池的状态为以下几种情况时,直接返回:* 1. RUNNING,因为还在运行中,不能停止;* 2. TIDYING或TERMINATED,因为线程池中已经没有正在运行的线程了;* 3. SHUTDOWN并且等待队列非空,这时要执行完workQueue中的task;*/if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;// 如果线程数量不为0,则中断一个空闲的工作线程,并返回if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 这里尝试设置状态为TIDYING,如果设置成功,则调用terminated方法if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {// terminated方法默认什么都不做,留给子类实现terminated();} finally {// 设置状态为TERMINATEDctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS}
}

十一:ThreadPoolExecutor中的shutdownNow()方法

 public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 切换状态为STOPadvanceRunState(STOP);// 中断所有工作线程,无论是否空闲interruptWorkers();// 取出队列中没有被执行的任务tasks = drainQueue();} finally {mainLock.unlock();}//使线程池的状态设置为TERMINATEDtryTerminate();return tasks;}private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//中断所有工作线程,无论是否是空闲的for (Worker w : workers)w.interruptIfStarted();} finally {mainLock.unlock();}}void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}

十二:线程池的监控

通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用

  • getTaskCount:线程池已经执行的和未执行的任务总数;
  • getCompletedTaskCount:线程池已完成的任务数量,该值小于等于taskCount;
  • getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize;
  • getPoolSize:线程池当前的线程数量;
  • getActiveCount:当前线程池中正在执行任务的线程数量。

通过这些方法,可以对线程池进行监控,在ThreadPoolExecutor类中提供了几个空方法,如beforeExecute方法,afterExecute方法和terminated方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自ThreadPoolExecutor来进行扩展。

public class MyThreadPoolExecutor extends ThreadPoolExecutor {private final static Logger logger = LoggerFactory.getLogger(MyThreadPoolExecutor.class);// 保存任务开始执行的时间,当任务结束时,用任务结束时间减去开始时间计算任务执行时间private ConcurrentHashMap<String, Date> startTimes;public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, longkeepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit,workQueue);this.startTimes = new ConcurrentHashMap<>();}@Overridepublic void shutdown() {logger.info("已经执行的任务数:{},当前活动线程数:{},当前排队线程数:{}", this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());super.shutdown();}//任务开始之前记录任务开始时间@Overrideprotected void beforeExecute(Thread t, Runnable r) {startTimes.put(String.valueOf(r.hashCode()), new Date());super.beforeExecute(t, r);}@Overrideprotected void afterExecute(Runnable r, Throwable t) {Date startDate = startTimes.remove(String.valueOf(r.hashCode()));Date finishDate = new Date();long diff = finishDate.getTime() - startDate.getTime();// 统计任务耗时、初始线程数、核心线程数、正在执行的任务数量、// 已完成任务数量、任务总数、队列里缓存的任务数量、// 池中存在的最大线程数、最大允许的线程数、线程空闲时间、线程池是否关闭、线程池是否终止logger.info("任务耗时:{}", diff);logger.info("初始线程数:{}", this.getPoolSize());logger.info("核心线程数:{}", this.getCorePoolSize());logger.info("正在执行的任务数量:{}", this.getActiveCount());logger.info("已经执行的任务数:{}", this.getCompletedTaskCount());logger.info("任务总数:{}", this.getTaskCount());logger.info("最大允许的线程数:{}", this.getMaximumPoolSize());logger.info("线程空闲时间:{}", this.getKeepAliveTime(TimeUnit.MILLISECONDS));super.afterExecute(r, t);}public static ExecutorService newCachedThreadPool() {return new MyThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue());}
}//测试
class Test implements Runnable {private static ExecutorService executorService = MyThreadPoolExecutor.newCachedThreadPool();@Overridepublic void run() {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) throws Exception {for (int i = 0; i < 100; i++) {executorService.execute(new Test());}executorService.shutdown();}
}

十三:Executors里提供的线程池 API

为了方便对于线程池的使用,在 Executors 里面提供了几个线程池的工厂方法,只需要直接使用Executors 的工厂方法,就可以使用线程池:

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}

FixedThreadPoolcorePoolSizemaximumPoolSize都是指定的值且相等,也就是说当线程池中的线程数都是核心线程,当线程数超过核心线程数后,任务都会被放到阻塞队列中。另外 keepAliveTime 为 0,该参数对核心线程无效也就是说超出核心线程数量以外的线程无存活时间,这里用的阻塞队列是LinkedBlockingQueue,使用的是默认容量 Integer.MAX_VALUE,相当于没有上限,如果任务提交速度持续大余任务处理速度,会造成队列大量阻塞。因为队列很大,很有可能在拒绝策略前,内存溢出
这个线程池执行任务的流程如下:

  1. 线程数少于核心线程数,也就是设置的线程数时,新建线程执行任务
  2. 线程数等于核心线程数后,将任务加入阻塞队列
  3. 由于队列容量非常大,可以一直添加
  4. 执行完任务的线程反复去队列中取任务执行

适用场景:FixedThreadPool 用于负载比较大的服务器,可用于Web服务瞬时削峰,但需注意长时间持续高峰情况造成的队列阻塞,为了资源的合理利用,需要限制当前线程数量。

CachedThreadPool

public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());

CachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程; 其中corePoolSize = 0maximumPoolSize = Integer.MAX_VALUE 说明其没有核心线程,非核心线程数无上限,但是keepAliveTime = 60s表明每个线程空闲的时间只有 60 秒,超过后就会被回收。SynchronousQueue 同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为CachedThreadPool线程创建无限制,不会有队列等待
它的执行流程如下:

  1. 没有核心线程,直接向 SynchronousQueue 中提交任务
  2. 如果有空闲线程,就去取出任务执行;如果没有空闲线程,就新建一个
  3. 执行完任务的线程有 60 秒生存时间,如果在这个时间内可以接到新任务,就可以继续活下去,否则就被回收

适用场景:CachedThreadPool快速处理大量耗时较短的任务,如NettyNIO接受请求时,可使用CachedThreadPool

SingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}

SingleThreadExecutor 创建了一个corePoolSizemaximumPoolSize都是 1 ,也就是说创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序( FIFO, LIFO, 优先级)执行。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。

ScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);}

newScheduledThreadPool调用的是ScheduledThreadPoolExecutor的构造方法,而ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,构造是还是调用了其父类的构造方法。

 public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}

创建一个定长的线程池,而且支持定时、延迟以及周期性的任务执行。

十四:总结

本文从ThreadPoolExecutor的成员变量到具体的成员方法都进行比较详细的分析,对重要的源码也做了详细的注释。总结如下:

  • 首先分析了主要的几个成员变量和成员方法,为下文的分析打下了基础;
  • 接着从构造方法出发,分析了几个重要的参数;
  • 然后从execute方法出发到最后的shutdown方法,展现了线程池的工作流程,其中也分析了线程池是如何通过corePoolSizemaximumPoolSize以及workQueue之间的配合来决定任务的执行、阻塞或者拒绝;
  • 最后分析了Executors里提供的线程池 API, 提供了四个线程池的工厂方法,只需要直接使用Executors 的工厂方法,就可以使用线程池,同时也分析了他们的具体使用场景;

参考博文:https://blog.csdn.net/djzhao/article/details/82192918

Java并发编程之ThreadPoolExecutor源码解析相关推荐

  1. Java并发编程之FutureTask源码解析

    上次总结一下AQS的一些相关知识,这次总结了一下FutureTask的东西,相对于AQS来说简单好多呀 之前提到过一个LockSupport的工具类,也了解一下这个工具类的用法,这里也巩固一下吧 /* ...

  2. Java并发编程之CountDownLatch源码解析

    一.导语 最近在学习并发编程原理,所以准备整理一下自己学到的知识,先写一篇CountDownLatch的源码分析,之后希望可以慢慢写完整个并发编程. 二.什么是CountDownLatch Count ...

  3. Java并发编程之ThreadLocal源码分析

    1 一句话概括ThreadLocal   什么是ThreadLocal?顾名思义:线程本地变量,它为每个使用该对象的线程创建了一个独立的变量副本. 2 ThreadLocal使用场景   用一句话总结 ...

  4. Android 网络编程之OkHttp源码解析

    前言:OkHttp框架是Android的网络请求框架,无数的项目都在使用着这个框架,重要性不言而喻; 本文会将OKHTTP的源码进行拆解,每个部分来单独学习,由简入深,循序渐进,篇幅较长,建议收藏,慢 ...

  5. Java并发修改异常的源码解析

    1. 什么时候会产生并发修改异常 并发的意思是同时发生,那么其实并发修改的字面意思就是同时修改,通过查看JDK的API我们可以得知,并发修改异常的出现的原因是:当方法检测到对象的并发修改,但不允许这种 ...

  6. 并发编程之 Semaphore 源码分析

    前言 并发 JUC 包提供了很多工具类,比如之前说的 CountDownLatch,CyclicBarrier ,今天说说这个 Semaphore--信号量,关于他的使用请查看往期文章并发编程之 线程 ...

  7. 并发编程之 ThreadLocal 源码剖析

    前言 首先看看 JDK 文档的描述: 该类提供了线程局部 (thread-local) 变量.这些变量不同于它们的普通对应物,因为访问某个变量(通过其 get 或 set 方法)的每个线程都有自己的局 ...

  8. 多线程与高并发(八):ThreadPoolExecutor源码解析, SingleThreadPool,CachedPool,FixedThreadPool,ForkJoinPoll 等

    线程池 今天我们来看看JDK给我们提供的默认的线程池的实现. ThreadPoolExecutor:我们通常所说的线程池.多个线程共享同一个任务队列. SingleThreadPool CachedP ...

  9. 面试官系统精讲Java源码及大厂真题 - 37 ThreadPoolExecutor 源码解析

    37 ThreadPoolExecutor 源码解析 当你做成功一件事,千万不要等待着享受荣誉,应该再做那些需要的事. -- 巴斯德 引导语 线程池我们在工作中经常会用到.在请求量大时,使用线程池,可 ...

  10. Java并发编程之CyclicBarrier详解

    简介 栅栏类似于闭锁,它能阻塞一组线程直到某个事件的发生.栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行.闭锁用于等待事件,而栅栏用于等待其他线程. CyclicBarrier ...

最新文章

  1. linux下修改网卡接口名称
  2. R语言-画edcf图、直方图、正态概率图,计算分位数(任意分位)
  3. 奇奇seo优化软件_信阳seo优化排名软件
  4. 安装deepin linux
  5. rxjs of操作符传入数组的单步执行
  6. sql server中创建数据库和表的语法
  7. 初中计算机课教什么时候,初中计算机教学课程教学方法探讨
  8. for循环python爬虫_python爬虫 for循环只出来一条
  9. android应用程序架构由哪四个组成,android 应用程序结构是哪些
  10. oracle 导入单表,oracle 10g 中单个数据表的导入、导出
  11. 协议 UDP、TCP UDP的接发数据的步骤
  12. STM32—驱动GY85-IMU模块
  13. linux pdf放到一页,linux – 如何将多个PDF页面连接到单个页面
  14. 第8代CPU i5-8250U 电脑安装核显 Windows 7 x64位驱动
  15. 海外 Android 三方应用市场
  16. 【转】清华本科结业生两年的工作经历
  17. 网络工程师考试内容纲要
  18. Hive/MaxCompute SQL性能优化(三):数据倾斜优化实战
  19. 今天刚到货的小米平板2,就出现dnx fastboot mode一直卡死黑屏
  20. 图片的质量压缩和二次采样

热门文章

  1. 10-2 系统设计真题解析:短网址系统的设计与实现
  2. redis连接异常 redis.clients.jedis.exceptions.JedisClusterException CLUSTERDOWN The cluster is down
  3. Allegro_SMT手工焊接辅助程序
  4. 转叶寒栋分享:方舟支持C语言编译新功能开源啦!
  5. 华三交换机配置vrrp_VRRP原理与配置 华为、华三交换机,路由器
  6. C++ 知识补给(二)
  7. Ubuntu搭建饥荒(Don't Starve Together)游戏服务器
  8. 习题:求1-n的阶乘之和,用一个非递归函数fac(n)求n!,n的值由主函数输入,最终得到的结果在主函数中输出。
  9. 深入浅出 - Android系统移植与平台开发(四)- Android启动流程
  10. 服务器抓不到mrcp协议,mrcp与一句话识别