tream

Stream是在Java SE 8 API添加的用于增强集合的操作接口,可以让你以一种声明的方式处理集合数据。将要处理的集合看作一种流的创建者,将集合内部的元素转换为流并且在管道中传输, 并且可以在管道的节点上进行处理, 比如筛选,排序,聚合等。元素流在管道中经过中间操作(intermediate operation)的处理,最后由最终操作(terminal operation)得到前面处理的结果。Stream的继承关系图如下,且容我慢慢抽丝剥茧细细道来。拉勾IT课小编为大家分解

过滤,转换,聚合,归约

Stream.of("one", "two", "three", "four")

.filter(e -> e.length() > 3)

.peek(e -> System.out.println("Filtered value: " + e))

.map(String::toUpperCase)

.peek(e -> System.out.println("Mapped value: " + e))

.collect(Collectors.toList());

在没有Stream之前,我们对集合数据的处理到多是外部遍历,然后做数据的聚合用算,排序,merge等等。这属于OO思想,在引入Java SE 8引入FP之后,FP的操作可以提高Java程序员的生产力,,基于类型推断的lambda表达式可以 让程序员写出高效率、干净、简洁的代码。可以避免冗余的代码。根据给定的集合操作通过

stream()

方法创建初始流,配合

map()

,

flatMap()

,

filter()

对集合数据进行过滤,转换。api调用我这里就不多说了。直接从源码入手,看上图最核心的就是类为

AbstractPipeline

ReferencePipeline

Sink

接口.

AbstractPipeline

抽象类是整个Stream中流水线的高度抽象了源头

sourceStage

,上游

previousStage

,下游

nextStage

,定义

evaluate

结束方法,而

ReferencePipeline

则是抽象了过滤,转换,聚合,归约等功能,每一个功能的添加实际上可以理解为卷心菜,菜心就是源头,每一次加入一个功能就相当于重新长出一片叶子包住了菜心,最后一个功能集成完毕之后整颗卷心菜就长大了。而

Sink

接口呢负责把整个流水线串起来,然后在执行聚合,归约时候调

AbstractPipeline

抽象类的

evaluate

结束方法,根据是否是并行执行,调用不同的结束逻辑,如果不是并行方法则执行

terminalOp.evaluateSequential

否则就执行

terminalOp.evaluateParallel

,非并行执行模式下则是执行的是

AbstractPipeline

抽象类的

wrapAndCopyInto

方法去调用

copyInto

,调用前会先执行一下

wrapSink

,用于剥开这个我们在流水线上产生的卷心菜。从下游向上游去遍历

AbstractPipeline

,然后包装到Sink,然后在

copyInto

方法内部迭代执行对应的方法。最后完成调用,

并行执行实际上是构建一个

ForkJoinTask

并执行

invoke

去提交到

ForkJoinPool

线程池。

BaseStream

流的基本接口,该接口制定流可以支持无序,顺序,并行的,Stream实现了BaseStream接口。

·

Iterator

iterator();

外部迭代器

·

Spliterator

spliterator();

用于创建一个内部迭代器

·isParallel用于判断该stream是否是并行的

·S sequential();标识该stream创建是顺序执行的

·S parallel();标识该stream创建是并行的,需要使用

ForkJoinPool

·S unordered();标识该stream创建是无序的

·S onClose(Runnable closeHandler);当stream关闭的时候执行一个方法回调去关闭流。

PipelineHelper

该抽象类主要定义了操作管道的核心方法,并且能收集到流管道内的所有信息。如通过

TerminalOp#evaluateParallel

用于执行并行流操作,通过

TerminalOp#evaluateSequential

执行顺序流的操作。

·abstract StreamShape getSourceShape();

·abstract int getStreamAndOpFlags();

·

abstract

long exactOutputSizeIfKnown(Spliterator

spliterator);

将此时间的管道内的元素应用到提供的

Spliterator

,并将结果发送到提供的接收器sink里

·

abstract

> S wrapAndCopyInto(S sink, Spliterator

spliterator);

用于输出返回值的大小。

·

abstract

void copyInto(Sink

wrappedSink, Spliterator

spliterator);

用于将从

Spliterator

获得的元素推入提供的接收器中

Sink

