Java8中新增的Stream,相信使用过的同学都已经感受到了它的便利,允许你以声明性的方式处理集合,而不用去做繁琐的for-loop/while-loop,并且可以以极低的成本并行地处理集合数据

如果需要从菜单中筛选出卡路里在400以下的菜品,并按卡路里排序后,输出菜品名称

在java8之前,需要进行两次显示迭代,并且还需要借助中间结果存储

List

// 按照热量值进行筛选

for(Dish dish : dishes) {

if (dish.getCalories() < 400) {

lowCaloricDishes.add(dish);

}

}

// 按照热量进行排序

lowCaloricDishes.sort(new Comparator() {

@Override

public int compare(Dish d1, Dish d2) {

return d1.getCalories().compareTo(d2.getCalories);

}

})

// 提取名称

List lowCaloricDishesName = new LinkedList<>();

for(Dish dish : lowCaloricDishes) {

lowCaloricDishesName.add(dish.getName());

}

如果使用Stream API,只需要

List lowCaloricDishesName =

dishes.parallelStream() // 开启并行处理

.filter(d -> d.getCalories() < 400) // 按照热量值进行筛选

.sorted(Comparator.comparing(Dish::getCalories)) // 按照热量进行排序

.map(Dish::getName) // 提取名称

.collect(Collectors.toList()); // 将结果存入List

甚至,可以写出更复杂的功能

Map> lowCaloricDishesNameGroup =

dishes.parallelStream() // 开启并行处理

.filter(d -> d.getCalories() < 400) // 按照热量值进行筛选

.sorted(comparing(Dish::getCalories)) // 按照热量进行排序

.collect(Collectors.groupingBy( // 将菜品名按照热量进行分组

Dish::getCalories,

Collectors.mapping(Dish::getName, Collectors.toList())

));

是不是非常简洁,并且越发形似SQL

如此简洁的API是如何实现的?中间过程是如何衔接起来的?每一步都会进行一次迭代么,需要中间结果存储么?并行处理是怎么做到的?

什么是Stream?

Stream使用一种类似SQL语句的方式,提供对集合运算的高阶抽象,可以将其处理的元素集合看做一种数据流,流在管道中传输,数据在管道节点上进行处理,比如筛选、排序、聚合等

数据流在管道中经过中间操作(intermediate operation)处理,由终止操作(terminal operation)得到前面处理的结果

和以往的集合操作不同,Stream操作有两个基础特征:

pipelining:中间操作会返回流对象,多个操作最终串联成一个管道,管道并不直接操作数据,最终由终止操作触发数据在管道中的流动及处理,并收集最终的结果

Stream的实现使用流水线(pipelining)的方式巧妙的避免了多次迭代,其基本思想是在一次迭代中尽可能多的执行用户指定的操作

内部迭代:区别于以往使用iterator或者for-each等显示地在集合外部进行迭代计算的方式,内部迭代隐式的在集合内部进行迭代计算

Stream操作分为两类:中间操作及终止操作

中间操作:将流一层层的进行处理,并向下一层进行传递,如 filter map sorted等

中间操作又分为有状态(stateful)及无状态(stateless)

有状态:必须等上一步操作完拿到全部元素后才可操作,如sorted

无状态:该操作的数据不收上一步操作的影响,如filter map

终止操作:触发数据的流动,并收集结果,如collect findFirst forEach等

终止操作又分为短路操作(short-circuiting)及非短路操作(non-short-circuiting)

短路操作:会在适当的时刻终止遍历,类似于break,如anyMatch findFirst等

非短路操作:会遍历所有元素,如collect max等

Stream采用某种方式记录用户每一步的操作,当用户调用终止操作时将之前记录的操作叠加到一起,尽可能地在一次迭代中全部执行掉,那么

用户的操作如何记录?

操作如何叠加?

叠加后的操作如何执行?

执行后的结果(如果有)在哪里?

Stream如何实现

操作如何记录

Stream中使用Stage的概念来描述一个完整的操作,并用某种实例化后的PipelineHelper来代表Stage,将各Pipeline按照先后顺序连接到一起,就构成了整个流水线

