概述

什么是Stream

Stream就是一种流式的处理数据风格,这一种风格将要处理的元素集合看作一种流,流在管道中传输,并且可以在管道的节点上进行处理,比如进行筛选、排序和聚合。通俗地说,就是将Stream处理看作流水线作业,数据就是流水线上的原料,而对数据的操作就是流水线上对原料的加工。

元素流在管道中经过中间操作(intermediate operation)的处理,最后由最终操作(terminal operation)得到前面处理的结果。

Stream的特点

Stream流处理是一个来自数据源的元素队列并支持聚合操作。其包括的特征有:

  • 元素队列: 元素是特定类型的对象,形成一个队列。 Java中的Stream并不会存储元素,而是按需计算。
  • 数据源: 流的来源。 可以是集合,数组,I/O channel, 产生器generator,迭代器 等。
  • 聚合操作: 类似SQL语句一样的操作, 比如filter, map, reduce, find, match, sorted等。

与Collection集合操作不同, Stream操作还有两个基础的特征:

  • Pipelining: 中间操作都会返回流对象本身。 这样多个操作可以串联成一个管道, 如同流式风格(fluent style)。 这样做可以对操作进行优化, 比如延迟执行(laziness)和短路( short-circuiting)。
  • 内部迭代: 以前对集合遍历都是通过Iterator或者For-Each的方式, 显式的在集合外部进行迭代, 这叫做外部迭代。 Stream提供了内部迭代的方式, 通过访问者模式(Visitor)实现。

除了操作不同, 从实现角度比较, Stream和Collection也存在很多区别:

  • 不存储数据。 流不是一个存储元素的数据结构。 它只是传递源(source)的数据。
  • 功能性的(Functional in nature)。 在流上操作只是产生一个结果,不会修改源。 例如filter- 只是生成一个筛选后的stream,不会删除源里的元素。
  • 延迟搜索。 许多流操作, 如filter, map等,都是延迟执行。 中间操作总是lazy的。
  • Stream可能是无界的。 而集合总是有界的(元素数量是有限大小)。 短路操作如limit(n) , findFirst()可以在有限的时间内完成在无界的stream
  • 可消费的(Consumable)。 流的元素在流的声明周期内只能访问一次。 再次访问只能再重新从源中生成一个Stream

使用

流的生成方式:

  • 集合类的stream() 和 parallelStream()方法;
  • 数组Arrays.stream(Object[]);
  • Stream类的静态工厂方法: Stream.of(Object[]), IntStream.range(int, int), Stream.iterate(Object, UnaryOperator);
  • 文件行 BufferedReader.lines();
  • Files类的获取文件路径列表: find(), lines(), list(), walk();
  • Random.ints() 随机数流, 无界的;
  • 其它一些产生流的方法:BitSet.stream(), Pattern.splitAsStream(java.lang.CharSequence),JarFile.stream().
  • 通过StreamSupport辅助类从spliterator产生流。

流的操作可以分为:

  • 中间操作(Intermediate Operations)

    • 无状态(Stateless)操作:每个数据的处理是独立的,不会影响或依赖之前的数据。如filter()、flatMap()、flatMapToDouble()、flatMapToInt()、flatMapToLong()、map()、mapToDouble()、mapToInt()、mapToLong()、peek()、unordered() 等;
    • 有状态(Stateful)操作:处理时会记录状态,比如处理了几个。后面元素的处理会依赖前面记录的状态,或者拿到所有元素才能继续下去。如distinct()、sorted()、sorted(comparator)、limit()、skip() 等;
  • 终止操作(Terminal Operations)
    • 非短路操作:处理完所有数据才能得到结果。如collect()、count()、forEach()、forEachOrdered()、max()、min()、reduce()、toArray()等;
    • 短路(short-circuiting)操作:拿到符合预期的结果就会停下来,不一定会处理完所有数据。如anyMatch()、allMatch()、noneMatch()、findFirst()、findAny() 等。

源码分析

结构


