Java Stream源码分析及知识点总结
概述
什么是Stream
Stream就是一种流式的处理数据风格,这一种风格将要处理的元素集合看作一种流,流在管道中传输,并且可以在管道的节点上进行处理,比如进行筛选、排序和聚合。通俗地说,就是将Stream处理看作流水线作业,数据就是流水线上的原料,而对数据的操作就是流水线上对原料的加工。
元素流在管道中经过中间操作(intermediate operation)的处理,最后由最终操作(terminal operation)得到前面处理的结果。
Stream的特点
Stream流处理是一个来自数据源的元素队列并支持聚合操作。其包括的特征有:
- 元素队列: 元素是特定类型的对象,形成一个队列。 Java中的Stream并不会存储元素,而是按需计算。
- 数据源: 流的来源。 可以是集合,数组,I/O channel, 产生器generator,迭代器 等。
- 聚合操作: 类似SQL语句一样的操作, 比如filter, map, reduce, find, match, sorted等。
与Collection集合操作不同, Stream操作还有两个基础的特征:
- Pipelining: 中间操作都会返回流对象本身。 这样多个操作可以串联成一个管道, 如同流式风格(fluent style)。 这样做可以对操作进行优化, 比如延迟执行(laziness)和短路( short-circuiting)。
- 内部迭代: 以前对集合遍历都是通过Iterator或者For-Each的方式, 显式的在集合外部进行迭代, 这叫做外部迭代。 Stream提供了内部迭代的方式, 通过访问者模式(Visitor)实现。
除了操作不同, 从实现角度比较, Stream和Collection也存在很多区别:
- 不存储数据。 流不是一个存储元素的数据结构。 它只是传递源(source)的数据。
- 功能性的(Functional in nature)。 在流上操作只是产生一个结果,不会修改源。 例如filter- 只是生成一个筛选后的stream,不会删除源里的元素。
- 延迟搜索。 许多流操作, 如filter, map等,都是延迟执行。 中间操作总是lazy的。
- Stream可能是无界的。 而集合总是有界的(元素数量是有限大小)。 短路操作如limit(n) , findFirst()可以在有限的时间内完成在无界的stream
- 可消费的(Consumable)。 流的元素在流的声明周期内只能访问一次。 再次访问只能再重新从源中生成一个Stream
使用
流的生成方式:
- 集合类的stream() 和 parallelStream()方法;
- 数组Arrays.stream(Object[]);
- Stream类的静态工厂方法: Stream.of(Object[]), IntStream.range(int, int), Stream.iterate(Object, UnaryOperator);
- 文件行 BufferedReader.lines();
- Files类的获取文件路径列表: find(), lines(), list(), walk();
- Random.ints() 随机数流, 无界的;
- 其它一些产生流的方法:BitSet.stream(), Pattern.splitAsStream(java.lang.CharSequence),JarFile.stream().
- 通过StreamSupport辅助类从spliterator产生流。
流的操作可以分为:
- 中间操作(Intermediate Operations)
- 无状态(Stateless)操作:每个数据的处理是独立的,不会影响或依赖之前的数据。如filter()、flatMap()、flatMapToDouble()、flatMapToInt()、flatMapToLong()、map()、mapToDouble()、mapToInt()、mapToLong()、peek()、unordered() 等;
- 有状态(Stateful)操作:处理时会记录状态,比如处理了几个。后面元素的处理会依赖前面记录的状态,或者拿到所有元素才能继续下去。如distinct()、sorted()、sorted(comparator)、limit()、skip() 等;
- 终止操作(Terminal Operations)
- 非短路操作:处理完所有数据才能得到结果。如collect()、count()、forEach()、forEachOrdered()、max()、min()、reduce()、toArray()等;
- 短路(short-circuiting)操作:拿到符合预期的结果就会停下来,不一定会处理完所有数据。如anyMatch()、allMatch()、noneMatch()、findFirst()、findAny() 等。
源码分析
结构
其中,Stream是一个接口,没有操作的默认实现方式。最主要的实现类是ReferencePipeline,它继承自AbstractPipline,而AbstractPipline实现了BaseStream接口。ReferencePipeline内部定义了三个静态内部类,包括:输入流的Head、无状态中间操作StablessOp、有状态StatfulOp,但之后Head不是抽象类。
先看看Java8集合包中Iterator和Spliterator部分有利于我们了解Stream的数据源。
Iterator
就是集合框架中的迭代器,从Java8开始,Iterator中添加了一个缺省方法default void forEachRemaining(Consumer<? super T> action)
,这个方法用于对未处理的元素执行action,直到处理完或者action抛出异常。
default void forEachRemaining(Consumer<? super E> action) {Objects.requireNonNull(action);while (hasNext())action.accept(next());}
Spliterator
顾名思义,Spliterator可以看作一个可分割迭代器“splittable Iterator”。Spliterator就是为了并行遍历元素而设计的一个迭代器,jdk1.8中的集合框架中的数据结构都默认实现了spliterator。它提供了trySplit()
,为多线程提供处理数据片。
它是为了并行处理流而新增的一个迭代类。
这个迭代器的主要作用就是把集合分成了好几段,每个线程执行一段,因此是线程安全的。基于这个原理,以及modCount的快速失败机制,如果迭代过程中集合元素被修改,会抛出异常。
它依然实现了顺序迭代方法default void forEachRemaining(Consumer<? super T> action)
。 内部用一个循环执行:
default void forEachRemaining(IntConsumer action) {do { } while (tryAdvance(action));}
其中的tryAdvance
方法对下一个处理的操作执行action并返回true,如果没有下一个元素,返回false。
ReferencePipeline
Stream只是一个接口,并没有操作的缺省实现。最主要的实现是ReferencePipeline和AbstractPipeline完成的。
定义:
abstract class ReferencePipeline<P_IN, P_OUT>extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>implements Stream<P_OUT> {}
ReferencePipeline类几乎实现了所有的Stream中间操作和最终操作,这里挑选一些典型的代码进行分析。
先看看其中的三个重要的内部类。控制数据流入的 Head ,中间操作 StatelessOp,StatefulOp。
Head是ReferencePipeline数据源,其实内部就是一个集合的并行迭代器。
static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {// 构造器,数据源类型需要进程自SpliteratorHead(Supplier<? extends Spliterator<?>> source,int sourceFlags, boolean parallel) {super(source, sourceFlags, parallel);}// 构造器,数据源类型就是SpliteratorHead(Spliterator<?> source,int sourceFlags, boolean parallel) {super(source, sourceFlags, parallel);}@Overridefinal boolean opIsStateful() {throw new UnsupportedOperationException();}@Overridefinal Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {throw new UnsupportedOperationException();}// Optimized sequential terminal operations for the head of the pipeline@Overridepublic void forEach(Consumer<? super E_OUT> action) {if (!isParallel()) {sourceStageSpliterator().forEachRemaining(action);}else {super.forEach(action);}}@Overridepublic void forEachOrdered(Consumer<? super E_OUT> action) {if (!isParallel()) {sourceStageSpliterator().forEachRemaining(action);}else {super.forEachOrdered(action);}}}
无状态的链式加工,会返回一个StatelessOp对象,有状态的加工操作会返回一个StatefulOp对象。
abstract static class StatelessOp<E_IN, E_OUT>extends ReferencePipeline<E_IN, E_OUT> {// 构造器,就是将当前的中间操作和旧的Stream组合成一个新的Stream,返回新的Stream,实现链式调用StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,StreamShape inputShape,int opFlags) {super(upstream, opFlags);assert upstream.getOutputShape() == inputShape;}@Overridefinal boolean opIsStateful() {return false;}}abstract static class StatefulOp<E_IN, E_OUT>extends ReferencePipeline<E_IN, E_OUT> {// 构造器,和上面一样StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,StreamShape inputShape,int opFlags) {super(upstream, opFlags);assert upstream.getOutputShape() == inputShape;}@Overridefinal boolean opIsStateful() {return true;}@Overrideabstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,Spliterator<P_IN> spliterator,IntFunction<E_OUT[]> generator);}
1.无状态的中间操作
下面是Stream中一个无状态的中间操作过滤。
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {Objects.requireNonNull(predicate);return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SIZED) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {@Overridepublic void begin(long size) {downstream.begin(-1);}@Overridepublic void accept(P_OUT u) {if (predicate.test(u))downstream.accept(u);}};}};}
可以看到这个操作只是返回一个StatelessOp对象(此类依然继承于ReferencePipeline),它的一个回调函数opWrapSink会返回一个Sink对象链表。
Sink代表管道操作的每一个阶段, 比如本例的filter阶段。 在调用accept之前,先调用begin通知数据来了,数据发送后调用end。
无状态的中间操作map如下:
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {Objects.requireNonNull(mapper);return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<R> sink) {return new Sink.ChainedReference<P_OUT, R>(sink) {@Overridepublic void accept(P_OUT u) {downstream.accept(mapper.apply(u));}};}};}
stream.filter(....).map(...)
怎么形成一个链的?
filter返回一个StatelessOp,我们记为StatelessOp1, 而map返回另外一个StatelessOp,我们记为StatelessOp2。在调用StatelessOp1.map时, StatelessOp2是这样生成的:return new StatelessOp<P_OUT, R>(StatelessOp1,......);
,管道中每个阶段的Stream保留前一个流的引用。
2.有状态的中间操作
有专门的类来处理有状态的中间操作。
@Overridepublic final Stream<P_OUT> distinct() {return DistinctOps.makeRef(this);}@Overridepublic final Stream<P_OUT> sorted() {return SortedOps.makeRef(this);}
不管无状态还是有状态的中间操作都为返回一个StatelessOp或者StatefulOp传递给下一个操作,有点像设计模式中的职责链模式。
3.最终操作
以count为例。
public final long count() {return mapToLong(e -> 1L).sum();}@Overridepublic final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) {Objects.requireNonNull(mapper);return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {return new Sink.ChainedReference<P_OUT, Long>(sink) {@Overridepublic void accept(P_OUT u) {downstream.accept(mapper.applyAsLong(u));}};}};}
Stream中的最终操作都是惰性的,是如何实现的呢。
首先找到最后一个操作,也就是最终操作, 执行它的opWrapSink,事实上得到一个链表,最终返回第一个Sink, 执行第一个Sink的accept将触发链式操作, 将管道中的操作在一个迭代中执行一次。
事实上Java是将所有的操作形成一个类似链接的结构(通过Sink的downstream,upstream),在遇到最终操作时触发链式反应, 通过各种数据类型特定的spliterator的一次迭代最终得到结果。
并行操作是通过ForkJoinTask框架实现。
源码总结
其实还是可以把Stream的源码过程当成流水线来理解。
流水线的入口,也就是数据源,每个Stream具有一个Head内部对象,而Head中就是一个集合spliterator,通过迭代依次输出一个个数据。常用的集合都实现了 Spliterator 接口以支持 Stream。可以这样理解,Spliterator 定义了数据集合流入流水线的方式。
流水线的中间操作组装,不管是有状态的还是无状态的,都会返回一个包含了上一个节点引用的中间节点,就这样把一个个中间操作拼接到了控制数据流入口的Head后面,但是并没有开始所任何数据处理。
启动流水线,在最后一个操作的时候回溯链表, 并调用Spliterator的forEachRemaining方法进行一次遍历, 每访问一个数组的元素就会从头开始调用链表的每个节点。
Java Stream源码分析及知识点总结相关推荐
- java+stream+源码分析_java8学习之Stream源码分析
上一次已经将Collectors类中的各种系统收集器的源代码进行了完整的学习,而在之前咱们已经花了大量的篇幅对其Stream进行了详细的示例学习,如: 那接下来则通过源代码的角度来对Stream的运作 ...
- java web开源项目源码_超赞!推荐一个专注于Java后端源码分析的Github项目!
大家好,最近有小伙伴们建议我把源码分析文章及源码分析项目(带注释版)放到github上,这样小伙伴们就可以把带中文注释的源码项目下载到自己本地电脑,结合源码分析文章自己本地调试,总之对于学习开源项目源 ...
- Java IO源码分析(四)——PrintStream
简介 PrintStream继承于FilterOutputStream,而FilterOutputStream用于封装其他的输出流. PrintStream用于给其他的输出流封装了一层打印的功能,它内 ...
- Java集合源码分析(二)ArrayList
ArrayList简介 ArrayList是基于数组实现的,是一个动态数组,其容量能自动增长,类似于C语言中的动态申请内存,动态增长内存. ArrayList不是线程安全的,只能用在单线程环境下,多线 ...
- java abstractlist_源码分析-java-AbstractList-Itr和ListItr的实现
AbstractList API文档 AbstractList实现了List接口,又因为List继承自Collection,Collection继承自Iterable.因此List接口包含很多的方法. ...
- 【java】java boolean 源码分析
1.概述 转载:jdk源码分析-------------boolean 这篇文章小编只是对boolean类型的几个方法进行一次知识梳理,大家有好的意见可以在留言区留下宝贵的意见,小编会做出相应的调整. ...
- OkHttp3错误异常: java.net.ProtocolException: unexpected end of stream 源码分析
之前在项目中调试部分上传附件的接口时会遇到unexpected end of stream错误,在项目所使用的网络框架是我基于OkGo封装的一个网络请求库,而OkGo内部则其实是基于OkHttp封装的 ...
- java编译器源码分析之语法分析器
token流到抽象语法树的过程是语法分析. 前面认识到token流,这部分将介绍抽象语法树(AST). 那么什么是抽象语法树(AST)?AST长啥样?我们的token流是如何转变成AST的?下面围绕这 ...
- java 反编译器源码分析
简介 由于工作需要反编译分析 java 源码,于是需要反编译器做些改动,所以就有了这篇文章. 这次要分析的反编译器是 Femflower,是著名 IDE Idea 的反编译器.源码也是从 Idea 开 ...
最新文章
- 灰色预测模型代码_生信审稿人最常问的验证!临床预测模型中的PCA主成分分析!这点你注意到了没!(附代码)...
- 机器人工具箱 V9.10(Robotics Toolbook) (1):建立机器人模型
- ASC0106硬件连接注意事项
- mysql的字符型系统数据类型主要包括_MySQL的数据类型主要包括哪些
- Strtus2工作流程及原理
- C# 之 Int16 Int32 Int64 的区别
- in-nan(ind)_NaN16 Constant in Julia
- 全球首个!腾讯优图开源3D医疗影像大数据预训练模型
- Android 驱动(8)---简单实例讲解linux的module模块编译步骤
- PRTR论文代码解读
- python拟合曲线_用python做曲线拟合
- 化妆品选购指南_痘痘肌专属
- postgresql 查找慢sql之二: pg_stat_statements
- SkeyeVSS综合安防Onvif、RTSP、GB28181视频云服务H5无插件直播点播卡顿的解决方案
- 【Linux系列文章】基础与Vim
- tail -f和tail -F的区别
- zepto-selector.js简单分析
- Properties综合应用,冲冲冲
- Swift中键盘的弹出隐藏,页面抬高,Return键等的配置
- 复制:高效程序员的45个习惯敏捷开发修炼之道 读书笔记
热门文章
- 编译器LLVM-MLIR-Intrinics-llvm backend-instruction
- linux下音乐播放软件
- CAN总线的AUTOSAR网络管理
- 笃行致远 不负芳华!新荣耀终端公司招聘AI算法专家和工程师
- 网站建设常用英文翻译对照
- 阿里干货课堂丨Android 之 Listview
- 【专题】深入解析软件测试外包
- @JsonFormat时间格式化注解使用
- SAP中采购合同与采购计划协议关联性测试
- 对给定的字符串(只包含'z','o','j'三种字符),判断他是否能AC。 是否AC的规则如下: 1. zoj能AC; 2. 若字符串形式为xzojx,则也能AC,其中x可以是N个'o' 或者为空;