zoukankan      html  css  js  c++  java
  • 廖雪峰Java11多线程编程-3高级concurrent包-9Fork_Join

    1. Fork/Join模式

    线程池可以高效执行大量小任务:

    Fork/Join线程池可以执行一种特殊的任务:

    • 把一个大任务拆成多个小任务并行执行
    • Fork/Join是在JDK 1.7引入的

    Fork/Join模式的应用:

    • java.util.Arrays.parallelSort(array):方法内部通过Fork/Join对一个大数组进行并行排序,在多核CPU上就可以大大提高排序的速度。

    例如:计算一个有1000万个元素数组的和。
    如果把一个大数组拆成2个数组,就可以利用双核CPU并行计算,最后把2个结果相加就是最终的结果。

    如果拆成2个数组以后,每个数组仍然很大,可以进一步拆分成4个数组,我们就可以让4核CPU并行执行。

    Fork/Join就是把一个大任务不断的拆成小任务,执行并行计算的一种模式。

    class SumTask extends RecursiveTask<Long> {
        @Override
        protected Long compute(){
            //把一个大任务分拆成2个子任务
            SumTask subtask1 = new Sumtask(...);
            SumTask subtask2 = new Sumtask(...);
    
            //调用invokeAll()同时运行2个小任务,当2个任务都完成以后,invokeAll才返回
            invokeAll(subtask1, subtask2); 
    
            //通过join()获得2个子任务的结果
            Long result1 = subtask1.join();
            Long result2 = subtask2.join();
    
            return result1 + result2; //返回结果
        }
    }
    

    使用Fork/Join的关键在于,在compute方法内部,我们需要把一个大任务分拆成2个小任务,然后调用invokeAll()这个方法同时运行2个小任务。
    当2个任务都运行结束以后,invokeAll()才会返回。接着,我们调用join()方法获得2个小任务的执行结果,最后把2个结果相加返回。

    Recursive可以不断的把自身拆成小任务并行执行

    2.示例

    1.创建长度为1000的随机数组成的数组,并计算和作为期望值
    2.使用SumTask创建一个ForkJoinTask对象。
    3.通过ForkJoinPool.commonPool()方法获得一个commonPool(),然后用invoke来执行这个任务
    4.求和的任务SumTask继承制RecursiveTask,返回值是Long类型,指定常量是THREHOLD。构造方法是传入一个数组、开始索引、结束索引。
    5.SumTask的compute方法:我们首先判断任务是否足够的小。是,就直接进行计算,并且返回计算的结果;否,即继续拆分,这样我们就获得了subtask1,subtask2 这两个子任务。紧接着调用invokeAll()方法,同时执行这两个子任务。然后我们用join()方法获得两个子任务的结果,相加返回。

    import java.awt.*;
    import java.util.Random;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.concurrent.RecursiveTask;
    
    class SumTask extends RecursiveTask<Long> {//求和的任务继承至RecursiveTask,返回值是Long类型
        //执行常量THRESHOLD,用它来分解任务
        static final int THRESHOLD = 250;
        long[] array;
        int start;
        int end;
        SumTask(long[] array,int start,int end){
            //创建SumTask时,传入一个数组,起始结束位置的索引
            this.array = array;
            this.start = start;
            this.end = end;
        }
        public Long compute(){
            //执行compute方法时,任务足够小,直接计算返回执行的结果
            if(end - start <= THRESHOLD){
                long sum = 0;
                for(int i=start;i<end;i++){
                    sum += this.array[i];
                    try{
                        Thread.sleep(2);
                    }catch (InterruptedException e){}
                }
                return sum;
            }
            //如果任务太大,就一分为二,拆成2个子任务
            int middle = (start + end) / 2;
            System.out.println(String.format("split %d-%d ==> %d~%d ,%d~%d",start,end,start,middle,middle,end));
            SumTask subtask1 = new SumTask(this.array,start,middle);
            SumTask subtaks2 = new SumTask(this.array,middle,end);
            //调用invokeAll同时执行这两个任务
            invokeAll(subtask1,subtaks2);
            //用join获取子任务的结果
            Long subresult1 = subtask1.join();
            Long subresult2 = subtaks2.join();
            Long result = subresult1 + subresult2;
            System.out.println(String.format("result = %d + %d ==> %d",subresult1,subresult2,result));
            return result;
        }
    }
    public class ForkJoinTaskSample {
        //对一个大数组进行求和
        public static void main(String[] args) throws Exception{
            long[] array = new long[1000]; //创建一个包含1000个元素的数组
            long expectedSum = 0;
            for(int i=0;i<array.length;i++){
                //1.创建数组的过程中,并计算数组的和作为期望值
                array[i] = random();
                expectedSum += array[i];
            }
    
            System.out.println("expectedSum: "+expectedSum);
            //创建一个ForkJoinTask
            ForkJoinTask<Long> task = new SumTask(array,0,array.length);
            Long startTime = System.currentTimeMillis();
            //通过ForkJoinPool.commonPool获得一个ForkJoinPool,用invoke方法执行这个任务
            Long result = ForkJoinPool.commonPool().invoke(task);
            Long endTime = System.currentTimeMillis();
            System.out.println("Fork/join sum: "+result+" in "+(endTime-startTime));
        }
        static Random random = new Random(0);
        static long random(){
            return random.nextInt(10000);
        }
    }
    

    3. 总结:

    • Fork/Join是一种基于分治的算法:分解任务+合并结果
    • Fork/JoinPool线程池可以把一个大任务拆成小任务并行执行
    • 任务类必须继承自RecursiveTask/RecursiveAction。
      * RecursiveTask有返回值;RecursiveAction没有返回值
    • 使用Fork/Join模式可以进行并行计算提高效率
  • 相关阅读:
    多想一点和多做一步
    js 判断字符是否以汉字开头
    代码片断编辑测试窗
    部署WAR文件到tomcat
    ROS机器人程序设计(原书第2版)补充资料 (柒) 第七章 3D建模与仿真 urdf Gazebo V-Rep Webots Morse
    ROS机器人程序设计(原书第2版)补充资料 (陆) 第六章 点云 PCL
    ROS机器人程序设计(原书第2版)补充资料 (伍) 第五章 计算机视觉
    ROS机器人程序设计(原书第2版)补充资料 (肆) 第四章 在ROS下使用传感器和执行器
    ROS机器人程序设计(原书第2版)补充资料 (叁) 第三章 可视化和调试工具
    ROS机器人程序设计(原书第2版)补充资料 (贰) 第二章 ROS系统架构及概念
  • 原文地址:https://www.cnblogs.com/csj2018/p/11033148.html
Copyright © 2011-2022 走看看