zoukankan      html  css  js  c++  java
  • ForkJoin框架详解

    ForkJoin框架详解

    什么是Fork/Join框架

    • 提供一个可以用于并行执行任务的框架.
    • 是把一个大任务分割成若干个小任务,最终汇总小任务结果后得到大任务结果的框架.
    • Fork:将大任务切分成若干个子任务并行的执行. Join:合并这些子任务的执行结果,最后得到大任务的结果.

    工作窃取算法(work-stealing)

    • 将一个大的任务分割成若干个小任务,放在不同的任务队列中.
    • 每个任务队列都有一个线程对其进行处理.
    • 若一个线程处理完其任务队列,可以窃取其他线程的队列中的未完成的任务.
    • 为了防止线程之间的竞争,任务队列采用双端队列.原线程从队列的一端获取任务.窃取任务的线程从队列的另外一段获取任务.

    特点:

    • 充分利用线程并行计算,减少线程间的竞争.
    • 建立多个线程和多个队列消耗资源.

    Fork/Join框架

    步骤:

    1. 分割任务.通过fork将大任务分割成子任务.
    2. 执行任务并合并结果.分割的子任务放在双端队列中,启动多个线程获取任务并执行.子任务的结果同一放在一个队列中,启动一个线程获取结果后合并获取最终结果.

    Fork/Join框架提供的类:

    • ForkJoinTask:提供在任务中执行fork()join()操作的机制.可以继承其子类:
      • RecursiveAction:没有返回结果的任务.
      • RecursiveTask:有返回结果的任务.
    • ForkJoinPool:提交的任务由其来执行.分割出来的子任务进入当前线程维护的双端队列的头部.工作线程没有任务时会从其他线程的队列的尾部获取任务.

    使用Fork/Join框架

    class MyForkJoin extends RecursiveTask{
    
        private int[] array;
        private int start;
        private int end;
    
        MyForkJoin(int[] array, int start, int end){
            this.array = array;
            this.start = start;
            this.end = end;
            System.out.println(Thread.currentThread().getName() + " Time is: " + System.currentTimeMillis());
        }
    
        @Override
        protected Integer compute() {
            int sum = 0;
            if (end-start<=1){
                for (int i = start; i <= end; i++) {
                    sum += array[i];
                }
            }
            else {
                MyForkJoin task1 = new MyForkJoin(array, start, (start + end) / 2);
                MyForkJoin task2 = new MyForkJoin(array, (start + end) / 2 + 1, end);
                task1.fork(); // 执行任务
                task2.fork();
    
                int res1 = (int) task1.join(); // 等待任务完成获取结果
                int res2 = (int) task2.join();
                sum = res1+res2;
            }
            return sum;
        }
    }
    

    异常处理

    ForkJoinTask框架可能抛出异常,无法在主线程中直接获取异常.
    可以通过isCompletedAbnormally()方法检查任务是否已经抛出异常或者已经被取消.getException()返回Throwable对象.

    • 任务取消 --> CancellationExcetpion.
    • 任务未完成或没有抛出异常 --> null.

    实现原理

    • ForkJoinPool是由ForkJoinTask数组和ForkJoinWorkerThread数组组成.
    • ForkJoinTask数组用来存放提交给ForkJoinPool的任务.
    • ForkJoinWorkerThread数组负责执行这些任务.

    核心方法

    fork()

    • 异步执行这个任务并立即返回结果.
    • 会把当前任务存放在ForkJoinTask数组中,再唤醒或创建一个工作线程来执行任务.

    join()

    • 阻塞当前线程并等待获得结果.
    • 首先由当前任务的状态判断返回的结果类型:
      • 已完成(NORMAL) -- > 直接返回结果
      • 被取消(CANCELLED) -- > 抛出CancellationException
      • 信号(SIGNAL)
      • 出现异常(EXCEPTIONAL) -- > 抛出对应的异常
  • 相关阅读:
    动态规划-重叠子问题
    百度 谷歌 Twitter,这么多短链接服务(Short Url)究竟哪家强?
    java中String初始化的两种方式
    bzoj 1218 [HNOI2003]激光炸弹
    Android TextView 横向滚动(跑马灯效果)
    混合高斯模型的EM求解(Mixtures of Gaussians)及Python实现源代码
    【Allwinner ClassA20类库分析】 2.free pascal语法及结构简析
    昂贵的聘礼
    C++11时间具体解释
    C++开发人脸性别识别教程(7)——搭建MFC框架之界面绘制
  • 原文地址:https://www.cnblogs.com/truestoriesavici01/p/13216213.html
Copyright © 2011-2022 走看看