原文链接

当我们需要执行大量的小任务时,有经验的Java开发人员都会采用线程池来高效执行这些小任务。然而,有一种任务,例如,对超过1000万个元素的数组进行排序,这种任务本身可以并发执行,但如何拆解成小任务需要在任务执行的过程中动态拆分。这样,大任务可以拆成小任务,小任务还可以继续拆成更小的任务,最后把任务的结果汇总合并,得到最终结果,这种模型就是Fork/Join模型。

Java7引入了Fork/Join框架,我们通过RecursiveTask这个类就可以方便地实现Fork/Join模式。

例如,对一个大数组进行并行求和的RecursiveTask,就可以这样编写:

class SumTask extends RecursiveTask<Long> {static final int THRESHOLD = 100;long[] array;int start;int end;SumTask(long[] array, int start, int end) {this.array = array;this.start = start;this.end = end;}@Overrideprotected Long compute() {if (end - start <= THRESHOLD) {// 如果任务足够小,直接计算:long sum = 0;for (int i = start; i < end; i++) {sum += array[i];}try {Thread.sleep(1000);} catch (InterruptedException e) {}System.out.println(String.format("compute %d~%d = %d", start, end, sum));return sum;}// 任务太大,一分为二:int middle = (end + start) / 2;System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));SumTask subtask1 = new SumTask(this.array, start, middle);SumTask subtask2 = new SumTask(this.array, middle, end);invokeAll(subtask1, subtask2);Long subresult1 = subtask1.join();Long subresult2 = subtask2.join();Long result = subresult1 + subresult2;System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);return result;}
}

 

编写这个Fork/Join任务的关键在于,在执行任务的compute()方法内部,先判断任务是不是足够小,如果足够小,就直接计算并返回结果(注意模拟了1秒延时),否则,把自身任务一拆为二,分别计算两个子任务,再返回两个子任务的结果之和。

最后写一个main()方法测试:

public static void main(String[] args) throws Exception {// 创建随机数组成的数组:long[] array = new long[400];fillRandom(array);// fork/join task:ForkJoinPool fjp = new ForkJoinPool(4); // 最大并发数4ForkJoinTask<Long> task = new SumTask(array, 0, array.length);long startTime = System.currentTimeMillis();Long result = fjp.invoke(task);long endTime = System.currentTimeMillis();System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
}

  

关键代码是fjp.invoke(task)来提交一个Fork/Join任务并发执行,然后获得异步执行的结果。

我们设置任务的最小阀值是100,当提交一个400大小的任务时,在4核CPU上执行,会一分为二,再二分为四,每个最小子任务的执行时间是1秒,由于是并发4个子任务执行,整个任务最终执行时间大约为1秒。

新手在编写Fork/Join任务时,往往用搜索引擎搜到一个例子,然后就照着例子写出了下面的代码:

protected Long compute() {if (任务足够小?) {return computeDirect();}// 任务太大,一分为二:SumTask subtask1 = new SumTask(...);SumTask subtask2 = new SumTask(...);// 分别对子任务调用fork():subtask1.fork();subtask2.fork();// 合并结果:Long subresult1 = subtask1.join();Long subresult2 = subtask2.join();return subresult1 + subresult2;
}

  

很遗憾,这种写法是错!误!的!这样写没有正确理解Fork/Join模型的任务执行逻辑。

JDK用来执行Fork/Join任务的工作线程池大小等于CPU核心数。在一个4核CPU上,最多可以同时执行4个子任务。对400个元素的数组求和,执行时间应该为1秒。但是,换成上面的代码,执行时间却是两秒。

这是因为执行compute()方法的线程本身也是一个Worker线程,当对两个子任务调用fork()时,这个Worker线程就会把任务分配给另外两个Worker,但是它自己却停下来等待不干活了!这样就白白浪费了Fork/Join线程池中的一个Worker线程,导致了4个子任务至少需要7个线程才能并发执行。

打个比方,假设一个酒店有400个房间,一共有4名清洁工,每个工人每天可以打扫100个房间,这样,4个工人满负荷工作时,400个房间全部打扫完正好需要1天。

Fork/Join的工作模式就像这样:首先,工人甲被分配了400个房间的任务,他一看任务太多了自己一个人不行,所以先把400个房间拆成两个200,然后叫来乙,把其中一个200分给乙。

紧接着,甲和乙再发现200也是个大任务,于是甲继续把200分成两个100,并把其中一个100分给丙,类似的,乙会把其中一个100分给丁,这样,最终4个人每人分到100个房间,并发执行正好是1天。

如果换一种写法:

// 分别对子任务调用fork():
subtask1.fork();
subtask2.fork();

  

这个任务就分!错!了!

比如甲把400分成两个200后,这种写法相当于甲把一个200分给乙,把另一个200分给丙,然后,甲成了监工,不干活,等乙和丙干完了他直接汇报工作。乙和丙在把200分拆成两个100的过程中,他俩又成了监工,这样,本来只需要4个工人的活,现在需要7个工人才能1天内完成,其中有3个是不干活的。

其实,我们查看JDK的invokeAll()方法的源码就可以发现,invokeAll的N个任务中,其中N-1个任务会使用fork()交给其它线程执行,但是,它还会留一个任务自己执行,这样,就充分利用了线程池,保证没有空闲的不干活的线程。

 

转载于:https://www.cnblogs.com/dream-to-pku/p/8477041.html

【java并发系列】Fork/Join任务(转)相关推荐

