Java7引入了ForkJoinPool框架。这个框架的主要应用场景是把大任务拆解成小的任务并行执行。先看看ForkJoinPool的类结构和RecursiveTask的类结构:
再看一下API文档
我们主要关注文档中的几个方法,也是我们经常会用到的几个方法。(ps:ForkJoinPool适用于CPU计算密集型的任务)
execute()、invoke()、submit() 这三个方法都是执行任务的方法。其中,要想获取返回值,我们就调用invoke()和submit()方法。
我们先看一个例子,看看ForkJoinPool是怎么使用的。我们有一个业务场景,并发采集用户信息和配偶信息。先写一个类,这个类要继承RecursiveTask。然后实例化之后才能交给ForkJoinPool去执行。
class TaskTest extends RecursiveTask<ResultBean> { @Override protected ResultBean compute() { RecursiveTask<String> userInfoTask = new RecursiveTask<String>() { @Override protected String compute() { System.out.println("开始采集用户信息"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //模拟获取用户信息 return "{'username':'zs', 'gender': 'men'}"; } }; RecursiveTask<String> spouseInfoTask = new RecursiveTask<String>() { @Override protected String compute() { System.out.println("开始采集配偶信息"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "{'spouseName':'ch', 'gender':'women'}"; } };
//执行任务用invokeAll()或者fork() invokeAll(userInfoTask, spouseInfoTask);
//userInfoTask.fork();
//spouseInfoTask.fork(); String userInfoJoin = userInfoTask.join(); String spouseInfoJoin = spouseInfoTask.join(); return new ResultBean(userInfoJoin, spouseInfoJoin); } }
fork()方法,开启一个新线程(或是重用线程池内的空闲线程),将任务交给该线程处理。
join()方法,等待该任务线程处理完毕,获取返回结果。
ResultBean类。省略的setter和getter方法。
class ResultBean { /** 用户信息 */ private String userInfo; /** 配偶信息 */ private String spouseInfo; public ResultBean() { } public ResultBean(String userInfo, String spouseInfo) { this.userInfo = userInfo; this.spouseInfo = spouseInfo; } }
定义好了,写个main方法测试一下
public class ForkJoinTest { public static void main(String[] args) { TaskTest taskTest = new TaskTest(); //1、invoke方法 // ResultBean bean = ForkJoinPool.commonPool().invoke(taskTest); // System.out.println("结果:" + bean); //2、submit方法 ForkJoinTask<ResultBean> submitResult = ForkJoinPool.commonPool().submit(taskTest); try { //直接调用getRawResult方法返回的值是null submitResult.getRawResult(); //1、没有等待超时时间的 ResultBean bean = submitResult.get(); System.out.println("get:" + bean); //2、有等待超时时间的 //submitResult.get(1000, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
参考:
【1】http://blog.dyngr.com/blog/2016/09/15/java-forkjoinpool-internals/
【2】并发编程网,http://ifeve.com/tag/forkjoinpool/
【3】廖雪峰官方网站,https://www.liaoxuefeng.com/wiki/1252599548343744/1306581226487842