zoukankan      html  css  js  c++  java
  • ForkJoin使用

    一、Fork Join    
    分而治之的办法

    JDk为Fork/Join框架提供了很好的支持,我们想要用这个算法首先得创建一个Fork/Join任务,在JDK中这个任务就叫做:ForJoinTask,只要继承这个类就可以创建一个任务类,但是实际使用中并不是直接继承ForkJoinTask类,而是继承它的子类,它有两个子类,分别是RecursiveAction和RecursiveTask,它们之间的区别是是否返回任务结果,前者用于没有返回结果的任务,后者用于有返回结果的任务。

    ForkJoinPool
    RecursiveTask
    RecursiveAction

    二、RecursiveTask使用

    ForkJoinPoolTest:

     1 package com.cy.java8;
     2 
     3 import java.util.concurrent.ForkJoinPool;
     4 import com.cy.java8.AccumulatorRecursiveAction.AccumulatorHelper;
     5 
     6 public class ForkJoinPoolTest {
     7 
     8     private static int[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
     9 
    10     public static void main(String[] args) {
    11         System.out.println("result = " + calc());
    12 
    13         AccumulatorRecursiveTask task = new AccumulatorRecursiveTask(0, data.length - 1, data);
    14         ForkJoinPool forkJoinPool = new ForkJoinPool();
    15         Integer result = forkJoinPool.invoke(task);     //同步调用
    16         System.out.println("AccumulatorRecursiveTask result = " + result);
    17 
    18         AccumulatorRecursiveAction action = new AccumulatorRecursiveAction(0, data.length - 1, data);
    19         forkJoinPool.invoke(action);
    20         System.out.println("AccumulatorRecursiveAction result = " + AccumulatorHelper.getResult());
    21         AccumulatorHelper.reset();
    22     }
    23 
    24     /**
    25      * 普通方法计算data的和
    26      */
    27     private static int calc() {
    28         int result = 0;
    29         for (int i = 0; i < data.length; i++) {
    30             result += data[i];
    31         }
    32         return result;
    33     }
    34 }

    AccumulatorRecursiveTask

     1 package com.cy.java8;
     2 
     3 import java.util.concurrent.RecursiveTask;
     4 
     5 /**
     6  * 继承RecursiveTask,用于与结果返回的任务
     7  */
     8 public class AccumulatorRecursiveTask extends RecursiveTask<Integer> {
     9 
    10     //累加的起始下标值
    11     private final int start;
    12     private final int end;
    13     private final int[] data;
    14 
    15     //阈值为3,当小于等于这个值的时候进行计算
    16     private final int LIMIT = 3;
    17 
    18     public AccumulatorRecursiveTask(int start, int end, int[] data) {
    19         this.start = start;
    20         this.end = end;
    21         this.data = data;
    22     }
    23 
    24     @Override
    25     protected Integer compute() {
    26         if (end - start < LIMIT) {
    27             int result = 0;
    28             for (int i = start; i <= end; i++) {
    29                 result += data[i];
    30             }
    31             return result;
    32         }
    33 
    34         int mid = (start + end) / 2;
    35 
    36         //分为两个任务
    37         AccumulatorRecursiveTask left = new AccumulatorRecursiveTask(start, mid, data);
    38         AccumulatorRecursiveTask right = new AccumulatorRecursiveTask(mid + 1, end, data);
    39         /*
    40         //执行子任务
    41         left.fork();
    42         right.fork();
    43         //等待子任务完成
    44         Integer leftResult = left.join();
    45         Integer rightResult = right.join();
    46         //合并子任务
    47         return rightResult + leftResult;
    48         */
    49 
    50         //提交任务
    51         invokeAll(left, right);
    52         return left.join() + right.join();
    53     }
    54 }

    三、RecursiveAction使用  

     1 package com.cy.java8;
     2 
     3 import java.util.concurrent.RecursiveAction;
     4 import java.util.concurrent.atomic.AtomicInteger;
     5 
     6 public class AccumulatorRecursiveAction extends RecursiveAction {
     7     private int start;
     8     private int end;
     9     private int[] data;
    10     private final int LIMIT = 3;
    11 
    12     public AccumulatorRecursiveAction(int start, int end, int[] data) {
    13         this.start = start;
    14         this.end = end;
    15         this.data = data;
    16     }
    17 
    18     @Override
    19     protected void compute() {
    20         if (end - start < LIMIT) {
    21             for (int i = start; i <= end; i++) {
    22                 AccumulatorHelper.accumulate(data[i]);
    23             }
    24         } else {
    25             int mid = (start + end) / 2;
    26             AccumulatorRecursiveAction left = new AccumulatorRecursiveAction(start, mid, data);
    27             AccumulatorRecursiveAction right = new AccumulatorRecursiveAction(mid + 1, end, data);
    28             /*left.fork();
    29             right.fork();
    30             left.join();
    31             right.join();*/
    32             invokeAll(left, right);
    33         }
    34     }
    35 
    36     static class AccumulatorHelper{
    37         private static final AtomicInteger result = new AtomicInteger(0);
    38 
    39         private static void accumulate(int value){
    40             result.getAndAdd(value);
    41         }
    42 
    43         public static int getResult(){
    44             return result.get();
    45         }
    46 
    47         public static void reset(){
    48             result.set(0);
    49         }
    50     }
    51 }

    ForkJoinPoolTest的执行结果:

    result = 55
    AccumulatorRecursiveTask result = 55
    AccumulatorRecursiveAction result = 55

     注:

    1、RecursiveAction 提供的方法 invokeAll()。它表示:启动所有的任务,并在所有任务都正常结束后返回。如果其中一个任务出现异常,则其它所有的任务都取消。invokeAll() 的参数还可以是任务的数组。

    -----------

  • 相关阅读:
    linux 进程学习笔记-运行新进程
    linux 进程学习笔记-进程状态
    linux 进程学习笔记-进程调度
    linux 进程学习笔记-进程退出/终止进程
    linux 进程学习笔记-暂停进程
    linux 进程学习笔记-进程跟踪
    linux 进程学习笔记-等待子进程结束
    linux 进程学习笔记-进程pipe管道
    linux 进程学习笔记-named pipe (FIFO)命名管道
    linux 进程学习笔记-进程信号sigal
  • 原文地址:https://www.cnblogs.com/tenWood/p/11600944.html
Copyright © 2011-2022 走看看