背景:ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。这种思想值得学习。

使用

Java7 提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。

ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。

使用方法:创建了ForkJoinPool实例之后,就可以调用ForkJoinPool的submit(ForkJoinTask<T> task) 或invoke(ForkJoinTask<T> task)方法来执行指定任务了。

其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它还有两个抽象子类:RecusiveAction和RecusiveTask。其中RecusiveTask代表有返回值的任务,而RecusiveAction代表没有返回值的任务。

下面的UML类图显示了ForkJoinPool、ForkJoinTask之间的关系:

举例

以还行没有返回值的“大任务”(简单低打印1~300的数值)为例,程序将一个“大任务”拆分成多个“小任务”,并将任务交给ForkJoinPool来执行

package work1201.basic;import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;/*** ClassName:ForkJoinPoolAction <br/>* Function: 使用ForkJoinPool完成一个任务的分段执行* 简单的打印0-300的数值。用多线程实现并行执行*/
public class ForkJoinPoolAction {public static void main(String[] args) throws Exception{PrintTask task = new PrintTask(0, 300);//创建实例,并执行分割任务ForkJoinPool pool = new ForkJoinPool();pool.submit(task);//线程阻塞,等待所有任务完成pool.awaitTermination(2, TimeUnit.SECONDS);pool.shutdown();}
}/*** ClassName: PrintTask <br/>* Function: 继承RecursiveAction来实现“可分解”的任务。*/
class PrintTask extends RecursiveAction{private static final int THRESHOLD = 50; //最多只能打印50个数private int start;private int end;public PrintTask(int start, int end) {super();this.start = start;this.end = end;}@Overrideprotected void compute() {        if(end - start < THRESHOLD){for(int i=start;i<end;i++){System.out.println(Thread.currentThread().getName()+"的i值:"+i);}}else {int middle =(start+end)/2;PrintTask left = new PrintTask(start, middle);PrintTask right = new PrintTask(middle, end);//并行执行两个“小任务”left.fork();right.fork();}        }
}

执行结果:

ForkJoinPool-1-worker-1的i值:262
ForkJoinPool-1-worker-7的i值:75
ForkJoinPool-1-worker-7的i值:76
ForkJoinPool-1-worker-5的i值:225
ForkJoinPool-1-worker-3的i值:187
ForkJoinPool-1-worker-6的i值:150
ForkJoinPool-1-worker-6的i值:151
ForkJoinPool-1-worker-6的i值:152
ForkJoinPool-1-worker-6的i值:153
ForkJoinPool-1-worker-6的i值:154
......
因为我的电脑是i7处理器,一共8个cpu,观察线程的名称可以发现,8个cpu都在运行。

通过RecursiveTask的返回值,来对一个长度为100的数组元素进行累加。

package work1201.basic;import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;/*** ClassName:ForJoinPollTask <br/>* Function: 对一个长度为100的元素值进行累加   */
public class ForJoinPollTask {public static void main(String[] args) throws Exception {int[] arr = new int[100];Random random = new Random();int total =0;//初始化100个数组元素for(int i=0,len = arr.length;i<len;i++){int temp = random.nextInt(20);//对数组元素赋值,并将数组元素的值添加到sum总和中total += (arr[i]=temp);}System.out.println("初始化数组总和:"+total);SumTask task = new SumTask(arr, 0, arr.length);
//        创建一个通用池,这个是jdk1.8提供的功能ForkJoinPool pool = ForkJoinPool.commonPool();Future<Integer> future = pool.submit(task); //提交分解的SumTask 任务System.out.println("多线程执行结果:"+future.get());pool.shutdown(); //关闭线程池              }
}/*** ClassName: SumTask <br/>* Function: 继承抽象类RecursiveTask,通过返回的结果,来实现数组的多线程分段累累加*  RecursiveTask 具有返回值*/
class SumTask extends RecursiveTask<Integer>{private static final int THRESHOLD = 20; //每个小任务 最多只累加20个数private int arry[];private int start;private int end;        /*** Creates a new instance of SumTask.* 累加从start到end的arry数组* @param arry* @param start* @param end*/public SumTask(int[] arry, int start, int end) {super();this.arry = arry;this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum =0;//当end与start之间的差小于threshold时,开始进行实际的累加if(end - start <THRESHOLD){for(int i= start;i<end;i++){sum += arry[i];}return sum;}else {//当end与start之间的差大于threshold,即要累加的数超过20个时候,将大任务分解成小任务int middle = (start+ end)/2;SumTask left = new SumTask(arry, start, middle);SumTask right = new SumTask(arry, middle, end);//并行执行两个 小任务left.fork();right.fork();//把两个小任务累加的结果合并起来return left.join()+right.join();}        }
}

执行结果:

初始化数组总和:1008
多线程执行结果:1008

分析

在Java 7中引入了一种新的线程池:ForkJoinPool。

它同ThreadPoolExecutor一样,也实现了Executor和ExecutorService接口。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。

ForkJoinPool主要用来使用分治法(Divide-and-Conquer Algorithm)来解决问题。典型的应用比如快速排序算法。

这里的要点在于,ForkJoinPool需要使用相对少的线程来处理大量的任务。

比如要对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。

那么到最后,所有的任务加起来会有大概2000000+个。问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。

所以当使用ThreadPoolExecutor时,使用分治法会存在问题,因为ThreadPoolExecutor中的线程无法像任务队列中再添加一个任务并且在等待该任务完成之后再继续执行。而使用ForkJoinPool时,就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。

以上程序的关键是fork()和join()方法。在ForkJoinPool使用的线程中,会使用一个内部队列来对需要执行的任务以及子任务进行操作来保证它们的执行顺序。

那么使用ThreadPoolExecutor或者ForkJoinPool,会有什么性能的差异呢?

首先,使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。但是,使用ThreadPoolExecutor时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,显然这是不可行的。

ps:ForkJoinPool在执行过程中,会创建大量的子任务,导致GC进行垃圾回收,这些是需要注意的。

转载自:https://www.cnblogs.com/lixuwu/p/7979480.html

ForkJoinPool 详解相关推荐