其中,Stream是一个接口,没有操作的默认实现方式。最主要的实现类是ReferencePipeline,它继承自AbstractPipline,而AbstractPipline实现了BaseStream接口。ReferencePipeline内部定义了三个静态内部类,包括:输入流的Head、无状态中间操作StablessOp、有状态StatfulOp,但之后Head不是抽象类。

先看看Java8集合包中Iterator和Spliterator部分有利于我们了解Stream的数据源。

Iterator

就是集合框架中的迭代器,从Java8开始,Iterator中添加了一个缺省方法default void forEachRemaining(Consumer<? super T> action),这个方法用于对未处理的元素执行action,直到处理完或者action抛出异常。

    default void forEachRemaining(Consumer<? super E> action) {Objects.requireNonNull(action);while (hasNext())action.accept(next());}

Spliterator

顾名思义,Spliterator可以看作一个可分割迭代器“splittable Iterator”。Spliterator就是为了并行遍历元素而设计的一个迭代器,jdk1.8中的集合框架中的数据结构都默认实现了spliterator。它提供了trySplit(),为多线程提供处理数据片
它是为了并行处理流而新增的一个迭代类。

这个迭代器的主要作用就是把集合分成了好几段,每个线程执行一段,因此是线程安全的。基于这个原理,以及modCount的快速失败机制,如果迭代过程中集合元素被修改,会抛出异常。

它依然实现了顺序迭代方法default void forEachRemaining(Consumer<? super T> action)。 内部用一个循环执行:

default void forEachRemaining(IntConsumer action) {do { } while (tryAdvance(action));}

其中的tryAdvance方法对下一个处理的操作执行action并返回true,如果没有下一个元素,返回false。

ReferencePipeline

Stream只是一个接口,并没有操作的缺省实现。最主要的实现是ReferencePipeline和AbstractPipeline完成的。

定义:

abstract class ReferencePipeline<P_IN, P_OUT>extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>implements Stream<P_OUT>  {}

ReferencePipeline类几乎实现了所有的Stream中间操作和最终操作,这里挑选一些典型的代码进行分析。

先看看其中的三个重要的内部类。控制数据流入的 Head ,中间操作 StatelessOp,StatefulOp。

Head是ReferencePipeline数据源,其实内部就是一个集合的并行迭代器。

    static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {// 构造器,数据源类型需要进程自SpliteratorHead(Supplier<? extends Spliterator<?>> source,int sourceFlags, boolean parallel) {super(source, sourceFlags, parallel);}// 构造器,数据源类型就是SpliteratorHead(Spliterator<?> source,int sourceFlags, boolean parallel) {super(source, sourceFlags, parallel);}@Overridefinal boolean opIsStateful() {throw new UnsupportedOperationException();}@Overridefinal Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {throw new UnsupportedOperationException();}// Optimized sequential terminal operations for the head of the pipeline@Overridepublic void forEach(Consumer<? super E_OUT> action) {if (!isParallel()) {sourceStageSpliterator().forEachRemaining(action);}else {super.forEach(action);}}@Overridepublic void forEachOrdered(Consumer<? super E_OUT> action) {if (!isParallel()) {sourceStageSpliterator().forEachRemaining(action);}else {super.forEachOrdered(action);}}}

无状态的链式加工,会返回一个StatelessOp对象,有状态的加工操作会返回一个StatefulOp对象。

    abstract static class StatelessOp<E_IN, E_OUT>extends ReferencePipeline<E_IN, E_OUT> {// 构造器,就是将当前的中间操作和旧的Stream组合成一个新的Stream,返回新的Stream,实现链式调用StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,StreamShape inputShape,int opFlags) {super(upstream, opFlags);assert upstream.getOutputShape() == inputShape;}@Overridefinal boolean opIsStateful() {return false;}}abstract static class StatefulOp<E_IN, E_OUT>extends ReferencePipeline<E_IN, E_OUT> {// 构造器,和上面一样StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,StreamShape inputShape,int opFlags) {super(upstream, opFlags);assert upstream.getOutputShape() == inputShape;}@Overridefinal boolean opIsStateful() {return true;}@Overrideabstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,Spliterator<P_IN> spliterator,IntFunction<E_OUT[]> generator);}

1.无状态的中间操作

下面是Stream中一个无状态的中间操作过滤。

    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {Objects.requireNonNull(predicate);return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SIZED) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {@Overridepublic void begin(long size) {downstream.begin(-1);}@Overridepublic void accept(P_OUT u) {if (predicate.test(u))downstream.accept(u);}};}};}

