在JDK7之前,并行处理数据集合非常麻烦。首先需要自己明确的把包含数据的数据结构分成若干个子部分,第二需要给每个子部分分配一个独立的线程;第三需要在恰当的时候对它们进行同步来避免不希望出现的竞争条件,等待所有线程完成,最后把这些部分合并起来。

Doug Lea 在JDK7中引入了fork/join框架,让这些操作更稳定,更不易出错。

本节主要内容:
1. 用并行流并行处理数据
2. 并行流的性能分析
3. fork/join框架
4. 使用Spliterator分割流

学完本节期望能达到:
1. 熟练使用并行流,来加速业务性能
2. 了解流内部的工作原理,以防止误用的情况
3. 通过Spliterator控制数据块的划分方式

并行流

可以通过对数据源调用parallelStream方法来将源转换为并行流。并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。这样可以自动将工作负荷转到多核中并行处理。

考虑下面一个实现:给定正整数n,计算 1 + 2 + … n的和。
使用stream的实现:

private static long sequentialSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum);
}

将上面的顺序流转换为并行流,实现如下:

private static long parallelSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);
}

即通过调用方法parallel可将顺序流转换为并行流。

但需要注意的是流仅在终端操作时才开始执行,所以当前流是顺序流还是并行流以最靠近终端操作的流类型为准,示例:

list.stream().parallel().filter(e -> e.age > 20).sequential().map(...).parallel().collect(...);

此种情况并不会按预想的先使用并行流执行过滤,再按顺序流执行映射转换。而是整个流水线操作都按并行流执行。

配置并行流使用的线程池

并行流内部使用了默认的ForkJoinPool, 它默认的线程数量就是处理器的数量(Runtime.getRuntime().availableProcessors())。也可以通过设置系统属性来改变它(System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”, “12”))。但它是一个全局设置,会影响所有的并行流,一般而言线程数等于处理器数量是一个合理的数值,不需要修改。

测试流性能

一般而言,同一个功能给我们的感觉是并行流性能会比顺序流性能更好。然而在软件工程中,优化性能的黄金准则是:测量。我们开发了程序,用来测量4种写法的累加,看看性能如何:

@Slf4j
public class SumSample {/*** 顺序流、并行流性能测试* 实现1~1亿整型数字累加**/public static void main(String[] args) {CostUtil.cost(() -> log.info("==> for: 1 + ... + 100_000_000, result: {}", forSum(100_000_000)));log.info("================================================================================");CostUtil.cost(() -> log.info("==> sequential: 1 + ... + 100_000_000, result: {}", sequentialSum(100_000_000)));log.info("================================================================================");CostUtil.cost(() -> log.info("==> parallel: 1 + ... + 100_000_000, result: {}", parallelSum(100_000_000)));log.info("================================================================================");CostUtil.cost(() -> log.info("==> longParallel: 1 + ... + 100_000_000, result: {}", longParallelSum(100_000_000)));}/*** 内部迭代方式实现累加*/private static long forSum(long n) {long result = 0;for (int i = 1; i <= n; i ++) {result += i;}return result;}/*** 顺序流实现累加*/private static long sequentialSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum);}/*** 并行流实现累加*/private static long parallelSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);}/*** long原生流范围实现累加*/private static long longParallelSum(long n) {return LongStream.rangeClosed(1L, n).parallel().reduce(0L, Long::sum);}
}
// result:
2022-01-18 10:53:59.035 [main] INFO  win.elegentjs.java8.stream.parallel.SumSample-==> for: 1 + ... + 100_000_000, result: 5000000050000000
2022-01-18 10:53:59.039 [main] INFO  win.elegentjs.util.CostUtil-==> cost time: 58
2022-01-18 10:53:59.039 [main] INFO  win.elegentjs.java8.stream.parallel.SumSample-================================================================================
2022-01-18 10:54:00.459 [main] INFO  win.elegentjs.java8.stream.parallel.SumSample-==> sequential: 1 + ... + 100_000_000, result: 5000000050000000
2022-01-18 10:54:00.459 [main] INFO  win.elegentjs.util.CostUtil-==> cost time: 1420
2022-01-18 10:54:00.459 [main] INFO  win.elegentjs.java8.stream.parallel.SumSample-================================================================================
2022-01-18 10:54:04.627 [main] INFO  win.elegentjs.java8.stream.parallel.SumSample-==> parallel: 1 + ... + 100_000_000, result: 5000000050000000
2022-01-18 10:54:04.628 [main] INFO  win.elegentjs.util.CostUtil-==> cost time: 4167
2022-01-18 10:54:04.628 [main] INFO  win.elegentjs.java8.stream.parallel.SumSample-================================================================================
2022-01-18 10:54:04.688 [main] INFO  win.elegentjs.java8.stream.parallel.SumSample-==> longParallel: 1 + ... + 100_000_000, result: 5000000050000000
2022-01-18 10:54:04.688 [main] INFO  win.elegentjs.util.CostUtil-==> cost time: 60

