一、什么是 Fork-Join

Fork/Join框架是Java7提供了的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架,这种开发方法也叫分治编程。分治编程可以极大地利用CPU资源,提高任务执行的效率,也是目前与多线程有关的前沿技术。

框架图:

  • fork():利用另一个 ForkJoinPool 线程异步执行新创建的子任务
  • join():读取第一个子任务的结果,尚未完成就等待

二、传统的分治编程会遇到什么问题

分治的原理上面已经介绍了,就是切割大任务成小任务来完成。看起来好像也不难实现啊!为什么专门弄一个新的框架呢?
我们先看一下,在不使用 Fork-Join 框架时,使用普通的线程池是怎么实现的。

  1. 我们往一个线程池提交了一个大任务,规定好任务切割的阀值。
  2. 由线程池中线程(假设是线程A)执行大任务,发现大任务的大小大于阀值,于是切割成两个子任务,并调用 submit()提交到线程池,得到返回的子任务的 Future。
  3. 线程A就调用返回的 Future 的 get() 方法阻塞等待子任务的执行结果。
  4. 池中的其他线程(除线程A外,线程A被阻塞)执行两个子任务,然后判断子任务的大小有没有超过阀值,如果超过,则按照步骤2继续切割,否则,才计算并返回结果。

看起来一切都很美好。真的吗?别忘了, 每一个切割任务的线程(如线程A)都被阻塞了,直到其子任务完成,才能继续往下运行 。如果任务太大了,需要切割多次,那么就会有多个线程被阻塞,性能将会急速下降。更糟糕的是,如果你的线程池的线程数量是有上限的,极可能会造成池中所有线程被阻塞,线程池无法执行任务。

三、普通线程池实现分治时阻塞的问题

public class NormalThreadPoolDivideAndConquer {//固定大小的线程池,池中线程数量为3static ExecutorService fixPoolExecutors = Executors.newFixedThreadPool(3);public static void main(String[] args) throws InterruptedException, ExecutionException {//计算 1+2+...+10  的结果CountTaskCallable task = new CountTaskCallable(1,10);//提交主人翁Future<Integer> future = fixPoolExecutors.submit(task);System.out.println("计算的结果:"+future.get());}
}
class CountTaskCallable implements Callable<Integer> {//设置阀值为2private static final int THRESHOLD = 2;private int start;private int end;public CountTaskCallable(int start, int end) {super();this.start = start;this.end = end;}@Overridepublic Integer call() throws Exception {int sum = 0;//判断任务的大小是否超过阀值,也即是两个相加的数的差值不能大于2,在这里意味着需要分为大于4个子任务进行计算,而线程池只有3个,机会造成阻塞boolean canCompute = (end - start) <= THRESHOLD;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}} else {System.out.println("切割的任务:"+start+"加到"+end+"   执行此任务的线程是 "+Thread.currentThread().getName());int middle = (start + end) / 2;CountTaskCallable leftTaskCallable = new CountTaskCallable(start, middle);CountTaskCallable rightTaskCallable = new CountTaskCallable(middle + 1, end);// 将子任务提交到线程池中Future<Integer> leftFuture = NormalThreadPoolDivideAndConquer.fixPoolExecutors.submit(leftTaskCallable);Future<Integer> rightFuture = NormalThreadPoolDivideAndConquer.fixPoolExecutors.submit(rightTaskCallable);//阻塞等待子任务的执行结果int leftResult = leftFuture.get();int rightResult = rightFuture.get();// 合并子任务的执行结果sum = leftResult + rightResult;}return sum;}
}
  • 运行结果:
切割的任务:1加到10   执行此任务的线程是 pool-1-thread-1
切割的任务:1加到5    执行此任务的线程是 pool-1-thread-2
切割的任务:6加到10   执行此任务的线程是 pool-1-thread-3

池的线程只有三个,当任务分割了三次后,池中的线程也就都被阻塞了,无法再执行任何任务,一直卡着动不了。为了解决这个问题,工作窃取算法呼之欲出

四、工作窃取算法

针对上面的问题,Fork-Join 框架使用了“工作窃取(work-stealing)”算法。工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。在《Java 并发编程的艺术》对工作窃取算法的解释:

