zoukankan      html  css  js  c++  java
  • 并发系列(二)——FutureTask类源码简析

    背景

      本文基于JDK 11,主要介绍FutureTask类中的run()、get()和cancel() 方法,没有过多解析相应interface中的注释,但阅读源码时建议先阅读注释,明白方法的主要的功能,再去看源码会更快。

      文中若有不正确的地方欢迎大伙留言指出,谢谢了!

    1、FutureTask类图

      1.1 FutureTask简介

      FutureTask类图如下(使用IDEA生成)。如图所示,FutureTask实现了Future接口的所有方法,并且实现了Runnable接口,其中,Runnable接口的现实类用于被线程执行,而Future代表的是异步计算的结果。因此,FutureTask类可以理解为,执行run()(实现Runnable接口中的方法),通过Future的get()方法获取结果。

      1.2 FutureTask的属性

     //任务线程总共有七中状态如下:
        * Possible state transitions:
         * NEW -> COMPLETING -> NORMAL
         * NEW -> COMPLETING -> EXCEPTIONAL
         * NEW -> CANCELLED
         * NEW -> INTERRUPTING -> INTERRUPTED
         */
        private volatile int state;
        private static final int NEW          = 0;
        private static final int COMPLETING   = 1;
        private static final int NORMAL       = 2;
        private static final int EXCEPTIONAL  = 3;
        private static final int CANCELLED    = 4;
        private static final int INTERRUPTING = 5;
        private static final int INTERRUPTED  = 6;
    
        /** The underlying callable; nulled out after running */
        //在run()方法中调用
        private Callable<V> callable;
        /** The result to return or exception to throw from get() */
        //任务执行结果,callable.call()正常执行的返回值
        private Object outcome; // non-volatile, protected by state reads/writes
        /** The thread running the callable; CASed during run() */
        //任务线程
        private volatile Thread runner;
        /** Treiber stack of waiting threads */
        //等待任务结果的线程组成的节点,放在链表对列中
        private volatile WaitNode waiters;

     2、源码解析

      2.1 run()方法

    public void run() {
            //1、若是任务的状态不是NEW,且使用CAS将runner置为当前线程则直接返回
            if (state != NEW ||
                !RUNNER.compareAndSet(this, null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;
                //2、任务不为null,且state的状态为NEW的情况下才执行任务
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        //执行任务并接收执行结果
                        result = c.call();
                        //正常执行结果则将标识置为true
                        ran = true;
                    } catch (Throwable ex) {
                        //3、任务发生异常,执行或cancel(),则结果置为null,并记录异常信息
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    //4、任务正常结束,则设置返回结果
                    if (ran)
                        set(result);
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                //5、若是异常导致,走另一个流程
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }

      1)若任务的状态不是NEW,或者使用CAS将runner置为当前线程失败,则直接返回的原因是防止多线程调用;

      2)再度确认任务执行的前置条件;

      3)任务执行异常,将result置为null,并记录异常,setException()源码如下:

    protected void setException(Throwable t) {
            //使用CAS将状态置为中间态COMPLETING
            if (STATE.compareAndSet(this, NEW, COMPLETING)) {
                outcome = t;
                STATE.setRelease(this, EXCEPTIONAL); // final state
                //任务处于结束态时,遍历唤醒等待result的线程
                finishCompletion();
            }
        }

      任务的状态变化为NEW  - >  COMPLETING  ->  EXCEPTIONAL

      4)任务正常结果则会设置result之后,唤醒waitNode的链表对列中等待任务结果的线程;

      5)异常后的调用逻辑如下:

     //保证调用cancel在run方法返回之前中断执行任务
        private void handlePossibleCancellationInterrupt(int s) {
            // It is possible for our interrupter to stall before getting a
            // chance to interrupt us.  Let's spin-wait patiently.
            if (s == INTERRUPTING)
                //自旋等待
                while (state == INTERRUPTING)
                //当前线程让出CPU执行权
                    Thread.yield(); // wait out pending interrupt
        }

       2.2  get()方法

      源码分析如下:

    public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                //等待任务完成
                s = awaitDone(false, 0L);
            //返回结果
            return report(s);
        }

      其中,等待过程分析如下:

    private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            // The code below is very delicate, to achieve these goals:
            // - call nanoTime exactly once for each call to park
            // - if nanos <= 0L, return promptly without allocation or nanoTime
            // - if nanos == Long.MIN_VALUE, don't underflow
            // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
            //   and we suffer a spurious wakeup, we will do no worse than
            //   to park-spin for a while
            long startTime = 0L;    // Special value 0L means not yet parked
            WaitNode q = null;
            boolean queued = false;
            for (;;) {
                int s = state;
                //1、任务的状态已经处于最终的状态,则将任务线程的引用置为null,直接返回状态
                if (s > COMPLETING) {
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                //2、任务的状态为COMPLETING说明任务已经接近完成,则当前线程让出CPU权限以便任务执行线程获取到CPU执行权
                else if (s == COMPLETING)
                    // We may have already promised (via isDone) that we are done
                    // so never return empty-handed or throw InterruptedException
                    Thread.yield();
                //3、当前线程被中断,则将当前线程从等待任务结果的对列中移除,并抛出异常
                else if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
                //4、任务线程的状态小于COMPLETING,则将当前调用get()方法的线程新建一个Node
                else if (q == null) {
                    if (timed && nanos <= 0L)
                        return s;
                    q = new WaitNode();
                }
                //5、若由当前线程构成的Node未加入链表中,则加入
                else if (!queued)
                    queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
                //6、是否开启了超时获取结果
                else if (timed) {
                    final long parkNanos;
                    if (startTime == 0L) { // first time
                        startTime = System.nanoTime();
                        if (startTime == 0L)
                            startTime = 1L;
                        parkNanos = nanos;
                    } else {
                        long elapsed = System.nanoTime() - startTime;
                        //7、超时则从栈中移除当前线程
                        if (elapsed >= nanos) {
                            removeWaiter(q);
                            return state;
                        }
                        parkNanos = nanos - elapsed;
                    }
                    // nanoTime may be slow; recheck before parking
                    //当前线程挂起
                    if (state < COMPLETING)
                        LockSupport.parkNanos(this, parkNanos);
                }
                else
                    LockSupport.park(this);
            }
        }

      获取到返回的状态值后,根据其状态值判断是返回结果还是抛出异常。

      2.2 cancel()方法

    public boolean cancel(boolean mayInterruptIfRunning) {
            //1、若任务线程的状态为NEW,则将其状态从NEW置为INTERRUPTING、CANCELLED
            if (!(state == NEW && STATE.compareAndSet
                  (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
                //CAS改变任务线程的状态失败,则直接返回false,表示cancel失败
                return false;
            try {    // in case call to interrupt throws exception
                //2、改变任务线程的状态成功后,根据是否中断running的任务线程的标识位,决定是否中断正在运行的任务线程
                if (mayInterruptIfRunning) {
                    try {
                        Thread t = runner;
                        //任务线程不为null,则使用interrupt()中断
                        if (t != null)
                            t.interrupt();
                    } finally { // final state
                        //设置状态
                        STATE.setRelease(this, INTERRUPTED);
                    }
                }
            } finally {
                //3、清理等待任务结果的等待线程
                finishCompletion();
            }
            return true;
        }

     3、总结

      1)执行run()方法,是在调用在Callable的call()方法,其实在初始化时被指定;

      2)调用get()方法,若是任务线程还在执行,则会把调用get的线程封装成waitNode塞入到FutureTask类内部的阻塞链表对列中,可以有多个线程同时调用get()方法;

      3)cancel()方法是通过对任务线程调用interrupt()实现;

  • 相关阅读:
    改进ls的实现(课下作业)
    stat命令的实现-mysate
    (选做)实现mypwd
    2019-2020-1 20175209 20175213 20175214 实验五 通讯协议设计
    2019-2020-1 20175209 20175213 20175214 实验四 外设驱动程序设计
    2019-2020-1 20175209 20175213 20175214 实验三 并发程序
    2019-2020-1 20175209 20175213 20175214 实验三 并发程序
    2019-2020-1 20175209 20175213 20175214 实验一 开发环境的熟悉
    2018-2019-2 20175213实验五 《网络编程与安全》实验报告
    2018-2019-2 20175213实验四 《Android开发基础》实验报告
  • 原文地址:https://www.cnblogs.com/love-yh/p/13375236.html
Copyright © 2011-2022 走看看