基本概念
-
BaseStream 基础流是一个可行并行或者串行的汇聚操作的元素序列。可以进行顺序遍历,也可以进行并发遍历。通过它也可以得到一个并行流或者串行流。
-
Stream 是Java中流的表现接口,Stream继承自BaseStream,并在BaseStream的基础上增加了fiter、map、distinct、sorted、peek、limit、reduce等可以针对流中的元素进行的更加具具体操作的方法。
-
实现stream的具体设施是Pipeline(流水线),流水线是对流中元素进行加工的地方。流水线进行工作时,需要提供一个操作对象,一个stream可能会包含多个流水线,这些流水线会按照链表的方式组装起来,形成上下游关系。
流水线有如下几个概念:
- Shape: 表示这个流水线要处理的元素的类型(如引用,int,long,double)。
- StreamAndOpFlag: 此流水的标志,比如是否可以并行,是否有序等。
- depth: 在串行流中,代表当前流水线阶段到源的中间操作数。如果是并行流,则代表当前阶段到上一个有状态流水线的中间操作数。
-
pipeline经常是被串在一起的,每一个pipeline叫做一个stage,stage可以是stream的源头,也可以是stream的一个中间操作。
-
Sink Sink代表一个具体的操作,它扩展类Consumer接口,可以接受一个参数并执行一个操作。在stream机制中,sink是对流中的元素进行具体操作的对象,stream支持的操作也是通过sink来实现的。
-
spliterator 在流的底层,对元素的遍历都是使用spliterator来进行的。spliterator可以看成是iterator的并行版本,它可以对集合元素进行顺序遍历,批量遍历,并且可以对列表元素进行分割,以便并行处理。
-
Collection.stream 这个default方法,会返回一个Stream接口表示的对象,这个对象是一个ReferencePipeline,这个pipeline代表整个stream的源(source)。
-
流操作分为两类,中间操作和最终操作,中间操作一般是对元素进行过滤或者映射,最终操作中间操作并不会立即去对流进行遍历,而是仅仅生成一个新的pipeline,并将新生成的pipeline作为被调用的pipeline的下游流水线。最终操作几乎总是会积极地执行,他们会对元素进行遍历,并执行所有流水线。
-
stateless,stateful,流水线操作可能是有状态的,也可能是无状态的。一个有状态的流水线操作是指它的操作结果依赖于流水线操作过程中可能发生变化的任何状态。有状态的流水线在并行执行的时候,可能还会带来并发问题或者效率问题。比如:
Set<Integer> seen = new HashSet<>();
stream.parallel().map(e -> { if (seen.add(e)) return 0; else return e; })...
对于seen,它会被并发地访问,导致不确定的结果。如果对seem进行互斥操作,则会降低并行的效率,违背并行的本意。
所以要尽量使用无状态操作的。
关键代码
-
java.util.Collection#stream
生成一个可以遍历当前集合的流对象,使用当前对象的spliterator来进行遍历。此流对象是ReferencePipeline.Head,代表一个源pipeiline。 -
java.util.stream.ReferencePipeline#filter(predict)
返回一个无状态操作(statelssOp),此无状态操作也是一个pipeline,并且,再接受一个元素之前,会先用predict进行检查,如果预测失败,则不会讲请求传达到下游流水线,达到过滤的效果。 -
java.util.stream.LongPipeline#reduce(identity, op)
使用给定的恒等操作值identiry和二元操作函数op对流进行归纳运算。
如果是串行操作,则会将此流水线传给第二元操作函数op进行求值。 -
java.util.stream.AbstractPipeline#wrapSink
sink是流中进行具体操作的对象,wrapSink是将当前流水线和它的上游流水线串联起来,当前流水线作为下游。
TerminalOp使用sink来执行具体的操作。(java.util.stream.ReduceOps.ReduceOp#makeSink)