  • JAVA8学习——Stream底层的实现二(学习过程)



        public static <T> Spliterator<T> spliterator(Collection<? extends T> c,
                                                     int characteristics) {
            return new IteratorSpliterator<>(Objects.requireNonNull(c),


            static final int BATCH_UNIT = 1 << 10;  // batch array size increment
            static final int MAX_BATCH = 1 << 25;  // max batch array size;
            private final Collection<? extends T> collection; // null OK
            private Iterator<? extends T> it;
            private final int characteristics;
            private long est;             // size estimate
            private int batch;            // batch size for splits
             * Creates a spliterator using the given
             * collection's {@link java.util.Collection#iterator()} for traversal,
             * and reporting its {@link java.util.Collection#size()} as its initial
             * size.
             * @param c the collection
             * @param characteristics properties of this spliterator's
             *        source or elements.
            public IteratorSpliterator(Collection<? extends T> collection, int characteristics) {
                this.collection = collection;
                this.it = null;
                this.characteristics = (characteristics & Spliterator.CONCURRENT) == 0
                                       ? characteristics | Spliterator.SIZED | Spliterator.SUBSIZED
                                       : characteristics;
             * Creates a spliterator using the given iterator
             * for traversal, and reporting the given initial size
             * and characteristics.
             * @param iterator the iterator for the source
             * @param size the number of elements in the source
             * @param characteristics properties of this spliterator's
             * source or elements.
            public IteratorSpliterator(Iterator<? extends T> iterator, long size, int characteristics) {
                this.collection = null;
                this.it = iterator;
                this.est = size;
                this.characteristics = (characteristics & Spliterator.CONCURRENT) == 0
                                       ? characteristics | Spliterator.SIZED | Spliterator.SUBSIZED
                                       : characteristics;
             * Creates a spliterator using the given iterator
             * for traversal, and reporting the given initial size
             * and characteristics.
             * @param iterator the iterator for the source
             * @param characteristics properties of this spliterator's
             * source or elements.
            public IteratorSpliterator(Iterator<? extends T> iterator, int characteristics) {
                this.collection = null;
                this.it = iterator;
                this.est = Long.MAX_VALUE;
                this.characteristics = characteristics & ~(Spliterator.SIZED | Spliterator.SUBSIZED);
            public Spliterator<T> trySplit() {
                 * Split into arrays of arithmetically increasing batch
                 * sizes.  This will only improve parallel performance if
                 * per-element Consumer actions are more costly than
                 * transferring them into an array.  The use of an
                 * arithmetic progression in split sizes provides overhead
                 * vs parallelism bounds that do not particularly favor or
                 * penalize cases of lightweight vs heavyweight element
                 * operations, across combinations of #elements vs #cores,
                 * whether or not either are known.  We generate
                 * O(sqrt(#elements)) splits, allowing O(sqrt(#cores))
                 * potential speedup.
                Iterator<? extends T> i;
                long s;
                if ((i = it) == null) {
                    i = it = collection.iterator();
                    s = est = (long) collection.size();
                    s = est;
                if (s > 1 && i.hasNext()) {
                    int n = batch + BATCH_UNIT;
                    if (n > s)
                        n = (int) s;
                    if (n > MAX_BATCH)
                        n = MAX_BATCH;
                    Object[] a = new Object[n];
                    int j = 0;
                    do { a[j] = i.next(); } while (++j < n && i.hasNext());
                    batch = j;
                    if (est != Long.MAX_VALUE)
                        est -= j;
                    return new ArraySpliterator<>(a, 0, j, characteristics);
                return null;
            public void forEachRemaining(Consumer<? super T> action) {
                if (action == null) throw new NullPointerException();
                Iterator<? extends T> i;
                if ((i = it) == null) {
                    i = it = collection.iterator();
                    est = (long)collection.size();
            public boolean tryAdvance(Consumer<? super T> action) {
                if (action == null) throw new NullPointerException();
                if (it == null) {
                    it = collection.iterator();
                    est = (long) collection.size();
                if (it.hasNext()) {
                    return true;
                return false;
            public long estimateSize() {
                if (it == null) {
                    it = collection.iterator();
                    return est = (long)collection.size();
                return est;
            /** Returns the characteristics bit mask that the constructor stored in the final {@code characteristics} field. */
            public int characteristics() { return characteristics; }
            public Comparator<? super T> getComparator() {
                if (hasCharacteristics(Spliterator.SORTED))
                    return null;
                throw new IllegalStateException();

    IteratorSpliterator 提供了几种构造方法。

    spliterator() 方法直接返回了一个 IteratorSpliterator 对象。


        default Stream<E> stream() {
            return StreamSupport.stream(spliterator(), false);
         * Creates a new sequential or parallel {@code Stream} from a
         * {@code Spliterator}.
         * <p>The spliterator is only traversed, split, or queried for estimated
         * size after the terminal operation of the stream pipeline commences.
         * <p>It is strongly recommended the spliterator report a characteristic of
         * {@code IMMUTABLE} or {@code CONCURRENT}, or be
         * <a href="../Spliterator.html#binding">late-binding</a>.  Otherwise,
         * {@link #stream(java.util.function.Supplier, int, boolean)} should be used
         * to reduce the scope of potential interference with the source.  See
         * <a href="package-summary.html#NonInterference">Non-Interference</a> for
         * more details.
         * @param <T> the type of stream elements
         * @param spliterator a {@code Spliterator} describing the stream elements
         * @param parallel if {@code true} then the returned stream is a parallel
         *        stream; if {@code false} the returned stream is a sequential
         *        stream.
         * @return a new sequential or parallel {@code Stream}
        public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
            return new ReferencePipeline.Head<>(spliterator,

    这里又出现了一个 ReferencePipeline.Head<>()


    ReferencePipeline extends-> AbstractPipeline






    很多人说 工作几年之后就剩下增删改查的东西了。 其他底层的东西都没有去留住。





     * Abstract base class for an intermediate pipeline stage or pipeline source
     * stage implementing whose elements are of type {@code U}.
        中间管道阶段或管道源阶段的抽象基类,其元素类型为 U;这两种阶段统一由 ReferencePipeline 表示。
     * @param <P_IN> type of elements in the upstream source
     * @param <P_OUT> type of elements produced by this stage
     * @since 1.8
    abstract class ReferencePipeline<P_IN, P_OUT>
            extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
            implements Stream<P_OUT>  {
    //继承了AbstractPipeline  非常重要。
    //实现了 Stream 接口
        ReferencePipeline(Supplier<? extends Spliterator<?>> source,
                          int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
    第一个构造方法,一定是用来构造源阶段(source stage)的。



    这个类是为了处理源阶段和中间阶段之间的区别。




         * Source stage of a ReferencePipeline.
         * @param <E_IN> type of elements in the upstream source
         * @param <E_OUT> type of elements produced by this stage
         * @since 1.8
        static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {


    AbstractPipeline:ReferencePipeline 的父类,最底层的实现类(先来看 Doc)

     * Abstract base class for "pipeline" classes, which are the core
     * implementations of the Stream interface and its primitive specializations.
     * Manages construction and evaluation of stream pipelines.
     * <p>An {@code AbstractPipeline} represents an initial portion of a stream
     * pipeline, encapsulating a stream source and zero or more intermediate
     * operations.  The individual {@code AbstractPipeline} objects are often
     * referred to as <em>stages</em>, where each stage describes either the stream
     * source or an intermediate operation.
     一个 AbstractPipeline 表示流管道的初始部分,
     它封装了一个流的源,以及 0 个或者多个中间操作。
     每一个单独的 AbstractPipeline 对象通常被叫做“阶段”,每个阶段描述的要么是流的源,要么是一个中间操作。
     * <p>A concrete intermediate stage is generally built from an
     * {@code AbstractPipeline}, a shape-specific pipeline class which extends it
     * (e.g., {@code IntPipeline}) which is also abstract, and an operation-specific
     * concrete class which extends that.  {@code AbstractPipeline} contains most of
     * the mechanics of evaluating the pipeline, and implements methods that will be
     * used by the operation; the shape-specific classes add helper methods for
     * dealing with collection of results into the appropriate shape-specific
     * containers.
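     一个具体的中间阶段通常由三层构成:AbstractPipeline、继承它的特定形状的管道类(如 IntPipeline,同样是抽象类),以及继承后者的、针对具体操作的具体类。
     AbstractPipeline 包含了对管道求值的大部分机制,并实现了供各个操作使用的方法;特定形状的类则增加了一些辅助方法,用于把结果收集到对应形状的容器中。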
     * <p>After chaining a new intermediate operation, or executing a terminal
     * operation, the stream is considered to be consumed, and no more intermediate
     * or terminal operations are permitted on this stream instance.
     在链接了一个新的中间操作,或者执行了一个终止操作之后,这个流就会被认为已经被消费掉了,不允许再在这个流实例上执行任何中间操作或终止操作。
     * @implNote
     * <p>For sequential streams, and parallel streams without
     * <a href="package-summary.html#StreamOps">stateful intermediate
     * operations</a>, pipeline evaluation is done in a single
     * pass that "jams" all the operations together.  For parallel streams with
     * stateful operations, execution is divided into segments, where each
     * stateful operations marks the end of a segment, and each segment is
     * evaluated separately and the result used as the input to the next
     * segment.  In all cases, the source data is not consumed until a terminal
     * operation begins.
     对于串行流,以及不含有状态中间操作的并行流,管道的求值是在单次遍历中完成的:所有操作被“挤压”(jam)在一起执行。
    所以存在 “短路”操作。
    对于含有状态操作的并行流,执行过程会被划分为多个段:每个有状态操作标志着一个段的结束,各段分别求值,其结果作为下一段的输入。无论哪种情况,在终止操作开始之前都不会消费源数据。
     * @param <E_IN>  type of input elements
     * @param <E_OUT> type of output elements
     * @param <S> type of the subclass implementing {@code BaseStream}
     * @since 1.8
    abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
            extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {

    构造方法 (最底层的构造)

         * Constructor for the head of a stream pipeline.
         * @param source {@code Supplier<Spliterator>} describing the stream source
         * @param sourceFlags The source flags for the stream source, described in
         * {@link StreamOpFlag}
         * @param parallel True if the pipeline is parallel
        AbstractPipeline(Supplier<? extends Spliterator<?>> source,
                         int sourceFlags, boolean parallel) {
            this.previousStage = null;
            this.sourceSupplier = source;
            this.sourceStage = this;
            this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
            // The following is an optimization of:
            // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
            this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
            this.depth = 0;
            this.parallel = parallel;


    3. forEach()方法


         * Performs an action for each element of this stream.
         * <p>This is a <a href="package-summary.html#StreamOps">terminal
         * operation</a>.
         * <p>The behavior of this operation is explicitly nondeterministic.
         * For parallel stream pipelines, this operation does <em>not</em>
         * guarantee to respect the encounter order of the stream, as doing so
         * would sacrifice the benefit of parallelism.  For any given element, the
         * action may be performed at whatever time and in whatever thread the
         * library chooses.  If the action accesses shared state, it is
         * responsible for providing the required synchronization.
         对于并行流管道来说,这个操作并不会保证 流中元素的顺序。
         * @param action a <a href="package-summary.html#NonInterference">
         *               non-interfering</a> action to perform on the elements
        void forEach(Consumer<? super T> action);


    1. 在Head中

    (直接在源阶段上调用 forEach 时,默认调用的是 Head 里面重写的实现)

            // Optimized sequential terminal operations for the head of the pipeline
            public void forEach(Consumer<? super E_OUT> action) {
                if (!isParallel()) {
                else {

    2. 在ReferencePipeline中。


        // Terminal operations from Stream
        public void forEach(Consumer<? super P_OUT> action) {
            evaluate(ForEachOps.makeRef(action, false));


            // Optimized sequential terminal operations for the head of the pipeline
            public void forEach(Consumer<? super E_OUT> action) {
                if (!isParallel()) {
                else {

    forEach 调用了sourceStageSpliterator() 和 forEachRemaining(action)


         * Gets the source stage spliterator if this pipeline stage is the source
         * stage.  The pipeline is consumed after this method is called and
         * returns successfully.
         * @return the source stage spliterator
         * @throws IllegalStateException if this pipeline stage is not the source
         *         stage.
        final Spliterator<E_OUT> sourceStageSpliterator() {
            if (this != sourceStage)
                throw new IllegalStateException();
            if (linkedOrConsumed)
                throw new IllegalStateException(MSG_STREAM_LINKED);
            linkedOrConsumed = true;
            if (sourceStage.sourceSpliterator != null) {
                Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
                sourceStage.sourceSpliterator = null;
                return s;
            else if (sourceStage.sourceSupplier != null) {
                Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();
                sourceStage.sourceSupplier = null;
                return s;
            else {
                throw new IllegalStateException(MSG_CONSUMED);

    forEachRemaining(Consumer<? super T> action)

    forEachRemaining的实现就比较多了。这里追到了 Iterator 接口中 forEachRemaining(Consumer<? super E> action) 的默认实现。


    Iterator 接口中的 forEachRemaining(Consumer<? super E> action) 的默认实现

         * Performs the given action for each remaining element until all elements
         * have been processed or the action throws an exception.  Actions are
         * performed in the order of iteration, if that order is specified.
         * Exceptions thrown by the action are relayed to the caller.
         * @implSpec
         * <p>The default implementation behaves as if:
         * <pre>{@code
         *     while (hasNext())
         *         action.accept(next());
         * }</pre>
         * @param action The action to be performed for each element
         * @throws NullPointerException if the specified action is null
         * @since 1.8
        default void forEachRemaining(Consumer<? super E> action) {
            while (hasNext())



    可以跟一下 IteratorSpliterator 对象里的 Spliterator 相关方法

    这里的forEachRemaining(Consumer<? super T> action)方法的多种实现。也是一个重点。

    Debug 可以发现:Arrays 里面还有一个自己的 ArrayList(即 Arrays.asList 返回的内部类 java.util.Arrays.ArrayList,不是 java.util.ArrayList)。

    public class StreamTest3 {
        public static void main(String[] args) {
            List<String> list = Arrays.asList("hello", "world", "hello world");


            public void forEachRemaining(Consumer<? super T> action) {
                Object[] a; int i, hi; // hoist accesses and checks from loop
                if (action == null)
                    throw new NullPointerException();
                if ((a = array).length >= (hi = fence) &&
                    (i = index) >= 0 && i < (index = hi)) {
                    do { action.accept((T)a[i]); } while (++i < hi);


    list.stream().map(item -> item).forEach(System.out::println);


        public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
            return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
                Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                    return new Sink.ChainedReference<P_OUT, R>(sink) {
                        public void accept(P_OUT u) {



         * Constructor for appending an intermediate operation stage onto an
         * existing pipeline.
         * @param previousStage the upstream pipeline stage
         * @param opFlags the operation flags for the new stage, described in
         * {@link StreamOpFlag}
        AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
            if (previousStage.linkedOrConsumed)
                throw new IllegalStateException(MSG_STREAM_LINKED);
            previousStage.linkedOrConsumed = true;
            previousStage.nextStage = this;
            this.previousStage = previousStage;
            this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
            this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
            this.sourceStage = previousStage.sourceStage;
            if (opIsStateful())
                sourceStage.sourceAnyStateful = true;
            this.depth = previousStage.depth + 1;


     * An extension of {@link Consumer} used to conduct values through the stages of
     * a stream pipeline, with additional methods to manage size information,
     * control flow, etc.  Before calling the {@code accept()} method on a
     * {@code Sink} for the first time, you must first call the {@code begin()}
     * method to inform it that data is coming (optionally informing the sink how
     * much data is coming), and after all data has been sent, you must call the
     * {@code end()} method.  After calling {@code end()}, you should not call
     * {@code accept()} without again calling {@code begin()}.  {@code Sink} also
     * offers a mechanism by which the sink can cooperatively signal that it does
     * not wish to receive any more data (the {@code cancellationRequested()}
     * method), which a source can poll before sending more data to the
     * {@code Sink}.
     * <p>A sink may be in one of two states: an initial state and an active state.
     * It starts out in the initial state; the {@code begin()} method transitions
     * it to the active state, and the {@code end()} method transitions it back into
     * the initial state, where it can be re-used.  Data-accepting methods (such as
     * {@code accept()}) are only valid in the active state.
     * @apiNote
     * A stream pipeline consists of a source, zero or more intermediate stages
     * (such as filtering or mapping), and a terminal stage, such as reduction or
     * for-each.  For concreteness, consider the pipeline:
     * <pre>{@code
     *     int longestStringLengthStartingWithA
     *         = strings.stream()
     *                  .filter(s -> s.startsWith("A"))
     *                  .mapToInt(String::length)
     *                  .max();
     * }</pre>
     * <p>Here, we have three stages, filtering, mapping, and reducing.  The
     * filtering stage consumes strings and emits a subset of those strings; the
     * mapping stage consumes strings and emits ints; the reduction stage consumes
     * those ints and computes the maximal value.
     * <p>A {@code Sink} instance is used to represent each stage of this pipeline,
     * whether the stage accepts objects, ints, longs, or doubles.  Sink has entry
     * points for {@code accept(Object)}, {@code accept(int)}, etc, so that we do
     * not need a specialized interface for each primitive specialization.  (It
     * might be called a "kitchen sink" for this omnivorous tendency.)  The entry
     * point to the pipeline is the {@code Sink} for the filtering stage, which
     * sends some elements "downstream" -- into the {@code Sink} for the mapping
     * stage, which in turn sends integral values downstream into the {@code Sink}
     * for the reduction stage. The {@code Sink} implementations associated with a
     * given stage is expected to know the data type for the next stage, and call
     * the correct {@code accept} method on its downstream {@code Sink}.  Similarly,
     * each stage must implement the correct {@code accept} method corresponding to
     * the data type it accepts.
     * <p>The specialized subtypes such as {@link Sink.OfInt} override
     * {@code accept(Object)} to call the appropriate primitive specialization of
     * {@code accept}, implement the appropriate primitive specialization of
     * {@code Consumer}, and re-abstract the appropriate primitive specialization of
     * {@code accept}.
     * <p>The chaining subtypes such as {@link ChainedInt} not only implement
     * {@code Sink.OfInt}, but also maintain a {@code downstream} field which
     * represents the downstream {@code Sink}, and implement the methods
     * {@code begin()}, {@code end()}, and {@code cancellationRequested()} to
     * delegate to the downstream {@code Sink}.  Most implementations of
     * intermediate operations will use these chaining wrappers.  For example, the
     * mapping stage in the above example would look like:
     * <pre>{@code
     *     IntSink is = new Sink.ChainedReference<U>(sink) {
     *         public void accept(U u) {
     *             downstream.accept(mapper.applyAsInt(u));
     *         }
     *     };
     * }</pre>
     * <p>Here, we implement {@code Sink.ChainedReference<U>}, meaning that we expect
     * to receive elements of type {@code U} as input, and pass the downstream sink
     * to the constructor.  Because the next stage expects to receive integers, we
     * must call the {@code accept(int)} method when emitting values to the downstream.
     * The {@code accept()} method applies the mapping function from {@code U} to
     * {@code int} and passes the resulting value to the downstream {@code Sink}.
     * @param <T> type of elements for value streams
     * @since 1.8
    interface Sink<T> extends Consumer<T> {

    执行原理:begin()(进入激活状态)-> accept() -> end()(回到初始状态,可以被重用)。

    A {@code Sink} instance is used to represent each stage of this pipeline,


    链接引用 ChainedReference:它是 Sink 的一个抽象实现,用来把多个 Sink 串成一条链。

         * Abstract {@code Sink} implementation for creating chains of
         * sinks.  The {@code begin}, {@code end}, and
         * {@code cancellationRequested} methods are wired to chain to the
         * downstream {@code Sink}.  This implementation takes a downstream
         * {@code Sink} of unknown input shape and produces a {@code Sink<T>}.  The
         * implementation of the {@code accept()} method must call the correct
         * {@code accept()} method on the downstream {@code Sink}.
        static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
            protected final Sink<? super E_OUT> downstream;
            public ChainedReference(Sink<? super E_OUT> downstream) {
                this.downstream = Objects.requireNonNull(downstream);
            public void begin(long size) {
            public void end() {
            public boolean cancellationRequested() {
                return downstream.cancellationRequested();




      //ReferencePipeline的 map()方法
        public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
            return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
                Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                    return new Sink.ChainedReference<P_OUT, R>(sink) {
                        public void accept(P_OUT u) {



