前言

以前使用线程池,对execute 、 submit 等方法提交线程池任务的区别比较模糊,现在通过ThreadPoolExecutor与ForkJoinPool比较,分别对比其execute ,submit 等方法提交线程池任务的区别,来深入理解线程池及并发编程。

 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(6, 10, 10L, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(20),Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());
  • 若当前运行的线程,少于corePoolSize,则创建一个新的线程来执行任务。
  • 若运行的线程等于或多于 corePoolSize,将任务加入 BlockingQueue。
  • 若加入 BlockingQueue 成功,需要二次检查线程池的状态。
  • 若线程池没有处于Running,则从 BlockingQueue 移除任务,启动拒绝策略。
  • 若线程池处于Running状态,则检查工作线程(worker)是否为0。若为0,则创建新的线程来处理任务。
  • 若加入 BlockingQueue失败,则创建新的线程来处理任务。
  • 若启动线程数大于maximumPoolSize,任务将被拒绝策略拒绝。

ThreadPoolExecutor 提交线程池任务

ThreadPoolExecutor 的继承关系

从接口Executor来看,这个接口就是一个实现,就是执行execute方法,这个接口就是线程执行的入口。

  • ExecutorService接口继承了Executor接口,里面的的方法比较多,常见的shutdownNow,shutdown就是在这个接口里面的,还有就是常见往线程池里面提交任务的时候submit方法。
  • ExecutorService丰富了对任务执行和管理的功能。
  • AbstractExecutorService是一个抽象类,实现了ExecutorService接口。

线程池 execute() 的工作逻辑

外界通过 execute 这个方法来向线程池提交任务。

addWorker

addWork() 的两个参数,第一个是需要提交的线程 Runnable firstTask,第二个参数是 boolean 类型,表示是否为核心线程。

execute()

execute() 中有三处调用了 addWork()逐一分析:

  • 条件 if (workerCountOf(c) < corePoolSize) 这个很好理解,工作线程数少于核心线程数,提交任务,所以 addWorker(command, true)。
  • 如果 workerCountOf(recheck) == 0,如果worker的数量为0,那就 addWorker(null,false)。为什么这里是 null ?之前已经把 command 提交到阻塞队列了 workQueue.offer(command) 。所以提交一个空线程,直接从阻塞队列里面取就可以了。
  • 如果线程池没有 RUNNING 或者 offer 阻塞队列失败,addWorker(command,false),对应的就是,阻塞队列满了,将任务提交到,非核心线程池,与最大线程池比较。

测试案例

使用 execute 提交任务,线程池内抛出异常会导致线程退出,线程池只能重新创建一个线程。

如果每个异步任务都以异常结束,那么线程池可能完全起不到线程重用的作用。
而且主线程无法捕获(catch)到线程池内抛出的异常。因为没有手动捕获异常进行处理,ThreadGroup 帮我们进行了未捕获异常的默认处理,向标准错误输出,打印了出现异常的线程名称异常信息

显然,这种没有以统一的错误日志格式记录错误信息打印出来的形式,对生产级代码是不合适的。
如下,execute 提交任务,抛出异常后,从线程名称可以看出,老线程退出,创建了新的线程。

ThreadGroup 处理未捕获异常:直接输出到 System.err


解决方式:
以 execute 方法提交到线程池的异步任务,最好在任务内部做好异常处理;
设置自定义的异常处理程序作为保底,比如在声明线程池时自定义线程池的未捕获异常处理程序。或者设置全局的默认未捕获异常处理程序。

 // 自定义线程池的未捕获异常处理程序ThreadPoolExecutor executor = new ThreadPoolExecutor(8, 8,30, TimeUnit.MINUTES,new LinkedBlockingQueue<>(),new ThreadFactoryBuilder().setNameFormat("pool-%d").setUncaughtExceptionHandler((Thread t, Throwable e) -> {log.error("pool happen exception, thread is {}", t, e);}).build());// 设置全局的默认未捕获异常处理程序static {Thread.setDefaultUncaughtExceptionHandler((thread, throwable)-> {log.error("Thread {} got exception", thread, throwable)});}  

定义的异常处理程序将未捕获的异常信息打印到标准日志中了,老线程同样会退出。如果要避免这个问题,就需要使用 submit 方法提交任务。

