文章目录

  • 一、并行流
    • 1、将顺序流转换成并行流
    • 2、测量流的性能
  • 二、分之/合并框架ForkJoinPool
    • 1、使用RecursiveTask
  • 三、Spliterator

本章节可以让你用Stream接口不费力气就能对数据集执行并行操作,可以声明性的讲顺序流变成并行流。

一、并行流

Stream接口可以调用方法parallelStream很容易把集合转换为并行流。所谓并行流就是把内容分成多个数据块,用不同线程处理每块数据。

1、将顺序流转换成并行流

可以将流转换成并行流,调用方法parallel。例:

public static long parallelSum(long n) {return Stream.iterate(1L,i -> i+1).limit(n).parallel().reduce(0L,Long::sum);}

如果从并行流变成顺序流可以调用sequential这个方法完成。

stream.parallel()
.filter(...)
.sequential()
.map(...)
.parallel()
.reduce();

2、测量流的性能

按常理并行求和方法应该比迭代方法性能好。然而在软件工程上,靠猜是绝对不行的,因此我们要进行实战看结果。

public class ParallelStreams {public static long measureSumPerf(Function<Long, Long> adder, long n) {long fastest = Long.MAX_VALUE;for (int i = 0; i < 10; i++) {long start = System.nanoTime();long sum = adder.apply(n);long duration = (System.nanoTime() - start) / 1_000_000;System.out.println("Result: " + sum);if (duration < fastest) fastest = duration;}return fastest;}public static long iterativeSum(long n) {long result = 0;for (long i = 1L; i <= n; i++) {result += i;}return result;}public static long parallelSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);}public static long sequentialSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum);}public static void main(String[] args) {System.out.println("Sequential sum done in:" +measureSumPerf(ParallelStreams::sequentialSum, 10_000_000) + " msecs");System.out.println("Iterative sum done in:" +measureSumPerf(ParallelStreams::iterativeSum, 10_000_000) + " msecs");System.out.println("Parallel sum done in: " +measureSumPerf(ParallelStreams::parallelSum, 10_000_000) + " msecs" );}
}

运行结果:

运行结果相当令人失望,求和方法并行是顺序版本的将近10倍,为什么会出现这样的结果,实际上有两个问题:
1、iterate生成是装箱的对象,必须拆箱才能求和。
2、很难把iterate分成多个独立块来并行执行。iterate很难分割成能独立执行的小块,因为每次应用这个函数都要 依赖前一次应用执行结果。

使用有针对性的方法,避免装箱拆箱操作和能分成独立块并行。可以使用,LongStream.rangeClosed与iterate相比有两个优点。
1、产生原始long数字,没有装箱拆箱操作。
2、会生成数字范围,很容易拆成小块。
例:

 public static long rangedSum(long n) {return LongStream.rangeClosed(1, n).reduce(0L, Long::sum);}public static long parallelRangedSum(long n) {return LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum);}System.out.println("Parallel sum done in: " +measureSumPerf(ParallelStreams::rangedSum, 10_000_000) + " msecs" );System.out.println("Parallel sum done in: " +measureSumPerf(ParallelStreams::parallelRangedSum, 10_000_000) + " msecs" );

执行结果:

得到结果终于比顺序执行快,我们使用并行流的时候一定要正确使用,比如算法改变了某些共享状态。

二、分之/合并框架ForkJoinPool

分之/合并框架的目的是有递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。ForkJoinPool是ExecutorService的一个实现,它把子任务分配给线程池(称为ForkJoinPool)中的工作线程。

1、使用RecursiveTask

要把任务提交到线程池中,必须创建RecursiveTask的子类,重写compute方法。R是并行化产生的结果类型。如果任务不返回结果用RecursiveAction。
protected abstract R compute();这个方法同时定义了将任务拆分成子任务的逻辑,和任务无法在拆分时,生成单个任务逻辑。

