文章目录

  • 收集器简介
    • 什么是流收集器
    • 一个get start告诉你流收集器的强大之处
    • 流收集器工作原理简析
  • 规约和汇总
    • 查找流中的最大值和最小值
    • 汇总和和一次性获取所有规约结果
    • 连接字符串
    • 关于规约汇总的一些补充和建议
  • 分组
    • 基本示例
    • 多级分组
    • 按子组收集数据
  • 分区
    • 分区的优势
    • 将数字按质数和非质数分区
  • 一个示例了解流收集器工作原理(重点)
  • 基于自定义收集提高质数收集器性能
    • 简介
    • 开发过程
      • 分析现有质数收集的不足
      • 逐步完成编码
    • 性能测试
  • 源码地址
  • 参考文献

收集器简介

什么是流收集器

用过流编程的小伙伴大概都会写过这样的代码

userList.stream().map(User::getName).collect(toList());

可以看出流让某些规约操作变得很简单,只需我们整理好自己所需要的数据的流,就像上文一样,我们希望创建一个用户名字的列表,我们只需要通过map映射出所有用户名的流,然后使用终端操作即collect方法完成流的最终结果生成即可。
本文就是介绍collect中可以传的参数,流收集器,通过各种各样的流收集器,你就可以完成你所需要的数据的收集。

一个get start告诉你流收集器的强大之处

为了介绍流收集,我们得先介绍一个示例,我们现在有一个交易系统,交易系统中记录着所有的交易订单。每笔交易订单都是由金额和交易币种完成,对应的类描述如下

public static class Transaction {private final Currency currency;private final double value;public Transaction(Currency currency, double value) {this.currency = currency;this.value = value;}public Currency getCurrency() {return currency;}public double getValue() {return value;}@Overridepublic String toString() {return currency + " " + value;}}public enum Currency {EUR, USD, JPY, GBP, CHF}

使用交易订单类构成下面一个集合

public static List<Transaction> transactions = Arrays.asList( new Transaction(Currency.EUR, 1500.0),new Transaction(Currency.USD, 2300.0),new Transaction(Currency.GBP, 9900.0),new Transaction(Currency.EUR, 1100.0),new Transaction(Currency.JPY, 7800.0),new Transaction(Currency.CHF, 6700.0),new Transaction(Currency.EUR, 5600.0),new Transaction(Currency.USD, 4500.0),new Transaction(Currency.CHF, 3400.0),new Transaction(Currency.GBP, 3200.0),new Transaction(Currency.USD, 4600.0),new Transaction(Currency.JPY, 5700.0),new Transaction(Currency.EUR, 6800.0) );

这时候,我们希望能够根据货币的种类划分所有的订单,该怎么做呢?
假如你在java8之前,你一定会遍历这个list,然后声明一个map,根据货币的种类,将订单塞到对应的键的list中,代码如下

private static void groupImperatively() {Map<Currency, List<Transaction>> transactionsByCurrencies = new HashMap<>();for (Transaction transaction : transactions) {Currency currency = transaction.getCurrency();List<Transaction> transactionsForCurrency = transactionsByCurrencies.get(currency);if (transactionsForCurrency == null) {transactionsForCurrency = new ArrayList<>();transactionsByCurrencies.put(currency, transactionsForCurrency);}transactionsForCurrency.add(transaction);}System.out.println(transactionsByCurrencies);}

可以看到传统的指令式编码仅仅为了一个分组,写了这么多代码,但是java8的函数式编程改变了这一现状,可以看到一行代码,仅仅用了一个简单的终端操作collect(groupingBy(Transaction::getCurrency))既有语义又完美的解决了问题,这就是本文所要介绍的流收集器