  1. Java并发编程(07):Fork/Join框架机制详解

    本文源码:GitHub·点这里 || GitEE·点这里 一.Fork/Join框架 Java提供Fork/Join框架用于并行执行任务,核心的思想就是将一个大任务切分成多个小任务,然后汇总每个小任务 ...

  2. JAVA六大线程池详解

    1)Executor执行器 详解 线程池的最顶层接口,内部就一个execute方法yong书写线程的具体执行方式 public class testExecutor implements Execut ...

  3. java的数组与Arrays类源码详解

    java的数组与Arrays类源码详解 java.util.Arrays 类是 JDK 提供的一个工具类,用来处理数组的各种方法,而且每个方法基本上都是静态方法,能直接通过类名Arrays调用. 类的 ...

  4. 102-并发编程详解(中篇)

    这里续写上一章博客 Phaser新特性 : 特性1:动态调整线程个数 CyclicBarrier 所要同步的线程个数是在构造方法中指定的,之后不能更改,而 Phaser 可以在运行期间动态地 调整要同 ...

  5. JAVA SE知识整合(暂时完结 五万七字)后续分点详解

    目录 1.别再问为什么在类里面写个sysout语句爆红了,类里面有且只有五个成分: 2.面向对象三大特征: 封装,继承,多态 (扫盲扫盲,别这个都不知道) 3.讲一下static这个很重要的关键词 4 ...

  6. JUC详解(各种乱七八糟的锁)

    文章目录 1JUC 2回顾多线程知识 3Lock锁(重点) 4Lock锁 5JUC版的生产者与消费者问题(虚假唤醒) 6有序线程 7八锁问题 8CopyOnWriteArrayList(读写复制) 9 ...

  7. Java JUC并发编程详解

    Java JUC并发编程详解 1. JUC概述 1.1 JUC简介 1.2 进程与线程 1.2 并发与并行 1.3 用户线程和守护线程 2. Lock接口 2.1 Synchronized 2.2 什 ...

  8. java线程池详解及五种线程池方法详解

    基础知识 Executors创建线程池 Java中创建线程池很简单,只需要调用Executors中相应的便捷方法即可,比如Executors.newFixedThreadPool(int nThrea ...

  9. Netty详解(持续更新中)

    Netty详解 1. Netty概述 1.1 Netty简介 1.2 原生NIO问题 1.3 Netty特点 1.4 Netty应用场景 1.3 Netty版本说明 2. Java IO模型 2.1 ...

最新文章

  1. CentOs7中安装python3.7.6
  2. JVM内存管理:深入Java内存区域与OOM
  3. java jedis使用_Java中使用Jedis操作Redis
  4. WebRTC 中收集音视频编解码能力
  5. Squid反向代理加速缓存+负载均衡实验架构
  6. 前端学习(1327):node全局对象global
  7. 管理学习笔记(1)——高效团队的五大关键
  8. 如何正确的开始用Go编程
  9. linux6 rac 11g,oracle linux 6.操作系统oracle 11g rac
  10. 有效管理云计算成本的多个措施
  11. hibernate_使用c3p0连接池配置
  12. H5页面通过浏览器调用摄像头拍照
  13. 电脑php的基本方法是什么,做文员的基本电脑操作是什么
  14. Word 样式模板复制到另一文档
  15. Python学习一:python语言基础
  16. Python人脸识别智能考勤系统 (供源码,附报告)(可答疑,可调试)
  17. SOCK5代理服务器透明穿透技术
  18. jQuery_Ajax下载服务器文件流的方法
  19. Kubernetes 基于Kubectl的GitOps CI/CD
  20. DataWhale_Pandas Task10 时序数据

热门文章

  1. 使用BAT脚本实现一键配置Mysql服务器和开启远程服务
  2. HTML+CSS+JS体育网页制作 DW静态网页设计(篮球NBA 5页 带psd文件 )
  3. 如何通过三视图判断立方体个数_“三视图”“小正方体个数”一篇搞定!
  4. oracle批量查询更新,Oracle批量查询、删除、更新使用BULK COLLECT提高效率
  5. 从pdf复制文字到word中的问题
  6. 流程控制之顺数结构和选择结构
  7. css3 和html5实例,HTML5和CSS3实例教程
  8. 亚马逊Kindle电子书在线管理网站,管理我的内容和设备入口,如何进入
  9. 关于身份证号或者手机号,密码 隐藏中间几位数字
  10. 2020python考试题库_大学mooc2020用Python玩转数据期末考试公众号答案