  • Java Stream API: How Streams Are Created and How Their Operations Work

    Preface

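    The Stream API, introduced in Java 8, is a convenient and efficient tool for processing containers. A stream can be created from an array or from a collection.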

    Basic usage

    import java.util.Arrays;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;
    
    public class Client {
    
      public static void main(String[] args) {
        // Create a stream from an int array
        System.out.println(Arrays.stream(new int[]{1, 2, 3, 4, 5}).sum());
        // Create a stream from an array of objects
        System.out.println(Stream.of("a", "b", "c").collect(Collectors.joining(",")));
        // Create an infinite stream (truncated with limit)
        System.out.println(Stream.generate(() -> 1).limit(5).mapToInt(x -> x).sum());
      }
    
    }
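
    The example above covers arrays and generated streams; a collection-backed stream works the same way. The following is a minimal sketch, where the class name CreationSketch and the sample values are chosen only for illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class CreationSketch {

  // Stream backed by a collection (it is built on the collection's spliterator)
  static String fromCollection() {
    List<String> letters = Arrays.asList("a", "b", "c");
    return letters.stream().map(String::toUpperCase).collect(Collectors.joining(","));
  }

  // Infinite stream built by repeatedly applying a function, cut off with limit()
  static List<Integer> fromIterate() {
    return Stream.iterate(1, x -> x * 2).limit(5).collect(Collectors.toList());
  }

  // Primitive stream over the half-open int range [1, 6)
  static int fromRange() {
    return IntStream.range(1, 6).sum();
  }

  public static void main(String[] args) {
    System.out.println(fromCollection()); // A,B,C
    System.out.println(fromIterate());    // [1, 2, 4, 8, 16]
    System.out.println(fromRange());      // 15
  }
}
```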
    

    A Stream consists of three parts:

    • a data source
    • zero or more intermediate operations
    • a terminal operation

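    Intermediate operations are either stateless or stateful. Stateless means that processing the current element is not affected by the elements seen before it (map and filter, for example). Stateful means the operation depends on earlier elements; some stateful operations, such as sorting, cannot emit anything until they have received every element.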
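
    The difference shows up in the order in which elements move through a sequential pipeline. The sketch below records that order with peek(); the class name StatefulSketch and the event labels are made up for this illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class StatefulSketch {

  // Records the order in which elements pass each stage
  static List<String> trace() {
    List<String> events = new ArrayList<>();
    Stream.of(3, 1, 2)
        .peek(x -> events.add("before-sort:" + x)) // stateless: one element at a time
        .sorted()                                   // stateful: buffers until upstream is exhausted
        .peek(x -> events.add("after-sort:" + x))
        .forEach(x -> { });
    return events;
  }

  public static void main(String[] args) {
    trace().forEach(System.out::println);
  }
}
```

    All three before-sort events are recorded before the first after-sort event, because sorted() has to hold every element before it can pass any of them on.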

    /**
    * The splittable iterator interface; the core abstraction behind streams
    */
    public interface Spliterator<T> {
        /**
         * If an element remains, performs action on it and returns true; otherwise returns false.
         * Equivalent to an Iterator's hasNext() and next() combined.
         */
        boolean tryAdvance(Consumer<? super T> action);
    
        /**
         * Performs action on every remaining element
         */
        default void forEachRemaining(Consumer<? super T> action) {
            do { } while (tryAdvance(action));
        }
    
        /**
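         * Splits off part of the elements into a new Spliterator, leaving the rest in this one;
         * returns null if this Spliterator cannot be split. Used by parallel streams.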
         */
        Spliterator<T> trySplit();
    
        /**
         * Returns an estimate of the number of remaining elements
         */
        long estimateSize();
    
        /**
         * Returns the exact size if this Spliterator is SIZED, otherwise -1
         */
        default long getExactSizeIfKnown() {
            return (characteristics() & SIZED) == 0 ? -1L : estimateSize();
        }
    
        /**
         * Returns the characteristic flags of this Spliterator
         */
        int characteristics();
    
        /**
         * Returns true if all of the given characteristic flags are set
         */
        default boolean hasCharacteristics(int characteristics) {
            return (characteristics() & characteristics) == characteristics;
        }
    
        /**
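         * Returns the Comparator if the source is SORTED by one, or null if it is SORTED
         * in natural order; otherwise throws IllegalStateException (the default here)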
         */
        default Comparator<? super T> getComparator() {
            throw new IllegalStateException();
        }
    
        /**
         * Elements have a defined encounter order
         */
        public static final int ORDERED    = 0x00000010;
    
        /**
         * Elements are distinct (no duplicates)
         */
        public static final int DISTINCT   = 0x00000001;
    
        /**
         * Elements follow a defined sort order
         */
        public static final int SORTED     = 0x00000004;
    
        /**
         * estimateSize() returns the exact element count
         */
        public static final int SIZED      = 0x00000040;
    
        /**
         * Elements are never null
         */
        public static final int NONNULL    = 0x00000100;
    
        /**
         * The element source cannot be structurally modified
         */
        public static final int IMMUTABLE  = 0x00000400;
    
        /**
         * The element source may be safely modified concurrently without external synchronization
         */
        public static final int CONCURRENT = 0x00001000;
    
        /**
         * Every Spliterator produced by trySplit() is also SIZED and SUBSIZED
         */
        public static final int SUBSIZED = 0x00004000;
    
    }
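
    To see these methods in action, the sketch below drives an ArrayList's spliterator by hand. The class name SpliteratorSketch and its helper methods are hypothetical; only the Spliterator calls themselves are JDK API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

class SpliteratorSketch {

  // Consumes one element, splits the remainder, then drains both halves
  static List<Integer> walk() {
    List<Integer> seen = new ArrayList<>();
    Spliterator<Integer> rest = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6)).spliterator();

    // tryAdvance: consume exactly one element if there is one
    rest.tryAdvance(seen::add);

    // trySplit: hand a leading chunk to a new spliterator; `rest` keeps what remains
    Spliterator<Integer> prefix = rest.trySplit();
    if (prefix != null) {
      prefix.forEachRemaining(seen::add);
    }
    rest.forEachRemaining(seen::add);
    return seen;
  }

  // An ArrayList spliterator reports SIZED and ORDERED, so the exact size is known
  static boolean sizedAndOrdered() {
    Spliterator<Integer> sp = new ArrayList<>(Arrays.asList(1, 2, 3)).spliterator();
    return sp.hasCharacteristics(Spliterator.SIZED | Spliterator.ORDERED)
        && sp.getExactSizeIfKnown() == 3;
  }

  public static void main(String[] args) {
    System.out.println(walk());            // [1, 2, 3, 4, 5, 6]
    System.out.println(sizedAndOrdered()); // true
  }
}
```

    Because the spliterator is ORDERED, the chunk returned by trySplit() is a prefix of the remaining elements, which is why draining the prefix first preserves the original order.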
    

    A puzzle about OfInt, a sub-interface of Spliterator

    /**
         * A Spliterator specialized for {@code int} values.
         * @since 1.8
         */
        public interface OfInt extends OfPrimitive<Integer, IntConsumer, OfInt> {
    
            @Override
            OfInt trySplit();
    
            @Override
            boolean tryAdvance(IntConsumer action);
    
            /**
             * {@inheritDoc}
             * @implSpec
             * If the action is an instance of {@code IntConsumer} then it is cast
             * to {@code IntConsumer} and passed to
             * {@link #tryAdvance(java.util.function.IntConsumer)}; otherwise
             * the action is adapted to an instance of {@code IntConsumer}, by
             * boxing the argument of {@code IntConsumer}, and then passed to
             * {@link #tryAdvance(java.util.function.IntConsumer)}.
             */
            @Override
            default boolean tryAdvance(Consumer<? super Integer> action) {
                if (action instanceof IntConsumer) {
                    return tryAdvance((IntConsumer) action);
                }
                else {
                    if (Tripwire.ENABLED)
                        Tripwire.trip(getClass(),
                                      "{0} calling Spliterator.OfInt.tryAdvance((IntConsumer) action::accept)");
                    return tryAdvance((IntConsumer) action::accept);
                }
            }
        }
    

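    There are two overloads of tryAdvance: one takes an IntConsumer, the other a Consumer. The Consumer and IntConsumer interfaces have no inheritance relationship, so at first sight it is unclear how the cast could ever succeed, or what argument would reach that branch. The instanceof check looks at the runtime class of the argument, not its declared type, so it succeeds for any object whose class implements both interfaces. The JDK-internal Sink.OfInt is such a type (it extends both Sink<Integer> and IntConsumer), so when such a sink is passed through a Consumer-typed parameter, the cast branch is taken and boxing is avoided.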
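
    Both branches of that default method can be reached from ordinary code. In the sketch below, Dual is a hypothetical class that implements both interfaces, and the log format is invented for the illustration:

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.IntConsumer;

class DualConsumerSketch {

  // Hypothetical helper: one class implementing both, otherwise unrelated, interfaces
  static class Dual implements Consumer<Integer>, IntConsumer {
    final StringBuilder log;

    Dual(StringBuilder log) { this.log = log; }

    @Override
    public void accept(int value) { log.append("int:").append(value).append(' '); }

    @Override
    public void accept(Integer value) { log.append("boxed:").append(value).append(' '); }
  }

  static String run() {
    StringBuilder log = new StringBuilder();
    Spliterator.OfInt sp = Arrays.spliterator(new int[]{1, 2});

    // The static type is Consumer, so the Consumer overload is chosen at compile time;
    // at run time the instanceof check passes and accept(int) is called
    Consumer<Integer> both = new Dual(log);
    sp.tryAdvance(both);

    // A plain lambda is not an IntConsumer, so it is adapted via action::accept (boxing)
    Consumer<Integer> plain = v -> log.append("boxed:").append(v).append(' ');
    sp.tryAdvance(plain);

    return log.toString().trim();
  }

  public static void main(String[] args) {
    System.out.println(run()); // int:1 boxed:2
  }
}
```

    The variable is declared as Consumer<Integer> on purpose: passing a Dual-typed expression directly would make the call ambiguous between the two overloads.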

    /**
         * A Spliterator using a given Iterator for element
         * operations. The spliterator implements {@code trySplit} to
         * permit limited parallelism.
         */
        static class IteratorSpliterator<T> implements Spliterator<T> {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (action == null) throw new NullPointerException();
                if (it == null) {
                    it = collection.iterator();
                    est = (long) collection.size();
                }
                if (it.hasNext()) {
                    action.accept(it.next());
                    return true;
                }
                return false;
            }
        }
    

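    This is the Spliterator that collections get by default (Collection.spliterator() returns one unless the collection class overrides it), and it is backed by an Iterator. Its tryAdvance() method simply calls the iterator's hasNext() and next().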
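
    The delegation to the iterator can be observed directly. The sketch below uses a hypothetical collection, Logged, that extends AbstractCollection without overriding spliterator(), and whose iterator records every call made on it:

```java
import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;

class IteratorBackedSketch {

  // Hypothetical collection that keeps the default, iterator-based spliterator
  static class Logged extends AbstractCollection<String> {
    final List<String> calls = new ArrayList<>();
    private final List<String> data = Arrays.asList("x", "y");

    @Override
    public Iterator<String> iterator() {
      Iterator<String> it = data.iterator();
      return new Iterator<String>() {
        @Override public boolean hasNext() { calls.add("hasNext"); return it.hasNext(); }
        @Override public String next() { calls.add("next"); return it.next(); }
      };
    }

    @Override
    public int size() { return data.size(); }
  }

  static List<String> run() {
    Logged c = new Logged();
    Spliterator<String> sp = c.spliterator();
    // Each tryAdvance performs one hasNext() and, if it returned true, one next()
    while (sp.tryAdvance(s -> c.calls.add("accept:" + s))) { }
    return c.calls;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

    The recorded sequence is hasNext, next, accept:x, hasNext, next, accept:y, and a final hasNext that returns false and ends the loop.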

    Source code analysis

    abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
            extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
    
        /**
         * The source (head) stage of the pipeline; refers to itself if this is the source stage
         */
        @SuppressWarnings("rawtypes")
        private final AbstractPipeline sourceStage;
    
        /**
         * The previous (upstream) stage; null if this is the source stage
         */
        @SuppressWarnings("rawtypes")
        private final AbstractPipeline previousStage;
    
        /**
         * The operation flags for the intermediate operation represented by this
         * pipeline object.
         */
        protected final int sourceOrOpFlags;
    
        /**
         * The next (downstream) stage; null if this is the last stage
         */
        @SuppressWarnings("rawtypes")
        private AbstractPipeline nextStage;
    
        /**
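         * Depth: the number of intermediate operations between this stage and the
         * source in a sequential pipeline (0 for the source stage)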
         */
        private int depth;
    
        /**
         * The source spliterator; only meaningful for the source stage
         */
        private Spliterator<?> sourceSpliterator;
    
    }
    

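    A pipeline consists of one source stage and zero or more intermediate stages, linked together much like a doubly linked list through previousStage and nextStage.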

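    The Sink interface chains all the operations together, each Sink calling the next one downstream. Stream.map(), for example, wraps its mapping function in a Sink.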

        @Override
        @SuppressWarnings("unchecked")
        public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
            Objects.requireNonNull(mapper);
            return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
                @Override
                Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                    return new Sink.ChainedReference<P_OUT, R>(sink) {
                        @Override
                        public void accept(P_OUT u) {
                            downstream.accept(mapper.apply(u));
                        }
                    };
                }
            };
        }
    

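    Starting from the current stage and walking back to the source stage, wrapSink() wraps the terminal Sink one stage at a time, producing a single Sink that contains every operation.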

        @Override
        @SuppressWarnings("unchecked")
        final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
            Objects.requireNonNull(sink);
    
            for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
                sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
            }
            return (Sink<P_IN>) sink;
        }
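
    The backward wrapping loop can be modelled outside the JDK. The sketch below is a deliberately simplified model, not the JDK implementation: Stage, wrapper, and the Integer-only element type are hypothetical, and java.util.function.Consumer stands in for Sink:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

class SinkChainSketch {

  // Hypothetical stand-in for a pipeline stage: it knows its upstream stage and
  // how to wrap a downstream consumer, in the spirit of opWrapSink()
  static class Stage {
    final Stage previous;                               // null for the source stage
    final UnaryOperator<Consumer<Integer>> wrapper;     // null for the source stage

    Stage(Stage previous, UnaryOperator<Consumer<Integer>> wrapper) {
      this.previous = previous;
      this.wrapper = wrapper;
    }

    Stage map(UnaryOperator<Integer> f) {
      return new Stage(this, down -> x -> down.accept(f.apply(x)));
    }

    Stage filter(Predicate<Integer> p) {
      return new Stage(this, down -> x -> { if (p.test(x)) down.accept(x); });
    }

    // Walk from this stage back to the source, wrapping the sink at each step
    Consumer<Integer> wrapSink(Consumer<Integer> sink) {
      for (Stage s = this; s.previous != null; s = s.previous) {
        sink = s.wrapper.apply(sink);
      }
      return sink;
    }
  }

  static List<Integer> run() {
    List<Integer> out = new ArrayList<>();
    Stage source = new Stage(null, null);
    Stage last = source.filter(x -> x % 2 == 1).map(x -> x * 10);
    Consumer<Integer> chain = last.wrapSink(out::add);
    for (int i = 1; i <= 5; i++) {
      chain.accept(i); // plays the role of spliterator.forEachRemaining(wrappedSink)
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(run()); // [10, 30, 50]
  }
}
```

    Although the loop runs from the last stage to the first, the resulting chain executes in pipeline order: the filter sees each element first, then the map, then the terminal consumer.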
    

    Running the final Sink

        @Override
        final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
            Objects.requireNonNull(wrappedSink);
    
            if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
                wrappedSink.begin(spliterator.getExactSizeIfKnown());
                spliterator.forEachRemaining(wrappedSink);
                wrappedSink.end();
            }
            else {
                copyIntoWithCancel(wrappedSink, spliterator);
            }
        }
    

    At this point the Sink contains every operation, so pushing the source elements into it executes the entire stream.
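
    Two consequences of this design are visible from user code: nothing runs until a terminal operation starts the copy, and a short-circuiting terminal operation stops pulling elements early. A minimal sketch, where the class name LazySketch and the event labels are chosen for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

class LazySketch {

  static List<String> run() {
    List<String> events = new ArrayList<>();

    // Building the pipeline only links stages; no element is processed yet
    Stream<Integer> pipeline = Stream.of(1, 2, 3, 4)
        .peek(x -> events.add("peek:" + x))
        .filter(x -> x % 2 == 0);
    events.add("built");

    // The terminal operation wraps the sinks and pulls elements from the source.
    // findFirst() is short-circuiting, so traversal stops once a match is found.
    Optional<Integer> first = pipeline.findFirst();
    events.add("result:" + first.get());
    return events;
  }

  public static void main(String[] args) {
    System.out.println(run()); // [built, peek:1, peek:2, result:2]
  }
}
```

    Elements 3 and 4 never reach peek(), which corresponds to the copyIntoWithCancel branch in the source above.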

    References

    Java8之Stream API原理 (Principles of the Java 8 Stream API)
    深入理解Java Stream流水线 (Understanding the Java Stream pipeline in depth)
    Weird implementation of tryAdvance in Spliterator.OfInt

  • Original article: https://www.cnblogs.com/strongmore/p/14656338.html