可以看到这个操作只是返回一个StatelessOp对象(此类依然继承于ReferencePipeline),它的一个回调函数opWrapSink会返回一个Sink对象链表。
Sink代表管道操作的每一个阶段, 比如本例的filter阶段。 在调用accept之前,先调用begin通知数据来了,数据发送后调用end。

无状态的中间操作map如下:

    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {Objects.requireNonNull(mapper);return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<R> sink) {return new Sink.ChainedReference<P_OUT, R>(sink) {@Overridepublic void accept(P_OUT u) {downstream.accept(mapper.apply(u));}};}};}

stream.filter(....).map(...)怎么形成一个链的?
filter返回一个StatelessOp,我们记为StatelessOp1, 而map返回另外一个StatelessOp,我们记为StatelessOp2。在调用StatelessOp1.map时, StatelessOp2是这样生成的:return new StatelessOp<P_OUT, R>(StatelessOp1,......);,管道中每个阶段的Stream保留前一个流的引用。

2.有状态的中间操作

有专门的类来处理有状态的中间操作。

    @Overridepublic final Stream<P_OUT> distinct() {return DistinctOps.makeRef(this);}@Overridepublic final Stream<P_OUT> sorted() {return SortedOps.makeRef(this);}

不管无状态还是有状态的中间操作都为返回一个StatelessOp或者StatefulOp传递给下一个操作,有点像设计模式中的职责链模式。

3.最终操作

以count为例。

 public final long count() {return mapToLong(e -> 1L).sum();}@Overridepublic final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) {Objects.requireNonNull(mapper);return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {return new Sink.ChainedReference<P_OUT, Long>(sink) {@Overridepublic void accept(P_OUT u) {downstream.accept(mapper.applyAsLong(u));}};}};}

Stream中的最终操作都是惰性的,是如何实现的呢。

首先找到最后一个操作,也就是最终操作, 执行它的opWrapSink,事实上得到一个链表,最终返回第一个Sink, 执行第一个Sink的accept将触发链式操作, 将管道中的操作在一个迭代中执行一次。
事实上Java是将所有的操作形成一个类似链接的结构(通过Sink的downstream,upstream),在遇到最终操作时触发链式反应, 通过各种数据类型特定的spliterator的一次迭代最终得到结果。

并行操作是通过ForkJoinTask框架实现。

源码总结

其实还是可以把Stream的源码过程当成流水线来理解。

  1. 流水线的入口,也就是数据源,每个Stream具有一个Head内部对象,而Head中就是一个集合spliterator,通过迭代依次输出一个个数据。常用的集合都实现了 Spliterator 接口以支持 Stream。可以这样理解,Spliterator 定义了数据集合流入流水线的方式。

  2. 流水线的中间操作组装,不管是有状态的还是无状态的,都会返回一个包含了上一个节点引用的中间节点,就这样把一个个中间操作拼接到了控制数据流入口的Head后面,但是并没有开始所任何数据处理。

  3. 启动流水线,在最后一个操作的时候回溯链表, 并调用Spliterator的forEachRemaining方法进行一次遍历, 每访问一个数组的元素就会从头开始调用链表的每个节点。

