zoukankan      html  css  js  c++  java
  • 《java.util.concurrent 包源码阅读》10 线程池系列之AbstractExecutorService

    AbstractExecutorService对ExecutorService的执行任务类型的方法提供了一个默认实现。这些方法包括submit,invokeAny和InvokeAll。

    注意的是来自Executor接口的execute方法是未被实现,execute方法是整个体系的核心,所有的任务都是在这个方法里被真正执行的,因此该方法的不同实现会带来不同的执行策略。这个在后面分析ThreadPoolExecutor和ScheduledThreadPoolExecutor就能看出来。

    首先来看submit方法,它的基本逻辑是这样的:

    1. 生成一个任务类型和Future接口的包装接口RunnableFuture的对象

    2. 执行任务

    3. 返回future。

        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }

    因为submit支持Callable和Runnable两种类型的任务,因此newTaskFor方法有两个重载方法:

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
    
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }

    上一篇文章里曾经说过Callable和Runnable的区别在于前者带返回值,也就是说Callable=Runnable+返回值。因此java中提供了一种adapter,把Runnable+返回值转换成Callable类型。这点可以在newTaskFor中的FutureTask类型的构造函数的代码中看到:

        public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            sync = new Sync(callable);
        }
    
        public FutureTask(Runnable runnable, V result) {
            sync = new Sync(Executors.callable(runnable, result));
        }

    以下是Executors.callable方法的代码:

        public static <T> Callable<T> callable(Runnable task, T result) {
            if (task == null)
                throw new NullPointerException();
            return new RunnableAdapter<T>(task, result);
        }

    那么RunnableAdapter的代码就很好理解了,它是一个Callable的实现,call方法的实现就是执行Runnable的run方法,然后返回那个value。

        static final class RunnableAdapter<T> implements Callable<T> {
            final Runnable task;
            final T result;
            RunnableAdapter(Runnable task, T result) {
                this.task = task;
                this.result = result;
            }
            public T call() {
                task.run();
                return result;
            }
        }

    接下来先说说较为简单的invokeAll:

    1. 为每个task调用newTaskFor方法生成得到一个既是Task也是Future的包装类对象的List

    2. 循环调用execute执行每个任务

    3. 再次循环调用每个Future的get方法等待每个task执行完成

    4. 最后返回Future的list。

        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
            if (tasks == null || unit == null)
                throw new NullPointerException();
            long nanos = unit.toNanos(timeout);
            List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
            boolean done = false;
            try {
                // 为每个task生成包装对象
                for (Callable<T> t : tasks)
                    futures.add(newTaskFor(t));
    
                long lastTime = System.nanoTime();
    
                // 循环调用execute执行每个方法
                // 这里因为设置了超时时间,所以每次执行完成后
                // 检查是否超时,超时了就直接返回future集合
                Iterator<Future<T>> it = futures.iterator();
                while (it.hasNext()) {
                    execute((Runnable)(it.next()));
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0)
                        return futures;
                }
    
                // 等待每个任务执行完成
                for (Future<T> f : futures) {
                    if (!f.isDone()) {
                        if (nanos <= 0)
                            return futures;
                        try {
                            f.get(nanos, TimeUnit.NANOSECONDS);
                        } catch (CancellationException ignore) {
                        } catch (ExecutionException ignore) {
                        } catch (TimeoutException toe) {
                            return futures;
                        }
                        long now = System.nanoTime();
                        nanos -= now - lastTime;
                        lastTime = now;
                    }
                }
                done = true;
                return futures;
            } finally {
                if (!done)
                    for (Future<T> f : futures)
                        f.cancel(true);
            }
        }

    最后说说invokeAny,它的难点在于只要一个任务执行成功就要返回,并且会取消其他任务,也就是说重点在于找到第一个执行成功的任务。

    这里我想到了BlockingQueue,当所有的任务被提交后,任务执行返回的Future会被依次添加到一个BlockingQueue中,然后找到第一个执行成功任务的方法就是从BlockingQueue取出第一个元素,这个就是doInvokeAny方法用到的ExecutorCompletionService的基本原理。

    因为两个invokeAny方法都是调用doInvokeAny方法,下面是doInvokeAny的代码分析:

        private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                                boolean timed, long nanos)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (tasks == null)
                throw new NullPointerException();
            int ntasks = tasks.size();
            if (ntasks == 0)
                throw new IllegalArgumentException();
            List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
            // ExecutorCompletionService负责执行任务,后面调用用poll返回第一个执行结果
            ExecutorCompletionService<T> ecs =
                new ExecutorCompletionService<T>(this);
    
            // 这里出于效率的考虑,每次提交一个任务之后,就检查一下有没有执行完成的任务
    
            try {
                ExecutionException ee = null;
                long lastTime = timed ? System.nanoTime() : 0;
                Iterator<? extends Callable<T>> it = tasks.iterator();
    
                // 先提交一个任务
                futures.add(ecs.submit(it.next()));
                --ntasks;
                int active = 1;
    
                for (;;) {
                    // 尝试获取有没有执行结果(这个结果是立刻返回的)
                    Future<T> f = ecs.poll();
                    // 没有执行结果
                    if (f == null) {
                        // 如果还有任务没有被提交执行的,就再提交一个任务
                        if (ntasks > 0) {
                            --ntasks;
                            futures.add(ecs.submit(it.next()));
                            ++active;
                        }
                        // 没有任务在执行了,而且没有拿到一个成功的结果。
                        else if (active == 0)
                            break;
                        // 如果设置了超时情况
                        else if (timed) {
                            // 等待执行结果直到有结果或者超时
                            f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                            if (f == null)
                                throw new TimeoutException();
                            // 这里的更新不可少,因为这个Future可能是执行失败的情况,那么还需要再次等待下一个结果,超时的设置还是需要用到。
                            long now = System.nanoTime();
                            nanos -= now - lastTime;
                            lastTime = now;
                        }
                        // 没有设置超时,并且所有任务都被提交了,则一直等到第一个执行结果出来
                        else
                            f = ecs.take();
                    }
                    // 有返回结果了,尝试从future中获取结果,如果失败了,那么需要接着等待下一个执行结果
                    if (f != null) {
                        --active;
                        try {
                            return f.get();
                        } catch (ExecutionException eex) {
                            ee = eex;
                        } catch (RuntimeException rex) {
                            ee = new ExecutionException(rex);
                        }
                    }
                }
    
                // ExecutorCompletionService执行时发生错误返回了全是null的future
                if (ee == null)
                    ee = new ExecutionException();
                throw ee;
    
            } finally {
                // 尝试取消所有的任务(对于已经完成的任务没有影响)
                for (Future<T> f : futures)
                    f.cancel(true);
            }
        }

    后面接着分析ThreadPoolExecutor和ScheduledThreadPoolExecutor。

  • 相关阅读:
    CDN技术分享
    大型网站架构技术一览
    Remember-Me功能
    spring-security用户权限认证框架
    关于 tomcat 集群中 session 共享的三种方法
    Nginx+Tomcat+Terracotta的Web服务器集群实做
    Terrocotta
    使用hibernate tools插件生成POJO
    Session简介
    Cookie设置HttpOnly,Secure,Expire属性
  • 原文地址:https://www.cnblogs.com/wanly3643/p/3910428.html
Copyright © 2011-2022 走看看