zoukankan      html  css  js  c++  java
  • 并发编程之 Fork-Join 分而治之框架

    前言

    “分而治之” 一直是一个有效的处理大量数据的方法。著名的 MapReduce 也是采取了分而治之的思想。简单来说,就是如果你要处理1000个数据,但是你并不具备处理1000个数据的能力,那么你可以只处理其中的10个,然后,分阶段处理100次,将100次的结果进行合成,那就是最终想要的对原始的1000个数据的处理结果。

    Fork & Join 的具体含义

    Fork 一词的原始含义是吃饭用的叉子,也有分叉的意思。在Linux 平台中,函数 fork()用来创建子进程,使得系统进程可以多一个执行分支。在 Java 中也沿用了类似的命名方式。

    而 Join() 的含义和 Thread 类的 join 类似,表示等待。也就是使用 fork() 后系统多了一个执行分支(线程),所以需要等待这个执行分支执行完毕,才有可能得到最终的结果,因此 join 就是表示等待。

    在实际使用中,如果毫无顾忌的使用 fork 开启线程进行处理,那么很有可能导致系统开启过多的线程而严重影响性能。所以,在JDK中,给出一个 ForkJoinPool 线程池,对于 fork() 方法并不急着开启线程,而是提交给 ForkJoiinPool 线程池进行处理,以节省系统资源。

    由于线程池的优化,提交的任务和线程数量并不是一对一的关系。在绝大多数情况下,一个物理线程实际上是需要处理多个逻辑任务的。因此,每个线程必然需要拥有一个任务队列。因此,在实际执行过程中,可能遇到这么一种情况:线程A已经把自己的任务都处理完了,而线程B还有一堆任务等着处理,此时,线程A就会“帮助” 线程B,从线程 B的任务队列中拿一个任务来处理,尽可能的达到平衡。值得注意的是:当线程试图帮助别人时,总是从任务队列的底部开始拿数据,而线程试图执行自己的任务时,则从相反的顶部开始拿。因此这种行为也十分有利于避免数据竞争。

    我们看看线程池 ForkJoinPool 的一个接口:

        /**
         * Submits a ForkJoinTask for execution.
         *
         * @param task the task to submit
         * @param <T> the type of the task's result
         * @return the task
         * @throws NullPointerException if the task is null
         * @throws RejectedExecutionException if the task cannot be
         *         scheduled for execution
         */
        public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
            if (task == null)
                throw new NullPointerException();
            externalPush(task);
            return task;
        }
    

    你可以向 ForkJoinPool 线程池提交一个 ForkJoinTask 任务。所谓 ForkJoinTask 任务就是支持 fork () 分解以及 join()等待的任务。 ForkJoinTask 有两个重要的子类,RecursiveAction 和 RecursiveTask。他们粉笔表示没有返回值的任务和可以携带返回值的任务。有点像 Rannable 和 Callable。

    下面来要给简单的例子展示 Fork/Join 框架的使用。这里用来计算求和。

    /**
     *  Fork/Join 核心思想:分而治之
     *
     * 著名的 MapReduce 也是这个思想。将任务进行分解,然后合并所有的结果。
     *
     */
    public class CountTask extends RecursiveTask<Long> {
    
      /**
       * 阀值
       */
      static final int THRESHOLD = 10000;
      long start;
      long end;
    
      public CountTask(long start, long end) {
        this.start = start;
        this.end = end;
      }
    
      /**
       * 有返回值的
       * @return
       */
      @Override
      protected Long compute() {
    
        long sum = 0;
        // 当阀值小于10000则不分解了
        boolean canCompute = (end - start) < THRESHOLD;
        if (canCompute) {
          for (long i = start; i <= end; i++) {
            sum += i;
          }
        } else {
          // 2000
          long step = (start + end) / 100;
          ArrayList<CountTask> subTasks = new ArrayList<>();
          long pos = start;
          for (int i = 0; i < 100; i++) {
            long lastOne = pos + step;
            if (lastOne > end) {
              lastOne = end;
            }
            //0-2000 个计算任务 * 100
            CountTask subTask = new CountTask(pos, lastOne);
            pos += step + 1;
            subTasks.add(subTask);
            subTask.fork();// fork
          }
    
          for (CountTask t : subTasks) {
            sum += t.join();
          }
        }
        return sum;
    
      }
    
      public static void main(String[] args) {
    
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask task = new CountTask(0, 200000L);
        // 将一个大的任务提交到池中
        ForkJoinTask<Long> result = forkJoinPool.submit(task);
        long res = 0;
        try {
          // 等待运算结果
          res = result.get();
          System.out.println("sum = " + res);
        } catch (InterruptedException | ExecutionException e) {
          e.printStackTrace();
        }
    
      }
    }
    
    

    由于计算求和必须需要返回值,因此我们选择了 RecursiveTask 作为任务的模型。首先我们构造了一个大任务,提交给线程池,线程池会返回一个携带结果的任务,通过 get 方法可以得到最终结果。如果执行 get 方法时任务没有结束,那么主线程就会在 get 方法等待。

    再看看 CountTask 的实现,首先 CountTask 继承自 RecursiveTask ,可以携带返回值,这里的返回值类型设置为 long,定义一个 THRESHOLD 设置了任务分解的规模,也就是如果需要求和的总数大于 THRESHOLD 个,那么任务就需要再次分解,否则就直接执行。 每次分解时,简单的将原有任务划分成100个规模相等的小任务,并使用 fork() 提交子任务。之后,等待所有的子任务结束,并将结果再次求和。

    再使用 ForkJoin的时候注意:如果任务的划分层次很深,一直得不到返回,那么可能出现两种情况: 第一,系统内的线程数量越来越多,导致性能严重下降。第二,函数的调用层次变的很深,最终导致栈溢出。

    此外,ForkJoin 线程池使用一个无锁的栈来管理空闲线程,如果一个工作线程暂时取不到可用的任务,则可能会被挂起,挂起的线程将会被压入由线程池维护的栈中,待将来有任务可用时,再从栈中唤醒这些线程。

    总结

    本文来源自 《Java 高并发程序设计》,没有什么自己的见解。因为使用场景太少了。不过还是可以看看源码来涨涨姿势的。嘿嘿。

    good luck !!!!

  • 相关阅读:
    HAOI2018 简要题解
    HAOI2017 简要题解
    BZOJ 5477: 星际穿越
    HAOI2016 简要题解
    C#oracle还原imp实例
    oracle备份imp命令大全
    C#oracle备份和还原
    win10安装CAD后出现致命错误
    Oracle 恢复数据后,数据库中中文变成问号解决方法
    CAD 安装时出现.net frameword 3.5安装不上的问题
  • 原文地址:https://www.cnblogs.com/stateis0/p/9062018.html
Copyright © 2011-2022 走看看