java-forkjoin框架的使用
ForkJoin是Java7提供的原生多线程并行处理框架,其基本思想是将大任务分割成小任务,最后将小任务聚合起来得到结果。fork是分解的意思, join是收集的意思. 它非常类似于HADOOP提供的MapReduce框架,只是MapReduce的任务可以针对集群内的所有计算节点,可以充分利用集群的能力完成计算任务。ForkJoin更加类似于单机版的MapReduce。
在fork/join框架中,若某个子问题由于等待另一个子问题的完成而无法继续执行。那么处理该子问题的线程会主动寻找其他尚未运行完成的子问题来执行。这种方式减少了线程的等待时间,提高了性能。子问题中应该避免使用synchronized关键词或其他方式方式的同步。也不应该是一阻塞IO或过多的访问共享变量。在理想情况下,每个子问题的实现中都应该只进行CPU相关的计算,并且只适用每个问题的内部对象。唯一的同步应该只发生在子问题和创建它的父问题之间。
Fork/Join使用两个类完成以上两件事情:
1. RecursiveAction,用于没有返回结果的任务2. RecursiveTask,用于有返回值的任务
源码推荐查询 jdk8的
· ForkJoinPool:task要通过ForkJoinPool来执行,分割的子任务也会添加到当前工作线程的双端队列中,进入队列的头部。当一个工作线程中没有任务时,会从其他工作线程的队列尾部获取一个任务。
2个构造方法
ForkJoinPool(int parallelism) 创建一个包含parallelism个并行线程的ForkJoinPool。ForkJoinPool() 以Runtime.availableProcessors()方法的返回值作为parallelism参数来创建ForkJoinPool。
3种方式启动
异步执行 execute(ForkJoinTask) ForkJoinTask.fork 等待获取结果 invoke(ForkJoinTask) ForkJoinTask.invoke 执行,获取Future submit(ForkJoinTask) ForkJoinTask.fork(ForkJoinTask are Futures)
异常处理:
ForkJoinTask在执行的时候可能会抛出异常,但是没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常.
getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。
if(task.isCompletedAbnormally()) {System.out.println(task.getException()); }
然后, 代码展示
import java.util.concurrent.ForkJoinPool import java.util.concurrent.ForkJoinTask import java.util.concurrent.RecursiveTask /*** fork* 对一个大数组进行并行求和的RecursiveTask** 大任务可以拆成小任务,小任务还可以继续拆成更小的任务,最后把任务的结果汇总合并,得到最终结果,这种模型就是Fork/Join模型。Java7引入了Fork/Join框架,我们通过RecursiveTask这个类就可以方便地实现Fork/Join模式。* Created by wenbronk on 2017/7/13.*/ class ForkJoinTest extends RecursiveTask<Long> {static final int THRESHOLD = 100long[] arrayint startint endForkJoinTest(long[] array, int start, int end) {this.start = startthis.end = endthis.array = array}@Overrideprotected Long compute() {if (end - start < THRESHOLD) {long sum = 0for (int i = start; i < end; i++) {sum += array[i]}try {Thread.sleep(100)} catch (Exception e) {e.printStackTrace()}println String.format('compute %d %d = %d', start, end, sum)}// 对于大任务, 分多线程执行int middle = (end + start) / 2println String.format('split %d %d => %d %d, %d %d', start, end, start, middle, middle, end)def subtask1 = new ForkJoinTest(this.array, start, middle);def subtask2 = new ForkJoinTest(this.array, middle, end);invokeAll(subtask1, subtask2)Long subresult1 = subtask1.join()Long subresult2 = subtask2.join()Long result = subresult1 + subresult2System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);return result}public static void main(String[] args) throws Exception {// 创建随机数组成的数组:long[] array = new long[400]; // fillRandom(array);// fork/join task:ForkJoinPool fjp = new ForkJoinPool(4); // 最大并发数4ForkJoinTask<Long> task = new ForkJoinTest(array, 0, array.length);long startTime = System.currentTimeMillis();Long result = fjp.invoke(task);long endTime = System.currentTimeMillis();System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");} }
java代码的实现
package com.wenbronk.test;import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask;/*** forkjoin的简单易用* Created by wenbronk on 2017/7/26.*/ public class CountTask extends RecursiveTask<Integer>{private volatile static int count = 0;private int start;private int end;public CountTask(int start, int end) {this.start = start;this.end = end;}public static final int threadhold = 2;@Overrideprotected Integer compute() {int sum = 0;System.out.println("开启了一条线程单独干: " + count++);// 如果任务足够小, 就直接执行boolean canCompute = (end - start) <= threadhold;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}}else {//任务大于阈值, 分裂为2个任务int middle = (start + end) / 2;CountTask countTask1 = new CountTask(start, middle);CountTask countTask2 = new CountTask(middle + 1, end);// 开启线程 // countTask1.fork(); // countTask2.fork(); invokeAll(countTask1, countTask2);Integer join1 = countTask1.join();Integer join2 = countTask2.join();// 结果合并sum = join1 + join2;}return sum;}// 测试public static void main(String[] args) throws ExecutionException, InterruptedException {ForkJoinPool forkJoinPool = new ForkJoinPool();CountTask countTask = new CountTask(1, 100);ForkJoinTask<Integer> result = forkJoinPool.submit(countTask);System.out.println(result.get());} }
转载于:https://www.cnblogs.com/wenbronk/p/7228455.html
java-forkjoin框架的使用相关推荐
- Java ForkJoin 框架初探
多核时代,编程语言如果不支持多核编程就OUT了,Java为了迎头赶上,在Java 8 版本增加大量支持多核编程的类库,如Stream等,Java 7开始支持的ForkJoin框架也是为了更好的支持多核 ...
- java forkjoin MySQL_Java并发fork-join框架
fork-join框架允许在几个工作进程中断某个任务,然后等待结果组合它们. 它在很大程度上利用了多处理器机器的生产能力. 以下是fork-join框架中使用的核心概念和对象. Fork Fork是一 ...
- 分支合并 Fork-Join 框架
一.什么是 Fork-Join Fork/Join框架是Java7提供了的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架,这种开发方法也叫分 ...
- JAVA并行框架:Fork/Join
转载自 https://www.cnblogs.com/dongguacai/p/6021859.html JAVA并行框架:Fork/Join 一.背景 虽然目前处理器核心数已经发展到很大数目,但是 ...
- Java7任务并行执行神器:ForkJoin框架
转载自 Java7任务并行执行神器:Fork&Join框架 Fork/Join是什么? Fork/Join框架是Java7提供的并行执行任务框架,思想是将大任务分解成小任务,然后小任务又可以继 ...
- ForkJoin框架源码分析(详细)
ForkJoin简介及使用 ForkJoin框架是CompletableFuture和java8 stream使用到的框架.主要用于分片处理的场景. 可以通过自定义分片粒度来实现任务分解.并行处理数据 ...
- ForkJoin框架详解 一张图搞明白工作窃取(work-stealing)机制
1 ForkJoin框架 1.1 ForkJoin框架 ForkJoinPool一种ExecutorService的实现,运行ForkJoinTask任务.ForkJoinPool区别于其它Execu ...
- 使用forkjoin框架分页查询所有数据的例子
使用forkjoin框架分页查询所有数据的例子 import io.swagger.annotations.ApiOperation; import lombok.AllArgsConstructor ...
- Java集合框架综述,这篇让你吃透!
点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 作者:平凡希 cnblogs.com/xiaoxi/p/60899 ...
- 【Java集合框架】ArrayList类方法简明解析(举例说明)
本文目录 1.API与Java集合框架 2.ArrayList类方法解析 2.1 add() 2.2 addAll() 2.3 clear() 2.4 clone() 2.5 contains() 2 ...
最新文章
- 实现盒子动画和键盘特效
- php怎么生成前端网页,PHP自动生成前端的表单框架
- C语言 文件操作9--fgetc()和fputc()
- 蓝桥杯 作物杂交 DFS搜索
- jsf 自定义属性_必填字段的自定义JSF验证器
- 《南方都市报》:中国互联网“公共性”正在变形或流失
- 那年我整理的JavaEE面试题
- 《Photoshop修饰与合成专业技法》—第1章快速选择工具和调整边缘
- Android中的内存泄露问题
- z世代消费力白皮书_谁在影响2.6亿年轻人的消费?Z世代消费力白皮书2019|企鹅智库...
- Bluedroid 函数分析:BTA_GATTC_Open
- MySQL数据误删恢复
- NetGear stora 重置成功后,个人文件夹丢失的找回办法
- vpu测试_611bp上的VPU测试
- Markdown还能这么玩?这款开源神器绝了!
- 联想计算机电源维修,台式联想电脑,主机电源灯不亮是怎么回事,应该怎么维修...
- 我现在是CodeGear公司的员工了
- mysql 收缩日志_MySQL5.7中Undo回收收缩相关参数
- 概念POJO、DTO、DAO、PO、BO、VO、QO、ENTITY详解
- win10+TPLINK,用PXE安装redhat系统
热门文章
- [转]SQLServer和Oracle,存储过程区别,常用函数对比
- Linux:echo、read、cat命令
- Hyper-V Server 2008 R2 系统部署向导
- ASP.NET MVC Bundles 用法和说明(打包javascript和css)
- Z 字形变换 C++实现 java实现 leetcode系列(六)
- 【前端_js】JavaScript知识点总结
- 后面一次上传对linux kernel 的分析
- visual studio 2017 添加自定义代码片段 .snippet文件
- python 试题归纳及答疑 更新中.....
- ADB命令行控制界面开关