使用工作窃取算法有什么优势呢?

  • 假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
  1. Fork-Join 框架使用工作窃取算法
  1. Fork-Join 框架的线程池ForkJoinPool 的任务分为“外部任务” 和 “内部任务”。
  2. “外部任务”是放在ForkJoinPool 的全局队列里;
  3. ForkJoinPool 池中的每个线程都维护着一个内部队列,用于存放“内部任务”。
  4. 线程切割任务得到的子任务就会作为“内部任务”放到内部队列中。
  5. 当此线程要想要拿到子任务的计算结果时,先判断子任务有没有完成,如果没有完成,则再判断子任务有没有被其他线程“窃取”,一旦子任务被窃取了则去执行本线程“内部队列”的其他任务,或者扫描其他的任务队列,窃取任务,如果子任务没有被窃取,则由本线程来完成。
  6. 最后,当线程完成了其“内部任务”,处于空闲的状态时,就会去扫描其他的任务队列,窃取任务
  1. 工作窃取算法的优点
    Fork-Join 框架中的工作窃取算法的优点可以总结为以下两点:
  1. 线程是不会因为等待某个子任务的完成或者没有内部任务要执行而被阻塞等待、挂起,而是会扫描所有的队列,窃取任务,直到所有队列都为空时,才会被挂起。
  2. Fork-Join框架在多CPU的环境下,能提供很好的并行性能。在使用普通线程池的情况下,当CPU不再是性能瓶颈时,能并行地运行多个线程,然而却因为要互斥访问一个任务队列而导致性能提高不上去。(所以ForkJoin适合在多核环境下,单核环境使用ForkJoin没什么意思。)而 Fork-Join框架为每个线程为维护着一个内部任务队列,以及一个全局的任务队列,而且任务队列都是双向队列,可从首尾两端来获取任务,极大地减少了竞争的可能性,提高并行的性能。

五、Fork-Join 框架的使用介绍

  1. Fork/Join有三个核心类:
  • ForkJoinPool: 执行任务的线程池,继承了 AbstractExecutorService 类。
  • ForkJoinWorkerThread:执行任务的工作线程(即ForkJoinPool线程池里的线程)。每个线程都维护着一个内部队列,用于存放“内部任务”。继承了 Thread类。
  • ForkJoinTask: 一个用于ForkJoinPool的任务抽象类。实现了 Future 接口

因为ForkJoinTask比较复杂,抽象方法比较多,日常使用时一般不会继承ForkJoinTask来实现自定义的任务,而是继承ForkJoinTask的两个子类,实现 compute() 方法:

  • RecursiveTask: 子任务带返回结果时使用
  • RecursiveAction: 子任务不带返回结果时使用

compute 方法的实现模式一般是:

if 任务足够小直接返回结果
else分割成N个子任务依次调用每个子任务的fork方法执行子任务依次调用每个子任务的join方法合并执行结果

六、Fork-Join 例子演示

  • 计算 1+2+…+12 的结果。

使用Fork/Join框架首先要考虑到的是如何分割任务,如果我们希望每个子任务最多执行两个数的相加,那么我们设置分割的阈值是2,由于是12个数字相加。同时,观察执行任务的线程名称,理解工作窃取算法的实现。

