zoukankan      html  css  js  c++  java
  • Java线程之CompletionService批处理任务

    如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果,怎么办呢?

    为此你可以保存与每个任务相关联的Future,然后不断地调用 timeout为零的get,来检验Future是否完成。这样做固然可以,但却相当乏味。幸运的是,还有一个更好的方法:完成服务 (Completion service)。

    CompletionService整合了Executor和BlockingQueue的功能。

    你可以将Callable任务提交给它去执行,然 后使用类似于队列中的take和poll方法,在结果完整可用时获得这个结果,像一个打包的Future。 ExecutorCompletionService是实现CompletionService接口的一个类,并将计算任务委托给一个Executor。

    ExecutorCompletionService的实现相当直观。它在构造函数中创建一个BlockingQueue,用它去保持完成的结果。 计算完成时会调用FutureTask中的done方法。

    当提交一个任务后,首先把这个任务包装为一个QueueingFuture,它是 FutureTask的一个子类,然后覆写done方法,将结果置入BlockingQueue中,take和poll方法委托给了 BlockingQueue,它会在结果不可用时阻塞。

    直接看demo:

    package javademo;

    import java.util.Random;  

    import java.util.concurrent.BlockingQueue;  

    import java.util.concurrent.Callable;  

    import java.util.concurrent.CompletionService;  

    import java.util.concurrent.ExecutionException;  

    import java.util.concurrent.ExecutorCompletionService;  

    import java.util.concurrent.ExecutorService;  

    import java.util.concurrent.Executors;  

    import java.util.concurrent.Future;  

    import java.util.concurrent.LinkedBlockingQueue;  

      /***

       * 两钟方式出来线程运行结果

       * @author think

       *

       */

    public class CompletionServiceTest {  

        public static void main(String[] args) throws Exception {  

        CompletionServiceTest cst = new CompletionServiceTest();  

        cst.count1();  

        cst.count2();  

        }  

          /***

            * 使用阻塞容器保存每次Executor处理的结果,在后面进行统一处理  

            * @throws Exception

            */

        public void count1() throws Exception{  

            ExecutorService exec = Executors.newCachedThreadPool();  

            BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();  

            for(int i=0; i<10; i++){  

                Future<Integer> future =exec.submit(getTask());  

                queue.add(future);  

            }  

            int sum = 0;  

            int queueSize = queue.size();  

            for(int i=0; i<queueSize; i++){  

                sum += queue.take().get();  

            }  

            System.out.println("总数为:"+sum);  

            exec.shutdown();  

        }  

    /***

     * 使用CompletionService(完成服务)保持Executor处理的结果  

     * @throws InterruptedException

     * @throws ExecutionException

     */

        public void count2() throws InterruptedException, ExecutionException{  

            ExecutorService exec = Executors.newCachedThreadPool();  

            CompletionService<Integer> execcomp = new ExecutorCompletionService<Integer>(exec);  

            for(int i=0; i<10; i++){  

                execcomp.submit(getTask());  

            }  

            int sum = 0;  

            for(int i=0; i<10; i++){  

               //检索并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。  

                Future<Integer> future = execcomp.take();  

                sum += future.get();  

            }  

            System.out.println("总数为:"+sum);  

            exec.shutdown();  

        }  

        /**

         * 得到一个任务  

         * @return Callable

         */

        public Callable<Integer> getTask(){  

            final Random rand = new Random();  

            Callable<Integer> task = new Callable<Integer>(){  

                @Override  

                public Integer call() throws Exception {  

                    int i = rand.nextInt(10);  

                    int j = rand.nextInt(10);  

                    int sum = i*j;  

                    System.out.print(sum+" ");  

                    return sum;  

                }  

            };  

            return task;  

        }  

    }  

    1. import java.util.Random;  
    2. import java.util.concurrent.BlockingQueue;  
    3. import java.util.concurrent.Callable;  
    4. import java.util.concurrent.CompletionService;  
    5. import java.util.concurrent.ExecutionException;  
    6. import java.util.concurrent.ExecutorCompletionService;  
    7. import java.util.concurrent.ExecutorService;  
    8. import java.util.concurrent.Executors;  
    9. import java.util.concurrent.Future;  
    10. import java.util.concurrent.LinkedBlockingQueue;  
    11.   
    12. public class Test17 {  
    13.     public static void main(String[] args) throws Exception {  
    14.         Test17 t = new Test17();  
    15.         t.count1();  
    16.         t.count2();  
    17.     }  
    18. //使用阻塞容器保存每次Executor处理的结果,在后面进行统一处理  
    19.     public void count1() throws Exception{  
    20.         ExecutorService exec = Executors.newCachedThreadPool();  
    21.         BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();  
    22.         for(int i=0; i<10; i++){  
    23.             Future<Integer> future =exec.submit(getTask());  
    24.             queue.add(future);  
    25.         }  
    26.         int sum = 0;  
    27.         int queueSize = queue.size();  
    28.         for(int i=0; i<queueSize; i++){  
    29.             sum += queue.take().get();  
    30.         }  
    31.         System.out.println("总数为:"+sum);  
    32.         exec.shutdown();  
    33.     }  
    34. //使用CompletionService(完成服务)保持Executor处理的结果  
    35.     public void count2() throws InterruptedException, ExecutionException{  
    36.         ExecutorService exec = Executors.newCachedThreadPool();  
    37.         CompletionService<Integer> execcomp = new ExecutorCompletionService<Integer>(exec);  
    38.         for(int i=0; i<10; i++){  
    39.             execcomp.submit(getTask());  
    40.         }  
    41.         int sum = 0;  
    42.         for(int i=0; i<10; i++){  
    43. //检索并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。  
    44.             Future<Integer> future = execcomp.take();  
    45.             sum += future.get();  
    46.         }  
    47.         System.out.println("总数为:"+sum);  
    48.         exec.shutdown();  
    49.     }  
    50.     //得到一个任务  
    51.     public Callable<Integer> getTask(){  
    52.         final Random rand = new Random();  
    53.         Callable<Integer> task = new Callable<Integer>(){  
    54.             @Override  
    55.             public Integer call() throws Exception {  
    56.                 int i = rand.nextInt(10);  
    57.                 int j = rand.nextInt(10);  
    58.                 int sum = i*j;  
    59.                 System.out.print(sum+" ");  
    60.                 return sum;  
    61.             }  
    62.         };  
    63.         return task;  
    64.     }  
    65.     /** 
    66.      * 执行结果: 
    67.         6   6   14  40  40  0   4   7   0   0   总数为:106 
    68.         12  6   12  54  81  18  14  35  45  35  总数为:312 
    69.      */  
    70. }  
  • 相关阅读:
    Mysql的row_format(fixed与dynamic)
    no-referrer-when-downgrade什么意思
    a标签属性 rel=noopener noreferrer
    深入理解ob_flush和flush的区别
    win7下php7.1运行getenv('REMOTE_ADDR')fastcgi停止运行
    学会了这项技能,你就能获得任何想要的信息!
    原来游戏技术行业最大的秘密竟然是...
    王亮:游戏AI探索之旅——从alphago到moba游戏
    入门系列之在Ubuntu 14.04上备份,还原和迁移MongoDB数据库
    入门系列之在Ubuntu上安装Drone持续集成环境
  • 原文地址:https://www.cnblogs.com/xubiao/p/5463283.html
Copyright © 2011-2022 走看看