zoukankan      html  css  js  c++  java
  • 并发编程学习笔记(12)----Fork/Join框架

    1. Fork/Join 的概念

      Fork指的是将系统进程分成多个执行分支(线程),Join即是等待,当fork()方法创建了多个线程之后,需要等待这些分支执行完毕之后,才能得到最终的结果,因此join就表示等待。在实际的使用中,如果毫无顾忌的使用fork()来开启线程,可能会由于线程过多而影响系统性能,因此在jdk中提供了ForkJoinPool线程池来对fork进行处理,以节省资源。

      对于线程池来说,提交的任务数量并不总是与线程数相等的,大多数情况下一个物理线程可能需要执行多个逻辑任务。所以每个线程必然会有一个任务队列。在实际的执行过程中,可能会出现A线程已经执行完成队列中的所有任务了,但是B线程中还有很多任务等着执行,此时A线程就会从B线程中拿到任务过来处理,尽可能的达到平衡。需要注意的是,当线程开始帮助别的线程执行任务时,总会从其他的线程任务队列的底部开始拿,而线程执行自己任务的时候,总会从队列的顶部开始拿,这样就你能有效的避免了线程之间数据的竞争。

    2. 使用方式

      在看使用方式之前,先来看ForkJoinPool的一个重要的接口:

      

     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)

      它接收一个ForkJoinTask作为任务,ForkJoinTask就是持有fork()分解和join()等待的任务。它有两个重要的子类,即RecursiveTask有返回值任务 和RecursiveAction无返回值任务,在使用的时候只需要根据场景继承它的的两个子类之一即可。示例代码为带返回值的任务的使用方式:

    package com.wangx.thread.t7;
    
    import java.util.ArrayList;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.concurrent.RecursiveTask;
    
    /**
     * 求和
     */
    public class CountTask extends RecursiveTask<Long> {
    
        //任务分配标准
        private static final int THRESHOLD = 1000;
        private long start;
        private long end;
    
        public CountTask(long start, long end) {
            this.start = start;
            this.end = end;
        }
    
        /**
         * 分而治之,分成多个小任务执行,最后再执行汇总
         * @return
         */
        @Override
        public Long compute() {
            long sum = 0;
            boolean canCompute = (end - start) < THRESHOLD;
            //开始和结束之间间隔小于1000
            if (canCompute) {
                //执行数据求和
                for (long i = start; i <= end; i++) {
                    sum += i;
                }
            } else {
                //分成100个小任务
                long step = (start + end) / 100;
                ArrayList<CountTask> countTaskList = new ArrayList<>();
    
                long pos = start;
    
                for (int i = 0; i < 100; i++) {
                    long lastOne = pos + step;
                    CountTask task = new CountTask(pos, lastOne);
                    pos += step+1;
                    countTaskList.add(task);
                    //开启子任务
                    task.fork();
                }
                //等待所有子任务都执行完毕后再对子任务进行求和
                for (CountTask countTask : countTaskList) {
                    sum += countTask.join();
                }
            }
            return sum;
        }
    
        public static void main(String[] args) {
            ForkJoinPool forkJoinPool = new ForkJoinPool();
    
            CountTask countTask = new CountTask(0, 20000L);
    
            ForkJoinTask<Long> forkJoinTask = forkJoinPool.submit(countTask);
    
            try {
                long res = forkJoinTask.get();
                System.out.println("sum=" + res );
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

      该示例是用于计算数列和的,必然是需要返回值的,正好可以继承RecursiveTask的方式来使用,使用forkJoinPool.submit()将任务交给线程池,线程池返回一个ForkJoinTask对象,调用该对象的get()方法获取结果,当在执行get()方法时任务没有执行完成,则主线程会一直等待所有线程执行完成。

      compute()方法主要对任务进行分解,当求和数量大于THRESHOLD个时,就需要再次分解任务,否则直接求和,分解任务时,简单的将原有的任务分成100个小任务,并使用fork()方法提交,再然后调用join()方法等待所有任务执行完成后,最后对每个子任务的结果再次进行求和,得到最终结果。

    原文 并发编程学习笔记(12)----Fork/Join框架

  • 相关阅读:
    366. Find Leaves of Binary Tree输出层数相同的叶子节点
    716. Max Stack实现一个最大stack
    515. Find Largest Value in Each Tree Row查找一行中的最大值
    364. Nested List Weight Sum II 大小反向的括号加权求和
    156. Binary Tree Upside Down反转二叉树
    698. Partition to K Equal Sum Subsets 数组分成和相同的k组
    244. Shortest Word Distance II 实现数组中的最短距离单词
    187. Repeated DNA Sequences重复的DNA子串序列
    java之hibernate之基于主键的双向一对一关联映射
    java之hibernate之基于主键的单向一对一关联映射
  • 原文地址:https://www.cnblogs.com/xiaoshen666/p/10868899.html
Copyright © 2011-2022 走看看