zoukankan      html  css  js  c++  java
  • 线程池 一 ForkJoinPool

    java.util.concurrent
    public class ForkJoinPool extends AbstractExecutorService
    public abstract class ForkJoinTask<V> implements Future<V>, Serializable
    public class ForkJoinWorkerThread extends Thread
    

    Fork/Join

    Fork/Join技术是分治算法(Divide-and-Conquer)的并行实现,它是一项可以获得良好的并行性能的简单且高效的设计技术。
    
    ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”,
    把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。
    
    JDK用来执行Fork/Join任务的工作线程池默认大小等于CPU核心数。在一个4核CPU上,最多可以同时执行4个子任务。
    
    目的是为了帮助我们更好地利用多处理器带来的好处,使用所有可用的运算能力来提升应用的性能。
    
    
    1. Fork/Join框架主要由ForkJoinPool、ForkJoinWorkerThread和ForkJoinTask来实现
    
    2. ForkJoinPool中只可以运行ForkJoinTask类型的任务
        (在实际使用中,也可以接收Runnable/Callable任务,但在真正运行时,也会把这些任务封装成ForkJoinTask类型的任务)
    
    3. ForkJoinTask表示一个任务,
        ForkJoinTask的子类中有RecursiveAction和RecursiveTask
        RecursiveAction无返回结果,RecursiveTask有返回结果
        工作中,一般重写RecursiveAction或RecursiveTask的compute(),完成计算或者可以进行任务拆分。
    
    4. 调用ForkJoinTask的fork()的方法,可以让其他空闲的线程执行这个ForkJoinTask
        调用ForkJoinTask的join()的方法,将多个小任务的结果进行汇总。
    
    5. ForkJoinWorkerThread是运行ForkJoinTask任务的工作线程
    

    work-stealing(工作窃取)算法

    ForkJoinPool中,线程池中每个工作线程(ForkJoinWorkerThread)都对应一个任务队列(WorkQueue),
    工作线程优先处理来自自身队列的任务(LIFO),然后以FIFO的顺序随机窃取其他队列中的任务。

    ForkJoinPool

    ForkJoinPool并行的实现了分治算法(Divide-and-Conquer):把任务递归的拆分为各个子任务,这样可以更好的利用系统资源

    ForkJoinPool中的任务分为两种:

    1. 一种是本地提交的任务(Submission task,如 execute、submit 提交的任务)
    2. 另外一种是 fork 出的子任务(Worker task)

    提交任务

    外部任务(external/submissions task)提交三种方式:
    
        execute()是直接向池提交一个任务来异步执行,无返回结果;
    
        invoke()会一直阻塞到任务执行完成返回计算结果
    
        submit()也是异步执行,但是会返回提交的任务,在适当的时候可通过task.get()获取执行结果,实现同步到主线程
    
    
    子任务(Worker task)提交:
        由任务的fork()方法完成。
        任务被分割(fork)之后调用了ForkJoinPool.WorkQueue.push()方法直接把任务放到队列中等待被执行。
    
        public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
            int s1, s2;
            t2.fork();
            if (((s1 = t1.doInvoke()) & ABNORMAL) != 0)
                t1.reportException(s1);
            if (((s2 = t2.doJoin()) & ABNORMAL) != 0)
                t2.reportException(s2);
        }
    

    示例

    对于fork/join模式,假如pool里面线程数量是固定的,那么调用子任务的fork方法相当于A先分工给B和C,
    然后A当监工不干活,B和C去完成A交代的任务。
    所以上面的模式相当于浪费了一个线程。
    
    如果使用invokeAll相当于A分工给B和C后,A和B和C都去完成工作。这样缩短了执行的时间。
    
    
    /**
     * 测试 ForkJoinPool 线程池的使用
     */
    public class task {
    
        /**
         * 测试使用 ForkJoinPool 无返回值的任务执行
         */
        public static void noResultTask() throws Exception {
            ForkJoinPool pool = new ForkJoinPool();
            pool.submit(new PrintTask(1, 200));
            pool.shutdown();
        }
    }
    
    
    /**
     * 无返回值的打印任务
     */
    class PrintTask extends RecursiveAction {
    
        private static final long serialVersionUID = 1L;
        private static final int THRESHOLD = 50;
        private int start;
        private int end;
    
        public PrintTask(int start, int end) {
            this.start = start;
            this.end = end;
        }
    
        @Override
        protected void compute() {
            //当 结束值 比 起始值 大于 50 时,按数值区间平均拆分为两个任务;否则直接打印该区间的值
            if (end - start < THRESHOLD) {
                for (int i = start; i <= end; i++) {
                    System.out.println(Thread.currentThread().getName() + ", i = " + i);
                }
            } else {
                int middle = (start + end) / 2;
                //分成两个子任务
                PrintTask firstTask = new PrintTask(start, middle);
                PrintTask secondTask = new PrintTask(middle + 1, end);
                //任务拆分
                //firstTask.fork();
                //secondTask.fork();
                invokeAll(firstTask,secondTask);
            }
        }
    }
    
    
    public class task {
    
        /**
         * 测试使用 ForkJoinPool 有返回值的任务执行,对结果进行合并。计算 1 到 200 的累加和
         */
        public static void hasResultTask() throws Exception {
            //线程池
            ForkJoinPool pool = new ForkJoinPool();
            //提交任务
            ForkJoinTask<Integer> task = pool.submit(new CalculateTask(1, 200));
            //得到结果
            int result = task.get();
            //关闭线程
            pool.shutdown();
        }
    
    }
    
    
    /**
     * 有返回值的计算任务
     */
    class CalculateTask extends RecursiveTask<Integer> {
    
        private static final long serialVersionUID = 1L;
        private static final int THRESHOLD = 50;
        private int start;
        private int end;
    
        public CalculateTask(int start, int end) {
            super();
            this.start = start;
            this.end = end;
        }
    
        @Override
        protected Integer compute() {
            //当 结束值 比 起始值 大于 50 时,按数值区间平均拆分为两个任务,进行两个任务的累加值汇总
            //否则直接计算累加值
            if (end - start <= THRESHOLD) {
                int result = 0;
                for (int i = start; i <= end; i++) {
                    result += i;
                }
                return result;
            } else {
                int middle = (start + end) / 2;
                CalculateTask firstTask = new CalculateTask(start, middle);
                CalculateTask secondTask = new CalculateTask(middle + 1, end);
                //任务拆分
                invokeAll(firstTask,secondTask);
                //任务合并
                return firstTask.join() + secondTask.join();
            }
        }
    
    }
    
  • 相关阅读:
    【洛谷3214】[HNOI2011] 卡农(思维)
    【洛谷2609】[ZJOI2012] 数列(高精度)
    【洛谷4501】[ZJOI2018] 胖(二分+RMQ)
    【洛谷4726】【模板】多项式指数函数(多项式 exp)
    uC/OS-II之入门与介绍20160525
    [转]Delphi 关键字详解
    [转]单元文件结构
    Delphi ComboBox的属性和事件、及几个鼠标事件的触发
    Delphi 用ToolButton和MonthCalendar实现DateTimePicker的功能
    Delphi 动态改变Rzsplitter的Orientation(方向)属性
  • 原文地址:https://www.cnblogs.com/loveer/p/11416921.html
Copyright © 2011-2022 走看看