上篇博客我们介绍了通过CyclicBarrier使线程同步,可是上述方法存在一个问题,那就是假设一个大任务跑了2个线程去完毕。假设线程2耗时比线程1多2倍。线程1完毕后必须等待线程2完毕。等待的过程线程1没法复用。如今我们准备解决问题,我们希望线程1完毕自己的任务后能去帮助线程2完毕一部分任务。
Java7引如了Fork/Join框架能够非常好的解决问题。
Fork/Join是一个用于并行运行任务的框架,是一个把大任务切割成若干个小任务。最后汇总每一个小任务结果后得到大任务结果的框架。
fork是分叉。join是结合。
以下看张图来清晰的认识一下:
事实上Fork/Join本质上是分治算法的一种实现。以下我们来看怎么详细使用:
ForkJoinTask:我们要使用Fork/Join框架,必须首先创建一个Fork/Join任务。它提供在任务中运行fork()和join()操作的机制,通常情况下我们不须要直接继承ForkJoinTask类。而仅仅须要继承它的子类,Fork/Join框架提供了下面两个子类:
RecursiveAction:用于没有返回结果的任务。
RecursiveTask :用于有返回结果的任务。
ForkJoinPool:ForkJoinTask须要通过ForkJoinPool来运行,任务切割出的子任务会加入到当前工作线程所维护的双端队列中,进入队列的头部。
当一个工作线程的队列里临时没有任务时,它会随机从其它工作线程的队列的尾部获取一个任务。
以下,我们相同实现考试系统抽题的样例。
public class GetQuestionsTask extends RecursiveTask<List> { //參数map private Map map; //參数list==仅仅放题型 private List questionTypeList; public GetQuestionsTask(List questionTypeList,Map map) { this.questionTypeList = questionTypeList; this.map=map; } @Override protected List compute() { System.out.println(questionTypeList.size()); List list = new ArrayList(); if (questionTypeList.size() < 2) { // 抽题 list = getQuestions(questionTypeList,map); } else { int size = questionTypeList.size(); int mid = size / 2; GetQuestionsTask task1 = new GetQuestionsTask( questionTypeList.subList(0, mid),map); GetQuestionsTask task2 = new GetQuestionsTask( questionTypeList.subList(mid, size),map); invokeAll(task1, task2); try { list = groupResults(task1.get(), task2.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } return list; } //合并抽题结果 private List groupResults(List list1, List list2) { System.out.println(Thread.currentThread().getName()+"開始合并结果......"); // 合并返回结果 List list = new ArrayList(); list.addAll(list1); list.addAll(list2); System.out.println(Thread.currentThread().getName()+"合并结果结束......"); return list; } // 抽题 private List getQuestions(List questTypeList,Map map) { List list = new ArrayList(); for(int i=0;i<questTypeList.size();i++){ System.out.println(Thread.currentThread().getName()+"開始抽题......"+questionTypeList.get(i).toString()); //假数据,向list中放试题 list.add("0"); list.add("1"); list.add("2"); list.add("3"); list.add("4"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"抽题结束..."+questionTypeList.get(i).toString()); } return list; } }
client:
//该池的线程数量不会超过0*7fff个(32767) //池中维护着ForkJoinWorkerThread对象数组,数组大小由parallelism属性决定,parallelism默觉得处理器个数 ForkJoinPool pool = new ForkJoinPool(); GetQuestionsTask task = new GetQuestionsTask(questionTypeList, map); pool.execute(task); // 试题列表=task.get() try { List finalList = task.get(); System.out.println("终于结果个数:" + finalList.size()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
总结:
Fork/Join实现了“工作窃取算法”,当一个工作线程的队列里临时没有任务时。它会随机从其它工作线程的队列的尾部获取一个任务。当然,fork/join框架的使用有一定的约束条件:
1.除了fork() 和 join()方法外,线程不得使用其它的同步工具。
线程最好也不要sleep()
2.线程不得进行I/O操作
3.线程不得抛出checked exception。