  • Java Basics - Stream[4] - A simple Stream example and how it runs

    A simple Stream example

    @Test
    public void test (){
        getData().stream().filter(person -> person.getAge() >24).map(Person::getName).forEach(System.out::println);
    }
    private static ArrayList<Person> getData() {
        ArrayList<Person> list=new ArrayList<>();
        list.add(new Person("1",20));
        list.add(new Person("2",22));
        list.add(new Person("3",24));
        list.add(new Person("4",26));
        list.add(new Person("5",28));
        list.add(new Person("6",20));
        list.add(new Person("7",18));
        list.add(new Person("8",16));
        return list;
    }
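
    The test above depends on a Person class and a JUnit runner that the article does not show. A minimal self-contained sketch, assuming a hypothetical Person with only a name and an age, collects the names instead of printing them so the result is easy to check:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class SimpleStreamExample {
    // Hypothetical stand-in for the article's Person class (not shown in the original).
    static final class Person {
        private final String name;
        private final int age;
        Person(String name, int age) { this.name = name; this.age = age; }
        String getName() { return name; }
        int getAge() { return age; }
    }

    // Same eight people as getData() in the article.
    static List<Person> getData() {
        List<Person> list = new ArrayList<>();
        int[] ages = {20, 22, 24, 26, 28, 20, 18, 16};
        for (int i = 0; i < ages.length; i++) {
            list.add(new Person(String.valueOf(i + 1), ages[i]));
        }
        return list;
    }

    // Same pipeline as the test, but collecting the names instead of printing them.
    public static List<String> namesOlderThan(int age) {
        return getData().stream()
                .filter(person -> person.getAge() > age)
                .map(Person::getName)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        namesOlderThan(24).forEach(System.out::println); // prints 4 and 5, one per line
    }
}
```

    Only the people aged 26 and 28 pass the filter, so the names "4" and "5" reach the terminal operation.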
    

    Flow analysis:

    public interface Collection<E> extends Iterable<E> {
        default Stream<E> stream() {
            return StreamSupport.stream(spliterator(), false);
        }
    }
    
    

    Here spliterator() returns the ArrayListSpliterator covered in the previous installment.
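
    As a quick check of what stream() hands to StreamSupport, the sketch below inspects the spliterator of an ArrayList through public API only. The class name SpliteratorCheck and its helper methods are made up for illustration; the ArrayList Javadoc states that its spliterator reports SIZED, SUBSIZED and ORDERED.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class SpliteratorCheck {
    // True if the list's spliterator reports ORDERED, SIZED and SUBSIZED together.
    public static boolean isOrderedAndSized(List<?> list) {
        Spliterator<?> sp = list.spliterator();
        return sp.hasCharacteristics(
                Spliterator.ORDERED | Spliterator.SIZED | Spliterator.SUBSIZED);
    }

    // Builds a sequential stream the same way the default Collection.stream() does.
    public static <T> List<T> viaStreamSupport(List<T> list) {
        return StreamSupport.stream(list.spliterator(), false)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3));
        System.out.println(isOrderedAndSized(list));
        System.out.println(viaStreamSupport(list));
    }
}
```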

    The abstract base class behind the Stream interface: AbstractPipeline

    //
    abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
            extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
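        // Back-link to the head of the pipeline chain; the source stage points to itself
        @SuppressWarnings("rawtypes")
        private final AbstractPipeline sourceStage;
    
        // The upstream stage; null for the source stage
        @SuppressWarnings("rawtypes")
        private final AbstractPipeline previousStage;
    
        // Operation flags of this stage (source flags for the source stage)
        protected final int sourceOrOpFlags;
    
        // The next (downstream) stage
        @SuppressWarnings("rawtypes")
        private AbstractPipeline nextStage;
    
        // Depth: the number of intermediate operations between this stage and the source
        // when sequential, or since the previous stateful stage when parallel
        private int depth;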
    
        //combined source and operation flags 
        private int combinedFlags;
    
        // The source spliterator; only valid on the head stage,
        // and set to null once the stream has been consumed
        private Spliterator<?> sourceSpliterator;
    
        // The source supplier; only valid on the head stage,
        // and set to null once the stream has been consumed
        private Supplier<? extends Spliterator<?>> sourceSupplier;
    
        //True if this pipeline has been linked or consumed
        private boolean linkedOrConsumed;
    
        // On the source stage: true if any stage in the pipeline is stateful
        private boolean sourceAnyStateful;
    
        private Runnable sourceCloseAction;
    
        // Whether the pipeline is parallel
        private boolean parallel;
        
        
        AbstractPipeline(Spliterator<?> source,
                         int sourceFlags, boolean parallel) {
            this.previousStage = null; // the head stage has no upstream stage
            this.sourceSpliterator = source; // the source Spliterator
            this.sourceStage = this;
            // In this example sourceFlags is 80 (0x50, i.e. IS_ORDERED | IS_SIZED)
            this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
            // The following is an optimization of:
            // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
            this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
            this.depth = 0;
            this.parallel = parallel;
        }
    }
    

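    AbstractPipeline is the parent class of all Stream implementations; see the first UML class diagram.

    This initialization shows that the stages of a stream are assembled into a linked list, the stream pipeline.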

    Head Stream

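    //java.util.stream.StreamSupport#stream
    public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }
    //java.util.stream.ReferencePipeline.Head
    static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
        Head(Spliterator<?> source, int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }
    }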
    

    This simply creates an AbstractPipeline with:

    previousStage: null

    sourceSpliterator: the ArrayListSpliterator

    sourceStage: the current pipeline itself

    depth: 0

    filter()

    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        // filter is a stateless operation
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                             StreamOpFlag.NOT_SIZED) {
            // The terminal operation eventually calls this method to assemble the Sink chain that receives the results
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }
                    // Pass the element downstream only if the Predicate returns true
                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }
    //Initialization of StatelessOp
    abstract class ReferencePipeline<P_IN, P_OUT> extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
            implements Stream<P_OUT>  {
       ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
            super(upstream, opFlags);
        }
        abstract static class StatelessOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
            StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,StreamShape inputShape,int opFlags) {
                // Calls ReferencePipeline(upstream, opFlags)
                super(upstream, opFlags);
                assert upstream.getOutputShape() == inputShape;
            }
    
            @Override
            final boolean opIsStateful() {
                return false;
            }
        }
        
    }
    abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
            extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
        AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
            if (previousStage.linkedOrConsumed)
                throw new IllegalStateException(MSG_STREAM_LINKED);
            // The next few lines link this stage into the pipeline chain
            previousStage.linkedOrConsumed = true;
            previousStage.nextStage = this;
            this.previousStage = previousStage;
            
            this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
            this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
            this.sourceStage = previousStage.sourceStage;
            if (opIsStateful())
                sourceStage.sourceAnyStateful = true;
            this.depth = previousStage.depth + 1;
        }
    }
    

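    This initializes a StatelessOp, which is a subclass of AbstractPipeline.

    The Head stream's nextStage is set to the current StatelessOp (the filter stage).

    The current StatelessOp's previousStage is set to the Head stream before it.

    sourceStage also keeps a reference to the Head stream.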
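
    The linking steps above can be sketched with a toy model. The Stage class below is hypothetical and heavily simplified: it keeps only the link fields, depth and the linkedOrConsumed check, carries no data, and is not the JDK's AbstractPipeline.

```java
import java.util.ArrayList;
import java.util.List;

public class StageChainSketch {
    // Hypothetical stand-in for AbstractPipeline: links only, no data flow.
    static final class Stage {
        final String name;
        final Stage sourceStage;    // head of the chain (the head points to itself)
        final Stage previousStage;  // upstream stage, null for the head
        Stage nextStage;            // downstream stage, set when a stage is appended
        final int depth;
        boolean linkedOrConsumed;

        // Head constructor
        Stage(String name) {
            this.name = name;
            this.previousStage = null;
            this.sourceStage = this;
            this.depth = 0;
        }

        // Intermediate-stage constructor, mirroring the linking steps described above
        Stage(String name, Stage previousStage) {
            if (previousStage.linkedOrConsumed)
                throw new IllegalStateException("stage already linked or consumed");
            previousStage.linkedOrConsumed = true;
            previousStage.nextStage = this;
            this.name = name;
            this.previousStage = previousStage;
            this.sourceStage = previousStage.sourceStage;
            this.depth = previousStage.depth + 1;
        }
    }

    // Walks from the head to the last stage, reporting name:depth:sourceStage.
    public static List<String> describe() {
        Stage head = new Stage("head");
        Stage filter = new Stage("filter", head);
        new Stage("map", filter);
        List<String> out = new ArrayList<>();
        for (Stage s = head; s != null; s = s.nextStage)
            out.add(s.name + ":" + s.depth + ":" + s.sourceStage.name);
        return out;
    }

    // Appending a second stage to an already linked stage is rejected.
    public static boolean relinkFails() {
        Stage head = new Stage("head");
        new Stage("filter", head);
        try {
            new Stage("map", head);
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(describe());
        System.out.println(relinkFails());
    }
}
```

    The second method shows why a stage can have only one downstream stage: the linkedOrConsumed flag is set the moment a stage is linked.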

    map()

    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        // Instantiates a subclass of AbstractPipeline
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            // The terminal operation eventually calls this method to assemble the Sink chain that receives the results
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        // Pass the mapper's result to the downstream sink
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }
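
    The flags passed to StatelessOp differ between the two operations: filter() passes NOT_SIZED, while map() passes NOT_SORTED | NOT_DISTINCT and leaves the size information alone. One way to observe this through public API only is to ask the resulting spliterator for its exact size. The class name SizedFlagCheck is made up for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SizedFlagCheck {
    static List<Integer> data() {
        return new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
    }

    // filter() clears SIZED, so the exact size is no longer known and -1 is reported.
    public static long sizeAfterFilter() {
        return data().stream().filter(x -> x > 4).spliterator().getExactSizeIfKnown();
    }

    // map() keeps SIZED, so the size of the source is still reported.
    public static long sizeAfterMap() {
        return data().stream().map(x -> x * 2).spliterator().getExactSizeIfKnown();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterFilter());
        System.out.println(sizeAfterMap());
    }
}
```

    This matches the begin(-1) call in filter()'s sink: once elements may be dropped, downstream stages cannot be told how many elements to expect.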
    

    forEach()

    //java.util.stream.ReferencePipeline#forEach
    public void forEach(Consumer<? super P_OUT> action) {
        evaluate(ForEachOps.makeRef(action, false));
    }
    //java.util.stream.ForEachOps#makeRef
    public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
                                                  boolean ordered) {
        Objects.requireNonNull(action);
        return new ForEachOp.OfRef<>(action, ordered);
    }
    //java.util.stream.ForEachOps.ForEachOp.OfRef
    static final class OfRef<T> extends ForEachOp<T> {
        final Consumer<? super T> consumer;
    
        OfRef(Consumer<? super T> consumer, boolean ordered) {
            super(ordered);
            this.consumer = consumer;
        }
    
        @Override
        public void accept(T t) {
            consumer.accept(t);
        }
    }
    //Evaluation: java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp<E_OUT,R>)
    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;
    
        return isParallel()
            ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
            : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }
    //java.util.stream.ForEachOps.ForEachOp#evaluateSequential
    abstract static class ForEachOp<T> implements TerminalOp<T, Void>, TerminalSink<T, Void> {
        @Override
        public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                           Spliterator<S> spliterator) {
            return helper.wrapAndCopyInto(this, spliterator).get();
        }
    }
    //java.util.stream.AbstractPipeline#wrapAndCopyInto
    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }
    //java.util.stream.AbstractPipeline#wrapSink
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);
    
        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            // Each AbstractPipeline subclass implements opWrapSink differently, as the filter() and map() examples above show
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }
    //java.util.stream.AbstractPipeline#copyInto
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);
    
        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            // Start traversing the collection; forEachRemaining takes a Consumer, and a Sink is one
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }
    

    wrapSink() ends up combining all the operations in the pipeline into a single Sink. The head stream is not included, because its depth is 0 and the loop stops there.
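
    Because the sinks are only wired together inside the terminal operation, an intermediate operation runs nothing on its own. A small check of that behaviour, using only public API (the class name LazyPipelineCheck is made up for this sketch):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LazyPipelineCheck {
    // Returns {predicate calls before the terminal operation, predicate calls after it}.
    public static int[] filterCalls() {
        List<Integer> data = Arrays.asList(20, 22, 24, 26, 28);
        AtomicInteger calls = new AtomicInteger();
        Stream<Integer> pipeline = data.stream()
                .filter(age -> { calls.incrementAndGet(); return age > 24; });
        int before = calls.get();      // the stage is linked, but no sink exists yet
        pipeline.forEach(age -> { });  // terminal operation: sinks are wrapped and driven
        int after = calls.get();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] calls = filterCalls();
        System.out.println(calls[0] + " -> " + calls[1]);
    }
}
```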

    spliterator.forEachRemaining() traverses the collection. It takes a Consumer, and Sink extends Consumer, so each element is passed to the wrapped sink's accept() method.

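    ForEachOp does not extend AbstractPipeline, so it is not a stage in the pipeline chain. Instead it directly implements both the Sink interface (through TerminalSink) and the TerminalOp interface. When wrapSink() runs, ForEachOp itself is therefore the first sink to be wrapped, and the last one to be called.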

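    Building the singly linked Sink chain is the actual goal. AbstractPipeline only maintains the doubly linked chain of stages and lets each kind of operation wrap its own accept() logic.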
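
    The idea can be reproduced in a few lines without any JDK internals. In the sketch below, plain Consumer lambdas stand in for Sink, and the helper names filterSink and mapSink are hypothetical. The chain is wrapped from the terminal consumer back towards the source, then driven by the source spliterator, loosely mirroring wrapSink() and copyInto().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

public class SinkChainSketch {
    // Hypothetical stand-in for a filter stage's sink: forwards only matching elements.
    static <T> Consumer<T> filterSink(Predicate<? super T> predicate,
                                      Consumer<? super T> downstream) {
        return t -> { if (predicate.test(t)) downstream.accept(t); };
    }

    // Hypothetical stand-in for a map stage's sink: forwards the mapped value.
    static <T, R> Consumer<T> mapSink(Function<? super T, ? extends R> mapper,
                                      Consumer<? super R> downstream) {
        return t -> downstream.accept(mapper.apply(t));
    }

    public static List<String> run() {
        List<Integer> ages = new ArrayList<>(Arrays.asList(20, 22, 24, 26, 28));
        List<String> result = new ArrayList<>();

        // Wrap from the terminal sink back towards the source.
        Consumer<String> terminal = result::add;
        Consumer<Integer> map = mapSink(age -> "age " + age, terminal);
        Consumer<Integer> filter = filterSink(age -> age > 24, map);

        // Drive the outermost sink from the source spliterator.
        ages.spliterator().forEachRemaining(filter);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

    The terminal consumer is created first but runs last for each element, which is the same ordering described for ForEachOp above.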

    Sink

    ChainedReference

    abstract static class ChainedReference<T, E_OUT> implements Sink<T> {
        protected final Sink<? super E_OUT> downstream;
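        // The downstream sink must be supplied at construction time; it is the sink built in the previous wrapping step
        // This is what forms the singly linked sink chain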
        public ChainedReference(Sink<? super E_OUT> downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }
    
        @Override
        public void begin(long size) {
            downstream.begin(size);
        }
    
        @Override
        public void end() {
            downstream.end();
        }
    
        @Override
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }
    

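    The accept() method is left to implementations: the anonymous ChainedReference subclasses created in filter() and map() above, or a terminal sink such as java.util.stream.ForEachOps.ForEachOp.OfRef.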

    Diagram

    [Figure: stream]

  • Original article: https://www.cnblogs.com/froggengo/p/14675982.html