public class CountTest {public static void main(String[] args) throws InterruptedException, ExecutionException {ForkJoinPool forkJoinPool = new ForkJoinPool();//创建一个计算任务,计算 由1加到12CountTask countTask = new CountTask(1, 12);Future<Integer> future = forkJoinPool.submit(countTask);System.out.println("最终的计算结果:" + future.get());}
}class CountTask extends RecursiveTask<Integer> {private static final int THRESHOLD = 2;private int start;private int end;public CountTask(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;boolean canCompute = (end - start) <= THRESHOLD;//任务已经足够小,可以直接计算,并返回结果if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}System.out.println("执行计算任务,计算    " + start + "到 " + end + "的和  ,结果是:" + sum + "   执行此任务的线程:" + Thread.currentThread().getName());} else { //任务过大,需要切割System.out.println("任务过大,切割的任务:  " + start + "加到 " + end + "的和       执行此任务的线程:" + Thread.currentThread().getName());int middle = (start + end) / 2;//切割成两个子任务CountTask leftTask = new CountTask(start, middle);CountTask rightTask = new CountTask(middle + 1, end);//执行子任务leftTask.fork();rightTask.fork();//等待子任务的完成,并获取执行结果int leftResult = leftTask.join();int rightResult = rightTask.join();//合并子任务sum = leftResult + rightResult;}return sum;}
}
  • 运行结果:
任务过大,切割的任务: 1加到 12的和 执行此任务的线程:ForkJoinPool-1-worker-1
任务过大,切割的任务: 7加到 12的和 执行此任务的线程:ForkJoinPool-1-worker-3
任务过大,切割的任务: 1加到 6的和 执行此任务的线程:ForkJoinPool-1-worker-2
执行计算任务,计算 7到 9的和 ,结果是:24 执行此任务的线程:ForkJoinPool-1-worker-3
执行计算任务,计算 1到 3的和 ,结果是:6 执行此任务的线程:ForkJoinPool-1-worker-1
执行计算任务,计算 4到 6的和 ,结果是:15 执行此任务的线程:ForkJoinPool-1-worker-1
执行计算任务,计算 10到 12的和 ,结果是:33 执行此任务的线程:ForkJoinPool-1-worker-3
最终的计算结果:78

从结果可以看出:

提交的计算任务是由线程1执行,线程1进行了第一次切割,切割成两个子任务 “7加到12“ 和
”1加到6“,并提交这两个子任务。然后这两个任务便被 线程2、线程3 给窃取了。线程1 的内部队列中已经没有任务了,这时候,线程2、线程3
也分别进行了一次任务切割并各自提交了两个子任务,于是线程1也去窃取任务(这里窃取的都是线程2的子任务)。

  • RecursiveAction 演示
    遍历指定目录(含子目录)找寻指定类型文件
public class FindDirsFiles extends RecursiveAction{/*** 当前任务需要搜寻的目录*/private File path;public FindDirsFiles(File path) {this.path = path;}public static void main(String [] args){try {// 用一个 ForkJoinPool 实例调度总任务ForkJoinPool pool = new ForkJoinPool();FindDirsFiles task = new FindDirsFiles(new File("D:/"));//异步调用pool.execute(task);System.out.println("Task is Running......");Thread.sleep(1);int otherWork = 0;for(int i=0;i<1000000;i++){otherWork = otherWork+i;}System.out.println("Main Thread done sth......,otherWork=" + otherWork);//阻塞的方法task.join();System.out.println("Task end");} catch (Exception e) {e.printStackTrace();}}@Overrideprotected void compute() {List<FindDirsFiles> subTasks = new ArrayList<>();File[] files = path.listFiles();if(files!=null) {for(File file:files) {if(file.isDirectory()) {subTasks.add(new FindDirsFiles(file));}else {//遇到文件,检查if(file.getAbsolutePath().endsWith("txt")) {System.out.println("文件:"+file.getAbsolutePath());}}}if(!subTasks.isEmpty()) {for (FindDirsFiles subTask : invokeAll(subTasks)) {//等待子任务执行完成subTask.join();}}}   }
}

参考文章
参考文章

分支合并 Fork-Join 框架相关推荐

  1. 基于Fork/Join框架实现对大型浮点数数组排序(归并算法和插入排序算法)

    分支/合并框架 说明 重点是那个浮点数数组排序的例子,从主函数展开,根据序号看 1.GitHub代码欢迎star.你们轻轻的一点,对我鼓励特大,我有一个习惯,看完别人的文章是会点赞的. 2.个人认为学 ...

  2. 基于Fork/Join框架实现对大型浮点数数组排序(归并算法和插入排序算法) 1

    分支/合并框架 说明 重点是那个浮点数数组排序的例子,从主函数展开,根据序号看 1.GitHub代码欢迎star.你们轻轻的一点,对我鼓励特大,我有一个习惯,看完别人的文章是会点赞的. 2.个人认为学 ...

  3. 【JUC】第六章 Fork/Join 框架、CompletableFuture

    第六章 Fork/Join 框架.CompletableFuture 文章目录 第六章 Fork/Join 框架.CompletableFuture 一.Fork/Join 框架 1.简介 2.For ...

  4. Java8 - 一文搞定Fork/Join 框架

    文章目录 概述 CPU密集型 vs IO密集型 计算密集型任务 IO密集型 简单示例 Fork/Join常用的类 RecursiveTask 实现 并行计算 RecursiveAction Fork/ ...

  5. java forkjoinpool_Java并发——Fork/Join框架与ForkJoinPool

    为了防止无良网站的爬虫抓取文章,特此标识,转载请注明文章出处.LaplaceDemon/ShiJiaqi. http://www.cnblogs.com/shijiaqi1066/p/4631466. ...

  6. 《尚硅谷高级技术之JUC高并发编程》学习笔记11—— Fork / Join 框架

    文章目录 Fork / Join 框架简介 fork() 方法 join() 方法 Fork / Join 框架的异常处理 入门案例 总结 Fork / Join 框架简介 Fork / Join 它 ...

  7. java fork join原理_细说Fork/Join框架

    什么是Fork/Join框架? Fork/Join框架是JDK1.7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干小任务,最终汇总每个小任务结果后得到大任务结果的框架.Fork就是把一个大 ...

  8. 使用Fork/Join框架优化归并排序

    Fork/Join框架是一个非常有意思的并发框架,它非常适合于处理类似归并排序这种将大的问题分解成多个小问题,并将结果进行合并的情况,这次我们就使用Fork/Join框架来优化我们的归并排序.查看更多 ...

  9. Fork/Join框架

    Fork就是把大任务切分为若干子任务并行执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果. 要使用ForkJoin框架,必须首先创建ForkJoin任务.它提供在任务中执行fork ...

  10. 并发编程-22J.U.C组件拓展之Fork/Join框架

    文章目录 Fork/Join框架概述 工作窃取算法 优点 缺点 Fork/Join框架的设计 ForkJoinTask ForkJoinPool 示例 Fork/Join框架的异常处理 代码 Fork ...

最新文章

  1. pom.xml设置mysql连接_maven工程配置pom.xml实现mybatis的访问数据库操作
  2. Linxu安装Tomcat与Jdk并卸载自带OpenJdk
  3. 怎么提高es服务器的性能,es集群服务器配置规则是怎样的?什么是es集群
  4. 一、Vue基础语法学习笔记系列——插值操作(Mustache语法、v-once、v-html、v-text、v-pre、v-cloak)、绑定属性v-bind(绑定class、style)、计算属性
  5. JVM003_属性表
  6. 原来MySQL面试还会问这些...
  7. 讯时网关IP对接PBX
  8. 构建azure对话机器人_如何在5分钟内使用Azure创建聊天机器人
  9. 计算机视觉 AI 工具集 OpenVINO™,是你心目中的深度学习框架 Top1 吗?
  10. Android 获取手机总内存和可用内存等信息
  11. mini- KMS_Activator_v1.2最新版(迷你KMS)使用方法
  12. DATEUTIL计算时间进度
  13. JDK11 JAVA11下载安装与快速配置环境变量教程
  14. sigar(System Information Gatherer And Reporter)简介
  15. mysql secure_file_priv 属性相关的文件读写权限问题
  16. 第一个用计算机编舞的人,多媒体平台·虚拟人·数字舞蹈
  17. java根据入参不同调不同方法_java根据传入参数不同调用不同的方法,求高手支妙招!...
  18. 申请美国大学计算机专业,低GPA如何申请美国大学计算机专业
  19. MAX98390CEWX D类放大器,集成动态扬声器管理(MAX98390)
  20. 商业智能时代,大数据分析行业前景

热门文章

  1. Identity Server 4 原理和实战(完结)_建立Identity Server 4项目,Client Credentials 授权实例...
  2. 通过Playbook部署LAMP(5)
  3. 使用WSW将Nginx创建为Windows系统服务
  4. 8.2.1.3 Range Optimization
  5. myeclipse 中项目名出现红色感叹号解决方法
  6. c# select标签绑定枚举,并以Description做Text显示
  7. ECMAScript 6教程 (一)
  8. 怎样编写一个Photoshop滤镜(1)
  9. TensorFlow版本
  10. noclobber属性