上一篇文章《java8 stream运行原理之顺序流原理详解》介绍了顺序流的执行原理,本文接着上一篇介绍并行流的执行原理。

一、如何创建并行流

调用parallel()方法可以创建并行流,如下:

    public static void main(String argv[]){Stream<String> stream=Stream.of("1","2","","123");stream.filter(x->x.length()>=1).parallel().forEach(System.out::println);}

二、并行流原理

下面以第一小节的代码为例,介绍一下并行流原理。
这里只介绍最后的终端操作(forEach),对于如何创建Stream流,以及中间操作原理请参见上一篇文章。

1、parallel()

首先来看一下parallel()方法:

    public final S parallel() {//sourceStage是Head对象引用//将Head对象的parallel属性设置为truesourceStage.parallel = true;return (S) this;}

parallel()方法仅仅将Head对象的parallel属性设置为true。

2、forEach()

下面是forEach()方法源码:

    public void forEach(Consumer<? super P_OUT> action) {//makeRef()创建TerminalOp对象evaluate(ForEachOps.makeRef(action, false));}final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {assert getOutputShape() == terminalOp.inputShape();if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);linkedOrConsumed = true;//检查sourceStage.parallel的值,如果为true,表示是并行流return isParallel()? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())): terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));}

在forEach()里面调用了terminalOp.evaluateParallel()进行并行处理。下面是terminalOp.evaluateParallel()的方法:

    public <S> Void evaluateParallel(PipelineHelper<T> helper,Spliterator<S> spliterator) {//ordered表示是否有序遍历,true表示有序//forEach操作默认ordered都是falseif (ordered)new ForEachOrderedTask<>(helper, spliterator, this).invoke();elsenew ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();return null;}

在evaluateParallel()方法里面创建ForEachTask对象。
ForEachTask实现了ForkJoinTask类,而这个ForkJoinTask是Fork/Join框架中的,这是java7新增的功能。Fork/Join框架对并行处理做了很多优化。
上面代码的最后调用了invoker()方法,这个方法会调用到ForEachTask.compute()方法:

public void compute() {//将数据源拆分为两部分,分别交给两个不同的线程处理,//这两部分使用rightSplit和leftSplit记录Spliterator<S> rightSplit = spliterator, leftSplit;long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;if ((sizeThreshold = targetSize) == 0L)targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());boolean forkRight = false;Sink<S> taskSink = sink;ForEachTask<S, T> task = this;while (!isShortCircuit || !taskSink.cancellationRequested()) {//rightSplit.trySplit()可以对数据源的数据进行拆分,将数据一分为二//如果剩余的数据量不足以进行再次拆分,则直接使用当前线程处理if (sizeEstimate <= sizeThreshold ||(leftSplit = rightSplit.trySplit()) == null) {task.helper.copyInto(taskSink, rightSplit);break;}//将数据拆分为两部分后,左半部分的数据再创建ForEachTask对象ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);task.addToPendingCount(1);ForEachTask<S, T> taskToFork;//forkRight相当于一个开关,如果上次启动任务处理左半部分数据,那么这次启动任务处理右半部分数据if (forkRight) {forkRight = false;rightSplit = leftSplit;taskToFork = task;task = leftTask;}else {forkRight = true;taskToFork = leftTask;}//调用fork()可以将任务加入到待处理队列中,后续线程池中的线程会将任务取走处理taskToFork.fork();sizeEstimate = rightSplit.estimateSize();}task.spliterator = null;task.propagateCompletion();}
}

并行流处理的核心逻辑就在compute()方法里面,下面总结一下并行流的执行流程:

  1. 调用Spliterator.trySplit()拆分流,将流一拆为二,如果不能再拆分,那么调用Sink对象链表处理数据;
  2. 拆分为两部分数据后,右半部分的数据使用当前ForEachTask对象处理,左半部分数据新创建一个ForEachTask对象处理,之后分别启动线程处理这两个任务。

如果处理流的操作都是无状态的,那么执行会像上面介绍的一样在终端操作里面创建并行任务,如果中间操作有有状态的,那么在创建Spliterator对象时,会先将该有状态操作以及之前的所有操作并行执行一次,得到的结果作为Spliterator对象,然后将该Spliterator对象传递给终端操作,这一段处理逻辑可以参见方法sourceSpliterator():

    private Spliterator<?> sourceSpliterator(int terminalFlags) {// Get the source spliterator of the pipelineSpliterator<?> spliterator = null;if (sourceStage.sourceSpliterator != null) {spliterator = sourceStage.sourceSpliterator;sourceStage.sourceSpliterator = null;}else if (sourceStage.sourceSupplier != null) {spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();sourceStage.sourceSupplier = null;}else {throw new IllegalStateException(MSG_CONSUMED);}//如果是并行流且有有状态的操作,那么下面的if分支判断为trueif (isParallel() && sourceStage.sourceAnyStateful) {// Adapt the source spliterator, evaluating each stateful op// in the pipeline up to and including this pipeline stage.// The depth and flags of each pipeline stage are adjusted accordingly.int depth = 1;//从Head开始遍历各个操作对象for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;u != e;u = p, p = p.nextStage) {int thisOpFlags = p.sourceOrOpFlags;//如果当前操作是有状态的if (p.opIsStateful()) {depth = 0;//操作对象链表深度记为0if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {// Clear the short circuit flag for next pipeline stage// This stage encapsulates short-circuiting, the next// stage may not have any short-circuit operations, and// if so spliterator.forEachRemaining should be used// for traversalthisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;}//将数据进行并行处理,只执行当前有状态的操作以及它前面的操作//将操作的结果创建一个Spliterator对象,//该Spliterator对象接下来就作为后面流处理的数据源spliterator = p.opEvaluateParallelLazy(u, spliterator);// Inject or clear SIZED on the source pipeline stage// based on the stage's spliteratorthisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED: (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;}//操作对象链表深度加1 ,如果中间操作有有状态的,那么该有状态的操作深度为0,//相当于接下来以该有状态操作对象作为链表起点p.depth = depth++;p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);}}if (terminalFlags != 0)  {// Apply flags from the terminal operation to last pipeline stagecombinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);}return spliterator;}

java8 stream运行原理之并行流原理详解相关推荐

  1. 第41课:Checkpoint彻底解密:Checkpoint的运行原理和源码实现彻底详解

    第41课:Checkpoint彻底解密:Checkpoint的运行原理和源码实现彻底详解 一:Checkpoint到底是什么? 1,  Spark在生产环境下经常会面临Tranformations的R ...

  2. Sniff网络基础原理和软件实现技巧详解

    Sniff网络基础原理和软件实现技巧详解 前言 SNIFF真是一个古老的话题,关于在网络上采用SNIFF来获取敏感信息已经不是什么新鲜事,也不乏很多成功的案例,那么,SNIFF究竟是什么呢? SNIF ...

  3. 【 卷积神经网络CNN 数学原理分析与源码详解 深度学习 Pytorch笔记 B站刘二大人(9/10)】

    卷积神经网络CNN 数学原理分析与源码详解 深度学习 Pytorch笔记 B站刘二大人(9/10) 本章主要进行卷积神经网络的相关数学原理和pytorch的对应模块进行推导分析 代码也是通过demo实 ...

  4. 2. IMU原理及姿态融合算法详解

    文章目录 2. IMU原理及姿态融合算法详解 一.组合 二. 原理 a) 陀螺仪 b) 加速度计 c) 磁力计 三. 旋转的表达 a) 欧拉角 b) 旋转矩阵 c) 四元数 d) 李群 SO(3)\t ...

  5. 强化学习教程(四):从PDG到DDPG的原理及tf代码实现详解

    强化学习教程(四):从PDG到DDPG的原理及tf代码实现详解 原创 lrhao 公众号:ChallengeHub 收录于话题 #强化学习教程 前言 在前面强化学习教程(三)中介绍了基于策略「PG」算 ...

  6. 【转】什么是场效应管(FET)-场效应管(FET)分类、原理、用途等知识详解

    什么是场效应管(FET)-场效应管(FET)分类.原理.用途等知识详解 场效应管和双极晶体管不同,它属于仅以电子或空穴中的一种载子动作的晶体管.按照结构.原理可以分为:1.接合型场效应管 2.MOS型 ...

  7. 计算机自动化装配专机,自动化装配生产线结构原理及其组成形式的详解

    <自动化装配生产线结构原理及其组成形式的详解>由会员分享,可在线阅读,更多相关<自动化装配生产线结构原理及其组成形式的详解(2页珍藏版)>请在人人文库网上搜索. 1.自动化装配 ...

  8. 图像仿射变换原理1:齐次坐标来龙去脉详解

    ☞ ░ 老猿Python博文目录:https://blog.csdn.net/LaoYuanPython ░ 仿射变换博文传送门(带星号的为付费专栏文章): *图像仿射变换原理1:齐次坐标来龙去脉详解 ...

  9. Nginx源码研究之nginx限流模块详解

    这篇文章主要介绍了Nginx源码研究之nginx限流模块详解,小编觉得挺不错的,现在分享给大家,也给大家做个参考.一起跟随小编过来看看吧 高并发系统有三把利器:缓存.降级和限流: 限流的目的是通过对并 ...

