并发编程学习之ForkJoinPool分支合并
目录
一、简介
二、常用API
三、使用示例
四、ForkJoinPool原理
五、总结
一、简介
JDK7引入了一种新的并发框架 - Fork/Join Framework分支/合并框架,同时引入了一种新的线程池ForkJoinPool。
@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService {}
ForkJoinPool 是对ExecutorService的补充,在某些应用场景下性能比 ExecutorService 更好,ForkJoinPool特别适合“分而治之”的算法,如递归操作等等。
JDK官网对ForkJoinPool的解释如下:
- 运行ForkJoinTask的ExecutorService。ForkJoinPool为非ForkJoinTask客户的提交以及管理和监控操作提供了入口点。
- ForkJoinPool与其他类型的ExecutorService的主要区别在于使用了工作窃取:池中的所有线程都试图查找和执行提交给池的任务和/或由其他活动任务创建的任务(如果不存在工作,则最终阻塞等待工作)。当大多数任务生成其他子任务(就像大多数forkjointask一样),以及许多小任务从外部客户端提交到池时,这就支持了高效的处理。
- 对于大多数应用程序,都可以使用静态commonPool()。公共池由任何没有显式提交到指定池的ForkJoinTask使用。使用公共池通常会减少资源的使用(它的线程在不使用期间缓慢地回收,并在随后使用时恢复)。
二、常用API
【a】构造方法:ForkJoinPool类一共有三个构造方法,如下表所示:
ForkJoinPool() 创建一个并行度等于Runtime.availableProcessors()的ForkJoinPool,使用默认的线程工厂,没有UncaughtExceptionHandler和非异步后进先出处理模式 |
ForkJoinPool(int parallelism) 使用指定的并行级别、默认的线程工厂、没有UncaughtExceptionHandler和非异步LIFO处理模式创建一个ForkJoinPool |
ForkJoinPool(int parallelism, ForkJoinPool.ForkJoinWorkerThreadFactory factory, Thread.UncaughtExceptionHandler handler, boolean asyncMode) 使用给定的参数创建一个ForkJoinPool |
【b】常用方法
Modifier and Type |
Method and Description |
boolean |
awaitTermination(long timeout, TimeUnit unit) 阻塞,直到所有任务在关闭请求后完成执行,或超时发生,或当前线程被中断(以先发生的情况为准) |
static ForkJoinPool |
commonPool() 返回公共池实例 |
protected int |
drainTasksTo(Collection<? super ForkJoinTask<?>> c) 从调度队列中删除所有未执行的提交和分叉任务,并将它们添加到给定的集合中,而不更改它们的执行状态 |
void |
execute(ForkJoinTask<?> task) 安排(异步)执行给定的任务 |
void |
execute(Runnable task) 在将来的某个时候执行给定的命令 |
int |
getActiveThreadCount() 返回当前正在窃取或执行任务的线程数量的估计值 |
boolean |
getAsyncMode() 如果此池对从未连接的分叉任务使用本地先进先出调度模式,则返回true |
static int |
getCommonPoolParallelism() 返回公共池的目标并行度级别 |
ForkJoinPool.ForkJoinWorkerThreadFactory |
getFactory() 归还用于建造worker的工厂 |
int |
getParallelism() 返回此池的目标并行度级别 |
int |
getPoolSize() 返回已启动但尚未终止的工作线程的数量 |
int |
getQueuedSubmissionCount() 返回提交到此池的尚未开始执行的任务数量的估计值 |
long |
getQueuedTaskCount() 返回工作线程当前在队列中持有的任务总数的估计值(但不包括提交到池中尚未开始执行的任务) |
int |
getRunningThreadCount() 返回未阻塞的工作线程数量的估计值,这些工作线程正在等待连接任务或其他托管同步 |
long |
getStealCount() 返回另一个线程从一个线程的工作队列中窃取的任务总数的估计值 |
Thread.UncaughtExceptionHandler |
getUncaughtExceptionHandler() 返回因执行任务时遇到不可恢复的错误而终止的内部工作线程的处理程序 |
boolean |
hasQueuedSubmissions() 如果提交到此池的任何任务尚未开始执行,则返回true |
<T> T |
invoke(ForkJoinTask<T> task) 执行给定的任务,完成后返回其结果 |
<T> List<Future<T>> |
invokeAll(Collection<? extends Callable<T>> tasks) 执行给定的任务,返回一个结果列表,其中包含所有任务完成时的状态和结果 |
boolean |
isQuiescent() 如果所有工作线程当前都处于空闲状态,则返回true |
boolean |
isShutdown() 如果此池已关闭,则返回true |
boolean |
isTerminated() 如果所有任务在关闭后都已完成,则返回true |
boolean |
isTerminating() 如果终止过程已经开始但尚未完成,则返回true |
static void |
managedBlock(ForkJoinPool.ManagedBlocker blocker) 运行给定的可能阻塞的任务 |
protected ForkJoinTask<?> |
pollSubmission() 删除并返回下一个未执行的提交(如果有提交的话) |
void |
shutdown() 可能会启动有序关闭,在此过程中执行以前提交的任务,但不接受任何新任务 |
List<Runnable> |
shutdownNow() 可能尝试取消和/或停止所有任务,并拒绝随后提交的所有任务 |
<T> ForkJoinTask<T> |
submit(Callable<T> task) 提交一个返回值的任务以供执行,并返回一个表示该任务的未决结果的Future |
<T> ForkJoinTask<T> |
submit(ForkJoinTask<T> task) 提交一个ForkJoinTask执行 |
ForkJoinTask<?> |
submit(Runnable task) 提交一个可运行任务以供执行,并返回一个表示该任务的Future |
<T> ForkJoinTask<T> |
submit(Runnable task, T result) 提交一个可运行任务以供执行,并返回一个表示该任务的Future |
String |
toString() 返回一个字符串,该字符串标识这个池及其状态,包括运行状态、并行度级别以及工作者和任务计数的指示 |
三、使用示例
下面通过几个示例说明ForkJoinPool的使用以及与其他方式的性能做一个比较。
案例:计算1至10000000的正整数之和。
下面我们先使用for循环来实现,并记录其运算时间。
【a】使用for循环实现
public class T14_ForkJoinPool {public static void main(String[] args) {long sum = 0;long startTime = System.currentTimeMillis();for (long i = 1; i <= 10000000; i++) {sum += i;}long endTime = System.currentTimeMillis();System.out.println("sum = " + sum + " --->大约耗时:" + (endTime - startTime)+ "毫秒");}}
运行结果如下所示,可以看到,使用for循环计算大概花费了27毫秒。
sum = 50000005000000 --->大约耗时:27毫秒
【b】使用多线程实现
public class T14_ForkJoinPool {public static void main(String[] args) {//核心线程数 以处理器数量作为线程池大小int corePoolSize = Runtime.getRuntime().availableProcessors();//创建ExecutorServiceExecutorService executorService = Executors.newFixedThreadPool(corePoolSize);//存放所有的任务集合List<Future<Long>> results = new ArrayList<>();//每个线程需要处理的数值范围大小int size = 10000000 / corePoolSize;for (int i = 0; i < corePoolSize; i++) {//第一个任务:[0,2499999]//第二个任务:[2500000,4999999]//第三个任务:[5000000,7499999]//第四个任务:[7500000,1000000]//开始位置int start = i * size;//结束位置 : 最后一次(i == (corePoolSize - 1)),直接到1000000.int end = (i == (corePoolSize - 1)) ? 10000000 : ((i + 1) * size - 1);results.add(executorService.submit(new CalThread(start, end)));}long startTime = System.currentTimeMillis();long sum = 0L;for (Future<Long> f : results) {try {//阻塞程序获取执行结果sum += f.get();} catch (Exception ignore) {}}long endTime = System.currentTimeMillis();System.out.println("sum = " + sum + " --->大约耗时:" + (endTime - startTime) + "毫秒");}
}/*** 因为需要获取线程执行结果,所以此处需要使用Callable接口实现多线程*/
class CalThread implements Callable<Long> {/*** 起始位置*/private int start;/*** 结束位置*/private int end;public CalThread(int start, int end) {this.start = start;this.end = end;}@Overridepublic Long call() throws Exception {long total = 0;for (int i = start; i <= end; i++) {total += i;}return total;}
}
运行结果:
sum = 50000005000000 --->大约耗时:30毫秒
由运行结果看,好像多线程运行起来比for循环还稍微慢了点,花费了大概30毫秒,跟笔者电脑也有一些关系。
【c】使用ForkJoinPool实现
下面我们来使用ForkJoinPool实现求和操作,ForkJoinPool中主要依靠RecursiveTask和RecursiveAction来实现。
首先需要创建ForkJoinPool实例,然后就可以调用ForkJoinPool的submit(ForkJoinTask task) 或invoke(ForkJoinTask task)方法来执行指定ForkJoinTask任务,ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它还有两个抽象子类:RecusiveAction和RecusiveTask。
UML图如下:
两者的区别:
- RecursiveTask:有返回值,各个子任务结束后,返回结果,大任务可将小任务返回结果进行整合;
- RecursiveAction:无返回值,只是执行任务;
由于我们需要有返回值,所以我们这里使用RecursiveTask,相关代码如下:
public class T07_ForkJoinPool {public static void main(String[] args) {long size = 10000L;ForkJoinPool forkJoinPool = new ForkJoinPool();long startTime = System.currentTimeMillis();ForkJoinTask<Long> forkJoinTask = new ForkJoinSumCalculate(0L, 10000000L, size);Long sum = forkJoinPool.invoke(forkJoinTask);long endTime = System.currentTimeMillis();System.out.println("sum = " + sum + " --->大约耗时:" + (endTime - startTime) + "毫秒");}}class ForkJoinSumCalculate extends RecursiveTask<Long> {/*** 起始位置*/private long start;/*** 结束位置*/private long end;/*** 每个线程处理的数值范围大小*/private long size;public ForkJoinSumCalculate(long start, long end, long size) {this.start = start;this.end = end;this.size = size;}@Overrideprotected Long compute() {long length = end - start;//如果小于size,直接使用for循环求和if (length <= size) {long sum = 0L;for (long i = start; i <= end; i++) {sum += i;}return sum;} else {//递归拆分任务long middle = (start + end) / 2;ForkJoinSumCalculate leftTask = new ForkJoinSumCalculate(start, middle, size);//拆分,同时压入线程队列leftTask.fork();ForkJoinSumCalculate rightTask = new ForkJoinSumCalculate(middle + 1, end, size);rightTask.fork();return leftTask.join() + rightTask.join();}}
}
运行结果:下面是两次运行结果比较,一次用的家里电脑,一次用的公司的电脑,差距有点明显。
sum = 50000005000000 --->大约耗时:58毫秒 (家里电脑)
sum = 50000005000000 --->大约耗时:15毫秒 (公司电脑)
四、ForkJoinPool原理
ForkJoinPool线程池思想:将大任务分解成若干个小任务,当小任务均执行结束后,将任务做一个整合。分支合并思想大体如下图:
ForkJoinPool 算法是基于工作窃取算法(work-stealing algorithm):
所谓工作窃取,简单理解就是线程A空闲时,去偷线程B的任务,拿过来执行。
ForkJoinPool 中每个工作线程都维护的是一个双向队列Deque的工作队列,用来存放任务ForkJoinTask。每个工作线程在遇到 fork()时,会将任务插入到工作队列尾部,同时工作线程在工作时,采用后进先出(LIFO)方式取出任务来执行。如果本线程存在空闲的线程,会尝试去窃取别的线程工作队列中的任务拿过来执行,并且采用先进先出( FIFO)方式的去偷别人的任务,即窃取的任务的别的线程工作队列的队列首部任务。在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。在既没有自己的任务,也没有可以窃取的任务时,进入休眠。
用图来描述上面的话大概就如下:
五、总结
本文简单介绍了ForkJoinPool分支合并框架的概念、使用方法以及工作窃取原理,在某些场合如递归运算,大数据量的计算等使用ForkJoinPool来分解大任务是一个不错的选择。
并发编程学习之ForkJoinPool分支合并相关推荐
- java并发编程学习一
java并发编程学习一 什么是进程和线程? 进程是操作系统进行资源分配的最小单位 进程跟进程之间的资源是隔离的,同一个进程之中的线程可以共享进程的资源. 线程是进程的一个实体,是CPU 调度和分派的基 ...
- libevent c++高并发网络编程_高并发编程学习(2)——线程通信详解
前序文章 高并发编程学习(1)--并发基础 - https://www.wmyskxz.com/2019/11/26/gao-bing-fa-bian-cheng-xue-xi-1-bing-fa-j ...
- 高并发编程学习(2)——线程通信详解
前序文章 高并发编程学习(1)--并发基础 - https://www.wmyskxz.com/2019/11/26/gao-bing-fa-bian-cheng-xue-xi-1-bing-fa-j ...
- java中解决脏读_java并发编程学习之脏读代码示例及处理
使用interrupt()中断线程 当一个线程运行时,另一个线程可以调用对应的Thread对象的interrupt()方法来中断它,该方法只是在目标线程中设置一个标志,表示它已经被中断,并立即 ...
- java公平锁和非公平锁_java并发编程学习之再谈公平锁和非公平锁
在java并发编程学习之显示锁Lock里有提过公平锁和非公平锁,我们知道他的使用方式,以及非公平锁的性能较高,在AQS源码分析的基础上,我们看看NonfairSync和FairSync的区别在什么地方 ...
- Java高并发编程学习(三)java.util.concurrent包
简介 我们已经学习了形成Java并发程序设计基础的底层构建块,但对于实际编程来说,应该尽可能远离底层结构.使用由并发处理的专业人士实现的较高层次的结构要方便得多.要安全得多.例如,对于许多线程问题,可 ...
- java并行任务,Java 并发编程学习(五):批量并行执行任务的两种方式
Java 并发编程学习(五):批量并行执行任务的两种方式 背景介绍 有时候我们需要执行一批相似的任务,并且要求这些任务能够并行执行.通常,我们的需求会分为两种情况: 并行执行一批任务,等待耗时最长的任 ...
- 基于《狂神说Java》JUC并发编程--学习笔记
前言: 本笔记仅做学习与复习使用,不存在刻意抄袭. -------------------------------------------------------------------------- ...
- 【并发入门】Java 并发编程学习笔记
注:该笔记主要记录自 B站 up主 遇见狂神说的个人空间_哔哩哔哩_bilibili 1.什么是 JUC Java 工具类中的 并发编程包 学习:源码 + 官方文档 业务:普通的线程代码 Thread ...
- Java并发编程学习 + 原理分析(建议收藏)
总结不易,如果对你有帮助,请点赞关注支持一下 微信搜索程序dunk,关注公众号,获取博客源码 Doug Lea是一个无私的人,他深知分享知识和分享苹果是不一样的,苹果会越分越少,而自己的知识并不会因为 ...
最新文章
- PHP更新数据库记录
- YOLOv5x6模型来了! 同样支持CPU上ONNX部署与推理
- python项目实战三个小实例
- vmware不能和主机相连
- ElasticSearch创建文档
- 欢迎您参加_ADT技术培训营
- [原][歌曲]感动的歌曲排序
- 天池 在线编程 高效作业处理服务(01背包DP)
- 电子计算机之父冯.诺依曼的主要贡献,冯•诺依曼的贡献有哪些?
- 知道ThreadLocal吗?一起聊聊到底有啥用
- 判断回文(Java和JavaScript)
- android 识别车牌颜色,Android、ios移动端车牌识别sdk / 车牌识别API
- 如何批量压缩图片大小?
- MacBook在任意文件夹目录打开终端
- SqlServer 对象名无效的解决方法
- android需要电脑输入吗,Android 远程输入法,用电脑给手机输入文字
- JAVA 浏览器下载excel,自定义样式:合并单元格,设置多种背景填充颜色,冻结窗格
- 【PG074】1 简略学习Aurora 64B/66B IP核
- VC无负担实现XP风格界面
- mysql 怎么存经纬度_mysql存储地图经纬度的表怎么设计?
热门文章
- android图像与动画处理,在Android和iPhone上对照片进行动画处理的7种最佳应用 | MOS86...
- Deepracer 学了就能云驾驭赛车? Deepracer机器学习进阶版干货分享!
- 转载:图解Raft 让一致性算法变得更简单
- 微型计算机主机作用,微型计算机的主机包括()。
- 旋转图像 leetcode
- 计算广告、推荐系统常用语
- 使用折半查找法查找数组中的元素
- 12满秩分解与奇异值分解(2)
- java多线程(2)----继承的方式创建多线程
- svm出现浮点数与字符串不能计算的错误(label必须为 整形或浮点型)