Java 8 中,流有一个非常大的(也可能是最大的)局限性,使用时,对它操作一次仅能得到一个处理结果。实际操作中,如果你试图多次遍历同一个流,结果只有一个,那就是遭遇下面这样的异常:
java.lang.IllegalStateException: stream has already been operated upon or closed
虽然流的设计就是如此,但我们在处理流时经常希望能同时获取多个结果。
本篇利用一个通用API,即Spliterator,尤其是它的延迟绑定能力,结合BlockingQueues和Futures来实现这一大有裨益的特性。

1.复制流

要达到在一个流上并发地执行多个操作的效果,你需要做的第一件事就是创建一个StreamForker,这个StreamForker会对原始的流进行封装,在此基础之上你可以继续定义你希望执行的各种操作。我们看看下面这段代码。

public class StreamForker<T> {private final Stream<T> stream;private final Map<Object, Function<Stream<T>, ?>> forks = new HashMap<>();public StreamForker(Stream<T> stream) {this.stream = stream;}/*** 这里的fork方法接受两个参数。* Function参数,它对流进行处理,将流转变为代表这些操作结果的任何类型。* key参数,通过它你可以取得操作的结果,并将这些键/函数对累积到一个内部的Map中。** @param key* @param f* @return*/public StreamForker<T> fork(Object key, Function<Stream<T>, ?> f) {forks.put(key, f);return this; //返回this从而保证多次流畅地调用fork方法}public Results getResults() {ForkingStreamConsumer<T> consumer = build();try {stream.sequential().forEach(consumer);} finally {consumer.finish();}return consumer;}}

所有由fork方法添加的操作的执行都是通过getResults方法的调用触发的,该方法返回一个Results接口的实现,具体的定义如下:

 public interface Results {<R> R get(Object key);
}
1.1 使用 ForkingStreamConsumer 实现 Results 接口

你可以用下面的方式实现getResults方法:

 public Results getResults() {ForkingStreamConsumer<T> consumer = build();try {stream.sequential().forEach(consumer);} finally {consumer.finish();}return consumer;}

ForkingStreamConsumer同时实现了前面定义的Results接口和Consumer接口。随着我们进一步剖析它的实现细节,你会看到它主要的任务就是处理流中的元素,将它们分发到多个BlockingQueues中处理,BlockingQueues的数量和通过fork方法提交的操作数是一致的。注意,我们很明确地知道流是顺序处理的,不过,如果你在一个并发流上执行forEach方法,它的元素可能就不是顺序地被插入到队列中了。finish方法会在队列的末尾插入特殊元素表明该队列已经没有更多需要处理的元素了。build方法主要用于创建ForkingStreamConsumer。

