并行数据处理与性能详解与ForkJoin框架
文章目录
- 一、并行流
- 1、将顺序流转换成并行流
- 2、测量流的性能
- 二、分之/合并框架ForkJoinPool
- 1、使用RecursiveTask
- 三、Spliterator
本章节可以让你用Stream接口不费力气就能对数据集执行并行操作,可以声明性的讲顺序流变成并行流。
一、并行流
Stream接口可以调用方法parallelStream很容易把集合转换为并行流。所谓并行流就是把内容分成多个数据块,用不同线程处理每块数据。
1、将顺序流转换成并行流
可以将流转换成并行流,调用方法parallel。例:
public static long parallelSum(long n) {return Stream.iterate(1L,i -> i+1).limit(n).parallel().reduce(0L,Long::sum);}
如果从并行流变成顺序流可以调用sequential这个方法完成。
stream.parallel()
.filter(...)
.sequential()
.map(...)
.parallel()
.reduce();
2、测量流的性能
按常理并行求和方法应该比迭代方法性能好。然而在软件工程上,靠猜是绝对不行的,因此我们要进行实战看结果。
public class ParallelStreams {public static long measureSumPerf(Function<Long, Long> adder, long n) {long fastest = Long.MAX_VALUE;for (int i = 0; i < 10; i++) {long start = System.nanoTime();long sum = adder.apply(n);long duration = (System.nanoTime() - start) / 1_000_000;System.out.println("Result: " + sum);if (duration < fastest) fastest = duration;}return fastest;}public static long iterativeSum(long n) {long result = 0;for (long i = 1L; i <= n; i++) {result += i;}return result;}public static long parallelSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);}public static long sequentialSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum);}public static void main(String[] args) {System.out.println("Sequential sum done in:" +measureSumPerf(ParallelStreams::sequentialSum, 10_000_000) + " msecs");System.out.println("Iterative sum done in:" +measureSumPerf(ParallelStreams::iterativeSum, 10_000_000) + " msecs");System.out.println("Parallel sum done in: " +measureSumPerf(ParallelStreams::parallelSum, 10_000_000) + " msecs" );}
}
运行结果:
运行结果相当令人失望,求和方法并行是顺序版本的将近10倍,为什么会出现这样的结果,实际上有两个问题:
1、iterate生成是装箱的对象,必须拆箱才能求和。
2、很难把iterate分成多个独立块来并行执行。iterate很难分割成能独立执行的小块,因为每次应用这个函数都要 依赖前一次应用执行结果。
使用有针对性的方法,避免装箱拆箱操作和能分成独立块并行。可以使用,LongStream.rangeClosed与iterate相比有两个优点。
1、产生原始long数字,没有装箱拆箱操作。
2、会生成数字范围,很容易拆成小块。
例:
public static long rangedSum(long n) {return LongStream.rangeClosed(1, n).reduce(0L, Long::sum);}public static long parallelRangedSum(long n) {return LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum);}System.out.println("Parallel sum done in: " +measureSumPerf(ParallelStreams::rangedSum, 10_000_000) + " msecs" );System.out.println("Parallel sum done in: " +measureSumPerf(ParallelStreams::parallelRangedSum, 10_000_000) + " msecs" );
执行结果:
得到结果终于比顺序执行快,我们使用并行流的时候一定要正确使用,比如算法改变了某些共享状态。
二、分之/合并框架ForkJoinPool
分之/合并框架的目的是有递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。ForkJoinPool是ExecutorService的一个实现,它把子任务分配给线程池(称为ForkJoinPool)中的工作线程。
1、使用RecursiveTask
要把任务提交到线程池中,必须创建RecursiveTask的子类,重写compute方法。R是并行化产生的结果类型。如果任务不返回结果用RecursiveAction。
protected abstract R compute();这个方法同时定义了将任务拆分成子任务的逻辑,和任务无法在拆分时,生成单个任务逻辑。
if (任务足够小或不可分) {
顺序计算该任务
} else {
将任务分成两个子任务
递归调用本方法,拆分每个子任务,等待所有子任务完成
合并每个子任务的结果
}
用分之/合并框架并行求和
public class ForkJoinSumCalculator extends RecursiveTask<Long> {private final long[] numbers;private final int start;private final int end;public static final long THRESHOLD = 10_000;public ForkJoinSumCalculator(long[] numbers) {this(numbers,0,numbers.length);}private ForkJoinSumCalculator(long[] numbers, int start, int end) {this.numbers = numbers;this.start = start;this.end = end;}@Overrideprotected Long compute() {//该任务负责求和的部分大小int length = end - start;//如果大小小于或等于阈值,顺序执行结果if (length <= THRESHOLD) {return computeSequentially();}//创建一个子任务为数组的前一半求和ForkJoinSumCalculator leftTask =new ForkJoinSumCalculator(numbers, start, start + length/2);//利用另一个ForkJoinPool线程异步执行创建子任务leftTask.fork();//创建一个数组另一半求和ForkJoinSumCalculator rightTask =new ForkJoinSumCalculator(numbers, start + length/2, end);Long rightResult = rightTask.compute();//读取第一个子任务结果,如果尚未完成就等待Long leftResult = leftTask.join();return leftResult + rightResult;}private long computeSequentially() {long sum = 0;for (int i = start; i < end; i++) {{sum += numbers[i];}}return sum;}public static long forkJoinSum(long n) {long[] numbers = LongStream.rangeClosed(1, n).toArray();ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);return new ForkJoinPool().invoke(task);}public static void main(String[] args) {System.out.println(forkJoinSum(10000000));}
}
使用分之/合并框架需注意几点:
1、对于一个任务调用join方法会阻塞调用方,直到该任务结束。因此,在两个子任务的计算开始之后 再调用它。
2、不在RecursiveTask子类内部使用invoke方法
三、Spliterator
Spliterator是java8中加入的另一个新接口,字面意思是可分迭代器,主要作用于并行执行。
public interface Spliterator<T> {、//按顺序一个一个使用Spliterator元素,如果有其它元素遍历返回trueboolean tryAdvance(Consumer<? super T> action);//可以把元素划分出去分给第二个SpliteratorSpliterator<T> trySplit();//还剩多少元素要遍历long estimateSize();//特性int characteristics();
}
并行数据处理与性能详解与ForkJoin框架相关推荐
- 《Java8实战》笔记(07):并行数据处理与性能
并行数据处理与性能 在Java 7之前,并行处理数据集合非常麻烦. 第一,你得明确地把包含数据的数据结构分成若干子部分. 第二,你要给每个子部分分配一个独立的线程. 第三,你需要在恰当的时候对它们进行 ...
- 最新阿里云ECS服务器S6/C6/G6/N4/R6/sn2ne/sn1ne/se1ne处理器CPU性能详解
阿里云ECS服务器S6/C6/G6/N4/R6/sn2ne/sn1ne/se1ne处理器CPU性能怎么样?阿里云服务器优惠活动机型有云服务器S6.计算型C6.通用型G6.内存型R6.云服务器N4.云服 ...
- 《Hadoop海量数据处理:技术详解与项目实战(第2版)》一2.8 小结
本节书摘来异步社区<Hadoop海量数据处理:技术详解与项目实战(第2版)>一书中的第2章,第2.8节,作者: 范东来 责编: 杨海玲,更多章节内容可以访问云栖社区"异步社区&q ...
- 《Hadoop海量数据处理:技术详解与项目实战(第2版)》一第2章 环境准备
本节书摘来异步社区<Hadoop海量数据处理:技术详解与项目实战(第2版)>一书中的第2章,第2.1节,作者: 范东来 责编: 杨海玲,更多章节内容可以访问云栖社区"异步社区&q ...
- 阿里云轻量级GPU计算型vgn6i云服务器配置性能详解
查看全文 http://www.taodudu.cc/news/show-2923995.html 相关文章: 抽象数据类型 C++实现 计算复数 [(8+6i)*(4+3i)]/[(8+6i)+(4 ...
- mysql 嵌套查询性能_MySQL数据库之嵌套查询与连接查询的性能详解
本文主要向大家介绍了MySQL数据库之嵌套查询与连接查询的性能详解 ,通过具体的内容向大家展现,希望对大家学习MySQL数据库有所帮助. 嵌套查询与连接查询的性能:连接查询一般较快:子查询很难被优化. ...
- 国产GPU的发展历程及芯片性能详解
一.国产GPU的发展历程 二.国产GPU进口代替的紧迫性 三.景嘉微:具有完全自主知识产权,打破国外GPU长期垄断 四.景嘉微国产GPU芯片概述 五.景嘉微国产GPU芯片性能详解 六.景嘉微国产GPU ...
- 32 --> 详解 OpenWRT系统框架基础软件模块之netifd
一.简介 OpenWrt路由操作系统的框架基础软件有很多,大部分是通用的软件模块,如 dhcp .dnsmasq.iproute.cmwp.vpn.ipsec等等:OpenWrt还集成部分具有专属特征 ...
- 干货|详解最新语音识别框架 深度全序列卷积神经网络
原标题:干货|详解最新语音识别框架 深度全序列卷积神经网络 导读:目前最好的语音识别系统采用双向长短时记忆网络(LSTM,LongShort Term Memory),但是,这一系统存在训练复杂度高. ...
最新文章
- 金融风控实战——风控数据挖掘方法
- sql 192标准 连接查询
- 【NLP】有三AI NLP知识星球来了,仅此一家别无分店
- PHP即将退出,PHP4即将退出历史舞台
- 2019聊大考研计算机调剂,2019年聊城大学硕士研究生预调剂工作说明
- html标签名都是小写,到底啥是w3c标准(示例代码)
- UI实用素材|下拉菜单细节设计,分层呈现
- java多维数组的反射类型_Java多维数组和Arrays类方法总结详解
- ImportError: cannot import name main
- [Linux] 解决Ubuntu12.10 64位google chrome安装Flash后出现couldn‘t load plug-in的问题;
- 数据可视化系统在哪些行业中应用
- 程序员的寂寞,你们不懂
- 门诊管理系统开发能提高医生的诊疗水平和质量吗
- java计算机毕业设计线上医药用品分销系统设计与实现MyBatis+系统+LW文档+源码+调试部署
- 079冒险岛mysql解封账号_Win7系统玩冒险岛079单机版输入账号密码后出现error38如何解决?...
- 主流各云平台主机性能对比
- 什么是网络智能运维?如何保障业务7x24小时在线?
- 我想推出这么一种应用(现代诗歌)
- 作业一 统计软件简介与数据操作
- ABB机器人编程技巧:双工位预约程序
热门文章
- 程序员的能力拓展模型
- android studio clone 方法不能先用,Android Studio中使用git功能无法clone原因分析
- 几种常见的用于拟合的分布
- 【Python】Python+Matplotlib+LaTeX玩转数学公式
- 【机器学习】为什么在信用风险建模中首选树模型?
- 【学术相关】中国计算机学会推荐中文科技期刊目录
- 【Python基础】玩一玩python第三方进度条库tqdm
- ​【Python入门】Python数学math模块55个函数详解
- 霸榜COCO和Cityscapes!南理工CMU提出极化自注意力,更精细的双重注意力建模结构
- 娱乐社交,玩票大的!2021网易云信“融合通信开发者大赛”决赛名单公布!