Java Stream源码分析及知识点总结相关推荐

  1. java+stream+源码分析_java8学习之Stream源码分析

    上一次已经将Collectors类中的各种系统收集器的源代码进行了完整的学习,而在之前咱们已经花了大量的篇幅对其Stream进行了详细的示例学习,如: 那接下来则通过源代码的角度来对Stream的运作 ...

  2. java web开源项目源码_超赞!推荐一个专注于Java后端源码分析的Github项目!

    大家好,最近有小伙伴们建议我把源码分析文章及源码分析项目(带注释版)放到github上,这样小伙伴们就可以把带中文注释的源码项目下载到自己本地电脑,结合源码分析文章自己本地调试,总之对于学习开源项目源 ...

  3. Java IO源码分析(四)——PrintStream

    简介 PrintStream继承于FilterOutputStream,而FilterOutputStream用于封装其他的输出流. PrintStream用于给其他的输出流封装了一层打印的功能,它内 ...

  4. Java集合源码分析(二)ArrayList

    ArrayList简介 ArrayList是基于数组实现的,是一个动态数组,其容量能自动增长,类似于C语言中的动态申请内存,动态增长内存. ArrayList不是线程安全的,只能用在单线程环境下,多线 ...

  5. java abstractlist_源码分析-java-AbstractList-Itr和ListItr的实现

    AbstractList API文档 AbstractList实现了List接口,又因为List继承自Collection,Collection继承自Iterable.因此List接口包含很多的方法. ...

  6. 【java】java boolean 源码分析

    1.概述 转载:jdk源码分析-------------boolean 这篇文章小编只是对boolean类型的几个方法进行一次知识梳理,大家有好的意见可以在留言区留下宝贵的意见,小编会做出相应的调整. ...

  7. OkHttp3错误异常: java.net.ProtocolException: unexpected end of stream 源码分析

    之前在项目中调试部分上传附件的接口时会遇到unexpected end of stream错误,在项目所使用的网络框架是我基于OkGo封装的一个网络请求库,而OkGo内部则其实是基于OkHttp封装的 ...

  8. java编译器源码分析之语法分析器

    token流到抽象语法树的过程是语法分析. 前面认识到token流,这部分将介绍抽象语法树(AST). 那么什么是抽象语法树(AST)?AST长啥样?我们的token流是如何转变成AST的?下面围绕这 ...

  9. java 反编译器源码分析

    简介 由于工作需要反编译分析 java 源码,于是需要反编译器做些改动,所以就有了这篇文章. 这次要分析的反编译器是 Femflower,是著名 IDE Idea 的反编译器.源码也是从 Idea 开 ...

最新文章

  1. 灰色预测模型代码_生信审稿人最常问的验证!临床预测模型中的PCA主成分分析!这点你注意到了没!(附代码)...
  2. 机器人工具箱 V9.10(Robotics Toolbook) (1):建立机器人模型
  3. ASC0106硬件连接注意事项
  4. mysql的字符型系统数据类型主要包括_MySQL的数据类型主要包括哪些
  5. Strtus2工作流程及原理
  6. C# 之 Int16 Int32 Int64 的区别
  7. in-nan(ind)_NaN16 Constant in Julia
  8. 全球首个!腾讯优图开源3D医疗影像大数据预训练模型
  9. Android 驱动(8)---简单实例讲解linux的module模块编译步骤
  10. PRTR论文代码解读
  11. python拟合曲线_用python做曲线拟合
  12. 化妆品选购指南_痘痘肌专属
  13. postgresql 查找慢sql之二: pg_stat_statements
  14. SkeyeVSS综合安防Onvif、RTSP、GB28181视频云服务H5无插件直播点播卡顿的解决方案
  15. 【Linux系列文章】基础与Vim
  16. tail -f和tail -F的区别
  17. zepto-selector.js简单分析
  18. Properties综合应用,冲冲冲
  19. Swift中键盘的弹出隐藏,页面抬高,Return键等的配置
  20. 复制:高效程序员的45个习惯敏捷开发修炼之道 读书笔记

热门文章

  1. 编译器LLVM-MLIR-Intrinics-llvm backend-instruction
  2. linux下音乐播放软件
  3. CAN总线的AUTOSAR网络管理
  4. 笃行致远 不负芳华!新荣耀终端公司招聘AI算法专家和工程师
  5. 网站建设常用英文翻译对照
  6. 阿里干货课堂丨Android 之 Listview
  7. 【专题】深入解析软件测试外包
  8. @JsonFormat时间格式化注解使用
  9. SAP中采购合同与采购计划协议关联性测试
  10. 对给定的字符串(只包含'z','o','j'三种字符),判断他是否能AC。 是否AC的规则如下: 1. zoj能AC; 2. 若字符串形式为xzojx,则也能AC,其中x可以是N个'o' 或者为空;