if (任务足够小或不可分) {
顺序计算该任务
} else {
将任务分成两个子任务
递归调用本方法,拆分每个子任务,等待所有子任务完成
合并每个子任务的结果
}

用分之/合并框架并行求和

public class ForkJoinSumCalculator extends RecursiveTask<Long> {private final long[] numbers;private final int start;private final int end;public static final long THRESHOLD = 10_000;public ForkJoinSumCalculator(long[] numbers) {this(numbers,0,numbers.length);}private ForkJoinSumCalculator(long[] numbers, int start, int end) {this.numbers = numbers;this.start = start;this.end = end;}@Overrideprotected Long compute() {//该任务负责求和的部分大小int length = end - start;//如果大小小于或等于阈值,顺序执行结果if (length <= THRESHOLD) {return computeSequentially();}//创建一个子任务为数组的前一半求和ForkJoinSumCalculator leftTask =new ForkJoinSumCalculator(numbers, start, start + length/2);//利用另一个ForkJoinPool线程异步执行创建子任务leftTask.fork();//创建一个数组另一半求和ForkJoinSumCalculator rightTask =new ForkJoinSumCalculator(numbers, start + length/2, end);Long rightResult = rightTask.compute();//读取第一个子任务结果,如果尚未完成就等待Long leftResult = leftTask.join();return leftResult + rightResult;}private long computeSequentially() {long sum = 0;for (int i = start; i < end; i++) {{sum += numbers[i];}}return sum;}public static long forkJoinSum(long n) {long[] numbers = LongStream.rangeClosed(1, n).toArray();ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);return new ForkJoinPool().invoke(task);}public static void main(String[] args) {System.out.println(forkJoinSum(10000000));}
}

使用分之/合并框架需注意几点:
1、对于一个任务调用join方法会阻塞调用方,直到该任务结束。因此,在两个子任务的计算开始之后 再调用它。
2、不在RecursiveTask子类内部使用invoke方法

三、Spliterator

Spliterator是java8中加入的另一个新接口,字面意思是可分迭代器,主要作用于并行执行。

public interface Spliterator<T> {、//按顺序一个一个使用Spliterator元素,如果有其它元素遍历返回trueboolean tryAdvance(Consumer<? super T> action);//可以把元素划分出去分给第二个SpliteratorSpliterator<T> trySplit();//还剩多少元素要遍历long estimateSize();//特性int characteristics();
}

并行数据处理与性能详解与ForkJoin框架相关推荐

  1. 《Java8实战》笔记(07):并行数据处理与性能

    并行数据处理与性能 在Java 7之前,并行处理数据集合非常麻烦. 第一,你得明确地把包含数据的数据结构分成若干子部分. 第二,你要给每个子部分分配一个独立的线程. 第三,你需要在恰当的时候对它们进行 ...

  2. 最新阿里云ECS服务器S6/C6/G6/N4/R6/sn2ne/sn1ne/se1ne处理器CPU性能详解

    阿里云ECS服务器S6/C6/G6/N4/R6/sn2ne/sn1ne/se1ne处理器CPU性能怎么样?阿里云服务器优惠活动机型有云服务器S6.计算型C6.通用型G6.内存型R6.云服务器N4.云服 ...

  3. 《Hadoop海量数据处理:技术详解与项目实战(第2版)》一2.8 小结

    本节书摘来异步社区<Hadoop海量数据处理:技术详解与项目实战(第2版)>一书中的第2章,第2.8节,作者: 范东来 责编: 杨海玲,更多章节内容可以访问云栖社区"异步社区&q ...

  4. 《Hadoop海量数据处理:技术详解与项目实战(第2版)》一第2章 环境准备

    本节书摘来异步社区<Hadoop海量数据处理:技术详解与项目实战(第2版)>一书中的第2章,第2.1节,作者: 范东来 责编: 杨海玲,更多章节内容可以访问云栖社区"异步社区&q ...

