zoukankan      html  css  js  c++  java
  • Java8 Stream 并行计算实现的原理

    Java8 Stream 并行计算实现的原理

    转自:http://lvheyang.com/?p=87

    这两天组内的小伙伴在学习Java8,推广在新项目内使用新特性。正好看到了Stream 带来的遍历的多线程并发:
        

    Arrays.asList(1, 2, 3, 4, 5, 6, 7, 9, 8, 0, 1)
    
    .stream()
    
    .parallel()
    
    .collect(Collectors.groupingBy(x -> x % 10))
    
    .forEach((x, y) -> System.out.println(x + ":" + y));


    和小伙伴一起试着用各种玩法玩了一下Java8的函数式编程特性之后,感叹到这样子并发计算真的是越来越简单了的。

    但是深入思考之后就会很自然的想到一个问题,这个过程中,我们并没有显示的告诉Stream,我们需要多少个线程进行并行计算?我们能否复用之前的线程进行计算?

    带着这个问题我们先打开了VisualVM,查看一下我们运行这样一个任务会启动多少个线程?

    threads

    我们可以看到默认的parallel计算启动了三个线程进行并行。这三个线程是怎么来的呢?抱着这个问题,我们来参考一下Jdk8的源码,来看看它是如何设置这个值的。

    我们知道Stream 是一个惰性求值的系统(如何进行惰性求值,我会在另一篇博客中进行分析),那么我们只需要找它最后求值的过程,看它是怎样进行求值的就可以了。在AbstractPipeline 这个类里面我们找到了Stream 计算的最终求值过程的默认实现:

    /**
    * Evaluate the pipeline with a terminal operation to produce a result.
    *
    * @param <R> the type of result
    * @param terminalOp the terminal operation to be applied to the pipeline.
    * @return the result
    */
    
    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    
        assert getOutputShape() == terminalOp.inputShape();
    
        if (linkedOrConsumed)
    
            throw new IllegalStateException(MSG_STREAM_LINKED);
    
        linkedOrConsumed = true;
    
    
    return isParallel()
    
    ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
    
    : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    
    }
    
    

    在15行,我们可以看到,在求值的时候会检查并行计算的标志位,如果标志了并行计算的话,我们就会并行求值,反之则会串行求值。我们可以进一步进入并行求值的逻辑中,这是一个TerminalOp的默认接口方法,默认实现就是直接调用串行求值,在FindOp、ForEachOp、MatchOp 和 ReduceOp 中得到了覆盖。

    parallel

    @Override
    
    public <P_IN> O evaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
    
        return new FindTask<>(this, helper, spliterator).invoke();
    
    }

    如FindOp的代码示例,这四个操作都是创建一个Task的示例,然后执行invoke方法。这些Task的继承关系如图:


    Java的ForkJoin运行原理-Task继承图

    可以看出所有的Task 都继承自Jdk7 中引入的ForkJoin 并行框架的ForkJoinTask。所以我们可以看出Stream 的并行是依赖于ForkJoin 框架的。以AbstractTask 为例我们看看它是如何进行并行计算的:

     
    1. @Override

    2. public void compute() {

    3. Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators

    4. long sizeEstimate = rs.estimateSize();

    5. long sizeThreshold = getTargetSize(sizeEstimate);

    6. boolean forkRight = false;

    7. @SuppressWarnings("unchecked") K task = (K) this;

    8. while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {

    9. K leftChild, rightChild, taskToFork;

    10. task.leftChild = leftChild = task.makeChild(ls);

    11. task.rightChild = rightChild = task.makeChild(rs);

    12. task.setPendingCount(1);

    13. if (forkRight) {

    14. forkRight = false;

    15. rs = ls;

    16. task = leftChild;

    17. taskToFork = rightChild;

    18. }

    19. else {

    20. forkRight = true;

    21. task = rightChild;

    22. taskToFork = leftChild;

    23. }

    24. taskToFork.fork();

    25. sizeEstimate = rs.estimateSize();

    26. }

    27. task.setLocalResult(task.doLeaf());

    28. task.tryComplete();

    29. }

    30.  
    31. ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent,

    32. Spliterator spliterator) {

    33. super(parent, spliterator);

    34. this.op = parent.op;

    35. }

    36.  
    37. @Override

    38. protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator spliterator) {

    39. return new ReduceTask<>(this, spliterator);

    40. }


    这里面的主要逻辑就是

        先调用当前splititerator 方法的estimateSize 方法,预估这个分片中的数据量
        根据预估的数据量获取最小处理单元的大小阈值,即当数据量已经小于这个阈值的时候进行计算,否则进行fork 将任务划分成更小的数据块,进行求解。这里值得注意的是,getTargetSize 在第一次调用的时候会设置:
            预测数据量大小 / (默认并发度 * 4) 的结果作为最小执行单元的数量(配置的默认值是cpu 数 – 1,可以通过java.util.concurrent.ForkJoinPool.common.parallelism设置)
        如果当前分片大小仍然大于处理数据单元的阈值,且分片继续尝试切分成功,那么就继续切分,分别将左右分片的任务创建为新的Task,并且将当前的任务关联为两个新任务的父级任务(逻辑在makeChild 里面)
        先后对左右子节点的任务进行fork,对另外的分区进行分解。同时设定pending 为1,这代表一个task 实际上只会有一个等待的子节点(被fork)。
        当任务已经分解到足够小的时候退出循环,尝试进行结束。调用子类实现的doLeaf方法,完成最小计算单元的计算任务,并设置到当前任务的localResult中
        调用tryComplete 方法进行最终任务的扫尾工作,如果该任务pending 值不等于0,则原子的减1,如果已经等于0,说明任务都已经完成,则调用onCompletion 回调,如果该任务是叶子任务,则直接销毁中间数据结束;如果是中间节点会将左右子节点的结果进行合并
        检查如果这个任务已经没有父级任务了,则将该任务置为正常结束,如果还有则尝试递归的去调用父级节点的onCompletion回调,逐级进行任务的合并。

     
    1. public final void tryComplete() {

    2. CountedCompleter<?> a = this, s = a;

    3. for (int c;;) {

    4. if ((c = a.pending) == 0) {

    5. a.onCompletion(s);

    6. if ((a = (s = a).completer) == null) {

    7. s.quietlyComplete();

    8. return;

    9. }

    10. }

    11. else if (U.compareAndSwapInt(a, PENDING, c, c - 1))

    12. return;

    13. }

    14. }


    说了这么多,大家也基本理解了Stream 的实现原理了。其实本质上就是在ForkJoin上进行了一层封装,将Stream 不断尝试分解成更小的split,然后使用fork/join 框架分而治之。

    所以我们以往关于Fork/Join 的经验也都可以派上用场,可以解答之前我们的几个疑问:

        我在visualvm 中看到的 parallize 的3个线程是怎么来的?
            答:由于 taskToFork.fork() 调用,parallize使用了默认的ForkJoinPool.common 默认的一个静态线程池,这个线程池的默认线程个数是cpu 数量-1。由于我的代码是运行在四个逻辑内核的MacBook 上,所以这里的线程个数为3。如下面代码和注释所示:

     
    1. if (parallelism < 0 && // default 1 less than #cores

    2. (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)

    3. parallelism = 1;

    4. if (parallelism > MAX_CAP)

    5. parallelism = MAX_CAP;

        如何控制parallize 的线程数?
            答:我们可以自己构建一个ForkJoinPool,向其中提交一个parallize 任务,可以做到控制并发度。如以下示例代码:我们将之前的stream 的过程构造成一个runnable 的lambda 匿名函数 ()-> {…}。提交至线程池中,就可以按照我们想要的并发度进行计算了。

     
    1. ForkJoinPool pool = new ForkJoinPool(2);

    2. ret = pool.submit(() -> {

    3. return LongStream.range(1, 50 * 1024 * 1024).boxed().collect(Collectors.toList())

    4. .stream()

    5. .parallel()

    6. .map(x -> x * 2)

    7. .filter(x -> x < 1500)

    8. .reduce((x,y) -> x+y)

    9. .get();

    10. }).get();

    接下来打算继续深入这两个很有意思的问题:

        深入介绍一下Stream的惰性求值过程,最好能跟Scala 的Stream 实现进行比较:Java8 Stream 惰性求值实现分析 – 驴和羊

        深入介绍ForkJoin 的底层实现,包括它是如何进行线程调度和cache line sharing 优化的

    参考文献:

    1. http://blog.krecan.net/2014/03/18/how-to-specify-thread-pool-for-java-8-parallel-streams/
  • 相关阅读:
    oracle 10g 免安装客户端在windows下配置
    sql2005 sa密码
    使用windows live writer 有感
    windows xp SNMP安装包提取
    汉化groove2007
    迁移SQL server 2005 Reporting Services到SQL server 2008 Reporting Services全程截图操作指南
    foxmail 6在使用中的问题
    AGPM客户端连接不上服务器解决一例
    SpringSource Tool Suite add CloudFoundry service
    Java 之 SWing
  • 原文地址:https://www.cnblogs.com/grj001/p/12225557.html
Copyright © 2011-2022 走看看