  1. Java 并发 (13) -- Fork/Join 框架

    文章目录 1. 简介 2. 精讲 1. 什么是 Fork/Join 框架 2. 工作窃取算法 3. Fork/Join 框架的设计 4. 使用 Fork/Join 框架 5. Fork/Join 框架 ...

  2. 5W字高质量java并发系列详解教程(上)-附PDF下载

    文章目录 第一章 java.util.concurrent简介 主要的组件 Executor ExecutorService ScheduledExecutorService Future Count ...

  3. java 中的fork join框架

    文章目录 ForkJoinPool ForkJoinWorkerThread ForkJoinTask 在ForkJoinPool中提交Task java 中的fork join框架 fork joi ...

  4. Java 7:Fork / Join框架示例

    Java 7中的Fork / Join Framework专为可分解为较小任务的工作而设计,并将这些任务的结果组合起来以产生最终结果. 通常,使用Fork / Join Framework的类遵循以下 ...

  5. 【Java】java中的Fork/Join

    1.概述 视频:java中的Fork/Join Fork/Join是什么? Fork/Join框架是Java7提供的并行执行任务框架,思想是将大任务分解成小任务,然后小任务又可以继续分解,然后每个小任 ...

  6. Java并发系列(10)——FutureTask 和 CompletionService

    接上一篇<Java并发系列(9)--并发工具类> 文章目录 8 FutureTask 与 CompletionService 8.1 FutureTask 8.1.1 类图 8.1.2 几 ...

  7. Java并发系列(11)——ThreadPoolExecutor实现原理与手写

    接上一篇<Java并发系列(10)--FutureTask 和 CompletionService> 文章目录 9 线程池 9.1 JDK 线程池 9.2 ThreadPoolExecut ...

  8. Java并行任务框架Fork/Join

    Fork/Join是什么? Fork意思是分叉,Join为合并.Fork/Join是一个将任务分割并行运行,然后将最终结果合并成为大任务的结果的框架,父任务可以分割成若干个子任务,子任务可以继续分割, ...

  9. Java中的Fork / Join框架的简要概述

    Fork / Join框架是使用并发分治法解决问题的框架. 引入它们是为了补充现有的并发API. 在介绍它们之前,现有的ExecutorService实现是运行异步任务的流行选择,但是当任务同质且独立 ...

最新文章

  1. 风控算法最常见的知识WOE讲解!
  2. php 图片在线编辑功能,summernote在线编辑器提交的内容PHP处理其中图片函数
  3. 微服务架构的基础框架选择:Spring Cloud还是Dubbo?
  4. songCMS 3.15 cookie SQLINJ
  5. 开发日记-20190814 关键词 日常
  6. 会计证考试《财经法规与职业道德》第四章精选题
  7. 项目管理、测试管理、代码bug 管理
  8. Gridcontrol新增行选中有关问题
  9. stl iterator_在C ++ STL中使用const_iterator访问字符列表的元素
  10. dataframe scala 修改值_python – 使用Scala的API替换DataFrame的值
  11. java scanner 回车_Java Scanner类用法及nextLine()产生的换行符问题实例分析
  12. 让你人见人爱的27个原则
  13. SpringBoot+Vue项目上手
  14. 语义模型及自然语言处理系统基础算法
  15. JS原型链原理(链表)
  16. 1. 使用 MegaRAID Storage Manager 监控
  17. Alibaba内网内部资料真香 -Spring手册太全了,
  18. python将xls转换为xlsx
  19. 机器学习深度学习中反向传播之偏导数链式法则
  20. java全国二级考点,java计算机全国二级考试时间

热门文章

  1. TensorFlow学习笔记02:使用tf.data读取和保存数据文件
  2. 工信部部长苗圩:今年我国部分地区将发放5G临时牌照...
  3. anaconda pycharm_python入门必备干货 | python,pycharm,anaconda区别与联系
  4. Win10下配置Docker
  5. 【岩熹解读】今天“麻花”不开心:只靠个别明星的业绩太难了
  6. html5自定义报表工具,利用javascript和jxl实现自定义报表的输出
  7. mysql 手机号省份_手机号码怎么划分省份
  8. filezilla定时上传_filezilla使用教程,filezilla使用教程,教程详解
  9. Spring事务原理详解
  10. python 读取yml文件_python读取yaml配置文件