目录

  • 前言
  • Collector的基础知识
  • Collector源码
  • 一个简单的Collector实现类
  • 模拟Stream,使用Collector实现一个简单的年龄计算

Stream的用法可以参考下文:

  • Java8 Stream使用方法:筛选、排序、最大值、最小值、计数求和平均数、分组、合并、映射、去重等
  • Collectors.groupingBy的四种用法 解决分组统计(计数、求和、平均数等)、范围统计、分组合并、分组结果自定义映射等问题

前言

本篇还处于待完善阶段,目前仅仅是使用了自己的方法来实现Stream对流的处理。因此暂时先写一篇文章做记录。


Collector的基础知识

Collector<T, A, R>范型的含义:

  • <T>:规约操作(reduction operation)的输入元素类型
  • <A>:是规约操作的输出结果类型,该类型是可变可累计的,可以是各种集合容器,或者具有累计操作(如add)的自定义对象。
  • <R>:规约操作结果经过转换操作后返回的最终结果类型

Collector中方法定义,下面的方法的返回值都可以看作函数(function):

  • Supplier<A> supplier():该函数创建并返回新容器对象。
  • BiConsumer<A, T> accumulator():该函数将把元素值放入容器对象,并返回容器。
  • BinaryOperator<A> combiner():该函数会把两个容器(此时每个容器都是处理流元素的部分结果)合并,该函数可以返回这两个容器中的一个,也可以返回一个新的容器。
  • Function<A, R> finisher():该函数将执行最终的转换,它会将combiner的最终合并结果A转变为R。
  • Set<Characteristics> characteristics():提供集合列表,该列表将提供当前Collector的一些特征值。这些特征将会影响上述函数的表现。

上述函数的语法:

  • Supplier<T>#T get():调用一个无参方法,返回一个结果。一般来说是构造方法的方法引用。
  • BiConsumer<T, U>#void accept(T t, U u):根据给定的两个参数,执行相应的操作。
  • BinaryOperator<T> extends BiFunction<T,T,T>#T apply(T t, T u):合并t和u,返回其中之一,或创建一个新对象放回。
  • Function<T, R>#R apply(T t):处理给定的参数,并返回一个新的值。

Collector源码

public interface Collector<T, A, R> {Supplier<A> supplier();BiConsumer<A, T> accumulator();BinaryOperator<A> combiner();Function<A, R> finisher();Set<Characteristics> characteristics();}
// Collector#Characteristics
enum Characteristics {// 表明Collector是否用于并发CONCURRENT,// 表明Collector是否会保留原容器的顺序UNORDERED,// 表明accumulator函数结果类型是否等于finisher函数,默认为空,当设置该特征时,那么finisher函数将执行A到R的未经检查的强制转换。IDENTITY_FINISH
}

一个简单的Collector实现类

Collector的实现类很简单,它将用于存储用户输出的各项函数。

public class SimpleCollector<T, A, R> implements Collector<T, A, R> {private final Supplier<A> supplier;private final BiConsumer<A, T> accumulator;private final BinaryOperator<A> combiner;private final Function<A, R> finisher;private final Set<Characteristics> characteristics;public SimpleCollector(Supplier<A> supplier,BiConsumer<A, T> accumulator,BinaryOperator<A> combiner,Function<A, R> finisher,Set<Characteristics> characteristics) {this.supplier = supplier;this.accumulator = accumulator;this.combiner = combiner;this.finisher = finisher;this.characteristics = characteristics;}@Overridepublic Supplier<A> supplier() {return supplier;}@Overridepublic BiConsumer<A, T> accumulator() {return accumulator;}@Overridepublic BinaryOperator<A> combiner() {return combiner;}@Overridepublic Function<A, R> finisher() {return finisher;}@Overridepublic Set<Characteristics> characteristics() {return characteristics;}
}

模拟Stream,使用Collector实现一个简单的年龄计算