使用四种方法实现1~1亿个数的累加,这是在i7 2.4GHz 6core/12threads CPU的执行结果。让人很意外,并非是并行流性能最好,反而是最差的,最朴实的for循环单线程性能最佳。

原因:

  1. iterate生成的是装箱对象,必须拆箱成数字才能求和。 这一点好理解因为iterate生成的流元素是Long类型,进行累加计算下一个流元素需要先拆箱,计算完再装箱。
  2. iterate很难分成多个独立的块来并行执行。原因是应用这个函数都要依赖前一次应用的结果,即本质上iterate需要顺序执行。虽然标记了流是并行流,但并不意味着一定能并行执行,反而增加了额外开销,影响了性能。

通过上面的比较需要意识到:并行编程比较复杂,有时候甚至违反直觉。如果用的不对(如本例,采用了一个不易并行化的操作iterate),甚至会让性能更差。所以了解parallel方法背后的执行细节非常必要。

LongStream.rangeClosed 代替 iterate

仅高效求和的示例,可用LongStream.rangeClosed高效替代iterate实现并行计算。它的优点是:

  1. LongStream.rangeClosed直接产生原始类型的long数字,没有装箱拆箱的开销
  2. LongStream.rangeClosed会生成数字范围,很容易拆分为独立的小块。

通过示例演示它的并行执行性能比同样是并行流的iterate版本要快了70倍。可见它有效利用了并行。

为什么并行流还是比for慢?

上面的执行结果可以看出LongStream.rangeClosed的性能还是比for略慢一点,原因是:

并行化是有代价的,并行过程中需要对流做递归划分,把流的归纳操作分配到不同的线程,最后合并。且多个核心之间移动数据的代价也很大。

正确使用并行流

使用并行流加速性能需要确保用对,如果计算结果是错误的,再快也没意义。
误用并行流而产生错误的首要原因是使用的算法改变了某些共享状态。 如下面示例:

class Accumulator {public long total = 0;public void add(long value) { total += value; }
}public static long sideEffectSum(long n) {Accumulator accumulator = new Accumulator();LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);return accumulator.total;}//result:
2022-01-18 11:40:16.943 [main] INFO  win.elegentjs.java8.stream.parallel.SumSample-==> sideEffectSum: 1 + ... + 100_000_000, result: 1037016191509285
2022-01-18 11:40:16.944 [main] INFO  win.elegentjs.util.CostUtil-==> cost time: 40

从上面示例看出虽然很快,但结果是错误的。 原因是total += value非原子操作,出现了竞态条件。如果使用同步来修复,就失去了并行的意义。 所以写并行流时一定要考虑多个线程是否会修改共享对象的可变状态。

高效使用并行流

一些高效使用并行流的建议:

  1. 如果有疑问,进行测试。并行流并不总是更快,且有时候跟直觉不一致。 使用适当的基准进行测试来检查其性能。
  2. 留意自动拆装箱。 频繁的自动拆装箱非常损耗性能。此种情况时尽量使用原始数据流来应对: IntStream, LongStream, DoubleStream。
  3. 有些操作天生并行流的性能就比顺序流差,如依赖元素顺序的操作:limit(), findFirst()等。
  4. 需要考虑流操作流水线的总计算成本。 设N为元素的总数,Q是一个元素通过流水线的大致处理成本,则N * Q 是对总成本的粗略估计。 Q值越高意味着使用并行流时的性能更好的可能性更大。 (使用for循环计算1 … N比并行流块原因就是Q太小,虽然N已经够大了)
  5. 对于较小的数据量,选择并行流几乎从来都不是最优的。因为并行本身开销就大,如果元素不多无法覆盖并行本身的开销。
  6. 需要考虑背后的数据结构是否易于分解。如用range工厂方法创建的原始流可以快速分解。 后面可以自定义Spliterator来完全掌控分解过程。
  7. 还要考虑终端操作中合并步骤的代价大小,如果这一步代价很大,那么组合每个子流产生的部分结果所付出的代价会超过通过并行得到的性能提升。

一些常见的数据源的可分解性汇总:

Fork/Join框架

想要正确的使用并行流,了解它背后的实现原理至关重要。 并行流背后就是采用的Fork/Join框架。
// TODO: 待补充

Spliterator

// TODO: 待补充

小结

  1. 内部迭代让你可以并行处理一个流,而无需在代码中显式使用和协调不同的线程。
  2. 虽然并行处理一个流很容易,却不能保证程序在所有情况下都运行得更快。并行软件的
    行为和性能有时是违反直觉的,因此一定要测量,确保你并没有把程序拖得更慢。
  3. 像并行流那样对一个数据集并行执行操作可以提升性能,特别是要处理的元素数量庞大,
    或处理单个元素特别耗时的时候。
  4. 从性能角度来看,使用正确的数据结构,如尽可能利用原始流而不是一般化的流,几乎
    总是比尝试并行化某些操作更为重要。
  5. 分支/合并框架让你得以用递归方式将可以并行的任务拆分成更小的任务,在不同的线程
    上执行,然后将各个子任务的结果合并起来生成整体结果。
  6. Spliterator定义了并行流如何拆分它要遍历的数据。

