简单Stream示例
@Test
public void test() {
    // Keep only people older than 24, project each one to its name,
    // and print every surviving name on its own line.
    getData().stream()
            .filter(candidate -> candidate.getAge() > 24)
            .map(Person::getName)
            .forEach(System.out::println);
}
// Builds the fixed sample data set used by the stream example:
// eight Person objects named "1".."8" with the ages listed below, in that order.
private static ArrayList<Person> getData() {
    // names[i] is paired with ages[i]; both tables must stay the same length.
    final String[] names = {"1", "2", "3", "4", "5", "6", "7", "8"};
    final int[] ages = {20, 22, 24, 26, 28, 20, 18, 16};

    ArrayList<Person> people = new ArrayList<>();
    for (int idx = 0; idx < names.length; idx++) {
        people.add(new Person(names[idx], ages[idx]));
    }
    return people;
}
流程分析:
// Excerpt of the Collection interface: only the default stream() method is shown.
public interface Collection<E> extends Iterable<E> {
// Creates a Stream over this collection by handing its spliterator() to
// StreamSupport.stream; the second argument (false) is that factory's
// boolean `parallel` parameter, so the resulting stream is not parallel.
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
}
这里的spliterator()返回的是上一节中介绍的
ArrayListSpliterator
Stream接口抽象类:AbstractPipeline
//
// Excerpt of AbstractPipeline: the fields that link pipeline stages together,
// plus the constructor used for the source (head) stage. Other members are omitted.
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
// Back-link to the head of the pipeline chain; for the source stage this is the stage itself.
@SuppressWarnings("rawtypes")
private final AbstractPipeline sourceStage;
// The upstream pipeline stage; null for the source stage.
@SuppressWarnings("rawtypes")
private final AbstractPipeline previousStage;
// The operation flags of this stage (or the source flags, for the source stage).
protected final int sourceOrOpFlags;
// The next (downstream) pipeline stage; set when a following stage links itself to this one.
@SuppressWarnings("rawtypes")
private AbstractPipeline nextStage;
// Depth of this stage in the chain: 0 for the source stage, previous depth + 1 otherwise.
// NOTE(review): the original author suspected depth is handled differently when a stage
// is stateful or the pipeline is parallel; that is not visible in this excerpt - confirm.
private int depth;
// Combined source and operation flags.
private int combinedFlags;
// The source spliterator. Only valid on the head pipeline;
// per the original notes it is set to null once the stream has been consumed.
private Spliterator<?> sourceSpliterator;
// The source supplier. Only valid on the head pipeline;
// per the original notes it is set to null once the stream has been consumed.
private Supplier<? extends Spliterator<?>> sourceSupplier;
// True if this pipeline has been linked or consumed.
private boolean linkedOrConsumed;
// Kept on the source stage: true if any stage in the pipeline is a stateful operation.
private boolean sourceAnyStateful;
// Presumably the action to run when the stream is closed; its use is not shown in this excerpt.
private Runnable sourceCloseAction;
// Whether this is a parallel stream.
private boolean parallel;
// Constructor for the head (source) stage of a pipeline.
//   source      - the spliterator that supplies the elements
//   sourceFlags - stream flags describing the source
//   parallel    - whether the pipeline is parallel
AbstractPipeline(Spliterator<?> source,
int sourceFlags, boolean parallel) {
this.previousStage = null;// the first stage has no upstream stage
this.sourceSpliterator = source;// the source Spliterator
this.sourceStage = this;
// In the article's example, sourceFlags is 80.
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
// The following is an optimization of:
// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
this.depth = 0;
this.parallel = parallel;
}
}
AbstractPipeline是所有Stream实现的父类,请参考第一张uml类图
从这里的初始化可以看出,stream的各个阶段会被组合成一条pipeline链表(Stream pipeline linked list)
Head Stream
// Excerpt showing how the head (source) stage of a reference pipeline is created.
// NOTE(review): Collection.stream() invokes this factory as StreamSupport.stream(...),
// and the body refers to the head class as ReferencePipeline.Head, so the factory
// presumably belongs to StreamSupport rather than to Head itself - confirm against the JDK source.
static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
// Creates a Stream backed by the given spliterator.
//   spliterator - the element source; must not be null (NullPointerException otherwise)
//   parallel    - passed through to the Head constructor as the parallel flag
// The stream flags are derived from the spliterator via StreamOpFlag.fromCharacteristics.
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
}
简单地创建一个
AbstractPipeline
:previousStage为null
sourceSpliterator为
ArrayListSpliterator
sourceStage:为当前pipeline
depth:为0
filter()
// Appends a filtering stage to this pipeline and returns it as a new Stream.
//   predicate - test applied to each element; must not be null (NullPointerException otherwise)
// No element is processed here: the method only constructs a new StatelessOp stage
// whose upstream is this stage.
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
// filter is a stateless operation; it declares NOT_SIZED as its op flag.
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
// Called while the terminal operation is evaluated (via wrapSink) to build the
// Sink chain: wraps the given downstream sink with this stage's filtering sink.
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
// Passes -1 downstream instead of the incoming size, since filtering may drop elements.
@Override
public void begin(long size) {
downstream.begin(-1);
}
// Forwards the element downstream only if the predicate accepts it.
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
// Initialization of StatelessOp.
// Excerpt of ReferencePipeline: the constructor used by intermediate stages and
// the nested StatelessOp base class. Other members are omitted.
abstract class ReferencePipeline<P_IN, P_OUT> extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
implements Stream<P_OUT> {
// Constructor for a stage appended to an existing upstream stage;
// delegates to AbstractPipeline(upstream, opFlags).
ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
super(upstream, opFlags);
}
// Base class for stateless intermediate stages: opIsStateful() always returns false.
abstract static class StatelessOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
//   upstream   - the stage this operation is appended to
//   inputShape - the stream shape expected from upstream (checked by assertion only)
//   opFlags    - the operation flags of the new stage
StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,StreamShape inputShape,int opFlags) {
// Calls ReferencePipeline(upstream, opFlags).
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}
@Override
final boolean opIsStateful() {
return false;
}
}
}
// Excerpt of AbstractPipeline: the constructor that appends an intermediate stage
// to an existing pipeline. Other members are omitted.
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
// Links a new stage behind previousStage.
//   previousStage - the upstream stage; must not already be linked or consumed
//   opFlags       - the operation flags of the new stage
// Throws IllegalStateException if previousStage has already been linked or consumed.
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
// The following lines link this stage into the pipeline chain:
// the upstream stage is marked as linked and both directions of the link are set.
previousStage.linkedOrConsumed = true;
previousStage.nextStage = this;
this.previousStage = previousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
// Every stage shares the same source (head) stage.
this.sourceStage = previousStage.sourceStage;
// A stateful stage is recorded on the source stage.
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
}
初始化
StatelessOp
,AbstractPipeline的子类.将Head Stream的
nextStage
指定为当前StatelessOp(filter类型),并将当前StatelessOp的previousStage指定为前面的Head stream,
同时sourceStage保存Head stream
map()
// Appends a mapping stage to this pipeline and returns it as a new Stream.
//   mapper - function applied to each element; must not be null (NullPointerException otherwise)
// No element is processed here: the method only constructs a new StatelessOp stage
// whose upstream is this stage.
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
// Instantiates a StatelessOp (a subclass of AbstractPipeline) with the
// NOT_SORTED and NOT_DISTINCT op flags.
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
// Called while the terminal operation is evaluated (via wrapSink) to build the
// Sink chain: wraps the given downstream sink with this stage's mapping sink.
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
// Hands the mapper's result for this element to the downstream sink.
downstream.accept(mapper.apply(u));
}
};
}
};
}
forEach()
//java.util.stream.ReferencePipeline#forEach
// Terminal operation: builds a ForEachOp around the given action (with ordered = false)
// and evaluates the pipeline with it.
//   action - the consumer invoked for the elements that reach the end of the pipeline
public void forEach(Consumer<? super P_OUT> action) {
evaluate(ForEachOps.makeRef(action, false));
}
//java.util.stream.ForEachOps#makeRef
// Factory for the reference-typed forEach terminal operation.
//   action  - the consumer to wrap; must not be null (NullPointerException otherwise)
//   ordered - passed through to the ForEachOp.OfRef constructor
// Returns a new ForEachOp.OfRef wrapping the action.
public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
boolean ordered) {
Objects.requireNonNull(action);
return new ForEachOp.OfRef<>(action, ordered);
}
//java.util.stream.ForEachOps.ForEachOp.OfRef
// ForEachOp implementation for reference streams: stores the user's consumer
// and forwards every accepted element to it.
static final class OfRef<T> extends ForEachOp<T> {
// The action supplied to forEach.
final Consumer<? super T> consumer;
//   consumer - the action to invoke for each element
//   ordered  - passed to the ForEachOp superclass constructor
OfRef(Consumer<? super T> consumer, boolean ordered) {
super(ordered);
this.consumer = consumer;
}
// Delegates the element to the wrapped consumer.
@Override
public void accept(T t) {
consumer.accept(t);
}
}
// Execution entry point: java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp<E_OUT,R>)
// Evaluates the pipeline with the given terminal operation and returns its result.
//   terminalOp - the terminal operation to apply to this pipeline
// Throws IllegalStateException if this stage has already been linked or consumed.
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
// Mark this stage as consumed so the stream cannot be evaluated a second time.
linkedOrConsumed = true;
// Dispatch to the parallel or sequential evaluation of the terminal op, passing
// this pipeline and the spliterator obtained from sourceSpliterator(...).
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
//java.util.stream.ForEachOps.ForEachOp#evaluateSequential
// Excerpt of ForEachOp: it is both the terminal operation and the terminal sink,
// so it passes itself as the sink when evaluating. Other members are omitted.
abstract static class ForEachOp<T> implements TerminalOp<T, Void>, TerminalSink<T, Void> {
// Sequential evaluation: asks the helper to wrap this sink with the pipeline's
// stages and push the spliterator's elements into it, then returns get() of
// the sink that wrapAndCopyInto hands back.
@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
return helper.wrapAndCopyInto(this, spliterator).get();
}
}
//java.util.stream.AbstractPipeline#wrapAndCopyInto
// Wraps the given sink with wrapSink, pushes the spliterator's elements into the
// wrapped sink with copyInto, and returns the original (unwrapped) sink.
//   sink        - the terminal sink; must not be null (NullPointerException otherwise)
//   spliterator - the source of elements
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
//java.util.stream.AbstractPipeline#wrapSink
// Builds the Sink chain for this pipeline around the given terminal sink.
//   sink - the sink that receives the pipeline's output; must not be null
// Returns the outermost sink, i.e. the one produced by the stage closest to the source.
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
// Walk from this stage back towards the source through previousStage, stopping at
// the stage whose depth is 0 (the source stage contributes no sink).
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
// Each AbstractPipeline subclass implements opWrapSink differently, as the
// filter() and map() examples show; every call wraps the sink built so far.
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
//java.util.stream.AbstractPipeline#copyInto
// Pushes the elements of the spliterator into the wrapped sink.
//   wrappedSink - the head of the Sink chain; must not be null (NullPointerException otherwise)
//   spliterator - the source of elements
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
// When the SHORT_CIRCUIT flag is not known for this pipeline, all elements are
// pushed between a begin()/end() pair.
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
// Start traversing the source; the wrapped sink is passed as the Consumer for each element.
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
// Otherwise traversal is delegated to copyIntoWithCancel.
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
wrapSink()
方法最后会将pipeline中所有操作组合为一个Sink链,并且不包括head stream,因为head stream的depth为0(循环条件为p.depth > 0)
spliterator.forEachRemaining()
遍历集合,接受consumer接口,也就是对每个元素调用consumer.accept()方法。ForEachOp没有继承AbstractPipeline,所以不在pipeline chain中,而是直接实现了Sink接口(TerminalSink)以及TerminalOp接口,所以在
wrapSink
时,ForEachOp本身作为Sink链中第一个被包装的节点(但最后被调用)。创建Sink单向链才是目的,AbstractPipeline只是保存了一个双向链条,并根据不同的操作类型封装不同的accept方法
Sink
ChainedReference
// Abstract base for a Sink that is chained in front of another Sink: it keeps a
// reference to the downstream sink and forwards begin(), end() and
// cancellationRequested() to it. accept() is not defined here.
abstract static class ChainedReference<T, E_OUT> implements Sink<T> {
// The next sink in the chain; never null.
protected final Sink<? super E_OUT> downstream;
// The downstream sink (the one built before this one) must be supplied at
// construction time; this is what forms the singly linked Sink chain.
// Throws NullPointerException if downstream is null.
public ChainedReference(Sink<? super E_OUT> downstream) {
this.downstream = Objects.requireNonNull(downstream);
}
// Forwards the size unchanged to the downstream sink.
@Override
public void begin(long size) {
downstream.begin(size);
}
// Forwards the end-of-data signal to the downstream sink.
@Override
public void end() {
downstream.end();
}
// Reports whatever the downstream sink reports.
@Override
public boolean cancellationRequested() {
return downstream.cancellationRequested();
}
}
ChainedReference本身不实现accept()方法,由子类定义(如前面filter()、map()中的匿名子类);终端Sink同样自行实现accept(),如
java.util.stream.ForEachOps.ForEachOp.OfRef