遇到的问题

Java 8 开始引入了 Stream, 其中的 api 一直在不断的优化更新完善,Java 9 中更是引入了 ofNullable 还有 takeWhiledropWhile 这两个关键 api。有时候,我们想对 Stream 中的对象进行排重,默认的可以用 distinct 这个 api,例如:

List<String> collect = Arrays.stream("test1,test2,test2,test3,test3".split(",")).distinct().collect(Collectors.toList());

底层实现是LinkedHashMap,其实这个和下面的实现几乎是等价的:

Set<String> collect = Arrays.stream("test1,test2,test2,test3,test3".split(",")).collect(Collectors.toCollection(LinkedHashSet::new));

结果是一样的,靠hashcode()方法定位槽,equals()方法判断是否是同一个对象,如果是则排重被去掉,不是的话保留,通过LinkedHashMap来保留原始顺序。

但是,对于同一个对象,有时候我们排重的方式并不统一,所以最好像sorted接口一样,能让我们传入比较器,来控制如何判断两个对象相等需要排重。

例如下面的这个对象,我们有时候想按照id排重,有时候想按照name进行排重。

@Data
@NoArgsConstructor
public class User {private int id;private String name;
}

解决思考

首先来实现这个distinct方法。首先,我们定义一个Key类用来代理 hashcode 还有 equals 方法:

private static final class Key<E> {//要比较的对象private final E e;//获取对象的hashcode的方法private final ToIntFunction<E> hashCode;//判断两个对象是否相等的方法private final BiPredicate<E, E> equals;public Key(E e, ToIntFunction<E> hashCode,BiPredicate<E, E> equals) {this.e = e;this.hashCode = hashCode;this.equals = equals;}@Overridepublic int hashCode() {return hashCode.applyAsInt(e);}@Overridepublic boolean equals(Object obj) {if (!(obj instanceof Key)) {return false;}@SuppressWarnings("unchecked")Key<E> that = (Key<E>) obj;return equals.test(this.e, that.e);}
}

然后,增加新的distinct方法:

public Stream<T> distinct (ToIntFunction<T> hashCode,BiPredicate<T, T> equals,//排重的时候,保留哪一个?BinaryOperator<T> merger
) {return this.collect(Collectors.toMap(t -> new Key<>(t, hashCode, equals),Function.identity(),merger,//通过LinkedHashMap来保持原有的顺序LinkedHashMap::new)).values().stream();
}

然后,这个方法如何放入 Stream 呢? 我们首先想到的就是代理 Stream 接口,最简单的实现:

public class EnhancedStream<T> implements Stream<T> {private Stream<T> delegate;public EnhancedStream(Stream<T> delegate) {this.delegate = delegate;}private static final class Key<E> {//要比较的对象private final E e;//获取对象的hashcode的方法private final ToIntFunction<E> hashCode;//判断两个对象是否相等的方法private final BiPredicate<E, E> equals;public Key(E e, ToIntFunction<E> hashCode,BiPredicate<E, E> equals) {this.e = e;this.hashCode = hashCode;this.equals = equals;}@Overridepublic int hashCode() {return hashCode.applyAsInt(e);}@Overridepublic boolean equals(Object obj) {if (!(obj instanceof Key)) {return false;}@SuppressWarnings("unchecked")Key<E> that = (Key<E>) obj;return equals.test(this.e, that.e);}}public EnhancedStream<T> distinct(ToIntFunction<T> hashCode,BiPredicate<T, T> equals,//排重的时候,保留哪一个?BinaryOperator<T> merger) {return new EnhancedStream<>(delegate.collect(Collectors.toMap(t -> new Key<>(t, hashCode, equals),Function.identity(),merger,//通过LinkedHashMap来保持原有的顺序LinkedHashMap::new)).values().stream());}@Overridepublic EnhancedStream<T> filter(Predicate<? super T> predicate) {return new EnhancedStream<>(delegate.filter(predicate));}@Overridepublic <R> EnhancedStream<R> map(Function<? super T, ? extends R> mapper) {return new EnhancedStream<>(delegate.map(mapper));}@Overridepublic IntStream mapToInt(ToIntFunction<? super T> mapper) {return delegate.mapToInt(mapper);}@Overridepublic LongStream mapToLong(ToLongFunction<? super T> mapper) {return delegate.mapToLong(mapper);}@Overridepublic DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {return delegate.mapToDouble(mapper);}@Overridepublic <R> EnhancedStream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) {return new EnhancedStream<>(delegate.flatMap(mapper));}@Overridepublic IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) {return delegate.flatMapToInt(mapper);}@Overridepublic LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) {return delegate.flatMapToLong(mapper);}@Overridepublic DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) {return delegate.flatMapToDouble(mapper);}@Overridepublic EnhancedStream<T> distinct() {return new EnhancedStream<>(delegate.distinct());}@Overridepublic EnhancedStream<T> sorted() {return new EnhancedStream<>(delegate.sorted());}@Overridepublic EnhancedStream<T> sorted(Comparator<? super T> comparator) {return new EnhancedStream<>(delegate.sorted(comparator));}@Overridepublic EnhancedStream<T> peek(Consumer<? super T> action) {return new EnhancedStream<>(delegate.peek(action));}@Overridepublic EnhancedStream<T> limit(long maxSize) {return new EnhancedStream<>(delegate.limit(maxSize));}@Overridepublic EnhancedStream<T> skip(long n) {return new EnhancedStream<>(delegate.skip(n));}@Overridepublic void forEach(Consumer<? super T> action) {delegate.forEach(action);}@Overridepublic void forEachOrdered(Consumer<? super T> action) {delegate.forEachOrdered(action);}@Overridepublic Object[] toArray() {return delegate.toArray();}@Overridepublic <A> A[] toArray(IntFunction<A[]> generator) {return delegate.toArray(generator);}@Overridepublic T reduce(T identity, BinaryOperator<T> accumulator) {return delegate.reduce(identity, accumulator);}@Overridepublic Optional<T> reduce(BinaryOperator<T> accumulator) {return delegate.reduce(accumulator);}@Overridepublic <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {return delegate.reduce(identity, accumulator, combiner);}@Overridepublic <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {return delegate.collect(supplier, accumulator, combiner);}@Overridepublic <R, A> R collect(Collector<? super T, A, R> collector) {return delegate.collect(collector);}@Overridepublic Optional<T> min(Comparator<? super T> comparator) {return delegate.min(comparator);}@Overridepublic Optional<T> max(Comparator<? super T> comparator) {return delegate.max(comparator);}@Overridepublic long count() {return delegate.count();}@Overridepublic boolean anyMatch(Predicate<? super T> predicate) {return delegate.anyMatch(predicate);}@Overridepublic boolean allMatch(Predicate<? super T> predicate) {return delegate.allMatch(predicate);}@Overridepublic boolean noneMatch(Predicate<? super T> predicate) {return delegate.noneMatch(predicate);}@Overridepublic Optional<T> findFirst() {return delegate.findFirst();}@Overridepublic Optional<T> findAny() {return delegate.findAny();}@Overridepublic Iterator<T> iterator() {return delegate.iterator();}@Overridepublic Spliterator<T> spliterator() {return delegate.spliterator();}@Overridepublic boolean isParallel() {return delegate.isParallel();}@Overridepublic EnhancedStream<T> sequential() {return new EnhancedStream<>(delegate.sequential());}@Overridepublic EnhancedStream<T> parallel() {return new EnhancedStream<>(delegate.parallel());}@Overridepublic EnhancedStream<T> unordered() {return new EnhancedStream<>(delegate.unordered());}@Overridepublic EnhancedStream<T> onClose(Runnable closeHandler) {return new EnhancedStream<>(delegate.onClose(closeHandler));}@Overridepublic void close() {delegate.close();}
}

测试下:

public static void main(String[] args) {List<User> users = new ArrayList<>() {{add(new User(1, "test1"));add(new User(2, "test1"));add(new User(2, "test2"));add(new User(3, "test3"));add(new User(3, "test4"));}};List<User> collect1 = new EnhancedStream<>(users.stream()).distinct(User::getId,(u1, u2) -> u1.getId() == u2.getId(),(u1, u2) -> u1).collect(Collectors.toList());List<User> collect2 = new EnhancedStream<>(users.stream()).distinct(user -> user.getName().hashCode(),(u1, u2) -> u1.getName().equalsIgnoreCase(u2.getName()),(u1, u2) -> u1).collect(Collectors.toList());
}

通过动态代理

上面这种实现有很多冗余代码,可以考虑使用动态代理实现,首先编写代理接口类,通过EnhancedStream继承Stream接口,增加distinct接口,并让所有返回Stream的接口返回EnhancedStream,这样才能让返回有新的distinct接口可以使用。

public interface EnhancedStream<T> extends Stream<T> {EnhancedStream<T> distinct(ToIntFunction<T> hashCode,BiPredicate<T, T> equals,BinaryOperator<T> merger);@OverrideEnhancedStream<T> filter(Predicate<? super T> predicate);@Override<R> EnhancedStream<R> map(Function<? super T, ? extends R> mapper);@Override<R> EnhancedStream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);@OverrideEnhancedStream<T> distinct();@OverrideEnhancedStream<T> sorted();@OverrideEnhancedStream<T> sorted(Comparator<? super T> comparator);@OverrideEnhancedStream<T> peek(Consumer<? super T> action);@OverrideEnhancedStream<T> limit(long maxSize);@OverrideEnhancedStream<T> skip(long n);@OverrideEnhancedStream<T> takeWhile(Predicate<? super T> predicate);@OverrideEnhancedStream<T> dropWhile(Predicate<? super T> predicate);@OverrideEnhancedStream<T> sequential();@OverrideEnhancedStream<T> parallel();@OverrideEnhancedStream<T> unordered();@OverrideEnhancedStream<T> onClose(Runnable closeHandler);
}

然后,编写代理类EnhancedStreamHandler实现方法代理:

public class EnhancedStreamHandler<T> implements InvocationHandler {private Stream<T> delegate;public EnhancedStreamHandler(Stream<T> delegate) {this.delegate = delegate;}private static final Method ENHANCED_DISTINCT;static {try {ENHANCED_DISTINCT = EnhancedStream.class.getMethod("distinct", ToIntFunction.class, BiPredicate.class,BinaryOperator.class);} catch (NoSuchMethodException e) {throw new Error(e);}}/*** 将EnhancedStream的方法与Stream的方法一一对应*/private static final Map<Method, Method> METHOD_MAP =Stream.of(EnhancedStream.class.getMethods()).filter(m -> !m.equals(ENHANCED_DISTINCT)).filter(m -> !Modifier.isStatic(m.getModifiers())).collect(Collectors.toUnmodifiableMap(Function.identity(),m -> {try {return Stream.class.getMethod(m.getName(), m.getParameterTypes());} catch (NoSuchMethodException e) {throw new Error(e);}}));@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {if (method.equals(ENHANCED_DISTINCT)) {//调用方法为扩展方法distinctreturn distinct((EnhancedStream<T>) proxy,(ToIntFunction<T>) args[0],(BiPredicate<T, T>) args[1],(BinaryOperator<T>) args[2]);} else if (method.getReturnType() == EnhancedStream.class) {//对于返回类型为EnhancedStream的,证明是代理的方法调用,走代理Method match = METHOD_MAP.get(method);//更相信代理对象为新的Streamthis.delegate = (Stream) match.invoke(this.delegate, args);return proxy;} else {//否则,直接用代理类调用return method.invoke(this.delegate, args);}}private static final class Key<E> {private final E e;private final ToIntFunction<E> hashCode;private final BiPredicate<E, E> equals;public Key(E e, ToIntFunction<E> hashCode,BiPredicate<E, E> equals) {this.e = e;this.hashCode = hashCode;this.equals = equals;}@Overridepublic int hashCode() {return hashCode.applyAsInt(e);}@Overridepublic boolean equals(Object obj) {if (!(obj instanceof Key)) {return false;}@SuppressWarnings("unchecked")Key<E> that = (Key<E>) obj;return equals.test(this.e, that.e);}}private EnhancedStream<T> distinct(EnhancedStream<T> proxy,ToIntFunction<T> hashCode,BiPredicate<T, T> equals,BinaryOperator<T> merger) {delegate = delegate.collect(Collectors.toMap(t -> new Key<>(t, hashCode, equals),Function.identity(),merger,//使用LinkedHashMap,保持入参原始顺序LinkedHashMap::new)).values().stream();return proxy;}
}

最后编写工厂类,生成EnhancedStream代理类:

public class EnhancedStreamFactory {public static <E> EnhancedStream<E> newEnhancedStream(Stream<E> stream) {return (EnhancedStream<E>) Proxy.newProxyInstance(//必须用EnhancedStream的classLoader,不能用Stream的,因为Stream是jdk的类,ClassLoader是rootClassLoaderEnhancedStream.class.getClassLoader(),//代理接口new Class<?>[] {EnhancedStream.class},//代理类new EnhancedStreamHandler<>(stream));}
}

这样,代码看上去更优雅了,就算 JDK 以后扩展更多方法,这里也可不用修改

增强 Stream 接口的 distinct 方法的一些思考相关推荐

  1. java8新特新:接口流-Stream()接口和所有方法(map,foreach,filter,count,distinct,limit,collect,skip,sorted)

    Steam<T> 接口流 类型参数: T - 流元素的类型 所有超级接口: AutoCloseable,BaseStream <T,Stream <T >> 公共接 ...

  2. Java8 Stream接口流式方法:map操作、filter操作以及flatMap操作

    点击关注公众号,利用碎片时间学习 关于stream 流式操作,在rt.jar 包里面,ReferencePipeline管道方式操作数据 下面集成所有操作方法,利用这些流,处理大数据的方式,效率提升明 ...

  3. java9新特性--jShell--命令语法改进:try语和接口的私有方法--String存储结构变更--集合工厂方法--模块化系统--InputStream 加强--增强的 Stream API

    目录 Java 9 的新特性 JDK 和 JRE 目录结构的改变 模块化系统: Jigsaw -> Modularity 实现目标 模块 结构 jShell命令 产生背景 设计理念 实现目标 j ...

  4. Java23-day14【函数式接口(Supplier\Consumer\Predicate\Function)、Stream流(生产方式\中间方法\终结方法)】

    视频+资料[链接:https://pan.baidu.com/s/1MdFNUADVSFf-lVw3SJRvtg   提取码:zjxs] Java基础--学习笔记(零起点打开java世界的大门)--博 ...

  5. 学习笔记之-java8的新特性-函数式接口,lambda表达式,方法引用,Stream API,Optional类

    1.Lambda表达式 用匿名内部类的方法去创建多线程1.new Thread2.参数传递new Runnable3.重写run方法4.在run方法中去设置线程任务5.调用start问题:我们最终目标 ...

  6. 十三、Java高级特性 Lambda表达式 | 接口组成更新 | 方法引用 | 函数式接口

    文章目录 十三.Java高级特性 1.Lambda表达式 1.1体验Lambda表达式[理解] 1.2Lambda表达式的标准格式[理解] 1.3Lambda表达式练习1[应用] 1.4Lambda表 ...

  7. Java 8 Stream中间操作(方法)示例

    Java 8 Streams中间操作的完整指南. 所有内置Stream API中间操作(方法)的列表以及示例. 1.概述 在本教程中,我们将学习什么是 Java 8 Stream 中的中间操作 . 所 ...

  8. Java中 方法引用、Stream流、及方法实例 D190401

    Java中 方法引用.Stream流.及方法实例 D190401 01.第三章:方法引用_什么是方法引用 1).什么是"方法引用":指引用现有的方法代替Lambda表达式--当我们 ...

  9. Java基础之JDK8之Stream接口

    JDK1.8新特性之Stream 什么是Stream? Stream是元素的集合,这点让Stream看起来用些类似Iterator: 可以支持顺序和并行的对原Stream进行汇聚的操作: 我们可以把S ...

最新文章

  1. 自己写的哈希表以及解决哈希冲突
  2. Qt中使用线程的几种方式及区别
  3. 2020-11-24
  4. windows Server 2003 尝试安装.NET Framework 4 失败
  5. windows 下的文件对比工具
  6. 剑指offer之Runnable和Callable的区别
  7. 函数和常用模块【day04】:内置函数(十)
  8. [Erlang 0041] 详解io:format
  9. 02 Oracle 批量导出建表语句和数据
  10. 鸿蒙os系统3.0电脑,华为鸿蒙系统升级指南,速看!
  11. thinkpad T580加装内存条
  12. windows系统上安装.cab文件
  13. 智能电导率系统电路设计详解
  14. 集集自助服务密码重置
  15. 联想小新一键恢复小孔_联想一键恢复系统怎么用?小新Air 13 Pro怎么还原操作系统?...
  16. uniapp微信小程序实现地图导航
  17. web前端不用怕,外卖平台的项目开发流程,大全!!
  18. Alibaba Arthas 3.1.2版本发布:增加logger/heapdump/vmoption命令,支持tunnel server
  19. 计算机职业生涯规划书一万字,大学生职业规划一万字
  20. ISE FPGA时钟系统

热门文章

  1. Linux 自带的 LED 灯驱动
  2. ubuntu光盘定制化(在系统ISO里装自定义软件)
  3. Python3.8的下载与安装
  4. Microsoft Platform SDK Febrary 2003下载(更新VC6的SDK)
  5. 虚拟机无法建立与服务器的连接,无法通过 Vmconnect 连接到虚拟机 - Windows Server | Microsoft Docs...
  6. 【无标题】不出门怎么连接公司电脑,这6个工具让你不用来回跑。
  7. Hadoop-HDFS学习课件
  8. 手撸一个网页版看板(仿照板栗看板样式)
  9. 互联网时代养猪资源平台
  10. WAMPServer3.1.4集成环境的下载(最新版)