与Stream相关类和接口的继承关系如下图

Head用于表示第一个Stage,该Stage不包含任何操作

StatelessOp和StatefulOp分别表示无状态和有状态的Stage

使用Collection.stream Arrays.stream或Stream.of等接口会生成Head,其内部均采用StreamSupport.stream方法,将原始数据包装为Spliterator存放在Stage中

Head记录Stream起始操作,将包装为Spliterator的原始数据存放在Stage中

StatelessOp记录无状态的中间操作

StatefulOp记录有状态的中间操作

TerminalOp用于触发数据数据在各Stage间的流动及处理,并收集最终数据(如果有)

Head StatelessOp StatefulOp三个操作实例化会指向其父类AbstractPipeline

对于Head

/**

* Constructor for the head of a stream pipeline.

*

* @param source {@code Spliterator} describing the stream source

* @param sourceFlags the source flags for the stream source, described in

* {@link StreamOpFlag}

* @param parallel {@code true} if the pipeline is parallel

*/

AbstractPipeline(Spliterator> source, int sourceFlags, boolean parallel) {

this.previousStage = null;

this.sourceSpliterator = source;

this.sourceStage = this;

this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;

// The following is an optimization of:

// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);

this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;

this.depth = 0;

this.parallel = parallel;

}

其会将包装为Spliterator的原始数据存放在Stage中,并将自身存放在sourceStage中

对于StatelessOp及StatefulOp

/**

* Constructor for appending an intermediate operation stage onto an

* existing pipeline.

*

* @param previousStage the upstream pipeline stage

* @param opFlags the operation flags for the new stage, described in

* {@link StreamOpFlag}

*/

AbstractPipeline(AbstractPipeline, E_IN, ?> previousStage, int opFlags) {

if (previousStage.linkedOrConsumed)

throw new IllegalStateException(MSG_STREAM_LINKED);

previousStage.linkedOrConsumed = true;

previousStage.nextStage = this;

this.previousStage = previousStage;

this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;

this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);

this.sourceStage = previousStage.sourceStage;

if (opIsStateful())

sourceStage.sourceAnyStateful = true;

this.depth = previousStage.depth + 1;

}

每一个Stage都会存放原始的sourceStage,即Head

通过previousStage及nextStage,将各Stage串联为一个双向链表,使得每一步都知道上一步与下一步的操作

操作如何叠加

以上已经解决了如何记录操作的问题,想要让pipeline运行起来,需要一种将所有操作叠加到一起的方案

由于前面的Stage并不知道后面的Stage导致需要执行何种操作,只有当前Stage本身知道该如何执行自己包含的动作,这就需要某种协议来协调相邻Stage之间的调用关系

Stream类库采用了Sink接口来协调各Stage之间的关系

interface Sink extends Consumer {

/**

* Resets the sink state to receive a fresh data set. This must be called

* before sending any data to the sink. After calling {@link #end()},

* you may call this method to reset the sink for another calculation.

* @param size The exact size of the data to be pushed downstream, if

* known or {@code -1} if unknown or infinite.

*

*

Prior to this call, the sink must be in the initial state, and after

* this call it is in the active state.

*

* 开始遍历前调用,通知Sink做好准备

*/

default void begin(long size) {}

/**

* Indicates that all elements have been pushed. If the {@code Sink} is

* stateful, it should send any stored state downstream at this time, and

* should clear any accumulated state (and associated resources).

*

*

Prior to this call, the sink must be in the active state, and after

* this call it is returned to the initial state.

*

* 所有元素遍历完成后调用,通知Sink没有更多元素了

*/

default void end() {}

/**

* Indicates that this {@code Sink} does not wish to receive any more data.

*

* @implSpec The default implementation always returns false.

*

* @return true if cancellation is requested

*

* 是否可以结束操作,可以让短路操作尽早结束

*/

default boolean cancellationRequested() {}

/**

* Accepts a value.

*

* @implSpec The default implementation throws IllegalStateException.

*

* @throws IllegalStateException if this sink does not accept values

*

* 遍历时调用,接收的一个待处理元素,并对元素进行处理

* Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前Stage.accept方法即可

*/

default void accept(T value) {}

}

