zoukankan      html  css  js  c++  java
  • 11、java5线程池之异步任务CompletionService

    JDK文档描述:

    public interface CompletionService<V>

    将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。例如,CompletionService 可以用来管理异步 IO ,执行读操作的任务作为程序或系统的一部分提交,然后,当完成读操作时,会在程序的不同部分执行其他操作,执行操作的顺序可能与所请求的顺序不同。 通常,CompletionService 依赖于一个单独的 Executor 来实际执行任务,在这种情况下,CompletionService 只管理一个内部完成队列。ExecutorCompletionService 类提供了此方法的一个实现。 内存一致性效果:线程中向 CompletionService 提交任务之前的操作 happen-before 该任务执行的操作,后者依次 happen-before 紧跟在从对应 take() 成功返回的操作
    所有已知实现类: 
        ExecutorCompletionService 

    子类的JDK描述:

    public class ExecutorCompletionService<V>extends Objectimplements CompletionService<V>
    使用提供的 Executor 来执行任务的 CompletionService。此类将安排那些完成时提交的任务,把它们放置在可使用 take 访问的队列上。该类非常轻便,适合于在执行几组任务时临时使用。

    所有的方法:

    方法摘要
     Future<V> poll()
              获取并移除表示下一个已完成任务的 Future,如果不存在这样的任务,则返回 null
     Future<V> poll(long timeout, TimeUnit unit)
              获取并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则将等待指定的时间(如果有必要)。
     Future<V> submit(Callable<V> task)
              提交要执行的值返回任务,并返回表示挂起的任务结果的 Future。
     Future<V> submit(Runnable task, V result)
              提交要执行的 Runnable 任务,并返回一个表示任务完成的 Future,可以提取或轮询此任务。
     Future<V> take()
              获取并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。

    JDK自带的例子:

    假定您有针对某个问题的一组求解程序,每个求解程序都能返回某种类型的 Result 值,并且您想同时运行它们,使用方法 use(Result r) 处理返回非 null 值的每个求解程序的返回结果。可以这样编写程序

    void solve(Executor e, Collection<Callable<Result>> solvers)
          throws InterruptedException, ExecutionException {
            CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
            for (Callable<Result> s : solvers)
                ecs.submit(s);
            int n = solvers.size();
            for (int i = 0; i < n; ++i) {
                Result r = ecs.take().get();
                if (r != null) 
                    use(r);
            }
        }

    假定您想使用任务集中的第一个非 null 结果,而忽略任何遇到异常的任务,并且在第一个任务就绪时取消其他所有任务:

    void solve(Executor e, Collection<Callable<Result>> solvers) 
          throws InterruptedException {
            CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
            int n = solvers.size();
            List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
            Result result = null;
            try {
                for (Callable<Result> s : solvers)
                    futures.add(ecs.submit(s));
                for (int i = 0; i < n; ++i) {
                    try {
                        Result r = ecs.take().get();
                        if (r != null) {
                            result = r;
                            break;
                        }
                    } catch(ExecutionException ignore) {}
                }
            }
            finally {
                for (Future<Result> f : futures)
                    f.cancel(true);
            }
    
            if (result != null)
                use(result);
        }

    自己写的例子:(谁先得到结果我就先取到谁的结果)

    import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    public class ThreadPool2 {
        public static void main(String[] args) {
            test2();
        }
            
        private static void test2(){
            ExecutorService es = Executors.newFixedThreadPool(5);
            CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(es);
            for(int i=1 ; i<=5; i++){
                final int task = i;
                cs.submit(new Callable<Integer>() {
                    @Override
                    public Integer call() throws Exception {
                        try {
                            Thread.sleep(new Random().nextInt(5000));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("Callable 任务【" + task + "】运行完成");
                        return new Random().nextInt(100);
                    }
                });
            }
            System.out.println("我是取结果前的代码");
            //异步取结果
            for(int i=0; i<5; i++){
                try {
                    System.out.println(cs.take().get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("我是取结果后的代码");
            es.shutdown();
        }
    }

    运行的结果:

    我是取结果前的代码
    Callable 任务【3】运行完成
    25
    Callable 任务【1】运行完成
    16
    Callable 任务【4】运行完成
    38
    Callable 任务【5】运行完成
    17
    Callable 任务【2】运行完成
    4
    我是取结果后的代码
  • 相关阅读:
    李宏毅机器学习笔记11(Unsurpervised Learning 03——Deep Generative model)
    李宏毅机器学习笔记09(Unsupervised Learning 01——Clustering and PCA)
    Nginx配置Https重定向 Chrome跳转到%2a.xxx.com的问题
    什么是REST——适合初学者的一种简单解释,第一部分:介绍
    微信授权全解
    师曾文正
    《不留痕迹》观后感
    半年的计划
    想做一名淡泊名利的逐梦者
    如何让微信浏览器返回上一页时强制刷新
  • 原文地址:https://www.cnblogs.com/yangzhilong/p/4799317.html
Copyright © 2011-2022 走看看