最新文章

  1. ASP.NET文件的下载
  2. 基于深度学习的位姿估计方法
  3. 使用Keras计算余弦相似度(Cosine Similarity)
  4. 《深入理解Java虚拟机》读书笔记五
  5. 基于Kubeflow建立的星辰算力训练平台背后的技术架构
  6. mysql 不需要@的变量_mysql参数变量
  7. mysql+5.6+左连接_第5章 索引与算法
  8. window10 物理网卡无法启用
  9. python 对话框开发_python文件选择对话框的操作方法
  10. Ubuntu 下监控进程网络流量
  11. Wicket实战(二)hello world
  12. 打开和关闭Hadoop,Hbase 命令
  13. 聊聊如何对员工做绩效考核
  14. halcon 相似度_怎样用深度学习判断两张图片的相似度?
  15. mysql安装时初始密码错误_踩坑之MySQL安装及修改初始密码
  16. 5G发展困难,贪婪的手机企业不愿降价,运营商无奈再开4G价格战
  17. h5 input type 属性为tel苹果系统可以直接获取数字短信验证码
  18. 小甲鱼python猜题_[Python]小甲鱼Python视频第033课(except)课后题及参考解答
  19. css居中怎么移动,移动端css水平垂直居中
  20. Leetcode-1109:航班预订统计(20210831打卡题)

热门文章

  1. 肖仰华谈知识图谱:知识将比数据更重要,得知识者得天下
  2. inventor如何画心_Illustrator | 如何画一个心型图案
  3. 升级笔记本CPU的常见知识汇总
  4. Python分布式爬虫实战 - 豆瓣读书
  5. 微信小程序获取并修改app.js中的值
  6. 洛谷—— AT_pakencamp_2021_day2_a Participants 2
  7. 写给新人的话——谈谈应届生入职后应该怎样快速成长
  8. 经典力学(动力学)——牛顿定律
  9. 给你的个人网站领养只萌萌的小仓鼠
  10. MPLS/BGP虚拟专用网络路由通告和数据转发