zoukankan      html  css  js  c++  java
  • 多线程(四)

    承接上面 上面的submit方法,与之相关联的有两个方法invokeAll 和invokeAny 的源码,我们来看一下这两个方法:

    首先是invokeAll的简单的,没有时间限制的方法,JDK说明,源码如下:

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
              执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
            if (tasks == null)
                throw new NullPointerException();
            List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
            boolean done = false;
            try {
                for (Callable<T> t : tasks) {
                    RunnableFuture<T> f = newTaskFor(t);
                    futures.add(f);
                    execute(f);
                }
                for (Future<T> f : futures) {
                    if (!f.isDone()) {
                        try {
                            f.get();
                        } catch (CancellationException ignore) {
                        } catch (ExecutionException ignore) {
                        }
                    }
                }
                done = true;
                return futures;
            } finally {
                if (!done)
                    for (Future<T> f : futures)
                        f.cancel(true);
            }
        }

    1. 首先根据传入的tasks,构建返回任务的数据:

    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());

    2. 根据任务,封装为FutureTask,这样就能够记录任务的状态:

    3.执行任务,这个任务执行在子类ThreadPoolExecutor的实现,这个会在以后说明,直接的理解为执行了。

    上面的一个for循环,就是构建数据,然后执行,没有阻塞的概念。

    下面的这个for循环,就有阻塞的概念,4.f.get();  取任务的执行的结果,这个就是一个会阻塞主线程的地方,因为如果任务没有完成,这个是必然会阻塞,知道任务完成(完成,取消,或者异常)。

    try/catch后面的finally语句,表示的是:

    判断任务是否done,如果没有(这种情况一般只有当任务为null才会发生),则必须把所有任务都cancel掉,底层会中断所有线程,回收资源。

    测试的代码,使用多线程(二)里面的CallabelTask

    public static void invokeAllCallableTask()throws Exception{
            ExecutorService executor = Executors.newCachedThreadPool();
            List<CallabelTask> tasks = new ArrayList<CallabelTask>(5);
            for(int i = 0; i < 5; ){
                tasks.add(new CallabelTask("task_"+(++i)));
            }
            List<Future<String>> results = executor.invokeAll(tasks);
            System.out.println("All the tasks have been submited through invokeAll method!");
            executor.shutdownNow();
            for(Future<String> f : results)
                System.out.println(f.get());
        }

     

    这段代码会在executor.invokeAll(tasks)行阻塞当前线程,直到所有任务都执行完(完成 or 取消 or异常),才会继续往下执行println语句进行打印。这里可以调用executor.shutdownNow()立马关闭线程执行器而不会有问题,因为在执行这一句的时候所有任务肯定已经执行完了。而在“AbstractExecutorService任务提交<一>”submit方法测试中,若使用shutdownNow()会有异常,因为它会去cancel正在执行的任务,很容易直到这个异常便是线程的中断异常了。

    关于第二个invokeAll重载方法,逻辑和第一个invokeAll方法是一样的,不同的是这个方法加了时间的限制:

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
            if (tasks == null || unit == null)
                throw new NullPointerException();
            long nanos = unit.toNanos(timeout);
            List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
            boolean done = false;
            try {
                for (Callable<T> t : tasks)
                    futures.add(newTaskFor(t));
    
                long lastTime = System.nanoTime();
    
                // Interleave time checks and calls to execute in case
                // executor doesn't have any/much parallelism.
                Iterator<Future<T>> it = futures.iterator();
                while (it.hasNext()) {
                    execute((Runnable)(it.next()));
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0)
                        return futures;
                }
    
                for (Future<T> f : futures) {
                    if (!f.isDone()) {
                        if (nanos <= 0)
                            return futures;
                        try {
                            f.get(nanos, TimeUnit.NANOSECONDS);
                        } catch (CancellationException ignore) {
                        } catch (ExecutionException ignore) {
                        } catch (TimeoutException toe) {
                            return futures;
                        }
                        long now = System.nanoTime();
                        nanos -= now - lastTime;
                        lastTime = now;
                    }
                }
                done = true;
                return futures;
            } finally {
                if (!done)
                    for (Future<T> f : futures)
                        f.cancel(true);
            }
        }

    时间限制主要有两部分:

    第一个循环,是依此提交任务阶段,有时间限制。假如有10个任务要提交,那么循环提交的时每次都会检查是否超时,如果提交到第8个发现超时了,那么第9、10两个任务就不会在调用execute了,而是直接返回这批任务;

    在第二个循环中,这时依此取执行结果阶段,每次也会判断是否超时,一旦发现超时就立马返回这批任务。

    当然如果超时返回,将无法执行后续语句done = true; 这样在finally中就会把所有任务都cancel掉。而客户端(主线程)就会收到CancellationException。

     

     

  • 相关阅读:
    atitit.nfc 身份证 银行卡 芯片卡 解决方案 attilax总结
    atitit.php 流行框架 前三甲为:Laravel、Phalcon、Symfony2 attilax 总结
    Atitit.执行cmd 命令行 php
    Atitit. 图像处理jpg图片的压缩 清理垃圾图片 java版本
    atitit。企业组织与软件工程的策略 战略 趋势 原则 attilax 大总结
    atitit. 管理哲学 大毁灭 如何防止企业的自我毁灭
    Atitit.java的浏览器插件技术 Applet japplet attilax总结
    Atitit.jquery 版本新特性attilax总结
    Atitit. 软件开发中的管理哲学一个伟大的事业必然是过程导向为主 过程导向 vs 结果导向
    (转)获取手机的IMEI号
  • 原文地址:https://www.cnblogs.com/zhailzh/p/3974700.html
Copyright © 2011-2022 走看看