zoukankan      html  css  js  c++  java
  • JDK并发包中ExecutorCompletionService使用

      相信大家都知道,jdk中ExecutorService是并发编程时使用很频繁的接口,并且使用很方便,那么想在有这么一个场景:

      一批任务使用线程池处理,并且需要获得结果,但是不关心任务执行结束后输出结果的先后顺序,应该如何实现?大多数人可能会想到,将任务作为一个Callable,然后调用submit塞入ExecutorService中,返回Future,然后遍历Future,依次获得结果不就行了吗?  

      那么,大家是否想过,这样有两个不好的点:1.调用submit后需要将Future存储;2.遍历Future的list时,get()方法时阻塞的,就算使用get(long timeout, TimeUnit unit)方法,避免不了需要通过while来循环获取结果

      其实如果不关心任务返回的先后顺序,那么还有一个更方便的线程池,也就是今天要分享,使用ExecutorCompletionService避免以上两个问题,并且编码简单,容易理解:

    public class CompletionServiceTest {
        //初始化固定大小为3的线程池
        private static ExecutorService executor = Executors.newFixedThreadPool(3);
    
        public static void main(String[] args) {
            List<CompletionServiceTask> tasks = new ArrayList<>();
            //新建3个任务
            CompletionServiceTask task1 = new CompletionServiceTask(6000);
            CompletionServiceTask task2 = new CompletionServiceTask(4000);
            CompletionServiceTask task3 = new CompletionServiceTask(2000);
            tasks.add(task1);
            tasks.add(task2);
            tasks.add(task3);
            addTask(tasks);
        }
    
        public static void addTask(List<CompletionServiceTask> tasks) {
            //以executor为构造器的参数,新建一个ExecutorCompletionService线程池
            ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
            for (CompletionServiceTask task : tasks) {
                //提交任务
                completionService.submit(task);
                System.out.println("添加任务 :" + task.time);
            }
            for (CompletionServiceTask task : tasks) {
                try {
                    Integer time = completionService.take().get();
                    System.out.println("任务返回结果:" + time);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
            //关闭线程池
            executor.shutdown();
        }
    
        static class CompletionServiceTask implements Callable<Integer> {
    
            public int time;
    
            public CompletionServiceTask(int time) {
                this.time = time;
            }
    
            @Override
            public Integer call() throws Exception {
                Thread.sleep(time);
                return time;
            }
        }
    }

      上述代码运行结果:

    添加任务 :6000
    添加任务 :4000
    添加任务 :2000
    任务返回结果:2000
    任务返回结果:4000
    任务返回结果:6000  

      从结果可以看出,虽然6000是第一个添加进去,但是最先返回的确实2000,因此我们可以看出,并非先添加的任务先返回,而是最先执行结束的任务先返回,这样的好处就是不用为了等待前面的任务,导致后续的阻塞。

      原因分析:查看ExecutorCompletionService的源码:

        private final BlockingQueue<Future<V>> completionQueue;
        public Future<V> submit(Callable<V> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<V> f = newTaskFor(task);
            executor.execute(new QueueingFuture(f));
            return f;
        }

      其中,有一个类型为BlockingQueue的全局变量completionQueue(这里划重点,这个阻塞队列就是用来存储线程池执行返回结果的),submit()方法会将Callable封装成一个RunnableFuture,然后将其塞入QueueingFuture中,交给executor执行,我们再看一下QueueingFuture(FutureTask的一个子类):

        private final BlockingQueue<Future<V>> completionQueue;
        private class QueueingFuture extends FutureTask<Void> {
            QueueingFuture(RunnableFuture<V> task) {
                super(task, null);
                this.task = task;
            }
            protected void done() { completionQueue.add(task); }
            private final Future<V> task;
        }

      其通过改写FutureTask类的done()方法,将结果放入上面的BlockingQueue中,所以加入的顺序就是任务执行完成的先后顺序。

  • 相关阅读:
    eclipse注释模板设置(未整理)
    10大最适合编程的字体推荐下载,让代码看起来更美更舒服!
    系统里有Courier New字体 Eclipse没有这个字体选项
    Hadoop安装教程_集群/分布式配置
    Hadoop安装教程_单机/伪分布式配置
    VMware中三种网络连接的区别
    修改Tomcat的默认访问目录
    Ubuntu 16.04服务器 配置
    Ubuntu 16.04服务器 软件的安装及配置
    排序算法 -- 桶排序
  • 原文地址:https://www.cnblogs.com/handsomeye/p/7230910.html
Copyright © 2011-2022 走看看