zoukankan      html  css  js  c++  java
  • Fork/Join框架

    Fork-Join框架

    分而治之:Fork/Join框架
    分而治之是一种有效处理大量数据的方法,简单地说,如果你要处理1000个数据,但是又不具备处理它们的能力,那么你可以只处理其中的10个数据,再分阶段处理100次,将它们的结果进行合成,就是最终想要的结果。
     
    在实际使用过程中,如果毫无顾忌的使用fork()方法开启线程进行处理,可能会导致线程过多而严重影响性能。所以在JDK中提供了一个ForkJoinPool线程池,对于fork()方法不急于开启线程,而是提交给这个线程池进行处理,以节省系统资源。
     
    由于线程池的优化,实际的任务数量和线程数量并不会是一一对应的关系,一个物理线程需要处理多个任务,因此也需要任务队列。实际处理过程中,可能会有这种情况:
    A线程的任务已经处理完成,B线程堆积了许多任务,这时候A线程就会帮助B线程,从B线程的任务队列中获取任务来处理,尽可能的达到平衡,同时,处理自己的任务的时候,从任务队列的顶部开始获取数据,帮助别的线程的时候,从底部获取数据,尽量避免竞争。
     
    看一下ForkJoinPool线程池的一个重要的方法:
    通过submit方法可以向线程池提交一个ForkJoinTask任务,所谓ForkJoinTask任务就是支持fork()方法分解及join()方法等待的任务。ForkJoinTask有两个重要的子类:带返回值和不带返回值的两个抽象类。我们可以自定义任务,根据需要选择继承抽象类!
    计算0加到90000000L
     1 public class CountTask extends RecursiveTask<Long> {
     2     private static final int THRESHOLD = 10000;
     3     private long start;
     4     private long end;
     5  6     public CountTask(long start, long end) {
     7         this.start = start;
     8         this.end = end;
     9     }
    10 11     @Override
    12     protected Long compute() {
    13         long sum = 0;
    14         if (end - start <= 10000) {
    15             for (long i = start; i <= end; i++) {
    16                 sum += i;
    17             }
    18         } else {
    19             //将任务先分成100个部分
    20             long step = (end - start) / 100;
    21             long pos = start;
    22             List<CountTask> taskList = new ArrayList<>();
    23             for (int i = 0; i < 100; i++) {
    24                 long last = pos + step;
    25                 if (last > end)
    26                     last = end;
    27                 CountTask task = new CountTask(pos, last);
    28                 task.fork();
    29                 taskList.add(task);
    30                 pos = last + 1;
    31             }
    32 33             for (CountTask task : taskList) {
    34                 sum += task.join();
    35             }
    36         }
    37         return sum;
    38     }
    39 40     /**
    41      * 计算0加到90000000L
    42      * @param args
    43      * @throws ExecutionException
    44      * @throws InterruptedException
    45      */
    46     public static void main(String[] args) throws ExecutionException, InterruptedException {
    47         long start = System.currentTimeMillis();
    48         CountTask task = new CountTask(0, 90000000L);
    49         ForkJoinPool pool = new ForkJoinPool();
    50         ForkJoinTask<Long> result = pool.submit(task);
    51         long end = System.currentTimeMillis();
    52         System.out.println(end-start);
    53         System.out.println("result:"+result.get());
    54     }
    55 }
    View Code
     
    结果如下:
     
    如果使用单线程累加计算:
     1 public class CountDemo {
     2     public static void main(String[] args) {
     3         long start = System.currentTimeMillis();
     4         long sum = 0L;
     5         for (long i = 0; i <= 90000000L; i++) {
     6             sum += i;
     7         }
     8         long end = System.currentTimeMillis();
     9         System.out.println(end-start);
    10         System.out.println("result:"+sum);
    11     }
    12 }
    View Code
     
    还是可以看到明显的差别的。
     
     
    另外还有一点需要注意的是:fork方法会让子任务调用compute方法:
     1 /**
     2  * Fork/Join框架
     3  */
     4 public class ForkJoinDemo {
     5     public static void main(String[] args) throws ExecutionException, InterruptedException {
     6         ForkJoinPool pool = new ForkJoinPool();
     7         MyFork task = new MyFork(1, 100);
     8         ForkJoinTask<Integer> result = pool.submit(task);
     9         System.out.println(result.get());
    10         pool.shutdown();
    11     }
    12 }
    13 14 /**
    15  * 使用Fork/Join计算1加到100
    16  */
    17 class MyFork extends RecursiveTask<Integer> {
    18     private static final Integer JUDGE_VALUE = 10;
    19     private int start;
    20     private int end;
    21 22     public MyFork(int start, int end) {
    23         this.start = start;
    24         this.end = end;
    25     }
    26 27     @Override
    28     protected Integer compute() {
    29         Integer result = 0;
    30         if (end - start <= JUDGE_VALUE) {
    31             for (int i = start; i <= end; i++) {
    32                 result += i;
    33             }
    34         } else {
    35             System.out.println("fork");
    36             int mid = (end - start ) / 2 + start;
    37             MyFork task1 = new MyFork(start, mid);
    38             MyFork task2 = new MyFork(mid + 1, end);
    39             task1.fork();
    40             task2.fork();
    41             result = task1.join() + task2.join();
    42         }
    43         return result;
    44     }
    45 }
    View Code
    虽然在compute中只分成2个子任务,但是最终fork打印了15次,说明task1和task2实际上还被划分成了多个子任务执行。
  • 相关阅读:
    HIVE 技巧积累之合并重叠日期
    记一次hive版本升级
    【转】elasticsearch中字段类型默认显示{ "foo": { "type": "text", "fields": { "keyword": {"type": "keyword", "ignore_above": 256} }
    【转】深入理解Linux修改hostname
    VMware中 CentOS7挂载windows共享文件夹
    Apache版hadoop编译
    No route info of this topic
    java遍历文件夹及所有子文件
    关闭spring整合kafka时,消费者一直打印kafka日志
    (转)详解shell中>/dev/null 2>&1到底是什么
  • 原文地址:https://www.cnblogs.com/blogforvi/p/13437017.html
Copyright © 2011-2022 走看看