一、引入流

(一)、引言

1、流是什么

流是Java API的新成员,它允许你以声明性方式处理数据集合(通过查询语句来表达,而不是临时编写一个实现)。可以看成遍历数据集的高级迭代器。

流可以透明地并行处理,无需写任何多线程代码。

  • eg:返回低热量的菜肴名称,并按照卡路里排序。
package com.java8.chapter4;import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.toList;/*** stream 测试** @author liumd* @date 2018/03/28*/
public class StreamTest {public static void main(String[] args) {List<Dish> menu = new ArrayList<>();menu.add(new Dish("fish", 200));menu.add(new Dish("cake", 600));menu.add(new Dish("banana", 350));menu.add(new Dish("apple", 400));menu.add(new Dish("orange", 300));testJava7(menu);testJava8(menu);}public static void testJava8(List<Dish> menu) {// Java8 流实现, 一步完成3步List<String> lowCaloricDishesName = menu.stream()/parallelStream().filter(d -> d.getCalories() < 400).sorted(comparing(Dish::getCalories)).map(Dish::getName).collect(toList());System.out.println("123.处理排序后的菜名列表: " + lowCaloricDishesName);}public static void testJava7(List<Dish> menu) {// Java7实现List<Dish> lowCaloricDishes = new ArrayList<>();// 用累加器筛选元素for (Dish d : menu) {if (d.getCalories() < 400) {lowCaloricDishes.add(d);}}System.out.println("1.用累加器筛选元素: " + lowCaloricDishes);// 用匿名类对菜肴排序Collections.sort(lowCaloricDishes, new Comparator<Dish>() {@Overridepublic int compare(Dish d1, Dish d2) {return Integer.compare(d1.getCalories(), d2.getCalories());}});System.out.println("2.用匿名类对菜肴排序: " + lowCaloricDishes);// 处理排序后的菜名列表List<String> lowCaloricDishesName = new ArrayList<>();for (Dish d : lowCaloricDishes) {lowCaloricDishesName.add(d.getName());}System.out.println("3.处理排序后的菜名列表: " + lowCaloricDishesName);}
}
class Dish {/*** 食物名称*/private String name;/*** 卡路里*/private int calories;// 省略构造方法、setter和getter、toString
}
输出:1.用累加器筛选元素: [Dish{name='fish', calories=200}, Dish{name='banana', calories=350}, Dish{name='orange', calories=300}]2.用匿名类对菜肴排序: [Dish{name='fish', calories=200}, Dish{name='orange', calories=300}, Dish{name='banana', calories=350}]3.处理排序后的菜名列表: [fish, orange, banana]123.处理排序后的菜名列表: [fish, orange, banana]

利用Java 8中的Stream API写出来的代码:

 声明性——更简洁,更易读
 可复合——更灵活
 可并行——性能更好

2、流简介

  • 流:从支持数据处理操作的源生成的元素序列。解析:

 元素序列—— 像集合一样,流提供了一个接口,可以访问特定元素类型的一组有序值。集合讲的是数据,流讲的是计算。

集合是数据结构,它的主要目的是以特定的时间/空间复杂度存储和访问元素(如ArrayList 与 LinkedList)。

流的目的在于表达计算(如filter、 sorted和map)。

 源—— 流会使用一个提供数据的源,如集合、数组或输入/输出资源。

注意:从有序集合生成流时会保留原有的顺序。由列表生成的流,其元素顺序与列表一致。

 数据处理操作 —— 流的数据处理功能支持类似于数据库的操作,以及函数式编程语言中的常用操作,如filter、 map、 reduce、 find、 match、 sort等。流操作可以顺序执行,也可并行执行。

  • 流操作有两个重要的特点

 流水线 —— 很多流操作本身会返回一个流,这样多个操作就可以链接起来,形成一个大的流水线。
 内部迭代 —— 与使用迭代器显式迭代的集合不同,流的迭代操作是在背后进行的。

import static java.util.stream.Collectors.toList;           --> 从menu获得流( 菜肴列表)
List<String> threeHighCaloricDishNames = menu.stream()      --> 建立操作流水线.filter(d -> d.getCalories() > 300)     --> 首先选出高热量的菜肴 .map(Dish::getName)                     --> 获取菜名.limit(3)                               --> 只选择前3个.collect(toList());                     --> 将结果保存在另外一个List中
System.out.println(threeHighCaloricDishNames);              --> 结果是[pork, beef,chicken]

 filter 筛选 —— 接受 Lambda ,从流中排除某些元素。
 map 提取 —— 接受一个 Lambda ,将元素转换成其他形式或提取信息。
 limit 截断 —— 截断流,使其元素不超过给定数量。
 collect —— 将流转换为其他形式。

3、流与集合

(1)、区别

集合与流之间的差异就在于什么时候进行计算。

  • 集合是一个内存中的数据结构,它包含数据结构中目前所有的值——集合中的每个元素都得先算出来才能添加到集合中。(可以往集合里加东西或者删东西,但是不管什么时候,集合中的每个元素都是放在内存里的,元素都得先算出来才能成为集合的一部分。)集合则是急切创建的。
  • 流则是在概念上固定的数据结构(你不能添加或删除元素),其元素则是按需计算的。
集合和流的另一个关键区别在于它们 遍历数据的方式

(2)、流只能遍历一次

和迭代器类似,流只能遍历一次。遍历完之后,可以从原始数据源那里再获得一个新的流来重新遍历一遍,就像迭代器一样 。

(3)、外部迭代与内部迭代

外部迭代:使用Collection接口需要用户去做迭代(比如用for-each)。
内部迭代:Streams库使用内部迭代。

4、流操作

java.util.stream.Stream 中的 Stream 接口定义了许多操作。

List<String> names = menu.stream().filter(d -> d.getCalories() > 300).map(Dish::getName).limit(3).collect(toList());

可以看到两类操作:

 filter、 map和limit可以连成一条流水线;
 collect触发流水线执行并关闭它。

可以连接起来的流操作称为中间操作,关闭流的操作称为终端操作。

(1)、中间操作

诸如filter或sorted等中间操作会返回另一个流。利用了流的延迟性质。

(2)、终端操作

终端操作会从流的流水线生成结果。其结果是任何不是流的值,比如List、 Integer,甚至void。

(3)、使用流

流的使用一般包括三件事:
 一个数据源(如集合)来执行一个查询;
 一个中间操作链,形成一条流的流水线;
 一个终端操作,执行流水线,并能生成结果。
流的流水线背后的理念类似于构建器模式。 在构建器模式中有一个调用链用来设置一套置(对流来说这就是一个中间操作链),接着是调用built方法(对流来说就是终端操作)。

5、小结

 流是“从支持数据处理操作的源生成的一系列元素”。
 流利用内部迭代:迭代通过 filter 、 map 、 sorted 等操作被抽象掉了。
 流操作有两类:中间操作和终端操作。
 filter 和 map 等中间操作会返回一个流,并可以链接在一起。可以用它们来设置一条流
水线,但并不会生成任何结果。
 forEach 和 count 等终端操作会返回一个非流的值,并处理流水线以返回结果。

 流中的元素是按需计算的。

二、使用流

1、筛选和切片

用谓词筛选,筛选出各不相同的元素,忽略流中的头几个元素,或将流截短至指定长度。

(1)、用谓词筛选

Streams接口支持filter方法)。该操作会接受一个谓词(一个返回boolean的函数)作为参数,并返回一个包括所有符合谓词的元素的流。

List<Integer> nums = Arrays.asList( 2, 3, -1, -5, 1);
List<Integer> negtiveNum = nums.stream().filter(n -> n <0).collect(toList());
System.out.println(negtiveNum); --> [-1, -5]

(2)、筛选各异的元素

流还支持一个叫作distinct的方法,它会返回一个元素各异(根据流所生成元素的hashCode和equals方法实现)的流。

List<Integer> numbers = Arrays.asList(1, 2, 1, 3, 3, 2, 4);
numbers.stream().filter(i -> i % 2 == 0).distinct().forEach(System.out::println); --> 2  4

(3)、截短流

流支持limit(n)方法,该方法会返回一个不超过给定长度的流。所需的长度作为参数传递给limit。如果流是有序的,则最多会返回前n个元素。注意: limit也可以用在无序流上,比如源是一个Set。这种情况下, limit的结果不会以任何顺序排列。

List<Integer> nums3 = Arrays.asList(6, 2, 3, 8, 8, 2, 4);
List<Integer> res = nums3.stream().filter(i -> i / 2 > 1 && i % 2 == 0).limit(3).collect(toList());
System.out.println(res);  -->[6, 8, 8]

(4)、跳过元素

流还支持skip(n)方法,返回一个扔掉了前n个元素的流。如果流中元素不足n个,则返回一个空流。请注意, limit(n)和skip(n)是互补的!

List<Integer> nums3 = Arrays.asList(6, 2, 3, 8, 8, 2, 4);
List<Integer> res = nums3.stream().filter(i -> i / 2 > 1 && i % 2 == 0).skip(5).collect(toList());
System.out.println(res);  --> [4]

