zoukankan      html  css  js  c++  java
  • java-jdk7-forkjoin带有返回值

    来自并发编程网: http://ifeve.com/fork-join-3/

    如果这个任务必须解决一个超过预定义大小的问题,你应该将这个任务分解成更多的子任务,并且用Fork/Join框架来执行这些子任务。当这些子任务完成执行,发起的任务将获得所有子任务产生的结果 ,对这些结果进行分组,并返回最终的结果。最终,当在池中执行的发起的任务完成它的执行,你将获取整个问题地最终结果。

    1, 生成二维数组模拟文档: 

    package com.wenbronk.forkjoin.withresult;
    
    import java.util.Random;
    
    /**
     * Created by wenbronk on 2017/7/26.
     */
    public class Document {
    
        private String words[] = {"the", "hello", "goodbye", "pack", "java", "thread", "pool", "random", "class", "main"};
    
        public String[][] generateDocument(int numLines, int numWords, String word) {
            int counter = 0;
            String[][] document = new String[numLines][numWords];
            Random random = new Random();
    
            // 填充数组
            for (int i=0; i<numLines; i++){
                for (int j=0; j<numWords; j++) {
                    int index=random.nextInt(words.length);
                    document[i][j]=words[index];
                    if (document[i][j].equals(word)){
                        counter++;
                    }
                }
            }
            System.out.println(document.length + ": " + document[document.length - 1].length);
    
            System.out.println("DocumentMock: The word appears " + counter + " times in the document");
            return document;
        }
    
    }

    2, 对模拟文档的进行行拆分

    package com.wenbronk.forkjoin.withresult;
    
    import java.util.ArrayList;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.RecursiveTask;
    
    /**
     * Created by wenbronk on 2017/7/26.
     */
    public class DocumentTask extends RecursiveTask<Integer> {
    
        private String[][] document;
        private int start, end;
        private String word;
    
        public DocumentTask(String[][] document, int start, int end, String word) {
            this.document = document;
            this.start = start;
            this.end = end;
            this.word = word;
        }
    
        @Override
        protected Integer compute() {
            int result = 0;
    
            if (end - start < 10) {
                result = processLines(document, start, end, word);
            } else {
                int mid = (start + end) / 2;
                DocumentTask task1 = new DocumentTask(document, start, mid, word);
                DocumentTask task2 = new DocumentTask(document, mid, end, word);
    
                invokeAll(task1, task2);
                try {
                    result = groupResults(task1.get(), task2.get());
                }catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            return result;
        }
    
        private int groupResults(Integer integer, Integer integer1) {
            return integer + integer1;
        }
    
        /**
         * 要查找的单词
         */
        private int processLines(String[][] document, int start, int end, String word) {
            ArrayList<LineTask> tasks = new ArrayList<>();
            for (int i = start; i < end; i++) {
                LineTask lineTask = new LineTask(document[i], 0, document[i].length, word);
                tasks.add(lineTask);
            }
    
            invokeAll(tasks);
            int result = 0;
    //        for (LineTask task : tasks) {
            for (int i = 0; i < tasks.size(); i++) {
                LineTask task = tasks.get(i);
                try {
    //                Thread.sleep(100);
                    result += task.get();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            return result;
        }
    }

    3, 对行进行单词拆分

    package com.wenbronk.forkjoin.withresult;
    
    import java.util.concurrent.RecursiveTask;
    
    /**
     * 统计单词在一行出现的次数
     * Created by wenbronk on 2017/7/27.
     */
    public class LineTask extends RecursiveTask<Integer> {
        private static final long seriaVersionUID = 1L;
        private String[] line;
        private int start, end;
        private String word;
    
        public LineTask(String[] line, int start, int end, String word) {
            this.line = line;
            this.start = start;
            this.end = end;
            this.word = word;
        }
    
        @Override
        protected Integer compute() {
            Integer result = null;
            if (end - start < 100) {
                result = count(line, start, end, word);
            }else {
                int mid = (start + end) / 2;
                LineTask task1 = new LineTask(line, start, mid, word);
                LineTask task2 = new LineTask(line, mid, end, word);
                invokeAll(task1, task2);
                try {
                    result = groupResult(task1.get(), task2.get());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            return result;
        }
    
        /**
         * 合并2个数值的值, 返回结果
         * @return
         */
        private Integer groupResult(Integer num1, Integer num2) {
            return num1 + num2;
        }
    
        /**
         * 查找行中出现word的次数
         */
        private Integer count(String[] line, int start, int end, String word) {
            int counter = 0;
            for (int i = start; i < end; i++) {
                if (word.equals(line[i])) {
                    counter ++;
                }
            }
    //        try {
    //            Thread.sleep(10);
    //        } catch (Exception e) {
    //            e.printStackTrace();
    //        }
            return counter;
        }
    }

    4, 入口执行类

    package com.wenbronk.forkjoin.withresult;
    
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Created by wenbronk on 2017/7/27.
     */
    public class CountMain {
    
        public static void main(String[] args) {
            Document document = new Document();
            String[][] documents = document.generateDocument(100, 1000, "random");
    
            DocumentTask task = new DocumentTask(documents, 0, 100, "random");
            ForkJoinPool pool = new ForkJoinPool();
            pool.execute(task);
    
            do {
                System.out.printf("******************************************
    ");
                System.out.printf("Main: Parallelism: %d
    ",pool.getParallelism());
                System.out.printf("Main: Active Threads: %d
    ",pool.getActiveThreadCount());
                System.out.printf("Main: Task Count: %d
    ",pool.getQueuedTaskCount());
                System.out.printf("Main: Steal Count: %d
    ",pool.getStealCount());
                System.out.printf("******************************************
    ");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } while (!task.isDone());
    
            pool.shutdown();
    
            try {
                System.out.printf("Main: The word appears %d in the document ", task.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
  • 相关阅读:
    Hdoj 1203.I NEED A OFFER! 题解
    Hdoj 4508.湫湫系列故事——减肥记I 题解
    Hdoj 2191.悼念512汶川大地震遇难同胞——珍惜现在,感恩生活 题解
    Hdoj 1248.寒冰王座 题解
    idea配置(卡顿、开发环境等配置),code style template
    经典收藏链接(C#总结)
    C#设计模式:原型模式(Prototype)及深拷贝、浅拷贝
    值类型和引用类型深入理解
    Resharper的配置和使用
    批处理bat 命令
  • 原文地址:https://www.cnblogs.com/wenbronk/p/7244736.html
Copyright © 2011-2022 走看看