线程池invokeAll方法详解

  • 问题起源与抽象
  • 问题排查与猜测
    • 猜测一:invokeAll 在异步执行后会不会同步等待线程执行完毕获取最终结果
    • 猜测二:队列里面可能存在第一次调用 invokeAll 执行了但没有删掉的任务,所以才会导致第二次放入队列失败
  • 两次猜测失败后的总结
  • 复查源码,真相大白
  • 问题解决方案
  • 参考

线上真实案例,多次调用线程池 ThreadPoolExecutor 的 invokeAll() 方法进行数据统计时任务被拒绝,故事从此开始。

本文重在讲述问题的产生、抽象、寻找解决方法的过程,并结合源码对原因进行抽丝剥茧般的分析。bug 千千万万,唯有合理的逻辑推理思维才能让这些 bug 显露原形。

问题起源与抽象

先来看一段简单的代码,定义一个核心线程数5、有界队列5的线程池,然后创建10个任务丢进去执行2次。

按照以前对线程池执行逻辑的理解,创建的10个线程,会先交给核心线程去执行,5个核心线程满了之后,存放到队列中,刚好存储剩下的5个,按理说10个任务都会正常执行完毕。本次只测试固定大小的线程池。

public class InvokeAllTest {private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5,60 * 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),new MyThreadFactory());public static void main(String[] args) {List<Callable<Void>> tasks = new ArrayList<>();for (int i = 0; i < 10; i++) {tasks.add(new InvokeAllThread());}System.out.println("第一次任务执行前的executor: " + executor);try {executor.invokeAll(tasks);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一次任务执行完毕后的executor: " + executor);System.out.println("==============第一次任务执行完毕,开始第二次任务============");try {Thread.sleep(1000);executor.invokeAll(tasks);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二次任务执行完毕后的executor:" + executor);}// 任务执行线程。通过打印线程名称,观察提交的任务被哪个线程执行static class InvokeAllThread implements Callable<Void> {@Overridepublic Void call() throws Exception {System.out.println(Thread.currentThread().getName());return null;}}// 给工作线程自定义名字,方便观察提交的任务被哪个线程执行static class MyThreadFactory implements ThreadFactory {private AtomicInteger threadNum = new AtomicInteger(1);@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r, String.valueOf(threadNum.getAndIncrement()));if (thread.getPriority() != Thread.NORM_PRIORITY) {thread.setPriority(Thread.NORM_PRIORITY);}return thread;}}

运行程序后发现,第一次调用 invokeAll 正常执行,第二次调用报错。多次执行结果相同。

第一次任务执行前的executorjava.util.concurrent.ThreadPoolExecutor@30f39991[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
1
2
3
4
4
5
3
2
3
3
第一次任务执行完毕后的executorjava.util.concurrent.ThreadPoolExecutor@30f39991[Running, pool size = 5, active threads = 0, queued tasks = 0, completed tasks = 10]
==============第一次任务执行完毕,开始第二次任务============
2
4
5
2
1
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@3a71f4dd rejected from java.util.concurrent.ThreadPoolExecutor@30f39991[Running, pool size = 5, active threads = 2, queued tasks = 0, completed tasks = 13]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)at java.util.concurrent.AbstractExecutorService.invokeAll(AbstractExecutorService.java:238)at com.aaron.hp.thread.pool.InvokeAllTest.main(InvokeAllTest.java:36)

问题排查与猜测

既然程序出现异常,就该调用 debug 模式进行排查,并遵循"大胆猜测,小心求证"的态度,去解决这个问题。

猜测一:invokeAll 在异步执行后会不会同步等待线程执行完毕获取最终结果

由于 invokeAll 封装的太好,之前只知道最后会同步等待才能获取返回值。那么现在就需要去证实这个概念。

进入 invokeAll 方法后,发现调用了f.get(),那么毫无疑问,这个猜测可以排除掉了。

