概念
从JDK1.7开始,Java提供ForkJoin框架用于并行执行任务,它的思想就是讲一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果。作为一个并发框架在jdk7的时候就加入到了我们的java并发包java.util.concurrent中,并且在java 8 的lambda并行流中充当着底层框架的角色。
思维导图
核心类介绍
-
ForkJoinPool:充当fork/join框架里面的管理者,最原始的任务都要交给它才能处理。它负责控制整个fork/join有多少个workerThread,workerThread的创建,激活都是由它来掌控。它还负责workQueue队列的创建和分配,每当创建一个workerThread,它负责分配相应的workQueue。然后它把接到的活都交给workerThread去处理,它可以说是整个frok/join的容器。
-
ForkJoinWorkerThread:fork/join里面真正干活的"工人",本质是一个线程。里面有一个ForkJoinPool.WorkQueue的队列存放着它要干的活,接活之前它要向ForkJoinPool注册(registerWorker),拿到相应的workQueue。然后就从workQueue里面拿任务出来处理。它是依附于ForkJoinPool而存活,如果ForkJoinPool的销毁了,它也会跟着结束。
-
ForkJoinPool.WorkQueue: 双端队列就是它,它负责存储接收的任务。
-
ForkJoinTask:代表fork/join里面任务类型,我们一般用它的两个子类RecursiveTask、RecursiveAction。这两个区别在于RecursiveTask任务是有返回值,RecursiveAction没有返回值。任务的处理逻辑包括任务的切分都集中在compute()方法里面。
代码实现
我们以将一个0到10000的List集合的每个元素进行打印为例:
实现类:
package com.forkjoin.test.task; import java.util.List; import java.util.concurrent.RecursiveTask; /** * forkjoin拆分任务实现 */ public class ForkJoinTask extends RecursiveTask<List<Integer>> { private List<Integer> integerList; public ForkJoinTask(List<Integer> integerList) { this.integerList = integerList; } @Override protected List<Integer> compute() { System.out.println("数据条数:" + integerList.size()); // 当数据条数大于100时,将任务拆分 if(integerList.size() > 100) { int minSize = integerList.size() / 2; List<Integer> leftList = integerList.subList(0, minSize); List<Integer> rightList = integerList.subList(minSize, integerList.size()); // 拆分出两个任务 并行执行 ForkJoinTask leftTask = new ForkJoinTask(leftList); ForkJoinTask rightTask = new ForkJoinTask(rightList); this.invokeAll(leftTask,rightTask); }else { // 一直到拆分出的数据不大于100条时,执行任务 integerList.stream().forEach(x -> { System.out.println(x.toString()); }); } return null; } }
调用类:
package com.forkjoin.test; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import com.forkjoin.test.task.ForkJoinTask; public class ForkJoinTest { public static void main(String[] args) { // 生成一个0-10000的整数集合 List<Integer> list = new ArrayList<>(); for (int i = 0; i <= 10000; i++) { list.add(i); } // forkjoin拆分任务调用 ForkJoinTask task = new ForkJoinTask(list); ForkJoinPool pool=ForkJoinPool.commonPool(); pool.submit(task); pool.shutdown(); } }