zoukankan      html  css  js  c++  java
  • java CompletionService ExecutorCompletionSerivce

    我们来想一个问题:

    如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么我们可以保留与每个任务关联的Future,然后反复使用get方法,从而通过轮询来拿到返回结果,但是这样有些繁琐。 

    而ExecutorCompletionService可以简化这个操作。这就是他的作用,可以对比一下两个代码。

    废话不说,上代码。

     1 package com.citi.test.mutiplethread.demo0511;
     2 
     3 import java.util.ArrayList;
     4 import java.util.List;
     5 import java.util.concurrent.Callable;
     6 import java.util.concurrent.ExecutionException;
     7 import java.util.concurrent.ExecutorService;
     8 import java.util.concurrent.Executors;
     9 import java.util.concurrent.Future;
    10 
    11 public class TestCompletionService {
    12     public static void main(String[] args) {
    13         ExecutorService service=Executors.newFixedThreadPool(10);
    14         List<Future<Integer>> list=new ArrayList<Future<Integer>>(); //跟CompletionService比,多余的步骤
    15         for(int i=0;i<9;i++){
    16             final int index=i;
    17             Future<Integer> submit = service.submit(new Callable<Integer>() { //这句话前面部分也是多余的步骤
    18                 @Override
    19                 public Integer call() throws Exception {
    20 //                    int a=1/0;
    21                     return index;
    22                 }
    23             });
    24             list.add(submit);
    25         }
    26         for(Future<Integer> temp: list){
    27             try {
    28                 Integer integer = temp.get();
    29                 System.out.println(Thread.currentThread().getName()+" "+integer);
    30             } catch (InterruptedException | ExecutionException e) {
    31                 // TODO Auto-generated catch block
    32                 e.printStackTrace();
    33             }
    34         }
    35         service.shutdown();
    36     }
    37 }

    可以看到上面的方法也是可以的,但是有些繁琐,java提供了一种更好的方法:完成服务(CompletionSerivce)

    不多说,上代码,

     1 package com.citi.test.mutiplethread.demo0511;
     2 
     3 import java.util.concurrent.Callable;
     4 import java.util.concurrent.CompletionService;
     5 import java.util.concurrent.ExecutionException;
     6 import java.util.concurrent.ExecutorCompletionService;
     7 import java.util.concurrent.ExecutorService;
     8 import java.util.concurrent.Executors;
     9 import java.util.concurrent.Future;
    10 
    11 public class TestCompletionService2 {
    12     public static void main(String[] args) {
    13         ExecutorService executor=Executors.newCachedThreadPool();
    14         CompletionService<Integer> completionService= new ExecutorCompletionService<Integer>(executor);
    15         for(int i=0;i<10;i++){
    16             final int index=i;
    17             completionService.submit(new Callable<Integer>() {
    18                 @Override
    19                 public Integer call() throws Exception {
    20                     return index;
    21                 }
    22             });
    23         }
    24         for(int i=0;i<10;i++){
    25             try {
    26                 Future<Integer> take = completionService.take(); //可以看到我们可以直接用completionService 来拿到Future对象。非常简便。
    27                 Integer integer = take.get();
    28                 System.out.println(Thread.currentThread().getName()+" "+integer);
    29             } catch (InterruptedException e) {
    30                 // TODO Auto-generated catch block
    31                 e.printStackTrace();
    32             } catch (ExecutionException e) {
    33                 // TODO Auto-generated catch block
    34                 e.printStackTrace();
    35             }
    36         }
    37         executor.shutdown();
    38     }
    39 }

    可以看到,我们不用像原来那样,

    在向ExecutorService中提交任务之后,都去拿Future对象,

    而是在提交完所有任务后,直接通过completionService.take来拿future对象了。

    说一下原理,通过表象可以看出

    CompletionService将Executor和BlockingQueue的功能融合在一起,你可以将callable任务提交给它执行,然后使用类似队列的操作的take和poll等方法来获得已完成的结果。

    ExecutorCompletionSerivce实现了CompletionService,将计算部分委托给一个Executor。

    通过源码可以看到,

    CompletionService 这是个接口,定义了一些规范。

    ExecutorCompletionSerivce是CompletionService的实现类,在构造函数中创建了一个BlockingQueue来保存计算完成的结果。

    当提交某个任务时,该任务首先被包装为一个QueueingFuture,这是FutureTask的一个子类,然后改写子类的done方法,并将结果放到这个BlockingQueue中,之后想拿结果时,可以直接用BlockingQueue的take和poll方法来获取结果,这里而且是阻塞的。

    成员变量和内部类

    //成员变量。
    private final Executor executor; 
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;
    
    //继承自FutureTask,重写done方法,来实现在任务完成后,把结果加入到BlockingQueue中
    /**
     * FutureTask extension to enqueue upon completion
     */
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

    构造方法有两个

    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
    
    //可以传入想用的阻塞队列
    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

    submit方法具体实现

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        //将callable提交给ExecutorCompletionSerivce之后,任务会被包装成QueueingFuture,而QueueingFuture继承自FutureTask,只不过重写了done方法,重写之后的done方法,可以看到是把包装之后的task放入到了阻塞队列中,这样在提交完所有任务之后,我们可以直接调用ExecutorCompletion.take方法来获得结果了
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }
    
    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }
    
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }
  • 相关阅读:
    匈牙利游戏
    钓鱼
    路由选择
    借教室
    有趣的数
    广告印刷
    海战
    暑假周进度报告(一)
    在Oracle创建一个自己用的用户及角色
    下载,安装oracle数据库以及navicat连接数据库
  • 原文地址:https://www.cnblogs.com/liumy/p/11645154.html
Copyright © 2011-2022 走看看