zoukankan      html  css  js  c++  java
  • 多线程(五)

    invokeAll 说完以后,我们来看AbstractExecutorService的invokeAny方法,这个方法前面有代码说明过,与invokeAll不同的是,在给定的任务中,如果某一个任务完成(没有异常抛出),则返回任务执行的结果。这点从方法的返回值上面也能看出来。并不要求所有的任务全部的完成,只要一个完成(没有异常)即可。JDK的说明:

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。

    返回值为T 也说明了这一点。

    源代码:

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
            try {
                return doInvokeAny(tasks, false, 0);
            } catch (TimeoutException cannotHappen) {
                assert false;
                return null;
            }
        }
    
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            return doInvokeAny(tasks, true, unit.toNanos(timeout));
        }

    这个代码没有什么好说的,关键点地方在于doInvokeAny()方法上面,直接上源码:

    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                                boolean timed, long nanos)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (tasks == null)
                throw new NullPointerException();
            int ntasks = tasks.size();
            if (ntasks == 0)
                throw new IllegalArgumentException();
            List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
            ExecutorCompletionService<T> ecs =
                new ExecutorCompletionService<T>(this);
    
            // For efficiency, especially in executors with limited
            // parallelism, check to see if previously submitted tasks are
            // done before submitting more of them. This interleaving
            // plus the exception mechanics account for messiness of main
            // loop.
    
            try {
                // Record exceptions so that if we fail to obtain any
                // result, we can throw the last exception we got.
                ExecutionException ee = null;
                long lastTime = timed ? System.nanoTime() : 0;
                Iterator<? extends Callable<T>> it = tasks.iterator();
    
                // Start one task for sure; the rest incrementally
                futures.add(ecs.submit(it.next()));
                --ntasks;
                int active = 1;
    
                for (;;) {
                    Future<T> f = ecs.poll();
                    if (f == null) {
                        if (ntasks > 0) {
                            --ntasks;
                            futures.add(ecs.submit(it.next()));
                            ++active;
                        }
                        else if (active == 0)
                            break;
                        else if (timed) {
                            f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                            if (f == null)
                                throw new TimeoutException();
                            long now = System.nanoTime();
                            nanos -= now - lastTime;
                            lastTime = now;
                        }
                        else
                            f = ecs.take();
                    }
                    if (f != null) {
                        --active;
                        try {
                            return f.get();
                        } catch (ExecutionException eex) {
                            ee = eex;
                        } catch (RuntimeException rex) {
                            ee = new ExecutionException(rex);
                        }
                    }
                }
    
                if (ee == null)
                    ee = new ExecutionException();
                throw ee;
    
            } finally {
                for (Future<T> f : futures)
                    f.cancel(true);
            }
        }

    这个比较的复杂,还是一步步的来:

    首先是根据任务建立返回值。

    List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);

    ntasks 为当前任务数。

    然后是建立ExecutorCompletionService JDK对这个类的说明是:

    使用提供的 Executor 来执行任务的 CompletionService。此类将安排那些完成时提交的任务,把它们放置在可使用 take 访问的队列上。该类非常轻便,适合于在执行几组任务时临时使用。具体的源代码,我们会在后面加以分析。

    具体的实现的代码在try/catch中

    ntasks 当前的任务数

    active 当前运行的进程数

    // Start one task for sure; the rest incrementally

    futures.add(ecs.submit(it.next()));

    首先提交一个task,然后进入循环,这样能够提交效率。

    然后进入无限循环,被分为了两种情况:

    if (f == null)

    我们还有首先说一下ecs.poll 的方法的作用:检索并移除表示下一个已完成任务的 Future,如果不存在这样的任务,则返回 null

    如果f == null 说明此事没有任务完成,分为三种情况:

    (1) 当前进程还没有提交完,则继续提交进程(越到后面越慢,所以在提交任务的时候一般把执行时间较短的任务放在前面,即是从到小的排列顺序,能够提交效率)

    (2) 如果当前执行的进程数为0,直接的跳出循环,这个是一个异常的分支,跳出循环以后,执行的是:if (ee == null) ee = new ExecutionException(); throw ee; 抛出异常。

    (3) 如果已经没有可以提交的任务,并且还有任务在执行,也就是(1)/(2) 不成立,立即去获取任务的结果,如果有时间限制,则是f = ecs.poll(nanos, TimeUnit.NANOSECONDS);,获取不到则抛出超时异常 。没有则是f.get();

    这里针对条件(3)进行说明,如果没有可以提交的任务,并且当前还有任务只在执行的情况下才会执行到(3),这个也是为了能够更加的快速的获取结果,尽快的获得任务重最先完成的哪一个任务。这样能够阻塞的获取任务的结果。

    If(f != null)

    f 不为null 说明已经有任务完成了,减少正在执行的任务数,直接取得执行结果(可能会阻塞)。

    最后在finally中,取消所有的任务。

    InvokeAll的主要的逻辑已经分析完毕,我们还遗留一个小尾巴,他怎么保证任务执行完以后直接的塞到结果的序列中,这个比较的重要,就是Poll方法。

    首先我们来看一下但是调用的构建函数源码:

    public ExecutorCompletionService(Executor executor,
                                         BlockingQueue<Future<V>> completionQueue) {
            if (executor == null || completionQueue == null)
                throw new NullPointerException();
            this.executor = executor;
            this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
            this.completionQueue = completionQueue;
        }

    从构建的函数来看这个executor就是abstractExecutorService。我们使用的方法主要是:

    ecs.submit(it.next())

    ecs.poll();

    ecs.poll(nanos, TimeUnit.NANOSECONDS);

    可以看到这些方法的主要的关键在于结果队列completionQueue,无论是take,还是poll 都是BlockingQueue支持的,从源代码中方法 poll()以及 take()能够看到。源代码上面是:

    public Future<V> submit(Runnable task, V result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<V> f = newTaskFor(task, result);
            executor.execute(new QueueingFuture(f));
            return f;
        }
    
        public Future<V> take() throws InterruptedException {
            return completionQueue.take();
        }
    
        public Future<V> poll() {
            return completionQueue.poll();
        }

    那就还剩下一个主要的问题:完成的任务是怎么放到这个队列中的?那么我们首先还是需要关注提交任务的方法:

    public Future<V> submit(Runnable task, V result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<V> f = newTaskFor(task, result);
            executor.execute(new QueueingFuture(f));
            return f;
        }

    QueueingFuture 为内部类,源码为:

    private class QueueingFuture extends FutureTask<Void> {
            QueueingFuture(RunnableFuture<V> task) {
                super(task, null);
                this.task = task;
            }
            protected void done() { completionQueue.add(task); }
            private final Future<V> task;
        }

    看到这个方法的重定义:protected void done() { completionQueue.add(task); }

    关于接口done()的方法JDK说明:

    protected void done()

    当此任务转换到状态 isDone(不管是正常地还是通过取消)时,调用受保护的方法。默认实现不执行任何操作。子类可以重写此方法,以调用完成回调或执行簿记。注意,可以查询此方法的实现内的状态,从而确定是否已取消了此任务。

    子类QueueingFuture 重写done()方法, 当task完成的,会把执行的结果放到completionQueue中。实现前面所说的当任务完成时,放到了完成的阻塞队列中。

    因为这个原因,在JDK中关于ExecutorCompletionService有两种应用的情况:

    假定您有针对某个问题的一组求解程序,每个求解程序都能返回某种类型的 Result 值,并且您想同时运行它们,使用方法 use(Result r) 处理返回非 null 值的每个求解程序的返回结果。可以这样编写程序:

    void solve(Executor e, Collection<Callable<Result>> solvers)
    
        throws InterruptedException, ExecutionException {
    
            CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    
            for (Callable<Result> s : solvers)
    
                ecs.submit(s);
    
            int n = solvers.size();
    
            for (int i = 0; i < n; ++i) {
    
                Result r = ecs.take().get();
    
                if (r != null)
    
                    use(r);
    
            }

    假定您想使用任务集中的第一个非 null 结果,而忽略任何遇到异常的任务,并且在第一个任务就绪时取消其他所有任务:

    void solve(Executor e, Collection<Callable<Result>> solvers) 
          throws InterruptedException {
            CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
            int n = solvers.size();
            List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
            Result result = null;
            try {
                for (Callable<Result> s : solvers)
                    futures.add(ecs.submit(s));
                for (int i = 0; i < n; ++i) {
                    try {
                        Result r = ecs.take().get();
                        if (r != null) {
                            result = r;
                            break;
                        }
                    } catch(ExecutionException ignore) {}
                }
            }
            finally {
                for (Future<Result> f : futures)
                    f.cancel(true);
            }
    
            if (result != null)
                use(result);
        }

    在JDK说明的第二种情况和invokeAny比较的像,可以进行类比。

  • 相关阅读:
    惭愧无法面对的SQL ORDER BY
    JVM参数官方说明
    Java Unsafe 测试代码
    好记性不如烂笔头-Duration与Period中字母含义
    计算机组成原理中源码、反码、补码存在意义
    线程池参数、线程池扩容以及拒绝策略触发时机demo代码
    朴素贝叶斯法
    K近邻法与kd树
    EM算法
    熵、交叉熵、KL散度、JS散度
  • 原文地址:https://www.cnblogs.com/zhailzh/p/3981347.html
Copyright © 2011-2022 走看看