其实从执行过程的输出内容也可以看出,两次调用 invokeAll 的执行顺序和界限(打印语句) 非常明显。

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {if (tasks == null)throw new NullPointerException();ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());boolean done = false;try {for (Callable<T> t : tasks) {RunnableFuture<T> f = newTaskFor(t);futures.add(f);// 任务被添加后的具体执行execute(f);}for (int i = 0, size = futures.size(); i < size; i++) {Future<T> f = futures.get(i);if (!f.isDone()) {try {// 此处同步等待f.get();} catch (CancellationException ignore) {} catch (ExecutionException ignore) {}}}done = true;return futures;} finally {if (!done)for (int i = 0, size = futures.size(); i < size; i++)futures.get(i).cancel(true);}
}

猜测二:队列里面可能存在第一次调用 invokeAll 执行了但没有删掉的任务,所以才会导致第二次放入队列失败

由于未阅读源码,猜测只有当创建的任务执行完毕并且销毁之后,才会从队列中真正移除。

那么就需要查看入队列和出队列的时机。查看 invokeAll 方法中的 execute(f) 方法。

查看 ThreadPoolExecutor 类下的 execute 方法源码:

public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 判断工作线程数是否小于核心线程数,如果是则创建 Worker 工作线并返回if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 判断主线程是否在运行,并判断是否入队列成功if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 否则重新创建 Worker 线程,创建失败则抛出拒绝策略else if (!addWorker(command, false))reject(command);
}

此时就会发现入队列的操作在workQueue.offer(command)处完成,而我们提交的任务是由一个叫 Worker 类的实例来执行,addWorker(command, true)创建 Worker 实例。

那么我们就分别进去这两个方法来看下源码:

矮油黑人问号脸。。没想到这个 ThreadPoolExecutor 类的 addWorker 这么长,给核心代码写个注释重点关注,扫一眼然后去看 offer 方法(英文注释是源码中自带的)。前面都是校验,创建核心线程处为new Worker(firstTask)

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 上面一堆都是校验,此处才是 Worker 被创建的地方,注意被传入的 firstTaskw = new Worker(firstTask);// 此处发现 Worker 里面居然还有个 therad 线程,不过想想也是,没有线程怎么异步执行呢。点进 Worker 的构造方法看一眼就会发现,这个线程就是由我们自定义的 threadFactory 来创建的,所以核心线程名称就是我们之前设定好的名字。this.thread = getThreadFactory().newThread(this);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {// worker 实例成功创建后,让它启动起来t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}

接着是 ArrayBlockingQueue 类的 offer 方法,在 enqueue(e)处进入队列:

public boolean offer(E e) {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lock();try {if (count == items.length)return false;else {// 进入队列enqueue(e);return true;}} finally {lock.unlock();}
}

此时我们先来调试一波,看看入队列时这些方法的执行情况,在三个 if 处分别设置断点,在 addWorker 和 offer 方法靠前的未知打断点,确定是否会进入。

第一次调用 invokeAll:addWorker 进入5次,offer 方法进入5次。

第二次调用 invokeAll:addWorker 进入0次,offer 方法进入10次(可能是5-10次)。

那么发现了新的问题:程序居然没报错!正常执行完成!这不科学!

带着疑惑,重新 debug,居然还没报错!难道之前的异常是偶然吗?

以最快速度连按 F9 debug了几次,有时候报错。。

重新运行 run 了几次,次次报错。。

怀疑人生了。。

此时墨菲定律在我头脑中回响,“偶然事件存在必然的因素”。那么大胆猜测,这个原因极有可能是队列消费速度较慢导致的,去查看消费部分的源码。由于 worker 也是一个线程,那么肯定有类似的 run 方法:

查看 ThreadPoolExecutor 类 的 Worker 这个内部类,找到 run() 方法:

public void run() {runWorker(this);
}

而 run 方法调用的是 ThreadPoolExecutor 类里的 runWorker(this)

final void runWorker(Worker w) {Thread wt = Thread.currentThread();// 此处注意,将 worker 里存入的 firstTask 取出来,交给下面的 while 去执行Runnable task = w.firstTask;// 将 worker 里的 firstTask 属性置空w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 如果 task 不为空,即取出的 firstTask 不为空,则执行;否则调用 getTask() 方法获取 task 再执行while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 此处为空实现,可自定义beforeExecute(wt, task);Throwable thrown = null;try {// 调用 task 的 run 方法执行任务task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}

查看 ThreadPoolExecutor 类下的 getTask() 方法:

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 此处为出队列操作,poll 和 take 的区别在于,poll 会等待指定时间,而 take 是阻塞的,会一直等待Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}

看到这里,猜测二也就不攻自破,出队列后任务才会被执行,所以某个任务出队列后,执行成功与否与队列再无瓜葛。(注意这个说法只针对默认代码,如果自定义了拒绝策略是可以将被 interrupt 的线程重新塞回队列里的)

两次猜测失败后的总结

  1. 队列是异步消费的,但入队是同步进行的,如果队列的容量不足以承载要存入队列的任务数,就会被拒绝。(虽然是 ArrayBlockQueue 的特性,但这是通过 debug 以及 run 后观察到的)

  2. 第一次 addWorker 方法执行了5次,offer 执行了5次;第二次则是 0 次,10 次。刚才忽略了这个细节,那么需要重新找到相应的源码阅读。

  3. 任务从队列中移除与任务是否执行完毕无关,先移除,后执行。

  4. 我们创建的任务,是由 worker 核心线程去调用任务的 run 方法来同步执行的,而不是调用任务实例的 start 去异步执行,这也就是为什么 invokeAll 可以获取到返回值的原因所在。

    **备注:**这里有点绕,任务实例指的是我们最开始在 for 循环中创建的10个tasks new InvokeAllThread(),为什么继承了 Callable 明明改写的是 call()方法,但却有 run()方法可以被调用呢?这是因为在 invokeAll()方法执行execute()方法前,通过RunnableFuture<T> f = newTaskFor(t);进行了包装。

复查源码,真相大白

查看 ThreadPoolExecutor 类下的 execute() 方法,创建 worker 前的判断如下:

if (workerCountOf(c) < corePoolSize) { ...}

第一次调用 invokeAll 时,线程池中的核心线程 worker 数为0,小于 corePoolSize,所以前5次会创建 worker 核心线程并返回,此时随着 worker 的创建,我们创建的10个任务中的5个也会随着 worker 的创建作为 firstTask 属性被传进去。后5个任务则被放入 queue 中。

第二次调用 invokeAll 时,线程池中的核心数已经是5,所以10个任务都会被放入 queue 中异步消费,但是我们的 queue 的容量为5。如果消费速度快于入队速度(debug),那么10个任务会正常执行。但是入队速度太快的话(run),前5个肯定可以入队,后面的5个几乎都会被拒绝。

问题解决方案

  1. 对于固定大小的线程池,我们要按照实际情况设置 queue 和 worker 的数量。根据任务类型(IO/CPU)以及机器配置(CPU 核数等)设置 worker 核心线程数;而根据我们的任务多少来设定 queue 的大小,而不是 queue + worker 的总数。
  2. 重写拒绝策略,将被丢弃的任务重新 put 回队列中去,put 是阻塞的。

参考

ThreadPoolExecutor源码分析及阻塞提交任务方法

Thread的中断机制(interrupt)

线程池invokeAll方法详解相关推荐

  1. 并发编程五:java并发线程池底层原理详解和源码分析

    文章目录 java并发线程池底层原理详解和源码分析 线程和线程池性能对比 Executors创建的三种线程池分析 自定义线程池分析 线程池源码分析 继承关系 ThreadPoolExecutor源码分 ...

  2. java线程池ThreadPoolExecutor类详解

    线程池有哪些状态 1. RUNNING:  接收新的任务,且执行等待队列中的任务 Accept new tasks and process queued tasks  2. SHUTDOWN: 不接收 ...

  3. 【多线程】线程池拒绝策略详解与自定义拒绝策略

    线程池的拒绝策略 ThreadPoolExecutor内部有实现4个拒绝策略,默认为AbortPolicy策略 CallerRunsPolicy:由调用execute方法提交任务的线程来执行这个任务 ...

  4. 线程与线程池(一条龙详解)

    一:前言 一个问题引出的学习笔记 并发类库提供的线程池实现有哪些? 其实Executors已经为我们封装好了 4 种常见的功能线程池,如下: 定长线程池(FixedThreadPool) 定时线程池( ...

  5. Java线程池七大参数详解和配置

    目录 一.corePoolSize核心线程数 二.maximunPoolSize最大线程数 三.keepAliveTime空闲线程存活时间 四.unit空闲线程存活时间的单位 五.workQueue线 ...

  6. ThreadPoolExecutor线程池核心参数详解

    理解ThreadPoolExecutor线程池的corePoolSize.maximumPoolSize和poolSize 我们知道,受限于硬件.内存和性能,我们不可能无限制的创建任意数量的线程,因为 ...

  7. python队列线程池_实例详解:python高级编程之消息队列(Queue)与进程池(Pool)

    今天为大家带来的内容是:python高级编程之消息队列(Queue)与进程池(Pool),结合了实例的形式详细分析了Python消息队列与进程池的相关原理.使用技巧与操作注意事项!!! Queue消息 ...

  8. 线程池之ThreadPoolExecutor详解

    转自:https://thinkwon.blog.csdn.net/article/details/102541900

  9. java线程池详解及五种线程池方法详解

    基础知识 Executors创建线程池 Java中创建线程池很简单,只需要调用Executors中相应的便捷方法即可,比如Executors.newFixedThreadPool(int nThrea ...

最新文章

  1. android获取小程序音频时长,最新微信小程序获取音频时长与实时获取播放进度...
  2. 解决在ascx使用outputcache就不可以设置用户控件自己的属性
  3. leetcode 219. 存在重复元素 II(规定步长)
  4. 计算机基础及wps office应用_自考本科计算机应用基础考试大纲
  5. [css] 如何形成BFC?
  6. 蛋疼的中文编码及其计算机编码历史
  7. java操作sql数据库_java-JDBC连接数据库并进行SQL操作
  8. OpenShift 4 - DevSecOps Workshop (4) - 为 Task 增加参数和Workspace
  9. 2003-can't connect to MYSQL server on 'localhost'(10038)
  10. 大数据分析常用的方法有哪些
  11. 37.Linux/Unix 系统编程手册(下) -- DAEMON
  12. NDCG、AUC介绍
  13. Top 10 tough core Java interview questions answers programming
  14. 电子邮箱市场盈利模式
  15. Linux环境下Python操作word
  16. nginx变量ngx.var
  17. 阿里、京东、百度“激战”互联网医疗
  18. 扫频光学相干层析原理(SS-OCT)
  19. html.append清空,关于jquery的append()和html()使用
  20. 下载Stani‘s Python Editor

热门文章

  1. qpOASES使用笔记
  2. git下载子模块命令git clone --recursive和git submodule update --init
  3. 删除Management Data Warehouse (MDW) job失败
  4. Basler相机安装教程
  5. Unity3d 改变场景中钢体对象重力(Physics和Physics 2D)大小方向设置
  6. 圣诞节计算机音乐,圣诞节音乐
  7. 0x00007FF8DE6BD1E2 (ucrtbased.dll)处(位于 6-指针与动态内存申请.exe 中)引发的异常: 0xC0000005: 写入位置 0xFFFFFFFF9288D140
  8. MVC 音乐商店 第1部分: 概述和文件- 新建项目
  9. Planbar 2018 新功能 BIM 加密狗更新
  10. java中怎么让字体可以显示下划线呢_java中怎么让字体可以显示下划线呢