例子:
怎样用map和reduce方法数一数流中有多少个菜呢?
答案:要解决这个问题,你可以把流中每个元素都映射成数字1,然后用reduce求和。这相当于按顺序数流中的元素个数。
int count = menu.stream() .map(d -> 1) .reduce(0, (a, b) -> a + b);
map和reduce的连接通常称为map-reduce模式,因Google用它来进行网络搜索而出名,因为它很容易并行化。同时,内置count方法也可用来计算流中元素的个数:
long count = menu.stream().count();
归约方法的优势与并行化 相比于前面写的逐步迭代求和,使用reduce的好处在于,这里的迭代被内部迭代抽象掉了,这让内部实现 得以选择并行执行reduce操作。而迭代式求和例子要更新共享变量sum,这不是那么容易并行化的。如果你 加入了同步,很可能会发现线程竞争抵消了并行本应带来的性能提升!这种计算的并行化需要另一种办法: 将输入分块,分块求和,最后再合并起来。但这样的话代码看起来就完全不一样了。以后你会认识到使用 分支/合并框架来做是什么样子。但现在重要的是要认识到,可变的累加器模式对于并行化来说是死路一条。 你需要一种新的模式,这正是reduce所提供的。使用流来对所有的元素并行求和时,你的代码几乎不用修改: stream()换成了parallelStream()。 int sum = numbers.parallelStream().reduce(0, Integer::sum); 但要并行执行这段代码也要付一定代价:传递给reduce的Lambda不能更改状态(如实例变量),而且操作 必须满足结合律才可以按任意顺序执行。
流操作:无状态和有状态 你已经看到了很多的流操作。乍一看流操作简直是灵丹妙药,而且只要在从集合生成流的时候把Stream 换成parallelStream就可以实现并行。当然,对于许多应用来说确实是这样,就像前面的那些例子。你可以 把一张菜单变成流,用filter选出某一类的菜肴,然后对得到的流做map来对卡路里求和,最后reduce得到菜 单的总热量。这个流计算甚至可以并行进行。但这些操作的特性并不相同。它们需要操作的内部状态还是有些 问题的。 诸如map或filter等操作会从输入流中获取每一个元素,并在输出流中得到0或1个结果。这些操作一般 都是无状态的:它们没有内部状态(假设用户提供的Lambda或方法引用没有内部可变状态)。 但诸如reduce、sum、max等操作需要内部状态来累积结果。在上面的情况下,内部状态很小。在我们的例 子里就是一个int或double。不管流中有多少元素要处理,内部状态都是有界的。 相反,诸如sort或distinct等操作一开始都和filter和map差不多——都是接受一个流,再生成一个流 (中间操作),但有一个关键的区别。从流中排序和删除重复项时都需要知道先前的历史。例如,排序要求所有 元素都放入缓冲区后才能给输出流加入一个项目,这一操作的存储要求是无界的。要是流比较大或是无限的, 就可能会有问题(把质数流倒序会做什么呢?它应当返回最大的质数,但数学告诉我们它不存在)。我们 把这些操作叫作有状态操作。
操作 | 类型 | 返回类型 | 使用的类型/函数式接口 | 函数描述符 |
filter | 中间 | Stream<T> | Predicate<T> | T -> boolean |
distinct | 中间(有状态-无界) | Stream<T> | ||
skip | 中间(有状态-有界) | Stream<T> | long | |
limit | 中间 | Strem<T> | long | |
map | 中间 | Stream<R> | Function<T,R> | T -> R |
flatMap | 中间(有状态-无界) | Stream<R> | Function<T,Stream<R>> | T -> Stream<R> |
sorted | 终端 | Stream<T> | Comparator<T> | (T, T) -> int |
anyMatch | 终端 | boolean | Predicate<T> | T -> boolean |
noneMatch | 终端 | boolean | Predicate<T> | T -> boolean |
allMatch | 终端 | boolean | Predicate<T> | T -> boolean |
findAny | 终端 | Optional<T> | ||
findFirst | 终端 | Optional<T> | ||
forEach | 终端 | void | Consumer<T> | T -> void |
collect | 终端 | R | Collector<T,A,R> | |
reduce | 终端(有状态-有界) | Optional<T> | BinaryOperator<T> | (T, T) -> T |
count | 终端 | long |