zoukankan      html  css  js  c++  java
  • FutureTask源码分析

    说FutureTask之前,我们先来说一下Future这个接口。

    Future 接口上有许多注释,总结下来如下:

    • 定义了异步计算的接口,提供了计算是否完成的 check、等待完成和取回等多种方法;
    • 如果想得到结果可以使用 get 方法,此方法(无参方法)会一直阻塞到异步任务计算完成;
    • 取消可以使用 cancel 方法,但一旦任务计算完成,就无法被取消了;

    主要方法如下:

    // 如果任务已经成功了,或已经取消了,是无法再取消的,会直接返回取消成功(true)
    // 如果任务还没有开始进行时,发起取消,是可以取消成功的。
    // 如果取消时,任务已经在运行了,mayInterruptIfRunning 为 true 的话,就可以打断运行中的线程
    // mayInterruptIfRunning 为 false,表示不能打断直接返回
    boolean cancel(boolean mayInterruptIfRunning);
    
    // 返回线程是否已经被取消了,true 表示已经被取消了
    // 如果线程已经运行结束了,isCancelled 和 isDone 返回的都是 true
    boolean isCancelled();
    
    // 线程是否已经运行结束了
    boolean isDone();
    
    // 等待结果返回
    // 如果任务被取消了,抛 CancellationException 异常
    // 如果等待过程中被打断了,抛 InterruptedException 异常
    V get() throws InterruptedException, ExecutionException;
    
    // 等待,但是带有超时时间的,如果超时时间外仍然没有响应,抛 TimeoutException 异常
    V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    

    Future 主要定义了各种对任务进行管理的方法,比如说取消任务,得到任务的计算结果等等。

    接下来再说说RunnableFuture这个接口,这个接口就有意思了。

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        /**
         * Sets this Future to the result of its computation
         * unless it has been cancelled.
         */
        void run();
    }
    

    RunnableFuture继承了Runnable和Future这两个接口。RunnableFuture 接口的最大目的,就是让 Future 可以对 Runnable 进行管理,可以取消 Runnable,查看 Runnable 是否完成等等。

    最后,就是我们今天要讲的FutureTask。

    public class FutureTask<V> implements RunnableFuture<V> {...}
    

    FutureTask 实现了 RunnableFuture 接口,也就是说间接实现了 Runnnable 接口(RunnableFuture 实现了 Runnnable 接口),就是说 FutureTask 本身就是个 Runnnable,同时 FutureTask 也实现了 Future,也就是说 FutureTask 具备对任务进行管理的功能(Future 具备对任务进行管理的功能)。

    FutureTask的属性如下所示:

    // 任务状态 使用volatile修饰
    private volatile int state;
    private static final int NEW          = 0;//线程任务创建
    private static final int COMPLETING   = 1;//任务执行中
    private static final int NORMAL       = 2;//任务执行结束
    private static final int EXCEPTIONAL  = 3;//任务异常
    private static final int CANCELLED    = 4;//任务取消成功
    private static final int INTERRUPTING = 5;//任务正在被打断中
    private static final int INTERRUPTED  = 6;//任务被打断成功
    
    // 组合了 Callable 
    private Callable<V> callable;
    // 异步线程返回的结果
    private Object outcome; 
    // 当前任务所运行的线程 使用volatile修饰
    private volatile Thread runner;
    // 记录调用 get 方法时被等待的线程 使用volatile修饰
    private volatile WaitNode waiters;
    
    
       // state属性的偏移量
        private static final long stateOffset;
       // runner属性的偏移量
        private static final long runnerOffset;
       // waiter属性的偏移量
        private static final long waitersOffset;
    

    Callable 是FutrueTask的一个属性,这也就让 FutureTask 具备了转化 Callable 和 Runnable 的功能。关于Callable和Runnable的区别,小伙伴可以自行去学习一下。至于FutureTask 是如何将Runnable转化为Callable,下面会说到。

    构造方法

    FutureTask提供了两种构造方法,分别如下:

    // 使用 Callable 进行初始化 
    public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
        // 任务状态初始化
            this.callable = callable;
            this.state = NEW;       // ensure visibility of callable
        }
    
    // 使用 Runnable 初始化,并传入 result 作为返回结果。
    // Runnable 是没有返回值的,所以 result 一般没有用,置为 null 就好了
    public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
        }
    

    FutureTask的两个构造器,只有一个目的,就是把入参都转化成 Callable,那么为什么不都转化成 Runnnable 呢?主要是因为 Callable 的功能比 Runnnable 丰富,Callable 有返回值,而 Runnnable 没有。

    第二个构造方法中,传入一个Runnable,最后却被转化为一个Callable,这是怎么实现的呢?点击进入Executors#callable方法,如下:

    public static <T> Callable<T> callable(Runnable task, T result) {
            if (task == null)
                throw new NullPointerException();
            return new RunnableAdapter<T>(task, result);
        }
    

    其底层调用的RunnableAdapter的构造方法。RunnableAdapter,顾名思义,是一个适配器类,主要对Runnable进行适配。因为Runnnable 和 Callable 两者都是接口,两者之间是无法进行转化的,这样一个类来进行转化。那么具体是如何转化的呢?

    // 将 Runnable 转化成 Callable 的工具类
    static final class RunnableAdapter<T> implements Callable<T> {
            // Runnable作为属性
            final Runnable task;
            // 返回结果
            final T result;
        
            RunnableAdapter(Runnable task, T result) {
                this.task = task;
                this.result = result;
            }
        
            public T call() {
                //实际调用的还是Runnable对象的run()方法
                task.run();
                //返回结果
                return result;
            }
        }
    

    可以看出:

    • 首先 RunnableAdapter 实现了Callable,所以 RunnableAdapter 就是 Callable
    • 其次 Runnable 是 RunnableAdapter 的一个属性,在构造 RunnableAdapter 的时候会传进来,并且在 call 方法里面调用 Runnable 的 run 方法;

    这是一个典型的适配模型,我们要把 Runnable 适配成 Callable,首先要实现 Callable 的接口,接着在 Callable 的 call 方法里面调用被适配对象(Runnable)的方法

    需要注意的是,使用FutureTask(Runnable runnable, V result)这个构造方法时,可以因为Runnable是没有返回值的,所以一般传入的Vnull

    核心方法

    下面根据一个小demo来看一下FutureTask是如何使用的。

     public static void main(String[] args) throws ExecutionException, InterruptedException {
    
    
            FutureTask<String> task = new FutureTask<>(() -> "futureTask demo");
           
         
         // run()方法可以直接调用,或者是开启新的线程进行调用
            task.run();
    
    //        Thread t = new Thread(task);
    //        t.start();
    
            String s = task.get();
    
            System.out.println(s);
    
        }
    

    run方法

    先来看一下run方法。

    public void run() {
             // 任务状态不是新建,或者当前任务已经有线程在执行了,直接返回
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return;
            try {
                
                Callable<V> c = callable;
                // Callable 不为空,并且已经初始化完成
                if (c != null && state == NEW) {
                    // 需要返回的结果 
                    V result;
                    // 是否正常执行
                    boolean ran;
                    try {
                        //执行
                        result = c.call();
                        // 如果代码执行到这里,说明没有异常,正常执行了
                        ran = true;
                    } catch (Throwable ex) {
                        //异常处理
                        result = null;
                        ran = false;
                        // 设置异常结果
                        setException(ex);
                    }
                    //运行结束
                    if (ran)
                        //设置返回结果
                        set(result);
                }
            } finally {
               // 这里清空了执行的线程变量,与方法开头的判断对应 防止并发调用
                runner = null;
                // 必须在runner为空后重新读取状态,以防止中断被遗漏
                int s = state;
                // 如果被中断了
                if (s >= INTERRUPTING)
                // 如果正在中断或中断状态
                // 调用handlePossibleCancellationInterrupt执行线程让步
                // 确保来自cancel(true)的任何中断仅在运行或runAndReset时才传递给任务
                    handlePossibleCancellationInterrupt(s);
            }
        }
    

    需要注意的是:

    • run 方法是没有返回值的,通过给 outcome 属性赋值(set(result)),get 时就能从 outcome 属性中拿到返回值;
    • FutureTask 两种构造器,最终都转化成了 Callable,所以在 run 方法执行的时候,只需要执行 Callable 的 call 方法即可,在执行 c.call() 代码时,如果入参是 Runnable 的话, 调用路径为 c.call() -> RunnableAdapter.call() -> Runnable.run(),如果入参是 Callable 的话,直接调用。

    setException方法

    protected void setException(Throwable t) {
        // 将state的值由NEW设置为COMPLETING
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                //获取异常信息
                outcome = t;
                 // 将state的值设置为EXCEPTIONAL 代表异常
                UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
                finishCompletion();
            }
        }
    

    set方法

    protected void set(V v) {
        // 将state的值由NEW设置为COMPLETING
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                // 获取执行返回结果
                outcome = v;
                // 将state的值设置为NORMAL 代表正常执行结束
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
                finishCompletion();
            }
        }
    

    在setException和set方法中都调用到了finishCompletion方法,这个方法的作用后面再详解。另外,在这两个方法中,都是先将state由NEW设置为COMPLETING,再设置为其他的。因此,COMPLETING只是个瞬时的过渡状态。

    runAndReset方法

    这个方法与run相似但又不同。run方法没有返回值,一般情况下,一旦任务执行结束,任务执行结果确定,任务不能重新启动或取消。runAndReset方法返回boolean类型值,该方法却不会去设置执行任务的结果,但却会去重置任务的状态。

     protected boolean runAndReset() {
         // 任务状态不是新建,或者当前任务已经有线程在执行了,直接返回
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return false;
            boolean ran = false;
            int s = state;
            try {
                Callable<V> c = callable;
                if (c != null && s == NEW) {
                    try {
                        c.call(); // don't set result
                        ran = true;
                    } catch (Throwable ex) {
                        setException(ex);
                    }
                    //相比较与run方法,此处不会去设置任务执行结果 因而state的状态也不会改变
                }
            } finally {
                // 这里清空了执行的线程变量,与方法开头的判断对应 防止并发调用
                runner = null;
                s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
           //这里并没有改变state的状态,还是NEW状态
            return ran && s == NEW;
        }
    

    runAndReset和run方法最大的区别是 runAndReset 不需要设置返回值,并且不需要改变任务的状态,也就是不改变state的状态,一直是NEW状态。这样的话势必会造成正常情况下是无法获取程序的结果的。之所以这么做也是因为任务是要复用的,因为这个方法是用来做周期循环调度的。所以也不会改变状态,也不会设置结果值。具体的体现我们可以再ScheduleThreadPoolExecutor中具体查看。

    get方法

    get方法有两种,无参和带有过期时间的。两者区别在于,无参get()方法若是获取不到返回结果时,则会一直阻塞,直到获取结果。带有过期时间的get(long timeout, TimeUnit unit)方法,若是在指定时间内获取不到结果,则会直接抛出异常,停止等待。二者方法上没多大区别。

     public V get() throws InterruptedException, ExecutionException {
            int s = state;
         // 如果任务正在执行
            if (s <= COMPLETING)
                //执行任务
                s = awaitDone(false, 0L);
         // 任务执行成功,返回执行的结果
            return report(s);
        }
    
    public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (unit == null)
                throw new NullPointerException();
            int s = state;
        // 如果任务已经在执行中了,并且等待一定的时间后,仍然在执行中,直接抛出异常
            if (s <= COMPLETING &&
                (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
                throw new TimeoutException();
        // 任务执行成功,返回执行的结果
            return report(s);
        }
    

    这两个方法都使用到了awaitDone和report方法,还是老规矩,先易后难,先来看report方法。

    private V report(int s) throws ExecutionException {
           // 临时变量,保存执行结果
            Object x = outcome;
           // 如果正常执行结束
            if (s == NORMAL)
                //返回执行结果
                return (V)x;
            // 任务被取消,抛出CancellationException异常
            if (s >= CANCELLED)
                throw new CancellationException();
        // 其他状态,抛出执行异常ExecutionException
            throw new ExecutionException((Throwable)x);
        }
    

    report方法比较简单,如果run方法已经运行完毕,result会通过set方法进行设置。在report方法中,根据state的状态来判断,如果任务正常执行结束,则直接返回执行结果。若不是,则分别返回相应的异常。

    awaitDone

    private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
           //计算截止时间
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
           // 当前线程被包装的等待队列节点
            WaitNode q = null;
           // 是否已经入队
            boolean queued = false;
           // 自旋
            for (;;) {
                // 如果线程被打断
                if (Thread.interrupted()) {         ------------------ 1
                    //移除结点q
                    removeWaiter(q);
                    // 抛出异常                       
                    throw new InterruptedException();
                }
    
                int s = state;
                //任务完成
                if (s > COMPLETING) {                ------------------ 2
                    
                    if (q != null)
                        //线程置为null
                        q.thread = null;
                        // 返回s             
                    return s;
                }
               //说明马上状态就改变了,因为COMPLETING是个瞬时状态 那么此时肯定不会入队了,所以让出时间片
                else if (s == COMPLETING) // cannot time out yet  -------3
                    // 让出cpu使用权
                    Thread.yield();
                else if (q == null)        ------------------------------4
                    //新建一个WaitNode结点
                    q = new WaitNode();
                   // 如果还没入队
                else if (!queued)         -----------------------------------5
                    //头插法 使自身成为新链表的头结点
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                         q.next = waiters, q);
                // 如果使用了等待时间
                else if (timed) {          ----------------------------------6
                    // 计算剩余时间
                    nanos = deadline - System.nanoTime();
                    // 超时的话,就移除q 返回state
                    if (nanos <= 0L) {
                        removeWaiter(q);
                        return state;
                    }
                    // 使当前线程进入等待
                    LockSupport.parkNanos(this, nanos);
                }
                else                 ----------------------------------7
                    LockSupport.park(this);
            }
        }
    

    执行任务的runner只能有一个,但是获取结果的waiter线程可以有多个。多个线程同时去调用get()获取结果,这些线程被包装成WaitNode并形成一个链表。WaitNode的结构很简单,一目了然,如下所示:

    static final class WaitNode {
            // 线程
            volatile Thread thread;
            // 下一个结点
            volatile WaitNode next;
            // 构造方法
            WaitNode() { thread = Thread.currentThread(); }
        }
    

    这里又是使用自旋来进行处理,果然是Doug Lea大神的风格。这里分支较多,我们从头开始捋一下逻辑。

    假定任务此刻没有执行完,也没有通过cancle(true)取消,即state == NEW:

    第一次循环:1、2、3都是false,进入到4,生成一个新的WaitNode结点,由于只能进入一个分支,开启第二次循环;

    第二次循环:1、2、3、4都是false,进入到5,如果加入队列不成功,下一次循环依然会进到这里,直到

    queued==true

    .

    .

    .

    第N次:1、2、3、4、5都是false,如果有超时限制,进入到6,判断超时与否,如果超时就将结点q移除并返回state;没有的话就使线程进入等待。如果没有超时,直接让线程进入等待状态。

    简而言之就是,awaitDone方法里有一个死循环,直到有一个确定的状态返回,如果状态大于 COMPLETING ,也就是 成功了,就返回该状态,如果正在进行中,则让出CPU时间片进行等待。如果都不是,则让该线程阻塞等待。在哪里唤醒呢?答案就在上面setException和set方法中还没有提到的finishCompletion方法。这个方法的主要作用就是唤醒处于等待状态的线程

    //删除并唤醒等待链表中所有的结点
    private void finishCompletion() {
            // assert state > COMPLETING;
            // 开始从头遍历WaitNode组成的链表
            for (WaitNode q; (q = waiters) != null;) {
                //清除等待线程
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                    for (;;) {
                        //解锁等待的线程
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            //唤醒线程
                            // 此处唤醒的线程,会在awaitDone继续for循环执行
                            LockSupport.unpark(t);
                        }
                        //当前结点的下一个结点
                        WaitNode next = q.next;
                        //遍历完了 退出循环
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        // 指向下一个结点 用于遍历链表
                        q = next;
                    }
                    break;
                }
            }
    
           //留给子类的扩展方法,这里什么都没做
            done();
           // 置空callable
            callable = null;        // to reduce footprint
        }
    

    这个方法的实现比较简单,依次遍历waiters链表,唤醒节点中的线程,然后把callable置空。
    被唤醒的线程会各自从awaitDone()方法中的LockSupport.park*()阻塞中返回,然后会进行新一轮的循环。在新一轮的循环中会返回执行结果(或者更确切的说是返回任务的状态)。

    cancel方法

    上面已经说过了,FutureTask实现RunnableTask,因而既是一个Runnable,也是一个Future。而Future是可以对任务进行管理和监控的。因此FutureTask也具备该属性,用户可以通过调用cancel方法取消任务的执行。

     public boolean cancel(boolean mayInterruptIfRunning) {
          //mayInterruptIfRunning true代表中断,false代表取消
          //只有state==New且通过cas修改state值成功 才往下执行 否则return false
            if (!(state == NEW &&
                  // mayInterruptIfRunning? NEW->INTERRUPTING:NEW->CANCELLED
                  UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                      mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
                return false;
            try {   //中断的情况,设置线程中断
                if (mayInterruptIfRunning) {
                    try {
                        Thread t = runner;
                        if (t != null)
                            // 线程中断
                            t.interrupt();
                    } finally { 
                        // state INTERRUPTING->INTERRUPTED
                        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                    }
                }
            } finally {
                finishCompletion();
            }
            return true;
        }
    

    我们可以看到,cancel同时具有取消和中断两种功能,只用一个方法就可以实现。

    1.当我们的任务还是NEW状态,又改变状态成功,这说明任务已经无法执行了,设置线程状态,如果不处于NEW状态,或者修改状态失败则直接结束方法。

    2.不满足的情况下就会判断任务是否为中断,如果中断的话就把线程的状态也设置为中断,并改变最终的状态。

    3.最终也还是调用finishCompletion释放等待的线程。

    cancel方法总结:

    如果run()尚未被执行 则将callable置空且修改状态为非NEW(这样run()方法就不会执行)

    如果run()正在执行且callable.call()尚未执行完成 则调用thread.interrpt()通知线程停止(只是通知 无法保证打断线程 具体原因自行查阅interrpt()资料) 由于cancle修改了state状态 所以setException()和set()无法保存结果

    如果run()执行完毕 或者 callable.call()执行完成 由于 state!=NEW 所以cancle()不继续执行 返回失败。

    总结

    通过 FutureTask, Runnnable、Callable和Future 都串起来了,使 FutureTask 具有三者的功能,统一了 Runnnable 和 Callable,更方便使用。同时在使用的时候,要注意以下几点:

    1.任务开始运行后,不能再次运行,保证只运行一次(runAndReset 方法除外);
    2.任务还未开始,或者任务已被运行,但未结束,这两种情况下都可以取消; 如果任务已经结束,则不可以被取消 。

    参考:

    https://www.imooc.com/read/47/article/870

    https://juejin.im/post/5efc9c405188252e884e698f

    https://juejin.im/post/5ab06d776fb9a028d140eb7a#heading-4

  • 相关阅读:
    Representation Data in OpenCascade BRep
    Render OpenCascade Geometry Surfaces in OpenSceneGraph
    Render OpenCascade Geometry Curves in OpenSceneGraph
    OpenCascade Shape Representation in OpenSceneGraph
    Geometry Surface of OpenCascade BRep
    Geometry Curve of OpenCascade BRep
    Tyvj2017清北冬令营入学测试
    Spfa算法模板
    洛谷1016 旅行家的预算
    洛谷1290 欧几里得的游戏
  • 原文地址:https://www.cnblogs.com/reecelin/p/13274320.html
Copyright © 2011-2022 走看看