zoukankan      html  css  js  c++  java
  • JUC源码分析-线程池篇(二)FutureTask

    JUC源码分析-线程池篇(二)FutureTask

    JDK5 之后提供了 Callable 和 Future 接口,通过它们就可以在任务执行完毕之后得到任务的执行结果。本文从源代码角度分析下具体的实现原理。

    1. 接口介绍

    FutureTask 类结构

    1.1 Callable 接口

    对于需要执行的任务需要实现 Callable 接口,Callable 接口定义如下:

    public interface Callable<V> {
        V call() throws Exception;
    }
    

    可以看到 Callable 是个泛型接口,泛型 V 就是要 call() 方法返回的类型。Callable 接口和 Runnable 接口很像,都可以被另外一个线程执行,但是正如前面所说的,Runnable 不会返回数据也不能抛出异常。

    1.2 Future 接口

    Future 接口代表异步计算的结果,通过 Future 接口提供的方法可以查看异步计算是否执行完成,或者等待执行结果并获取执行结果,同时还可以取消执行。Future 接口的定义如下:

    public interface Future<V> {
        boolean cancel(boolean mayInterruptIfRunning);
        boolean isCancelled();
        boolean isDone();
        V get() throws InterruptedException, ExecutionException;
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    
    • cancel(): 用来取消异步任务的执行。如果异步任务已经完成或者已经被取消,或者由于某些原因不能取消,则会返回 false。如果任务还没有被执行,则会返回 true 并且异步任务不会被执行。如果任务已经开始执行了但是还没有执行完成,若 mayInterruptIfRunning 为 true,则会立即中断执行任务的线程并返回 true,若 mayInterruptIfRunning 为 false,则会返回 true 且不会中断任务执行线程。
    • isCanceled(): 判断任务是否被取消,如果任务在结束(正常执行结束或者执行异常结束)前被取消则返回 true,否则返回 false。
    • isDone(): 判断任务是否已经完成,如果完成则返回 true,否则返回 false。需要注意的是:任务执行过程中发生异常、任务被取消也属于任务已完成,也会返回 true。
    • get(): 获取任务执行结果,如果任务还没完成则会阻塞等待直到任务执行完成。如果任务被取消则会抛出 CancellationException 异常,如果任务执行过程发生异常则会抛出 ExecutionException 异常,如果阻塞等待过程中被中断则会抛出 InterruptedException 异常。
    • get(long timeout,Timeunit unit): 带超时时间的 get() 版本,如果阻塞等待过程中超时则会抛出 TimeoutException 异常。

    1.3 FutureTask

    Future 只是一个接口,不能直接用来创建对象,FutureTask 是 Future 的实现类,FutureTask 的结构如下:

    FutureTask -> FutureRunnable -> Future/Runnable 
    

    可以看到,FutureTask 实现了 RunnableFuture 接口,则 RunnableFuture 接口继承了 Runnable 接口和 Future 接口,所以 FutureTask 既能当做一个 Runnable 直接被 Thread 执行,也能作为 Future 用来得到 Callable 的计算结果。

    2. FutureTask 源码分析

    2.1 FutureTask 生命周期

    private volatile int state;
    private static final int NEW          = 0;  // 初始状态
    private static final int COMPLETING   = 1;  // 任务已经执行完(正常或者异常),准备赋值结果
    private static final int NORMAL       = 2;  // 任务已经正常执行完,并已将任务返回值赋值到结果
    private static final int EXCEPTIONAL  = 3;  // 任务执行失败,并将异常赋值到结果
    private static final int CANCELLED    = 4;  // 取消
    private static final int INTERRUPTING = 5;  // 准备尝试中断执行任务的线程
    private static final int INTERRUPTED  = 6;  // 对执行任务的线程进行中断(未必中断到)
    

    FutureTask 状态变化如图:

    FutureTask状态

    • NEW: 表示是个新的任务或者还没被执行完的任务。这是初始状态。
    • COMPLETING: 任务已经执行完成或者执行任务的时候发生异常,但是任务执行结果或者异常原因还没有保存到 outcome 字段(outcome字段用来保存任务执行结果,如果发生异常,则用来保存异常原因)的时候,状态会从NEW变更到 COMPLETING。但是这个状态会时间会比较短,属于中间状态。
    • NORMAL: 任务已经执行完成并且任务执行结果已经保存到 outcome 字段,状态会从 COMPLETING 转换到 NORMAL。这是一个最终态。
    • EXCEPTIONAL: 任务执行发生异常并且异常原因已经保存到 outcome 字段中后,状态会从 COMPLETING 转换到 EXCEPTIONAL。这是一个最终态。
    • CANCELLED: 任务还没开始执行或者已经开始执行但是还没有执行完成的时候,用户调用了 cancel(false)方法取消任务且不中断任务执行线程,这个时候状态会从 NEW 转化为 CANCELLED 状态。这是一个最终态。
    • INTERRUPTING: 任务还没开始执行或者已经执行但是还没有执行完成的时候,用户调用了 cancel(true)方法取消任务并且要中断任务执行线程但是还没有中断任务执行线程之前,状态会从 NEW 转化为 INTERRUPTING。这是一个中间状态。
    • INTERRUPTED: 调用 interrupt() 中断任务执行线程之后状态会从 INTERRUPTING 转换到 INTERRUPTED。这是一个最终态。

    可以看到 NEW 为起始状态,而 NORMAL, EXCEPTIONAL, CANCELLED, INTERRUPTED 这些状态为终止状态,而 COMPLETING 和 INTERRUPTING 为中间暂时状态。为什么要引入 COMPLETING 和 INTERRUPTING 为两个中间状态呢?COMPLETING -> INTERRUPTED 中间有赋值 outcome=v 的过程,INTERRUPTING -> INTERRUPTED 有 t.interrupt 过程。这个过程要保证没有其它线程干扰。(个人理解)

    有一点需要注意的是,所有值大于 COMPLETING 的状态都表示任务已经执行完成(任务正常执行完成,任务执行异常或者任务被取消)。

    2.2 内部结构

    // 内部封装的 Callable 对象。如果是 Runnable 则会通过 Executors#callable 封装成 Callable
    private Callable<V> callable;
    private Object outcome;             // 用于保存计算结果或者异常信息。
    private volatile Thread runner;     // 用来运行 Callable 的线程
    private volatile WaitNode waiters;  // FutureTask中用了 Trieber Stack 来保存等待的线程
                                        // 这个队列使用 Treiber stack(可以理解为基于 CAS 的无锁的栈,先进后出)
    

    FutureTask 用了 Treiber Stack 来维护等待任务完成的线程,在 FutureTask 的任务完成/取消/异常后在 finishCompletion 钩子方法中会唤醒栈中等待的线程。

    2.3 构造函数

    先从 FutureTask 的构造函数看起:

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    

    可以看到,waiters 初始值为 null,初始状态为 NEW。

    一般发起任务线程跟执行任务线程通常情况下都不会是同一个线程,在任务执行线程(一般为线程池)执行任务的时候,任务发起线程可以查看任务执行状态、获取任务执行结果、取消任务等等操作,接下来分析下这些操作。

    2.4 任务执行线程

    2.4.1 核心方法 run

    executor.submit(task) 将任务提交到线程池中后,毫无疑问 FutureTask 会对原有的 run 方法进行包装,当 run 方法执行完成后设置结果值,并唤醒所有等待的线程。

    /**
     * run 方法执行有两个条件:1. state=NEW; 2. runner=null
     * 1. 执行前 state=NEW & runner=null
     * 2. 执行中 state=NEW & runner=Thread.currentThread()
     * 3. 执行后 state!=NEW & runner=null,根据是否有异常执行 set(result) 或 setException(ex),
     *    set/setException 都会更新 state 状态,之后线程的状态就不是 NEW
     * 因此,多个线程同时调用 run 方法的情况 callable 也只会执行一次
     */
    public void run() {
        // 1. state为 NEW 且对 runner 变量 CAS 成功。 对 state 的判断写在前面,是一种优化。 
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                // 2. 是否成功运行的标志位,而不是把 set 方法写在 try 中,是为了不想捕获 set 的异常。
                //    比如:子类覆写 FutureTask#done 方法(set->finishCompletion >done) 抛出异常,
                //    然而实际上提交的任务是有正常的结果的。
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // 3. 如果A/B两个线程同时执行到步骤①,当A线程重新将runner设置为 null 时,B线程是否可能会重新执行呢?
            //    实际上A线程一旦已经调用 set/setException 方法,整个流程已经结束了,所以不可能重复执行
            runner = null;
    
            // 4. 等待调用 cancel(true) 的线程完成中断,防止中断操作逃逸出 run 或者 runAndReset 方法
            //    以至于线程在执行后续操作时被中断
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    

    总结: run 方法是对 callable 进行了一层包装,在执行完成后设置返回结果,并唤醒所有等待的线程。

    1. 判断当前任务的 state 不为 NEW 则说明任务或者已经执行过,或者已经被取消,直接返回。
    2. 如果状态为 NEW 则接着会通过 unsafe 类把任务执行线程引用 CAS 的保存在 runner 字段中,有且只有一个线程能成功,相当于一个独占锁。
    3. 如果任务执行完成,则调用 set 或 setException 方法,同时唤醒所有的等待线程。

    2.4.2 设置结果 set/setException

    // 设置返回结果,唤醒等待线程。思考为什么需要 COMPLETING 这个中间状态
    protected void set(V v) {
        // 可能任务已经取消,state 的状态就不再是 NEW
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    

    总结: 根据前面的分析,不管是任务执行异常还是任务正常执行完毕,或者取消任务,最后都会调用 finishCompletion() 方法,该方法实现如下:

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            // 1. 必须将栈顶 CAS 为 null,否则重读栈顶并重试。
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                // 2. 遍历并唤醒栈中节点对应的线程。
                for (;;) {
                    Thread t = q.thread;        // 从头节点开始唤醒所有的等待线程
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);  // 唤醒等待线程
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
    
        // 3. ExecutorCompletionService#QueueingFuture 中把结果加到阻塞队列里。
        //    CompletionService 谁用谁知道,奥秘全在这。
        done();     // 子类覆盖
    
        // 4. callable 置为 null 主要为了减少内存开销,更多可以了解 JVM memory footprint 相关资料
        callable = null;        // to reduce footprint
    }
    

    总结: 这个方法的实现比较简单,依次遍历 waiters 链表,唤醒节点中的线程,然后把 callable 置空。被唤醒的线程会各自从 awaitDone() 方法中的 LockSupport.park() 阻塞中返回,然后会进行新一轮的循环。在新一轮的循环中会返回执行结果或者更确切的说是返回任务的状态。

    2.5 任务调用线程

    2.5.1 获取执行结果 get

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)    // NEW 或 COMPLETING
            s = awaitDone(false, 0L);
        return report(s);
    }
    
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException(); // 超时了,任务还未执行结束
        return report(s);
    }
    

    总结: 如果任务还未执行结束就调用 awaitDone 阻塞当前线程,这里的结束包括任务正常执行完毕,任务执行异常,任务被取消。一旦任务执行结束则调用 report() 返回结果。

    // report 根据任务最终的状态,返回结果或抛出异常
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)    // 1. 正常执行完计算任务
            return (V)x;
        if (s >= CANCELLED) // 2. 取消
            throw new CancellationException();
        throw new ExecutionException((Throwable)x); // 3. 异常
    }
    

    2.5.2 等待任务结束 awaitDone

    当调用 get() 获取任务结果但是任务还没执行完成的时候,调用线程会调用 awaitDone() 方法进行阻塞等待,该方法定义如下:

    // 阻塞等待线程执行完成(正常、异常、取消)后返回
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            // 1. 调用 get 的线程是否被其他线程中断
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
    
            int s = state;
            // 2. 任务已经执行完成,无论是正常、异常、取消
            if (s > COMPLETING) {   // 已经执行完成
                if (q != null)      // help GC
                    q.thread = null;
                return s;
            // 3. 结果正在赋值,让出 CPU 等待
            } else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            // 4. 初始化节点,重试一次循环
            else if (q == null)
                q = new WaitNode();
            // 5. queued 记录是否已经入栈,此处准备将节点压栈
            //    节点入队失败,自旋直至成功
            else if (!queued)
                // 这是 Treiber Stack算法入栈的逻辑。 Treiber Stack 是一个基于 CAS 的无锁并发栈实现
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
            // 6. 挂起线程
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);    // 超时直接删除节点后返回
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this); // 只有任务取消(cancel)和任务执行结束(run),才能唤醒线程
                                        // 此时再一次for循环时state的状态肯定大于COMPLETING
        }
    }
    

    总结: awaitDone 有以下几种情况:

    1. 调用 get 的线程是否被其他线程中断,直接抛出 InterruptedException。
    2. 任务结束返回结果,这里的结果是指任务的状态。只要大于 COMPLETING 就表示任务结束。
    3. 调用 get 的线程自旋入队,并将线程挂起,等待唤醒。只有 finishCompletion 都会唤醒线程,而这个方法只有在任务取消 cancel 或任务执行结束 run 方法中才会调用。当然如果任务超时,直接就返回了,这个状态可能为 NEW。

    2.5.3 删除节点 removeWaiter

    /**
     * 清理用于保存等待线程栈里的无效节点,所谓节点无效就是内部的 thread 为 null(类比 ThreadLocalMap)
     *   
     * 一般有以下几种情况:
     * 1. 节点调用 get 超时。
     * 2. 节点调用 get 中断。
     * 3. 节点调用 get 拿到 task 的状态值(> COMPLETING)。
     *
     * 此方法干了两件事情:
     * 1. 置标记参数 node 的 thread 为 null
     * 2. 清理栈中的无效节点
     *
     * 如果在遍历过程中发现有竞争则重新遍历栈。
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                // pre(前继节点) -> current(当前节点) -> next(后继节点),对应下面的 pre -> q -> s
                // 1.1 q.thread!=null 则更新 pre 节点继续遍历
                // 1.2 pre!=null && q.thread==null 时 current 不时头节点,直接删除 current
                // 1.3 pre==null && q.thread==null 时 current 是头节点,更新头节点为 next
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    // 1.1 保存当前节点 q 的前继节点 pre
                    if (q.thread != null)
                        pred = q;
                    // 1.2 q 节点已经失效,此时根据 q 是否是头节点分两种情况
                    //     q 不是头节点,直接删除,为什么不需要原子性操作?
                    else if (pred != null) {
                        pred.next = s;              // 踢除当前节点 q
                        if (pred.thread == null)    // check for race
                            continue retry;
                    // 1.3 q 是头节点且失效了,原子性更新头节点
                    } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
                        continue retry;
                }
                break;
            }
        }
    }
    

    总结: 删除节点时,执行到 1.2 时直接删除节点,没有使用 CAS 原子性操作,难道是线程安全的,还是说即使出现不一致的情况,也不影响最终的结果?

    2.5.4 取消任务 cancel

    /**
     * mayInterruptIfRunning:false 时不允许在线程运行时中断,true 时允许运行时中断线程,但不保证一定会中断线程。
     * 1. true 时,将状态修改成 INTERRUPTING,执行 thread.interrupt()
     * 2. false 时,将状态修改成 CANCELLED
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();  // interrupt 不一定能中断线程
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion(); // 唤醒所有等待线程
        }
        return true;
    }
    

    总结: cancel 方法会做下面几件事:

    1. 修改任务的状态。需要注意的是参数名称既然叫 mayInterruptIfRunning 则说明可能无法通过 interrupt 停止任务。要看业务线程对 interrupt 是否响应。如下面的代码将 while 条件改为 true 后,调用 cancel 根本无法中断任务。

      // !Thread.interrupted() 改为 true 后无法中断任务
      Future<?> future = executorService.submit(() -> { 
          while (!Thread.interrupted()) System.out.println("1"); 
      });
      future.cancel(true);
      
    2. 调用 finishCompletion 唤醒所有的等待任务结果的线程。

    3. Other

    3.1 FutureTask 版本说明

    与 JDK6 版本不同,JDK7 的 FutureTask 不再基于 AQS 来构建,而是在内部采用简单的 Treiber Stack 来保存等待线程。

    executor.submit(task1).cancel(true);  
    executor.submit(task2);  
    

    看上面的代码,虽然中断的是 task1,但可能 task2 得到中断信号。参考 FutureTask jdk6不同的区别

    3.2 FutureTask 问题

    参考FutureTask存在的问题

    3.3 FutureTask 使用注意事项

    (1) 线程池使用FutureTask

    线程池使用 FutureTask 的时候如果拒绝策略设置为了 DiscardPolicy 和 DiscardOldestPolicy 并且在被拒绝的任务的 Future 对象上调用无参 get 方法那么调用线程会一直被阻塞。所以当使用 Future 的时候,尽量使用带超时时间的 get 方法。参考线程池使用FutureTask时候需要注意的一点事

    3.4 Treiber Stack

    Treiber Stack 在 R. Kent Treiber 在 1986 年的论文 Systems Programming: Coping with Parallelism 中首次出现。它是一种无锁并发栈,其无锁的特性是基于 CAS 原子操作实现的。具体实现参考《Java并发编程实战》一书的 15.4.1 小结中的实现。

    1. Treiber Stack介绍
    2. Treiber Stack简单分析
    3. Synchronized方式和CAS方式实现线程安全性能思考

    参考:

    1. 本文转载至 《深入学习 FutureTask》:http://www.importnew.com/25286.html
    2. 《FutureTask源码解读》:https://www.cnblogs.com/micrari/p/7374513.html

    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    use tomcat to access the file cross the environment
    data audit on hadoop fs
    Good practice release jar to Nexus
    套路!从Ruby 到 Cocoapods的发布
    单元测试之NSNull 检测
    UIwebView 和 H5交互详情
    IT 需要知道的一些专业名词和解释 (长期更新)
    Git 操作 学习资源 网址
    GCD
    软件工程——个人总结
  • 原文地址:https://www.cnblogs.com/binarylei/p/10958885.html
Copyright © 2011-2022 走看看