Stream—一个早产的婴儿
当你会关注这篇文章时,那么意味着你对Stream
或多或少有些了解,甚至你在许多业务中有所应用。正如你所知,业界对Stream
、lambda
褒贬不一,有人认为它是银弹,也有人认为其降低了代码的可读性。事实上,很多东西我们应该辩证的去看待,一方面Stream
相关的api的确提供了诸多的便利,如果你愿意花时间去理解和使用的话;然而另一方面,它像一个早产的婴儿,当你去阅读它源码时,你会觉得诧异,像是一个临时拼凑而成的模块。
在前面的Java函数式编程的前生今世篇章中,我们已经了解了lambda
表达式的原理,以及常见的四大函数式接口。
我们可以先看一个Stream
的demo:
Stream.of(1, 2, 3).filter(num -> num > 2).forEach(System.out::println);
语义比较清晰,从一个list
中获取数值大于2的,最后给打印出来。
源头
在调用Stream
的API
之前,我们都需要先创建一个Steam
流,Stream
流的创建方式有很多种,比如上述demo
中的Stream.of
,其使用的是StreamSupport
这个类提供的方法;还有在集合类中在1.8
之后预留了stream
的获取方法等。
//StreamSupport
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {Objects.requireNonNull(spliterator);return new ReferencePipeline.Head<>(spliterator,StreamOpFlag.fromCharacteristics(spliterator),parallel);}
//Collectiondefault Stream<E> stream() {return StreamSupport.stream(spliterator(), false);}
这里可以稍微留意一下,有一个parallel
参数,为我们后文去执行作准备。
不知道看到这里你是否也会有同样的疑惑:为什么Stream
明明是一个接口,要在里面做static
的实现?
这与以往的JDK
代码有较大的出入,一般静态功能都会提供一个xxxs
来处理,比如Path
与Paths
,File
与Files
等。而且更令人诧异的是,在1.8
之后,这种静态方法在List
、Collection
中比比皆是。
坦率地讲,这并非一种好的设计,严格来讲,接口只是声明,不应该承载具体实现,虽然从语法而言提供了这种能力,但这并不意味着我们只有这样才能实现。而这也像是对过去设计的妥协。
我们回到Stream
,前面两种方法都提到了,会返回一个Stream
流。
default Stream<E> stream() {return StreamSupport.stream(spliterator(), false);
}
最开始当我看到StreamSupport
这个类时,我第一感觉是类似于LockSupport
,用于「辅助」,而非「创建」。然而事与愿违的是,它更多的做的是「创建」。其实熟悉JDK
源码的人应该比较清楚,这种「创建」的事情,一般是在xxs
(比如Paths
)这种类中处理。
当然,这个仅是个人主观的臆断,也许他们内部并没有这种「约定俗成」的东西。
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {Objects.requireNonNull(spliterator);return new ReferencePipeline.Head<>(spliterator,StreamOpFlag.fromCharacteristics(spliterator),parallel);}
ReferencePipeline.Head
是所有流处理的源头,ReferencePipeline
继承自AbstractPipeline
。Spliterator
用于对数据迭代并加工,其中有一个较为关键的方法forEachRemaining
,我们后面也会提到。
//创建头节点AbstractPipeline(Spliterator<?> source,int sourceFlags, boolean parallel) {this.previousStage = null;this.sourceSpliterator = source;this.sourceStage = this;this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;// The following is an optimization of:// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;this.depth = 0;this.parallel = parallel;}
头节点,包括后面流水线的节点都继承自这个AbstractPipeline
,你会发现这里的结构是一个双向链表,通过previousStage
和nextStage
来分别用于指向前一个和后一个节点。
流水线
在Stream
体系中,操作被划分成了两种,一种流操作,他所做的事情是对数据的加工,而在流操作内部,又被划分成了两种,一种是有状态的流(StatefulOp
),一种是无状态的流(StatelessOp
),二者的区别在于,数据是否会随着操作中的变化而变化,举个例子,filter
是无状态的,你要过滤什么就是什么,而sort
是有状态的,在两个线程中,如果你在数据层增加了数据或修改了数据,那么二者最后得到的结果可能并不一致;
A stateful lambda expression is one whose result depends on any state that might change during the execution of a pipeline. On the other hand, a stateless lambda expression is one whose result does not depend on any state that might change during the execution of a pipeline.
另外一种是终止操作(TerminalOp
),他意味着开始对流进行执行操作,如果代码中仅有流操作,那么这个流是不会开始执行的,因为流返回的都是一个新的对象。
在Stream
中,流操作有很多种,比如常见的filter
、map
、mapToInt
等,都会在方法中返回一个新建的流操作对象,而这个对象也继承了AbstractPipeline
。
//filter操作@Overridepublic final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {Objects.requireNonNull(predicate);//这里的this就是前面提到的流的源头return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SIZED) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {@Overridepublic void begin(long size) {downstream.begin(-1);}@Overridepublic void accept(P_OUT u) {if (predicate.test(u))downstream.accept(u);}};}};}//StatelessOp类abstract static class StatelessOp<E_IN, E_OUT>extends ReferencePipeline<E_IN, E_OUT> {/*** Construct a new Stream by appending a stateless intermediate* operation to an existing stream.** @param upstream The upstream pipeline stage* @param inputShape The stream shape for the upstream pipeline stage* @param opFlags Operation flags for the new stage*/StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,StreamShape inputShape,int opFlags) {super(upstream, opFlags);assert upstream.getOutputShape() == inputShape;}@Overridefinal boolean opIsStateful() {return false;}}//StatelessOp最终也继承自AbstractPipelineAbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {if (previousStage.linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);previousStage.linkedOrConsumed = true;previousStage.nextStage = this;this.previousStage = previousStage;this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);this.sourceStage = previousStage.sourceStage;if (opIsStateful())sourceStage.sourceAnyStateful = true;this.depth = previousStage.depth + 1;}
StatelessOp
对象在创建时,会注入一个参数this
,而这个this
即我们前面提到的流的源头。在AbstractPipeline
的另外一个构造方法中,完成了双向链表的指定以及深度的自增。
这里有一个方法opIsStateful
,用于判定前面提到的是否是有状态的。
终止符
所有的流操作的执行,都取决于最终的终止操作(TerminalOp
),如果流中没有这个操作,那么前面提到的操作流都无法执行。
而所有的终止操作都实现了TerminalOp
这个接口,包括向我们常见的foreach
、reduce
、find
等。我们还是以前面例子中提到的foreach
来演示我们的原理。
//Stream
void forEach(Consumer<? super T> action);//ReferencePipeline中的forEach实现@Override
public void forEach(Consumer<? super P_OUT> action) {evaluate(ForEachOps.makeRef(action, false));
}
在Stream
的forEach
方法中,有一个参数Consumer
,是一个函数式接口,我们在前面的文章中有所涉及,有兴趣的可以自行查阅其原理。
//ForEachOps
static final class OfRef<T> extends ForEachOp<T> {final Consumer<? super T> consumer;OfRef(Consumer<? super T> consumer, boolean ordered) {super(ordered);this.consumer = consumer;}@Overridepublic void accept(T t) {consumer.accept(t);}}
在ForEachOps
有一个ForEachOp
类用于生成操作类,同时,ForEachOp
还实现了TerminalSink
,后面会提到。不过,还有另外一个OfRef
来继承自ForEachOp
作为调用入口去使用,不过至今我还没明白这里为何单独需要在ForEachOp
下面再嵌套一层,有了解的可以告知我一下。
//AbstractPipelinefinal <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {assert getOutputShape() == terminalOp.inputShape();if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);linkedOrConsumed = true;//用于判定是并行还是串行return isParallel()? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())): terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));}@Overridepublic final boolean isParallel() {return sourceStage.parallel;}
这里会根据最开始的源头注入的parallel
来判定,在前面也有所提及。这里有一个方法sourceSpliterator
用于协助我们去获取数据源分割器,其实在前面有所提及,在创建流的时候,就已经有自动创建一个spliterator
,如果是串行流,那么会直接使用源头流的分割器,如果是并行流,而且其中有有状态的操作,那么会使用这个状态流实现的方法去返回。
//AbstractPipeline@SuppressWarnings("unchecked")private Spliterator<?> sourceSpliterator(int terminalFlags) {// Get the source spliterator of the pipelineSpliterator<?> spliterator = null;//最开始的源头流的分割器if (sourceStage.sourceSpliterator != null) {spliterator = sourceStage.sourceSpliterator;sourceStage.sourceSpliterator = null;}else if (sourceStage.sourceSupplier != null) {spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();sourceStage.sourceSupplier = null;}else {throw new IllegalStateException(MSG_CONSUMED);}//如果是并行流并且有有状态的操作流if (isParallel() && sourceStage.sourceAnyStateful) {// Adapt the source spliterator, evaluating each stateful op// in the pipeline up to and including this pipeline stage.// The depth and flags of each pipeline stage are adjusted accordingly.int depth = 1;for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;u != e;u = p, p = p.nextStage) {int thisOpFlags = p.sourceOrOpFlags;if (p.opIsStateful()) {depth = 0;if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {// Clear the short circuit flag for next pipeline stage// This stage encapsulates short-circuiting, the next// stage may not have any short-circuit operations, and// if so spliterator.forEachRemaining should be used// for traversalthisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;}spliterator = p.opEvaluateParallelLazy(u, spliterator);// Inject or clear SIZED on the source pipeline stage// based on the stage's spliteratorthisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED: (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;}p.depth = depth++;p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);}}if (terminalFlags != 0) {// Apply flags from the terminal operation to last pipeline stagecombinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);}return spliterator;}
在我们拿到分割器之后,我们会调用terminalOp.evaluateSequential
方法去处理。需要说明的是,并行流我暂时没有深入研究,所以暂时不在此章的讨论范畴,后续有机会我会补充上去。
//ForEachOps@Overridepublic <S> Void evaluateSequential(PipelineHelper<T> helper,Spliterator<S> spliterator) {
//这里的helper也就是前面在AbstractPipeline中注入的thisreturn helper.wrapAndCopyInto(this, spliterator).get();}//AbstractPipeline @Overridefinal <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);return sink;}@Override@SuppressWarnings("unchecked")final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {Objects.requireNonNull(sink);
//遍历流链表,逐一执行前面的opWrapSink方法for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {sink = p.opWrapSink(p.previousStage.combinedFlags, sink);}return (Sink<P_IN>) sink;}
在操作流中,一般会返回一个StatelessOp
类,这里前面有所提及,中间有一个opWrapSink
就是现在我们在调用的方法,而在这个方法中,又会继续返回一个类Sink.ChainedReference
,这个类会在downstream
记录我们传入的sink
,也就是我们目前正在操作的ForEachOp
。
//前面的filter@Overridepublic final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {Objects.requireNonNull(predicate);return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SIZED) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {//继续返回一个类,记录terminalOpreturn new Sink.ChainedReference<P_OUT, P_OUT>(sink) {@Overridepublic void begin(long size) {downstream.begin(-1);}@Overridepublic void accept(P_OUT u) {if (predicate.test(u))downstream.accept(u);}};}};}
sink
也是一个简单的单项链表,他的顺序与Stream
相反,通过downStream
一层层向前指定。在获取到最前面一层包装好的sink
之后,我们继续看copyInto
方法。
//AbstractPipeline@Overridefinal <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {//这里的wrappedSink是最前面的流操作,也就是我们生成流之后的第一个操作,在此案例中也就是filterObjects.requireNonNull(wrappedSink);if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {wrappedSink.begin(spliterator.getExactSizeIfKnown());//调用分割器的遍历方法spliterator.forEachRemaining(wrappedSink);wrappedSink.end();}else {copyIntoWithCancel(wrappedSink, spliterator);}}//Spliteratorspublic void forEachRemaining(Consumer<? super T> action) {Object[] a; int i, hi; // hoist accesses and checks from loopif (action == null)throw new NullPointerException();if ((a = array).length >= (hi = fence) &&(i = index) >= 0 && i < (index = hi)) {//将数据源遍历,执行sink中的accept方法do { action.accept((T)a[i]); } while (++i < hi);}}//filter accept方法被遍历执行@Overridepublic void accept(P_OUT u) {
//这里的predicate也就是我们最开始通过lambda表达式创建的actionif (predicate.test(u))
//如果检测通过,那么执行downstream也就是ForEach.OfRef类的accept方法downstream.accept(u);}//OfRef accept被调用@Overridepublic void accept(T t) {
//这里的consumer也就是我们stream.foreach调用时注入的System.out::printlnconsumer.accept(t);}
Spliterators
通过遍历所有数据源,执行filter
的accept
方法,如果校验通过,那么会执行downstream
的accept
方法,而这个downstream
我们已经提及很多次,也就是我们这个例子中的foreach
,foreach
的accept
被调用时,此时又有一个consumer
,这里的consumer
也就是我们最开始例子中的System.out::println
。
至此,整体流程就执行完毕了。
回到我们的标题,为什么说stream
是一个“早产的婴儿”呢?在对stream
整体源码有所大体阅读之后,你会发现很多类的命名、类的设计风格、以及结构的整理设计能力与之前的模块有较大的差异,有些命名明明可以更为规范,有些设计明明可以设计的更为优雅,甚至于,许多地方的设计还不够简练,这里就不一一举例了。当然,这一切都只是我个人的想法,也有可能是我的水平还没到达另外一个层次,或许几年之后再拜读时又会有不一样的感悟。
欢迎关注我的公众号,每周至少一篇比较有深度的原创文章:
Stream—一个早产的婴儿相关推荐
- 新生儿(早产)婴儿护理行业调研报告 - 市场现状分析与发展前景预测
新生儿(早产)婴儿护理市场的企业竞争态势 该报告涉及的主要国际市场参与者有Abbott Nutrition.Analogic.Atom Medical.Arjohuntleigh.Carefusion ...
- 终极孵化器:仿生婴儿的美丽新世界
Conceptual Photograph: The Voorhes 来源: IEEE电气电子工程师 子宫是人类生物学中最复杂的构造之一:可以帮助完成从胚胎到胎儿再到婴儿的壮举.但是如果没有胎盘,这种 ...
- 【Java8新特性】面试官问我:Java8中创建Stream流有哪几种方式?
写在前面 先说点题外话:不少读者工作几年后,仍然在使用Java7之前版本的方法,对于Java8版本的新特性,甚至是Java7的新特性几乎没有接触过.真心想对这些读者说:你真的需要了解下Java8甚至以 ...
- 【Java8新特性】关于Java8的Stream API,看这一篇就够了!!
写在前面 Java8中有两大最为重要的改变.第一个是 Lambda 表达式:另外一个则是 Stream API(java.util.stream.*) ,那什么是Stream API呢?Java8中 ...
- 孕期出血是否先兆流产——B超看婴儿是否在子宫内+hcg值是否过低孕激素不足...
转自:http://blog.sina.com.cn/s/blog_4a869c130102e7nu.html 很多人都经历过孕早期阴道出血,但结局大不一样. 人类受孕后,从一个单细胞逐渐发育成为一个 ...
- node.js之stream模块
为什么80%的码农都做不了架构师?>>> stream 流是一个抽象接口,在 Node 里被不同的对象实现.例如 request to an HTTP server 是流,st ...
- 学习笔记之-java8的新特性-函数式接口,lambda表达式,方法引用,Stream API,Optional类
1.Lambda表达式 用匿名内部类的方法去创建多线程1.new Thread2.参数传递new Runnable3.重写run方法4.在run方法中去设置线程任务5.调用start问题:我们最终目标 ...
- Java 8 新特性之Stream API
1. 概述 1.1 简介 Java 8 中有两大最为重要的改革,第一个是 Lambda 表达式,另外一个则是 Stream API(java.util.stream.*). Stream 是 Java ...
- Java8新特性:Stream介绍和总结
Java8新特性:Stream介绍和总结 什么是Stream 流(Stream)是数据渠道,用于操作数据源(集合.数组等)所生成的元素序列. 集合讲的是数据,流讲的是计算 注意: Stream自己不会 ...
最新文章
- nodejs-REPL/回调函数/事件循环
- Erdaicms旅游网站系统微信和手机端分销系统正式上线发布啦
- ITK:计算网格的法线
- 机器学习之线性回归 (Python SKLearn)
- matlab图像去毛刺_信号去毛刺,去零漂
- 计算机硬件:固态硬盘选购的技巧
- 简单高效地控制高亮度LED
- python 进程池_Python实践17-进程池
- Golang 连接池的几种实现案例
- java jpopupmenu 无法显示_JAVA :为什么使用Jpopupmenu()有参构造方法 不显示标题
- 单例对象会被jvm的gc时回收吗_设计模式专题02-单例五种创建方式
- 你的第一个Windows程序——绘制窗口
- 给定一个数组和滑动窗口的大小,找出所有滑动窗口里数值的最大值。例如,如果输入数组{2,3,4,2,6,2,5,1}及滑动窗口的大小3,那么一共存在6个滑动窗口,他们的最大值分别为...
- Hadoop HA HDFS启动错误之org.apache.hadoop.ipc.Client: Retrying connect to server问题解决
- iTunes历史各个版本下载地址
- RTI DDS的xml说明
- web服务器和数据库服务器分离的优势
- PS更新升级Adobe Camera Raw(ACR)15.3
- 【C语言】之实现简单的打字程序
- Poj·Accumulation Degree