。如果已知流管道中有短路阶段(包含StreamOpflag#SHORT_CURRENT),则在每个元素之后执行一下

Sink#cancellationRequested()

,如果返回请求true,则执行终止。这个方法被实现之后需要遵守Sink的协议即:Sink#begin->Sink#accept->Sink->end

·

abstract

void copyIntoWithCancel(Sink

wrappedSink, Spliterator

spliterator);

用于将从

Spliterator

获得的元素推入提供的接收器中

Sink

。在每个元素之后执行一下

Sink#cancellationRequested()

,如果返回请求true,则执行终止。这个方法被实现之后需要遵守Sink的协议即:Sink#begin->Sink#accept->Sink->end

·

abstract

Sink

wrapSink(Sink

sink);

该方法主要用于包装sink,从下游向上游去遍历

AbstractPipeline

,然后包装到一个Sink内,用于然后在

copyInto

方法内部迭代执行对应的方法。

·

abstract Node.Builder

makeNodeBuilder(long exactSizeIfKnown,IntFunction

generator);

用于构造一个节点Builder,转换为数组去处理数组类型和PipelineHelper定义的输出类型一样。

·

abstract

Node

evaluate(Spliterator

spliterator,boolean flatten,IntFunction

generator);

该方法将源拆分器应用到管道内的所有元素。针对数组处理。如果管道没有中间(

filter,map

)

操作,并且源由一个节点支持(源头),则该节点将被返回(内部遍历然后返回)。这减少了由有状态操作和返回数组的终端操作组成的管道的复制.例如:stream.sorted().toArray();该方法对应到

AbstractPipeline

内部,代码如下:

@Override

@SuppressWarnings("unchecked")

final

Node

evaluate(Spliterator

spliterator,

boolean flatten,

IntFunction

generator) {

if (isParallel()) {

// @@@ Optimize if op of this pipeline stage is a stateful op

return evaluateToNode(this, spliterator, flatten, generator);

}

else {

Node.Builder

nb = makeNodeBuilder(

exactOutputSizeIfKnown(spliterator), generator);

return wrapAndCopyInto(nb, spliterator).build();

}

}

AbstractPipeline

“管道”类的抽象基类,是流接口及其原始专门化的核心实现。用来表示流管道的初始部分,封装流源和零个或多个中间操作。对于顺序流和没有状态中间操作的并行流、并行流,管道中数据的处理是在一次“阻塞”所有操作的过程中完成的也就是最后才去处理。对于具有状态操作的并行流,执行被分成多个段,其中每个状态操作标记一个段的结束,每个段被单独评估,结果被用作下一个段的输入。上述所有情况,都是达到终端操作才开始处理源数据。

AbstractPipeline(Supplier extends Spliterator>> source,

int sourceFlags, boolean parallel)

创建源Source stage第一个参数指定一个Supplier接口(工厂模式,只能生成Spliterator>的对象,根据传入的lambda实现,

extends Spliterator

泛型的PECS原则了解一下。)

AbstractPipeline(Spliterator> source,

int sourceFlags, boolean parallel)

创建源Source stage第一个参数制定这个拆分器,和上面的构造方式一样,直接分析一下这个方法:

AbstractPipeline(Spliterator> source,

int sourceFlags, boolean parallel) {

this.previousStage = null;

this.sourceSpliterator = source;

this.sourceStage = this;

this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;

// The following is an optimization of:

// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);

this.combinedFlags = (~(sourceOrOpFlags

this.depth = 0;

this.parallel = parallel;

}

创建Stream源阶段的时候

previousStage

null

this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;

用于设置当前阶段的标识位。

this.combinedFlags = (~(sourceOrOpFlags

添加源阶段的对流的操作标识,这个

combinedFlags

是流在整个管道内部所有操作的合集,在最后的规约操作的时候去解析出来。

·AbstractPipeline(AbstractPipeline, E_IN, ?> previousStage, int opFlags)

根据上游创建下游

Pipeline

AbstractPipeline(AbstractPipeline, E_IN, ?> previousStage, int opFlags) {

if (previousStage.linkedOrConsumed)

throw new IllegalStateException(MSG_STREAM_LINKED);

previousStage.linkedOrConsumed = true;

previousStage.nextStage = this;

this.previousStage = previousStage;

this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;

this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);

this.sourceStage = previousStage.sourceStage;

if (opIsStateful())

sourceStage.sourceAnyStateful = true;

this.depth = previousStage.depth + 1;

}

this.sourceStage = previousStage.sourceStage;

,用于上游和下游关联,

this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);

将上游的操作标识位添加到本阶段的操作标识位中。

depth

记录整个管道的中间操作数。

·

final

R evaluate(TerminalOp

terminalOp)

进行终端汇聚计算。执行最终的计算,得到结果,根据是否是并行执行,调用不同的结束逻辑,如果不是并行方法则执行

terminalOp.evaluateSequential

否则就执行

terminalOp.evaluateParallel

·

final Node

evaluateToArrayNode(IntFunction

generator)

处理流转换数组。

final Node

evaluateToArrayNode(IntFunction

generator) {

if (linkedOrConsumed)

throw new IllegalStateException(MSG_STREAM_LINKED);

linkedOrConsumed = true;

if (isParallel() && previousStage != null && opIsStateful()) {

depth = 0;

return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);

}

else {

return evaluate(sourceSpliterator(0), true, generator);

}

}

转换数组的时候,如果是并行流并且不是源阶段,而且调用过

sorted

||

limit

||

skip

||

distinct

这些有状态的操作之后,这里是个模版方法调用。实际上是通过调用

DistinctOps

||

SortedOps

||

SliceOps

这些实现的

opEvaluateParallel

方法,提交到ForkJoin线程池来转换数组。串行执行的时候直接执行

evaluate(sourceSpliterator(0), true, generator);

·evaluate(sourceSpliterator(0), true, generator);

具体的执行方法,用于吧管道内部的输出结果放到Node中。

@Override

@SuppressWarnings("unchecked")

final

Node

evaluate(Spliterator

spliterator,

boolean flatten,

IntFunction

generator) {

if (isParallel()) {

// @@@ Optimize if op of this pipeline stage is a stateful op

return evaluateToNode(this, spliterator, flatten, generator);

}

else {

Node.Builder

nb = makeNodeBuilder(

exactOutputSizeIfKnown(spliterator), generator);

return wrapAndCopyInto(nb, spliterator).build();

}

}

@Override

final

Node

evaluateToNode(PipelineHelper

helper,

Spliterator

spliterator,

boolean flattenTree,

IntFunction

generator) {

return Nodes.collect(helper, spliterator, flattenTree, generator);

}

// Nodes.collect方法

public static

Node

collect(PipelineHelper

helper,

Spliterator

spliterator,

boolean flattenTree,

IntFunction

generator) {

long size = helper.exactOutputSizeIfKnown(spliterator);

if (size >= 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {

if (size >= MAX_ARRAY_SIZE)

throw new IllegalArgumentException(BAD_SIZE);

P_OUT[] array = generator.apply((int) size);

new SizedCollectorTask.OfRef(spliterator, helper, array).invoke();

return node(array);

} else {

Node

node = new CollectorTask.OfRef(helper, generator, spliterator).invoke();

return flattenTree ? flatten(node, generator) : node;

}

}

如果是源是并行流的情况,以

ReferencePipeline

引用管道来看主要执行的是

return Nodes.collect(helper, spliterator, flattenTree, generator);

,该collect方法内部根据切割器有无

Spliterator.SUBSIZED

确定了生成的Node的长度,主要工作是创建一个Task提交到线程池。然后调用invoke拿到结果。示例代码

Arrays.asList("2","22","222").parallelStream().skip(2).toArray();

整个流程如下:

串行执行示例代码

Arrays.asList("2","22","222").stream().skip(2).toArray();

整个流程如下:

·

final Spliterator

sourceStageSpliterator()

获取Stream源头设置的拆分器,如果设置有则返回并且把源拆分器置空,如果有Supplier则调用get方法返回拆分器并且把源拆分器置空。

·public final S sequential()

设置为串行流 ,设置源的paraller属性为false。终态方法不允许重写

·public final S sequential()

设置为并行流 ,设置源的paraller属性为true。终态方法不允许重写

·public void close()

关闭管道的方法,在关闭的时候会把管道使用标志设置为false,拆分器设置为null,如果源的回调关闭Job存在不为null时则invoker这个回调Job。

·public S onClose(Runnable closeHandler)

用于注册关闭的回调job,在调用close的时候用于去执行这个回调job。

·

public Spliterator

spliterator()

sourceStageSpliterator

方法一样的功能,只不过不是终态方法,可以重写用于自定义的拓展。

·public final boolean isParallel()

用于盘带你当前管道是否是并行流。

·final int getStreamFlags()

获取流的标志和Stream的包含的所有操作。

·private Spliterator> sourceSpliterator(int terminalFlags) {

获取源拆分器,和

sourceStageSpliterator

方法一样的功能,针对是并行流时候,并且是创建Stream阶段的话有中间状态,会组合流标志和操作构建拆分器。如果传入的操作码不等于,那么则添加到拆分器的操作码中。

·final StreamShape getSourceShape()

输出Stream源的类型。(引用OR int OR Double OR Long)

·

final

long exactOutputSizeIfKnown(Spliterator

spliterator)

获取期望的size,如果拆分器如果有SIZE标志,调用拆分器的getExactSizeIfKnown方法,否则返回-1。

·

final

> S wrapAndCopyInto(S sink, Spliterator

spliterator)

封装整个管道的阶段,包装在Sink中。把每一个阶段串联起来。包装在Sink内部的

downstream

.

wrapAndCopyInto代码执行流程如下:

java命令行参数_一个 java 命令行参数顺序的坑相关推荐

  1. java string最大长度_一个Java字符串中到底有多少个字符?

    作者:鸟窝 依照Java的文档, Java中的字符内部是以UTF-16编码方式表示的,最小值是 (0),最大值是(65535), 也就是一个字符以2个字节来表示,难道Java最多只能表示 65535个 ...

  2. java有几大对象_一个 Java 对象到底有多大?

    阅读本文大概需要 2.8 分钟. 出处:http://u6.gg/swLPg 编写 Java 代码的时候,大多数情况下,我们很少关注一个 Java 对象究竟有多大(占据多少内存),更多的是关注业务与逻 ...

  3. 一个java类可以有_一个.java文件中可以有几个同级类?

    1.在一个.java文件中可以有多个同级类(和public一样的位置,注意不是内部类).其修饰符只可以public/abstract/final/和无修饰符,不能是其他的protected/priva ...

  4. java背单词软件_一个JAVA写的背单词程序

    一个JAVA写的背单词程序 2007-6-9文字大小:大中小 俺看了一些Java, 写个程序出来玩玩.由于界面是用Jbuilder生成的,可能代码比较乱,而且还没合起来. 目前版本是0.00001 / ...

  5. java恶作剧小程序_一个Java恶搞小程序

    运用Java程序控制某个应用程序的运行(以网易云音乐为例),步骤如下 1.建立bat文件分别是start.bat(控制程序的运行)和kill.bat(控制程序的结束): start.bat 的内容如下 ...

  6. 第一个java程序的错误_我是一名java初学者,执行第一个java程序welcome.java出现了以下错误,这是为什么?...

    我是一名java初学者,执行第一个java程序welcome.java出现了以下错误,这是为什么? welcome.java: import javax.swing.*; public class w ...

  7. java项目----教务管理系统_基于Java的教务管理系统

    java项目----教务管理系统_基于Java的教务管理系统 2022-04-22 18:18·java基础 最近为客户开发了一套学校用教务管理系统,主要实现学生.课程.老师.选课等相关的信息化管理功 ...

  8. java -jar 指定端口_「Linux命令」-Java程序员需要掌握的10个命令

    作为服务端开发的同学,经常会与linux服务器打交道,一些用的命令必须要掌握. 1.top命令-观察服务端负载情况 top命令是Linux下常用的性能分析工具,能够实时显示系统中各个进程的资源占用状况 ...

  9. epub java虚拟机精讲_高级 Java 必须掌握:JVM 分析工具和查看命令,超详细!

    来源:http://boendev.iteye.com/blog/882479 jinfo 可以输出并修改运行时的java 进程的opts. jps 与unix上的ps类似,用来显示本地的java进程 ...

最新文章

  1. Configure,Makefile.am, Makefile.in, Makefile文件之间关系
  2. 2014-11-25nbsp;11:26
  3. Nature发布第一张人类造血干细胞发育的全面路线图
  4. transitionend、change、classList、兼容代码、元素样式属性的操作、-Attribute自定义属性、阻止跳转、元素绑定相同事件、元素解绑事件、事件冒泡、事件三阶段
  5. 聊聊Top2计算机博士2021年就业选择
  6. fedora学习笔记 6:浅谈linux文件系统
  7. 图片的alt(替换文本)属性描述
  8. mysql的dml全程是_MySQL中的DML、DDL、DCL到底是什么呢?
  9. 软件成分分析(SCA)完全指南
  10. Matlab科研绘图颜色补充(特别篇)—51种中国传统颜色
  11. 百度指数、淘宝指数学习笔记
  12. 微型计算机的 CPU主要由两部分构成,微机是由哪两部分组成
  13. 题解 P2184 【贪婪大陆】
  14. [py练习] 人口增长的问题
  15. 如何优雅的判断一个对象的属性是否全部为空
  16. 劝酒的话(男人必修)
  17. 【职场心路】一个老DBA的自白
  18. STM32F1和F4的区别
  19. 在华为P50 Pro中,听到AI异构通信的朱弦三叹
  20. OllyDBG 完美教程 ( 超强入门级 1)

热门文章

  1. php7 on winxp 支持的模块
  2. poj 1776 Task Sequences
  3. java 控件汉字显示方格
  4. Web之CSS开发技巧: CSS @media
  5. 发布一个基于 Reactor 模式的 C++ 网络库
  6. 每日一句:We are a happy crew in our office.
  7. WSSv3 Technical Articles_使用Visual Studio 2005扩展创建Windows SharePoint Services 3.0 Web Part...
  8. Centos下安装JDK环境配置
  9. Oracle start with.connect by prior子句实现递归查询
  10. [UE4]函数和事件的区别