流在处理数据进行一些迭代操作的时候确认很方便,但是在执行一些耗时或是占用资源很高的任务时候,串行化的流无法带来速度/性能上的提升,并不能满足我们的需要。

通常我们会使用多线程来并行或是分片分解执行任务,而在Stream中也提供了这样的并行方法,下面将会一一介绍这些方法。

将顺序流转为并行流

使用parallelStream()方法或者是使用stream().parallel()来转化为并行流。

但是只是可能会返回一个并行的流,流是否能并行执行还受到其他一些条件的约束(如是否有序,是否支持并行)。

对顺序流调用parallel方法并不意味着流本身有任何实际的变化。它在内部实际上就是设了一个boolean标志,表示你想让调用parallel之后进行的所有操作都并行执行。类似地,你只需要对并行流调用sequential方法就可以把它变成顺序流。如果对这个方法调用了多次,将以最后一次执行为准。

package com.morris.java8.parallel;import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;public class ParallerDemo {public static void main(String[] args) {IntStream list = IntStream.range(0, 6);//开始并行执行list.parallel().forEach(i -> {Thread thread = Thread.currentThread();System.err.println("integer:" + i + "," + "currentThread:" + thread.getName());try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}});}
}

运行结果如下:

integer:3,currentThread:main
integer:4,currentThread:ForkJoinPool.commonPool-worker-3
integer:5,currentThread:ForkJoinPool.commonPool-worker-2
integer:1,currentThread:ForkJoinPool.commonPool-worker-1
integer:2,currentThread:ForkJoinPool.commonPool-worker-1
integer:0,currentThread:ForkJoinPool.commonPool-worker-3

从运行结果里面我们可以很清楚的看到parallelStream同时使用了主线程和ForkJoinPool.commonPool创建的线程。 值得说明的是这个运行结果并不是唯一的,实际运行的时候可能会得到多个结果。

看看流的parallel方法,你可能会想,并行流用的线程是从哪儿来的?有多少个?怎么自定义这个过程呢?

并行流内部使用了默认的ForkJoinPool,它默认的线程数量就是你的处理器数量,这个值是由Runtime.getRuntime().availableProcessors()得到的。

但是你可以通过系统属性java.util.concurrent.ForkJoinPool.common.parallelism来改变线程池大小,如下所示:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");

这是一个全局设置,因此它将影响代码中所有的并行流。反过来说,目前还无法专为某个并行流指定这个值。一般而言,让ForkJoinPool的大小等于处理器数量是个不错的默认值,除非你有很好的理由,否则我们强烈建议你不要修改它。

// 设置全局并行流并发线程数
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");
System.out.println(ForkJoinPool.getCommonPoolParallelism());// 输出 12
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
System.out.println(ForkJoinPool.getCommonPoolParallelism());// 输出 12

为什么两次的运行结果是一样的呢?上面刚刚说过了这是一个全局设置,java.util.concurrent.ForkJoinPool.common.parallelism是final类型的,整个JVM中只允许设置一次。既然默认的并发线程数不能反复修改,那怎么进行不同线程数量的并发测试呢?答案是:引入ForkJoinPool。

IntStream range = IntStream.range(1, 100000);
// 传入parallelism
new ForkJoinPool(parallelism).submit(() -> range.parallel().forEach(System.out::println)).get();

因此,使用parallelStream时需要注意的一点是,多个parallelStream之间默认使用的是同一个线程池,所以IO操作尽量不要放进parallelStream中,否则会阻塞其他parallelStream。

// 获取当前机器CPU处理器的数量
System.out.println(Runtime.getRuntime().availableProcessors());// 输出 4
// parallelStream默认的并发线程数
System.out.println(ForkJoinPool.getCommonPoolParallelism());// 输出 3

为什么parallelStream默认的并发线程数要比CPU处理器的数量少1个?因为最优的策略是每个CPU处理器分配一个线程,然而主线程也算一个线程,所以要占一个名额。 这一点可以从源码中看出来:

static final int MAX_CAP      = 0x7fff;        // max #workers - 1
// 无参构造函数
public ForkJoinPool() {this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),defaultForkJoinWorkerThreadFactory, null, false);
}

