zoukankan      html  css  js  c++  java
  • Fork/Join框架

    Fork/Join框架:将一个任务拆分(fork)成若干个子任务,再将一个个的子任务运算的结果进行汇总(join)。

      以四核CPU为例,多线程下将任务分配到每个cpu线程核上,传统线程存在的问题是:每个任务可能会阻塞,一单某个线程发生了阻塞,那么这个核上线程的

    其他线程将不能执行,与此同时,其他核上的线程执行完成后会达到空闲状态,这样阻塞的线程队列还在等待,这样一定程度上会造成资源浪费,没有很好的利用

    CPU资源,被白白浪费掉,也会影响到效率问题。

      Fork/Join框架是将一个任务分成多个子任务,再将子任务压入到线程中,形成一个个线程队列,与传统线程池不一样的地方是,Fork/Join采用的是“工作

    窃取”模式,简单理解为:当执行新的任务时它可以将其拆分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它

    放在自己的队列中。如果某个子问题由于阻塞而无法继续执行的时候,那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行,这种方式减少了

    线程的等待时间,提高了性能。再说的直白一点就是,其他线程完事之后不会变成空闲状态,而是去别的线程队列上“偷”一个来“帮助”执行(因为线程队列

    是双端队列,所以可以从尾部进行“偷取”),更好的利用了CPU资源。

      Fork/Join在java1.7的时候就有了,为啥没有广泛的利用呢?写个Demo就知道了!

    package com.lql.java8;
    
    import java.util.concurrent.RecursiveTask;
    
    /**
     * @author: lql
     * @date: 2019.08.24
     * Description: ForkJoinDemo
     */
    public class ForkJoinCalculate extends RecursiveTask<Long> {
        /**
         * 必须继承 :
         *          RecursiveTask                          或者                         RecursiveAction
         *              有返回值                                                           返回值
         *          @Override                                                           @Override
         *       protected Long compute() {return null;}                                protected void compute() { }
         */
    
        //需求:做个累加操作
        /**
         * start:起始
         * end:终止
         * THRESHOLD:临界值
         */
        private long start;
        private long end;
        private static long THRESHOLD = 10000;
    
        public ForkJoinCalculate(long start,long end) {
            this.start = start;
            this.end = end;
        }
    
    
        @Override
        protected Long compute() {
            long length = end - start;
            if (length <= THRESHOLD) {
                //小于等于临界值将不在拆分
                long sum = 0; //累加
                for (long i = start; i <= end; i++ ) {
                    sum+=i;
                }
                return sum;
            } else {
                long mid= (start + end) / 2 ;
                //如果没到临界值则递归去拆
                ForkJoinCalculate left = new ForkJoinCalculate(start,end);
                left.fork();   //拆分子任务
    
                ForkJoinCalculate right = new ForkJoinCalculate(mid+1,end);
                right.fork();
                //汇总
                return left.join()+right.join();
            }
        }
    }

    接下来进行测试:

    package com.lql.java8;
    
    import org.junit.Test;
    
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    
    /**
     * @author: lql
     * @date: 2019.08.24
     * Description:     ForkJoin测试
     */
    public class ForkTestTest {
    
        @Test
        public void test() {
            ForkJoinPool pool = new ForkJoinPool();
            ForkJoinTask<Long> task = new ForkJoinCalculate(0,1000000000L);
            Long sum = pool.invoke(task);
            System.out.println(sum);
        }
    }

    可以看出来代码复杂,这也是没有广泛使用起来的原因,在java8中,并行流很好的解决这种复杂性,也是做累加操作,代码如下:

      @Test
        public void test2() {
            LongStream.range(0, 1000000000L).parallel().reduce(0,Long::sum);
        }

     这里介绍下“并行流和顺序流

    并行流:就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流,StreamAPI可以声明性地通过parallel()与sequential()在并行流和顺序流之间进行切换

    顺序流:执行过程是一个连续的步骤序列,它在完成一个活动之后会去执行到下一个

    当然parallel()底层就是Fork/Join.

  • 相关阅读:
    Spring MVC程序中得到静态资源文件css,js,图片文件的路径问题总结
    【转】MySQL数据丢失讨论
    【摘】 pt-query-digest工具一解
    【摘】Mysql备份还原数据库之mysqldump实例及参数详细说明
    【原】redis插件安装
    【摘】linux中fstab解说
    【原】mysql慢日志分析
    【原】带宽、流量单位换算关系
    【转】Linux查看内存大小和插槽
    【转】bind
  • 原文地址:https://www.cnblogs.com/-qilin/p/11404222.html
Copyright © 2011-2022 走看看