zoukankan      html  css  js  c++  java
  • JAVA8学习——Stream底层的实现二(学习过程)

    继续深入Stream的底层实现过程

    2.spliterator()

    接上 https://www.cnblogs.com/bigbaby/p/12159495.html

    我们这次回到最开始源码分析的地方

        public static <T> Spliterator<T> spliterator(Collection<? extends T> c,
                                                     int characteristics) {
            return new IteratorSpliterator<>(Objects.requireNonNull(c),
                                             characteristics);
        }
    

    IteratorSpliteratorSpliterators中有一个静态实现:

    
            static final int BATCH_UNIT = 1 << 10;  // batch array size increment
            static final int MAX_BATCH = 1 << 25;  // max batch array size;
            private final Collection<? extends T> collection; // null OK
            private Iterator<? extends T> it;
            private final int characteristics;
            private long est;             // size estimate
            private int batch;            // batch size for splits
    
            /**
             * Creates a spliterator using the given given
             * collection's {@link java.util.Collection#iterator()) for traversal,
             * and reporting its {@link java.util.Collection#size()) as its initial
             * size.
             *
             * @param c the collection
             * @param characteristics properties of this spliterator's
             *        source or elements.
             */
            public IteratorSpliterator(Collection<? extends T> collection, int characteristics) {
                this.collection = collection;
                this.it = null;
                this.characteristics = (characteristics & Spliterator.CONCURRENT) == 0
                                       ? characteristics | Spliterator.SIZED | Spliterator.SUBSIZED
                                       : characteristics;
            }
    
            /**
             * Creates a spliterator using the given iterator
             * for traversal, and reporting the given initial size
             * and characteristics.
             *
             * @param iterator the iterator for the source
             * @param size the number of elements in the source
             * @param characteristics properties of this spliterator's
             * source or elements.
             */
            public IteratorSpliterator(Iterator<? extends T> iterator, long size, int characteristics) {
                this.collection = null;
                this.it = iterator;
                this.est = size;
                this.characteristics = (characteristics & Spliterator.CONCURRENT) == 0
                                       ? characteristics | Spliterator.SIZED | Spliterator.SUBSIZED
                                       : characteristics;
            }
    
            /**
             * Creates a spliterator using the given iterator
             * for traversal, and reporting the given initial size
             * and characteristics.
             *
             * @param iterator the iterator for the source
             * @param characteristics properties of this spliterator's
             * source or elements.
             */
            public IteratorSpliterator(Iterator<? extends T> iterator, int characteristics) {
                this.collection = null;
                this.it = iterator;
                this.est = Long.MAX_VALUE;
                this.characteristics = characteristics & ~(Spliterator.SIZED | Spliterator.SUBSIZED);
            }
    
            @Override
            public Spliterator<T> trySplit() {
                /*
                 * Split into arrays of arithmetically increasing batch
                 * sizes.  This will only improve parallel performance if
                 * per-element Consumer actions are more costly than
                 * transferring them into an array.  The use of an
                 * arithmetic progression in split sizes provides overhead
                 * vs parallelism bounds that do not particularly favor or
                 * penalize cases of lightweight vs heavyweight element
                 * operations, across combinations of #elements vs #cores,
                 * whether or not either are known.  We generate
                 * O(sqrt(#elements)) splits, allowing O(sqrt(#cores))
                 * potential speedup.
                 */
                Iterator<? extends T> i;
                long s;
                if ((i = it) == null) {
                    i = it = collection.iterator();
                    s = est = (long) collection.size();
                }
                else
                    s = est;
                if (s > 1 && i.hasNext()) {
                    int n = batch + BATCH_UNIT;
                    if (n > s)
                        n = (int) s;
                    if (n > MAX_BATCH)
                        n = MAX_BATCH;
                    Object[] a = new Object[n];
                    int j = 0;
                    do { a[j] = i.next(); } while (++j < n && i.hasNext());
                    batch = j;
                    if (est != Long.MAX_VALUE)
                        est -= j;
                    return new ArraySpliterator<>(a, 0, j, characteristics);
                }
                return null;
            }
    
            @Override
            public void forEachRemaining(Consumer<? super T> action) {
                if (action == null) throw new NullPointerException();
                Iterator<? extends T> i;
                if ((i = it) == null) {
                    i = it = collection.iterator();
                    est = (long)collection.size();
                }
                i.forEachRemaining(action);
            }
    
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (action == null) throw new NullPointerException();
                if (it == null) {
                    it = collection.iterator();
                    est = (long) collection.size();
                }
                if (it.hasNext()) {
                    action.accept(it.next());
                    return true;
                }
                return false;
            }
    
            @Override
            public long estimateSize() {
                if (it == null) {
                    it = collection.iterator();
                    return est = (long)collection.size();
                }
                return est;
            }
    
            @Override
            public int characteristics() { return characteristics; }
    
            @Override
            public Comparator<? super T> getComparator() {
                if (hasCharacteristics(Spliterator.SORTED))
                    return null;
                throw new IllegalStateException();
            }
        }
    

    提供给了 几种构造方法。

    就直接返回了IteratorSpliterator 对象

    然后StreamSupport提供了stream方法。调用了spliterator()。上面已经获取了这个参数。

        default Stream<E> stream() {
            return StreamSupport.stream(spliterator(), false);
        }
    如下实现:
        /**
         * Creates a new sequential or parallel {@code Stream} from a
         * {@code Spliterator}.
         *
         * <p>The spliterator is only traversed, split, or queried for estimated
         * size after the terminal operation of the stream pipeline commences.
         *
         * <p>It is strongly recommended the spliterator report a characteristic of
         * {@code IMMUTABLE} or {@code CONCURRENT}, or be
         * <a href="../Spliterator.html#binding">late-binding</a>.  Otherwise,
         * {@link #stream(java.util.function.Supplier, int, boolean)} should be used
         * to reduce the scope of potential interference with the source.  See
         * <a href="package-summary.html#NonInterference">Non-Interference</a> for
         * more details.
         *
         * @param <T> the type of stream elements
         * @param spliterator a {@code Spliterator} describing the stream elements
         * @param parallel if {@code true} then the returned stream is a parallel
         *        stream; if {@code false} the returned stream is a sequential
         *        stream.
         * @return a new sequential or parallel {@code Stream}
         */
        public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
            Objects.requireNonNull(spliterator);
            return new ReferencePipeline.Head<>(spliterator,
                                                StreamOpFlag.fromCharacteristics(spliterator),
                                                parallel);
        }
    

    这里又出现了一个 ReferencePipeline.Head<>()

    用于描述管道的中间阶段,或者管道的中间流阶段。

    ReferencePipeline extends-> AbstractPipeline

    跟!继续往里跟!


    感想:记录

    记录,笔记。记录你的理解。过一段时间,有些遗忘的时候,往回看一看。特别是这种不是引用型的代码.就算在实际开发当中,也不会去看源码的。

    通过记录,去给别人讲一遍。你的理解会更好、

    因为底层的代码和引用型的代码。学习方式是不一样的。

    很多人说 工作几年之后就剩下增删改查的东西了。 其他底层的东西都没有去留住。

    现在学习的这些东西就是底层的这些东西。建议:在学习过程中,把这些给记录下来。真正变成自己的一部分。

    还有就是千万不要去死记硬背。背下来的东西肯定会忘掉。用你的知识体系去理解你学到的这些知识点。


    ReferencePipeline

    ReferencePipeline含有stream中是及其重要的方法stream,filter,map,等。

    /**
     * Abstract base class for an intermediate pipeline stage or pipeline source
     * stage implementing whose elements are of type {@code U}.
        抽象的基础类, 	管道阶段 或者 管道源阶段的   统一成一个ReferencePipeline
        (将流的两种阶段合并起来了。)
     *
     * @param <P_IN> type of elements in the upstream source
     * @param <P_OUT> type of elements in produced by this stage
     *
     * @since 1.8
     */
    abstract class ReferencePipeline<P_IN, P_OUT>
            extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
            implements Stream<P_OUT>  {
        
    }
    
    //继承了AbstractPipeline  非常重要。
    //实现了 Stream 接口
    
        ReferencePipeline(Supplier<? extends Spliterator<?>> source,
                          int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }
    第一个构造方法,一定是构造 源,阶段
    

    ReferencePipeline.Head

    HeadReferencePipeline的内部类

    这个类是为了处理 源阶段 和中间阶段,的区别。

    1.ReferencePipeline表示流的源阶段和中间阶段

    2.ReferencePipeline.Head表示流的源阶段,

    二者大部分属性的设定上是类似的。但是一些特定属性的值不一样。如果说

    /**
         * Source stage of a ReferencePipeline.
         *
         * @param <E_IN> type of elements in the upstream source
         上流源,元素的类型
         * @param <E_OUT> type of elements in produced by this stage
         这个阶段所生成的类型
         * @since 1.8
         */
        static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
            
        }
    

    AbstractPipeline

    ReferencePipeline的父类,最底层的实现类 (先来看Doc)

    /**
     * Abstract base class for "pipeline" classes, which are the core
     * implementations of the Stream interface and its primitive specializations.
     * Manages construction and evaluation of stream pipelines.
     这个抽象的pipeline的基类,是流接口及其核心特化的核心实现。
     管理:计算,构建,评估。
     *
     * <p>An {@code AbstractPipeline} represents an initial portion of a stream
     * pipeline, encapsulating a stream source and zero or more intermediate
     * operations.  The individual {@code AbstractPipeline} objects are often
     * referred to as <em>stages</em>, where each stage describes either the stream
     * source or an intermediate operation.
     一个AbstractPipeline 表示了 初始的部分。
     封装了一个源的 0个或者多个中间操作。
     每一个单个的AbstractPipeline对象,通常被叫做 “阶段”、
     每一个阶段,要么描述的是源,要么描述的是中间操作。
     
     *
     * <p>A concrete intermediate stage is generally built from an
     * {@code AbstractPipeline}, a shape-specific pipeline class which extends it
     * (e.g., {@code IntPipeline}) which is also abstract, and an operation-specific
     * concrete class which extends that.  {@code AbstractPipeline} contains most of
     * the mechanics of evaluating the pipeline, and implements methods that will be
     * used by the operation; the shape-specific classes add helper methods for
     * dealing with collection of results into the appropriate shape-specific
     * containers.
     一个具体的中间阶段,通常是通过一个AbstractPipeline来构建的。这是一个形状特化的管道类来继承了它。
     如 IntPipeline 。。等等、
     
     AbstractPipeline包含了大量的 特换的方法,
     避免了自己装箱和拆箱的操作。
     
     *
     * <p>After chaining a new intermediate operation, or executing a terminal
     * operation, the stream is considered to be consumed, and no more intermediate
     * or terminal operations are permitted on this stream instance.
     当链接一个新的中间操作。 在一个中间操作 或者 终止操作 之后。这个流就会被认为:被消费掉了。
     那么这个被消费的了流。已经不能再继续的执行任何的操作了。
     
     *
     * @implNote
     * <p>For sequential streams, and parallel streams without
     * <a href="package-summary.html#StreamOps">stateful intermediate
     * operations</a>, parallel streams, pipeline evaluation is done in a single
     * pass that "jams" all the operations together.  For parallel streams with
     * stateful operations, execution is divided into segments, where each
     * stateful operations marks the end of a segment, and each segment is
     * evaluated separately and the result used as the input to the next
     * segment.  In all cases, the source data is not consumed until a terminal
     * operation begins.
     对于串行流  以及没有状态的中间操作的 并行流。  管道的计算是在单个的一次的操作当中完成的。
    并且每个管道会将每个元素的所有操作,一次全部执行。然后才去执行下一个元素.
    所以存在 “短路”操作。
    
    对于有状态的并行流。会被分为多个有标识的 段。
    每个段都会有输入,然后有输出。
    输出会被用到下一个段。
    然后遇到终止操作的时候,这个流才会被消费。
     
     *
     * @param <E_IN>  type of input elements
     * @param <E_OUT> type of output elements
     * @param <S> type of the subclass implementing {@code BaseStream}
     * @since 1.8
     */
    abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
            extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
    }
    

    构造方法 (最底层的构造)

        /**
         * Constructor for the head of a stream pipeline.
         *
         * @param source {@code Supplier<Spliterator>} describing the stream source
         * @param sourceFlags The source flags for the stream source, described in
         * {@link StreamOpFlag}
         * @param parallel True if the pipeline is parallel
         */
        AbstractPipeline(Supplier<? extends Spliterator<?>> source,
                         int sourceFlags, boolean parallel) {
            this.previousStage = null;
            this.sourceSupplier = source;
            this.sourceStage = this;
            this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
            // The following is an optimization of:
            // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
            this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
            this.depth = 0;
            this.parallel = parallel;
        }
    

    往上返回,往上返回,往上返回。回到自己写的程序上。(目前,流源已经构建好了。)

    3. forEach()方法

    forEach()在stream接口中的定义

        /**
         * Performs an action for each element of this stream.
         对流中每一个元素执行传入的action
         *
         * <p>This is a <a href="package-summary.html#StreamOps">terminal
         * operation</a>.
         这是一个终止操作
         *
         * <p>The behavior of this operation is explicitly nondeterministic.
         * For parallel stream pipelines, this operation does <em>not</em>
         * guarantee to respect the encounter order of the stream, as doing so
         * would sacrifice the benefit of parallelism.  For any given element, the
         * action may be performed at whatever time and in whatever thread the
         * library chooses.  If the action accesses shared state, it is
         * responsible for providing the required synchronization.
         这个操作的行为是不确定的。
         对于并行流管道来说,这个操作并不会保证 流中元素的顺序。
         因为如果这样做的话,就会牺牲并行的优势。
         对于任何给定的元素,动作会在任何的时间在任何的线程上执行。
         如果这个动作选择了共享的状态,那么这个动作就要提供同步的动作。
         
         (可以多运行几下程序看看结果。)
         *
         * @param action a <a href="package-summary.html#NonInterference">
         *               non-interfering</a> action to perform on the elements
         */
        void forEach(Consumer<? super T> action);
    
    

    forEach()的两个具体实现

    1. 在Head中

    (被源操作执行的时候,默认调用 Head里面的实现)

            // Optimized sequential terminal operations for the head of the pipeline
    
            @Override
            public void forEach(Consumer<? super E_OUT> action) {
                if (!isParallel()) {
                    sourceStageSpliterator().forEachRemaining(action);
                }
                else {
                    super.forEach(action);
                }
            }
    

    2. 在ReferencePipeline中。

    (执行中间操作的时候,默认执行ReferencePipeline里的实现)

        // Terminal operations from Stream
    
        @Override
        public void forEach(Consumer<? super P_OUT> action) {
            evaluate(ForEachOps.makeRef(action, false));
        }
    
    

    forEach()的实现

            // Optimized sequential terminal operations for the head of the pipeline
    
            @Override
            public void forEach(Consumer<? super E_OUT> action) {
                if (!isParallel()) {
                    sourceStageSpliterator().forEachRemaining(action);
                }
                else {
                    super.forEach(action);
                }
            }
    

    forEach 调用了sourceStageSpliterator() 和 forEachRemaining(action)

    sourceStageSpliterator()

    /**
         * Gets the source stage spliterator if this pipeline stage is the source
         * stage.  The pipeline is consumed after this method is called and
         * returns successfully.
         *
         * @return the source stage spliterator
         * @throws IllegalStateException if this pipeline stage is not the source
         *         stage.
         */
        @SuppressWarnings("unchecked")
        final Spliterator<E_OUT> sourceStageSpliterator() {
            if (this != sourceStage)
                throw new IllegalStateException();
    
            if (linkedOrConsumed)
                throw new IllegalStateException(MSG_STREAM_LINKED);
            linkedOrConsumed = true;
    
            if (sourceStage.sourceSpliterator != null) {
                @SuppressWarnings("unchecked")
                Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
                sourceStage.sourceSpliterator = null;
                return s;
            }
            else if (sourceStage.sourceSupplier != null) {
                @SuppressWarnings("unchecked")
                Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();
                sourceStage.sourceSupplier = null;
                return s;
            }
            else {
                throw new IllegalStateException(MSG_CONSUMED);
            }
        }
    

    forEachRemaining(Consumer<? super T> action)

    forEachRemaining的实现就比较多了。这里追到了Iterator类中的forEachRemaining(Consumer<? super T> action)的实现

    追到最后:还是用的传统的Iterator()方法。

    Iterator类中的forEachRemaining(Consumer<? super T> action)的实现

        /**
         * Performs the given action for each remaining element until all elements
         * have been processed or the action throws an exception.  Actions are
         * performed in the order of iteration, if that order is specified.
         * Exceptions thrown by the action are relayed to the caller.
         *
         * @implSpec
         * <p>The default implementation behaves as if:
         * <pre>{@code
         *     while (hasNext())
         *         action.accept(next());
         * }</pre>
         *
         * @param action The action to be performed for each element
         * @throws NullPointerException if the specified action is null
         * @since 1.8
         */
        default void forEachRemaining(Consumer<? super E> action) {
            Objects.requireNonNull(action);
            while (hasNext())
                action.accept(next());
        }
    

    但是不要被迷惑了。因为这是最简单的一种例子。最后

    由外部迭代转换成了内部迭代。

    可以跟一下 IterationSpliterator对象里的Spliterator()方法

    这里的forEachRemaining(Consumer<? super T> action)方法的多种实现。也是一个重点。

    Debug -> Arrays 里面还有一个 ArrayList。

    public class StreamTest3 {
        public static void main(String[] args) {
            List<String> list = Arrays.asList("hello", "world", "hello world");
            list.stream().forEach(System.out::println);
        }
    }
    

    里面有一个重写的方法。所以调用的是基于数组的Arrays.spliterator()

    Spliterators类中:line:940
        
    		@SuppressWarnings("unchecked")
            @Override
            public void forEachRemaining(Consumer<? super T> action) {
                Object[] a; int i, hi; // hoist accesses and checks from loop
                if (action == null)
                    throw new NullPointerException();
                if ((a = array).length >= (hi = fence) &&
                    (i = index) >= 0 && i < (index = hi)) {
                    do { action.accept((T)a[i]); } while (++i < hi);
                }
            }
    

    看看map()方法的流程

    list.stream().map(item -> item).forEach(System.out::println);
    

    ReferencePipeline类中的map实现:opWrapSink()

        @Override
        @SuppressWarnings("unchecked")
        public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
            Objects.requireNonNull(mapper);
            return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
                @Override
                Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                    return new Sink.ChainedReference<P_OUT, R>(sink) {
                        @Override
                        public void accept(P_OUT u) {
                            downstream.accept(mapper.apply(u));
                        }
                    };
                }
            };
        }
    

    中间操作时候的AbstractPipeline的构造方法:(追加一个操作到上一个流操作。)

    两个构造方法完成的方法是完全不一样的。

        /**
         * Constructor for appending an intermediate operation stage onto an
         * existing pipeline.
         *
         * @param previousStage the upstream pipeline stage
         * @param opFlags the operation flags for the new stage, described in
         * {@link StreamOpFlag}
         */
        AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
            if (previousStage.linkedOrConsumed)
                throw new IllegalStateException(MSG_STREAM_LINKED);
            previousStage.linkedOrConsumed = true;
            previousStage.nextStage = this;
    
            this.previousStage = previousStage;
            this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
            this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
            this.sourceStage = previousStage.sourceStage;
            if (opIsStateful())
                sourceStage.sourceAnyStateful = true;
            this.depth = previousStage.depth + 1;
        }
    

    Sink对象(连接引用对象)

    
    /**
     * An extension of {@link Consumer} used to conduct values through the stages of
     * a stream pipeline, with additional methods to manage size information,
     * control flow, etc.  Before calling the {@code accept()} method on a
     * {@code Sink} for the first time, you must first call the {@code begin()}
     * method to inform it that data is coming (optionally informing the sink how
     * much data is coming), and after all data has been sent, you must call the
     * {@code end()} method.  After calling {@code end()}, you should not call
     * {@code accept()} without again calling {@code begin()}.  {@code Sink} also
     * offers a mechanism by which the sink can cooperatively signal that it does
     * not wish to receive any more data (the {@code cancellationRequested()}
     * method), which a source can poll before sending more data to the
     * {@code Sink}.
     *
     * <p>A sink may be in one of two states: an initial state and an active state.
     * It starts out in the initial state; the {@code begin()} method transitions
     * it to the active state, and the {@code end()} method transitions it back into
     * the initial state, where it can be re-used.  Data-accepting methods (such as
     * {@code accept()} are only valid in the active state.
     *
     * @apiNote
     * A stream pipeline consists of a source, zero or more intermediate stages
     * (such as filtering or mapping), and a terminal stage, such as reduction or
     * for-each.  For concreteness, consider the pipeline:
     *
     * <pre>{@code
     *     int longestStringLengthStartingWithA
     *         = strings.stream()
     *                  .filter(s -> s.startsWith("A"))
     *                  .mapToInt(String::length)
     *                  .max();
     * }</pre>
     *
     * <p>Here, we have three stages, filtering, mapping, and reducing.  The
     * filtering stage consumes strings and emits a subset of those strings; the
     * mapping stage consumes strings and emits ints; the reduction stage consumes
     * those ints and computes the maximal value.
     *
     * <p>A {@code Sink} instance is used to represent each stage of this pipeline,
     * whether the stage accepts objects, ints, longs, or doubles.  Sink has entry
     * points for {@code accept(Object)}, {@code accept(int)}, etc, so that we do
     * not need a specialized interface for each primitive specialization.  (It
     * might be called a "kitchen sink" for this omnivorous tendency.)  The entry
     * point to the pipeline is the {@code Sink} for the filtering stage, which
     * sends some elements "downstream" -- into the {@code Sink} for the mapping
     * stage, which in turn sends integral values downstream into the {@code Sink}
     * for the reduction stage. The {@code Sink} implementations associated with a
     * given stage is expected to know the data type for the next stage, and call
     * the correct {@code accept} method on its downstream {@code Sink}.  Similarly,
     * each stage must implement the correct {@code accept} method corresponding to
     * the data type it accepts.
     *
     * <p>The specialized subtypes such as {@link Sink.OfInt} override
     * {@code accept(Object)} to call the appropriate primitive specialization of
     * {@code accept}, implement the appropriate primitive specialization of
     * {@code Consumer}, and re-abstract the appropriate primitive specialization of
     * {@code accept}.
     *
     * <p>The chaining subtypes such as {@link ChainedInt} not only implement
     * {@code Sink.OfInt}, but also maintain a {@code downstream} field which
     * represents the downstream {@code Sink}, and implement the methods
     * {@code begin()}, {@code end()}, and {@code cancellationRequested()} to
     * delegate to the downstream {@code Sink}.  Most implementations of
     * intermediate operations will use these chaining wrappers.  For example, the
     * mapping stage in the above example would look like:
     *
     * <pre>{@code
     *     IntSink is = new Sink.ChainedReference<U>(sink) {
     *         public void accept(U u) {
     *             downstream.accept(mapper.applyAsInt(u));
     *         }
     *     };
     * }</pre>
     *
     * <p>Here, we implement {@code Sink.ChainedReference<U>}, meaning that we expect
     * to receive elements of type {@code U} as input, and pass the downstream sink
     * to the constructor.  Because the next stage expects to receive integers, we
     * must call the {@code accept(int)} method when emitting values to the downstream.
     * The {@code accept()} method applies the mapping function from {@code U} to
     * {@code int} and passes the resulting value to the downstream {@code Sink}.
     *
     * @param <T> type of elements for value streams
     * @since 1.8
     */
    interface Sink<T> extends Consumer<T> {
    }
    

    执行原理: begin(激活状态)-> accept () -> end() .

    A {@code Sink} instance is used to represent each stage of this pipeline,

    一个sink,代表管道的每一个阶段。

    链接引用:ChainedReference: 是Sink

        /**
         * Abstract {@code Sink} implementation for creating chains of
         * sinks.  The {@code begin}, {@code end}, and
         * {@code cancellationRequested} methods are wired to chain to the
         * downstream {@code Sink}.  This implementation takes a downstream
         * {@code Sink} of unknown input shape and produces a {@code Sink<T>}.  The
         * implementation of the {@code accept()} method must call the correct
         * {@code accept()} method on the downstream {@code Sink}.
         */
        static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
            protected final Sink<? super E_OUT> downstream;
    
            public ChainedReference(Sink<? super E_OUT> downstream) {
                this.downstream = Objects.requireNonNull(downstream);
            }
    
            @Override
            public void begin(long size) {
                downstream.begin(size);
            }
    
            @Override
            public void end() {
                downstream.end();
            }
    
            @Override
            public boolean cancellationRequested() {
                return downstream.cancellationRequested();
            }
        }
    

    可以得出一个结论:

    流的操作并不是一个一个链式的执行的。

    而是先拿出来一个元素,执行所有的操作。执行完毕之后,再拿出来一个元素进行下一次操作。

      //ReferencePipeline的 map()方法
    
    @Override
        @SuppressWarnings("unchecked")
        public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
            Objects.requireNonNull(mapper);
            return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
                @Override
                Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                    return new Sink.ChainedReference<P_OUT, R>(sink) {
                        @Override
                        public void accept(P_OUT u) {
                            downstream.accept(mapper.apply(u));//在这里打断点可以看出来
                        }
                    };
                }
            };
        }
    

    看了stream(),map(),filter()方法的执行源码,对流的整理流程有了大概的认识。剩下的方法的流程是可以举一反三的。


    伴随着这个执行过程深入的结束,流的学习到此也到了一个标记。

  • 相关阅读:
    数据库基础-INDEX
    LINQ教程
    NPOI导出EXCEL
    WPF数据双向绑定
    WPF控件数据单项绑定
    HelloWorld IL代码
    Python基础教程(英文视频教学)
    ado.net的5个主要对象
    Linux学习-0627
    C#中Abstract和Virtual
  • 原文地址:https://www.cnblogs.com/bigbaby/p/12160915.html
Copyright © 2011-2022 走看看