zoukankan      html  css  js  c++  java
  • 【JUC源码解析】FutureTask

    简介

    FutureTask, 一个支持取消行为的异步任务执行器。

    概述

    FutureTask实现了Future,提供了start, cancel, query等功能,并且实现了Runnable接口,可以提交给线程执行。

    源码分析

    状态

    1     private volatile int state; // 执行器状态
    2     private static final int NEW = 0; // 初始值
    3     private static final int COMPLETING = 1; // 完成进行时
    4     private static final int NORMAL = 2; // 正常结束
    5     private static final int EXCEPTIONAL = 3; // 发生异常
    6     private static final int CANCELLED = 4; // 已经取消
    7     private static final int INTERRUPTING = 5; // 中断进行时
    8     private static final int INTERRUPTED = 6; // 中断结束

    转换

    NEW -> COMPLETING -> NORMAL
    NEW -> COMPLETING -> EXCEPTIONAL
    NEW -> CANCELLED
    NEW -> INTERRUPTING -> INTERRUPTED

    属性

    1     private Callable<V> callable; // 任务
    2     private Object outcome; // 返回结果
    3     private volatile Thread runner; // 执行线程
    4     private volatile WaitNode waiters; // 等待线程们

    run()

     1     public void run() {
     2         // UNSAFE.compareAndSwapObject, CAS保证Callable任务只被执行一次
     3         if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
     4             return;
     5         try {
     6             Callable<V> c = callable; // 拿到执行任务
     7             if (c != null && state == NEW) { // 任务不为空,并且执行器状态是初始值,才会执行;如果取消就不执行了
     8                 V result;
     9                 boolean ran; // 记录是否执行成功
    10                 try {
    11                     result = c.call(); // 执行任务
    12                     ran = true; // 成功
    13                 } catch (Throwable ex) {
    14                     result = null; // 异常,清空结果
    15                     ran = false; // 失败
    16                     setException(ex); // 记录异常
    17                 }
    18                 if (ran) // 问题:ran变量可以省略吗,把set(result);移到try块里面?
    19                     set(result); // 设置结果
    20             }
    21         } finally {
    22             runner = null; // 直到set状态前,runner一直都是非空的,为了防止并发调用run()方法。
    23             int s = state;
    24             if (s >= INTERRUPTING) // 有别的线程要中断当前线程,把CPU让出去,自旋等一下
    25                 handlePossibleCancellationInterrupt(s);
    26         }
    27     }
    1.  检查state,非NEW,说明已经启动,直接返回;否则,设置runner为当前线程,成功则继续,否则,返回。
    2. 调用Callable.call()方法执行任务,成功则调用set(result)方法,失败则调用setException(ex)方法,最终都会设置state,并调用finishCompletion()方法,唤醒阻塞在get()方法上的线程们。
    3. 接2,如注释所示,如果省略ran变量,并把"set(result);" 语句移动到try代码块"ran = true;" 语句处,会怎样呢?首先,从代码逻辑上看,是没有问题的,但是,考虑到"set(result);"方法万一抛出异常甚至是错误了呢?set()方法最终会调用到用户自定义的done()方法,所以,不可省略。
    4. 如果state为INTERRUPTING, 则主动让出CPU,自旋等待别的线程执行完中断流程。见handlePossibleCancellationInterrupt(int s) 方法。

     set(V)

    1     protected void set(V v) {
    2         if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // CAS state NEW -> COMPLETING
    3             outcome = v; // 将结果赋值给outcome
    4             UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 然后将state修改为NORMAL,表示现在可以获取返回结果
    5             finishCompletion(); // 唤醒等待的所有线程
    6         }
    7     }

    setException(Throwable)

    1     protected void setException(Throwable t) {
    2         if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // CAS state NEW -> COMPLETING
    3             outcome = t; // 将结果赋值给outcome
    4             UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 然后将state修改为EXCEPTIONAL,表示现在可以获取返回异常信息
    5             finishCompletion(); // 唤醒等待的所有线程
    6         }
    7     }

    handlePossibleCancellationInterrupt(int)

    1     private void handlePossibleCancellationInterrupt(int s) {
    2         if (s == INTERRUPTING) // 当state为INTERRUPTING时
    3             while (state == INTERRUPTING) // 表示有线程正在中断当前线程
    4                 Thread.yield(); // 让出CPU,自旋等待中断
    5     }

    finishCompletion()

     1     private void finishCompletion() {
     2         for (WaitNode q; (q = waiters) != null;) {
     3             if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // CAS waiters -> null,避免重复
     4                 for (;;) {
     5                     Thread t = q.thread; // 获得结点对应的线程
     6                     if (t != null) { // 如果不为空
     7                         q.thread = null; // 复位thread属性
     8                         LockSupport.unpark(t); // 唤醒此线程
     9                     }
    10                     WaitNode next = q.next; // 取得下一个结点
    11                     if (next == null) // 如果为空则跳出内层循环,继而跳出外层循环
    12                         break;
    13                     q.next = null; // 复位next属性
    14                     q = next; // 移动到下一个结点
    15                 }
    16                 break; // 跳出外层循环
    17             }
    18             // 如果CAS失败,说明waiters发生变化,则重试,直至CAS成功,或者waiters为null(别的线程已经完成此操作)
    19         }
    20 
    21         done(); // 调用done()方法,hook钩子,子类实现
    22 
    23         callable = null; // 复位callable,其所表示的任务已结束
    24     }

    将结点一个一个地弹出,并唤醒其所持的线程。

    WaitNode

    1     static final class WaitNode {
    2         volatile Thread thread; // 线程
    3         volatile WaitNode next; // 指向下一个结点
    4 
    5         WaitNode() {
    6             thread = Thread.currentThread(); // 当前线程
    7         }
    8     }

    runAndReset()

     1     protected boolean runAndReset() {
     2         // UNSAFE.compareAndSwapObject, CAS保证Callable任务只被执行一次
     3         if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
     4             return false;
     5         boolean ran = false; // 记录是否执行成功
     6         int s = state; // 状态
     7         try {
     8             Callable<V> c = callable; // 任务
     9             if (c != null && s == NEW) { // 任务不为空,并且执行器状态是初始值,才会执行;如果取消就不执行了
    10                 try {
    11                     c.call(); // 不设置结果,可重复执行
    12                     ran = true;
    13                 } catch (Throwable ex) {
    14                     setException(ex); // 直至遇到异常
    15                 }
    16             }
    17         } finally {
    18             runner = null; // 置空,其他线程可以继续执行
    19             s = state; // 状态
    20             if (s >= INTERRUPTING) // 有别的线程要中断当前线程,把CPU让出去,自旋等一下
    21                 handlePossibleCancellationInterrupt(s);
    22         }
    23         return ran && s == NEW; // 返回结果,执行成功,并且,state为NEW,才返回true
    24     }

    get()

    1     public V get() throws InterruptedException, ExecutionException {
    2         int s = state; // 执行器状态
    3         if (s <= COMPLETING) // 如果状态小于等于COMPLETING,说明任务正在执行,需要等待
    4             s = awaitDone(false, 0L); // 等待
    5         return report(s); // 报告结果
    6     }

    get(long, TimeUnit)

    1     public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    2         if (unit == null) // 参数校验
    3             throw new NullPointerException();
    4         int s = state; // 执行器状态
    5         if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) // 如果状态小于等于COMPLETING,说明任务正在执行,需要等待;等待指定时间,state依然小于等于COMPLETING
    6             throw new TimeoutException(); // 抛出超时异常
    7         return report(s); // 报告结果
    8     }

    report(int)

    1     private V report(int s) throws ExecutionException {
    2         Object x = outcome; // 返回结果
    3         if (s == NORMAL) // 如果state为NORMAL,表示任务正常结束,返回结果
    4             return (V) x;
    5         if (s >= CANCELLED) // 如果任务大于等于CANCELLED,表示任务已经取消,抛出取消异常
    6             throw new CancellationException();
    7         throw new ExecutionException((Throwable) x); // 否则,抛出执行异常
    8     }

    awaitDone(boolean timed, long nanos)

     1     private int awaitDone(boolean timed, long nanos) throws InterruptedException {
     2         final long deadline = timed ? System.nanoTime() + nanos : 0L; // 计算deadline
     3         WaitNode q = null; // 等待结点
     4         boolean queued = false; // 是否已经入队
     5         for (;;) {
     6             if (Thread.interrupted()) { // 如果当前线程已经标记中断,则直接移除此结点,并抛出中断异常
     7                 removeWaiter(q);
     8                 throw new InterruptedException();
     9             }
    10 
    11             int s = state; // 执行器状态
    12             if (s > COMPLETING) { // 如果状态大于COMPLETING,说明任务已经完成,或者已经取消,直接返回
    13                 if (q != null)
    14                     q.thread = null; // 复位线程属性
    15                 return s; // 返回
    16             } else if (s == COMPLETING) // 如果状态等于COMPLETING,说明正在整理结果,自旋等待一会儿
    17                 Thread.yield();
    18             else if (q == null) // 初始,构建结点
    19                 q = new WaitNode();
    20             else if (!queued) // 还没入队,则CAS入队
    21                 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
    22             else if (timed) { // 是否允许超时
    23                 nanos = deadline - System.nanoTime(); // 计算等待时间
    24                 if (nanos <= 0L) { // 超时
    25                     removeWaiter(q); // 移除结点
    26                     return state; // 返回结果
    27                 }
    28                 LockSupport.parkNanos(this, nanos); // 线程阻塞指定时间
    29             } else
    30                 LockSupport.park(this); // 阻塞线程
    31         }
    32     }

    removeWaiter(WaitNode)

     1     private void removeWaiter(WaitNode node) {
     2         if (node != null) { // 移除结点的标准是thread属性已经为null
     3             node.thread = null; // 每个线程对应一个结点,当需要移除时,仅仅需要将thread属性置为null即可,此过程可并发执行
     4             retry: for (;;) { // 然而,移除操作需要穿行协作,一有变动,须重试
     5                 for (WaitNode pred = null, q = waiters, s; q != null; q = s) { // 记录前驱结点,当前结点,以及后继结点
     6                     s = q.next; // 后继结点
     7                     if (q.thread != null) // 不为null, 前驱指向当前结点,即整体右移,左边的结点是安全的。表示不必删除
     8                         pred = q;
     9                     else if (pred != null) { // 如果当前线程为空,前驱不为null
    10                         pred.next = s; // 前驱越过当前结点指向后继结点,等于把当前结点移除了
    11                         if (pred.thread == null) // 如果前驱结点thread属性也为null,则很可能任务完成,线程unpark了,需要从头检查一下。
    12                             continue retry;
    13                     } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) // CAS失败,waiters变动,重试
    14                         continue retry;
    15                 }
    16                 break; // 跳出
    17             }
    18         }
    19     }

    cancel(boolean)

     1     public boolean cancel(boolean mayInterruptIfRunning) {
     2         // 状态不为NEW,或者CAS NEW -> INTERRUPTING/CANCELLED, 直接返回
     3         if (!(state == NEW
     4                 && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
     5             return false;
     6         try {
     7             if (mayInterruptIfRunning) { // 中断
     8                 try {
     9                     Thread t = runner;
    10                     if (t != null)
    11                         t.interrupt(); // 中断此线程
    12                 } finally {
    13                     UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // state设置为INTERRUPTED
    14                 }
    15             }
    16         } finally {
    17             finishCompletion(); // 唤醒阻塞线程
    18         }
    19         return true;
    20     }

    至此,结束。

      

    尊重他人的劳动,转载请注明出处:http://www.cnblogs.com/aniao/p/aniao_futuretask.html

  • 相关阅读:
    LeetCode 88. Merge Sorted Array
    LeetCode 75. Sort Colors
    LeetCode 581. Shortest Unsorted Continuous Subarray
    LeetCode 20. Valid Parentheses
    LeetCode 53. Maximum Subarray
    LeetCode 461. Hamming Distance
    LeetCode 448. Find All Numbers Disappeared in an Array
    LeetCode 976. Largest Perimeter Triangle
    LeetCode 1295. Find Numbers with Even Number of Digits
    如何自学并且系统学习计算机网络?(知乎问答)
  • 原文地址:https://www.cnblogs.com/aniao/p/aniao_futuretask.html
Copyright © 2011-2022 走看看