7. Java8新特性-并行数据处理(parallel)相关推荐

  1. java8新特性--并行流与串行流

    并行流与串行流 1.概述 2.实例 1.概述 并行流就是把一个内容分成多个数据块,并用不同的线程分 别处理每个数据块的流. Java 8 中将并行进行了优化,我们可以很容易的对数据进行并 行操作.St ...

  2. Java8 新特性之流式数据处理(转)

    转自:https://www.cnblogs.com/shenlanzhizun/p/6027042.html 一. 流式处理简介 在我接触到java8流式处理的时候,我的第一感觉是流式处理让集合操作 ...

  3. 【Java8新特性 串行/并行流-Optional容器类-时间格式化线程安全等】

    Java8新特性二 一.并行流与顺序流 1.概念 2.Fork/Join框架 3. Fork/Join框架代码示例: 二.Optional类 1. 什么是Optional对象 2. Optional类 ...

  4. Java8新特性总结 -5.Stream API函数式操作流元素集合

    所有示例代码打包下载 : 点击打开链接 Java8新特性 : 接口新增默认方法和静态方法 Optional类 Lambda表达式 方法引用 Stream API - 函数式操作流元素集合 Date/T ...

  5. java8新特性:三,Stream

    java8新特性:三,Stream 1 Stream介绍 Java 8 API添加了一个新的抽象称为流Stream,可以让你以一种声明的方式处理数据. Stream 使用一种类似用 SQL 语句从数据 ...

  6. 【Java8新特性】关于Java8的Stream API,看这一篇就够了!!

    写在前面 Java8中有两大最为重要的改变.第一个是 Lambda 表达式:另外一个则是 Stream API(java.util.stream.*)  ,那什么是Stream API呢?Java8中 ...

  7. java8新特性简述

    Java8发布时间是2014年3月19日,距离今日已经很久了,那么Java8新特性你了解吗? java8是Java的一次重大升级,巨大的里程碑式的改进!! Java语言新特性: 1.与传统结合 -- ...

  8. 【java8新特性】——Stream API详解(二)

    一.简介 java8新添加了一个特性:流Stream.Stream让开发者能够以一种声明的方式处理数据源(集合.数组等),它专注于对数据源进行各种高效的聚合操作(aggregate operation ...

  9. Java8新特性总结 -7.新API和工具

    所有示例代码打包下载 : 点击打开链接 Java8新特性 : 接口新增默认方法和静态方法 Optional类 Lambda表达式 方法引用 Stream API - 函数式操作流元素集合 Date/T ...

  10. java stream byte_乐字节-Java8新特性之Stream流(上)

    上一篇文章,小乐给大家介绍了<Java8新特性之方法引用>,下面接下来小乐将会给大家介绍Java8新特性之Stream,称之为流,本篇文章为上半部分. 1.什么是流? Java Se中对于 ...

最新文章

  1. node-mongoDB
  2. 老鼠的求爱之旅 (DP)
  3. 非线性降维-核主成分分析KPCA
  4. C#中全角转半角以及半角转全角
  5. 聚焦数智技术助力乡村振兴 京东云为乡村振兴注入“数智”力量
  6. 在主方法中定义一个大小为10*10的二维字符型数组,数组名为y,正反对角线上存的是‘*’,其余 位置存的是‘#’;输出这个数组中的所有元素。...
  7. Android注解编程的第一步---模仿ButterKnife的ViewBinder机制
  8. 刷机常识,双清,BL,REC,TWRP
  9. 什么是传感器?不同类型的传感器及其应用
  10. 原生ajax如何跨域,原生ajax 如何解决cors跨域问题
  11. python求x的y次方logn_次方计算器
  12. LSB图像数字水印嵌入算法(含python代码)
  13. 商城项目商品列表页的渲染实现(含动图)
  14. 你了解国际版的阿里云吗?
  15. python3 时区 时间戳 指定输入时间为东八区时间、北京时间
  16. 10个Excel实用操作技巧分享,使用率超高,让你一学就会
  17. 计算机专业论文1000字英语作文,计算机专业毕业论文一千字以上
  18. 更改tomcat访问端口()
  19. 高速C/C++编译工具(ccache)
  20. 百度2015校园招聘面试题回忆录(成功拿到offer)

热门文章

  1. 浅谈代理服务器的作用
  2. SU2 CFD代码阅读
  3. 如何正确安装驱动程序
  4. Taylor Swift - Enchanted_20131123141153-pdf
  5. python怎么读音发音英语-django的英文读法是什么
  6. 2008服务器系统开启ftp,2008服务器开启ftp服务
  7. aka名字_中国新说唱:马来王子尤长靖,尤长靖自编AKA名字
  8. hadoop培训笔记
  9. 使用腾讯云托管部署前端项目
  10. 深度装N卡LINUX驱动 性能怎么样,讲解Deepin 20开源Nouveau和闭源NVIDIA驱动,附装闭源N卡驱动的方法...