测试流的性能

下面通过几种方式计算数据的和来测试流的性能。

package com.morris.java8.parallel;import java.util.function.Function;
import java.util.stream.LongStream;
import java.util.stream.Stream;public class ParallerStreamExample {public static void main(String[] args) {long n = 100_000_000;System.out.println("normal:" + recordTime(ParallerStreamExample::normal, n) + " MS");System.out.println("iterator:" + recordTime(ParallerStreamExample::iterator, n) + " MS");// 太耗时,暂时注释// System.out.println("iteratorParallel:" + recordTime(ParallerStreamExample::iteratorParallel, n) + " MS");System.out.println("longStream:" + recordTime(ParallerStreamExample::longStream, n) + " MS");System.out.println("longStreamParallel:" + recordTime(ParallerStreamExample::longStreamParallel, n) + " MS");}public static long recordTime(Function<Long, Long> function, long n) {long lowestCostTime = Long.MAX_VALUE;for (int i = 0; i < 10; i++) {long startTime = System.currentTimeMillis();function.apply(n);long costTime = System.currentTimeMillis() - startTime;if(costTime < lowestCostTime) {lowestCostTime = costTime;}}return lowestCostTime;}/*** 正常for循环* @param n* @return*/public static long normal(long n) {long result = 0;for(long i = 1; i <= n; i++) {result += i;}return result;}/*** iterate顺序流* @param n* @return*/public static long iterator(long n) {return Stream.iterate(1L, t -> t + 1).limit(n).reduce(0L, Long::sum);}/*** iterate并行流* @param n* @return*/public static long iteratorParallel(long n) {return Stream.iterate(1L, t -> t + 1).parallel().limit(n).reduce(0L, Long::sum);}/*** rangeClosed顺序流* @param n* @return*/public static long longStream(long n) {return LongStream.rangeClosed(1, n).sum();}/*** rangeClosed并行流* @param n* @return*/public static long longStreamParallel(long n) {return LongStream.rangeClosed(1, n).parallel().sum();}
}

运行结果如下:

normal:33 MS
iterator:990 MS
longStream:44 MS
longStreamParallel:16 MS

结论:

  • Stream串行性能明显差于for循环迭代,因为Stream串行还有流水线成本在里面。

  • 并行的Stream API能够发挥多核特性,但是有时候不如串行流(比如后面的计算依赖前面的计算结果就不适宜用并行流)

高效使用并行流

下面是一些使用并行流需要思考的方面:

  • 留意装箱。自动装箱和拆箱操作会大大降低性能。Java 8中有原始类型流(IntStream、LongStream、DoubleStream)来避免这种操作,但凡有可能都应该用这些流。

  • 有些操作本身在并行流上的性能就比顺序流差,比如后面的计算依赖前面的计算结果。

  • 还要考虑流的操作流水线的总计算成本。设N是要处理的元素的总数,Q是一个元素通过流水线的大致处理成本,则N*Q就是这个对成本的一个粗略的定性估计。Q值较高就意味着使用并行流时性能好的可能性比较大。

  • 对于较小的数据量,选择并行流几乎从来都不是一个好的决定。并行处理少数几个元素的好处还抵不上并行化造成的额外开销。

  • 要考虑流背后的数据结构是否易于分解。例如,ArrayList的拆分效率比LinkedList高得多,因为前者用不着遍历就可以平均拆分,而后者则必须遍历。

  • 流自身的特点,以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。例如,一个SIZED流可以分成大小相等的两部分,这样每个部分都可以比较高效地并行处理,但筛选操作可能丢弃的元素个数却无法预测,导致流本身的大小未知。

  • 还要考虑终端操作中合并步骤的代价是大是小(例如Collector中的combiner方法)。如果这一步代价很大,那么组合每个子流产生的部分结果所付出的代价就可能会超出通过并行流得到的性能提升。

【java8】并行流Stream相关推荐

  1. java8并行流_Java 8:CompletableFuture与并行流

    java8并行流 这篇文章展示了Java 8的CompletableFuture在执行异步计算时如何与并行流进行比较. 我们将使用以下类对长时间运行的任务进行建模: class MyTask {pri ...

  2. 用Java8并行流写一个WordCount功能,学到 了~

    作者:温安适 my.oschina.net/floor/blog/1619644 节前略闲,看了java8并行流,写个了wordCount.本以为易如反掌,结果却折腾了一下午! 在本文中wordcou ...

  3. java并行流 阻塞主线程_记一次使用Java8并行流导致的服务瓶颈问题排查

    一.业务背景# 二.服务架构# 服务使用线程池对请求进行业务处理,corePoolSize=32,maximumPoolSize=128. 三.问题描述# 服务部署到测试环境,将线上流量通过tcp-c ...

  4. Java8 并行流(parallelStream)原理分析及注意事项

    文章目录 前言 一.parallelStream是什么 二.parallelStream原理分析 1.Fork/Join框架 1.1 work-stealing(工作窃取算法) 1.2 常用方法 2. ...

  5. java8新特性【Lambda、Stream API、Optional、Date Time API 、并行流与串行流】

    文章目录 Lambda 表达式 Lambda 表达式的基础语法 方法引用 Lambda 表达式需要"函数式接口"的支持 Java8 内置的四大核心函数式接口 Stream API ...

  6. java并行流 阻塞主线程_多线程入门案例与java8的并行流

    java8 实例请移步https://www.cnblogs.com/ngLee/p/14021859.html 进程与线程 进程是所有线程的集合,每一个线程是进程中的一条执行路径. 多线程的创建方式 ...

  7. java foreach多线程_详解多线程入门案例与java8的并行流

    进程与线程 进程是所有线程的集合,每一个线程是进程中的一条执行路径. 多线程的创建方式,继承Thread实现Runable /*** 第一种创建线程的方式,继承Thread*/ public clas ...

  8. stream銆俠oxed_java-11-Stream优化并行流

    并行流    多线程    把一个内容分成多个数据块  不同线程分别处理每个数据块的流 串行流   单线程  一个线程处理所有数据 java8 对并行流优化  StreamAPI 通过parallel ...

  9. Stream流和ParallelStream并行流详解及对比

    目录 前言 一.Stream流是什么? 二.获取Stream流的方式 三.Stream流中的常用方法 1. forEach(遍历/终结方法) 2. filter(过滤) 3. map(映射转换) 4. ...

最新文章

  1. 清华大学邀请阿里专家授课 主讲大数据
  2. nginx 配置文件
  3. 常用数据库的 扩展名 格式 后缀 端口
  4. Pycharm 2018 虚拟环境创建及解释器的设置(小白图解教程)
  5. Bound Services
  6. Linux之VI命令详解
  7. amap vueamap 与_在vue中使用高德地图vue-amap
  8. Myeclipse创建第一个web项目
  9. android开发(13) 尝试在流布局中移动控件
  10. gta5显示nat较为严格_为何严格治理下雾霾天仍频发?哈尔滨市环保局解答重污染天3大疑问...
  11. Slimer软工课设日报-2016年7月1日
  12. php jq ajax 4个下拉框联动案列,AJAX_AJAX实现下拉框联动,想当年,为了实现三级联动, - phpStudy...
  13. python基础系列教程——Python中的编码问题,中文乱码问题
  14. go语言的特殊变量 iota
  15. digester java_Java-Digester:提取节点名称
  16. 计算机毕设(附源码)JAVA-SSM交通事故证据交易平台
  17. 艾永亮:百果园的商业模式是什么?打造超级产品引领生鲜电商行业
  18. jQuery closest() 方法
  19. Datastage数据装载报错:Consumed more than 1000000 bytes looking for record delimiter
  20. 微信小程序 canvas 笑脸

热门文章

  1. Metricbeat 指标采集工具应用示例
  2. 1-SII--SharedPreferences完美封装
  3. 微信机器人的构建与使用
  4. 击鼓传花击鼓次数相同c语言,击鼓传花
  5. Vue中components几个组件
  6. vs2010添加OCX控件并调用OCX中的函数
  7. python股票策略_用Python编写简单股票策略
  8. 关于SAP 启用新公司时 选用的会计准则
  9. 关于office2013打开后始终显示正在配置问题的解决方案集锦
  10. 计算机综述性论文范文例文,综述性论文范文