zoukankan      html  css  js  c++  java
  • Stream 源码分析

    Stream

    支持顺序和并行聚合操作的一组元素序列。
        1)operations:支持在单个元素上执行的操作,流操作分为中间操作和终止操作
        1-1)中间操作:
            1-1-1)无状态:unordered()、filter()、map()、mapToInt()、mapToLong()、mapToDouble、
                          flatMap()、flatMapToInt()、flatMapToLong()、flatMapToDouble()、
                          peek()
            1-1-2)有状态:distinct()、sorted()、limit()、skip()
        1-2)终止操作:
            1-2-1)非短路操作:forEach()、forEachOrdered()、toArray()、min()、max()、count()、
                          collect()、reduce()
            1-2-2)短路操作: findFirst()、findAny()、anyMatch()、noneMatch()、allMatch()
        2)stream pipeline:将多个流操作串联的流管道
    流是延迟处理的,直到遇到一个终止操作时,才会触发流管道计算。
    已经执行终止操作的流不能再次触发计算。
    

    流管道

    • 流管道的创建【以 ArrayList 为数据源】:.stream()、parallelStream()
    Collection#
        /**
         *  返回一个顺序流,集合中的元素就是数据源 
         */
        default Stream<E> stream() {
            return StreamSupport.stream(spliterator(), false);
        }
    
        /**
         *  返回一个并行流,集合中的元素就是数据源 
         */
        default Stream<E> parallelStream() {
            return StreamSupport.stream(spliterator(), true);
        }
    
    ArrayList#
        /**
         * 创建一个延时绑定和快速失败的分割迭代器
         */
        @Override
        public Spliterator<E> spliterator() {
            return new ArrayListSpliterator(0, -1, 0);
        }
    
        /** 基于索引实现二分、延迟初始化的 Spliterator */
        final class ArrayListSpliterator implements Spliterator<E> {
            /**
             * 当前索引,在调用 advance/split 时修改
             */
            private int index;
            /**
             * 初始状态是 -1,使用之后是索引的上界
             */
            private int fence;
            /**
             * 快速失败计数器
             */
            private int expectedModCount;
    
            /** 创建一个覆盖给定索引范围的新  ArrayListSpliterator*/
            ArrayListSpliterator(int origin, int fence, int expectedModCount) {
                this.index = origin;
                this.fence = fence;
                this.expectedModCount = expectedModCount;
            }
    
            private int getFence() {
                int hi;
                // 第一次使用时初始化为元素个数
                if ((hi = fence) < 0) {
                    expectedModCount = modCount;
                    hi = fence = size;
                }
                return hi;
            }
    
            /**
             *  对此 Spliterator 进行拆分,一分为二
             */
            @Override
            public ArrayListSpliterator trySplit() {
                /**
                 * hi:high 索引上界,不包括
                 * lo:low 索引下界,包括
                 * mid:middle 二分索引
                 */
                final int hi = getFence(), lo = index, mid = lo + hi >>> 1;
            // 将范围分成两半,直到无法分割为止【高低索引相邻】
            return lo >= mid ? null : // divide range in half unless too small
                new ArrayListSpliterator(lo, index = mid, expectedModCount);
            }
    
            /**
             * 如果此 Spliterator 中还有元素可用,则将低索引位的元素传递给 action 进行消费
             * 同时递增 index【一次消费一个元素】
             */
            @Override
            public boolean tryAdvance(Consumer<? super E> action) {
                if (action == null) {
                    throw new NullPointerException();
                }
                final int hi = getFence(), i = index;
                if (i < hi) {
                    index = i + 1;
                    @SuppressWarnings("unchecked")
                    // 读取元素
                    final E e = (E)elementData[i];
                    // 执行消费过程
                    action.accept(e);
                    if (modCount != expectedModCount) {
                        throw new ConcurrentModificationException();
                    }
                    return true;
                }
                return false;
            }
    
            /**
             *  一次性消费此 Spliterator 中的所有元素
             */
            @Override
            public void forEachRemaining(Consumer<? super E> action) {
                int i, hi, mc; // hoist accesses and checks from loop
                Object[] a;
                if (action == null) {
                    throw new NullPointerException();
                }
                if ((a = elementData) != null) {
                    if ((hi = fence) < 0) {
                        mc = modCount;
                        hi = size;
                    } else {
                        mc = expectedModCount;
                    }
                    // 读取并更新 index
                    if ((i = index) >= 0 && (index = hi) <= a.length) {
                        // 顺序消费 Spliterator 中的所有元素
                        for (; i < hi; ++i) {
                            @SuppressWarnings("unchecked")
                            final E e = (E) a[i];
                            action.accept(e);
                        }
                        if (modCount == mc) {
                            return;
                        }
                    }
                }
                throw new ConcurrentModificationException();
            }
    
            /**
             * 获取此分割迭代器的估计可用元素数【ArrayListSpliterator 是精确的】
             */
            @Override
            public long estimateSize() {
                return getFence() - index;
            }
    
            /**
             *  此分割迭代器的特性
             */
            @Override
            public int characteristics() {
                return Spliterator.ORDERED | Spliterator.SIZED | Spliterator.SUBSIZED;
            }
        }
    
    StreamSupport#
        /**
         *  基于一个 Spliterator【分割迭代器】创建一个顺序或并行的流
         */
        public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
            Objects.requireNonNull(spliterator);
            return new ReferencePipeline.Head<>(spliterator,
                                                StreamOpFlag.fromCharacteristics(spliterator),
                                                parallel);
        }
    
    StreamOpFlag#
        /**
         *  将 Spliterator 的特征值转换为流的标志位
         */
        static int fromCharacteristics(Spliterator<?> spliterator) {
            // 读取特征值
            final int characteristics = spliterator.characteristics();
            if ((characteristics & Spliterator.SORTED) != 0 && spliterator.getComparator() != null) {
                // Do not propagate the SORTED characteristic if it does not correspond to a natural sort order
                return characteristics & SPLITERATOR_CHARACTERISTICS_MASK & ~Spliterator.SORTED;
            }
            else {
                // 转换为流标识
                return characteristics & SPLITERATOR_CHARACTERISTICS_MASK;
            }
        }
    
    
    ReferencePipeline#Head
        static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
            /**
             * 创建流水线的管道头
             */
            Head(Spliterator<?> source,
                    int sourceFlags, boolean parallel) {
                super(source, sourceFlags, parallel);
            }
        }
    
    ReferencePipeline#
        /**
         *  创建流水线的管道头
         */
        ReferencePipeline(Spliterator<?> source,
                int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }
    
    AbstractPipeline#
    abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
    extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
        private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed";
        private static final String MSG_CONSUMED = "source already consumed or closed";
    
        /**
         *  流水线的源阶段【即第一个流管道】
         */
        @SuppressWarnings("rawtypes")
        private final AbstractPipeline sourceStage;
    
        /**
         *  当前流管道的上一阶段,如果是源流,则为 null
         */
        @SuppressWarnings("rawtypes")
        private final AbstractPipeline previousStage;
    
        /**
         *  此阶段操作的操作标识
         */
        protected final int sourceOrOpFlags;
    
        /**
         *  此流管道的下一阶段
         */
        @SuppressWarnings("rawtypes")
        private AbstractPipeline nextStage;
    
        /**
         *  顺序流:当前阶段和源流之间存在的中间阶段的个数
         *  并行流:上一阶段的状态
         */
        private int depth;
    
        /**
         *  组合了源流和所有中间阶段的流标识和操作标识
         */
        private int combinedFlags;
    
        /**
         *  源流的分割迭代器,用于产生元素
         */
        private Spliterator<?> sourceSpliterator;
    
        /**
         *  源流的分割迭代器生成器,如果 sourceSpliterator == null
         */
        private Supplier<? extends Spliterator<?>> sourceSupplier;
    
        /**
         *  此流管道已经被链接或消费
         */
        private boolean linkedOrConsumed;
    
        /**
         *  流水线中存在有状态的流管道
         */
        private boolean sourceAnyStateful;
    
        /**
         *  此流管道关闭时的后置操作
         */
        private Runnable sourceCloseAction;
    
        /**
         *  此流管道是否是并行的
         */
        private boolean parallel;
    
        /**
         *  流水线头部管道的构造函数
         */
        AbstractPipeline(Spliterator<?> source,
                int sourceFlags, boolean parallel) {
            this.previousStage = null;
            this.sourceSpliterator = source;
            this.sourceStage = this;
            this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
            // The following is an optimization of:
            // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
            this.combinedFlags = ~(sourceOrOpFlags << 1) & StreamOpFlag.INITIAL_OPS_VALUE;
            // 源阶段的 depth=0
            this.depth = 0;
            this.parallel = parallel;
        }
    
    • 无状态流管道的链接【以 map 为例】
    ReferencePipeline#
        /**
         *  基于 mapper 创建一个无状态的流管道,并将其链接到此流管道之后
         */
        @Override
        @SuppressWarnings("unchecked")
        public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
            Objects.requireNonNull(mapper);
            /**
             * 新流管道的操作为 mapper
             * 新流管道的操作标识为 NOT_SORTED、NOT_DISTINCT
             * 下一阶段的操作为 sink【反向链接】
             */
            return new StatelessOp<>(this, StreamShape.REFERENCE,
                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
                @Override
                Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                    return new Sink.ChainedReference<>(sink) {
                        @Override
                        public void accept(P_OUT u) {
                            /**
                             * 接收上游阶段发送的数据 u,并进行当前阶段的处理,
                             * 并将结果发送给下游阶段处理
                             */
                            downstream.accept(mapper.apply(u));
                        }
                    };
                }
            };
        }
    
        /**
         *  一个无状态的流管道
         */
        abstract static class StatelessOp<E_IN, E_OUT>
        extends ReferencePipeline<E_IN, E_OUT> {
            
            /**
             * 将此流管道追加到上游管道  upstream 之后
             */
            StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
                    StreamShape inputShape,
                    int opFlags) {
                super(upstream, opFlags);
                assert upstream.getOutputShape() == inputShape;
            }
    
            /**
             *  此管道的操作是无状态的
             */
            @Override
            final boolean opIsStateful() {
                return false;
            }
        }
    
        /**
         * 将此流管道追加到上游管道 upstream 之后
         */
        ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
            super(upstream, opFlags);
        }
    
        /**
         *  将此流管道追加到 previousStage 之后
         */
        AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
            // 前一阶段已经被链接或消费,则不允许重复消费
            if (previousStage.linkedOrConsumed) {
                throw new IllegalStateException(MSG_STREAM_LINKED);
            }
            // 前一阶段已经被链接
            previousStage.linkedOrConsumed = true;
            // 设置前一阶段的后置阶段为当前阶段
            previousStage.nextStage = this;
    
            // 写入前置阶段
            this.previousStage = previousStage;
            // 写入此阶段的操作标识
            this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
            // 合并前一阶段和此阶段的流操作标识
            this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
            // 写入管道头
            this.sourceStage = previousStage.sourceStage;
            // 此阶段是否是有状态操作
            if (opIsStateful()) {
                sourceStage.sourceAnyStateful = true;
            }
            // 写入中间操作计数值(上一阶段 + 1)
            this.depth = previousStage.depth + 1;
        }
    
    • 有状态流管道的链接【sorted()】
    ReferencePipeline#
        /**
         *  将一个排序的流管道追加到此流管道之后
         */
        @Override
        public final Stream<P_OUT> sorted() {
            return SortedOps.makeRef(this);
        }
    
    SortedOps#
        /**
         *  将一个排序管道追加到 upstream 之后
         */
        static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {
            return new OfRef<>(upstream);
        }
    
    ReferencePipeline#
        /**
         *  有状态的流管道
         */
        abstract static class StatefulOp<E_IN, E_OUT>
        extends ReferencePipeline<E_IN, E_OUT> {
            /**
             * 将一个有状态的流管道追加到 upstream 之后
             */
            StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
                    StreamShape inputShape,
                    int opFlags) {
                super(upstream, opFlags);
                assert upstream.getOutputShape() == inputShape;
            }
    
            /**
             *  此流管道是有状态的
             */
            @Override
            final boolean opIsStateful() {
                return true;
            }
    
            @Override
            abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
                    Spliterator<P_IN> spliterator,
                    IntFunction<E_OUT[]> generator);
        }
    
        /**
         *  用于对引用流进行排序的管道
         */
        private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
            /**
             * 是否是自然顺序
             */
            private final boolean isNaturalSort;
            /**
             * 排序使用的比较器
             */
            private final Comparator<? super T> comparator;
    
            /**
             * 使用自然顺序排序
             */
            OfRef(AbstractPipeline<?, T, ?> upstream) {
                super(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
                this.isNaturalSort = true;
                final Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();
                this.comparator = comp;
            }
    
            /**
             * 使用指定的比较器排序
             */
            OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
                super(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
                this.isNaturalSort = false;
                this.comparator = Objects.requireNonNull(comparator);
            }
    
            @Override
            public Sink<T> opWrapSink(int flags, Sink<T> sink) {
                Objects.requireNonNull(sink);
    
                // 1)如果上游管道是已排序的,并且是按照自然顺序排序的,则此流管道可以忽略
                if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort) {
                    return sink;
                // 2)如果上游管道是已知大小的
                } else if (StreamOpFlag.SIZED.isKnown(flags)) {
                    return new SizedRefSortingSink<>(sink, comparator);
                // 3)如果上游管道是未知大小的    
                } else {
                    return new RefSortingSink<>(sink, comparator);
                }
            }
        }
    
    SortedOps#
        private abstract static class AbstractRefSortingSink<T> extends Sink.ChainedReference<T, T> {
            /**
             * 排序元素的比较器
             */
            protected final Comparator<? super T> comparator;
            // 是否取消接收上游的元素
            protected boolean cancellationRequestedCalled;
    
            AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
                super(downstream);
                this.comparator = comparator;
            }
    
            /**
             *  在排序元素被发送到下游时,能够保存短路行为【流水线中存在短路操作】
             */
            @Override
            public final boolean cancellationRequested() {
                cancellationRequestedCalled = true;
                return false;
            }
        }
    
        private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> {
            // 暂存元素的数组
            private T[] array;
            // 当前元素偏移
            private int offset;
    
            SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
                super(sink, comparator);
            }
    
            @Override
            public void begin(long size) {
                if (size >= Nodes.MAX_ARRAY_SIZE) {
                    throw new IllegalArgumentException(Nodes.BAD_SIZE);
                }
                // 创建固定大小的对象数组,用于接收上游发送的元素
                array = (T[]) new Object[(int) size];
            }
    
            /**
             * 上游元素发送完毕,开始执行排序操作,并将排序后的元素发送到下游
             */
            @Override
            public void end() {
                // 执行元素排序
                Arrays.sort(array, 0, offset, comparator);
                // 发送通知给下游管道,准备接收数据
                downstream.begin(offset);
                // 1)当前管道的下游不存在短路操作
                if (!cancellationRequestedCalled) {
                    // 顺序发送所有元素
                    for (int i = 0; i < offset; i++) {
                        downstream.accept(array[i]);
                    }
                // 2)当前管道的下游存在短路操作
                } else {
                    // 先发送一个元素,之后每次发送前都询问下游是否继续接收,下游拒绝接收元素则退出循环
                    for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) {
                        downstream.accept(array[i]);
                    }
                }
                // 通知下游管道,数据发送完毕
                downstream.end();
                // 回收数组
                array = null;
            }
    
            /**
             * 接收上游发送的单个元素
             */
            @Override
            public void accept(T t) {
                array[offset++] = t;
            }
        }
    
    /**
     *  能够消费上游管道发送的元素,同时存储状态的 Sink
     */
    interface Sink<T> extends Consumer<T> {
        /**
         *  通知下游管道,重置状态以接收新的数据集
         */
        default void begin(long size) {}
    
        /**
         *  通知下游管道,数据已经推送完毕,可以执行聚合处理
         */
        default void end() {}
    
        /**
         *  询问下游管道是否还需要继续推送数据,适用于短路操作
         */
        default boolean cancellationRequested() {
            return false;
        }
    }
    
    Sink#ChainedReference
        /**
         *  链式引用 sink
         */
        abstract static class ChainedReference<T, E_OUT> implements Sink<T> {
            // 下游 sink
            protected final Sink<? super E_OUT> downstream;
    
            public ChainedReference(Sink<? super E_OUT> downstream) {
                this.downstream = Objects.requireNonNull(downstream);
            }
    
            @Override
            public void begin(long size) {
                downstream.begin(size);
            }
    
            @Override
            public void end() {
                downstream.end();
            }
    
            @Override
            public boolean cancellationRequested() {
                return downstream.cancellationRequested();
            }
        }
    
    • 链接终端操作执行计算【forEach()】
    ReferencePipeline#
        @Override
        public void forEach(Consumer<? super P_OUT> action) {
            evaluate(ForEachOps.makeRef(action, false));
        }
    
    ForEachOps#
        /**
         *  创建一个 TerminalOp,遍历并处理流中的每个引用对象
         */
        public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
                boolean ordered) {
            Objects.requireNonNull(action);
            return new ForEachOp.OfRef<>(action, ordered);
        }
    
        abstract static class ForEachOp<T>
        implements TerminalOp<T, Void>, TerminalSink<T, Void> {
            // 遍历是否是有序的
            private final boolean ordered;
    
            protected ForEachOp(boolean ordered) {
                this.ordered = ordered;
            }
    
            // 获取此操作的操作标识
            @Override
            public int getOpFlags() {
                return ordered ? 0 : StreamOpFlag.NOT_ORDERED;
            }
    
            /**
             *  将此终端操作链接到流管道尾部,并将 spliterator 中的每个元素都发送到 sink 中
             */
            @Override
            public <S> Void evaluateSequential(PipelineHelper<T> helper,
                    Spliterator<S> spliterator) {
                return helper.wrapAndCopyInto(this, spliterator).get();
            }
    
            /**
             *  并行评估 spliterator 中的元素
             */
            @Override
            public <S> Void evaluateParallel(PipelineHelper<T> helper,
                    Spliterator<S> spliterator) {
                if (ordered) {
                    new ForEachOrderedTask<>(helper, spliterator, this).invoke();
                } else {
                    new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
                }
                return null;
            }
    
            static final class OfRef<T> extends ForEachOp<T> {
                /**
                 * 实际消费者
                 */
                final Consumer<? super T> consumer;
    
                OfRef(Consumer<? super T> consumer, boolean ordered) {
                    super(ordered);
                    this.consumer = consumer;
                }
    
                /**
                 *  处理上游发送的单个元素
                 */
                @Override
                public void accept(T t) {
                    consumer.accept(t);
                }
            }
        }
    
    AbstractPipeline#
        /**
         *  使用终端操作 terminalOp 对此流管道进行处理,处理过程中会从后往前链接形成流水线
         */
        final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
            // 此阶段的输出类型==终端操作的输入类型
            assert getOutputShape() == terminalOp.inputShape();
            // 不允许重复消费
            if (linkedOrConsumed) {
                throw new IllegalStateException(MSG_STREAM_LINKED);
            }
            // 设置已消费标识
            linkedOrConsumed = true;
    
            // 使用终端操作并行或串行处理此流管道
            return isParallel()
                    ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
                            : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
        }
    
        /**
         *  获取此阶段的源分割迭代器【数据源】
         *  
         * created by ZXD at 19 Dec 2018 T 22:32:09
         * @param terminalFlags 终端操作标识
         * @return
         */
        @SuppressWarnings("unchecked")
        private Spliterator<?> sourceSpliterator(int terminalFlags) {
            Spliterator<?> spliterator = null;
            // 1)源分割迭代器不为 null
            if (sourceStage.sourceSpliterator != null) {
                // 读取
                spliterator = sourceStage.sourceSpliterator;
                // 使用后置空
                sourceStage.sourceSpliterator = null;
            }
            // 2)分割迭代器通过 sourceSupplier 进行生成
            else if (sourceStage.sourceSupplier != null) {
                spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
                sourceStage.sourceSupplier = null;
            }
            else {
                throw new IllegalStateException(MSG_CONSUMED);
            }
    
            // 此流是并行的 && 流管道中存在有状态操作
            if (isParallel() && sourceStage.sourceAnyStateful) {
                // Adapt the source spliterator, evaluating each stateful op in the pipeline up to and including this pipeline stage.
                // The depth and flags of each pipeline stage are adjusted accordingly.
                int depth = 1;
                /**
                 * 从源阶段开始处理,一直处理到当前阶段为止
                 */
                for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
                        u != e;
                        u = p, p = p.nextStage) {
    
                    int thisOpFlags = p.sourceOrOpFlags;
                    // 当前处理阶段是有状态操作
                    if (p.opIsStateful()) {
                        depth = 0;
                        // 当前操作是短路操作
                        if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
                            thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
                        }
    
                        spliterator = p.opEvaluateParallelLazy(u, spliterator);
    
                        // Inject or clear SIZED on the source pipeline stage based on the stage's spliterator
                        thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
                                ? thisOpFlags & ~StreamOpFlag.NOT_SIZED | StreamOpFlag.IS_SIZED
                                        : thisOpFlags & ~StreamOpFlag.IS_SIZED | StreamOpFlag.NOT_SIZED;
                    }
                    p.depth = depth++;
                    p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
                }
            }
    
            // 终端操作带有标识位
            if (terminalFlags != 0)  {
                // 将终端操作的标志位合并到最后一阶段中
                combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
            }
    
            return spliterator;
        }
    
        /**
         * @param sink  下游管道操作,中间操作或终端操作
         * @param spliterator   分割迭代器
         * @return
         */
        @Override
        final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
            copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
            return sink;
        }
    
        /**
         * 从当前流管道开始,向前构建流水线直到第一个流管道为止,流水线后置操作为 sink
         */
        @Override
        @SuppressWarnings("unchecked")
        final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
            Objects.requireNonNull(sink);
            /**
             * AbstractPipeline.this:当前流管道
             * p.depth:当前流管道距离管道头的距离
             * p.previousStage:前置流管道
             */
            for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
                sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
            }
            // 返回链接后的 sink
            return (Sink<P_IN>) sink;
        }
    
        /**
         * 将分割迭代器中的元素顺序发送到流水线中处理
         *
         * @param wrappedSink   链接后的流水线
         * @param spliterator   数据源
         */
        @Override
        final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
            Objects.requireNonNull(wrappedSink);
            // 1)流水线中不存在短路操作
            if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
                // 通知 wrappedSink 处理元素的个数
                wrappedSink.begin(spliterator.getExactSizeIfKnown());
                // 使用 wrappedSink 依次处理 spliterator 中的所有元素
                spliterator.forEachRemaining(wrappedSink);
                // 通知 wrappedSink 元素发送完毕,可以执行后置操作
                wrappedSink.end();
            }
            // 2)流水线中存在短路操作
            else {
                copyIntoWithCancel(wrappedSink, spliterator);
            }
        }
    
    
    • Spliterator 及其特征值
    /**
     *  用于划分和遍历数据源的对象,数据源可以是 array、Collection、IO channel、生成器等。
     */
    public interface Spliterator<T> {
        /**
         *  尝试使用 action 处理 Spliterator 中的一个元素
         */
        boolean tryAdvance(Consumer<? super T> action);
    
        /**
         *  尝试使用 action 一次性处理 Spliterator 中的所有元素
         */
        default void forEachRemaining(Consumer<? super T> action) {
            do { } while (tryAdvance(action));
        }
    
        /**
         *  对此 Spliterator 进行拆分
         */
        Spliterator<T> trySplit();
    
        /**
         *  获取此 Spliterator 的估计元素数,如果数据源是无限的,则返回 -1
         */
        long estimateSize();
    
        /**
         *  尝试获取此 Spliterator 的精确元素个数
         */
        default long getExactSizeIfKnown() {
            return (characteristics() & SIZED) == 0 ? -1L : estimateSize();
        }
    
        /**
         *  读取此 Spliterator 的特征值
         */
        int characteristics();
    
        /**
         *  Spliterator 中的元素是顺序处理的
         */
        public static final int ORDERED    = 0x00000010;
    
        /**
         *  Spliterator 中的元素是唯一的
         */
        public static final int DISTINCT   = 0x00000001;
    
        /**
         *  Spliterator 中的元素根据自然顺序或比较器进行过排序
         */
        public static final int SORTED     = 0x00000004;
    
        /**
         *  Spliterator 中的元素个数是有限的
         */
        public static final int SIZED      = 0x00000040;
    
        /**
         *  Spliterator 中的元素是非 null 的
         */
        public static final int NONNULL    = 0x00000100;
    
        /**
         *  Spliterator 关联的数据源是不可变的,不支持增加、替换、删除等
         */
        public static final int IMMUTABLE  = 0x00000400;
    
        /**
         * Spliterator 关联的数据源支持并发修改
         */
        public static final int CONCURRENT = 0x00001000;
    
        /**
         *  此 Spliterator 通过 trySplit() 方法生成的子 Spliterator 是有限大小的
         */
        public static final int SUBSIZED = 0x00004000;
    }
    
    • 流管道和操作标识
    StreamOpFlag#
        /**
         * 流管道中的元素是唯一的
         */
        // 0, 0x00000001
        // Matches Spliterator.DISTINCT
        DISTINCT(0,
                set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP)),
    
        /**
         *  流管道中的元素是排过序的【有状态操作】
         */
        // 1, 0x00000004
        // Matches Spliterator.SORTED
        SORTED(1,
                set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP)),
    
        /**
         *  流管道中的元素是顺序处理的
         */
        // 2, 0x00000010
        // Matches Spliterator.ORDERED
        ORDERED(2,
                set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP).clear(Type.TERMINAL_OP)
                .clear(Type.UPSTREAM_TERMINAL_OP)),
    
        /**
         *  流管道的大小是有限的【非无限流】
         */
        // 3, 0x00000040
        // Matches Spliterator.SIZED
        SIZED(3,
                set(Type.SPLITERATOR).set(Type.STREAM).clear(Type.OP)),
    
        /**
         *  中间操作或终端操作是短路的
         */
        // 12, 0x01000000
        SHORT_CIRCUIT(12,
                set(Type.OP).set(Type.TERMINAL_OP));
    
    
    • 流管道中的元素类型
    enum StreamShape {
        /**
         *  流元素是对象引用
         */
        REFERENCE,
        /**
         *  流元素是 int 值
         */
        INT_VALUE,
        /**
         *  流元素是 long 值
         */
        LONG_VALUE,
        /**
         *  流元素是 double 值
         */
        DOUBLE_VALUE
    }
    

    无状态中间操作

    • filter:使用指定的函数式断言过滤流中的元素
        @Override
        public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
            Objects.requireNonNull(predicate);
            return new StatelessOp<>(this, StreamShape.REFERENCE,
                    StreamOpFlag.NOT_SIZED) {
                @Override
                Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                    return new Sink.ChainedReference<>(sink) {
                        @Override
                        public void begin(long size) {
                            downstream.begin(-1);
                        }
    
                        @Override
                        public void accept(P_OUT u) {
                            /**
                             * 根据目标 predicate 对上游管道发送的数据进行过滤,
                             * 只将满足断言的对象发送到下游
                             */
                            if (predicate.test(u)) {
                                downstream.accept(u);
                            }
                        }
                    };
                }
            };
        }
    
    • map:将上游管道发送的数据进行映射处理后,再发送到下游
        /**
         *  基于 mapper 创建一个无状态的流管道,并将其链接到此流管道之后
         */
        @Override
        @SuppressWarnings("unchecked")
        public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
            Objects.requireNonNull(mapper);
            /**
             * 新流管道的操作为 mapper
             * 新流管道的操作标识为 NOT_SORTED、NOT_DISTINCT
             * 下一阶段的操作为 sink【反向链接】
             */
            return new StatelessOp<>(this, StreamShape.REFERENCE,
                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
                @Override
                Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                    return new Sink.ChainedReference<>(sink) {
                        @Override
                        public void accept(P_OUT u) {
                            /**
                             * 接收上游阶段发送的数据 u,并进行当前阶段的处理,
                             * 并将结果发送给下游阶段处理
                             */
                            downstream.accept(mapper.apply(u));
                        }
                    };
                }
            };
        }
    
    • flatMap:流的扁平化
        @Override
        public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
            Objects.requireNonNull(mapper);
            return new StatelessOp<>(this, StreamShape.REFERENCE,
                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
                @Override
                Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                    return new Sink.ChainedReference<>(sink) {
                        // true if cancellationRequested() has been called
                        boolean cancellationRequestedCalled;
    
                        @Override
                        public void begin(long size) {
                            downstream.begin(-1);
                        }
    
                        @Override
                        public void accept(P_OUT u) {
                            /**
                             * 通过 mapper 函数将上游元素映射成一个新的流,并将流中的元素顺序发送到下游
                             */
                            try (Stream<? extends R> result = mapper.apply(u)) {
                                // 映射结果不为 null 时,将新流中的元素发送到下游
                                if (result != null) {
                                    // 1)下游操作是非短路的
                                    if (!cancellationRequestedCalled) {
                                        result.sequential().forEach(downstream);
                                    }
                                    // 2)下游操作是短路操作,则每次发送元素前都先询问下游是否需要继续接收
                                    else {
                                        final var s = result.sequential().spliterator();
                                        do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream));
                                    }
                                }
                            }
                        }
    
                        @Override
                        public boolean cancellationRequested() {
                            // If this method is called then an operation within the stream pipeline is short-circuiting (see AbstractPipeline.copyInto).
                            // Note that we cannot differentiate between an upstream or downstream operation
                            cancellationRequestedCalled = true;
                            return downstream.cancellationRequested();
                        }
                    };
                }
            };
        }
    
    • peek:查看上游发送的元素
        @Override
        public final Stream<P_OUT> peek(Consumer<? super P_OUT> action) {
            Objects.requireNonNull(action);
            return new StatelessOp<>(this, StreamShape.REFERENCE,
                    0) {
                @Override
                Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                    return new Sink.ChainedReference<>(sink) {
                        @Override
                        public void accept(P_OUT u) {
                            /**
                             * 先调用目标接口进行消费,之后再将该元素发送到下游,
                             * 可以查看流的具体处理过程,主要用于调试
                             */
                            action.accept(u);
                            downstream.accept(u);
                        }
                    };
                }
            };
        }
    

    有状态的中间操作

    • distinct:将流中的元素去重
        @Override
        public final Stream<P_OUT> distinct() {
            return DistinctOps.makeRef(this);
        }
    
    DistinctOps#
        static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
            // StreamOpFlag.IS_DISTINCT 新管道产生的元素是唯一的
            return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
                                                          StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
    
                @Override
                Sink<T> opWrapSink(int flags, Sink<T> sink) {
                    Objects.requireNonNull(sink);
                    
                    // 1)如果上游管道已经是 distinct 则此阶段无序任何处理。
                    if (StreamOpFlag.DISTINCT.isKnown(flags)) {
                        return sink;
                    // 2)上游管道是已排序的    
                    } else if (StreamOpFlag.SORTED.isKnown(flags)) {
                        return new Sink.ChainedReference<T, T>(sink) {
                            boolean seenNull;
                            // 最近发送的元素
                            T lastSeen;
    
                            @Override
                            public void begin(long size) {
                                seenNull = false;
                                lastSeen = null;
                                downstream.begin(-1);
                            }
    
                            @Override
                            public void end() {
                                seenNull = false;
                                lastSeen = null;
                                downstream.end();
                            }
    
                            @Override
                            public void accept(T t) {
                                // 1)上游发送的元素为 null
                                if (t == null) {
                                    if (!seenNull) {
                                        seenNull = true;
                                        downstream.accept(lastSeen = null);
                                    }
                                /**
                                 * 2)上游发送的元素不为 null
                                 * lastSeen == null,当前元素是第一个元素
                                 * !t.equals(lastSeen),上次发送的元素和当前元素不一致
                                 */
                                } else if (lastSeen == null || !t.equals(lastSeen)) {
                                    downstream.accept(lastSeen = t);
                                }
                            }
                        };
                    // 3)上游管道是未排序的    
                    } else {
                        return new Sink.ChainedReference<T, T>(sink) {
                            // 存放上游发送的唯一元素
                            Set<T> seen;
    
                            @Override
                            public void begin(long size) {
                                seen = new HashSet<>();
                                downstream.begin(-1);
                            }
    
                            @Override
                            public void end() {
                                seen = null;
                                downstream.end();
                            }
    
                            @Override
                            public void accept(T t) {
                                // 已接受元素中不存在此元素 t
                                if (!seen.contains(t)) {
                                    // 将其加入已发送唯一元素集合
                                    seen.add(t);
                                    // 将此元素发送到下游
                                    downstream.accept(t);
                                }
                            }
                        };
                    }
                }
            };
        }
    
    • sorted:新管道产生的元素是已排序的
        /**
         *  将一个排序的流管道追加到此流管道之后
         */
        @Override
        public final Stream<P_OUT> sorted() {
            return SortedOps.makeRef(this);
        }
    
    SortedOps#
        /**
         *  将一个排序管道追加到 upstream 之后
         */
        static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {
            return new OfRef<>(upstream);
        }
    
        /**
         *  用于对引用流进行排序的管道
         */
        private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
            /**
             * 是否是自然顺序
             */
            private final boolean isNaturalSort;
            /**
             * 排序使用的比较器
             */
            private final Comparator<? super T> comparator;
    
            /**
             * 使用自然顺序排序
             */
            OfRef(AbstractPipeline<?, T, ?> upstream) {
                super(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
                this.isNaturalSort = true;
                final Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();
                this.comparator = comp;
            }
    
            /**
             * 使用指定的比较器排序
             */
            OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
                super(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
                this.isNaturalSort = false;
                this.comparator = Objects.requireNonNull(comparator);
            }
    
            @Override
            public Sink<T> opWrapSink(int flags, Sink<T> sink) {
                Objects.requireNonNull(sink);
    
                // 1)如果上游管道是已排序的,并且是按照自然顺序排序的,则此流管道可以忽略
                if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort) {
                    return sink;
                    // 2)如果上游管道是已知大小的
                } else if (StreamOpFlag.SIZED.isKnown(flags)) {
                    return new SizedRefSortingSink<>(sink, comparator);
                    // 3)如果上游管道是未知大小的
                } else {
                    return new RefSortingSink<>(sink, comparator);
                }
            }
        }
    
    • skip:忽略上游管道发送的前 n 个元素
        @Override
        public final Stream<P_OUT> skip(long n) {
            if (n < 0) {
                throw new IllegalArgumentException(Long.toString(n));
            }
            if (n == 0) {
                return this;
            } else {
                return SliceOps.makeRef(this, n, -1);
            }
        }
    
    SliceOps#
        /**
         * @param upstream 上游管道
         * @param skip 需要跳过的元素个数
         * @param limit 限制接受的元素个数
         */
        public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
                long skip, long limit) {
            if (skip < 0) {
                throw new IllegalArgumentException("Skip must be non-negative: " + skip);
            }
    
            return new ReferencePipeline.StatefulOp<>(upstream, StreamShape.REFERENCE,
                    flags(limit)) {
    
                @Override
                Sink<T> opWrapSink(int flags, Sink<T> sink) {
                    return new Sink.ChainedReference<>(sink) {
                        // 需要跳过的前 n 个元素
                        long n = skip;
                        // 只需要获取 m 个元素
                        long m = limit >= 0 ? limit : Long.MAX_VALUE;
    
                        @Override
                        public void begin(long size) {
                            downstream.begin(calcSize(size, skip, m));
                        }
    
                        @Override
                        public void accept(T t) {
                            // 已经不需要跳过元素
                            if (n == 0) {
                                // 下游需要接受的元素个数 > 0
                                if (m > 0) {
                                    // 递减接收个数
                                    m--;
                                    // 将当前元素发送给下游管道
                                    downstream.accept(t);
                                }
                            }
                            // 跳过当前元素,并递减跳过数
                            else {
                                n--;
                            }
                        }
    
                        @Override
                        public boolean cancellationRequested() {
                            // m == 0 表示此管道将不会发送元素到下游 || 下游拒绝接收元素
                            return m == 0 || downstream.cancellationRequested();
                        }
                    };
                }
            };
        }
    
    • limit:只接受上游管道发送的前 maxSize 个元素
        @Override
        public final Stream<P_OUT> limit(long maxSize) {
            if (maxSize < 0) {
                throw new IllegalArgumentException(Long.toString(maxSize));
            }
            return SliceOps.makeRef(this, 0, maxSize);
        }
    

    非短路的终端操作

    • forEach:使用函数式接口 action 消费流水线生产的所有元素
        @Override
        public void forEach(Consumer<? super P_OUT> action) {
            evaluate(ForEachOps.makeRef(action, false));
        }
    
    ForEachOps#
        /**
         *  创建一个 TerminalOp,遍历并处理流中的每个引用对象
         */
        public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
                boolean ordered) {
            Objects.requireNonNull(action);
            return new ForEachOp.OfRef<>(action, ordered);
        }
    
        abstract static class ForEachOp<T>
        implements TerminalOp<T, Void>, TerminalSink<T, Void> {
            // 遍历是否是有序的
            private final boolean ordered;
    
            protected ForEachOp(boolean ordered) {
                this.ordered = ordered;
            }
    
            // 获取此操作的操作标识
            @Override
            public int getOpFlags() {
                return ordered ? 0 : StreamOpFlag.NOT_ORDERED;
            }
    
            /**
             *  将此终端操作链接到流管道尾部,并将 spliterator 中的每个元素都发送到 sink 中
             */
            @Override
            public <S> Void evaluateSequential(PipelineHelper<T> helper,
                    Spliterator<S> spliterator) {
                return helper.wrapAndCopyInto(this, spliterator).get();
            }
    
            static final class OfRef<T> extends ForEachOp<T> {
                /**
                 * 实际消费者
                 */
                final Consumer<? super T> consumer;
    
                OfRef(Consumer<? super T> consumer, boolean ordered) {
                    super(ordered);
                    this.consumer = consumer;
                }
    
                /**
                 *  处理上游发送的单个元素
                 */
                @Override
                public void accept(T t) {
                    consumer.accept(t);
                }
            }
        }
    
    • forEachOrdered:使用函数式接口 action 顺序消费流水线生产的所有元素
        @Override
        public void forEachOrdered(Consumer<? super P_OUT> action) {
            evaluate(ForEachOps.makeRef(action, true));
        }
    

    短路的终端操作

    • anyMatch:上游管道发送的元素中至少有一个满足函数式断言 predicate 时返回 true
        @Override
        public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
            return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
        }
    
    MatchOps#
        enum MatchKind {
            /** 是否所有的元素都满足指定的断言 */
            ANY(true, true),
    
            /** 是否至少有一个元素满足指定的断言 */
            ALL(false, false),
    
            /** 是否所有的元素都不满足指定的断言 */
            NONE(true, false);
    
            /**
             * 是否需要在满足断言时停止接收上游元素
             */
            private final boolean stopOnPredicateMatches;
            /**
             * 操作被短路时的返回结果
             */
            private final boolean shortCircuitResult;
    
            private MatchKind(boolean stopOnPredicateMatches,
                              boolean shortCircuitResult) {
                this.stopOnPredicateMatches = stopOnPredicateMatches;
                this.shortCircuitResult = shortCircuitResult;
            }
        }
    
        public static <T> TerminalOp<T, Boolean> makeRef(Predicate<? super T> predicate,
                MatchKind matchKind) {
            Objects.requireNonNull(predicate);
            Objects.requireNonNull(matchKind);
            class MatchSink extends BooleanTerminalSink<T> {
                MatchSink() {
                    super(matchKind);
                }
    
                @Override
                public void accept(T t) {
                    /**
                     * 当前管道还能继续接收元素 && 当前元素匹配停止条件
                     */
                    if (!stop && predicate.test(t) == matchKind.stopOnPredicateMatches) {
                        // 停止接收元素
                        stop = true;
                        // 写入结果值
                        value = matchKind.shortCircuitResult;
                    }
                }
            }
    
            return new MatchOp<>(StreamShape.REFERENCE, matchKind, MatchSink::new);
        }
    
        private static final class MatchOp<T> implements TerminalOp<T, Boolean> {
            /**
             * 上游发送的元素类型
             */
            private final StreamShape inputShape;
            /**
             * 匹配类型
             */
            final MatchKind matchKind;
            /**
             * sink 生成器
             */
            final Supplier<BooleanTerminalSink<T>> sinkSupplier;
    
            MatchOp(StreamShape shape,
                    MatchKind matchKind,
                    Supplier<BooleanTerminalSink<T>> sinkSupplier) {
                this.inputShape = shape;
                this.matchKind = matchKind;
                this.sinkSupplier = sinkSupplier;
            }
    
            @Override
            public int getOpFlags() {
                // 当前管道是短路的 && 未排序的
                return StreamOpFlag.IS_SHORT_CIRCUIT | StreamOpFlag.NOT_ORDERED;
            }
    
            @Override
            public StreamShape inputShape() {
                return inputShape;
            }
    
            @Override
            public <S> Boolean evaluateSequential(PipelineHelper<T> helper,
                                                  Spliterator<S> spliterator) {
                return helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).getAndClearState();
            }
        }
    
        /**
         * 避免返回值的装箱而定义的 BooleanTerminalSink
         */
        private abstract static class BooleanTerminalSink<T> implements Sink<T> {
            /**
             * 是否停止接收
             */
            boolean stop;
            /**
             * 返回结果值
             */
            boolean value;
    
            BooleanTerminalSink(MatchKind matchKind) {
                value = !matchKind.shortCircuitResult;
            }
    
            /**
             * 情况状态并返回结果值
             */
            public boolean getAndClearState() {
                return value;
            }
    
            /**
             * 是否停止接收上游元素
             */
            @Override
            public boolean cancellationRequested() {
                return stop;
            }
        }
    
    • allMatch:上游管道发送的所有元素都满足函数式断言 predicate 时返回 true
        @Override
        public final boolean allMatch(Predicate<? super P_OUT> predicate) {
            return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL));
        }
    
    • noneMatch:上游管道发送的元素没有一个满足函数式断言 predicate 时返回 true
        @Override
        public final boolean noneMatch(Predicate<? super P_OUT> predicate) {
            return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.NONE));
        }
    
    • findFirst:获取上游管道发送的第一个元素
        @Override
        public final Optional<P_OUT> findFirst() {
            return evaluate(FindOps.makeRef(true));
        }
    
    FindOps#
        /**
         * @param mustFindFirst 是否必须是第一个元素
         */
        @SuppressWarnings("unchecked")
        public static <T> TerminalOp<T, Optional<T>> makeRef(boolean mustFindFirst) {
            return (TerminalOp<T, Optional<T>>)
                    (mustFindFirst ? FindSink.OfRef.OP_FIND_FIRST : FindSink.OfRef.OP_FIND_ANY);
        }
    
       private abstract static class FindSink<T, O> implements TerminalSink<T, O> {
            /**
             * 是否已经找到值了
             */
            boolean hasValue;
            /**
             * 结果值
             */
            T value;
    
            FindSink() {} // Avoid creation of special accessor
    
            @Override
            public void accept(T value) {
                // 当前元素是第一个元素
                if (!hasValue) {
                    // 已经找到值
                    hasValue = true;
                    // 写入目标值
                    this.value = value;
                }
            }
    
            /**
             * 一旦找到值,就拒绝接收上游元素
             */
            @Override
            public boolean cancellationRequested() {
                return hasValue;
            }
    
            static final class OfRef<T> extends FindSink<T, Optional<T>> {
                /**
                 * 获取结果值
                 */
                @Override
                public Optional<T> get() {
                    return hasValue ? Optional.of(value) : null;
                }
    
                static final TerminalOp<?, ?> OP_FIND_FIRST = new FindOp<>(true,
                        StreamShape.REFERENCE, Optional.empty(),
                        Optional::isPresent, FindSink.OfRef::new);
    
                static final TerminalOp<?, ?> OP_FIND_ANY = new FindOp<>(false,
                        StreamShape.REFERENCE, Optional.empty(),
                        Optional::isPresent, FindSink.OfRef::new);
            }
        }
    
        private static final class FindOp<T, O> implements TerminalOp<T, O> {
            /**
             * 上游发送的元素类型
             */
            private final StreamShape shape;
            /**
             * 此操作的标识
             */
            final int opFlags;
            /**
             * 未找到值时的返回值
             */
            final O emptyValue;
            /**
             * 查找断言
             */
            final Predicate<O> presentPredicate;
            /**
             * sink 生成器
             */
            final Supplier<TerminalSink<T, O>> sinkSupplier;
    
            FindOp(boolean mustFindFirst,
                    StreamShape shape,
                    O emptyValue,
                    Predicate<O> presentPredicate,
                    Supplier<TerminalSink<T, O>> sinkSupplier) {
                this.opFlags = StreamOpFlag.IS_SHORT_CIRCUIT | (mustFindFirst ? 0 : StreamOpFlag.NOT_ORDERED);
                this.shape = shape;
                this.emptyValue = emptyValue;
                this.presentPredicate = presentPredicate;
                this.sinkSupplier = sinkSupplier;
            }
    
            @Override
            public int getOpFlags() {
                return opFlags;
            }
    
            @Override
            public StreamShape inputShape() {
                return shape;
            }
    
            @Override
            public <S> O evaluateSequential(PipelineHelper<T> helper,
                    Spliterator<S> spliterator) {
                // 使用 sink 顺序评估流水线产生的元素,并返回查找结果
                final O result = helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get();
                // 找到目标值,则返回它;否则返回 emptyValue
                return result != null ? result : emptyValue;
            }
        }
    
    • findAny:获取上游管道发送的任意一个元素【串行流取的是第一个元素】
        @Override
        public final Optional<P_OUT> findAny() {
            return evaluate(FindOps.makeRef(false));
        }
    

    并行流处理

    通过调用终端 sink 的 evaluateParallel(PipelineHelper<T> helper, Spliterator<S> spliterator) 
    方法完成并行计算任务的创建和执行。
    
    • forEach()
        static final class ForEachTask<S, T> extends CountedCompleter<Void> {
            /**
             * 分割迭代器
             */
            private Spliterator<S> spliterator;
            /**
             * 终端操作
             */
            private final Sink<S> sink;
            /**
             * 流水线
             */
            private final PipelineHelper<T> helper;
            /**
             * 当前任务处理的元素个数
             */
            private long targetSize;
    
            ForEachTask(PipelineHelper<T> helper,
                    Spliterator<S> spliterator,
                    Sink<S> sink) {
                super(null);
                this.sink = sink;
                this.helper = helper;
                this.spliterator = spliterator;
                this.targetSize = 0L;
            }
    
            ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) {
                super(parent);
                this.spliterator = spliterator;
                sink = parent.sink;
                this.targetSize = parent.targetSize;
                this.helper = parent.helper;
            }
    
            // Similar to AbstractTask but doesn't need to track child tasks
            @Override
            public void compute() {
                // 读取 spliterator
                Spliterator<S> rightSplit = spliterator, leftSplit;
                // 读取 spliterator 的估计总元素个数
                long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
                // 计算当前 Task 处理的元素个数
                if ((sizeThreshold = targetSize) == 0L) {
                    targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
                }
                // 当前计算是否是短路的
                final boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
                boolean forkRight = false;
                // 读取终端操作
                final Sink<S> taskSink = sink;
                ForEachTask<S, T> task = this;
                // 如果是非短路 || 操作未取消
                while (!isShortCircuit || !taskSink.cancellationRequested()) {
                    // 当前 spliterator 的总元素个数 <= 阈值 ||   
                    if (sizeEstimate <= sizeThreshold || 目标 spliterator 无法再分割
                            (leftSplit = rightSplit.trySplit()) == null) {
                        // 处理此任务
                        task.helper.copyInto(taskSink, rightSplit);
                        break;
                    }
                    // 创建子任务
                    final ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
                    // 设置未完成计数值为 1【task 生成了一个子任务 leftTask】
                    task.addToPendingCount(1);
                    ForEachTask<S, T> taskToFork;
                    if (forkRight) {
                        forkRight = false;
                        rightSplit = leftSplit;
                        taskToFork = task;
                        task = leftTask;
                    }
                    else {
                        forkRight = true;
                        taskToFork = leftTask;
                    }
                    // 将其中一个任务 fork 进线程池并行处理
                    taskToFork.fork();
                    // 计算新 spliterator 的估计总元素个数
                    sizeEstimate = rightSplit.estimateSize();
                }
                task.spliterator = null;
                // 尝试传播任务完成信号
                task.propagateCompletion();
            }
        }
    
    • findFirst()、findAny()
    @SuppressWarnings("serial")
    abstract class AbstractTask<P_IN, P_OUT, R,
    K extends AbstractTask<P_IN, P_OUT, R, K>>
    extends CountedCompleter<R> {
    
        private static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;
    
        /** 流水线,一个计算中的所有任务都相同 */
        protected final PipelineHelper<P_OUT> helper;
    
        /**
         * 迭代分割器【数据源】
         */
        protected Spliterator<P_IN> spliterator;
    
        /** 目标元素数 */
        protected long targetSize; // may be lazily initialized
    
        /**
         * 左孩子节点,如果不为 null,则 rightChild 也不为 null
         */
        protected K leftChild;
    
        /**
         * 左孩子节点,如果不为 null,则 leftChild 也不为 null
         */
        protected K rightChild;
    
        /** 当前任务节点完成时的计算结果 */
        private R localResult;
    
        /**
         * 根节点的构造函数
         */
        protected AbstractTask(PipelineHelper<P_OUT> helper,
                Spliterator<P_IN> spliterator) {
            super(null);
            this.helper = helper;
            this.spliterator = spliterator;
            this.targetSize = 0L;
        }
    
        /**
         * 非根节点的构造函数
         */
        protected AbstractTask(K parent,
                Spliterator<P_IN> spliterator) {
            super(parent);
            this.spliterator = spliterator;
            this.helper = parent.helper;
            this.targetSize = parent.targetSize;
        }
    
        /**
         * 并行分解时,叶子任务处理的目标元素个数
         * 通用 ForkJoinPool 或指定 ForkJoinPool 的并行度 * 4
         */
        public static int getLeafTarget() {
            final Thread t = Thread.currentThread();
            if (t instanceof ForkJoinWorkerThread) {
                return ((ForkJoinWorkerThread) t).getPool().getParallelism() << 2;
            }
            else {
                return LEAF_TARGET;
            }
        }
    
        /**
         * 创建一个子节点
         */
        protected abstract K makeChild(Spliterator<P_IN> spliterator);
    
        /**
         * 执行叶子节点的计算任务【此任务已经无法再分割】
         */
        protected abstract R doLeaf();
    
        /**
         * 计算叶子任务建议处理的元素个数
         */
        public static long suggestTargetSize(long sizeEstimate) {
            final long est = sizeEstimate / getLeafTarget();
            return est > 0L ? est : 1L;
        }
    
        /**
         * 此任务处理的目标元素数
         */
        protected final long getTargetSize(long sizeEstimate) {
            long s;
            return (s = targetSize) != 0 ? s :
                (targetSize = suggestTargetSize(sizeEstimate));
        }
    
        /**
         * 此任务的原始结果
         */
        @Override
        public R getRawResult() {
            return localResult;
        }
    
        @Override
        protected void setRawResult(R result) {
            if (result != null) {
                throw new IllegalStateException();
            }
        }
    
        /**
         * 读取任务执行结果
         */
        protected R getLocalResult() {
            return localResult;
        }
    
        /**
         * 设置任务执行结果
         */
        protected void setLocalResult(R localResult) {
            this.localResult = localResult;
        }
    
        /**
         * 此任务是否是叶子任务
         */
        protected boolean isLeaf() {
            return leftChild == null;
        }
    
        /**
         * 此任务是否是根任务
         */
        protected boolean isRoot() {
            return getParent() == null;
        }
    
        /**
         * 读取父任务
         */
        @SuppressWarnings("unchecked")
        protected K getParent() {
            return (K) getCompleter();
        }
    
        /**
         * 分割并处理任务
         */
        @Override
        public void compute() {
            Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
            // 读取分割迭代器的估计总元素个数
            long sizeEstimate = rs.estimateSize();
            // 读取单个任务的元素上限
            final long sizeThreshold = getTargetSize(sizeEstimate);
            boolean forkRight = false;
            @SuppressWarnings("unchecked") K task = (K) this;
            // 估计总元素数 > 阈值 && 尝试对 spliterator 进行二分
            while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
                K leftChild, rightChild, taskToFork;
                // 基于leftSpliterator 创建子任务并写入 leftChild
                task.leftChild  = leftChild = task.makeChild(ls);
                // 基于 rightSpliterator 创建子任务并写入 leftChild
                task.rightChild = rightChild = task.makeChild(rs);
                // 设置 task 的待完成任务数为 1【将会有一个子任务被 fork 进线程池中并行处理】
                task.setPendingCount(1);
                // 是否 forkRight【第一次 forkRight,接着 forkLeft,轮流交替】
                if (forkRight) {
                    forkRight = false;
                    // 更新待分割的 spliterator
                    rs = ls;
                    // 待处理的任务
                    task = leftChild;
                    // 待 fork 进线程池并行处理的任务
                    taskToFork = rightChild;
                }
                else {
                    forkRight = true;
                    // 待处理的任务
                    task = rightChild;
                    // 待 fork 进线程池并行处理的任务
                    taskToFork = leftChild;
                }
                taskToFork.fork();
                // 读取待分割 spliterator 的估计总元素个数
                sizeEstimate = rs.estimateSize();
            }
            // 此任务已经是叶子任务,则执行计算逻辑
            task.setLocalResult(task.doLeaf());
            // 此任务计算完毕后尝试完成主任务
            task.tryComplete();
        }
    
        /**
         * 清空 spliterator、leftChild、rightChild
         */
        @Override
        public void onCompletion(CountedCompleter<?> caller) {
            spliterator = null;
            leftChild = rightChild = null;
        }
    
        /**
         * 当前任务是否是最左侧节点【findFirst】
         */
        protected boolean isLeftmostNode() {
            @SuppressWarnings("unchecked")
            K node = (K) this;
            while (node != null) {
                final K parent = node.getParent();
                if (parent != null && parent.leftChild != node) {
                    return false;
                }
                node = parent;
            }
            return true;
        }
    }
    
    /**
     * 可短路的计算
     */
    @SuppressWarnings("serial")
    abstract class AbstractShortCircuitTask<P_IN, P_OUT, R,
    K extends AbstractShortCircuitTask<P_IN, P_OUT, R, K>>
    extends AbstractTask<P_IN, P_OUT, R, K> {
        /**
         * 在所有计算任务中共享的结果,只设置一次
         */
        protected final AtomicReference<R> sharedResult;
    
        /**
         * 此任务是否被取消
         */
        protected volatile boolean canceled;
    
        /**
         * 创建根任务
         */
        protected AbstractShortCircuitTask(PipelineHelper<P_OUT> helper,
                Spliterator<P_IN> spliterator) {
            super(helper, spliterator);
            sharedResult = new AtomicReference<>(null);
        }
    
        /**
         * 创建子任务
         */
        protected AbstractShortCircuitTask(K parent,
                Spliterator<P_IN> spliterator) {
            super(parent, spliterator);
            // 共享同一个引用对象
            sharedResult = parent.sharedResult;
        }
    
        /**
         * 无计算结果
         */
        protected abstract R getEmptyResult();
    
        @Override
        public void compute() {
            Spliterator<P_IN> rs = spliterator, ls;
            long sizeEstimate = rs.estimateSize();
            final long sizeThreshold = getTargetSize(sizeEstimate);
            boolean forkRight = false;
            @SuppressWarnings("unchecked") K task = (K) this;
            final AtomicReference<R> sr = sharedResult;
            R result;
            // 如果计算结果为空【没有一个任务完成计算】
            while ((result = sr.get()) == null) {
                // 此任务是否已经被取消
                if (task.taskCanceled()) {
                    result = task.getEmptyResult();
                    break;
                }
                // 此 spliterator 的估计总元素个数 < 阈值 || spliterator 无法再进行分割
                if (sizeEstimate <= sizeThreshold || (ls = rs.trySplit()) == null) {
                    // 执行叶子计算任务
                    result = task.doLeaf();
                    break;
                }
                K leftChild, rightChild, taskToFork;
                // 创建左子节点
                task.leftChild  = leftChild = task.makeChild(ls);
                // 创建右子节点
                task.rightChild = rightChild = task.makeChild(rs);
                task.setPendingCount(1);
                if (forkRight) {
                    forkRight = false;
                    rs = ls;
                    task = leftChild;
                    taskToFork = rightChild;
                }
                else {
                    forkRight = true;
                    task = rightChild;
                    taskToFork = leftChild;
                }
                // 将其中一个任务 fork 进线程池并行计算
                taskToFork.fork();
                // 计算新 spliterator 的估计总元素个数
                sizeEstimate = rs.estimateSize();
            }
            // 此任务计算完成,写入结果
            task.setLocalResult(result);
            // 尝试完成根任务
            task.tryComplete();
        }
    
    
        /**
         * 写入短路结果,以使其他计算任务快速终止
         */
        protected void shortCircuit(R result) {
            if (result != null) {
                sharedResult.compareAndSet(null, result);
            }
        }
    
        /**
         * 写入此任务的计算结果
         */
        @Override
        protected void setLocalResult(R localResult) {
            if (isRoot()) {
                if (localResult != null) {
                    sharedResult.compareAndSet(null, localResult);
                }
            } else {
                super.setLocalResult(localResult);
            }
        }
    
        /**
         * 读取此任务的计算结果
         */
        @Override
        public R getRawResult() {
            return getLocalResult();
        }
    
        /**
         * 读取此任务的计算结果
         */
        @Override
        public R getLocalResult() {
            if (isRoot()) {
                final R answer = sharedResult.get();
                return answer == null ? getEmptyResult() : answer;
            } else {
                return super.getLocalResult();
            }
        }
    
        /**
         * 取消此任务
         */
        protected void cancel() {
            canceled = true;
        }
    
        /**
         * 只要有一个父任务已经取消,则此任务取消
         */
        protected boolean taskCanceled() {
            boolean cancel = canceled;
            if (!cancel) {
                for (K parent = getParent(); !cancel && parent != null; parent = parent.getParent()) {
                    cancel = parent.canceled;
                }
            }
    
            return cancel;
        }
    
        protected void cancelLaterNodes() {
            // Go up the tree, cancel right siblings of this node and all parents
            for (@SuppressWarnings("unchecked") K parent = getParent(), node = (K) this;
                    parent != null;
                    node = parent, parent = parent.getParent()) {
                // If node is a left child of parent, then has a right sibling
                if (parent.leftChild == node) {
                    final K rightSibling = parent.rightChild;
                    // 取消所有的右侧子任务
                    if (!rightSibling.canceled) {
                        rightSibling.cancel();
                    }
                }
            }
        }
    }
    
    FindOps#
        @SuppressWarnings("serial")
        private static final class FindTask<P_IN, P_OUT, O>
        extends AbstractShortCircuitTask<P_IN, P_OUT, O, FindTask<P_IN, P_OUT, O>> {
            /**
             * 终端操作
             */
            private final FindOp<P_OUT, O> op;
            /**
             * 是否是 findFirst
             */
            private final boolean mustFindFirst;
    
            FindTask(FindOp<P_OUT, O> op,
                    boolean mustFindFirst,
                    PipelineHelper<P_OUT> helper,
                    Spliterator<P_IN> spliterator) {
                super(helper, spliterator);
                this.mustFindFirst = mustFindFirst;
                this.op = op;
            }
    
            FindTask(FindTask<P_IN, P_OUT, O> parent, Spliterator<P_IN> spliterator) {
                super(parent, spliterator);
                this.mustFindFirst = parent.mustFindFirst;
                this.op = parent.op;
            }
    
            @Override
            protected FindTask<P_IN, P_OUT, O> makeChild(Spliterator<P_IN> spliterator) {
                return new FindTask<>(this, spliterator);
            }
    
            @Override
            protected O getEmptyResult() {
                return op.emptyValue;
            }
    
            private void foundResult(O answer) {
                // 1)如果此任务是最左侧节点,则尝试写入计算结果
                if (isLeftmostNode()) {
                    shortCircuit(answer);
                // 2)取消后继的计算节点    
                } else {
                    cancelLaterNodes();
                }
            }
    
            @Override
            protected O doLeaf() {
                // 执行计算任务
                final O result = helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator).get();
                if (!mustFindFirst) {
                    if (result != null) {
                        shortCircuit(result);
                    }
                    return null;
                }
                else {
                    if (result != null) {
                        foundResult(result);
                        return result;
                    } else {
                        return null;
                    }
                }
            }
    
            @Override
            public void onCompletion(CountedCompleter<?> caller) {
                if (mustFindFirst) {
                    for (FindTask<P_IN, P_OUT, O> child = leftChild, p = null; child != p;
                            p = child, child = rightChild) {
                        final O result = child.getLocalResult();
                        if (result != null && op.presentPredicate.test(result)) {
                            setLocalResult(result);
                            foundResult(result);
                            break;
                        }
                    }
                }
                super.onCompletion(caller);
            }
        }
    }
    
    • anyMatch()、allMatch()、noneMatch()
    @SuppressWarnings("serial")
        private static final class MatchTask<P_IN, P_OUT>
        extends AbstractShortCircuitTask<P_IN, P_OUT, Boolean, MatchTask<P_IN, P_OUT>> {
            /**
             * 终端操作
             */
            private final MatchOp<P_OUT> op;
    
            /**
             * 创建根节点
             */
            MatchTask(MatchOp<P_OUT> op, PipelineHelper<P_OUT> helper,
                    Spliterator<P_IN> spliterator) {
                super(helper, spliterator);
                this.op = op;
            }
    
            /**
             * 创建子节点
             */
            MatchTask(MatchTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) {
                super(parent, spliterator);
                this.op = parent.op;
            }
    
            @Override
            protected MatchTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) {
                return new MatchTask<>(this, spliterator);
            }
    
            @Override
            protected Boolean doLeaf() {
                // 执行计算并读取结果
                final boolean b = helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator).getAndClearState();
                // 如果匹配短路结果
                if (b == op.matchKind.shortCircuitResult) {
                    // 完成计算并短路
                    shortCircuit(b);
                }
                return null;
            }
    
            @Override
            protected Boolean getEmptyResult() {
                return !op.matchKind.shortCircuitResult;
            }
        }
    
  • 相关阅读:
    CSS中position小解
    position
    mac默认安装postgresql, 如何让postgresql可以远程访问
    The data directory was initialized by PostgreSQL version 9.6, which is not compatible with this version 10.0.
    active admin gem error
    psql 无法添加超级用户
    ubuntu 15.04 安装Balsamiq Mockups 3
    Rails html 写public里图片的路径
    rails c 历史命令
    undefined local variable or method `per' for []:ActiveRecord::Relation
  • 原文地址:https://www.cnblogs.com/zhuxudong/p/10152663.html
Copyright © 2011-2022 走看看