  5. 阿里云轻量级GPU计算型vgn6i云服务器配置性能详解

    查看全文 http://www.taodudu.cc/news/show-2923995.html 相关文章: 抽象数据类型 C++实现 计算复数 [(8+6i)*(4+3i)]/[(8+6i)+(4 ...

  6. mysql 嵌套查询性能_MySQL数据库之嵌套查询与连接查询的性能详解

    本文主要向大家介绍了MySQL数据库之嵌套查询与连接查询的性能详解 ,通过具体的内容向大家展现,希望对大家学习MySQL数据库有所帮助. 嵌套查询与连接查询的性能:连接查询一般较快:子查询很难被优化. ...

  7. 国产GPU的发展历程及芯片性能详解

    一.国产GPU的发展历程 二.国产GPU进口代替的紧迫性 三.景嘉微:具有完全自主知识产权,打破国外GPU长期垄断 四.景嘉微国产GPU芯片概述 五.景嘉微国产GPU芯片性能详解 六.景嘉微国产GPU ...

  8. 32 --> 详解 OpenWRT系统框架基础软件模块之netifd

    一.简介 OpenWrt路由操作系统的框架基础软件有很多,大部分是通用的软件模块,如 dhcp .dnsmasq.iproute.cmwp.vpn.ipsec等等:OpenWrt还集成部分具有专属特征 ...

  9. 干货|详解最新语音识别框架 深度全序列卷积神经网络

    原标题:干货|详解最新语音识别框架 深度全序列卷积神经网络 导读:目前最好的语音识别系统采用双向长短时记忆网络(LSTM,LongShort Term Memory),但是,这一系统存在训练复杂度高. ...

最新文章

  1. 金融风控实战——风控数据挖掘方法
  2. sql 192标准 连接查询
  3. 【NLP】有三AI NLP知识星球来了,仅此一家别无分店
  4. PHP即将退出,PHP4即将退出历史舞台
  5. 2019聊大考研计算机调剂,2019年聊城大学硕士研究生预调剂工作说明
  6. html标签名都是小写,到底啥是w3c标准(示例代码)
  7. UI实用素材|下拉菜单细节设计,分层呈现
  8. java多维数组的反射类型_Java多维数组和Arrays类方法总结详解
  9. ImportError: cannot import name main
  10. [Linux] 解决Ubuntu12.10 64位google chrome安装Flash后出现couldn‘t load plug-in的问题;
  11. 数据可视化系统在哪些行业中应用
  12. 程序员的寂寞,你们不懂
  13. 门诊管理系统开发能提高医生的诊疗水平和质量吗
  14. java计算机毕业设计线上医药用品分销系统设计与实现MyBatis+系统+LW文档+源码+调试部署
  15. 079冒险岛mysql解封账号_Win7系统玩冒险岛079单机版输入账号密码后出现error38如何解决?...
  16. 主流各云平台主机性能对比
  17. 什么是网络智能运维?如何保障业务7x24小时在线?
  18. 我想推出这么一种应用(现代诗歌)
  19. 作业一 统计软件简介与数据操作
  20. ABB机器人编程技巧:双工位预约程序

热门文章

  1. 程序员的能力拓展模型
  2. android studio clone 方法不能先用,Android Studio中使用git功能无法clone原因分析
  3. 几种常见的用于拟合的分布
  4. 【Python】Python+Matplotlib+LaTeX玩转数学公式
  5. 【机器学习】为什么在信用风险建模中首选树模型?
  6. 【学术相关】中国计算机学会推荐中文科技期刊目录
  7. 【Python基础】玩一玩python第三方进度条库tqdm
  8. ​【Python入门】Python数学math模块55个函数详解
  9. 霸榜COCO和Cityscapes!南理工CMU提出极化自注意力,更精细的双重注意力建模结构
  10. 娱乐社交,玩票大的!2021网易云信“融合通信开发者大赛”决赛名单公布!