基于对Collector原理的粗浅了解和StreamBuilderImplReferencePipelineForEachTask等源码的解析,模仿Stream的思路自己写了一个使用Collector的流程,方便理解。不过由于ReferencePipeline源码较为复杂,对一些实现的理解还不够深刻,有错难免。

1 执行入口,根据特征判断是否使用多线程,并对每个线程的结果进行合并,最后将合并的结果转为最终返回值

public static <T, R, A> A execute(ExecutorService threadPool, Collection<T> data, Collector<T, R, A> collector) throws ExecutionException, InterruptedException {Objects.requireNonNull(threadPool, "threadPool");Objects.requireNonNull(data, "data");Objects.requireNonNull(collector, "collector");// 查询特征,判断是否要进行分段处理Set<Collector.Characteristics> characteristics = collector.characteristics();int segment = 1;if (characteristics.contains(Collector.Characteristics.CONCURRENT)) {segment = data.size() / Runtime.getRuntime().availableProcessors() + 1;}// 集合分段用于多线程,以便不会对同一数据多次计算Collection<Collection<T>> segmentList = ListUtil.segmentList(data, segment);List<CompletableFuture<R>> completableFutureList = new ArrayList<>(segmentList.size());for (Collection<T> collection : segmentList) {// 并发情况下就不能保证累积函数执行的顺序,也就无法保证最终结果的顺序性(源码中分别使用了ForEachOrderedTask | ForEachTask)CompletableFuture<R> async = CompletableFuture.supplyAsync(() -> {return CollectorUsageDemo.dealWithElement(collection, collector);});completableFutureList.add(async);}CompletableFuture<Void> allOf = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[0]));CompletableFuture<R> result = allOf.thenApply(v -> {// 初始化容器 起初初始容器也将作为函数计算的一部分, 这里将容器合并,并返回新的容器R r = collector.supplier().get();for (CompletableFuture<R> f1 : completableFutureList) {R r2 = f1.join();r = collector.combiner().apply(r, r2);}return r;});// 合并容器后的最终结果R last = result.get();if (characteristics.contains(Collector.Characteristics.IDENTITY_FINISH)) {return (A) last;}// 将R转为最终的结果类型Areturn collector.finisher().apply(last);
}

2 执行容器对每个元素的处理

public static <T, R, A> R dealWithElement(Collection<T> data, Collector<T, R, A> collector) {// 初始化一个容器R container = collector.supplier().get();// 遍历data集合,将每个元素通过accumulator函数进行规约for (T t : data) {collector.accumulator().accept(container, t);}return container;
}

3 测试

public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(5);List<Student> student = Student.getStudent();// 比如我们想实现一个类似Collectors.joining()的功能Set<Collector.Characteristics> characteristics = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.CONCURRENT,Collector.Characteristics.UNORDERED));Collector<Student, AtomicInteger, Integer> collector = new SimpleCollector<>(AtomicInteger::new, (AtomicInteger i, Student s) -> i.addAndGet(s.getAge()),(i, i1) -> {i.addAndGet(i1.get());return i;}, AtomicInteger::get, characteristics);Integer execute = execute(executorService, student, collector);System.out.println(execute);
}

输出结果:

121

源码:CollectorUsageDemo

