zoukankan      html  css  js  c++  java
  • JAVA并发工具类---------------(Fork/Join)

    Fork/Join

    分而治之

    将一个大任务分成数个小任务执行,然后将这些小人物执行后的结果进行join汇总;

    (假设:你要计算1到1000的总和,你可以把它分成1-100,101-200,......,901-1000几组完成,然后再把这几组的结果相加)

    工作窃取

    • 有一个较大的任务划分成了10个小任务。
    • 这10个小任务在一个大小为2的线程池中执行。
    • 线程池中的2个核心线程,每个线程的队列中有5个任务。
    • 线程1的任务都很简单,所以它很快就将5个任务执行完毕。
    • 线程2的任务都很复杂,当线程1执行完5个任务时,他才执行了3个任务。
    • 这时,线程1不会空闲,而且窃取线程2的等待队列中的任务(从末端开始窃取)来执行。
    • 当线程2的队列中也没有了任务之后,线程1和线程2才空闲。

    (假设:你和同事执行相同的任务,你执行的任务快,但是你的同事执行很慢,你把你的任务执行完成后,你会帮你的同事执行一部分任务,然后再偷偷将完成的任务放在你同事的任务完成列表里面)

     

    ForkJoin的主要类

    ForkJoinPool:ForkJoin线程池,实现了ExecutorService接口和工作窃取算法,用于线程调度与管理。
    ForkJoinTask:ForkJoin任务,提供了fork()方法和join()方法。通常不直接使用,而是使用以下子类: 
      RecursiveAction:无返回值的任务,通常用于只fork不join的情形。
      RecursiveTask:有返回值的任务,通常用于fork+join的情形。

    ForkJoin的使用

    一、 创建Task
    使用ForkJoin框架,需要创建一个ForkJoin的任务,而ForkJoinTask是一个抽象类,我们不需要去继承ForkJoinTask进行使用。因为ForkJoin框架为我们提供了RecursiveAction和RecursiveTask。我们只需要继承ForkJoin为我们提供的抽象类的其中一个并且实现compute方法。其中RecursiveAction没有返回结果,RecursiveTask执行后是有返回结果,看需使用。

    二、使用ForkJoinPool进行执行
    task要通过ForkJoinPool来执行,分割的子任务也会添加到当前工作线程的双端队列中,
    进入队列的头部。当一个工作线程中没有任务时,会从其他工作线程的队列尾部获取一个任务(工作窃取)。

    TASK任务类

    package com.qr.fork_join.ListTskDemo;
    
    import java.util.List;
    import java.util.concurrent.RecursiveTask;
    
    //
    public class DemoTask extends RecursiveTask<Integer> {
    
        //传入的参数
        final List<Integer> list;
    
        public DemoTask(List<Integer> list) {
            this.list = list;
        }
    
    
        //需要执行的逻辑
        @Override
        protected Integer compute() {
            //分组条件
            if (list.size()<=10){
                //分组后需要执行的逻辑--计算总和
                int sum=0;
                for (Integer integer : list) {
                    sum+=integer;
                }
                return sum;
            }
            // 执行子任务
            DemoTask task1=new DemoTask(list.subList(0, list.size() / 2) );
            DemoTask task2=new DemoTask( list.subList(list.size() / 2, list.size()) );
    
            //等待任务执行结束合并其结果
            task1.fork();
            task2.fork();
            //也可以使用 invokeAll(task1, task2);
    
            // 合并子任务
            return task1.join()+task2.join();
        }
    }

    主线程任务类

    package com.qr.fork_join.ListTskDemo;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    
    public class DemoTest {
        public static void main(String[] args) {
            long startTime1=System.currentTimeMillis();   //获取开始时间
            List<Integer> list=new ArrayList<Integer>();
            //正确的返回参数
            int sum=0;
            for (int i = 0; i <1000 ; i++) {
                sum+=i;
                list.add(i);
            }
            long endTime1=System.currentTimeMillis();   //获取开始时间
            System.out.println("单线程用时:"+(endTime1-startTime1)+"ms");
    
            long startTime2=System.currentTimeMillis();   //获取开始时间
            ForkJoinPool pool=new ForkJoinPool();
            DemoTask demoTask=new DemoTask(list);
            pool.submit(demoTask);
            try {
                //使用forkjoin框架返回的参数
                System.out.println(demoTask.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            long endTime2=System.currentTimeMillis();   //获取开始时间
            System.out.println("多线程用时:"+(endTime2-startTime2)+"ms");
        }
    }

    执行结果

    单线程用时:0ms
    499500
    多线程用时:76ms

      重点注意

    需要特别注意的是:

    1. ForkJoinPool 使用submit 或 invoke 提交的区别:invoke是同步执行,调用之后需要等待任务完成,才能执行后面的代码;submit是异步执行,只有在Future调用get的时候会阻塞。
    2. 这里继承的是RecursiveTask,还可以继承RecursiveAction。前者适用于有返回值的场景,而后者适合于没有返回值的场景
    3. 这一点是最容易忽略的地方,其实这里执行子任务调用fork方法并不是最佳的选择,最佳的选择是invokeAll方法。

      leftTask.fork();  
      rightTask.fork();
      
      替换为
      
      invokeAll(leftTask, rightTask);

    具体说一下原理:对于Fork/Join模式,假如Pool里面线程数量是固定的,那么调用子任务的fork方法相当于A先分工给B,然后A当监工不干活,B去完成A交代的任务。所以上面的模式相当于浪费了一个线程。那么如果使用invokeAll相当于A分工给B后,A和B都去完成工作。这样可以更好的利用线程池,缩短执行的时间。

     

     

    ForkJoinTask

    fork 方法

    • fork() 做的工作只有一件事,既是把任务推入当前工作线程的工作队列里。

     

    1 public final ForkJoinTask<V> fork() {
    2     Thread t;
    3     if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
    4         ((ForkJoinWorkerThread)t).workQueue.push(this);
    5     else
    6         ForkJoinPool.common.externalPush(this);
    7     return this;
    8 }

    join 方法

    •  join() 的工作则复杂得多,也是它可以使得线程免于被阻塞的原因。
    1. 检查调用 join() 的线程是否是 ForkJoinThread 线程。如果不是(例如 main 线程),则阻塞当前线程,等待任务完成。如果是,则不阻塞。

    2. 查看任务的完成状态,如果已经完成,直接返回结果。
    3. 如果任务尚未完成,但处于自己的工作队列内,则完成它。
    4. 如果任务已经被其他的工作线程偷走,则窃取这个小偷的工作队列内的任务(以 FIFO 方式)执行,以期帮助它早日完成预 join 的任务。
    5. 如果偷走任务的小偷也已经把自己的任务全部做完,正在等待需要 Join 的任务时,则找到小偷的小偷,帮助它完成它的任务。
    6. 递归地执行第 5 步。

    ForkJoinPool

    execute方法

    异步,不返回结果

    invoke方法

    同步,返回结果

    submit方法

    异步,返回结果

     

     

     

  • 相关阅读:
    区别Lua模式匹配中 %a+ 与 .-
    将硬件规定的通信协议用Lua实现(涉及到很多Lua通信的数据转换)
    Lua库-string库
    Unity3d
    Unity3d
    Unity3d
    Unity3d
    Unity3d
    Unity3d
    Unity3d
  • 原文地址:https://www.cnblogs.com/jiuhaoyun/p/11530497.html
Copyright © 2011-2022 走看看