zoukankan      html  css  js  c++  java
  • 并发编程学习笔记(12)----Fork/Join框架

    1. Fork/Join 的概念

      Fork指的是将系统进程分成多个执行分支(线程),Join即是等待,当fork()方法创建了多个线程之后,需要等待这些分支执行完毕之后,才能得到最终的结果,因此join就表示等待。在实际的使用中,如果毫无顾忌的使用fork()来开启线程,可能会由于线程过多而影响系统性能,因此在jdk中提供了ForkJoinPool线程池来对fork进行处理,以节省资源。

      对于线程池来说,提交的任务数量并不总是与线程数相等的,大多数情况下一个物理线程可能需要执行多个逻辑任务。所以每个线程必然会有一个任务队列。在实际的执行过程中,可能会出现A线程已经执行完成队列中的所有任务了,但是B线程中还有很多任务等着执行,此时A线程就会从B线程中拿到任务过来处理,尽可能的达到平衡。需要注意的是,当线程开始帮助别的线程执行任务时,总会从其他的线程任务队列的底部开始拿,而线程执行自己任务的时候,总会从队列的顶部开始拿,这样就你能有效的避免了线程之间数据的竞争。

    2. 使用方式

      在看使用方式之前,先来看ForkJoinPool的一个重要的接口:

      

     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)

      它接收一个ForkJoinTask作为任务,ForkJoinTask就是持有fork()分解和join()等待的任务。它有两个重要的子类,即RecursiveTask有返回值任务 和RecursiveAction无返回值任务,在使用的时候只需要根据场景继承它的的两个子类之一即可。示例代码为带返回值的任务的使用方式:

    package com.wangx.thread.t7;
    
    import java.util.ArrayList;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.concurrent.RecursiveTask;
    
    /**
     * 求和
     */
    public class CountTask extends RecursiveTask<Long> {
    
        //任务分配标准
        private static final int THRESHOLD = 1000;
        private long start;
        private long end;
    
        public CountTask(long start, long end) {
            this.start = start;
            this.end = end;
        }
    
        /**
         * 分而治之,分成多个小任务执行,最后再执行汇总
         * @return
         */
        @Override
        public Long compute() {
            long sum = 0;
            boolean canCompute = (end - start) < THRESHOLD;
            //开始和结束之间间隔小于1000
            if (canCompute) {
                //执行数据求和
                for (long i = start; i <= end; i++) {
                    sum += i;
                }
            } else {
                //分成100个小任务
                long step = (start + end) / 100;
                ArrayList<CountTask> countTaskList = new ArrayList<>();
    
                long pos = start;
    
                for (int i = 0; i < 100; i++) {
                    long lastOne = pos + step;
                    CountTask task = new CountTask(pos, lastOne);
                    pos += step+1;
                    countTaskList.add(task);
                    //开启子任务
                    task.fork();
                }
                //等待所有子任务都执行完毕后再对子任务进行求和
                for (CountTask countTask : countTaskList) {
                    sum += countTask.join();
                }
            }
            return sum;
        }
    
        public static void main(String[] args) {
            ForkJoinPool forkJoinPool = new ForkJoinPool();
    
            CountTask countTask = new CountTask(0, 20000L);
    
            ForkJoinTask<Long> forkJoinTask = forkJoinPool.submit(countTask);
    
            try {
                long res = forkJoinTask.get();
                System.out.println("sum=" + res );
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

      该示例是用于计算数列和的,必然是需要返回值的,正好可以继承RecursiveTask的方式来使用,使用forkJoinPool.submit()将任务交给线程池,线程池返回一个ForkJoinTask对象,调用该对象的get()方法获取结果,当在执行get()方法时任务没有执行完成,则主线程会一直等待所有线程执行完成。

      compute()方法主要对任务进行分解,当求和数量大于THRESHOLD个时,就需要再次分解任务,否则直接求和,分解任务时,简单的将原有的任务分成100个小任务,并使用fork()方法提交,再然后调用join()方法等待所有任务执行完成后,最后对每个子任务的结果再次进行求和,得到最终结果。

    原文 并发编程学习笔记(12)----Fork/Join框架

  • 相关阅读:
    linux 安装Python3
    MYSQL 字符集设置(终端的字符集)
    Linux LVM Logical Volume Management 逻辑卷的管理
    oracle 重命名和重定位数据文件(Oracle Renaming and Relocating Datafiles)
    Azkaban编译
    基于hive的transform实现自定义分隔符数据导出
    MapReduce优化设置
    hive.groupby.skewindata环境变量与负载均衡
    hive的基本操作
    Shell 数组的定义和使用
  • 原文地址:https://www.cnblogs.com/xiaoshen666/p/10868899.html
Copyright © 2011-2022 走看看