【Java8 Stream】:探秘Stream实现的核心:Collector,模拟Stream的实现相关推荐

  1. Java8新特性学习(lambda,函数式接口,stream,Optional)

    一. Lambda Lambda 是一个匿名函数,我们可以把 Lambda表达式理解为是一段可以传递的代码(将代码像数据一样进行传递).可以写出更简洁.更灵活的代码.作为一种更紧凑的代码风格,使Jav ...

  2. Java8 Lambda表达式(三)Lambda表达式与Stream API

    目录 1 Stream基本概念 1.1 什么是Stream 1.2 Stream的特点 2 Stream API初体验 2.1 构造数据 2.1 使用for循环实现 2.2 使用Lambda表达式结合 ...

  3. java8从入门到精通2:强大的Stream

    Stream(流)是什么 流是Java API的新成员,它允许你以声明性方式处理数据集合,你可以把它们看成遍历数据集的高级迭代器.此外,流还可以透明地并行处理,你无需写任何多线程代码了,不过并行处理在 ...

  4. stream of java_java8新特性之强大的Stream API

    Stream API Stream是Java8中处理集合的关键抽象概念,它可以指定你希望对集合进行的操作,可以执行非常复杂的查找.过滤和映射数据等操作. 使用Stream API 对集合数据进行操作, ...

  5. stream流常用方法_Java8 中用法优雅的 Stream,怪不得我之前总是看不懂别人写的代码!...

    Java8的新特性主要是Lambda表达式和流,当流和Lambda表达式结合起来一起使用时,因为流申明式处理数据集合的特点,可以让代码变得简洁易读 放大招,流如何简化代码 如果有一个需求,需要对数据库 ...

  6. hashmap用stream流操作_基础篇:JAVA.Stream函数,优雅的数据流操作

    前言 平时操作集合数据,我们一般都是for或者iterator去遍历,不是很好看.java提供了Stream的概念,它可以让我们把集合数据当做一个个元素在处理,并且提供多线程模式 流的创建 流的各种数 ...

  7. 描述 C 库函数 int fseek(FILE *stream, long int offset, int whence) 设置流 stream 的文件位置为给定的偏移 offset,参数 offs

    描述 C 库函数 int fseek(FILE *stream, long int offset, int whence) 设置流 stream 的文件位置为给定的偏移 offset,参数 offse ...

  8. ffmpeg error:mp4 stream error: no moov before mdat and the stream is not seekable

    VLC播放器报错 mp4 stream error: no moov before mdat and the stream is not seekable 解决方法 ffmpeg生成moov是在mda ...

  9. tcmalloc(google开源项目核心部分模拟实现)

    TcMalloc项目实现--高并发内存池(google开源项目核心部分模拟实现) 一.项目介绍 二.什么是内存池 2.1 池化技术 2.2 内存池 2.3 内存池 三.定长内存池 四.整体框架设计 T ...

最新文章

  1. Log4j显示异常的奇怪问题
  2. 使用jQuery更改图片来源
  3. A - 小C语言--词法分析程序
  4. ftp连接 java.net.ConnectException: Connection refused
  5. php Closure 类型
  6. JavaOne 2012:非阻塞数据结构如何工作?
  7. WordPress开发暗黑系列流量主收益高清壁纸小程序-可二开-无授权
  8. linux-文件管理-不完整版
  9. 元宵快乐:看SQL大师们用SQL绘制的团圆
  10. 线程,Python 实现多任务的方式之一
  11. 为什么要预留字段_PaperYY为什么会免费论文查重?
  12. windows API 菜鸟学习之路(二)
  13. 深圳市房事与公积金相关疑问
  14. VirtualBox6.1安装及使用教程
  15. 关键时刻救一命:旧手机改造求生工具
  16. 电商系统PC商城模块介绍
  17. 解决安装MySQL后,Excel打开很慢的问题
  18. 修复版动态视频壁纸微信小程序源码下载,支持多种类型流量主收益
  19. AVR中C语言while语言,AVR单片机汇编语言程序实例
  20. 计算机域名(domain name)

热门文章

  1. 三级linux网络管理及应用技术,浙江省大学计算机三级Linux网络管理及应用.doc
  2. C++ 重载操作符与转换
  3. 错过短视频,微博奔向新浪
  4. 骁龙 820A 汽车系统级芯片的优越性
  5. 过山车之星 Planet Coaster Mac
  6. 利用ADS进行低噪声放大器(LAN)的设计
  7. 动态pptp、l2tp、sstp搭建脚本
  8. Windows server 2008搭建pptp
  9. hp 硒鼓对照表及其平均打印页数
  10. autojs自动化脚本怎么在电脑上开发, 详细的靠谱教程!!!