来自并发编程网: http://ifeve.com/fork-join-3/
如果这个任务必须解决一个超过预定义大小的问题,你应该将这个任务分解成更多的子任务,并且用Fork/Join框架来执行这些子任务。当这些子任务完成执行,发起的任务将获得所有子任务产生的结果 ,对这些结果进行分组,并返回最终的结果。最终,当在池中执行的发起的任务完成它的执行,你将获取整个问题地最终结果。
1, 生成二维数组模拟文档:
package com.wenbronk.forkjoin.withresult; import java.util.Random; /** * Created by wenbronk on 2017/7/26. */ public class Document { private String words[] = {"the", "hello", "goodbye", "pack", "java", "thread", "pool", "random", "class", "main"}; public String[][] generateDocument(int numLines, int numWords, String word) { int counter = 0; String[][] document = new String[numLines][numWords]; Random random = new Random(); // 填充数组 for (int i=0; i<numLines; i++){ for (int j=0; j<numWords; j++) { int index=random.nextInt(words.length); document[i][j]=words[index]; if (document[i][j].equals(word)){ counter++; } } } System.out.println(document.length + ": " + document[document.length - 1].length); System.out.println("DocumentMock: The word appears " + counter + " times in the document"); return document; } }
2, 对模拟文档的进行行拆分
package com.wenbronk.forkjoin.withresult; import java.util.ArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.RecursiveTask; /** * Created by wenbronk on 2017/7/26. */ public class DocumentTask extends RecursiveTask<Integer> { private String[][] document; private int start, end; private String word; public DocumentTask(String[][] document, int start, int end, String word) { this.document = document; this.start = start; this.end = end; this.word = word; } @Override protected Integer compute() { int result = 0; if (end - start < 10) { result = processLines(document, start, end, word); } else { int mid = (start + end) / 2; DocumentTask task1 = new DocumentTask(document, start, mid, word); DocumentTask task2 = new DocumentTask(document, mid, end, word); invokeAll(task1, task2); try { result = groupResults(task1.get(), task2.get()); }catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } return result; } private int groupResults(Integer integer, Integer integer1) { return integer + integer1; } /** * 要查找的单词 */ private int processLines(String[][] document, int start, int end, String word) { ArrayList<LineTask> tasks = new ArrayList<>(); for (int i = start; i < end; i++) { LineTask lineTask = new LineTask(document[i], 0, document[i].length, word); tasks.add(lineTask); } invokeAll(tasks); int result = 0; // for (LineTask task : tasks) { for (int i = 0; i < tasks.size(); i++) { LineTask task = tasks.get(i); try { // Thread.sleep(100); result += task.get(); } catch (Exception e) { e.printStackTrace(); } } return result; } }
3, 对行进行单词拆分
package com.wenbronk.forkjoin.withresult; import java.util.concurrent.RecursiveTask; /** * 统计单词在一行出现的次数 * Created by wenbronk on 2017/7/27. */ public class LineTask extends RecursiveTask<Integer> { private static final long seriaVersionUID = 1L; private String[] line; private int start, end; private String word; public LineTask(String[] line, int start, int end, String word) { this.line = line; this.start = start; this.end = end; this.word = word; } @Override protected Integer compute() { Integer result = null; if (end - start < 100) { result = count(line, start, end, word); }else { int mid = (start + end) / 2; LineTask task1 = new LineTask(line, start, mid, word); LineTask task2 = new LineTask(line, mid, end, word); invokeAll(task1, task2); try { result = groupResult(task1.get(), task2.get()); } catch (Exception e) { e.printStackTrace(); } } return result; } /** * 合并2个数值的值, 返回结果 * @return */ private Integer groupResult(Integer num1, Integer num2) { return num1 + num2; } /** * 查找行中出现word的次数 */ private Integer count(String[] line, int start, int end, String word) { int counter = 0; for (int i = start; i < end; i++) { if (word.equals(line[i])) { counter ++; } } // try { // Thread.sleep(10); // } catch (Exception e) { // e.printStackTrace(); // } return counter; } }
4, 入口执行类
package com.wenbronk.forkjoin.withresult; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; /** * Created by wenbronk on 2017/7/27. */ public class CountMain { public static void main(String[] args) { Document document = new Document(); String[][] documents = document.generateDocument(100, 1000, "random"); DocumentTask task = new DocumentTask(documents, 0, 100, "random"); ForkJoinPool pool = new ForkJoinPool(); pool.execute(task); do { System.out.printf("****************************************** "); System.out.printf("Main: Parallelism: %d ",pool.getParallelism()); System.out.printf("Main: Active Threads: %d ",pool.getActiveThreadCount()); System.out.printf("Main: Task Count: %d ",pool.getQueuedTaskCount()); System.out.printf("Main: Steal Count: %d ",pool.getStealCount()); System.out.printf("****************************************** "); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } while (!task.isDone()); pool.shutdown(); try { System.out.printf("Main: The word appears %d in the document ", task.get()); } catch (Exception e) { e.printStackTrace(); } } }