Sink的四个接口方法常常相互协作,共同完成计算任务

实际上Stream API内部实现的的本质,就是如何重载Sink的这四个接口方法,下面结合具体源码来理解Stage是如何将自身的操作包装秤Sink,以及Sink是如何将处理结果转发给下一个Sink的

无状态Stage,Stream.map

// Stream.map 将生成一个新Stream

public final Stream map(Function super P_OUT, ? extends R> mapper) {

Objects.requireNonNull(mapper);

return new StatelessOp(this, StreamShape.REFERENCE,

StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {

// 该方法将回调函数(处理逻辑)包装成Sink

@Override

Sink opWrapSink(int flags, Sink sink) {

return new Sink.ChainedReference(sink) {

@Override

public void accept(P_OUT u) {

// 接收数据,使用当前包装的回调函数处理数据,并传递给下游Sink

downstream.accept(mapper.apply(u));

}

};

}

};

}

上述代码逻辑非常简单,接下来可以看一下有状态Stage,Stream.sorted

private static final class RefSortingSink extends AbstractRefSortingSink {

// 存放用于排序的元素

private ArrayList list;

RefSortingSink(Sink super T> sink, Comparator super T> comparator) {

super(sink, comparator);

}

@Override

public void begin(long size) {

if (size >= Nodes.MAX_ARRAY_SIZE)

throw new IllegalArgumentException(Nodes.BAD_SIZE);

// 创建用于存放排序元素的列表

list = (size >= 0) ? new ArrayList((int) size) : new ArrayList();

}

@Override

public void end() {

// 只有在接收到所有元素后才开始排序

list.sort(comparator);

downstream.begin(list.size());

// 排序完成后,将数据传递给下游Sink

if (!cancellationWasRequested) {

// 下游Sink不包含短路操作,将数据依次传递给下游Sink

list.forEach(downstream::accept);

}

else {

// 下游Sink包含短路操作

for (T t : list) {

// 对于每一个元素,都要询问是否可以结束处理

if (downstream.cancellationRequested()) break;

// 将元素传递给下游Sink

downstream.accept(t);

}

}

// 告知下游Sink数据传递完毕

downstream.end();

list = null;

}

@Override

public void accept(T t) {

// 依次将需要排序的元素加入到临时列表中

list.add(t);

}

}

Stream.sorted会在接收到所有元素之后再进行排序,在此之后才开始将数据依次传递给下游Sink

叠加后的操作如何执行

Sink就如齿轮,每一步的操作逻辑是封装在Sink中的,那各Sink是如何串联咬合在一起的,首个Sink又是如何启动来触发整个pipeline执行的?

结束操作(TerminalOp)之后不能再有别的操作,结束操作会创建一个包装了自己操作的Sink,这个Sink只处理数据而不会将数据传递到下游Sink

TerminalOp的类图非常简单

FindOp: 用于查找,如findFirst,findAny,生成FindSink

ReduceOp: 用于规约,如reduce collect,生成ReduceSink

MatchOp: 用于匹配,如allMatch anyMatch,生成MatchSink

ForEachOp: 用于遍历,如forEach,生成ForEachSink

在调用Stream的终止操作时,会执行AbstractPipeline.evaluate

/**

* Evaluate the pipeline with a terminal operation to produce a result.

*

* @param the type of result

* @param terminalOp the terminal operation to be applied to the pipeline.

* @return the result

*/

final R evaluate(TerminalOp terminalOp /* 各种终止操作 */) {

assert getOutputShape() == terminalOp.inputShape();

if (linkedOrConsumed)

throw new IllegalStateException(MSG_STREAM_LINKED);

linkedOrConsumed = true;

return isParallel()

? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) /* 并发执行 */

: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); /* 串行执行 */

}