2、映射

Stream API也通过map和flatMap方法从某些对象中选择信息。

(1)、对流中每一个元素应用函数

流支持map方法,它会接受一个函数作为参数。这个函数会被应用到每个元素上,并将其映射成一个新的元素(使用映射一词,是因为它和转换类似,但其中的细微差别在于它是“创建一个新版本”而不是去“修改”)。

List<String> words = Arrays.asList("Java 8", "Lambdas", "In", "Action");
List<Integer> wordLengths = words.stream().map(String::length).collect(toList());
System.out.println(wordLengths);    -->[6, 7, 2, 6]

(2)、流的扁平化

eg: 给 定 单 词 列 表["Hello","World"],你想要返回列表["H","e","l", "o","W","r","d"]。

  • 尝试使用map和Arrays.stream(),各个数组分别映射成一个流,返回一个流的列表
String[] arrayOfWords = {"Goodbye", "World"};
List<Stream<String>> wordList = Arrays.stream(arrayOfWords).map(word -> word.split("")).map(Arrays::stream).distinct().collect(Collectors.toList());
  • 使用flatMap, 各个数组映射成流的内容。
String[] arrayOfWords = {"Goodbye", "World"};
List<String> wordList = Arrays.stream(arrayOfWords).map(word -> word.split("")).flatMap(Arrays::stream).distinct().collect(Collectors.toList());
System.out.println(wordList);    --> [G, o, d, b, y, e, W, r, l]

3、查找和匹配

查看数据集中的某些元素是否匹配一个给定的属性。(短路,类似于Java中的||、&&)

(1)、检查谓词是否至少匹配一个元素

使用anyMatch方法,返回一个boolean,是一个终端操作。

(2)、检查谓词是否匹配所有元素

使用allMatch方法,相对方法:noneMatch (确保流中没有任何元素与给定的谓词匹配)。

  • anyMatch、 allMatch和noneMatch短路求值
List<Integer> nums4 = Arrays.asList(1, 2, -1, 3, 4, 1);
if (nums4.stream().anyMatch(n -> n < 0)) {System.out.println("The list exists negtive number.");
}
if (!nums4.stream().allMatch(n -> n > 0)) {System.out.println("The list exists negtive number.");
}

(3)、查找元素

findAny方法将返回当前流中的任意元素。它可以与其他流操作结合使用。

  • Optional<T>类(java.util.Optional)是一个容器类,代表一个值存在或不存在。

 isPresent()将在Optional包含值的时候返回true, 否则返回false。
 ifPresent(Consumer<T> block)会在值存在的时候执行给定的代码块。它让你传递一个接收T类型参数,并返回void的Lambda表达式。
 T get()会在值存在时返回值,否则抛出一个NoSuchElement异常。

 T orElse(T other)会在值存在时返回值,否则返回一个默认值。

List<Integer> nums5 = Arrays.asList(1, 2, 1, 3, 3, 2, 4);
Optional<Integer> op = nums5.stream().filter(n -> n % 2 == 0).findAny();
System.out.println(op.get() + ", " + op.isPresent() + ", " + op.orElse(null));  --> 2, true, 2

(4)、查找第一个元素

使用findFirst 。

nums5.stream().filter(n -> n % 2 == 0).findFirst().get()  --> 2

4、规约

如何把一个流中的元素组合起来,使用reduce操作来表达更复杂的查询。需要将流中所有元素反复结合起来,得到一个值,比如一个Integer。这样的查询可以被归类为 归约操作(将流归约成一个值)。

(1)、元素求和

使用reduce。