private static void groupFunctionally() {Map<Currency, List<Transaction>> transactionsByCurrencies = transactions.stream().collect(groupingBy(Transaction::getCurrency));System.out.println(transactionsByCurrencies);//输出结果 {USD=[USD 2300.0, USD 4500.0, USD 4600.0], CHF=[CHF 6700.0, CHF 3400.0], EUR=[EUR 1500.0, EUR 1100.0, EUR 5600.0, EUR 6800.0], JPY=[JPY 7800.0, JPY 5700.0], GBP=[GBP 9900.0, GBP 3200.0]}}

流收集器工作原理简析

以上文交易分组示例为例,实际上流的底层所作的工作也很简单,分为三步:

  1. 遍历list
  2. 使用转换函数,提取出每个订单的货币
  3. 将订单存到对应的货币键的list中

规约和汇总

查找流中的最大值和最小值

为了演示流收集器获取最大值和最小值,我们再介绍一个需求,我们现在有一个菜肴集合,通过遍历这个集合我们可以知道菜肴的热量,菜名等信息
菜肴类描述和菜肴的集合代码如下

public class Dish {private final String name;private final boolean vegetarian;private final int calories;private final Type type;public Dish(String name, boolean vegetarian, int calories, Type type) {this.name = name;this.vegetarian = vegetarian;this.calories = calories;this.type = type;}public String getName() {return name;}public boolean isVegetarian() {return vegetarian;}public int getCalories() {return calories;}public Type getType() {return type;}public enum Type { MEAT, FISH, OTHER }@Overridepublic String toString() {return name;}public static final List<Dish> menu =asList( new Dish("pork", false, 800, Dish.Type.MEAT),new Dish("beef", false, 700, Dish.Type.MEAT),new Dish("chicken", false, 400, Dish.Type.MEAT),new Dish("french fries", true, 530, Dish.Type.OTHER),new Dish("rice", true, 350, Dish.Type.OTHER),new Dish("season fruit", true, 120, Dish.Type.OTHER),new Dish("pizza", true, 550, Dish.Type.OTHER),new Dish("prawns", false, 400, Dish.Type.FISH),new Dish("salmon", false, 450, Dish.Type.FISH));public static final Map<String, List<String>> dishTags = new HashMap<>();static {dishTags.put("pork", asList("greasy", "salty"));dishTags.put("beef", asList("salty", "roasted"));dishTags.put("chicken", asList("fried", "crisp"));dishTags.put("french fries", asList("greasy", "fried"));dishTags.put("rice", asList("light", "natural"));dishTags.put("season fruit", asList("fresh", "natural"));dishTags.put("pizza", asList("tasty", "salty"));dishTags.put("prawns", asList("tasty", "roasted"));dishTags.put("salmon", asList("delicious", "fresh"));}
}

假如我们希望获取菜肴中热量最高的一道菜,要怎么做呢?
假如在之前你一定会声明一个比较器,然后通过sort排序,然后找到最大值或者最小值。亦或者直接来一个中间变量记录当前遍历时热量最大的菜肴,直到遍历结束。
无论哪种方法,这种指令性代码的代码行数都不少,而且易读性也非常差,但是流收集器的出现解决了这个问题。我们只需要按照如下三步即可解决

  1. 将集合转为流
  2. 使用终端操作collect
  3. 调用Collectors为我们已经定义好的收集操作maxBy
  4. maxBy中传入我们的比较条件

所以代码就变成如下所示,可以看到这段代码一下子语义化提高了很多

Optional<Dish> dish = menu.stream().collect(Collectors.maxBy(Comparator.comparing(Dish::getCalories)));//输出结果 porkSystem.out.println(dish.get());

同理,既然我们知道最大值,最小值的逻辑也是一样的,只不过maxBy改为minBy即可

public static void main(String[] args) {Optional<Dish> dish = menu.stream().//使用收集器collect(Collectors.minBy(Comparator.comparing(Dish::getCalories)));//输出结果 season fruitSystem.out.println(dish.get());}

汇总和和一次性获取所有规约结果

还是上面的例子,假如我们希望计算出所有菜肴的热量,又该怎么做呢?很简单,步骤很上文差不多

  1. 拿到流
  2. 调用collect终端操作
  3. 在collect传入Collectors为我们提供好的收集方法summingInt
  4. 告诉summingInt我们要计算总和的属性是菜肴类的热量
public static void main(String[] args) {Integer calories = menu.stream().collect(Collectors.summingInt(Dish::getCalories));//4300System.out.println(calories);}

讲到这里,其实笔者也希望大家能够明白一个道理,叫举一反三,既然收集方法又summingInt,我们看到了int,自然是又double等各种计算方法,所以后续我们要计算double类型的变量时,只需将计算方法改为summingLong、summingDouble即可

再来说说求平均数把,很简单,计算逻辑观改为averagingInt即可

Double calories = menu.stream().collect(Collectors.averagingInt(Dish::getCalories));//4300System.out.println(calories);

计算菜肴数量也很简单,如下所示

 Long sum = menu.stream().collect(Collectors.counting());System.out.println(sum);

有时候你希望对某些数据进行监控统计,例如你希望一次性知道这个菜肴热量的平均、最大、最小、总和的数据又该怎么办呢?
很简单,强大的Collectors已经为我们提供了这样的一个函数summarizingInt,通过summarizingInt的返回值,我们可以任意获取需要的结果,代码如图所示

 public static void main(String[] args) {IntSummaryStatistics summaryStatistics = menu.stream().collect(Collectors.summarizingInt(Dish::getCalories));//4300System.out.println(summaryStatistics);}

连接字符串

可以看到流收集器配合java已经默认提供的收集方法工具类Collectors可以完成非常多的操作,操作String也是一样的方便,假如我们需要将所有菜肴的名字拼接起来,我们又该怎么做呢?
代码如下所示,只要使用Collectors.joining()这个收集方法即可

public static void main(String[] args) {String names = menu.stream().map(Dish::getName).collect(Collectors.joining());//porkbeefchickenfrench friesriceseason fruitpizzaprawnssalmonSystem.out.println(names);}

但是这样使得菜名都挤在一起了,我们看着很不舒服,我们能不能使用逗号分割一下呢?可以的,只需调用joining的重载方法即可

public static void main(String[] args) {String names = menu.stream().map(Dish::getName).collect(Collectors.joining(","));//pork,beef,chicken,french fries,rice,season fruit,pizza,prawns,salmonSystem.out.println(names);}

关于规约汇总的一些补充和建议

实际上,我们介绍了这么多规约和汇总的api,完完全全可以使用reducing就能够实现。
例如,我们要计算菜肴的热量总和,我们可以这样计算

public static void main(String[] args) {Integer sum = menu.stream().collect(Collectors.reducing(0,Dish::getCalories,(d1,d2)->d1+d2));System.out.println(sum);}

或者这样计算

public static void main(String[] args) {Integer sum = menu.stream().collect(Collectors.reducing(0,Dish::getCalories,Integer::sum));System.out.println(sum);}

所以我们也了解了reducing的使用传参逻辑

参数1:初始值(也可以不传,这样返回值就是optional便于判空处理)
参数2:处理的值
参数3:我要做什么:计算总和、还是比大小

同样的获取最大值也可以使用reducing,可以看出reducing的工作原理,就是不断将当前的计算结果传到下一次,如下图所示,根据第3个参数的计算逻辑得到最大值的结果,再将这个结果带到流的下一个值中继续处理,如此往复

public static void main(String[] args) {Integer sum = menu.stream().collect(Collectors.reducing(0, Dish::getCalories, (d1, d2) -> d1 > d2 ? d1 : d2));System.out.println(sum);}

同理,计算了解了工作原理,我们在使用counting介绍统计就很方便了,例如我们想计算总和也很简单了,只需将计算逻辑改为当前统计的数量+1即可,代码如下所示

 public static void main(String[] args) {Integer sum = menu.stream().collect(Collectors.reducing(0, d->1,Integer::sum));System.out.println(sum);}

说了这么多是不是reducing就是无敌,我们今后就用这个就可以了呢?实际上不是的,俗话说每个工具都有适用的场景,但是reducing按照上述的用法会存在很多问题

例如代码,就存在频繁拆箱,造成开销过大

  Integer sum = menu.stream().collect(Collectors.reducing(0, d->1,Integer::sum));System.out.println(sum);

所以我们建议这种计算统计的逻辑还是使用

Integer sum = menu.stream().mapToInt(Dish::getCalories).sum();System.out.println(sum);

另一个就是并发问题了,我们都知道reducing的第三个参数传的时计算逻辑,假如我们参数2传是添加将数组中一个元素添加到列表中,第三3参数将这个只有一个值的列表的元素都添加到自己的集合中,从而完成 将stream转为list,猛地一看没什么问题

public static void main(String[] args) {Stream<Integer> stream = Arrays.asList(1, 2, 3, 4, 5, 6).stream();List<Integer> numbers = stream.reduce(new ArrayList<Integer>(),(List<Integer> l, Integer e) -> {l.add(e);return l;}, (List<Integer> l1, List<Integer> l2) -> {l1.addAll(l2);return l1;});
//[1, 2, 3, 4, 5, 6]System.out.println(numbers);}

但是这种逻辑一旦用到了并行流身上就会爆炸,代码如下所示,原因很简单,设计者的初衷是让reduce作为无状态的计算,即当前结果计算成一个新的值给下一个流元素使用,这个计算结果从此之后与之前的逻辑就没有任何瓜葛,是个全新的东西,而不是像本示例一样做"有状态"的组装操作,这样的做法导致一旦我们使用并行流,并行的线程添加的元素会永远存在第2个参数的list中,永远的累加下去

 Stream<Integer> stream = Arrays.asList(1, 2, 3, 4, 5, 6).parallelStream();List<Integer> numbers = stream.reduce(new ArrayList<Integer>(),(List<Integer> l, Integer e) -> {l.add(e);return l;}, (List<Integer> l1, List<Integer> l2) -> {l1.addAll(l2);return l1;});
//[4, 6, 5, 4, 6, 5, 4, 6, 5, 4, 6, 5, 1, 2, 3, 4, 6, 5, 4, 6, 5, 4, 6, 5, 4, 6, 5, 1, 2, 3, 4, 6, 5, 4, 6, 5, 4, 6, 5, 4, 6, 5, 1, 2, 3, 4, 6, 5, 4, 6, 5, 4, 6, 5, 4, 6, 5, 1, 2, 3, 4, 6, 5, 4, 6, 5, 4, 6, 5, 4, 6, 5, 1, 2, 3, 4, 6, 5, 4, 6, 5, 4, 6, 5, 4, 6, 5, 1, 2, 3, 4, 6, 5, 4, 6, 5, 4, 6, 5, 4, 6, 5, 1, 2, 3, 4, 6, 5, 4, 6, 5, 4, 6, 5, 4, 6, 5, 1, 2, 3]System.out.println(numbers);

分组

基本示例

我们希望根据菜肴的类型,将不同的菜肴分类要怎么做呢?在java8之前你可能会声明一个map,然后遍历list,将这些结果不断塞到map中,但是java8的做法非常简单,一行代码即,如下所示,可以看到只需告诉流收集器我们要做的事情即可,即使用dish的type进行分组

private static Map<Dish.Type, List<Dish>> groupDishesByType() {
//Dishes grouped by type: {FISH=[prawns, salmon], MEAT=[pork, beef, chicken], OTHER=[french fries, rice, season fruit, pizza]}return menu.stream().collect(groupingBy(Dish::getType));}

多级分组

有时候我们希望能够根据菜肴类型分组完,再基于菜肴的卡路里将其范围健康、不健康、肥胖的类型,又该怎么做呢?
很简单,如下所示在根据group type分类完成后,内部再传一个分类逻辑即可,这样做就会使得外层的分组完成将分组的流传给内部分组流再处理一次

private static Map<Dish.Type, Map<CaloricLevel, List<Dish>>> groupDishedByTypeAndCaloricLevel() {//Dishes grouped by type and caloric level: {MEAT={DIET=[chicken], NORMAL=[beef], FAT=[pork]}, FISH={DIET=[prawns], NORMAL=[salmon]}, OTHER={DIET=[rice, season fruit], NORMAL=[french fries, pizza]}}return menu.stream().collect(groupingBy(Dish::getType,groupingBy((Dish dish) -> {if (dish.getCalories() <= 400) return CaloricLevel.DIET;else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;else return CaloricLevel.FAT;} )));}

具体工作原理如下图所示,说白了就是将流分组后,再将这个流传给下一个分组逻辑继续处理而已

按子组收集数据

如果我们希望拿到分组结果后进行相应的统计处理要怎么办呢?
例如我们按照菜肴的类型分组,再分别找出每个类型中热量最高的,我们要怎么做呢?
也很简单,在分组函数内部传入比较的属性和逻辑即可
我们发现这样的做法返回值是Map<Dish.Type, Optional<Dish>>,为什么呢?原因很简单,很可能你的某个菜肴类型中没有菜,实际上这种可能性完完全全是reducing返回值做的孽,在groupby的处理逻辑下,完全没可能出现某个菜肴类型下没有菜的情况,原因很简单,菜肴类型下没有菜,这不就说明这个集合中压根没有这个菜肴类型,没有这个菜肴类型就说明这个菜肴类型的key压根没有,所以这个Optional在groupby的逻辑下完完全全是碍眼的东西,我们必须想办法去掉

private static Map<Dish.Type, Optional<Dish>> mostCaloricDishesByType() {return menu.stream().collect(groupingBy(Dish::getType,reducing((Dish d1, Dish d2) -> d1.getCalories() > d2.getCalories() ? d1 : d2)));}

好在java8为我们提供了解决方案,实际上在groupby的第二个参数,即计算逻辑外层加一个collectingAndThen即可,他就会按照collectingAndThen的第2个参数完成数据计算完成后的后置逻辑,这样我们就能将现有map中存在的key中的value的值一个个都用option get出来,因为我们知道map中既然存在某个key自然说明集合中有这个菜肴类型的菜,所以我们大胆用Optional::get,它完完全全不会报NoSuchElementException

private static Map<Dish.Type, Dish> mostCaloricDishesByTypeWithoutOprionals() {return menu.stream().collect(groupingBy(Dish::getType,collectingAndThen(reducing((d1, d2) -> d1.getCalories() > d2.getCalories() ? d1 : d2),Optional::get)));}

具体的工作流程如下图所示


有时候我们希望按照菜肴类别对菜肴进行分组,按照根据类别对每个类别的菜肴设定菜肴属于高热量还是低热量食物,使用按子组收集数据又该怎么做呢?
可以看到笔者处理的步骤非常简单,将菜肴集合进行分组之后,便通过mapping进行映射处理,将流中每道菜都设置会对应的热量类型食物,然后存到set集合中以便去重,最终构成一个key为DishType,值为set的食物类型集合

private static Map<Dish.Type, Set<CaloricLevel>> caloricLevelsByType2() {//{OTHER=[DIET, NORMAL], MEAT=[FAT, DIET, NORMAL], FISH=[DIET, NORMAL]}return menu.stream().collect(groupingBy(Dish::getType, mapping(dish -> {if (dish.getCalories() <= 400) return CaloricLevel.DIET;else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;else return CaloricLevel.FAT;}, toSet())));}

当然了,这里的 toSet()用的是默认的集合类型,如果你有特殊要求,例如你想拿到有序的set就可以使用treeSet,代码如下所示

private static Map<Dish.Type, Set<CaloricLevel>> caloricLevelsByType() {return menu.stream().collect(groupingBy(Dish::getType, mapping(dish -> {if (dish.getCalories() <= 400) return CaloricLevel.DIET;else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;else return CaloricLevel.FAT;},toCollection(TreeSet::new))));}

分区

分区的优势

一句话,使用分区的流操作使得代码简洁且优雅,为了展示它的优雅和简洁,我们不妨用一个需求来展示这个操作:

我们希望按照是否是蔬菜类被对菜肴进行一级分类,然后再根据菜肴类别进行分类

当然,学过groupbying的你一定想过这样的写法

 Map<Boolean, Map<Dish.Type, List<Dish>>> result = menu.stream().collect(
//                        按照是否素菜类进行分组groupingBy(Dish::isVegetarian,//拿着外层分组的结果,再按照菜肴类型进行分组groupingBy(Dish::getType)));//{false={MEAT=[pork, beef, chicken], FISH=[prawns, salmon]}, true={OTHER=[french fries, rice, season fruit, pizza]}}System.out.println(result);

可以看得输出结果最外层的key为true的false,所以我们可以介绍一个语义化更明显的收集操作partitioningBy,代码如下所示

Map<Boolean, Map<Dish.Type, List<Dish>>> result2 = menu.stream().collect(
//                        按照true和false进行分类partitioningBy(Dish::isVegetarian,//拿着外层分组的结果,再按照菜肴类型进行分组groupingBy(Dish::getType)));

将数字按质数和非质数分区

这个partitioningBy专门为处理布尔值的分类而生的,我们不妨再看看一个例子:

我们希望能够编写一个方法能够统计指定范围的内的质数和非质数,最终我们希望能够返回的结果如下所示
{false=[4, 6, 8, 9, 10, 12, 14, 15, 16, 18, 20, 21, 22, 24, 25, 26, 27, 28, 30, 32, 33, 34, 35, 36, 38, 39, 40, 42, 44, 45, 46, 48, 49, 50, 51, 52, 54, 55, 56, 57, 58, 60, 62, 63, 64, 65, 66, 68, 69, 70, 72, 74, 75, 76, 77, 78, 80, 81, 82, 84, 85, 86, 87, 88, 90, 91, 92, 93, 94, 95, 96, 98, 99, 100], * true=[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97]}

所以我们首先需要编写一个判断是否是质数的逻辑,代码如isPrime所示,然后使用这个方法作为partitioningBy的分类收集逻辑

 private static boolean isPrime(int num) {//实际上判断是否是质数不需要除到num,只需除到质数的开方即可int dividend = (int) Math.sqrt(num);return IntStream.rangeClosed(2, dividend).boxed().noneMatch(i -> num % i == 0);}public static Map<Boolean, List<Integer>> partitionPrimes(int n) {/*** Numbers partitioned in prime and non-prime: {false=[4, 6, 8, 9, 10, 12, 14, 15, 16, 18, 20, 21, 22, 24, 25, 26, 27, 28, 30, 32, 33, 34, 35, 36, 38, 39, 40, 42, 44, 45, 46, 48, 49, 50, 51, 52, 54, 55, 56, 57, 58, 60, 62, 63, 64, 65, 66, 68, 69, 70, 72, 74, 75, 76, 77, 78, 80, 81, 82, 84, 85, 86, 87, 88, 90, 91, 92, 93, 94, 95, 96, 98, 99, 100], * true=[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97]}*/
//        使用特值流构建数据范围 return IntStream.rangeClosed(2, n)
//                用boxed转为原始流.boxed()//根据是否质数进行分类.collect(partitioningBy(num -> isPrime(num)));}

一个示例了解流收集器工作原理(重点)

可以看到我们上文就知道collect是完成流收集的关键,我们通过源码了解到collect的参数如下所示:

 <R, A> R collect(Collector<? super T, A, R> collector);

所以我们不妨基于此来了解一下Collector<? super T, A, R> collector

首先我们必须了解Collector三个泛型代表什么

  1. T:target,这个泛型代表着你最终的返回的集合的类型
  2. A:ccumulator,我们都知道流数据的收集必须是需要一个容器,所以A就代表我们声明的容器的类型
  3. R:result,这就是最终的返回值类型

所以,当我们自定义一个收集器其泛型为Collector<T, List<T>,List<T>>,这就代表着,我们最终会返回一个List<T>类型的集合。

了解了Collector泛型的意义,我们就用一个示例了解一下Collector接口中的方法。

我们希望能够自己编写一个tolist收集器,将菜肴集合中所有的热量值都存到list集合中。所以我们要继承Collector编写一个自己的收集器。所以我们的流收集器类型为Collector<T, List<T>,List<T>>
当我们继承这个接口的时候,我们需要编写这些方法,没关系我们一步一步来

@Overridepublic Supplier<List<T>> supplier() {return null;}@Overridepublic BiConsumer<List<T>, T> accumulator() {return null;}@Overridepublic BinaryOperator<List<T>> combiner() {return null;}@Overridepublic Function<List<T>, List<T>> finisher() {return null;}@Overridepublic Set<Characteristics> characteristics() {return null;}

supplier,他是个Supplier类型的函数式接口,声明为A即返回A类型,在Collector中所作的工作也很简单,就是提供一个累加器的容器

Supplier<A> supplier();

所以我们的逻辑为

 @Overridepublic Supplier<List<T>> supplier() {return ArrayList::new;}

然后是accumulator,可以发现他的返回值是BiConsumer,所以他的工作是处理List<T>T之间关系后,返回一个void,根据方法语义我们就知道这个方法是处理累加器中累加的逻辑,我们收集器的目的将流元素放到累加器中,所以我们的逻辑如代码段2所示

BiConsumer<List<T>, T> accumulator
@Overridepublic BiConsumer<List<T>, T> accumulator() {
//        return ((list, t) -> list.add(t));return List::add;}

然后是finisher,他是一个Function的函数式接口,做的就是传入两个一样类型的泛型,返回第二个泛型类型的是。而我们的方法名为finisher,意思也很简单,就是将累加器中的流变为最终结果流

Function<List<T>, List<T>> finisher()

所以我们的代码如下所示,直接将累加器的结果作为最终结果返回出去,可以看到笔者为了代码的语义化,注释了return list->list;取而代之的是return Function.identity();实际他们做的事情都一样,将累加器的东西原原本本扔出去

 @Overridepublic Function<List<T>, List<T>> finisher() {
//        return list->list;
//        这种恒等关系可以用用下面一段代码表示return Function.identity();}

总结起来他们工作流程就如下所示

完成这些操作后,原本因为流的处理导致集合被拆分无数个不可再分割的子流都变成一个个的list,我们需要将他们合并在一起,这时候就需要使用到combiner,他是一个BinaryOperator函数式接口,我们不妨看看源码看看他的工作原理

BinaryOperator<List<T>> combiner()

如下图所示,可以看到他继承了BiFunction,所作的就是将两个T作为参数然后返回一个T。

而我们的逻辑很简单,就是将无数个子流汇聚成一个大流就好了,所以逻辑如下所示

@Overridepublic Function<List<T>, List<T>> finisher() {
//        return list->list;
//        这种恒等关系可以用用下面一段代码表示return Function.identity();}

工作原理图如下所示

最后一个方法—— characteristics 会返回一个不可变的 Characteristics 集合,它定义了收集器的行为——尤其是关于流是否可以并行归约,以及可以使用哪些优化的提示。
Characteristics 是一个包含三个项目的枚举。

1.   UNORDERED ——归约结果不受流中项目的遍历和累积顺序的影响。
2.  CONCURRENT —— accumulator 函数可以从多个线程同时调用,且该收集器可以并行归
约流。如果收集器没有标为 UNORDERED ,那它仅在用于无序数据源时才可以并行归约。
3.  IDENTITY_FINISH ——这表明完成器方法返回的函数是一个恒等函数,可以跳过。

这种情况下,累加器对象将会直接用作归约过程的最终结果。这也意味着,将累加器 A 不加检查地转换为结果 R 是安全的。
我们迄今开发的 ToListCollector 是 IDENTITY_FINISH 的,因为用来累积流中元素的List 已经是我们要的最终结果,用不着进一步转换了,但它并不是 UNORDERED ,因为用在有序
流上的时候,我们还是希望顺序能够保留在得到的 List 中。最后,它是 CONCURRENT 的,但我们刚才说过了,仅仅在背后的数据源无序时才会并行处理。

所以我们的代码为

@Overridepublic Set<Characteristics> characteristics() {return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH,Characteristics.CONCURRENT));}

完整的代码

public class ToList<T> implements Collector<T, List<T>,List<T>> {@Overridepublic Supplier<List<T>> supplier() {return ArrayList::new;}@Overridepublic BiConsumer<List<T>, T> accumulator() {
//        return ((list, t) -> list.add(t));return List::add;}@Overridepublic BinaryOperator<List<T>> combiner() {return (list1,list2)->{list1.addAll(list2);return list1;};}@Overridepublic Function<List<T>, List<T>> finisher() {
//        return list->list;
//        这种恒等关系可以用用下面一段代码表示return Function.identity();}@Overridepublic Set<Characteristics> characteristics() {return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH,Characteristics.CONCURRENT));}
}

测试用例

public class Main {public static void main(String[] args) {List<Integer> list = menu.stream().map(Dish::getCalories).collect(new ToList<>());/*** [800, 700, 400, 530, 350, 120, 550, 400, 450]*/System.out.println(list);}
}

了解了这么多,其实自定义构建起完全可以简写,如下所示,只是语义化不强而已,我们可以根据collect传入对应的存储源,累加器处理,流组合器完成上文的操作

<R> R collect(Supplier<R> supplier,BiConsumer<R, ? super T> accumulator,BiConsumer<R, R> combiner);

代码如下所示

List<Object> list = menu.stream().map(Dish::getCalories).collect(ArrayList::new,List::add,List::addAll);

基于自定义收集提高质数收集器性能

简介

经过上文的解释我们大概了解了自定义流收集器的工作原理和实践步骤,所以我们就完完全全可以基于此去自定义自己所需的收集器从而优化收集器的性能。
以上文为例,我们有一个质数收集器,我们希望传入一个数字n就可以得到这个数字范围内的质数和非质数。而这个质数和非质数的存储集合我们用的是<Map<Boolean, List<Integer>>>,结果示例如下所示,这个输出结果的true就是存储质数,key为false就是存储非质数。

{false=[4, 6, 8, 9, 10, 12, 14, 15, 16, 18, 20], true=[2, 3, 5, 7, 11, 13, 17, 19]}

开发过程

分析现有质数收集的不足

可以看到,这就是我们原有的代码编写逻辑,咋一看其实非常完美,解决了频繁拆箱,流的操作已经api的使用都很恰当。
但是它还可以做的更好。
其实学过数学的同学都是,判断一个数是否是质数,我们完完全全可以拿这个这个数,和它的开方以内的质数进行相处。
例如我们现在有一个数字19,我们如果想判断它是否是质数,我们首先需要算出它的开方 √19 ≈ 4,而4以内的质数有2,3。我们将这两个数和19进行相处,发现都不能被整除,所以19是质数。
由此得出,我们现有的解决方案还不是特别完美,假如我们能够拿到当前要判断是否是质数的数的开方以内的质数进行相处判断,就会减少很多没必要的计算,进而提高效率。

public static Map<Boolean, List<Integer>> partitionPrimes(int n) {/*** Numbers partitioned in prime and non-prime: {false=[4, 6, 8, 9, 10, 12, 14, 15, 16, 18, 20, 21, 22, 24, 25, 26, 27, 28, 30, 32, 33, 34, 35, 36, 38, 39, 40, 42, 44, 45, 46, 48, 49, 50, 51, 52, 54, 55, 56, 57, 58, 60, 62, 63, 64, 65, 66, 68, 69, 70, 72, 74, 75, 76, 77, 78, 80, 81, 82, 84, 85, 86, 87, 88, 90, 91, 92, 93, 94, 95, 96, 98, 99, 100],* true=[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97]}*/
//        使用特值流构建数据范围return IntStream.rangeClosed(2, n)
//                用boxed转为原始流.boxed()//根据是否质数进行分类.collect(partitioningBy(num -> isPrime(num)));}

逐步完成编码

有了这个想法,我们就能展开编码了,由上文可知,我们希望拿着要判断的数字的开方以内的质数进行相除,所以我们会先编写出被除数与除数的代码逻辑,如下所示,可以看到笔者使用takeWhile方法来筛选出被除数的开方以内的质数,我们只需通过第2个参数传入判断是否小于被除数开方逻辑就可以确定方法,然后在使用isPrime就可以完成判断当前数字是否是质数

/*** 上一个求分离质数和非质数方案采用的是被除数除所有小于被除数开方范围的数,* 这次的优化是除小于被除数开方的质数** @param primeList* @param candidate* @return*/private static boolean isPrime(List<Integer> primeList, Integer candidate) {//被除数的开方int candidateRoot = (int) Math.sqrt(candidate);return takeWhile(primeList, i -> i <= candidateRoot).stream().noneMatch(i -> candidate % i == 0);}/*** 找出质数list中小于被除数开发的范围** @param list* @param predicate* @param <A>* @return*/private static <A> List<A> takeWhile(List<A> list, Predicate<A> predicate) {int i = 0;for (A num : list) {if (!predicate.test(num)) {return list.subList(0, i);}++i;}return list;}

有了这个思路我们再来完成剩下逻辑
我们会编写一个类PrimeNumCollector implements Collector<Integer, Map<Boolean, List<Integer>>, Map<Boolean, List<Integer>>> ,相应实现如下,因为有了上文自定义流收集器的介绍笔者这里就不多赘述了,需要重点了解的就是accumulator方法,我们知道他完成的是供应源和下一个流元素的逻辑处理,由此得知这个方法我们可以拿到当前的供应源中存在的质数,然后将这些质数传入刚刚编写好的isPrime中完成我们所说的数学定理,再将符合要求的质数存到key为true的map中,从而更加高效的完成数字的收集

public class PrimeNumCollector implements Collector<Integer, Map<Boolean, List<Integer>>, Map<Boolean, List<Integer>>> {/*** 上一个求分离质数和非质数方案采用的是被除数除所有小于被除数开方范围的数,* 这次的优化是除小于被除数开方的质数** @param primeList* @param candidate* @return*/private static boolean isPrime(List<Integer> primeList, Integer candidate) {//被除数的开方int candidateRoot = (int) Math.sqrt(candidate);return takeWhile(primeList, i -> i <= candidateRoot).stream().noneMatch(i -> candidate % i == 0);}/*** 找出质数list中小于被除数开发的范围** @param list* @param predicate* @param <A>* @return*/private static <A> List<A> takeWhile(List<A> list, Predicate<A> predicate) {int i = 0;for (A num : list) {if (!predicate.test(num)) {return list.subList(0, i);}++i;}return list;}@Overridepublic Supplier<Map<Boolean, List<Integer>>> supplier() {return () -> new HashMap<>() {{put(true, new ArrayList<>());put(false, new ArrayList<>());}};}/*** 因为累加器是基于供应源和下一个流元素进行下一步逻辑操作* 所以这里我们就可以进行一个取巧,拿着当前流元素中的质数去和新值比较从而提高性能* @return*/@Overridepublic BiConsumer<Map<Boolean, List<Integer>>, Integer> accumulator() {return (map, num) -> {map.get(isPrime(map.get(true), num)).add(num);};}/*** 实际上本示例做法是顺序的,这个方法只有在并发的情况下会触发,所以并没有什么卵用** @return*/@Overridepublic BinaryOperator<Map<Boolean, List<Integer>>> combiner() {return (map1, map2) -> {map1.get(true).addAll(map2.get(true));map1.get(false).addAll(map2.get(false));return map1;};}@Overridepublic Function<Map<Boolean, List<Integer>>, Map<Boolean, List<Integer>>> finisher() {return Function.identity();}@Overridepublic Set<Characteristics> characteristics() {return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH));}
}

性能测试

编写原有的测试用例,和自定义流收集器的测试用例,所以在了解现有需求的情况下,使用自定义流收集器可能会获得更好的效率

public static Map<Boolean, List<Integer>> partitionPrimes(int n) {/*** Numbers partitioned in prime and non-prime: {false=[4, 6, 8, 9, 10, 12, 14, 15, 16, 18, 20, 21, 22, 24, 25, 26, 27, 28, 30, 32, 33, 34, 35, 36, 38, 39, 40, 42, 44, 45, 46, 48, 49, 50, 51, 52, 54, 55, 56, 57, 58, 60, 62, 63, 64, 65, 66, 68, 69, 70, 72, 74, 75, 76, 77, 78, 80, 81, 82, 84, 85, 86, 87, 88, 90, 91, 92, 93, 94, 95, 96, 98, 99, 100],* true=[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97]}*/
//        使用特值流构建数据范围return IntStream.rangeClosed(2, n)
//                用boxed转为原始流.boxed()//根据是否质数进行分类.collect(partitioningBy(num -> isPrime(num)));}
public static Map<Boolean, List<Integer>> myPartitionPrimesWithCustomCollector(int n) {return IntStream.rangeClosed(2, n).boxed().collect(new PrimeNumCollector());}
public class CollectorHarness {public static void main(String[] args) {
//        测试自己所写的结果
//        System.out.println(""+PartitionPrimeNumbers.partitionPrimes(20));
//        System.out.println(""+PartitionPrimeNumbers.myPartitionPrimesWithCustomCollector(20));
//        性能测试System.out.println("Partitioning done in: " + execute(PartitionPrimeNumbers::partitionPrimes) + " msecs");System.out.println("Partitioning done in: " + execute(PartitionPrimeNumbers::partitionPrimesWithCustomCollector) + " msecs" );System.out.println("Partitioning done in: " + execute(PartitionPrimeNumbers::myPartitionPrimesWithCustomCollector) + " msecs" );}private static long execute(Consumer<Integer> primePartitioner) {long fastest = Long.MAX_VALUE;for (int i = 0; i < 10; i++) {long start = System.nanoTime();primePartitioner.accept(1_000_000);long duration = (System.nanoTime() - start) / 1_000_000;if (duration < fastest) fastest = duration;System.out.println("done in " + duration);}return fastest;}
}

输出结果,可以看到效率比原本要快了降级60%

done in 877
done in 748
done in 874
done in 854
done in 858
done in 858
done in 850
done in 786
done in 904
done in 985
Partitioning done in: 748 msecs
done in 427
done in 474
done in 327
done in 274
done in 491
done in 263
done in 261
done in 456
done in 264
done in 275
Partitioning done in: 261 msecs
done in 766
done in 456
done in 619
done in 435
done in 406
done in 586
done in 574
done in 464
done in 451
done in 1059
Partitioning done in: 406 msecs

当然你也可以使用重载方法进行如下的缩写,只是语义没那么清晰而已

public static Map<Boolean, List<Integer>> partitionPrimesWithInlineCollector(int n) {return Stream.iterate(2, i -> i + 1).limit(n).collect(() -> new HashMap<Boolean, List<Integer>>() {{put(true, new ArrayList<>());put(false, new ArrayList<>());}},(acc, candiate) -> {acc.get(isPrime(acc.get(true), candiate)).add(candiate);},(map1, map2) -> {map1.get(true).addAll(map2.get(true));map1.get(false).addAll(map2.get(false));});}

源码地址

https://github.com/shark-ctrl/Java8InAction

参考文献

Java 8 in Action

java8流实战-用流收集数据实践简记相关推荐

  1. SparkStreaming 系列(二)kafka与Streaming集成direct流实战---多流集群高并发场景代码演示

    写在前面: 我是「nicedays」,一枚喜爱做特效,听音乐,分享技术的大数据开发猿.这名字是来自world order乐队的一首HAVE A NICE DAY.如今,走到现在很多坎坷和不顺,如今终于 ...

  2. 《Java8实战》-第六章读书笔记(用流收集数据-01)

    用流收集数据 我们在前一章中学到,流可以用类似于数据库的操作帮助你处理集合.你可以把Java 8的流看作花哨又懒惰的数据集迭代器.它们支持两种类型的操作:中间操作(如 filter 或 map )和终 ...

  3. 《Java8实战》笔记(06):用流收集数据

    文章目录 收集器简介 收集器用作高级归约 预定义收集器 归约和汇总 查找流中的最大值和最小值 汇总 连接字符串 广义的归约汇总 Stream接口的collect和reduce有何不同 收集框架的灵活性 ...

  4. java8学习:用流收集数据

    内容来自< java8实战 >,本篇文章内容均为非盈利,旨为方便自己查询.总结备份.开源分享.如有侵权请告知,马上删除. 书籍购买地址:java8实战 下面我们将采用这样一个实体类 @Da ...

  5. 用流收集数据Collectors的用法介绍分组groupingBy、分区partitioningBy(一)

    文章目录 一.收集器简介 二.归约和汇总 1.查找流中最大值和最小值Collectors.maxBy和,Collectors.minBy 2.汇总 3.连接字符串 4.广义归约汇总 三.分组 1.多级 ...

  6. Java 8 (5) Stream 流 - 收集数据

    在前面已经使用过collect终端操作了,主要是用来把Stream中的所有元素结合成一个List,在本章中,你会发现collect是一个归约操作,就像reduce一样可以接受各种做法作为参数,将流中的 ...

  7. Java8新特性——Stream流:不同于IO流的流,操作集合数据

    文章目录 Stream流 1.认识Stream流(源码说明) 1.1.Stream流和Collection的区别 1.2.流的获取方式 1.3.流操作和管道 1.4.并行性 1.5.不干扰内政 1.6 ...

  8. Java8 新特性之流式数据处理(转)

    转自:https://www.cnblogs.com/shenlanzhizun/p/6027042.html 一. 流式处理简介 在我接触到java8流式处理的时候,我的第一感觉是流式处理让集合操作 ...

  9. java8新特性stream流

    参考自深蓝至尊的Java8 新特性之流式数据处理 一. 流式处理简介 在我接触到java8流式处理的时候,我的第一感觉是流式处理让集合操作变得简洁了许多,通常我们需要多行代码才能完成的操作,借助于流式 ...

最新文章

  1. 知乎自动批量软件使用教程
  2. 【自动驾驶】5. ROS和DDS的区别总结
  3. linux复制文件命令cat ,Linux学习之四(复制移动文件命令cp等及查看文本命令cat等)2017-03-28...
  4. 互斥锁、自旋锁和自适应自旋锁
  5. RabbitMQ之Exchange分类
  6. Python函数传入的参数是否改变(函数参数、指针、引用)
  7. Bootstrap3 过渡插件
  8. EXPRESS项目PM2启动NODE_ENV传参数不生效问题解决方法
  9. 19 岁就在南大读博的“00 后”女生:这是我的学习经历...
  10. 爬虫python需要安装吗_python爬虫需要安装什么
  11. 正确率、召回率和F值
  12. 软考信息安全工程师经验分享以及报考建议
  13. MD5加密解密网站测试,MD5加密还安全吗?
  14. 分析、归纳、综合、演绎
  15. 一个人知道自己为什么而活,他就可以忍受生活加诸他的一切苦难
  16. html等待,休眠代码,HTML DOM closest()用法及代码示例
  17. Shiro学习01:使用Shiro实现身份管理和权限验证
  18. 筛选后系列填充_案例分享10——如何在筛选后的表格中粘贴数据(二)
  19. MYSQL学习整理(4):函数
  20. scp拷贝文件报错-bash: scp: command not found

热门文章

  1. Linux应用开发学习路线图
  2. oracle 大量trace,Trace文件过量生成问题解决
  3. 上转换纳米颗粒无机复合材料中近红外调控研究应用
  4. DUX主题顶部彩条美化
  5. python集合运算求出经理和技术人员有几人_python练习集合-010
  6. 杭电ACM1049题
  7. 扭曲的空间和扭曲的能量场
  8. 相关系数之皮尔森相关系数
  9. Linux系统安装oracle客户端步骤详解
  10. Debye-Wolf积分计算器