/** 百度在线网络技术(北京)有限公司拥有本软件版权2021并保留所有权利。* Copyright 2021, Baidu.com,Inc 2:Baidu Online Network Technology (Beijing) Co.,Ltd,* All rights reserved.*/package com.azdebugit.threapool.executor;import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.google.common.util.concurrent.ThreadFactoryBuilder;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;public class JunitTest {private static Logger log = LoggerFactory.getLogger(JunitTest.class.getSimpleName());@Testpublic void poolTest() throws InterruptedException {// 自定义线程池的未捕获异常处理程序ThreadPoolExecutor executor = new ThreadPoolExecutor(8, 8,30, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new ThreadFactoryBuilder().setNameFormat("pool-%d").setUncaughtExceptionHandler((Thread t, Throwable e) -> {log.error("pool happen exception, thread is {}", t, e);}).build());try {IntStream.rangeClosed(1,6).forEach(i-> {executor.execute(() -> {if (i == 3) {log.info("throw execptions");throw new RuntimeException("error");}log.info("doing i={}", i);});});}catch (Exception e){log.info("catch execptions");e.printStackTrace();}executor.awaitTermination(1,TimeUnit.MINUTES);}// 设置全局的默认未捕获异常处理程序static {Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> {log.error("Thread {} got exception", thread, throwable);});}}

测试运行结果,如下:

定义的异常处理程序将未捕获的异常信息打印到标准日志中了,老线程同样会退出。如果要避免这个问题,就需要使用 submit 方法提交任务。

使用submit提交任务

使用submit,线程不会退出,但是异常不会记录,会被生吞掉。

查看FutureTask源码可以发现,在执行任务出现异常之后,异常存到了一个outcome字段中,只有在调用get方法获取FutureTask结果的时候,才会以ExecutionException的形式重新抛出异常。所以我们可以通过捕获get方法抛出的异常来判断线程的任务是否抛出了异常。

submit提交任务,可以通过Future获取返回结果,如果抛出异常,可以捕获ExecutionException得到异常栈信息。通过线程名称可以看出,老线程也没有退出。

 @Testpublic void poolTestSubmit() throws InterruptedException {// 自定义线程池的未捕获异常处理程序ThreadPoolExecutor executor = new ThreadPoolExecutor(8, 8,30, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new ThreadFactoryBuilder().setNameFormat("pool-%d").setUncaughtExceptionHandler((Thread t, Throwable e) -> {log.error("pool happen exception, thread is {}", t, e);}).build());/* try {IntStream.rangeClosed(1,6).forEach(i-> {executor.execute(() -> {if (i == 3) {log.info("throw execptions");throw new RuntimeException("error");}log.info("doing i={}", i);});});}catch (Exception e){log.info("catch execptions");e.printStackTrace();}*/List<Future<?>> result = IntStream.rangeClosed(1,6).mapToObj(i-> {return executor.submit(() -> {if (i == 3) {log.info("throw execptions");throw new RuntimeException("error");}log.info("doing i={}", i);});}).collect(Collectors.toList());for (Future<?> future:result){try {future.get();
//               log.info("get future ={}", future.get());} catch (ExecutionException e) {log.info("catch error");}}

需要注意的是,使用submit时,setUncaughtExceptionHandler设置的异常处理器不会生效。

submit 与 execute 的区别

  • execute提交的是Runnable类型的任务,而submit提交的是Callable或者Runnable类型的任务;
  • execute的提交没有返回值,而submit的提交会返回一个Future类型的对象;
  • execute提交的时候,如果有异常,就会直接抛出异常,而submit在遇到异常的时候,通常不会立马抛出异常,而是会将异常暂时存储起来,等待你调用Future.get()方法的时候,才会抛出异常;
  • execute 提交的任务抛出异常,老线程会退出,线程池会立即创建一个新的线程。
  • submit 提交的任务抛出异常,老线程不会退出;
  • 线程池设置的 UncaughtExceptionHandler 对 execute 提交的任务生效,对 submit 提交的任务不生效。

ForkJoinPool 提交线程池任务

Java8的parallelstream背后,是共享同一个ForkJoinPool,默认并行度是CPU核数-1。对于CPU绑定的任务来说,使用这样的配置比较合适,但如果集合操作,涉及同步IO操作的话(比如数据库操作、外部服务调用等),建议自定义一个ForkJoinPool(或普通线程池)。因此在使用Java8的并行流时,建议只用在计算密集型的任务,IO密集型的任务,建议自定义线程池来提交任务,避免影响其它业务。

计算机中一个任务,一般是由一个线程来处理的。

如果此时出现了一个非常耗时的大任务,比如对一个大的ArrayList,每个元素进行+1操作,如果是普通的ThreadPoolExecutor,就会出现线程池中,只有一个线程,正在处理这个大任务,而其他线程却空闲着,这会导致CPU负载不均衡,空闲的处理器无法帮助工作。

ForkJoinPool就是用来解决这种问题的:将一个大任务拆分成多个小任务后,使用fork可以将小任务,分发给其他线程同时处理,使用join,可以将多个线程处理的结果,进行汇总---->分治思想的并行版本。

ForkJoinPool 提交任务有三种方式:

  • invoke()会等待任务计算完毕并返回计算结果;
  • execute()是直接向池提交一个任务来异步执行,无返回结果;
  • submit()也是异步执行,但是会返回提交的任务,在适当的时候可通过task.get()获取执行结果。

submit提交线程池任务

Executors提供的一些预设的线程池,比如单线程线程池SingleThreadExecutor,固定大小的线程池FixedThreadPool等。
但是很少有人会注意到其中还提供了一种特殊的线程池:newWorkStealingPool,我们点进这个方法,会看到和其他方法不同的是,这种线程池并不是通过ThreadPoolExecutor来创建的,而是ForkJoinPool来创建的。

ThreadPoolExecutor与ForkJoinPool这两种线程池之间并不是继承关系,而是平级关系:

  • ThreadPoolExecutor是一个基本的存储线程的线程池,需要执行任务的时候就从线程池中拿一个线程来执行。
  • 而ForkJoinPool不是ThreadPoolExecutor的代替品,这种线程池是为了实现“分治法”这一思想而创建的,通过把大任务拆分成小任务,然后再把小任务的结果汇总起来就是最终的结果,和MapReduce的思想很类似

ForkJoinPool最核心的思想可以这样描述:

if(任务很小){直接计算得到结果
}else{分拆成N个子任务调用子任务的fork()进行计算调用子任务的join()合并计算结果
}

fork()

fork()方法类似于线程的Thread.start()方法,但是它不是真的启动一个线程,而是将任务放入到工作队列中。

join()

join()方法类似于线程的Thread.join()方法,但是它不是简单地阻塞线程,而是利用工作线程运行其它任务。

当一个工作线程中调用了join()方法,它将处理其它任务,直到注意到目标子任务已经完成了。

ForkJoinPool内部原理-工作窃取

work-stealing(工作窃取)算法

ForkJoinPool 的另一个特性是,它使用了work-stealing(工作窃取)算法:
线程池内的所有工作线程,都尝试找到并执行已经提交的任务,或者是被其他活动任务,创建的子任务(如果不存在就阻塞等待)。

这种特性使得 ForkJoinPool 在运行多个可以产生子任务的任务,或者是提交的许多小任务时效率更高。

尤其是构建异步模型的 ForkJoinPool 时,对不需要合并(join)的事件类型任务也非常适用。
在 ForkJoinPool 中,线程池中每个工作线程(ForkJoinWorkerThread)都对应一个任务队列(WorkQueue),工作线程优先处理,来自自身队列的任务(LIFO或FIFO顺序,参数 mode 决定),然后以FIFO的顺序随机窃取其他队列中的任务。

ForkJoinPool中的任务

ForkJoinPool 中的任务分为两种:

  • 一种是本地提交的任务(Submission task,如 execute、submit 提交的任务);
  • 另外一种是 fork 出的子任务(Worker task)。

两种任务都会存放在 WorkQueue 数组中,但是这两种任务并不会混合在同一个队列里,ForkJoinPool 内部使用了一种随机哈希算法(有点类似 ConcurrentHashMap 的桶随机算法),将工作队列与对应的工作线程关联起来,Submission 任务存放在 WorkQueue 数组的偶数索引位置,Worker 任务存放在奇数索引位。
实质上,Submission 与 Worker 一样,只不过它被限制只能执行它们提交的本地任务,在后面的源码解析中,我们统一称之为“Worker”。
任务的分布情况如下图:

ForkJoinPool以Deque为(双端队列)数据结构为基础,通过work-stealing算法,可以充分利用CPU多核进行复杂计算,避免线程进入阻塞或闲置状态,使线程进行饱和工作。

这对java的并发处理问题的能力是很大的提升。也避免了大范围使用锁带来的问题。

但也有缺点,每个worker线程维护一个任务队列,在任务窃取时需要volatile控制,join时可能造成的阻塞,都是一定的资源消耗,但相对于它的性能提升,都是可以接受的。

fork 出子任务的案例演示:

由于代码太多,请详见我的gitee

以下用gif进行演示其中的执行过程

execute提交线程池任务

public class ForkJoinPoolTest {
//    private static Logger log = LoggerFactory.getLogger(ForkJoinPoolTest.class.getSimpleName());private static final int threads = 3;CountDownLatch countDownLatch = new CountDownLatch(threads);@Testpublic void test1() throws InterruptedException {System.out.println("-------- begin ----------");ForkJoinPool forkJoinPool = new ForkJoinPool();
//        ExecutorService forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());for (int i = 0; i < threads; i++) {forkJoinPool.execute(new Runnable() {@Overridepublic void run() {try {System.out.println("getParallelism=" + forkJoinPool.getParallelism());System.out.println("getStealCount=" + forkJoinPool.getStealCount());System.out.println(Thread.currentThread().getName());} catch (Exception e) {e.printStackTrace();} finally {countDownLatch.countDown();}}});}countDownLatch.await();System.out.println("-------- end ----------");}
}

可以看出ForkJoinPool可以根据CPU的核数,并行的执行,适合使用在很耗时的操作,可以充分的利用CPU执行任务。

线程池创建正确姿势

线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。

线程过多,会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。

线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时,创建销毁线程开销的代价,另一方面避免了线程数量膨胀,导致的过分调度问题,保证了对内核的充分利用。

使用线程池的好处:

  • 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。

  • 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池,可以进行统一的分配、调优和监控。
  • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

使用线程池,要解决的问题是什么:

线程池解决的核心问题,就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性,将带来以下若干问题:

  • 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
  • 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
  • 系统无法合理管理内部的资源分布,会降低系统的稳定性。

确认线程池本身是不是复用的?

使用了线程池,就需要确保线程池是在复用的,每次new一个线程池出来,可能比不用线程池还糟糕。如果你没有直接声明线程池,而是使用其他人提供的类库,来获得一个线程池,请务必查看源码,以确认线程池的实例化方式和配置是符合预期的。

线程池配置

我们需要根据自己的场景、并发情况,来评估线程池的几个核心参数,包括核心线程数、最大线程数、线程回收策略、工作队列的类型,以及拒绝策略,确保线程池的工作行为符合需求,一般都需要设置有界的工作队列和可控的线程数。
要根据任务的“轻重缓急”来指定线程池的核心参数,包括线程数、回收策略和任务队列:

  • 对于执行比较慢、数量不大的IO任务,要考虑更多的线程数,而不需要太大的队列。
  • 对于吞吐量较大的计算型任务,线程数量不宜过多,可以是CPU核数或核数*2(理由是,线程若调度到某个CPU进行执行,如果任务本身是CPU绑定的任务,那么过多的线程,只会增加线程切换的开销,并不能提升吞吐量),但可能需要较长的队列来做缓冲。
  • 任何时候,都应该为自定义线程池指定有意义的名称,以方便排查问题。当出现线程数量暴增、线程死锁、线程占用大量CPU、线程执行出现异常等问题时,我们往往会抓取线程栈。此时,有意义的线程名称,就可以方便我们定位问题。

除了建议手动声明线程池以外,还建议用一些监控手段,来观察线程池的状态。如果我们能提前观察到线程池队列的积压,或者线程数量的快速膨胀,往往可以提早发现并解决问题。

参考引用

  • https://www.cnblogs.com/chiangchou/p/thread-pool.html
  • https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
  • https://cloud.tencent.com/developer/article/1732607
  • https://www.pdai.tech/md/java/thread/java-thread-x-juc-executor-ForkJoinPool.html

通过ThreadPoolExecutor与ForkJoinPool比较,分别对比其execute ,submit 等方法提交线程池任务的区别,来深入理解线程池及并发编程相关推荐

  1. [Java并发编程(二)] 线程池 FixedThreadPool、CachedThreadPool、ForkJoinPool?为后台任务选择合适的 Java executors...

    [Java并发编程(二)] 线程池 FixedThreadPool.CachedThreadPool.ForkJoinPool?为后台任务选择合适的 Java executors ... 摘要 Jav ...

  2. Java并发编程之线程池ThreadPoolExecutor解析

    线程池存在的意义 平常使用线程即new Thread()然后调用start()方法去启动这个线程,但是在频繁的业务情况下如果在生产环境大量的创建Thread对象是则会浪费资源,不仅增加GC回收压力,并 ...

  3. struts1-2,springMVC原理基本对比(单例,多例)-servlet与filter区别

    2019独角兽企业重金招聘Python工程师标准>>> 最近做项目用到了struts2,之前一直是用struts1和springMVC.感觉到了struts2从很大程度上和这两个还是 ...

  4. 【Java 并发编程】线程池机制 ( 线程池阻塞队列 | 线程池拒绝策略 | 使用 ThreadPoolExecutor 自定义线程池参数 )

    文章目录 一.线程池阻塞队列 二.拒绝策略 三.使用 ThreadPoolExecutor 自定义线程池参数 一.线程池阻塞队列 线程池阻塞队列是线程池创建的第 555 个参数 : BlockingQ ...

  5. 【Java 并发编程】线程池机制 ( ThreadPoolExecutor 线程池构造参数分析 | 核心线程数 | 最大线程数 | 非核心线程存活时间 | 任务阻塞队列 )

    文章目录 前言 一.ThreadPoolExecutor 构造参数 二.newCachedThreadPool 参数分析 三.newFixedThreadPool 参数分析 四.newSingleTh ...

  6. Java并发编程实战(chapter_3)(线程池ThreadPoolExecutor源码分析)

    为什么80%的码农都做不了架构师?>>>    这个系列一直没再写,很多原因,中间经历了换工作,熟悉项目,熟悉新团队等等一系列的事情.并发课题对于Java来说是一个又重要又难的一大块 ...

  7. python和java对比并发_Python并发编程之从性能角度来初探并发编程(一)

    本文目录并发编程的基本概念 单线程VS多线程VS多进程 性能对比成果总结 前言 作为进阶系列的一个分支「并发编程」,我觉得这是每个程序员都应该会的. 并发编程 这个系列,我准备了将近一个星期,从知识点 ...

  8. [并发编程] - Executor框架#ThreadPoolExecutor源码解读03

    文章目录 Pre execute源码分析 addWorker()解读 Worker解读 Pre [并发编程] - Executor框架#ThreadPoolExecutor源码解读02 说了一堆结论性 ...

  9. [并发编程] - Executor框架#ThreadPoolExecutor源码解读02

    文章目录 Pre 线程池的具体实现 线程池的创建 参数解读 corePoolSize maximumPoolSize keepAliveTime unit workQueue threadFactor ...

最新文章

  1. 【力扣网练习题】最长公共前缀
  2. 【推荐系统】深入理解YouTube推荐系统算法
  3. 2014\Province_C_C++_B\3 李白打酒
  4. vim 下的 ex 指令(底行命令模式下)
  5. 导入表格只有一行 帆软_万万没想到!把x个表格合合合合成一份,10分钟就搞定...
  6. SmartImage图片第三方控件android
  7. 【BZOJ 1031】[JSOI2007]字符加密Cipher(后缀数组模板)
  8. 登录账号用户名判断_如何设计 QQ、微信等第三方账号登陆 ?
  9. 142.PHP session 阻塞问题
  10. mysql数据库关联查询慢_mysql数据库多表关联查询的慢SQL优化
  11. spark MLlib机器学习教程
  12. Chrome OS Factory开发测试流程
  13. Tilera 服务器上hadoop单机版测试
  14. 我有一个梦,叫“禾下乘凉梦“!
  15. 《华盛人》技术服务支持
  16. 服务器kvm切换器怎么使用?
  17. 迷宫算法总结(最短路径)BFS宽度优先
  18. html5支持4k视频,【4K电影大礼包】目前压缩最好的五部4KHEVC(H.265)格式电影
  19. 用户体验设计学习总结(下)
  20. 一块硬盘装了黑苹果 一块硬盘装了win7_英特尔NUC8黑苹果教程(详细)

热门文章

  1. Awesome Flutter筛选与实用度分析
  2. 消炎抗生素滴眼液行业调研报告 - 市场现状分析与发展前景预测(2021-2027年)
  3. Impala shell命令
  4. InSAR基础:距离向分辨率和方位向分辨率
  5. 时间、延时、延缓操作
  6. 亚美尼亚副总理代表团造访清微智能
  7. rdkit中 logP, mr, TPSA, Labute ASA 讲解
  8. 中控考勤系统设置说明
  9. centos7关闭防火墙命令(centos7关闭防火墙命令)
  10. 【常用html、bat、QQ、py代码】