zoukankan      html  css  js  c++  java
  • 关于java线程池 Ⅱ

    上一篇翻译了线程池主要部分的api,经过一段时间的学习,这里记录一下这段时间对jdk自带线程池的学习成果。

    为了方便说明,先放一张类图,包括了jdk线程池主要涉及到的类,为了条理清晰去掉了部分依赖和关联关系

    说明

    概述

    把握住主要的几个类可以比较快的了解整个实现的思想,下面罗列一下,细节的部分会在之后详细描述:

    主要的接口定义:Executor,Future,ExecutorService,Runnable

    主要的抽象类:AbstractExecutorService

    主要的实现类:ThreadPoolExecutor

    模型骨架

    按顺序讲讲我理解的这些代码的coder也就是Doug Lea在编写这部分代码的时候的思路:

    1.首先想到的是最抽象的定义,应该有个执行者,这个执行者的行为只有一个,就是执行可以运行的(Runnable)的任务,出于此目的,他设计了Executor接口,定义了一个行为execute(Runnable command).

    2.因为Runnable接口只能保证一个任务(task)是可执行的,并不能记录这个任务执行期间实时的状态及执行结果等特性,并且也不能够提供类似取消执行任务这样的操作,所以需要另设一个接口,通过这个接口来保证前面所描述的这些特性能够实现,在这样的前提下,诞生了Future接口,具体接口行为定义如下:

    1.cancel(boolean mayInterruptIfRunning):取消执行操作,这个取消操作对队列中尚在等待的任务有效,已经执行的任务在调用此方法的时候将会返回false。但是,若将mayInterruptIfRunning方法的值设置为true时,将会中断正在执行的任务,正在执行的任务应当抛出InterruptException.

    2.isCancelled():判断当前任务是否已经取消

    3.isDone():判断当前任务是否完成

    4.get():调用此方法的线程,在必要的情况下,等待执行任务的线程执行完毕,并最终获取返回结果。

    5.get(long timeout,TimeUnit unit):调用此方法的线程,在一定的时间内等待执行任务的线程执行完毕,指定时间内未能获取返回结果将最终导致抛出TimeOutException异常。

    3.定义了Executor,Future这两类元素,再加上Runnable元素,模型的草图就差不多设计完成了,但是实际上Executor并不能与Future协同工作,记得吗?Executor只有一个行为,execute(Runnable command).Executor实际上只能与Runnable协同工作,那么为了使Executor同时还能与Future协同工作,ExecutorService接口诞生了,具体接口行为定义如下:

    1.shutdown():关闭当前的ExecutorService,这个方法允许等待执行的任务执行完毕。

    2.shutdownNow():关闭当前的ExecutorService,这个方法会阻止等待执行的任务启动,并且会尝试中断正在执行的任务。

    3.isShutdown():判断当前ExecutorService是否已经关闭

    4.isTerminated():判断在执行了shutdown操作之后是否所有的任务都已经完成了。(不管是普通的执行完毕还是抛出异常都算执行终止)

    5.awaitTermination(long timeout,TimeUnit unit):等待在执行了shutdown操作之后剩余的等待执行的任务执行完毕,这个操作将会在指定时间内阻塞线程,直到返回结果或者超时。

    6.submit(Callable<T> task):Future<T>  :提交一个Callable任务,并返回一个Future对象,如果你想在这个方法上阻塞只到这个任务执行完毕,你可以这么写:exec.submit(task).get();

    7.submit(Runnable task,T result):Future<T>   :这个方法与6类似,只是因为提交的是Runnable任务,这个任务自身执行完毕是没有返回值的,所以需要自己设置返回结果,即result;

    8.submit(Runnable taks):Future<T>   :还是同上,只是放回的Future<T>执行get()方法时将会返回null.

    9.invokeAll(Collection<? extends Callable<T> tasks):List<Future<T>>:批量的执行Callable任务,并返回一个List<Future<T>>

    10.invokeAll(Collection<? extends Callable<T> tasks,long  timeout, TimeUnit unit):List<Future<T>>  :猜也能猜个八九不离十了,在指定时间内批量的执行Callable任务,并返回一个List<Future<T>>,接口定义中这样描述,不管是所有任务完成或者是超时,都将返回List<Future<T>>。这意味着,这个Future的list中会有一部分Future指示的任务可能是未完成的。

    11.invokeAny(Collection<? extends Callable<T> tasks>):T  :这个行为比较有意思,批量执行Callable任务(不保证全部执行),返回其中执行成功的一个任务的执行结果。一旦有任意一个任务成功返回,或者抛出异常,则其他还未完成的任务将被取消。

    12.invokeAny(Collection<? extends Callable<T> tasks>, long timeout, TimeUnit unit):和上一个行为是一样的,只是限制了再一定时间内完成。

     

    实际上到了这一步,整个模型的骨架已经搭建起来了。但是,到目前为止可以看出的是,实际上这个模型搭建的是单个执行者(Executor),多个任务(Runnable或者Callable)的工作场景。多个执行者,多个任务的工作场景,实际上是由ThreadPoolExecutor继承了AbstractExecutorService而实现的。下面我们主要看看这两个类。

    详细设计

    AbstractExecutorService

    先着重介绍AbstractExecutorService,这个抽象类实现了ExecutorService的submit,invokeAll,invokeAny方法,即所有处理任务的方法(但是要注意,它并没有实现Executor的execute方法,这个方法将在ThreadPoolExecutor中实现,这里的所有处理任务的方法都只是对execute方法的包装)。

    这个类中有两个有意思的设计,我们主要看看这两个设计:

    1.这个类上有一个有意思的变化,还记得之前说的ExecutorService是可以同时Runnable与Future协同工作的吗?从类图上可以看出来,实际上AbstractExecutorService依赖于FutureTask这个类,而这个类又实现了RunnableFuture接口,而RunnableFuture接口继承了Future接口和Runnable接口!看出来了没?FutureTask实际上是一个Future与Runnable的混合产物!FutureTask即具有Future的行为,也具有Runnable的行为。这样就方便了,不管是想要Future作为返回值,还是想要Runnable作为参数,都只需要提供一个FutureTask就可以满足需要了,同时,统一使用FutureTask最主要的目的,按照我的思路,应该是屏蔽Future对象与Runnable对象之间的信息交流,减少实际编写时的代码开销(即把Future与Runnable对象的信息交互统一到了FutureTask中)

    AbstractExecutorService提供了两个方法来将传入的Runnable对象和Callable对象转换成FutureTask对象,如下所示:

    /**
         * Returns a <tt>RunnableFuture</tt> for the given runnable and default
         * value.
         *
         * @param runnable the runnable task being wrapped
         * @param value the default value for the returned future
         * @return a <tt>RunnableFuture</tt> which when run will run the
         * underlying runnable and which, as a <tt>Future</tt>, will yield
         * the given value as its result and provide for cancellation of
         * the underlying task.
         * @since 1.6
         */
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
    
        /**
         * Returns a <tt>RunnableFuture</tt> for the given callable task.
         *
         * @param callable the callable task being wrapped
         * @return a <tt>RunnableFuture</tt> which when run will call the
         * underlying callable and which, as a <tt>Future</tt>, will yield
         * the callable's result as its result and provide for
         * cancellation of the underlying task.
         * @since 1.6
         */
        protected <T> RunnableFuture<T> 
    newTaskFor(Callable<T>
     callable) {
            return new FutureTask<T>(callable);
        }

    submit方法通过调用newTaskFor方法获取FutureTask对象,在调用execute(Runnable task)时,使用了FutureTask对象作为传入参数,返回时也使用了同一个FutureTask对象作为返回值,这样就对外屏蔽了Future与Runnable之间的信息交流,不必再在每个submit方法中都将返回的结果拼装成Future对象,然后返回。如下所示:

    public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Object> ftask = 
    newTaskFor(task,
     null);
            execute(ftask);
            return ftask;
        }
    
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
    
        public <T> Future<T> 
    submit(Callable<T>
     task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }

    这里顺带一提FutureTask的实现:应该不难发现,我们讲到这里为止,都没有说明,如果出现共享资源竞争,即多个线程同时执行时,线程安全问题怎么解决?这个问题实际上就是在FutureTask中解决的。FutureTask有个私有属性 sync,FutureTask的所有行为实际上都是由sync实现的。而这个sync所属的类Sync实际上是FutureTask的内部类.这个类继承了AbstractQueuedSynchronizer 。Sync可以保证在共享资源竞争时,有且仅有一个线程获得资源(这一部分我打算下一次仔细研究过以后再放上博客)。

    2.AbstractExecutorService在实现invokeAny方法时,引入了ExecutorCompletionService。

    先从为何这么设计说起。回头看看invokeAny的接口行为定义:批量执行Callable任务(不保证全部执行),返回其中执行成功的一个任务的执行结果。一旦有任意一个任务成功返回,或者抛出异常,则其他还未完成的任务将被取消。也就是说,要想实现这个行为,必须得知道当前是否有任意一个任务完成,那么怎么来判断是否有任意一个任务是否完成呢?(注意:是判断有没有任意一个任务完成,而不是判断当前任务是否完成,否则可以直接调用FutureTask的isDone()方法即可判断当前任务是否完成)可以看看FutureTask是否实现了这么一个方法。进入FutureTask的类,可以看到并没有这么一个方法,但是,其中有一个方法值得注意,如下所示:

    /**
         * Protected method invoked when this task transitions to state
         * <tt>isDone</tt> (whether normally or via cancellation). The
         * default implementation does nothing.  Subclasses may override
         * this method to invoke completion callbacks or perform
         * bookkeeping. Note that you can query status inside the
         * implementation of this method to determine whether this task
         * has been cancelled.
         */
        protected void done() { }

    这里实现了一个空的done()方法,根据注释,这个方法将会在任务状态变为完成的时候执行。那么就好办了,只需要实现一个类继承FutureTask然后实现done()方法,将已经完成的任务放入一个队列中,然后监控这个队列,一旦这个队列有值,就说明有一个任务完成了,invokeAny方法即可以返回这个完成的任务的结果了。

    这个就是ExecutorCompletionService诞生的原因。ExecutorCompletionService类有一个内部类 QueueingFuture 继承自FutureTask,它实现了done()方法,将完成的任务加入了一个LinkedBlockingQueue<Future<V>>中。如下所示:

    /**
         * FutureTask extension to enqueue upon completion
         */
        private class QueueingFuture extends FutureTask<Void> {
            QueueingFuture(RunnableFuture<V> task) {
                super(task, null);
                this.task = task;
            }
            protected void done() { completionQueue.add(task); }
            private final Future<V> task;
        }

    ExecutorCompletionService的submit方法为真正任务的执行者提供了一个QueueingFuture,并提供了一系列监控已完成任务队列:completionQueue的方法。这样在实际处理当中,就可以通过监控ExecutorCompletionService的completionQueue来判断是否有任务完成,这部分的具体代码在AbstractEexcutorService的

    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)中,如下所示:

    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                                boolean timed, long nanos)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (tasks == null)
                throw new NullPointerException();
            int ntasks = tasks.size();
            if (ntasks == 0)
                throw new IllegalArgumentException();
            List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
            ExecutorCompletionService<T> ecs =
                new ExecutorCompletionService<T>(this); //用ExecutorCompletionService对自己做了一层包装
    
            // For efficiency, especially in executors with limited
            // parallelism, check to see if previously submitted tasks are
            // done before submitting more of them. This interleaving
            // plus the exception mechanics account for messiness of main
            // loop.
    
            try {
                // Record exceptions so that if we fail to obtain any
                // result, we can throw the last exception we got.
                ExecutionException ee = null;
                long lastTime = (timed)? System.nanoTime() : 0;
                Iterator<? extends Callable<T>> it = tasks.iterator();
    
                // Start one task for sure; the rest incrementally
                futures.add(ecs.submit(it.next()));
                --ntasks;
                int active = 1;
    
                for (;;) {
                    Future<T> f = ecs.poll(); //ExecutorCompletionService提供的监控completionQueue的方法,如果队列为空则不进行等待,直接返回null
                    if (f == null) {
                        if (ntasks > 0) {
                            --ntasks;
                            futures.add(ecs.submit(it.next()));
                            ++active;
                        }
                        else if (active == 0)
                            break;
                        else if (timed) {
                            f = ecs.poll(nanos, TimeUnit.NANOSECONDS); //ExecutorCompletionService提供的监控completionQueue的方法,如果队列为空则等待指定的时间,如果超时时队列仍然为空,则返回null 
                            if (f == null)
                                throw new TimeoutException();
                            long now = System.nanoTime();
                            nanos -= now - lastTime;
                            lastTime = now;
                        }
                        else
                            f = ecs.take(); //ExecutorCompletionService提供的监控completionQueue的方法,如果队列为空会一直等待直到队列中有值,然后取到值返回
                    }
                    if (f != null) {
                        --active;
                        try {
                            return f.get();
                        } catch (InterruptedException ie) {
                            throw ie;
                        } catch (ExecutionException eex) {
                            ee = eex;
                        } catch (RuntimeException rex) {
                            ee = new ExecutionException(rex);
                        }
                    }
                }
    
                if (ee == null)
                    ee = new ExecutionException();
                throw ee;
    
            } finally {
                for (Future<T> f : futures)
                    f.cancel(true);
            }
        }

    以上就是AbstractExecutorServcie中两个值得注意的地方,剩下的invokeAll方法由于比较简单就不介绍了。下面介绍一下真正实现线程池的关键类,ThreadPoolExecutor。

    ThreadPoolExecutor

    感觉这章要写不完了。。另起一章讲这个ThreadPoolExecutor吧。。

     

  • 相关阅读:
    【剑指Offer】面试题55
    一大波趣图:CSS的力量
    你必须收藏的Github技巧
    3月份GitHub上最热门的Java开源项目
    趣图:快下班了,剩一个bug,修复一下再走
    Java程序员必备的Intellij插件
    为什么前后端分离了,你比从前更痛苦?
    趣图:好好干,今天再加个班
    高并发下的下单功能设计
    Mybatis的基本要素--核心对象
  • 原文地址:https://www.cnblogs.com/crazybit/p/3150806.html
Copyright © 2011-2022 走看看