Lambda表达式:int sum = numbers.stream().reduce(0/1, (a, b) -> a +/* b);
  • reduce接受两个参数:

 一个初始值,这里是 0 ;
 一个 BinaryOperator<T> 来将两个元素结合起来产生一个新值,这里用的是 lambda (a, b) -> a + b 。

方法引用:int sum = nums6.stream().reduce(0, Integer::sum).get();  get之前返回一个Optional对象
List<Integer> nums6 = Arrays.asList(1, 2, 3, 4, 5);
int sum = nums6.stream().reduce(0, (a, b) -> a + b);
System.out.println("元素求和: " + sum);  --> 15
System.out.println("方法引用元素求和: " + nums6.stream().reduce(Integer::sum).get());  --> 15
sum = nums6.stream().reduce(1, (a, b) -> a * b);
System.out.println("元素求积: " + sum);  -->120
System.out.println("无初始值元素求积: " + nums6.stream().reduce((a, b) -> a * b).get());  --> 120 
  • 为什么它返回一个Optional<Integer>呢?考虑流中没有任何元素的情况。由于没有初始值,reduce操作无法返回其和。这就是为什么结果被包裹在一个Optional对象里,以表明和可能不存在。

(2)、最大值和最小值

使用reduce。

List<Integer> nums7 = Arrays.asList(2, 6, 3, 4, 6);
System.out.println("最大值: " + nums7.stream().reduce(Integer::max).get());  --> 6
System.out.println("最小值: " + nums7.stream().reduce(Integer::min).get());  --> 2

(3)、小结

1)、规约测试

怎样用map和reduce方法数一数流中有多少个菜呢?

List<Dish> menu = new ArrayList<>();
menu.add(new Dish("fish", 200));
menu.add(new Dish("cake", 600)); menu.add(new Dish("banana", 350));
menu.add(new Dish("apple", 400));
menu.add(new Dish("orange", 300));
int count = menu.stream().map(d -> 1).reduce(Integer::sum).get();
System.out.println("菜单中有" + count + "道菜。");  --> 5

map和reduce的连接通常称为map-reduce模式。很容易并行化。内置count方法可用来计算流中元素的个数:

long count = menu.stream().count();
2)、归约方法的优势与并行化
  • 逐步迭代求和:要更新共享变量sum,不易并行化;使用reduce的好处:迭代被内部迭代抽象掉,让内部实现得以选择并行执行reduce操作。
  • 若加入同步,很可能会发现线程竞争抵消了并行本应带来的性能提升!这种计算的并行化需要另一种办法:将输入分块,分块求和,最后再合并起来。但这样的话代码看起来就完全不一样了
  • 使用流来对所有的元素并行求和时,代码几乎不用修改,只需: stream()换成了parallelStream()。
int sum = numbers.parallelStream().reduce(0, Integer::sum);

上述并行也要付出一定代价:传递给reduce的Lambda不能更改状态(如实例变量),而且操作必须满足结合律才可以按任意顺序执行。

3)、流操作:无状态和有状态
  • 诸如map或filter等操作会从输入流中获取每一个元素,并在输出流中得到0或1个结果。这些操作一般都是无状态的:它们没有内部状态(假设用户提供的Lambda或方法引用没有内部可变状态)。
  • 但诸如reduce、 sum、 max等操作需要内部状态来累积结果。内部状态很小,都是有界的。
  • 相反,诸如sort或distinct等操作开始也是接受一个流,再生成一个流(中间操作),但有一个关键的区别:从流中排序和删除重复项时都需要知道先前的历史。例如:排序要求所有元素都放入缓冲区后才能给输出流加入一个项目,这一操作的存储要求是无界的。要是流比较大或是无限的,就可能会有问题。把这些操作叫作有状态操作。

5、付诸实践

eg:执行交易的交易员。

(1) 找出2011年发生的所有交易,并按交易额排序(从低到高)。
(2) 交易员都在哪些不同的城市工作过?
(3) 查找所有来自于剑桥的交易员,并按姓名排序。
(4) 返回所有交易员的姓名字符串,按字母顺序排序。
(5) 有没有交易员是在米兰工作的?
(6) 打印生活在剑桥的交易员的所有交易额。
(7) 所有交易中,最高的交易额是多少?
(8) 找到交易额最小的交易。

//1.找出2011年发生的所有交易,并按交易额排序(从低到高)
List<Transaction> tr2011 = transactions.stream().filter(t -> 2011 == t.getYear()).sorted(Comparator.comparing(Transaction::getValue)).collect(Collectors.toList());
System.out.println(tr2011);
//2.交易员都在哪些不同的城市工作过
List<String> cityList = transactions.stream().map(t -> t.getTrader().getCity()).distinct().collect(Collectors.toList());
System.out.println(cityList);
Set<String> citySet = transactions.stream().map(t -> t.getTrader().getCity()).collect(Collectors.toSet());
System.out.println(citySet);
//3.查找所有来自于剑桥的交易员,并按姓名排序
List<Trader> traders = transactions.stream().map(Transaction::getTrader).filter(trader -> "Cambridge".equals(trader.getCity())).distinct().sorted(Comparator.comparing(Trader::getName)).collect(Collectors.toList());
System.out.println(traders);
//4.返回所有交易员的姓名字符串,按字母顺序排序
String traderNameStr = transactions.stream().map(t -> t.getTrader().getName()).distinct().sorted().reduce(String::concat).get();
//效率不高,反复连接
System.out.println(traderNameStr);
traderNameStr = transactions.stream().map(t -> t.getTrader().getName()).distinct().sorted().collect(Collectors.joining());
System.out.println(traderNameStr);
//5.有没有交易员是在米兰工作的
boolean milanBased = transactions.stream().anyMatch(t -> "milan".equals(t.getTrader().getName()));
System.out.println(milanBased);
//6.打印生活在剑桥的交易员的所有交易额
transactions.stream().filter(t -> "Cambridge".equals(t.getTrader().getCity())).map(Transaction::getValue).forEach(System.out::println);
//7.所有交易中,最高的交易额是多少
int maxTr = transactions.stream().map(t -> t.getValue()).reduce(Integer::max).get();
System.out.println(maxTr);
//8.找到交易额最小的交易
Transaction minTran = transactions.stream()
.reduce((t1, t2) -> t1.getValue() < t2.getValue() ? t1 : t2).get();
System.out.println(minTran);
minTran = transactions.stream().min(Comparator.comparing(Transaction::getValue)).get();
System.out.println(minTran);

6、数值流

例如:使用reduce方法计算流中元素的总和。

int calories = menu.stream().map(Dish::getCalories).reduce(0, Integer::sum);

以上操作有一个暗含的装箱成本。每个Integer都必须拆箱成一个原始类型,再进行求和。如果如下进行更好:

int calories = menu.stream().map(Dish::getCalories).sum();

但这是不可能的。问题在于map方法会生成一个Stream<T>。

Stream API还提供了原始类型流特化,专门支持处理数值流的方法。

(1)、原始类型流特化

  • Java 8引入了三个原始类型特化流接口:IntStream、 DoubleStream和LongStream,分别将流中的元素特化为int、 long和double,从而避免了暗含的装箱成本。每个接口都带来了进行常用数值归约的新方法,比如sum、max、min、average等。
  • 注意:这些特化的原因并不在于流的复杂性,而是装箱造成的复杂性——即类似int和Integer之间的效率差异。
1)、 映射到数值流

将流转换为特化版本的常用方法是mapToInt、 mapToDouble和mapToLong。返回特化流,而不是Stream<T>。

int calories = menu.stream().mapToInt(Dish::getCalories).sum();
2)、 转换回对象流

使用 boxed方法。

IntStream intStream = menu.stream().mapToInt(Dish::getCalories);  --> 将 Stream 转换为数值流
Stream<Integer> stream = intStream.boxed();   --> 将数值流转换为Stream 
3)、 默认值OptionalInt

Optional可以用Integer、 String等参考类型来参数化。对于三种原始流特化,也分别有一个Optional原始类型特化版本: OptionalInt、 OptionalDouble和 OptionalLong。

OptionalInt maxCalories = menu.stream().mapToInt(Dish::getCalories).max();

若有最大值,返回最大值,否者返回1:

int max = maxCalories.orElse(1);

(2)、数值范围

Java 8引入了两个可以用于IntStream和LongStream的静态方法,帮助生成这种范围:range和rangeClosed。

这两个方法都是第一个参数接受 起始值,第二个参数接受 结束值。但 range是 不包含结束值的,而 rangeClosed则 包含结束值。

IntStream evenNumbers = IntStream.rangeClosed(1, 100).filter(n -> n % 2 == 0);
int count = evenNumbers.count();   --> 50(1~100)   若使用range,返回49(1~99)

(3)、数值流应用:勾股数

创建一个勾股数流。
  • 勾股数:(3, 4, 5)就是一组有效的勾股数,因为3 * 3 + 4 * 4 = 5 * 5或9 + 16 = 25。这样的三元数有无限组。
  • 表示三元数:new int[]{3, 4, 5}
  • 筛选成立的组合:假定提供了三元数中的前两个数字: a和b。怎么知道它是否能形成一组勾股数呢?需要测试a * a + b * b的平方根是不是整数。
filter(b -> Math.sqrt(a*a + b*b) % 1 == 0)
  • 生成三元组:在筛选之后,知道a和b能够组成一个正确的组合。现在需要创建一个三元组。
stream.filter(b -> Math.sqrt(a*a + b*b) % 1 == 0).map(b -> new int[]{a, b, (int) Math.sqrt(a * a + b * b)});
  • 生成b值:Stream.rangeClosed让你可以在给定区间内生成一个数值流。调用boxed ,从rangeClosed 返 回 的IntStream 生成 一 个Stream<Integer>。
IntStream.rangeClosed(1, 100).filter(b -> Math.sqrt(a*a + b*b) % 1 == 0).boxed().map(b -> new int[]{a, b, (int) Math.sqrt(a * a + b * b)});

使用IntStream的mapToObj方法,返回一个对象值流:

IntStream.rangeClosed(1, 100).filter(b -> Math.sqrt(a*a + b*b) % 1 == 0).mapToObj(b -> new int[]{a, b, (int) Math.sqrt(a * a + b * b)});
  • 生成值 :给出了a的值。
Stream<int[]> pythagoreanTriples = IntStream.rangeClosed(1, 100).boxed().flatMap(a -> IntStream.rangeClosed(a, 100).filter(b -> Math.sqrt(a*a + b*b) % 1 == 0).mapToObj(b -> new int[]{a, b, (int)Math.sqrt(a * a + b * b)}));

首先,创建一个从1到100的数值范围来生成a的值。对每个给定的a值,创建一个三元数流。要是把a的值映射到三元数流的话,就会得到一个由流构成的流。 flatMap方法在做映射的同时,还会把所有生成的三元数流扁平化成一个流。这样你就得到了一个 三元数流。

注意:b的范围改成a到100。没有必要再从1开始了,否则就会造成重复的三元数,例如(3,4,5)和(4,3,5)。

  • 执行代码:(结果:3, 4, 5)
Stream<int[]> pythagoreanTriples = IntStream.rangeClosed(1, 100).boxed().flatMap(a -> IntStream.rangeClosed(a, 100).filter(b -> Math.sqrt(a * a + b * b) % 1 == 0).mapToObj(b -> new int[]{a, b, (int) Math.sqrt(a * a + b * b)}));
pythagoreanTriples.limit(6).forEach(t -> System.out.println(t[0] + ", " + t[1] + ", " + t[2]));
  • 你还能做得更好吗?

并非最优,两次平方根。让代码更为紧凑的一种可能的方法是,先生成所有的三元数(a*a, b*b, a*a+b*b),然后再筛选符合条件的:(结果:3.0, 4.0, 5.0)

Stream<double[]> pythagoreanTriples2 = IntStream.rangeClosed(1, 100).boxed().flatMap(a -> IntStream.rangeClosed(a, 100).mapToObj(b -> new double[]{a, b, Math.sqrt(a * a + b * b)}).filter(t -> t[2] % 1 == 0));
pythagoreanTriples2.limit(6).forEach(t -> System.out.println(t[0] + ", " + t[1] + ", " + t[2]));

7、构建流

(1)、由值创建流

  • 使用静态方法Stream.of,通过显式值创建一个流。它可以接受任意数量的参数。Stream.empty()生成一个空流
Stream<String> valueStream = Stream.of("Java 8 ", "Lambdas ", "In ", "Action");
valueStream.map(String::toUpperCase).forEach(System.out::println);  --> 大写

(2)、由数组创建流

  • 使用静态方法Arrays.stream从数组创建一个流。它接受一个数组作为参数。
int sum = Arrays.stream(new int[]{1, 2, 3, 4, 5}).sum();   --> 15

(3)、由文件生成流

java.nio.file.Files中的很多静态方法都会返回一个流。

  • Files.lines,它会返回一个由指定文件中的各行构成的字符串流。
long uniqueWords = 0;   data.txt:1234?;',2a!`1
try(Stream<String> lines = Files.lines(Paths.get("data.txt"), Charset.defaultCharset())) {uniqueWords = lines.flatMap(line -> Arrays.stream(line.split(""))).distinct().count();System.out.println(uniqueWords);  --> 11
} catch (IOException e) {e.printStackTrace();
}

(4)、由函数生成流:创建无限流

Stream API提供了两个静态方法来从函数生成流: Stream.iterate和Stream.generate。

  • 由iterate和generate产生的流会用给定的函数按需创建值,因此可以无穷无尽地计算下去!一般来说,应该使用limit(n)来对这种流加以限制,以避免打印无穷多个值
1)、迭代
Stream.iterate(0, n -> n + 2).limit(6).forEach(System.out::println);

iterate方法接受一个初始值(0),还有一个依次应用在每个产生的新值上的Lambda(UnaryOperator<t>类型)。

  • 斐波纳契元组序列
Stream.iterate(new int[]{0, 1}, t -> new int[]{t[1], t[0] + t[1]}).limit(8).map(t -> t[0]).forEach(System.out::println);  --> 0 1 1 2 3 5 8 13
2)、生成

与iterate方法类似, generate方法也可按需生成一个无限流。但generate不是依次对每个新生成的值应用函数的。它接受一个Supplier<T>类型的Lambda提供新的值。

Stream.generate(Math::random).limit(6).forEach(System.out::println);

(斐波纳契数列 )使用有状态的供应源是不安全的。因此下面的代码仅仅是为了内容完整,应尽量避免使用!(后面说明)

IntSupplier fib = new IntSupplier() {private int prev = 0;private int curr = 1;@Overridepublic int getAsInt() {int oldPrev = this.prev;int nextValue = this.prev + this.curr;this.prev = this.curr;this.curr = nextValue;return oldPrev;}
};
IntStream.generate(fib).limit(8).forEach(System.out::println);

8、小结

 Streams API可以表达复杂的数据处理查询。常用的流操作总结在表5-1中。
 你可以使用filter、 distinct、 skip和limit对流做筛选和切片。
 你可以使用map和flatMap提取或转换流中的元素。
 你 可 以 使 用 findFirst 和 findAny 方 法 查 找 流 中 的 元 素 。 你 可 以 用 allMatch 、noneMatch和anyMatch方法让流匹配给定的谓词。
 这些方法都利用了短路:找到结果就立即停止计算;没有必要处理整个流。
 你可以利用reduce方法将流中所有的元素迭代合并成一个结果,例如求和或查找最大元素。
 filter和map等操作是无状态的,它们并不存储任何状态。 reduce等操作要存储状态才能计算出一个值。 sorted和distinct等操作也要存储状态,因为它们需要把流中的所有元素缓存起来才能返回一个新的流。这种操作称为有状态操作。
 流有三种基本的原始类型特化: IntStream、 DoubleStream和LongStream。它们的操作也有相应的特化。
 流不仅可以从集合创建,也可从值、数组、文件以及iterate与generate等特定方法创建。
 无限流是没有固定大小的流。

三、用流收集数据

1、收集器简介

函数式编程相对于指令式编程的一个主要优势:你只需指出希望的结果——“做什么”,而不用操心执行的步骤——“如何做”。

要是做多级分组,指令式和函数式之间的区别更明显:由于需要好多层嵌套循环和条件,指令式代码很快就变得更难阅读、更难维护、更难修改。相比之下,函数式版本只要再加上一个收集器就可以轻松地增强功能了。

(1)、 收集器用作高级归约

  • 函数式API设计:更易复合和重用。
  • Collectors实用类提供了很多静态工厂方法,可以方便地创建常见收集器的实例,只要拿来用就可以了。最直接和最常用的收集器是toList静态方法,它会把流中所有的元素收集到一个List中。

(2)、预定义收集器

  • 即:从Collectors类提供的工厂方法(例如groupingBy)创建的收集器。主要提供了三大功能:

 将流元素归约和汇总为一个值
 元素分组
 元素分区

2、归约和汇总

import java.util.stream.Collectors;

eg:利用counting工厂方法返回的收集器,数一数菜单里有多少种菜?

long howManyDishes = menu.stream().collect(Collectors.counting());
long howManyDishes = menu.stream().count();
List<Dish> menu = new ArrayList<>();
menu.add(new Dish("fish", 200));
menu.add(new Dish("cake", 600));
menu.add(new Dish("banana", 350));
menu.add(new Dish("apple", 400));
menu.add(new Dish("orange", 300));

(1)、查找流中的最大值和最小值

使用两个收集器 Collectors.maxBy和Collectors.minBy,来计算流中的最大或最小值。这两个收集器接收一个Comparator参数来比较流中的元素。

Comparator<Dish> dishes = Comparator.comparingInt(Dish::getCalories);
Optional<Dish> maxCalor = menu.stream().collect(Collectors.maxBy(dishes));
Optional<Dish> minCalor = menu.stream().collect(Collectors.minBy(dishes));
System.out.println("最高热量的菜:" + maxCalor.get() + ";最低热量的菜:" + minCalor.get());

(2)、汇总操作

即:对流中对象的一个数值字段求和、求平均值等。

  • Collectors类专门为汇总提供了工厂方法: Collectors.summingInt(/Long/Double)等。它可接受一个把对象映射为求和所需int/long/double的函数,并返回一个收集器;该收集器在传递给普通的collect方法后即执行需要的汇总操作。
  • 还提供了工厂方法: Collectors.averagingInt(/Long/Double)等。它可接受一个把对象映射为求平均值所需int/long/double的函数,并返回一个收集器;该收集器在传递给普通的collect方法后即执行需要的汇总操作。
int totalCalor = menu.stream().collect(Collectors.summingInt(Dish::getCalories));
double avgCalor = menu.stream().collect(Collectors.averagingDouble(Dish::getCalories));
System.out.println("所有菜的总热量:" + totalCalor + ";所有菜的平均热量:" + avgCalor);  --> 1850  370.0
  • 一次操作完成:通过一次Collectors.summarizingInt(/Long/Double)操作你可以就数出菜单中元素的个数,对应类型:(Long/Double)IntSummaryStatistics,得到菜肴热量总和、平均值、最大值和最小值(通过getter方法获取):
IntSummaryStatistics menuStatistic = menu.stream().collect(Collectors.summarizingInt(Dish::getCalories));
System.out.println("菜的个数、总热量、最低热量、平均热量、最高热量:" + menuStatistic.toString()); --> IntSummaryStatistics{count=5, sum=1850, min=200, average=370.000000, max=600}

(3)、连接字符串

joining工厂方法返回的收集器会把对流中每一个对象应用toString方法得到的所有字符串连接成一个字符串。

  • 注意, joining在内部使用了StringBuilder来把生成的字符串逐个追加起来。此外若Dish类有一个toString方法来返回菜肴的名称,那无需用提取每一道菜名称的函数来对原流做映射就能够得到相同的结果:
String shortMenu = menu.stream().map(Dish::getName).collect(Collectors.joining(", ")); --> fish, cake, banana, apple, orange

(4)、广义的归约汇总

以上所有收集器,都是一个可以用reducing工厂方法定义的归约过程的特殊情况而已。 Collectors.reducing工厂方法是所有这些特殊情况的一般化。

int totalCalor = menu.stream().collect(Collectors.reducing(0, Dish::getCalories, (i, j) -> i + j)); --> 1850
  • 它需要三个参数。

 第一个参数是归约操作的起始值,也是流中没有元素时的返回值,所以很显然对于数值和而言0是一个合适的值。
 第二个参数就是使用的函数,将菜肴转换成一个表示其所含热量的int。
 第三个参数是一个BinaryOperator,将两个项目累积成一个同类型的值。这里它就是对两个int求和。

1)、收集框架的灵活性:以不同的方法执行同样的操作

简化前面使用reducing收集器的求和例子——引用Integer类的sum方法,而不用去写一个表达同一操作的Lambda表达式。

int totalCalor = menu.stream().collect(Collectors.reducing(0, Dish::getCalories, Integer::sum));  --> 1850

counting收集器也是类似地利用三参数reducing工厂方法实现的。它把流中的每个元素都转换成一个值为1的Long型对象,然后再把它们相加:

public static <T> Collector<T, ?, Long> counting() {return reducing(0L, e -> 1L, Long::sum);
}
  • 使用泛型?通配符:?通配符,用作counting工厂方法返回的收集器签名中的第二个泛型类型。此处,它仅仅意味着收集器的累加器类型未知,即,累加器本身可以是任何类型。

使用的方法引用来归约得到的流:

int totalCalor = menu.stream().map(Dish::getCalories).reduce(Integer::sum).get();

在这种情况下使用get方法是安全的。一般来说,使用允许提供默认值的方法,如orElse或orElseGet来解开Optional中包含的值更为安全。

更简洁的方法是把流映射到一个IntStream,然后调用sum方法,你也可以得到相同的结果:

int totalCalor = menu.stream().mapToInt(Dish::getCalories).sum();
2)、 根据情况选择最佳解决方案

收集器在某种程度上比Stream接口上直接提供的方法用起来更复杂,但好处在于它们能提供更高水平的抽象和概括,也更容易重用和自定义。

建议:尽可能为手头的问题探索不同的解决方案,但在通用的方案里面,始终选择最专门化的一个。

3、分组

List<Dish> menu = Arrays.asList(new Dish("pork", false, 800, Dish.Type.MEAT),new Dish("chicken", false, 400, Dish.Type.MEAT),new Dish("french fries", true, 530, Dish.Type.OTHER),new Dish("season fruit", true, 120, Dish.Type.OTHER),new Dish("pizza", true, 550, Dish.Type.OTHER),new Dish("prawns", false, 300, Dish.Type.FISH),new Dish("salmon", false, 450, Dish.Type.FISH)
);

一个常见的数据库操作:根据一个或多个属性对集合中的项目进行分组。Java8使用Collectors.groupingBy工厂方法。

public static <T, K> Collector<T, ?, Map<K, List<T>>> groupingBy(Function<? super T, ? extends K> classifier) {return groupingBy(classifier, toList());}
Map<Dish.Type, List<Dish>> dishByType = menu.stream().collect(Collectors.groupingBy(Dish::getType));--> {OTHER=[french fries, season fruit, pizza], MEAT=[pork, chicken], FISH=[prawns, salmon]}

groupingBy方法传递了一个Function(以方法引用的形式),它提取了流中每一道Dish的Dish.Type。这个Function叫作分类函数,因为它用来把流中的元素分成不同的组。

Map<Dish1.CaloricLevel, List<Dish1>> dishByLevel = menu1.stream().collect(Collectors.groupingBy(dish -> {if (dish.getCalories() <= 400) {return Dish1.CaloricLevel.DIET;} else if (dish.getCalories() <= 700) {return Dish1.CaloricLevel.NORMAL;} else {return Dish1.CaloricLevel.FAT;}}
));  --> {FAT=[pork], DIET=[chicken, season fruit, prawns], NORMAL=[french fries, pizza, salmon]}

看到了如何对菜单中的菜肴按照类型和热量进行分组,但要是想同时按照这两个标准分类怎么办呢?

(1)、多级分组

  • 使用一个由双参数版本的Collectors.groupingBy工厂方法创建的收集器,它除了普通的分类函数之外,还可以接受collector类型的第二个参数。
public static <T, K, A, D> Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,Collector<? super T, A, D> downstream) {return groupingBy(classifier, HashMap::new, downstream);
}
Map<Dish.Type, Map<Dish.CaloricLevel, List<Dish>>> twoGroup = menu.stream().collect(Collectors.groupingBy(Dish::getType,Collectors.groupingBy(dish -> {if (dish.getCalories() <= 400) {return Dish.CaloricLevel.DIET;}else if (dish.getCalories() <= 700) {return Dish.CaloricLevel.NORMAL;}else {return Dish.CaloricLevel.FAT;}}))
); --> {OTHER={DIET=[season fruit], NORMAL=[french fries, pizza]}, MEAT={FAT=[pork], DIET=[chicken]}, FISH={DIET=[prawns], NORMAL=[salmon]}}

(2)、按子组收集数据

  • eg1:要数一数菜单中每类菜有多少个,可以传递counting收集器作为groupingBy收集器的第二个参数:
Map<Dish.Type, Long> typeCount = menu.stream().collect(Collectors.groupingBy(Dish::getType, Collectors.counting())); --> {OTHER=3, MEAT=2, FISH=2}
  • eg2:用于查找菜单中每种类型中热量最高的菜肴:
Map<Dish.Type, Optional<Dish>> maxCaloricByType = menu.stream().collect(Collectors.groupingBy(Dish::getType, Collectors.maxBy(Comparator.comparing(Dish::getCalories)))
); --> {OTHER=Optional[pizza], MEAT=Optional[pork], FISH=Optional[salmon]}

maxBy工厂方法生成的收集器的类型为Optional

1)、把收集器的结果转换为另一种类型

使用Collectors.collectingAndThen工厂方法返回的收集器。它接受两个参数——要转换的收集器以及转换函数,并返回另一个收集器。

Map<Dish.Type, Dish> maxCalorByType = menu.stream().collect(Collectors.groupingBy(Dish::getType,Collectors.collectingAndThen(Collectors.maxBy(Comparator.comparing(Dish::getCalories)), Optional::get))
); --> {OTHER=pizza, MEAT=pork, FISH=salmon}

这个收集器相当于旧收集器的一个包装, collect操作的最后一步就是将返回值用转换函数做一个映射。

它是如何工作的?

  • 从最外层开始逐层向里,注意以下几点:

 收集器用虚线表示,因此groupingBy是最外层,根据菜肴的类型把菜单流分组,得到三个子流。
 groupingBy收集器包裹着collectingAndThen收集器,因此分组操作得到的每个子流都用这第二个收集器做进一步归约。
 collectingAndThen收集器又包裹着第三个收集器maxBy。
 随后由归约收集器进行子流的归约操作,然后包含它的collectingAndThen收集器会对其结果应用Optional:get转换函数。
 对三个子流分别执行这一过程并转换而得到的三个值,也就是各个类型中热量最高的Dish,将成为groupingBy收集器返回的Map中与各个分类键(Dish的类型)相关联的值。

2)、与groupingBy联合使用的其他收集器的例子

一般来说,通过groupingBy工厂方法的第二个参数传递的收集器将会对分到同一组中的所有流元素执行进一步归约操作。
eg:求出按类型分类的每个类型中菜肴热量总和:

Map<Dish.Type, Integer> totalCalorByType = menu.stream().collect(Collectors.groupingBy(Dish::getType, Collectors.summingInt(Dish::getCalories))
); --> {OTHER=1200, MEAT=1200, FISH=750}

常常和groupingBy联合使用的另一个收集器是mapping方法生成的。这个方法接受两个参数:一个函数对流中的元素做变换,另一个则将变换的结果对象收集起来。其目的是:在累加之前对每个输入元素应用一个映射函数,就可以让接受特定类型元素的收集器适应不同类型的对象。

eg:对菜单中的菜肴按照类型和热量进行分组,并获取每种类型中的热量程度的集合:

Map<Dish.Type, Set<Dish.CaloricLevel>> caloricLevelsByType = menu.stream().collect(Collectors.groupingBy(Dish::getType, Collectors.mapping(dish -> {if (dish.getCalories() <= 400) {return Dish.CaloricLevel.DIET;}else if (dish.getCalories() <= 700) {return Dish.CaloricLevel.NORMAL;}else {return Dish.CaloricLevel.FAT;}}, Collectors.toSet()/toCollection(HashSet::new)))
); --> {OTHER=[DIET, NORMAL], MEAT=[FAT, DIET], FISH=[DIET, NORMAL]}

4、分区

使用提供的工厂方法Collectors.partitioningBy。

public static <T> Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(Predicate<? super T> predicate) {return partitioningBy(predicate, toList());
}

分区是分组的特殊情况:由一个谓词(返回一个布尔值的函数)作为分类函数,它称分区函数。分区函数返回一个布尔值,这意味着得到的分组Map的键类型是Boolean,于是它最多可以分为两组——true和 false。

eg1:把菜单按照素食和非素食分开,使用get分别获取两种结果:

Map<Boolean, List<Dish>> partitionedMenu = menu.stream().collect(Collectors.partitioningBy(Dish::isVegetarian)
);
System.out.println(partitionedMenu);            --> {false=[pork, chicken, prawns, salmon], true=[french fries, season fruit, pizza]}
System.out.println(partitionedMenu.get(true));  --> [french fries, season fruit, pizza]
System.out.println(partitionedMenu.get(false)); --> [pork, chicken, prawns, salmon]

使用同样的分区谓词,进行筛选filter和收集collect得到同样结果:

List<Dish> vegetDish = menu.stream().filter(Dish::isVegetarian).collect(Collectors.toList());
System.out.println(vegetDish);  --> [french fries, season fruit, pizza]

(1)、分区的优势

分区的好处:保留了分区函数返回true或false的两套流元素列表。

  • partitioningBy工厂方法有一个重载版本,可以传递第二个收集器 :
public static <T, D, A> Collector<T, ?, Map<Boolean, D>> partitioningBy(Predicate<? super T> predicate, Collector<? super T, A, D> downstream)

eg1:把菜单按照素食和非素食分开,并按照类型分组:

Map<Boolean, Map<Dish.Type, List<Dish>>> vegetDishByType = menu.stream().collect(Collectors.partitioningBy(Dish::isVegetarian,Collectors.groupingBy(Dish::getType))
);  -> {false={MEAT=[pork, chicken], FISH=[prawns, salmon]}, true={OTHER=[french fries, season fruit, pizza]}}

eg2:找到素食和非素食中热量最高的菜

Map<Boolean, Dish> maxCalorPartByVeget = menu.stream().collect(Collectors.partitioningBy(Dish::isVegetarian,Collectors.collectingAndThen(Collectors.maxBy(Comparator.comparingInt(Dish::getCalories)),Optional::get))
); --> {false=pork, true=pizza}

(2)、将数字按质数和非质数分区

eg:它接受参数int n,并将前n个自然数分为质数和非质数。

public class PrimePartTest {public static void main(String[] args) {System.out.println(partitionPrimes(16));   --> {false=[4, 6, 8, 9, 10, 12, 14, 15, 16], true=[2, 3, 5, 7, 11, 13]}}public static Map<Boolean, List<Integer>> partitionPrimes(int num) {return IntStream.rangeClosed(2, num).boxed().collect(Collectors.partitioningBy(can -> isPrime(can)));}public static boolean isPrime(int candidate) {int candidateRoot = (int) Math.sqrt((double) candidate);return IntStream.rangeClosed(2, candidateRoot).noneMatch(i -> candidate % i == 0);}
}
  • Collectors类的静态工厂方法,以及它们用于一个menuStream的Stream<Dish>上的实际例子。

5、收集器接口

Collector接口的定义,它列出了接口的签名以及声明的五个方法:

public interface Collector<T, A, R> {Supplier<A> supplier();BiConsumer<A, T> accumulator();Function<A, R> finisher();BinaryOperator<A> combiner();Set<Characteristics> characteristics();
}

 T 是流中要收集的项目的泛型。
 A 是累加器的类型,累加器是在收集过程中用于累积部分结果的对象。
 R 是收集操作得到的对象(通常但并不一定是集合)的类型。
eg:实现一个ToListCollector<T>类,将Stream<T>中的所有元素收集到一个List<T>里,它的签名如下:

public class ToListCollector<T> implements Collector<T, List<T>, List<T>>

(1)、理解 Collector 接口声明的方法

1)、建立新的结果容器: supplier方法

supplier方法必须返回一个结果为空的Supplier,也就是一个无参数函数,在调用时它会创建一个空的累加器实例,供数据收集过程使用。

  • ToListCollector中, supplier返回一个空的List:
public Supplier<List<T>> supplier() {return () -> new ArrayList<T>();
}
传递构造函数引用 -->
public Supplier<List<T>> supplier() {return ArrayList::new;
}
2)、将元素添加到结果容器: accumulator方法

accumulator方法会返回执行归约操作的函数。当遍历到流中第n个元素时,会有两个参数:保存归约结果的累加器(已收集了流中的前 n-1 个项目), 还有第n个元素本身。

  • ToListCollector中, 这个函数仅仅会把当前项目添加至已经遍历过的项目的列表:
public BiConsumer<List<T>, T> accumulator() {return (list, item) -> list.add(item);
}
方法引用 -->
public BiConsumer<List<T>, T> accumulator() {return List::add;
}
3)、对结果容器应用最终转换: finisher方法

在遍历完流后, finisher方法必须返回在累积过程的最后要调用的一个函数,以便将累加器对象转换为整个集合操作的最终结果。

  • ToListCollector中, 累加器对象恰好符合预期的最终结果,因此无需进行转换:
public Function<List<T>, List<T>> finisher() {return Function.identity();
}

1)~3)已经足以对流进行顺序归约,至少从逻辑上看可以按下图进行。

4)、合并两个结果容器: combiner方法

combiner方法会返回一个供归约操作使用的函数,它定义了对流的各个子部分进行并行处理时,各个子部分归约所得的累加器要如何合并。

  • toList中,把从流的第二个部分收集到的项目列表加到遍历第一部分时得到的列表后面:
public BinaryOperator<List<T>> combiner() {return (list1, list2) -> {list1.addAll(list2);return list1; };
}

1)~4)可以对流进行并行归约。过程如下图:

 原始流会以递归方式拆分为子流,直到定义流是否需要进一步拆分的一个条件为非。
 现在,所有的子流都可以并行处理,即对每个子流应用图6-7所示的顺序归约算法。
 最后,使用收集器combiner方法返回的函数,将所有的部分结果两两合并。这时会把原始流每次拆分时得到的子流对应的结果合并起来。

5)、characteristics方法

characteristics会返回一个不可变的Characteristics集合,它定义了收集器的行为——尤其是关于流是否可以并行归约,以及可以使用哪些优化的提示。

  • Characteristics是一个包含三个项目的枚举:

 UNORDERED——归约结果不受流中项目的遍历和累积顺序的影响。
 CONCURRENT——accumulator函数可以从多个线程同时调用,且该收集器可以并行归约流。如果收集器没有标为UNORDERED,那它仅在用于无序数据源时才可以并行归约。
 IDENTITY_FINISH——这表明完成器方法返回的函数是一个恒等函数,可以跳过。这种情况下,累加器对象将会直接用作归约过程的最终结果。这也意味着,将累加器A不加检查地转换为结果R是安全的。

  • eg:ToListCollector是IDENTITY_FINISH的,因为用来累积流中元素的List已经是最终结果,用不着进一步转换 。

(2)、全部融合到一起

6、开发自己的收集器以获得更好的性能

(1)、仅用质数做除数

eg:用Collectors类提供的一个方便的工厂方法创建了一个收集器,它将前n个自然数划分为质数和非质数:

import java.util.*;
import java.util.function.*;
import java.util.stream.Collector;
import java.util.stream.IntStream;import static java.util.stream.Collector.Characteristics.IDENTITY_FINISH;/*** 开发自己的Collector接口** @author liumd* @date 2018/04/10*/
public class CollectorInterfaceTest {public static void main(String[] args) {System.out.println(partitionPrimesWithCustomCollector(16));}public static Map<Boolean, List<Integer>>  partitionPrimesWithCustomCollector(int n) {return IntStream.rangeClosed(2, n).boxed().collect(new PrimeNumCollector());}public static class PrimeNumCollector implements Collector<Integer, Map<Boolean, List<Integer>>, Map<Boolean, List<Integer>>> {@Overridepublic Supplier<Map<Boolean, List<Integer>>> supplier() {//从一个有两个空List的Map开始收集过程return () -> new HashMap<Boolean, List<Integer>>() {{put(true, new ArrayList<Integer>());put(false, new ArrayList<Integer>());}};}@Overridepublic BiConsumer<Map<Boolean, List<Integer>>, Integer> accumulator() {//将已经找到的质数列表传递给isPrime方法return (Map<Boolean, List<Integer>> acc, Integer candidate) -> {//根据isPrime方法的返回值,从Map中取质数或非质数列表,把当前的被测数加进去acc.get(isPrime(acc.get(true), candidate)).add(candidate);};}@Overridepublic Function<Map<Boolean, List<Integer>>, Map<Boolean, List<Integer>>> finisher() {//收集过程最后无需转换,因此用identity函数收尾return Function.identity();}@Overridepublic BinaryOperator<Map<Boolean, List<Integer>>> combiner() {//将第二个Map合并到第一个return (Map<Boolean, List<Integer>> map1, Map<Boolean, List<Integer>> map2) -> {map1.get(true).addAll(map2.get(true));map1.get(false).addAll(map2.get(false));return map1;};}@Overridepublic Set<Characteristics> characteristics() {//这个收集器是IDENTITY_FINISH,但既不是UNORDERED//也不是CONCURRENT,因为质数是按顺序发现的return Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH));}public static boolean isPrime(List<Integer> primes, int candidate) {int candidateRoot = (int) Math.sqrt((double) candidate);return takeWhile(primes, i -> i <= candidateRoot).stream().noneMatch(p -> candidate % p == 0);}/*** 给定一个排序列表和一个谓词,它会返回元素满足谓词的最长前缀* @param list* @param p* @param <A>* @return*/private static <A> List<A> takeWhile(List<A> list, Predicate<A> p) {int i = 0;for (A item : list) {if (!p.test(item)) {return list.subList(0, i);}i++;}return list;}}
}

(2)、比较收集器的性能

public class CollectorPropertyTest {public static void main(String[] args) {System.out.println("Partitioning done in: " + execute(PrimePartTest::partitionPrimes) + " msecs");  -->424System.out.println("Partitioning done in: " + execute(CollectorInterfaceTest::partitionPrimesWithCustomCollector) + " msecs");  --> 268}private static long execute(Consumer<Integer> primePartitioner) {long fastest = Long.MAX_VALUE;for (int i = 0; i < 10; i++) {long start = System.nanoTime();primePartitioner.accept(1_000_000);long duration = (System.nanoTime() - start) / 1_000_000;if (duration < fastest) {fastest = duration;}System.out.println("done in " + duration);}return fastest;}
}

自定义收集器性能较好!

  • 为了避免实现Collector接口创建一个全新的类,可以通过把实现PrimeNumCollector核心逻辑的三个函数传给collect方法的重载版本来获得同样的结果:
import java.util.*;
import java.util.function.*;
import java.util.stream.Collector;
import java.util.stream.IntStream;import static java.util.stream.Collector.Characteristics.IDENTITY_FINISH;/*** 开发自己的Collector接口** @author liumd* @date 2018/04/10*/
public class CollectorInterfaceTest {public static void main(String[] args) {System.out.println(partPrimeWithCustomCollector(16));}public static Map<Boolean, List<Integer>>  partPrimeWithCustomCollector(int n) {return IntStream.rangeClosed(2, n).boxed().collect(() -> new HashMap<Boolean, List<Integer>>() {{put(true, new ArrayList<Integer>());put(false, new ArrayList<Integer>());}},(acc, candidate) -> {acc.get(isPrime(acc.get(true), candidate)).add(candidate);},(map1, map2) -> {map1.get(true).addAll(map2.get(true));map1.get(false).addAll(map2.get(false));});}public static boolean isPrime(List<Integer> primes, int candidate) {int candidateRoot = (int) Math.sqrt((double) candidate);return takeWhile(primes, i -> i <= candidateRoot).stream().noneMatch(p -> candidate % p == 0);}/*** 给定一个排序列表和一个谓词,它会返回元素满足谓词的最长前缀* @param list* @param p* @param <A>* @return*/private static <A> List<A> takeWhile(List<A> list, Predicate<A> p) {int i = 0;for (A item : list) {if (!p.test(item)) {return list.subList(0, i);}i++;}return list;}
}

四、并行数据处理与性能

Java 7引入了一个叫作分支/合并的框架,让下面操作更稳定、更不易出错:

  • 明确地把包含数据的数据结构分成若干子部分。
  • 要给每个子部分分配一个独立的线程。
  • 需要在恰当的时候对它们进行同步来避免不希望出现的竞争条件,等待所有线程完成,最后把这些部分结果合并。

1、并行流

Java8中,Stream接口提供可以通过对收集源调用Collector.parallelStream方法来把集合转换为并行流。

  • 并行流:把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。

例子,接受数字n作为参数,并返回从1到给定参数的所有数字的和:

(1)、将顺序流转换为并行流

对顺序流调用parallel方法进行并行归纳。可 以 通 过 系 统 属 性 java.util.concurrent.ForkJoinPool.common.
parallelism来改变线程池大小,eg:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");

(2)、测量流性能

测试迭代式、顺序归纳和并行归纳:

public class ParallelStreamTest {public static void main(String[] args) {long num = 10_000_000;//迭代式System.out.println("迭代式计算时间: " + measureSumPerf(ParallelStreamTest::iterativeSum, num) + "ms"); --> 5//顺序归纳System.out.println("顺序归纳计算时间: " + measureSumPerf(ParallelStreamTest::sequentialSum, num) + "ms"); --> 94//并行归纳System.out.println(" 并行归纳计算时间: " + measureSumPerf(ParallelStreamTest::parallelSum, num) + "ms"); --> 88}public static long measureSumPerf(Function<Long, Long> adder, long n) {long fastest = Long.MAX_VALUE;for (int i = 0; i < 10; i++) {long start = System.nanoTime();long sum = adder.apply(n);long duration = (System.nanoTime() - start) / 1_000_000;System.out.println("Result: " + sum);if (duration < fastest) {fastest = duration;}}return fastest;}public static long iterativeSum(long n) {long result = 0;for (long i = 1L; i <= n; i++) {result += i;}return result;}public static long sequentialSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum);}public static long parallelSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);}public static long parallelStreamSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);}
}
  • 在2核英特尔i5 2.5 GHz的Win10下执行,经测试发现:迭代式快很多,顺序式和并行式差不多。并行效果不佳的原因:

 iterate生成的是装箱的对象,必须拆箱成数字才能求和;
 很难把iterate分成多个独立块来并行执行。

把流标记成并行,你其实是给顺序处理增加了开销,它还要把每次求和操作分到一个不同的线程上。

1)、改进:使用更有针对性的方法:LongStream.rangeClosed

 LongStream.rangeClosed直接产生原始类型的long数字,没有装箱拆箱的开销。
 LongStream.rangeClosed会生成数字范围,很容易拆分为独立的小块。

public static long rangedSum(long n) {return LongStream.rangeClosed(1, n).reduce(0L, Long::sum);
}
  • 经测试发现,该方法快很多,和迭代式运算时间差不多,比顺序归纳和并行归纳快很多。原因:数值流避免了非针对性流那些没必要的自动装箱和拆箱操作。
2)、rangeClosed方法并行处理
public static long parallelRangedSum(long n) {return LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum);
}
  • 经测试发现,该方法并行更快!运行快慢:rangeClosed并行 > rangeClosed、迭代式 > 顺序式、并行式。

注意,并行也有代价。并行化过程本身需要对流做递归划分,把每个子流的归纳操作分配到不同的线程,然后把这些操作的结果合并成一个值。

(3)、正确使用并行流

错用并行流而产生错误的首要原因:使用的算法改变了某些共享状态。

错误例子:

顺序 -->
public static long sideEffectSum(long n) {Accumulator acc = new Accumulator();LongStream.rangeClosed(1, n).forEach(acc::add);return acc.total;
}
并行 -->
public static long sideEffectParallelSum(long n) {Accumulator acc = new Accumulator();LongStream.rangeClosed(1, n).parallel().forEach(acc::add);return acc.total;
}
  • 经测试发现,上述并行方法每次输出结果不一样,结果错误!原因:由于多个线程在同时访问累加器,执行total += value,并非原子计算。问题的根源在于, forEach中调用的方法有副作用,它会改变多个线程共享的对象的可变状态。
  • 共享可变状态会影响并行流以及并行计算。

(4)、高效使用并行流

《Java8实战》中给出以下定性意见,帮助决定某个特定情况下是否有必要使用并行流:

 如果有疑问,测量。把顺序流转成并行流轻而易举,但却不一定是好事。此外,并行流有时候会和你的直觉不一致,所
以在考虑选择顺序流还是并行流时,第一个也是最重要的建议就是用适当的基准来检查其性能。
 留意装箱。自动装箱和拆箱操作会大大降低性能。 Java 8中有原始类型流(IntStream、LongStream、 DoubleStream)来避免这种操作,但凡有可能都应该用这些流。
 有些操作本身在并行流上的性能就比顺序流差。特别是limit和findFirst等依赖于元素顺序的操作,它们在并行流上执行的代价非常大。例如, findAny会比findFirst性能好,因为它不一定要按顺序来执行。你总是可以调用unordered方法来把有序流变成
无序流。那么,如果你需要流中的n个元素而不是专门要前n个的话,对无序并行流调用limit可能会比单个有序流(比如数据源是一个List)更高效。
 还要考虑流的操作流水线的总计算成本。设N是要处理的元素的总数, Q是一个元素通过流水线的大致处理成本,则N*Q就是这个对成本的一个粗略的定性估计。 Q值较高就意味着使用并行流时性能好的可能性比较大。
 对于较小数据量,选择并行流几乎从来都不是一个好的决定。并行处理少数几个元素的好处还抵不上并行化造成的额外开销。
 要考虑流背后的数据结构是否易于分解。例如, ArrayList的拆分效率比LinkedList高得多,因为前者用不着遍历就可以平均拆分,而后者则必须遍历。另外,用range工厂方法创建的原始类型流也可以快速分解。最后,也可以自己实现Spliterator来完全掌控分解过程。

 流自身的特点,以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。例如,一个SIZED流可以分成大小相等的两部分,这样每个部分都可以比较高效地并行处理,但筛选操作可能丢弃的元素个数却无法预测,导致流本身的大小未知。
 还要考虑终端操作中合并步骤的代价是大是小(例如Collector中的combiner方法)。如果这一步代价很大,那么组合每个子流产生的部分结果所付出的代价就可能会超出通过并行流得到的性能提升。

  • 按照可分解性总结了一些流数据源适不适于并行:

  • 注意:并行流背后使用的基础架构是Java 7中引入的分支/合并框架。

2、分支/合并框架

分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。

它是ExecutorService接口的一个实现,它把子任务分配给线程池(称为ForkJoinPool)中的工作线程。

(1)、使用 RecursiveTask

要把任务提交到这个池,必须创建RecursiveTask<R>的一个子类,其中R是并行化任务(以及所有子任务)产生的结果类型,或者如果任务不返回结果,则是RecursiveAction类型(当然它可能会更新其他非局部机构)。

要定义RecursiveTask, 只需实现它唯一的抽象方法compute:protected abstract R compute();

这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。伪代码:

if (任务足够小或不可分) {顺序计算该任务
} else {将任务分成两个子任务递归调用本方法,拆分每个子任务,等待所有子任务完成合并每个子任务的结果
}
  • eg:用分支/合并框架执行并行求和
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;/*** 用分支/合并框架执行并行求和*      继承RecursiveTask来创建可以用于分支/合并框架的任务** @date 2018/04/12*/
public class ForkJoinSumCalculator extends RecursiveTask {/*** 子任务处理的数组的起始和终止位置*/private final long[] numbers;private final int start;private final int end;//不再将任务分解为子任务的数组大小public static final long THRESHOLD = 10_000;/*** 公共构造函数用于创建主任务*/public ForkJoinSumCalculator(long[] numbers) {this(numbers, 0, numbers.length);}/*** 私有构造函数用于以递归方式为主任务创建子任务*/private ForkJoinSumCalculator(long[] numbers, int start, int end) {this.numbers = numbers;this.start = start;this.end = end;}/*** 覆盖RecursiveTask抽象方法*/@Overrideprotected Long compute() {//该任务负责求和的部分的大小int length = end -start;//如果大小<=阈值,顺序计算结果if (length <= THRESHOLD) {return computeSequentially();}//创建一个子任务来为数组的前一半求和ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);//利用另一个ForkJoinPool线程异步执行新创建的子任务leftTask.fork();ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);//同步执行第二个子任务,有可能允许进一步递归划分Long rightResult = rightTask.compute();//读取第一个子任务的结果,如果尚未完成就等待Long leftResult = (Long) leftTask.join();//该任务的结果是两个子任务结果的组合return leftResult + rightResult;}/*** 在子任务不在可分时计算结果的简单算法*/private long computeSequentially() {long sum = 0;for (int i = start; i < end; i++) {sum += numbers[i];}return sum;}/*** 并行对前n个自然数求和*/public static long forkJoinSum(long n) {long[] numbers = LongStream.rangeClosed(1, n).toArray();ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);return new ForkJoinPool().invoke(task);}public static void main(String[] args) {long start = System.nanoTime();System.out.println(forkJoinSum(10_000_000) + ", 并行计算时间: " + ((System.nanoTime() - start) / 1_000_000)); --> 150}
}

  • 经测试发现,性能比并行流稍微差些!

(2)、使用分支/合并框架的最佳做法

以下是几个有效使用它的最佳做法:

 调用join方法会阻塞调用方,直到该任务做出结果。因此,有必要在两个子任务的计算都开始之后再调用join。
 不应该在RecursiveTask内部使用ForkJoinPool的invoke方法。相反,应该始终直接调用compute或fork方法,只有顺序代码才应该用invoke来启动并行计算。
 对子任务调用fork方法可以把它排进ForkJoinPool。同时对左边和右边的子任务调用它似乎很自然,但这样做的效率要比直接对其中一个调用compute低。这样做可以为其中一个子任务重用同一线程,从而避免在线程池中多分配一个任务造成的开销。
 调试使用分支/合并框架的并行计算可能有点棘手。特别是你平常都在你喜欢的IDE里面看栈跟踪(stack trace)来找问题,但放在分支-合并计算上就不行了,因为调用compute的线程并不是概念上的调用方,后者是调用fork的那个。
 和并行流一样,不应认为在多核处理器上使用分支/合并框架就比顺序计算快。

  • 对于分支/合并拆分策略还有最后一点补充:必须选择一个标准,来决定任务是要进一步拆分还是已小到可以顺序求值。

(3)、工作窃取

理想情况下,划分并行任务时,应该让每个任务都用完全相同的时间完成,让所有的CPU内核都同样繁忙。

不幸的是:实际中,每个子任务所花的时间可能天差地别,要么是因为划分策略效率低,要么是有不可预知的原因,比如

磁盘访问慢,或是需要和外部服务协调执行。

  • 分支/合并框架工程用一种称为工作窃取(work stealing)的技术来解决这个问题。这种工作窃取算法用于在池中的工作线程之间重新分配和平衡任务。

3、Spliterator

Spliterator是Java 8中加入的另一个新接口;这个名字代表“可分迭代器”(splitableiterator)。

  • 和Iterator一样, Spliterator也用于遍历数据源中的元素,但它是为了并行执行而设计的。
public interface Spliterator<T> {boolean tryAdvance(Consumer<? super T> action);Spliterator<T> trySplit();long estimateSize();int characteristics();
}

T是Spliterator遍历的元素的类型。

 tryAdvance方法的行为类似于普通的Iterator,它会按顺序一个一个使用Spliterator中的元素,并且如果还有其他元素要遍历就返回true。

 但trySplit是专为Spliterator接口设计的,它可以把一些元素划出去分给第二个Spliterator(由该方法返回),让它们两个并行处理。

 Spliterator还可通过estimateSize方法估计还剩下多少元素要遍历,因为即使不那么确切,能快速算出来是一个值也有助于让拆分均匀一点。

(1)、拆分过程

将Stream拆分成多个部分的算法是一个递归过程:

 对第一个Spliterator调用trySplit,生成第二个Spliterator。

 对这两个Spliterator调用trysplit,这样总共就有了四个Spliterator。

 这个框架不断对Spliterator调用trySplit直到它返回null,表明它处理的数据结构不能再分割,如第三步所示。

 最后,这个递归拆分过程到第四步就终止了,这时所有的Spliterator在调用trySplit时都返回了null。

  • Spliterator的特性:Spliterator接口声明的最后一个抽象方法是characteristics,它将返回一个int,代表Spliterator本身特性集的编码。

(2)、实现你自己的 Spliterator(略)

Java8实战学习笔记(三)——函数式数据处理相关推荐

  1. Java8实战学习笔记(四)——高效 Java 8 编程(一)

    一.重构.测试和调试 (一).为改善可读性和灵活性重构代码 用更紧凑的方式描述程序的行为 -- Lambda表达式 将一个既有的方法作为参数传递给另一个方法 -- 方法引用 如何运用前几章介绍的Lam ...

  2. JavaScript实战学习笔记三20200412

    JavaScript 队列 在计算机科学中 队列(queue)是一个抽象的数据结构,队列中的数据条目都是有秩序的.新的条目会被加到 队列 的末尾,旧的条目会从 队列 的头部被移出. /*用一个数组ar ...

  3. 《Java8实战》笔记汇总

    <Java8实战>笔记(01):为什么要关心Java8 <Java8实战>笔记(02):通过行为参数传递代码 <Java8实战>笔记(03):Lambda表达式 & ...

  4. Java 8 实战学习笔记

    Java 8 实战学习笔记 @(JAVASE)[java8, 实战, lambda] 文章目录 Java 8 实战学习笔记 参考内容 Lambda表达式 Lambda环绕执行模式(抽离步骤) 原始代码 ...

  5. MongoDB 入门教程实战学习笔记-31-mongo 聚合查询管道 Aggregation Pipieline

    aggregation 聚合操作处理数据记录并返回计算结果. 聚合操作将多个文档中的值组合在一起, 并且可以对分组数据执行各种操作以返回单个结果. mongodb 提供了三种执行聚合的方法: 聚合管道 ...

  6. Linux性能优化实战学习笔记:第四十六讲=====实战分析

    Linux性能优化实战学习笔记:第四十六讲 一.上节回顾 不知不觉,我们已经学完了整个专栏的四大基础模块,即 CPU.内存.文件系统和磁盘 I/O.以及网络的性能分析和优化.相信你已经掌握了这些基础模 ...

  7. Word2vec原理+实战学习笔记(二)

    来源:投稿 作者:阿克西 编辑:学姐 前篇:Word2vec原理+实战学习笔记(一)​​​​​​​ 视频链接:https://ai.deepshare.net/detail/p_5ee62f90022 ...

  8. “物联网开发实战”学习笔记-(二)手机控制智能电灯

    "物联网开发实战"学习笔记-(二)手机控制智能电灯 如果搭建好硬件平台后,这一次我们的任务主要是调试好智能电灯,并且连接到腾讯云的物联网平台. 腾讯云物联网平台 腾讯物联网平台的优 ...

  9. 第55课:60分钟内从零起步驾驭Hive实战学习笔记

    第55课:60分钟内从零起步驾驭Hive实战学习笔记 本期内容: 1. Hive本质解析 2. Hive安装实战 3. 使用Hive操作搜索引擎数据实战 SparkSQL前身是Shark,Shark强 ...

最新文章

  1. Android studio 设置忽略文件
  2. 【面试】我是如何面试别人List相关知识的,深度有点长文
  3. 输入两个数,计算它们的最大公约数和最小公倍数
  4. oracle SQL 命令行(一.基础)
  5. ANSI编码和Unicode编码的不同
  6. 面试英语自我介绍的常用词汇
  7. 自定义 HBase-MapReduce1
  8. CentOS 6.4 64位系统U盘安装
  9. php生成图片文件流,php 如何把图片转化为字节流存储到数据库?
  10. 峥果智能连接不到服务器,峥果浴霸 ESP8285版本 固件
  11. 关于PLC的“源型”和“漏型”
  12. 使用深度学习自动给图片生成文字描述
  13. 储存profiles是什么意思_程序开发里面的profile 是什么意思
  14. ubuntu20.04设置登录壁纸
  15. flash常见问题集锦,很适合新手哦
  16. 今天19:30 | 复旦大学青年副研究员许嘉蓉—《基于图数据的鲁棒机器学习 》
  17. Windows 安全资料库网站
  18. 社保照片PS修改教程
  19. 全国计算机三级网络技术电子版,全国计算机三级网络技术最新版笔试电子教材(完全免费版).doc...
  20. iOS 照片存储到沙盒 和读取

热门文章

  1. 全球公有云:亚马逊半壁江山 阿里稳居前三
  2. 职场小白做短视频,用好了这些辅助工具,每天收入增加了200多
  3. 史上最简单的Spring Security教程(八):用户登出成功LogoutSuccessHandler高级用法
  4. 安全漏洞是如何被发现的?
  5. 高效工作的法宝推荐,小小便签助你快人一步
  6. 转岗前景分析:人工智能、大数据开发是未来高薪的趋势?
  7. 小学计算机应聘简历,小学计算机教师求职简历范文素材赏析
  8. Win10打开图片需要使用新应用以打开此ms-paint链接解决办法(简便)
  9. 自动化设备数据采集系统如何实现
  10. 可以表白的生日祝福(附源码与修改教程)