 private ForkingStreamConsumer<T> build() {//创建由队列组成的列表,每一个队列对应一个操作List<BlockingQueue<T>> queues = new ArrayList<>();//建立用于标识操作的键与包含操作结果的Future之间的映射关系HashMap<Object, Future<?>> actions = forks.entrySet().stream().reduce(new HashMap<>(),(map, e) -> {map.put(e.getKey(), getOperationResult(queues, e.getValue()));return map;},(m1, m2) -> {m1.putAll(m2);return m1;});return new ForkingStreamConsumer<>(queues, actions);}

首先创建了我们前面提到的由BlockingQueues组成的列表。紧接着,你创建了一个Map,Map的键就是你在流中用于标识不同操作的键,值包含在Future中,Future中包含了这些操作对应的处理结果。BlockingQueues的列表和Future组成的Map会被传递给ForkingStreamConsumer的构造函数。每个Future都是通过getOperationResult方法创建。

 private Future<?> getOperationResult(List<BlockingQueue<T>> queues, Function<Stream<T>, ?> f) {BlockingQueue<T> queue = new LinkedBlockingDeque<>();queues.add(queue);//创建一个队列并将其添加到队列的列表中Spliterator<T> spliterator = new BlockingQueueSpliterator<>(queue);//创建一个流,将Spliterator作为数据源Stream<T> source = StreamSupport.stream(spliterator, false);//创建一个Future对象,以异步方式计算在流上执行特定函数的结果return CompletableFuture.supplyAsync(() -> f.apply(source));}

getOperationResult方法会创建一个新的BlockingQueue,并将其添加到队列的列表。这个队列会被传递给一个新的BlockingQueueSpliterator对象,后者是一个延迟绑定的
Spliterator,它会遍历读取队列中的每个元素。
     接下来创建了一个顺序流对该Spliterator进行遍历,最终会创建一个Future在流上执行某个希望的操作并收集其结果。这里的Future使用CompletableFuture类的一个静态工厂方法创建,CompletableFuture实现了Future接口。

1.2 开发 ForkingStreamConsumer 和 BlockingQueueSpliterator
 static class ForkingStreamConsumer<T> implements Consumer<T>, Results {static final Object END_OF_STREAM = new Object();private final List<BlockingQueue<T>> queues;private final Map<Object, Future<?>> actions;ForkingStreamConsumer(List<BlockingQueue<T>> queues, Map<Object, Future<?>> actions) {this.queues = queues;this.actions = actions;}@Overridepublic void accept(T t) {queues.forEach(q -> q.add(t));//将流中遍历的元素添加到所有的队列中}/*** 将最后一个元素添加到队列中,* 表明该流已经结束*/void finish() {accept((T) END_OF_STREAM);}/*** 等待futuire完成相关的计算,返回由特定键标识的处理结果** @param key* @param <R>* @return*/@Overridepublic <R> R get(Object key) {try {return ((Future<R>) actions.get(key)).get();} catch (Exception e) {throw new RuntimeException(e);}}}

这个类同时实现了Consumer和Results接口,并持有两个引用,一个指向由BlockingQueues组成的列表,另一个是执行了由Future构成的Map结构,它们表示的是即将在流上执行的各种操作。
    Consumer接口要求实现accept方法。这里,每当ForkingStreamConsumer接受流中的一个元素,它就会将该元素添加到所有的BlockingQueues中。另外,当原始流中的所有元素都添
加到所有队列后,finish方法会将最后一个元素添加所有队列。BlockingQueueSpliterators碰到最后这个元素时会知道队列中不再有需要处理的元素了。
    Results接口需要实现get方法。一旦处理结束,get方法会获得Map中由键索引的Future,解析处理的结果并返回。
    最后,流上要进行的每个操作都会对应一个BlockingQueueSpliterator。每个BlockingQueueSpliterator都持有一个指向BlockingQueues的引用,这个BlockingQueues是由ForkingStreamConsumer 生成的。

/*** 一个遍历BlockingQueue并读取其中元素的Spliterator* @param <T>*/
class BlockingQueueSpliterator<T> implements Spliterator<T> {private final BlockingQueue<T> q;BlockingQueueSpliterator(BlockingQueue<T> q) {this.q = q;}@Overridepublic boolean tryAdvance(Consumer<? super T> action) {T t;while (true) {try {t = q.take();break;} catch (InterruptedException e) {}}if (t != ForkingStreamConsumer.END_OF_STREAM) {action.accept(t);return true;}return false;}@Overridepublic Spliterator<T> trySplit() {return null;}@Overridepublic long estimateSize() {return 0;}@Overridepublic int characteristics() {return 0;}}

这段代码实现了一个Spliterator,不过它并未定义如何切分流的策略,仅仅利用了流的延迟绑定能力。由于这个原因,它也没有实现trySplit方法。

由于无法预测能从队列中取得多少个元素,所以estimatedSize方法也无法返回任何有意义的值。更进一步,由于你没有试图进行任何切分,所以这时的估算也没什么用处。

这一实现并没有体现Spliterator的任何特性,因此characteristic方法返回0。

这段代码中提供了实现的唯一方法是tryAdvance,它从BlockingQueue中取得原始流中的元素,而这些元素最初由ForkingSteamConsumer添加。依据getOperationResult方法创建Spliterator同样的方式,这些元素会被作为进一步处理流的源头传递给Consumer对象(在流上要执行的函数会作为参数传递给某个fork方法调用)。tryAdvance方法返回true通知调用方还有其他的元素需要处理,直到它发现由ForkingSteamConsumer添加的特殊对象,表明队列中已经没有更多需要处理的元素了。

2. 所有代码

public class StreamForker<T> {private final Stream<T> stream;private final Map<Object, Function<Stream<T>, ?>> forks = new HashMap<>();public StreamForker(Stream<T> stream) {this.stream = stream;}/*** 这里的fork方法接受两个参数。* Function参数,它对流进行处理,将流转变为代表这些操作结果的任何类型。* key参数,通过它你可以取得操作的结果,并将这些键/函数对累积到一个内部的Map中。** @param key* @param f* @return*/public StreamForker<T> fork(Object key, Function<Stream<T>, ?> f) {forks.put(key, f);return this; //返回this从而保证多次流畅地调用fork方法}public Results getResults() {ForkingStreamConsumer<T> consumer = build();try {stream.sequential().forEach(consumer);} finally {consumer.finish();}return consumer;}private ForkingStreamConsumer<T> build() {//创建由队列组成的列表,每一个队列对应一个操作List<BlockingQueue<T>> queues = new ArrayList<>();//建立用于标识操作的键与包含操作结果的Future之间的映射关系HashMap<Object, Future<?>> actions = forks.entrySet().stream().reduce(new HashMap<>(),(map, e) -> {map.put(e.getKey(), getOperationResult(queues, e.getValue()));return map;},(m1, m2) -> {m1.putAll(m2);return m1;});return new ForkingStreamConsumer<>(queues, actions);}private Future<?> getOperationResult(List<BlockingQueue<T>> queues, Function<Stream<T>, ?> f) {BlockingQueue<T> queue = new LinkedBlockingDeque<>();queues.add(queue);//创建一个队列并将其添加到队列的列表中Spliterator<T> spliterator = new BlockingQueueSpliterator<>(queue);//创建一个流,将Spliterator作为数据源Stream<T> source = StreamSupport.stream(spliterator, false);//创建一个Future对象,以异步方式计算在流上执行特定函数的结果return CompletableFuture.supplyAsync(() -> f.apply(source));}static class ForkingStreamConsumer<T> implements Consumer<T>, Results {static final Object END_OF_STREAM = new Object();private final List<BlockingQueue<T>> queues;private final Map<Object, Future<?>> actions;ForkingStreamConsumer(List<BlockingQueue<T>> queues, Map<Object, Future<?>> actions) {this.queues = queues;this.actions = actions;}@Overridepublic void accept(T t) {queues.forEach(q -> q.add(t));//将流中遍历的元素添加到所有的队列中}/*** 将最后一个元素添加到队列中,* 表明该流已经结束*/void finish() {accept((T) END_OF_STREAM);}/*** 等待futuire完成相关的计算,返回由特定键标识的处理结果** @param key* @param <R>* @return*/@Overridepublic <R> R get(Object key) {try {return ((Future<R>) actions.get(key)).get();} catch (Exception e) {throw new RuntimeException(e);}}}/*** 一个遍历BlockingQueue并读取其中元素的Spliterator* @param <T>*/class BlockingQueueSpliterator<T> implements Spliterator<T> {private final BlockingQueue<T> q;BlockingQueueSpliterator(BlockingQueue<T> q) {this.q = q;}@Overridepublic boolean tryAdvance(Consumer<? super T> action) {T t;while (true) {try {t = q.take();break;} catch (InterruptedException e) {}}if (t != ForkingStreamConsumer.END_OF_STREAM) {action.accept(t);return true;}return false;}@Overridepublic Spliterator<T> trySplit() {return null;}@Overridepublic long estimateSize() {return 0;}@Overridepublic int characteristics() {return 0;}}interface Results {<R> R get(Object key);}public static void main(String[] args) {//测试List<Integer> menu = Arrays.asList(1, 2, 3, 4, 5, 6, 20, 40, 60);Results results = new StreamForker<Integer>(menu.stream()).fork("max", s -> s.max(Integer::compareTo)).fork("sum", s -> s.collect(Collectors.summarizingInt(Integer::intValue))).getResults();System.out.println("max:"+results.get("max"));System.out.println("sum:"+results.get("sum"));}
}

3. 性能的考量

提起性能,你不应该想当然地认为这种方法比多次遍历流的方式更加高效。如果构成流的数据都保存在内存中,阻塞式队列所引发的开销很容易就抵消了由并发执行操作所带来的性能
提升。

与此相反,如果操作涉及大量的I/O,譬如流的源头是一个巨型文件,那么单次访问流可能是个不错的选择;因此(大多数情况下)优化应用性能唯一有意义的规则是“好好地度量它”。

通过这个例子,我们展示了怎样一次性地在同一个流上执行多个操作。更重要地是,我们相信这个例子也证明了一点,即使某个特性原生的Java API暂时还不支持,充分利用Lambda表达式的灵活性和一点点的创意,整合现有的功能,你完全可以实现想要的新特性。

Java 8 如何以并发方式在同一个流 上执行多种操作相关推荐

  1. java创建对象时分配内存方式,是堆上分配还是栈上分配?

    创建对象的内存是分配在堆上还是栈上面?大部分童鞋的回答是这样的:"肯定分配在堆内存的嘛,栈内存是属于子线程和基本数据类型专用的内存空间,怎么会分配到栈上面呢?",这个回答嘛,也对, ...

  2. java file 其他电脑上,java - Jar文件无法在另一台PC上执行

    我有一个可编程的jar文件,我从我的程序编译,我在我的电脑上运行它 . 当我在命令提示符下使用 java -jar [nameofjar.jar] 运行它时它完全正常工作 但是,我尝试在另一台电脑上测 ...

  3. java动态同步_java并发基础-Synchronized

    基础使用 基本上Java程序员都简单的了解synchronized的使用: 无非就是用在多线程环境下的同步. 看如下简单的例子: publicclassUnsafeCounter{ privatein ...

  4. Java多线程与线程并发库高级应用笔记

    以下内容是学习张老师Java多线程与线程并发库高级应用时所做的笔记,很有用 网络编辑器直接复制Word文档排版有点乱,提供原始文件下载 先看源文件概貌 张孝祥_Java多线程与并发库高级应用 [视频介 ...

  5. Java多线程系列(三):Java线程池的使用方式,及核心运行原理

    之前谈过多线程相关的4种常用Java线程锁的特点,性能比较.使用场景,今天主要分享线程池相关的内容,这些都是属于Java面试的必考点. 为什么需要线程池 java中为了提高并发度,可以使用多线程共同执 ...

  6. java stream 有序_Java8新特性之Stream流专题四 并行流

    随着对流API认识的慢慢深入,本章我们要讨论的知识点是流API里面的并行流了. 在开始讨论并行流之前,我先引发一下大家的思考,就你看到这篇文章的时间,你们是不是经常听到,Intel i7 CPU什么8 ...

  7. 5W字高质量java并发系列详解教程(上)-附PDF下载

    文章目录 第一章 java.util.concurrent简介 主要的组件 Executor ExecutorService ScheduledExecutorService Future Count ...

  8. Java多线程(五) —— 线程并发库之锁机制

    参考文献: http://www.blogjava.net/xylz/archive/2010/07/08/325587.html 一.Lock与ReentrantLock 前面的章节主要谈谈原子操作 ...

  9. java逸出_Java并发编程 - 对象的共享

    编写正确的并发程序,关键问题在于:在访问共享的可变状态时需要进行正确的管理.同步代码块和同步方法可以确保以原子的方式执行操作,同步还有另一个重要的方面:内存可见性. 可见性 为了确保多个线程之间对内存 ...

最新文章

  1. 平衡二叉排序树的创建和实现调整过程
  2. IT运维管理方案 成就企业信息化建设
  3. day04-html
  4. 第十一周学习总结--助教
  5. 解决SpringBoot使用Quartz无法注入Bean的问题
  6. ITK:使用Viola Wells互信息执行多模式注册
  7. node --- 使用mongoose连接mongoDB,并初始化所有的Schema
  8. C语言多个变量运算存储过程,postgresql函数中的赋值运算和postgresql函数存储过程实现数据批量插入...
  9. P4555 最长双回文串
  10. python植树问题代码_BERT可以上几年级了?Seq2Seq“硬刚”小学数学应用题
  11. 人件管理与中国古代史:程序员豫让
  12. 饥荒联机版服务器启动慢_饥荒联机版大型攻略——简介与目录
  13. ua解析接口_VIP电影解析接口(80个)
  14. 单词记忆分类系统化--000
  15. SOA技术专家作客CSDN聊天室
  16. ssh: connect to host xx.xx.xxx.xxx port 22: Connection refused
  17. 《即刻电音》:大张伟遭遇“团灭”危机情绪失控
  18. PVC塑料加速老化测试介绍
  19. 华南理工大学计算机/软件 复试 经验贴整理
  20. 满足大别墅无线组网 这套无线路由产品竟毫不吃力!

热门文章

  1. 大脑结构及脑电信号基础知识
  2. 计算机编程人员英语翻译,计算机编程英语怎么说,电脑中常用的英文翻译
  3. 微信公众号上传永久图片素材、图文素材,以及群发已上传的图文素材
  4. 芝麻代理ip:你知道怎么选择l2tp和pptp协议嘛
  5. 什么是品牌营销?学会正确推广您的业务
  6. 解决Jetbrains旗下产品的插件下载失败问题(IntelliJ IDEA、RubyMine、WebStorm、PhpStorm、PyCharm、AppCode、Android Studio等)
  7. 安全行业从业者自研开源扫描器合辑
  8. android玩游戏怎么换绑,QQ炫舞手游该帐号已在安卓平台使用 请切换至安卓平台登录该账号或更换大区...
  9. 安卓淘宝商店界面之高仿类CollapsinToolBar 直接搬运到项目中
  10. 论 Web 2.0 时代PHP的地位