最终会根据是否并行执行TerminalOp中不同的的evaluate方法,在TerminalOp的evaluate方法中会调用helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get()来串联各层Sink,触发pipeline,并获取最终结果,那TerminalOp到底是如何串联各层Sink的?

final > S wrapAndCopyInto(S sink /* TerminalSink */, Spliterator spliterator) {

copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);

return sink;

}

其中玄机尽在warpSink

final Sink wrapSink(Sink sink) {

Objects.requireNonNull(sink);

// AbstractPipeline.this,最后一层Stage

for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {

// 从下游向上游遍历,不断包装Sink

sink = p.opWrapSink(p.previousStage.combinedFlags, sink /* 下一层Stage的Sink */);

}

return (Sink) sink;

}

还记得opWrapSink么?它会返回一个新的Sink,实现begin end accept等方法,当前Stage的处理逻辑封装在其中,并将处理后的结果传递给下游的Sink

这样,便将从开始到结束的所有操作都包装到了一个Sink里,执行这个Sink就相当于执行首个Sink,并带动所有下游的Sink,使整个pipeline运行起来

有了包含所有操作的Sink,如何执行Sink呢?wrapAndCopyInto中还有一个copyInto方法

final void copyInto(Sink wrappedSink, Spliterator spliterator) {

Objects.requireNonNull(wrappedSink);

if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {

// 不包含短路操作

// 1. begin

wrappedSink.begin(spliterator.getExactSizeIfKnown());

// 2. 遍历调用 sink.accept

spliterator.forEachRemaining(wrappedSink);

// 3. end

wrappedSink.end();

}

else {

// 包含短路操作

copyIntoWithCancel(wrappedSink, spliterator);

}

}

final void copyIntoWithCancel(Sink wrappedSink, Spliterator spliterator) {

@SuppressWarnings({"rawtypes","unchecked"})

AbstractPipeline p = AbstractPipeline.this;

while (p.depth > 0) {

p = p.previousStage;

}

// 1. begin

wrappedSink.begin(spliterator.getExactSizeIfKnown());

// 2. 遍历调用 sink.accept

// 每一次遍历都询问cancellationRequested结果

// 如果cancellationRequested为true,则中断遍历

p.forEachWithCancel(spliterator, wrappedSink);

// 3. end

wrappedSink.end();

}

copyInto会根据不同的情况依次

调用sink.bigin

遍历调用sink.accept

如果包含短路操作,则每次遍历都需要询问cancellationRequested,适时中断遍历

调用sink.end

执行后的结果在哪里

各层Stage通过Sink协议将所有的操作串联到一起,遍历原始数据并执行,终止操作会创建一个包装了自己操作的TerminalSink,该Sink中处理最终的数据并做数据收集(如果需要),每一种TerminalSink中均会提供一个获取最终数据的方法

TerminalOp通过调用TerminalSink中的对应方法,获取最终的数据并返回,如ReduceOp中

@Override

public R evaluateSequential(PipelineHelper helper,

Spliterator spliterator) {

return helper.wrapAndCopyInto(makeSink(), spliterator)/* 执行各Sink */.get()/* 获取最终数据 */;

}

并发是如何做到的

使用Collection.parallelStream或Stream.parallel等方法可以将当前的流标记为并发,重新来看AbstractPipeline.evaluate,该方法会在终止操作时被执行

/**

* Evaluate the pipeline with a terminal operation to produce a result.

*

* @param the type of result

* @param terminalOp the terminal operation to be applied to the pipeline.

* @return the result

*/

final R evaluate(TerminalOp terminalOp /* 各种终止操作 */) {

assert getOutputShape() == terminalOp.inputShape();

if (linkedOrConsumed)

throw new IllegalStateException(MSG_STREAM_LINKED);

linkedOrConsumed = true;

return isParallel()

? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) /* 并发执行 */

: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); /* 串行执行 */

}

如果被标记为sequential,则会调用TerminalOp.evaluateSequential,evaluateSequential的调用过程上文已经讲述的很清楚

