一.ForkJoinPool是什么
ForkJoinPool 是 Java 7 中引入的并发库类。它通常是以递归的方式运行,采用分治思想将大任务分割成几个小任务,小任务继续分割成更小的任务,直至任务不可分割,然后运行这些任务。
通常Fork/Join框架其实就是指由ForkJoinPool作为线程池、ForkJoinTask(通常实现其三个抽象子类)为任务、ForkJoinWorkerThread作为执行任务的具体线程实体这三者构成的任务调度机制。
通俗的说,ForkJoin框架的作用主要是为了实现将大型复杂任务进行递归的分解,直到任务足够小才直接执行,从而递归的返回各个足够小的任务的结果汇集成一个大任务的结果,
依次类推最终得出最初提交的那个大型复杂任务的结果,这和方法的递归调用思想是一样的。
当然ForkJoinPool线程池为了提高任务的并行度和吞吐量做了非常多而且复杂的设计实现,其中最著名的就是任务窃取机制。
ThreadPoolExecutor执行的任务是Future的实现类FutureTask、执行线程的实体是内部类Worker。
ForkJoinPool执行的任务就是Future的实现类ForkJoinTask、执行线程就是ForkJoinWorkerThread。
二.ForkJoinPool能做什么
ForkJoinPool 的适用范围不大,仅限于某任务能被分解成多个子任务,且这些子任务运行的结果可以合并成最终结果。
ForkJoinPool 中实现了一种工作窃取算法,所谓工作窃取指的是闲置线程的任务队列空了,就从其他忙碌线程中的任务队列中处理任务。
ForkJoinPool主要用来使用分治法(Divide-and-Conquer Algorithm)来解决问题。典型的应用比如快速排序算法。特别适合用于“分而治之”,递归计算的算法。
三.ForkJoinPool原理
ForkJoinPool
是 Java SE 7 新功能“分叉/结合框架”的核心类。
分叉/结合框架是一个比较特殊的线程池框架,专用于需要将一个任务不断分解成子任务(分叉),再不断进行汇总得到最终结果(结合)的计算过程。
比起传统的线程池类ThreadPoolExecutor
,ForkJoinPool
实现了工作窃取算法,使得空闲线程能够主动分担从别的线程分解出来的子任务,从而让所有的线程都尽可能处于饱满的工作状态,提高执行效率。
ForkJoinPool
提供了三类方法来调度子任务:
(1)execute
系列异步执行指定的任务。
(2)invoke
和 invokeAll
执行指定的任务,等待完成,返回结果。
(3)submit
系列异步执行指定的任务并立即返回一个 Future
对象。
子任务由 ForkJoinTask
的实例来代表。它是一个抽象类,JDK 为我们提供了两个实现:RecursiveTask
和 RecursiveAction
,分别用于需要和不需要返回计算结果的子任务。
ForkJoinTask
提供了三个静态的 invokeAll
方法来调度子任务,注意只能在 ForkJoinPool
执行计算的过程中调用它们。
ForkJoinPool
和 ForkJoinTask
还提供了很多让人眼花缭乱的公共方法,其实它们大多数都是其内部实现去调用的,对于应用开发人员来说意义不大。
下面以统计 D 盘文件个数为例。这实际上是对一个文件树的遍历,我们需要递归地统计每个目录下的文件数量,最后汇总,非常适合用分叉/结合框架来处理:
ForkJoinPool 实现了Executor和ExecutorService接口。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,
如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。
ForkJoinPool 的运行模式如下:
ForkJoinPool需要使用相对少的线程来处理大量的任务。
比如要对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。
以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。
比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。
那么到最后,所有的任务加起来会有大概2000000+个。问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。
所以当使用ThreadPoolExecutor时,使用分治法会存在问题,因为ThreadPoolExecutor中的线程无法像任务队列中再添加一个任务并且在等待该任务完成之后再继续执行。
而使用ForkJoinPool时,就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。
那么使用ThreadPoolExecutor或者ForkJoinPool,会有什么性能的差异呢?
首先,使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。
但是,使用ThreadPoolExecutor时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,显然这是不可行的。
ForkJoinPool ExecutorForkJoinPool组成类:
(1)ForkJoinPool:
充当fork/join框架里面的管理者,最原始的任务都要交给它才能处理。
它负责控制整个fork/join有多少个workerThread,workerThread的创建,激活都是由它来掌控。
它还负责workQueue队列的创建和分配,每当创建一个workerThread,它负责分配相应的workQueue。然后它把接到的活都交给workerThread去处理,它可以说是整个frok/join的容器。
(2)ForkJoinWorkerThread:
fork/join里面真正干活的"工人",本质是一个线程。
里面有一个ForkJoinPool.WorkQueue的队列存放着它要干的活,接活之前它要向ForkJoinPool注册(registerWorker),拿到相应的workQueue。
然后就从workQueue里面拿任务出来处理。它是依附于ForkJoinPool而存活,如果ForkJoinPool的销毁了,它也会跟着结束。
(3)ForkJoinPool.WorkQueue:
双端队列就是它,它负责存储接收的任务。
(4)ForkJoinTask:
代表fork/join里面任务类型,一般用它的两个子类RecursiveTask、RecursiveAction。
这两个区别在于RecursiveTask任务是有返回值,RecursiveAction没有返回值。任务的处理逻辑包括任务的切分都集中在compute()方法里面。
工作方式:使用一种分治算法,递归地将任务分割成更小的子任务,其中阈值可配置,然后把子任务分配给不同的线程执行并发执行,最后再把结果组合起来。该用法常见于数组与集合的运算。
由于提交的任务不一定能够递归地分割成ForkJoinTask,且ForkJoinTask执行时间不等长,所以ForkJoinPool使用一种工作窃取的算法,允许空闲的线程“窃取”分配给另一个线程的工作。
由于工作无法平均分配并执行。所以工作窃取算法能更高效地利用硬件资源。
流程图细节:
四.ForkJoinPool使用
问题:计算1至10000000的正整数之和。
方案一:最为普通的for循环解决
最简单的,显然是不使用任何并行编程的手段,只用最直白的 for-loop 来实现。为了面向接口编程,下面我们把计算的方法定义成接口,不同的方案书写不同的实现即可。
下面就是具体的实现代码:
public interface Calculator { /** * 把传进来的所有numbers 做求和处理 * * @param numbers * @return 总和 */ long sumUp(long[] numbers); }
public class ForLoopCalculator implements Calculator { @Override public long sumUp(long[] numbers) { long total = 0; for (long i : numbers) { total += i; } return total; } }
public static void main(String[] args) { long[] numbers = LongStream.rangeClosed(1, 10000000).toArray(); Instant start = Instant.now(); Calculator calculator = new ForLoopCalculator(); long result = calculator.sumUp(numbers); Instant end = Instant.now(); System.out.println("耗时:" + Duration.between(start, end).toMillis() + "ms"); System.out.println("结果为:" + result); }
运行结果:
输出:
耗时:10ms
结果为:50000005000000
方案二:ExecutorService多线程方式实现
在 Java 1.5 引入 ExecutorService 之后,基本上已经不推荐直接创建 Thread 对象,而是统一使用 ExecutorService。
毕竟从接口的易用程度上来说 ExecutorService 就远胜于原始的 Thread,更不用提 java.util.concurrent 提供的数种线程池,Future 类,Lock 类等各种便利工具。
由于上面是面向接口的设计,因此只需要加一个使用 ExecutorService 的实现类:
public class ExecutorServiceCalculator implements Calculator { private int parallism; private ExecutorService pool; public ExecutorServiceCalculator() { parallism = Runtime.getRuntime().availableProcessors(); // CPU的核心数 默认就用cpu核心数了 pool = Executors.newFixedThreadPool(parallism); } //处理计算任务的线程 private static class SumTask implements Callable<Long> { private long[] numbers; private int from; private int to; public SumTask(long[] numbers, int from, int to) { this.numbers = numbers; this.from = from; this.to = to; } @Override public Long call() { long total = 0; for (int i = from; i <= to; i++) { total += numbers[i]; } return total; } } @Override public long sumUp(long[] numbers) { List<Future<Long>> results = new ArrayList<>(); // 把任务分解为 n 份,交给 n 个线程处理 4核心 就等分成4份呗 // 然后把每一份都扔个一个SumTask线程 进行处理 int part = numbers.length / parallism; for (int i = 0; i < parallism; i++) { int from = i * part; //开始位置 int to = (i == parallism - 1) ? numbers.length - 1 : (i + 1) * part - 1; //结束位置 //扔给线程池计算 results.add(pool.submit(new SumTask(numbers, from, to))); } // 把每个线程的结果相加,得到最终结果 get()方法 是阻塞的 // 优化方案:可以采用CompletableFuture来优化 JDK1.8的新特性 long total = 0L; for (Future<Long> f : results) { try { total += f.get(); } catch (Exception ignore) { } } return total; } }
public static void main(String[] args) { long[] numbers = LongStream.rangeClosed(1, 10000000).toArray(); Instant start = Instant.now(); Calculator calculator = new ExecutorServiceCalculator(); long result = calculator.sumUp(numbers); Instant end = Instant.now(); System.out.println("耗时:" + Duration.between(start, end).toMillis() + "ms"); System.out.println("结果为:" + result); // 打印结果500500 }
运行结果:
输出:
耗时:30ms
结果为:50000005000000
方案三:采用ForkJoinPool(Fork/Join)
代码如下:
public class ForkJoinCalculator implements Calculator { private ForkJoinPool pool; //执行任务RecursiveTask:有返回值 RecursiveAction:无返回值 private static class SumTask extends RecursiveTask<Long> { private long[] numbers; private int from; private int to; public SumTask(long[] numbers, int from, int to) { this.numbers = numbers; this.from = from; this.to = to; } //此方法为ForkJoin的核心方法:对任务进行拆分 拆分的好坏决定了效率的高低 @Override protected Long compute() { // 当需要计算的数字个数小于6时,直接采用for loop方式计算结果 if (to - from < 6) { long total = 0; for (int i = from; i <= to; i++) { total += numbers[i]; } return total; } else { // 否则,把任务一分为二,递归拆分(注意此处有递归)到底拆分成多少分 需要根据具体情况而定 int middle = (from + to) / 2; SumTask taskLeft = new SumTask(numbers, from, middle); SumTask taskRight = new SumTask(numbers, middle + 1, to); taskLeft.fork(); taskRight.fork(); return taskLeft.join() + taskRight.join(); } } } public ForkJoinCalculator() { // 也可以使用公用的线程池 ForkJoinPool.commonPool(): // pool = ForkJoinPool.commonPool() pool = new ForkJoinPool(); } @Override public long sumUp(long[] numbers) { Long result = pool.invoke(new SumTask(numbers, 0, numbers.length - 1)); pool.shutdown(); return result; } }
运行结果:
输出:
耗时:390ms
结果为:50000005000000
可以看出,使用了 ForkJoinPool 的实现逻辑全部集中在了 compute() 这个函数里,仅用了14行就实现了完整的计算过程。
特别是,在这段代码里没有显式地“把任务分配给线程”,只是分解了任务,而把具体的任务到线程的映射交给了 ForkJoinPool 来完成。
方案四:采用并行流(JDK8以后的推荐做法)
public static void main(String[] args) { Instant start = Instant.now(); long result = LongStream.rangeClosed(0, 10000000L).parallel().reduce(0, Long::sum); Instant end = Instant.now(); System.out.println("耗时:" + Duration.between(start, end).toMillis() + "ms"); System.out.println("结果为:" + result); // 打印结果500500 }
运行结果:
输出:
耗时:130ms
结果为:50000005000000
并行流底层还是Fork/Join框架,只是任务拆分优化得很好。
耗时效率方面解释:Fork/Join 并行流等当计算的数字非常大的时候,优势才能体现出来。也就是说,如果你的计算比较小,或者不是CPU密集型的任务,不太建议使用并行处理。
根据上面的示例代码,可以看出 fork() 和 join() 是 Fork/Join Framework “魔法”的关键。我们可以根据函数名假设一下 fork() 和 join() 的作用:
fork():开启一个新线程(或是重用线程池内的空闲线程),将任务交给该线程处理。
join():等待该任务的处理线程处理完毕,获得返回值。