通过ThreadPoolExecutor与ForkJoinPool比较,分别对比其execute ,submit 等方法提交线程池任务的区别,来深入理解线程池及并发编程
前言
以前使用线程池,对execute 、 submit 等方法提交线程池任务的区别比较模糊,现在通过ThreadPoolExecutor与ForkJoinPool比较,分别对比其execute ,submit 等方法提交线程池任务的区别,来深入理解线程池及并发编程。
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(6, 10, 10L, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(20),Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());
- 若当前运行的线程,少于corePoolSize,则创建一个新的线程来执行任务。
- 若运行的线程等于或多于 corePoolSize,将任务加入 BlockingQueue。
- 若加入 BlockingQueue 成功,需要二次检查线程池的状态。
- 若线程池没有处于Running,则从 BlockingQueue 移除任务,启动拒绝策略。
- 若线程池处于Running状态,则检查工作线程(worker)是否为0。若为0,则创建新的线程来处理任务。
- 若加入 BlockingQueue失败,则创建新的线程来处理任务。
- 若启动线程数大于maximumPoolSize,任务将被拒绝策略拒绝。
ThreadPoolExecutor 提交线程池任务
ThreadPoolExecutor 的继承关系
从接口Executor来看,这个接口就是一个实现,就是执行execute方法,这个接口就是线程执行的入口。
- ExecutorService接口继承了Executor接口,里面的的方法比较多,常见的shutdownNow,shutdown就是在这个接口里面的,还有就是常见往线程池里面提交任务的时候submit方法。
- ExecutorService丰富了对任务执行和管理的功能。
- AbstractExecutorService是一个抽象类,实现了ExecutorService接口。
线程池 execute() 的工作逻辑
外界通过 execute 这个方法来向线程池提交任务。
addWorker
addWork() 的两个参数,第一个是需要提交的线程 Runnable firstTask,第二个参数是 boolean 类型,表示是否为核心线程。
execute()
execute() 中有三处调用了 addWork()逐一分析:
- 条件 if (workerCountOf(c) < corePoolSize) 这个很好理解,工作线程数少于核心线程数,提交任务,所以 addWorker(command, true)。
- 如果 workerCountOf(recheck) == 0,如果worker的数量为0,那就 addWorker(null,false)。为什么这里是 null ?之前已经把 command 提交到阻塞队列了 workQueue.offer(command) 。所以提交一个空线程,直接从阻塞队列里面取就可以了。
- 如果线程池没有 RUNNING 或者 offer 阻塞队列失败,addWorker(command,false),对应的就是,阻塞队列满了,将任务提交到,非核心线程池,与最大线程池比较。
测试案例
使用 execute 提交任务,线程池内抛出异常会导致线程退出,线程池只能重新创建一个线程。
如果每个异步任务都以异常结束,那么线程池可能完全起不到线程重用的作用。
而且主线程无法捕获(catch)到线程池内抛出的异常。因为没有手动捕获异常进行处理,ThreadGroup 帮我们进行了未捕获异常的默认处理,向标准错误输出,打印了出现异常的线程名称和异常信息。
显然,这种没有以统一的错误日志格式记录错误信息打印出来的形式,对生产级代码是不合适的。
如下,execute 提交任务,抛出异常后,从线程名称可以看出,老线程退出,创建了新的线程。
ThreadGroup 处理未捕获异常:直接输出到 System.err
解决方式:
以 execute 方法提交到线程池的异步任务,最好在任务内部做好异常处理;
设置自定义的异常处理程序作为保底,比如在声明线程池时自定义线程池的未捕获异常处理程序。或者设置全局的默认未捕获异常处理程序。
// 自定义线程池的未捕获异常处理程序ThreadPoolExecutor executor = new ThreadPoolExecutor(8, 8,30, TimeUnit.MINUTES,new LinkedBlockingQueue<>(),new ThreadFactoryBuilder().setNameFormat("pool-%d").setUncaughtExceptionHandler((Thread t, Throwable e) -> {log.error("pool happen exception, thread is {}", t, e);}).build());// 设置全局的默认未捕获异常处理程序static {Thread.setDefaultUncaughtExceptionHandler((thread, throwable)-> {log.error("Thread {} got exception", thread, throwable)});}
定义的异常处理程序将未捕获的异常信息打印到标准日志中了,老线程同样会退出。如果要避免这个问题,就需要使用 submit 方法提交任务。
/** 百度在线网络技术(北京)有限公司拥有本软件版权2021并保留所有权利。* Copyright 2021, Baidu.com,Inc 2:Baidu Online Network Technology (Beijing) Co.,Ltd,* All rights reserved.*/package com.azdebugit.threapool.executor;import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.google.common.util.concurrent.ThreadFactoryBuilder;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;public class JunitTest {private static Logger log = LoggerFactory.getLogger(JunitTest.class.getSimpleName());@Testpublic void poolTest() throws InterruptedException {// 自定义线程池的未捕获异常处理程序ThreadPoolExecutor executor = new ThreadPoolExecutor(8, 8,30, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new ThreadFactoryBuilder().setNameFormat("pool-%d").setUncaughtExceptionHandler((Thread t, Throwable e) -> {log.error("pool happen exception, thread is {}", t, e);}).build());try {IntStream.rangeClosed(1,6).forEach(i-> {executor.execute(() -> {if (i == 3) {log.info("throw execptions");throw new RuntimeException("error");}log.info("doing i={}", i);});});}catch (Exception e){log.info("catch execptions");e.printStackTrace();}executor.awaitTermination(1,TimeUnit.MINUTES);}// 设置全局的默认未捕获异常处理程序static {Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> {log.error("Thread {} got exception", thread, throwable);});}}
测试运行结果,如下:
定义的异常处理程序将未捕获的异常信息打印到标准日志中了,老线程同样会退出。如果要避免这个问题,就需要使用 submit 方法提交任务。
使用submit提交任务
使用submit,线程不会退出,但是异常不会记录,会被生吞掉。
查看FutureTask源码可以发现,在执行任务出现异常之后,异常存到了一个outcome字段中,只有在调用get方法获取FutureTask结果的时候,才会以ExecutionException的形式重新抛出异常。所以我们可以通过捕获get方法抛出的异常来判断线程的任务是否抛出了异常。
submit提交任务,可以通过Future获取返回结果,如果抛出异常,可以捕获ExecutionException得到异常栈信息。通过线程名称可以看出,老线程也没有退出。
@Testpublic void poolTestSubmit() throws InterruptedException {// 自定义线程池的未捕获异常处理程序ThreadPoolExecutor executor = new ThreadPoolExecutor(8, 8,30, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new ThreadFactoryBuilder().setNameFormat("pool-%d").setUncaughtExceptionHandler((Thread t, Throwable e) -> {log.error("pool happen exception, thread is {}", t, e);}).build());/* try {IntStream.rangeClosed(1,6).forEach(i-> {executor.execute(() -> {if (i == 3) {log.info("throw execptions");throw new RuntimeException("error");}log.info("doing i={}", i);});});}catch (Exception e){log.info("catch execptions");e.printStackTrace();}*/List<Future<?>> result = IntStream.rangeClosed(1,6).mapToObj(i-> {return executor.submit(() -> {if (i == 3) {log.info("throw execptions");throw new RuntimeException("error");}log.info("doing i={}", i);});}).collect(Collectors.toList());for (Future<?> future:result){try {future.get();
// log.info("get future ={}", future.get());} catch (ExecutionException e) {log.info("catch error");}}
需要注意的是,使用submit时,setUncaughtExceptionHandler设置的异常处理器不会生效。
submit 与 execute 的区别
- execute提交的是Runnable类型的任务,而submit提交的是Callable或者Runnable类型的任务;
- execute的提交没有返回值,而submit的提交会返回一个Future类型的对象;
- execute提交的时候,如果有异常,就会直接抛出异常,而submit在遇到异常的时候,通常不会立马抛出异常,而是会将异常暂时存储起来,等待你调用Future.get()方法的时候,才会抛出异常;
- execute 提交的任务抛出异常,老线程会退出,线程池会立即创建一个新的线程。
- submit 提交的任务抛出异常,老线程不会退出;
- 线程池设置的 UncaughtExceptionHandler 对 execute 提交的任务生效,对 submit 提交的任务不生效。
ForkJoinPool 提交线程池任务
Java8的parallelstream背后,是共享同一个ForkJoinPool,默认并行度是CPU核数-1。对于CPU绑定的任务来说,使用这样的配置比较合适,但如果集合操作,涉及同步IO操作的话(比如数据库操作、外部服务调用等),建议自定义一个ForkJoinPool(或普通线程池)。因此在使用Java8的并行流时,建议只用在计算密集型的任务,IO密集型的任务,建议自定义线程池来提交任务,避免影响其它业务。
计算机中一个任务,一般是由一个线程来处理的。
如果此时出现了一个非常耗时的大任务,比如对一个大的ArrayList,每个元素进行+1操作,如果是普通的ThreadPoolExecutor,就会出现线程池中,只有一个线程,正在处理这个大任务,而其他线程却空闲着,这会导致CPU负载不均衡,空闲的处理器无法帮助工作。
ForkJoinPool就是用来解决这种问题的:将一个大任务拆分成多个小任务后,使用fork可以将小任务,分发给其他线程同时处理,使用join,可以将多个线程处理的结果,进行汇总---->分治思想的并行版本。
ForkJoinPool 提交任务有三种方式:
- invoke()会等待任务计算完毕并返回计算结果;
- execute()是直接向池提交一个任务来异步执行,无返回结果;
- submit()也是异步执行,但是会返回提交的任务,在适当的时候可通过task.get()获取执行结果。
submit提交线程池任务
Executors提供的一些预设的线程池,比如单线程线程池SingleThreadExecutor,固定大小的线程池FixedThreadPool等。
但是很少有人会注意到其中还提供了一种特殊的线程池:newWorkStealingPool,我们点进这个方法,会看到和其他方法不同的是,这种线程池并不是通过ThreadPoolExecutor来创建的,而是ForkJoinPool来创建的。
ThreadPoolExecutor与ForkJoinPool这两种线程池之间并不是继承关系,而是平级关系:
- ThreadPoolExecutor是一个基本的存储线程的线程池,需要执行任务的时候就从线程池中拿一个线程来执行。
- 而ForkJoinPool不是ThreadPoolExecutor的代替品,这种线程池是为了实现“分治法”这一思想而创建的,通过把大任务拆分成小任务,然后再把小任务的结果汇总起来就是最终的结果,和MapReduce的思想很类似
ForkJoinPool最核心的思想可以这样描述:
if(任务很小){直接计算得到结果
}else{分拆成N个子任务调用子任务的fork()进行计算调用子任务的join()合并计算结果
}
fork()
fork()方法类似于线程的Thread.start()方法,但是它不是真的启动一个线程,而是将任务放入到工作队列中。
join()
join()方法类似于线程的Thread.join()方法,但是它不是简单地阻塞线程,而是利用工作线程运行其它任务。
当一个工作线程中调用了join()方法,它将处理其它任务,直到注意到目标子任务已经完成了。
ForkJoinPool内部原理-工作窃取
work-stealing(工作窃取)算法
ForkJoinPool 的另一个特性是,它使用了work-stealing(工作窃取)算法:
线程池内的所有工作线程,都尝试找到并执行已经提交的任务,或者是被其他活动任务,创建的子任务(如果不存在就阻塞等待)。
这种特性使得 ForkJoinPool 在运行多个可以产生子任务的任务,或者是提交的许多小任务时效率更高。
尤其是构建异步模型的 ForkJoinPool 时,对不需要合并(join)的事件类型任务也非常适用。
在 ForkJoinPool 中,线程池中每个工作线程(ForkJoinWorkerThread)都对应一个任务队列(WorkQueue),工作线程优先处理,来自自身队列的任务(LIFO或FIFO顺序,参数 mode 决定),然后以FIFO的顺序随机窃取其他队列中的任务。
ForkJoinPool中的任务
ForkJoinPool 中的任务分为两种:
- 一种是本地提交的任务(Submission task,如 execute、submit 提交的任务);
- 另外一种是 fork 出的子任务(Worker task)。
两种任务都会存放在 WorkQueue 数组中,但是这两种任务并不会混合在同一个队列里,ForkJoinPool 内部使用了一种随机哈希算法(有点类似 ConcurrentHashMap 的桶随机算法),将工作队列与对应的工作线程关联起来,Submission 任务存放在 WorkQueue 数组的偶数索引位置,Worker 任务存放在奇数索引位。
实质上,Submission 与 Worker 一样,只不过它被限制只能执行它们提交的本地任务,在后面的源码解析中,我们统一称之为“Worker”。
任务的分布情况如下图:
ForkJoinPool以Deque为(双端队列)数据结构为基础,通过work-stealing算法,可以充分利用CPU多核进行复杂计算,避免线程进入阻塞或闲置状态,使线程进行饱和工作。
这对java的并发处理问题的能力是很大的提升。也避免了大范围使用锁带来的问题。
但也有缺点,每个worker线程维护一个任务队列,在任务窃取时需要volatile控制,join时可能造成的阻塞,都是一定的资源消耗,但相对于它的性能提升,都是可以接受的。
fork 出子任务的案例演示:
由于代码太多,请详见我的gitee
以下用gif进行演示其中的执行过程
execute提交线程池任务
public class ForkJoinPoolTest {
// private static Logger log = LoggerFactory.getLogger(ForkJoinPoolTest.class.getSimpleName());private static final int threads = 3;CountDownLatch countDownLatch = new CountDownLatch(threads);@Testpublic void test1() throws InterruptedException {System.out.println("-------- begin ----------");ForkJoinPool forkJoinPool = new ForkJoinPool();
// ExecutorService forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());for (int i = 0; i < threads; i++) {forkJoinPool.execute(new Runnable() {@Overridepublic void run() {try {System.out.println("getParallelism=" + forkJoinPool.getParallelism());System.out.println("getStealCount=" + forkJoinPool.getStealCount());System.out.println(Thread.currentThread().getName());} catch (Exception e) {e.printStackTrace();} finally {countDownLatch.countDown();}}});}countDownLatch.await();System.out.println("-------- end ----------");}
}
可以看出ForkJoinPool可以根据CPU的核数,并行的执行,适合使用在很耗时的操作,可以充分的利用CPU执行任务。
线程池创建正确姿势
线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。
线程过多,会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。
线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时,创建销毁线程开销的代价,另一方面避免了线程数量膨胀,导致的过分调度问题,保证了对内核的充分利用。
使用线程池的好处:
降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池,可以进行统一的分配、调优和监控。
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
使用线程池,要解决的问题是什么:
线程池解决的核心问题,就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性,将带来以下若干问题:
- 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
- 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
- 系统无法合理管理内部的资源分布,会降低系统的稳定性。
确认线程池本身是不是复用的?
使用了线程池,就需要确保线程池是在复用的,每次new一个线程池出来,可能比不用线程池还糟糕。如果你没有直接声明线程池,而是使用其他人提供的类库,来获得一个线程池,请务必查看源码,以确认线程池的实例化方式和配置是符合预期的。
线程池配置
我们需要根据自己的场景、并发情况,来评估线程池的几个核心参数,包括核心线程数、最大线程数、线程回收策略、工作队列的类型,以及拒绝策略,确保线程池的工作行为符合需求,一般都需要设置有界的工作队列和可控的线程数。
要根据任务的“轻重缓急”来指定线程池的核心参数,包括线程数、回收策略和任务队列:
- 对于执行比较慢、数量不大的IO任务,要考虑更多的线程数,而不需要太大的队列。
- 对于吞吐量较大的计算型任务,线程数量不宜过多,可以是CPU核数或核数*2(理由是,线程若调度到某个CPU进行执行,如果任务本身是CPU绑定的任务,那么过多的线程,只会增加线程切换的开销,并不能提升吞吐量),但可能需要较长的队列来做缓冲。
- 任何时候,都应该为自定义线程池指定有意义的名称,以方便排查问题。当出现线程数量暴增、线程死锁、线程占用大量CPU、线程执行出现异常等问题时,我们往往会抓取线程栈。此时,有意义的线程名称,就可以方便我们定位问题。
除了建议手动声明线程池以外,还建议用一些监控手段,来观察线程池的状态。如果我们能提前观察到线程池队列的积压,或者线程数量的快速膨胀,往往可以提早发现并解决问题。
参考引用
- https://www.cnblogs.com/chiangchou/p/thread-pool.html
- https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
- https://cloud.tencent.com/developer/article/1732607
- https://www.pdai.tech/md/java/thread/java-thread-x-juc-executor-ForkJoinPool.html
通过ThreadPoolExecutor与ForkJoinPool比较,分别对比其execute ,submit 等方法提交线程池任务的区别,来深入理解线程池及并发编程相关推荐
- [Java并发编程(二)] 线程池 FixedThreadPool、CachedThreadPool、ForkJoinPool?为后台任务选择合适的 Java executors...
[Java并发编程(二)] 线程池 FixedThreadPool.CachedThreadPool.ForkJoinPool?为后台任务选择合适的 Java executors ... 摘要 Jav ...
- Java并发编程之线程池ThreadPoolExecutor解析
线程池存在的意义 平常使用线程即new Thread()然后调用start()方法去启动这个线程,但是在频繁的业务情况下如果在生产环境大量的创建Thread对象是则会浪费资源,不仅增加GC回收压力,并 ...
- struts1-2,springMVC原理基本对比(单例,多例)-servlet与filter区别
2019独角兽企业重金招聘Python工程师标准>>> 最近做项目用到了struts2,之前一直是用struts1和springMVC.感觉到了struts2从很大程度上和这两个还是 ...
- 【Java 并发编程】线程池机制 ( 线程池阻塞队列 | 线程池拒绝策略 | 使用 ThreadPoolExecutor 自定义线程池参数 )
文章目录 一.线程池阻塞队列 二.拒绝策略 三.使用 ThreadPoolExecutor 自定义线程池参数 一.线程池阻塞队列 线程池阻塞队列是线程池创建的第 555 个参数 : BlockingQ ...
- 【Java 并发编程】线程池机制 ( ThreadPoolExecutor 线程池构造参数分析 | 核心线程数 | 最大线程数 | 非核心线程存活时间 | 任务阻塞队列 )
文章目录 前言 一.ThreadPoolExecutor 构造参数 二.newCachedThreadPool 参数分析 三.newFixedThreadPool 参数分析 四.newSingleTh ...
- Java并发编程实战(chapter_3)(线程池ThreadPoolExecutor源码分析)
为什么80%的码农都做不了架构师?>>> 这个系列一直没再写,很多原因,中间经历了换工作,熟悉项目,熟悉新团队等等一系列的事情.并发课题对于Java来说是一个又重要又难的一大块 ...
- python和java对比并发_Python并发编程之从性能角度来初探并发编程(一)
本文目录并发编程的基本概念 单线程VS多线程VS多进程 性能对比成果总结 前言 作为进阶系列的一个分支「并发编程」,我觉得这是每个程序员都应该会的. 并发编程 这个系列,我准备了将近一个星期,从知识点 ...
- [并发编程] - Executor框架#ThreadPoolExecutor源码解读03
文章目录 Pre execute源码分析 addWorker()解读 Worker解读 Pre [并发编程] - Executor框架#ThreadPoolExecutor源码解读02 说了一堆结论性 ...
- [并发编程] - Executor框架#ThreadPoolExecutor源码解读02
文章目录 Pre 线程池的具体实现 线程池的创建 参数解读 corePoolSize maximumPoolSize keepAliveTime unit workQueue threadFactor ...
最新文章
- 【力扣网练习题】最长公共前缀
- 【推荐系统】深入理解YouTube推荐系统算法
- 2014\Province_C_C++_B\3 李白打酒
- vim 下的 ex 指令(底行命令模式下)
- 导入表格只有一行 帆软_万万没想到!把x个表格合合合合成一份,10分钟就搞定...
- SmartImage图片第三方控件android
- 【BZOJ 1031】[JSOI2007]字符加密Cipher(后缀数组模板)
- 登录账号用户名判断_如何设计 QQ、微信等第三方账号登陆 ?
- 142.PHP session 阻塞问题
- mysql数据库关联查询慢_mysql数据库多表关联查询的慢SQL优化
- spark MLlib机器学习教程
- Chrome OS Factory开发测试流程
- Tilera 服务器上hadoop单机版测试
- 我有一个梦,叫“禾下乘凉梦“!
- 《华盛人》技术服务支持
- 服务器kvm切换器怎么使用?
- 迷宫算法总结(最短路径)BFS宽度优先
- html5支持4k视频,【4K电影大礼包】目前压缩最好的五部4KHEVC(H.265)格式电影
- 用户体验设计学习总结(下)
- 一块硬盘装了黑苹果 一块硬盘装了win7_英特尔NUC8黑苹果教程(详细)