zoukankan      html  css  js  c++  java
  • Java并行任务框架Fork/Join

    Fork/Join是什么?

    Fork意思是分叉,Join为合并。Fork/Join是一个将任务分割并行运行,然后将最终结果合并成为大任务的结果的框架,父任务可以分割成若干个子任务,子任务可以继续分割,提供我们一种方便的并行任务功能,满足实际场景的业务需求,思想类似于MapReduce。任务的分割必须保证子任务独立,不会相互依赖结果。

     
     

    从哪里开始?

    Fork/Join框架主要有如下接口和类:

    • ForkJoinPool:一个线程池,用于执行调度分割的任务,实现了ExecutorService接口。提供三种执行任务的方式:

    1、execute:最原生的执行方式,以异步执行,并且无返回结果。
    2、submit:异步执行,有返回结果,返回结果是封装后的Future对象。
    3、invoke和invokeAll:异步执行,有返回结果,会等待所有任务执行执行完成,返回的结果为无封装的泛型T。

    • ForkJoinTask:抽象的分割任务,提供以分叉的方式执行,以及合并执行结果。
    • RecursiveAction:异步任务,无返回结果。通常自定义的任务要继承,并重写compute方法,任务执行的就是compute方法。
    • RecursiveTask:异步任务,有返回结果。通常自定义的任务要继承,并重写compute方法,任务执行的就是compute方法。

    核心类图

     
     

    从核心类图看出,要想开始一个分割的并行任务,可以创建一个ForkJoinPool线程池,同时创建无返回结果的任务RecursiveAction或有返回结果的任务RecursiveTask,最后调用线程池ForkJoinPool的execute或submit或invoke方法执行任务,完成后合并结果。

    实例

    我们以一个有返回结果的并行任务实例进行测试。计算从起始值到结束值得连续数的累加结果,利用Fork/Join框架。并对比普通计算和并行计算的耗时差异。

    package com.misout.forkjoin;
    
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveTask;
    
    /**
     * 计算从起始值到结束值得连续数的累加结果,利用Fork/Join框架
     * @author Misout
     * @date 2018-01-13 16:06:44
     */
    public class SumTask extends RecursiveTask<Long> {
    
        private static final long serialVersionUID = 4828818665955149519L;
        
        /** 每个任务最多允许计算的数字个数阈值,超过这个阈值,任务进行拆分 */
        private static final long THRESHOLD = 1000L;
        
        /** 起始值 */
        private Long startNumber;
        
        /** 结束值 */
        private Long endNumber;
        
        public SumTask(Long startNumber, Long endNumber) {
            this.startNumber = startNumber;
            this.endNumber = endNumber;
        }
    
        /**
         * 累加数的个数超过阈值1000个,拆分成2个子任务执行。子任务继续作拆分。计算完,合并结果。
         */
        @Override
        protected Long compute() {
            if(startNumber > endNumber) {
                System.out.println("start number should be smaller than end number");
                return 0L;
            }
            if(endNumber - startNumber < THRESHOLD) {
                return this.getCount(startNumber, endNumber);
            } else {
                Long mid = (startNumber + endNumber) / 2;
                RecursiveTask<Long> subTask1 = new SumTask(startNumber, mid);
                RecursiveTask<Long> subTask2 = new SumTask(mid + 1, endNumber);
                subTask1.fork();
                subTask2.fork();
                
                return subTask1.join() + subTask2.join();
            }
        }
        
        /**
         * 普通累加执行方法
         * @param start 起始数
         * @param end 结束数
         * @return 累加和
         */
        protected Long getCount(Long start, Long end) {
            Long sum = 0L;
            for(long i = start; i <= end; i++) {
                sum += i;
            }
            
            return sum;
        }
    
        public static void main(String[] args) {
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            Long start = 5L;
            Long end = 3463434L;
            SumTask task = new SumTask(start, end);
            
            Long startTime = System.currentTimeMillis();
            Long sum = forkJoinPool.invoke(task);
            Long endTime = System.currentTimeMillis();
            System.out.println("fork/join : sum = " + sum + ", cost time = " + (endTime - startTime) + "ms");
            
            startTime = System.currentTimeMillis();
            Long sum2 = task.getCount(start, end);
            endTime = System.currentTimeMillis();
            System.out.println("normal : sum = " + sum2 + ", cost time = " + (endTime - startTime) + "ms");
        }
    }
    

    说明:SumTask继承RecursiveTask,并实现了compute方法。在compute方法中会进行任务分割,并继续生成子任务,子任务仍然以分割的方式运行。

    运行结果对比:

    fork/join : sum = 5997689267885, cost time = 290ms
    normal : sum = 5997689267885, cost time = 41ms
    

    注意事项:任务拆分的深度最好不要太多,否则很容易因创建的线程过多影响系统性能。

    work-stealing规则

    在Java的API说明中提到,ForkJoinPool线程池与ThreadPoolExecutor线程池不同的地方在于,ForkJoinPool善于利用窃取工作执行加快任务的总体执行速度。实际上,在ForkJoinPool线程池中,若一个工作线程的任务队列为空没有任务执行时,便从其他工作线程中获取任务主动执行。为了实现工作窃取,在工作线程中维护了双端队列,窃取任务线程从队尾获取任务,被窃取任务线程从队头获取任务。这种机制充分利用线程进行并行计算,减少了线程竞争。但是当队列中只存在一个任务了时,两个线程去取反而会造成资源浪费。

     
                               ForkJoinPool工作窃取图
     
  • 相关阅读:
    《代码大全2》阅读笔记08Chapter 15 Using Conditionals
    《代码大全2》阅读笔记09Chapter 16 Controlling Loops
    《代码大全2》阅读笔记12 Chapter 19 General Control Issues
    《代码大全2》阅读笔记13 Chapter 22 Developer Testing
    [转帖]Dictionary, SortedDictionary, SortedList 横向评测
    《代码大全2》阅读笔记07Chapter 12 Fundamental Data Types
    《代码大全2》阅读笔记11 Chapter 24 Refactoring
    《代码大全2》阅读笔记14 Chapter 23 Debugging
    New Concept English 3 01 A Puma at large
    (ZT)委托和事件的区别
  • 原文地址:https://www.cnblogs.com/Joy-Hu/p/10876628.html
Copyright © 2011-2022 走看看