zoukankan      html  css  js  c++  java
  • 多线程(四)

    承接上面 上面的submit方法,与之相关联的有两个方法invokeAll 和invokeAny 的源码,我们来看一下这两个方法:

    首先是invokeAll的简单的,没有时间限制的方法,JDK说明,源码如下:

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
              执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
            if (tasks == null)
                throw new NullPointerException();
            List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
            boolean done = false;
            try {
                for (Callable<T> t : tasks) {
                    RunnableFuture<T> f = newTaskFor(t);
                    futures.add(f);
                    execute(f);
                }
                for (Future<T> f : futures) {
                    if (!f.isDone()) {
                        try {
                            f.get();
                        } catch (CancellationException ignore) {
                        } catch (ExecutionException ignore) {
                        }
                    }
                }
                done = true;
                return futures;
            } finally {
                if (!done)
                    for (Future<T> f : futures)
                        f.cancel(true);
            }
        }

    1. 首先根据传入的tasks,构建返回任务的数据:

    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());

    2. 根据任务,封装为FutureTask,这样就能够记录任务的状态:

    3.执行任务,这个任务执行在子类ThreadPoolExecutor的实现,这个会在以后说明,直接的理解为执行了。

    上面的一个for循环,就是构建数据,然后执行,没有阻塞的概念。

    下面的这个for循环,就有阻塞的概念,4.f.get();  取任务的执行的结果,这个就是一个会阻塞主线程的地方,因为如果任务没有完成,这个是必然会阻塞,知道任务完成(完成,取消,或者异常)。

    try/catch后面的finally语句,表示的是:

    判断任务是否done,如果没有(这种情况一般只有当任务为null才会发生),则必须把所有任务都cancel掉,底层会中断所有线程,回收资源。

    测试的代码,使用多线程(二)里面的CallabelTask

    public static void invokeAllCallableTask()throws Exception{
            ExecutorService executor = Executors.newCachedThreadPool();
            List<CallabelTask> tasks = new ArrayList<CallabelTask>(5);
            for(int i = 0; i < 5; ){
                tasks.add(new CallabelTask("task_"+(++i)));
            }
            List<Future<String>> results = executor.invokeAll(tasks);
            System.out.println("All the tasks have been submited through invokeAll method!");
            executor.shutdownNow();
            for(Future<String> f : results)
                System.out.println(f.get());
        }

     

    这段代码会在executor.invokeAll(tasks)行阻塞当前线程,直到所有任务都执行完(完成 or 取消 or异常),才会继续往下执行println语句进行打印。这里可以调用executor.shutdownNow()立马关闭线程执行器而不会有问题,因为在执行这一句的时候所有任务肯定已经执行完了。而在“AbstractExecutorService任务提交<一>”submit方法测试中,若使用shutdownNow()会有异常,因为它会去cancel正在执行的任务,很容易直到这个异常便是线程的中断异常了。

    关于第二个invokeAll重载方法,逻辑和第一个invokeAll方法是一样的,不同的是这个方法加了时间的限制:

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
            if (tasks == null || unit == null)
                throw new NullPointerException();
            long nanos = unit.toNanos(timeout);
            List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
            boolean done = false;
            try {
                for (Callable<T> t : tasks)
                    futures.add(newTaskFor(t));
    
                long lastTime = System.nanoTime();
    
                // Interleave time checks and calls to execute in case
                // executor doesn't have any/much parallelism.
                Iterator<Future<T>> it = futures.iterator();
                while (it.hasNext()) {
                    execute((Runnable)(it.next()));
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0)
                        return futures;
                }
    
                for (Future<T> f : futures) {
                    if (!f.isDone()) {
                        if (nanos <= 0)
                            return futures;
                        try {
                            f.get(nanos, TimeUnit.NANOSECONDS);
                        } catch (CancellationException ignore) {
                        } catch (ExecutionException ignore) {
                        } catch (TimeoutException toe) {
                            return futures;
                        }
                        long now = System.nanoTime();
                        nanos -= now - lastTime;
                        lastTime = now;
                    }
                }
                done = true;
                return futures;
            } finally {
                if (!done)
                    for (Future<T> f : futures)
                        f.cancel(true);
            }
        }

    时间限制主要有两部分:

    第一个循环,是依此提交任务阶段,有时间限制。假如有10个任务要提交,那么循环提交的时每次都会检查是否超时,如果提交到第8个发现超时了,那么第9、10两个任务就不会在调用execute了,而是直接返回这批任务;

    在第二个循环中,这时依此取执行结果阶段,每次也会判断是否超时,一旦发现超时就立马返回这批任务。

    当然如果超时返回,将无法执行后续语句done = true; 这样在finally中就会把所有任务都cancel掉。而客户端(主线程)就会收到CancellationException。

     

     

  • 相关阅读:
    2020软件工程作业——团队03
    2020软件工程作业 ——团队02
    2020软件工程作业05
    2020软件工程作业04
    2020软件工程作业03
    2020软件工程作业02
    2020软件工程作业01
    微服务:基本介绍
    excel模板解析前后设计变化,以及我对此的看法和感受
    纸上得来终觉浅,绝知此事要躬行——Spring boot任务调度
  • 原文地址:https://www.cnblogs.com/zhailzh/p/3974700.html
Copyright © 2011-2022 走看看