如果被标记为parallel,则会调用TerminalOp.evaluateParallel,对于该方法不同的TerminalOp会有不同的实现,但都使用了ForkJoin框架,将原始数据不断拆分为更小的单元,对每一个单元做上述evaluateSequential类似的动作,最后将每一个单元计算的结果依次整合,得到最终结果

默认情况下,ForkJoin的线程数即为机器的CPU核数,如果想自定义Stream并行执行的线程数,可以参考Custom Thread Pools In Java 8 Parallel Streams

在将原始数据进行拆分的时候,拆分的策略是什么?拆分的粒度又是什么(拆分到什么程度)?

还记得上文所说,原始数据是如何存放的么?Spliterator(可分迭代器 splitable iterator),无论使用何种API,均会将原始数据封装为Spliterator后存放在Stage中,在进行parallel计算时,对原始数据的拆分以及拆分粒度都是基于Spliterator的,和Iterator一样,Spliterator也用于遍历数据源中的数据,但它是专门为并行执行而设计的

public interface Spliterator {

/**

* 如果还有元素需要遍历,则遍历该元素并执行action,返回true,否则返回false

*/

boolean tryAdvance(Consumer super T> action);

/**

* 如果可以,则将一部分元素划分出去,构造另一个Spliterator,使得两个Spliterator可以并行处理

*/

Spliterator trySplit();

/**

* 估算还有多少元素需要遍历

*/

long estimateSize();

/**

* 遍历所有未遍历的元素

*/

default void forEachRemaining(Consumer super T> action) {

do { } while (tryAdvance(action));

}

}

动图如下

在使用Stream parallel时,如果默认Spliterator的拆分逻辑不能满足你的需求,便可以自定义Spliterator,具体示例可以参考《Java 8 in Action》中『7.3.2 实现你自己的Spliterator』

结语

Head会生成一个不包含任何操作的Stage,并将原始数据Spliterator存放在sourceStage中

中间操作StagelessOp StagefulOp将当前操作封装在Sink中,生成一个新的Stage,并使用双链表结构将前后的Stage链接在一起,Sink用于调用当前指定的操作处理数据,并将处理后的结果传递给下游Sink

终止操作TerminalOp生成一个TerminalSink,从下游向上游遍历Stage,不断包装各Stage中的Sink,最终生成一个串联了所有操作的TerminalSink,适时调用该Sink的begin accept end等方法,触发整个pipeline的数据流动及处理,最终调用TerminalSink的get方法,获取最终结果(如果有)

被标记为parallel的流,会使用ForkJoin框架,将原始流拆分为更小的单元,对每一个单元分别作计算,并将各单元的计算结果进行整合,得到最终结果

