java8 stream运行原理之并行流原理详解
上一篇文章《java8 stream运行原理之顺序流原理详解》介绍了顺序流的执行原理,本文接着上一篇介绍并行流的执行原理。
一、如何创建并行流
调用parallel()方法可以创建并行流,如下:
public static void main(String argv[]){Stream<String> stream=Stream.of("1","2","","123");stream.filter(x->x.length()>=1).parallel().forEach(System.out::println);}
二、并行流原理
下面以第一小节的代码为例,介绍一下并行流原理。
这里只介绍最后的终端操作(forEach),对于如何创建Stream流,以及中间操作原理请参见上一篇文章。
1、parallel()
首先来看一下parallel()方法:
public final S parallel() {//sourceStage是Head对象引用//将Head对象的parallel属性设置为truesourceStage.parallel = true;return (S) this;}
parallel()方法仅仅将Head对象的parallel属性设置为true。
2、forEach()
下面是forEach()方法源码:
public void forEach(Consumer<? super P_OUT> action) {//makeRef()创建TerminalOp对象evaluate(ForEachOps.makeRef(action, false));}final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {assert getOutputShape() == terminalOp.inputShape();if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);linkedOrConsumed = true;//检查sourceStage.parallel的值,如果为true,表示是并行流return isParallel()? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())): terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));}
在forEach()里面调用了terminalOp.evaluateParallel()进行并行处理。下面是terminalOp.evaluateParallel()的方法:
public <S> Void evaluateParallel(PipelineHelper<T> helper,Spliterator<S> spliterator) {//ordered表示是否有序遍历,true表示有序//forEach操作默认ordered都是falseif (ordered)new ForEachOrderedTask<>(helper, spliterator, this).invoke();elsenew ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();return null;}
在evaluateParallel()方法里面创建ForEachTask对象。
ForEachTask实现了ForkJoinTask类,而这个ForkJoinTask是Fork/Join框架中的,这是java7新增的功能。Fork/Join框架对并行处理做了很多优化。
上面代码的最后调用了invoker()方法,这个方法会调用到ForEachTask.compute()方法:
public void compute() {//将数据源拆分为两部分,分别交给两个不同的线程处理,//这两部分使用rightSplit和leftSplit记录Spliterator<S> rightSplit = spliterator, leftSplit;long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;if ((sizeThreshold = targetSize) == 0L)targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());boolean forkRight = false;Sink<S> taskSink = sink;ForEachTask<S, T> task = this;while (!isShortCircuit || !taskSink.cancellationRequested()) {//rightSplit.trySplit()可以对数据源的数据进行拆分,将数据一分为二//如果剩余的数据量不足以进行再次拆分,则直接使用当前线程处理if (sizeEstimate <= sizeThreshold ||(leftSplit = rightSplit.trySplit()) == null) {task.helper.copyInto(taskSink, rightSplit);break;}//将数据拆分为两部分后,左半部分的数据再创建ForEachTask对象ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);task.addToPendingCount(1);ForEachTask<S, T> taskToFork;//forkRight相当于一个开关,如果上次启动任务处理左半部分数据,那么这次启动任务处理右半部分数据if (forkRight) {forkRight = false;rightSplit = leftSplit;taskToFork = task;task = leftTask;}else {forkRight = true;taskToFork = leftTask;}//调用fork()可以将任务加入到待处理队列中,后续线程池中的线程会将任务取走处理taskToFork.fork();sizeEstimate = rightSplit.estimateSize();}task.spliterator = null;task.propagateCompletion();}
}
并行流处理的核心逻辑就在compute()方法里面,下面总结一下并行流的执行流程:
- 调用Spliterator.trySplit()拆分流,将流一拆为二,如果不能再拆分,那么调用Sink对象链表处理数据;
- 拆分为两部分数据后,右半部分的数据使用当前ForEachTask对象处理,左半部分数据新创建一个ForEachTask对象处理,之后分别启动线程处理这两个任务。
如果处理流的操作都是无状态的,那么执行会像上面介绍的一样在终端操作里面创建并行任务,如果中间操作有有状态的,那么在创建Spliterator对象时,会先将该有状态操作以及之前的所有操作并行执行一次,得到的结果作为Spliterator对象,然后将该Spliterator对象传递给终端操作,这一段处理逻辑可以参见方法sourceSpliterator():
private Spliterator<?> sourceSpliterator(int terminalFlags) {// Get the source spliterator of the pipelineSpliterator<?> spliterator = null;if (sourceStage.sourceSpliterator != null) {spliterator = sourceStage.sourceSpliterator;sourceStage.sourceSpliterator = null;}else if (sourceStage.sourceSupplier != null) {spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();sourceStage.sourceSupplier = null;}else {throw new IllegalStateException(MSG_CONSUMED);}//如果是并行流且有有状态的操作,那么下面的if分支判断为trueif (isParallel() && sourceStage.sourceAnyStateful) {// Adapt the source spliterator, evaluating each stateful op// in the pipeline up to and including this pipeline stage.// The depth and flags of each pipeline stage are adjusted accordingly.int depth = 1;//从Head开始遍历各个操作对象for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;u != e;u = p, p = p.nextStage) {int thisOpFlags = p.sourceOrOpFlags;//如果当前操作是有状态的if (p.opIsStateful()) {depth = 0;//操作对象链表深度记为0if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {// Clear the short circuit flag for next pipeline stage// This stage encapsulates short-circuiting, the next// stage may not have any short-circuit operations, and// if so spliterator.forEachRemaining should be used// for traversalthisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;}//将数据进行并行处理,只执行当前有状态的操作以及它前面的操作//将操作的结果创建一个Spliterator对象,//该Spliterator对象接下来就作为后面流处理的数据源spliterator = p.opEvaluateParallelLazy(u, spliterator);// Inject or clear SIZED on the source pipeline stage// based on the stage's spliteratorthisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED: (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;}//操作对象链表深度加1 ,如果中间操作有有状态的,那么该有状态的操作深度为0,//相当于接下来以该有状态操作对象作为链表起点p.depth = depth++;p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);}}if (terminalFlags != 0) {// Apply flags from the terminal operation to last pipeline stagecombinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);}return spliterator;}
java8 stream运行原理之并行流原理详解相关推荐
- 第41课:Checkpoint彻底解密:Checkpoint的运行原理和源码实现彻底详解
第41课:Checkpoint彻底解密:Checkpoint的运行原理和源码实现彻底详解 一:Checkpoint到底是什么? 1, Spark在生产环境下经常会面临Tranformations的R ...
- Sniff网络基础原理和软件实现技巧详解
Sniff网络基础原理和软件实现技巧详解 前言 SNIFF真是一个古老的话题,关于在网络上采用SNIFF来获取敏感信息已经不是什么新鲜事,也不乏很多成功的案例,那么,SNIFF究竟是什么呢? SNIF ...
- 【 卷积神经网络CNN 数学原理分析与源码详解 深度学习 Pytorch笔记 B站刘二大人(9/10)】
卷积神经网络CNN 数学原理分析与源码详解 深度学习 Pytorch笔记 B站刘二大人(9/10) 本章主要进行卷积神经网络的相关数学原理和pytorch的对应模块进行推导分析 代码也是通过demo实 ...
- 2. IMU原理及姿态融合算法详解
文章目录 2. IMU原理及姿态融合算法详解 一.组合 二. 原理 a) 陀螺仪 b) 加速度计 c) 磁力计 三. 旋转的表达 a) 欧拉角 b) 旋转矩阵 c) 四元数 d) 李群 SO(3)\t ...
- 强化学习教程(四):从PDG到DDPG的原理及tf代码实现详解
强化学习教程(四):从PDG到DDPG的原理及tf代码实现详解 原创 lrhao 公众号:ChallengeHub 收录于话题 #强化学习教程 前言 在前面强化学习教程(三)中介绍了基于策略「PG」算 ...
- 【转】什么是场效应管(FET)-场效应管(FET)分类、原理、用途等知识详解
什么是场效应管(FET)-场效应管(FET)分类.原理.用途等知识详解 场效应管和双极晶体管不同,它属于仅以电子或空穴中的一种载子动作的晶体管.按照结构.原理可以分为:1.接合型场效应管 2.MOS型 ...
- 计算机自动化装配专机,自动化装配生产线结构原理及其组成形式的详解
<自动化装配生产线结构原理及其组成形式的详解>由会员分享,可在线阅读,更多相关<自动化装配生产线结构原理及其组成形式的详解(2页珍藏版)>请在人人文库网上搜索. 1.自动化装配 ...
- 图像仿射变换原理1:齐次坐标来龙去脉详解
☞ ░ 老猿Python博文目录:https://blog.csdn.net/LaoYuanPython ░ 仿射变换博文传送门(带星号的为付费专栏文章): *图像仿射变换原理1:齐次坐标来龙去脉详解 ...
- Nginx源码研究之nginx限流模块详解
这篇文章主要介绍了Nginx源码研究之nginx限流模块详解,小编觉得挺不错的,现在分享给大家,也给大家做个参考.一起跟随小编过来看看吧 高并发系统有三把利器:缓存.降级和限流: 限流的目的是通过对并 ...
最新文章
- ASP.NET文件的下载
- 基于深度学习的位姿估计方法
- 使用Keras计算余弦相似度(Cosine Similarity)
- 《深入理解Java虚拟机》读书笔记五
- 基于Kubeflow建立的星辰算力训练平台背后的技术架构
- mysql 不需要@的变量_mysql参数变量
- mysql+5.6+左连接_第5章 索引与算法
- window10 物理网卡无法启用
- python 对话框开发_python文件选择对话框的操作方法
- Ubuntu 下监控进程网络流量
- Wicket实战(二)hello world
- 打开和关闭Hadoop,Hbase 命令
- 聊聊如何对员工做绩效考核
- halcon 相似度_怎样用深度学习判断两张图片的相似度?
- mysql安装时初始密码错误_踩坑之MySQL安装及修改初始密码
- 5G发展困难,贪婪的手机企业不愿降价,运营商无奈再开4G价格战
- h5 input type 属性为tel苹果系统可以直接获取数字短信验证码
- 小甲鱼python猜题_[Python]小甲鱼Python视频第033课(except)课后题及参考解答
- css居中怎么移动,移动端css水平垂直居中
- Leetcode-1109:航班预订统计(20210831打卡题)