java8 stream 原理_【修炼内功】[Java8] Stream是怎么工作的相关推荐

  1. java8新特性_乐字节-Java8新特性-接口默认方法

    总概 JAVA8 已经发布很久,而且毫无疑问,java8是自java5(2004年发布)之后的最重要的版本.其中包括语言.编译器.库.工具和JVM等诸多方面的新特性. Java8 新特性列表如下: 接 ...

  2. java8新特性_乐字节-Java8新特性-函数式接口

    上一篇小乐带大家学过 Java8新特性-Lambda表达式,那什么时候可以使用Lambda?通常Lambda表达式是用在函数式接口上使用的.从Java8开始引入了函数式接口,其说明比较简单:函数式接口 ...

  3. java8 并行执行方法_如何在Java8中执行此并行任务

    使用CompletionService,更可能是ExecutorCompletionService. class Matcher { ExecutorService threadPool = Exec ...

  4. 红外测距模块工作原理_共享单车里的通讯模块,工作原理是啥呢?

    现在我们所看到了共享单车除了小黄车(OFO)没有配备GPS智能锁外,其他品牌的共享单车都有安装,那么这么高科技的东西具体是怎么工作的呢?下面由我给大家讲解下其中的奥秘. 其实这个东西也谈不上太多高科技 ...

  5. mybatis工作原理_万字好文!MyBatis 的工作原理,你了解过吗?

    回复 1024 有特别礼包 作者:江南入直 | 来源:cnblogs.com/scuury/p/10371246.html 上一篇:微信支付的架构到底有多牛? 近来想写一个mybatis的分页插件,但 ...

  6. rl滤波器原理_入门篇,层层讲解滤波电路工作原理

    在整流电路输出的电压是单向脉动性电压,不能直接给电子电路使用.所以要对输出的电压进行滤波, 消除电压中的交流成分,成为直流电后给电子电路使用.在滤波电路中,主要使用对交流电有特殊阻抗特性的器件,如:电 ...

  7. chroma负载机恒压工作原理_一款恒压/恒流充电器工作原理分析

    该充电器工作原理介绍如下,电路见附图所示. 1 .主电路 采用220V电网直接供电,经 KZ1 - KZ4 全控桥式整流,再经极性切换开关输出接负载 ( 蓄电池 ) .当蓄电池在充电工作方式时,切换开 ...

  8. bmp180气压传感器工作原理_【科普】40种传感器工作原理

    传感器(英文名称:transducer/sensor)是一种检测装置,能感受到被测量的信息,并能将感受到的信息,按一定规律变换成为电信号或其他所需形式的信息输出,以满足信息的传输.处理.存储.显示.记 ...

  9. 燃气灶电气线路图及原理_家用燃气灶解析之 热电偶的工作原理

    在家用燃气灶的炉头上通常配有点火针和热电偶熄火保护针.热电偶是燃气灶中很重要的一个组成部分,热电偶的好坏关系到燃气灶的点火反应时间和点火成功率.热电偶实际上是一种感温元件,它直接测量温度,并把温度信号 ...

  10. 数字调制系统工作原理_水暖BA系统组成及各部工作原理

    原标题:水暖BA系统组成及各部工作原理 来源:机电人脉 如有侵权,请联系删除 01 暖通空调系统 系统监控功能: 智能中的空调系统是指空调机组.新风机组,变风量机组,风机盘管等设备.其控制主要是指温. ...

最新文章

  1. Python环境配置保姆教程(Anaconda、Jupyter、GPU环境)!
  2. 顶会ASPLOS 新成果解析:用“弹性异构”防御DNN加速器对抗攻击
  3. elasticsearch查询
  4. c语言线性表顺序存储实验小结,数据结构学习笔记-线性表顺序存储(C语言实现)...
  5. php课程 6-20 字符串基础和去除空格和字符串填补函数
  6. Spring集成–从头开始应用程序,第2部分
  7. Session、Dialog和Transaction的区别
  8. 一个整型数组里除了两个数字之外,其他的数字都出现了两次。请写程序找出这两个只出现一次的数字
  9. 动态规划基础——爬楼梯(Leetcode 70)
  10. Gitlab分支保护
  11. python程序开发入门_程序设计入门—Python
  12. android modbus 串口,手机Modbus 安卓Modbus调试软件
  13. 统计学习方法——统计学习基础(一)
  14. 最新最全的微信小程序入门学习教程,微信小程序零基础入门到精通
  15. html5压缩视频文件大小,格式工厂怎么压缩视频大小 只需5步大视频变小视频
  16. 小游戏正在毁灭微信群聊(文中有福利)
  17. Flutter插件开发--获取Android手机电池信息
  18. Linux 文件属性详解
  19. Ubuntu中编写C语言程序
  20. 基尔霍夫电流/电压定律

热门文章

  1. 《android开发艺术探索》笔记:Activity界面跳转到透明主题界面,不调用onStop()方法
  2. 【毕业设计】 单片机自动写字机器人设计与实现 - 物联网 嵌入式 stm32
  3. 数学建模——相关系数
  4. 记2021沙尘漫天的春
  5. 2016杭州云栖大会随笔
  6. 有关“十二生肖”的成语
  7. 使用wireshark分析HTTPS数据包
  8. 使用Elasticsearch聚合搜索进行数据的分类统计
  9. Adversarial Machine Learning 经典算法解读(FGSM, DeepFool)
  10. html5中display flex,display:flex属性