zoukankan      html  css  js  c++  java
  • Future 的使用与源码解析

    Future 使用

    Future 的使用:

    • 使用线程池创建 Future 对象

      ExecutorService executorService = Executors.newSingleThreadExecutor();
      Future future = executorService.submit();
      
    • 调用 Future 的 get 方法获取返回值

      • 异步获取结果

        自旋判断任务是否已完成,完成后才调用 get 方法。

        while (!future.isDone()) {
            return future.get();
        }
        
      • 阻塞获取结果

        调用 get 方法会阻塞当前主线程,直到任务执行完成返回结果。

        return future.get();
        

    Future 的特点:

    • 在高并发环境下确保 任务只执行一次
    • 异步获取任务结果
    • 可以取消任务

    Future 的代码逻辑:

    • 线程池中,submit() 方法实际上将 Callable 封装在 FutureTask 中,将其作为 Runnable 的子类传给 execute() 真正执行;

    • FutureTask 在 run() 中调用 Callable 对象的 call() 方法并接收返回值捕获异常保存在 Object outcome 中,同时管理执行过程中的状态 state

    • FutureTask 同时作为 Future 的子类,通过 get() 返回任务的执行结果,若未执行完成则通过等待队列进行阻塞等待完成。

    我靠(call) ,我的未来(Future)在哪里???

    Future 源码

    FutureTask 实现了 Runnable 和 Future 两个接口,在 run() 方法中执行完计算时应该将结果保存起来以便通过 get() 获取。

    public interface Future<V> {
    
        boolean cancel(boolean mayInterruptIfRunning);
    
        boolean isCancelled();
    
        boolean isDone();
    
        V get() throws InterruptedException, ExecutionException;
    
        V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    
    }
    
    public interface RunnableFuture<V> extends Runnable, Future<V> {
    
        /**
         * Sets this Future to the result of its computation unless it has been cancelled.
         */
        void run();
    
    }
    
    FutureTask 属性
    public class FutureTask<V> implements RunnableFuture<V> {
    
        /**
     	 * The run state of this task, initially NEW.  The run state
         * transitions to a terminal state only in methods set,
         * setException, and cancel.  During completion, state may take on
         * transient values of COMPLETING (while outcome is being set) or
         * INTERRUPTING (only while interrupting the runner to satisfy a
         * cancel(true)). Transitions from these intermediate to final
         * states use cheaper ordered/lazy writes because values are unique
         * and cannot be further modified.
         *
         * state 的状态变化可以有四种方式
         * Possible state transitions:
         * NEW(0) -> COMPLETING(1) -> NORMAL(2)
         * NEW(0) -> COMPLETING(1) -> EXCEPTIONAL(3)
         * NEW(0) -> CANCELLED(4)
         * NEW(0) -> INTERRUPTING(5) -> INTERRUPTED(6)
         */
        private volatile int state;
        // 新建
        private static final int NEW          = 0;
        // 执行中
        private static final int COMPLETING   = 1;
        // 正常结束
        private static final int NORMAL       = 2;
        // 异常结束
        private static final int EXCEPTIONAL  = 3;
        // 被取消
        private static final int CANCELLED    = 4;
        // 中断中
        private static final int INTERRUPTING = 5;
        // 被中断
        private static final int INTERRUPTED  = 6;
    
        // 真正要调用 call 方法执行的线程逻辑
        /** The underlying callable; nulled out after running */
        private Callable<V> callable;
    
        // 表示通过 get 方法获取到的执行结果或者异常信息
        /** The result to return or exception to throw from get() */
        private Object outcome; // non-volatile, protected by state reads/writes
    
        // 运行 Callable 的线程,运行期间会使用 cas 保证线程安全
        /** The thread running the callable; CASed during run() */
        private volatile Thread runner;
    
        // waiters 设计的意义:https://www.zhihu.com/question/60123950
        // Future 的 get()/get(timeout) 在 task 处于非完成状态时是需要阻塞等待的
        // 如果多个线程调用 get() 方法,就需要一个链表/队列来维护这些等待线程,等到任务完成再一一唤醒这些等待的线程
        /** Treiber stack of waiting threads */
        private volatile WaitNode waiters;
    
        static final class WaitNode {
            volatile Thread thread;
            volatile WaitNode next;
            WaitNode() { thread = Thread.currentThread(); }
        }
    
        // done() 方法是一个空的方法体,交由子类来实现具体的业务逻辑。
        protected void done() { }
    
        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long stateOffset;
        private static final long runnerOffset;
        private static final long waitersOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = FutureTask.class;
                // 记录 state 的偏移量
                stateOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("state"));
                // 记录 runner 的偏移量
                runnerOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("runner"));
                // 记录 waiters 的偏移量
                waitersOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("waiters"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    
    }
    
    isCancelled
    // 判断任务在完成之前是否被取消
    public boolean isCancelled() {
        // 大于等于 CANCELLED 的状态有:CANCELLED、INTERRUPTING 和 INTERRUPTED
        // 都表示线程被取消
        return state >= CANCELLED;
    }
    
    isDone
    // 判断任务是否已经完成
    public boolean isDone() {
        // 任务正常结束、抛出异常退出、被取消、被中断都表示任务已完成
        return state != NEW;
    }
    
    cancel
    // 取消任务
    // mayInterruptIfRunning:参数表示是否中断正在执行的任务线程
    /**
        1. 如果是 cancel(false) 那么 Task 的状态变化就是
           NEW -> CANCELLED
        2. 如果是 cancel(true) 那么 Task 的状态化就是
           NEW -> INTERRUPTING -> INTERRUPTED
        */
    public boolean cancel(boolean mayInterruptIfRunning) {
        // 1. state == NEW,才取消或中断
        // 2. 根据 mayInterruptIfRunning 参数,将 state 状态设置成中断中或已取消
        // 如果 ! 后面的执行结果不是 true,说明 state 不是 NEW,或者对 state 的 cas 操作没有成功,则返回 false
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                                       mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            // 执行到这里,说明当前状态要么是 INTERRUPTING,要么是 CANCELLED
            // 而如果 mayInterruptIfRunning true,说明当前状态是 INTERRUPTING,那么就调用系统中断方法然后把状态设置成 INTERRUPTED
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    // 将状态设置成 INTERRUPTED
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }
    
    finishCompletion

    cancel、set、setException 方法最后都会调用 finishCompletion 方法释放 waiters 中的阻塞的线程。

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            // 将 waitersOffset 置空
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                // 如果 FutureTask 的 waiters 置空成功了,就唤醒 waiters 中的所有等待线程
                for (;;) {
                    // 获取线程
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        // 唤醒线程
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    // next == null 表示 q 无后续节点
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        // done() 方法是一个空的方法体,交由子类来实现具体的业务逻辑。
        done();
    	// callable 置空(等待的线程直接获取处理结果)
        callable = null;        // to reduce footprint
    }
    
    get
    // 获取线程执行结果,任务未执行完成会阻塞当前主线程
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        // 判断当前任务是否执行完成
        if (s <= COMPLETING)
            // 如果还没有执行成功,则当前主线程阻塞等待异步子线程执行结束
            s = awaitDone(false, 0L);
        return report(s);
    }
    
    get(long timeout, TimeUnit unit)
    // 获取线程执行结果,任务未执行完成会阻塞当前主线程
    // 但是等待时间超过 timeout 后会抛出 TimeoutException 超时异常
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        // s <= COMPLETING,则执行带超时时间的 awaitDone 方法
        // 如果 awaitDone() 超时返回之后任务还没结束,则抛出超时异常
        if (s <= COMPLETING &&
            // 到达超时时间退出阻塞,并返回任务当前状态
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            // awaitDone 方法超时返回后,如果没有完成(s <= COMPLETING),则跑超时异常
            throw new TimeoutException();
        return report(s);
    }
    
    awaitDone
    // timed:是否超时等待
    // nanos:如果有超时等待,该值则为超时等待时间
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // 如果有超时等待,则获取超时等待的截止时间,否则为 0
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        // 是否入队(waiters)
        boolean queued = false;
    
        // 当一个线程调用了 get() 方法后,如果任务未执行完成,就会进入死循环,一直判断当前的任务状态
        // 如果当前任务是第一次调用 get 方法,那么会把当前任务封装成一个 WaitNode 节点,添加到 queue 中。
        for (;;) {
            // 判断当前线程是否被中断
            // 具体的中断方式,可以通过 java.util.concurrent.FutureTask#cancel(boolean mayInterruptIfRunning) 进行设置
            if (Thread.interrupted()) {
                // 从等待队列中移除该节点
                removeWaiter(q);
                // 抛出中断异常
                throw new InterruptedException();
            }
    
            int s = state;
            // 如果当前线程已经运行完成,直接返回结果
            if (s > COMPLETING) {
                if (q != null)
                    // 置空当前的线程引用
                    // q = new WaitNode() 在默认构造函数里赋的值:thread = Thread.currentThread()
                    q.thread = null;
                return s;
            }
            // s == COMPLETING 是一个中间态
            // 在 set 方法中,将 call 的返回值设置到 outcome 这个成员变量,随后将状态设为 NORMAL,表示任务完成
            // 只需要让出当前调用 get 方法的线程资源即可
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                // 第一个调用 get() 方法,为当前线程创建一个 WaitNode 等待节点
                // 然后在下一次循环中,将 q 添加到 waiters 链表中
                q = new WaitNode();
            // 如果还没有入队,将 waiters 追加到 q 节点后
            else if (!queued)
                // 将节点 q 加到 waiters 头部(q.next = waiters,如果 cas 操作不成功,也没有影响)
                // 添加成功后 queued = true
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            // 如果已入队,继续顺序,执行到这里
            // 判断是否有超时限制,如果有,需要在指定的时间内获取结果
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    // 达到超时时间,删除 waiters 上的节点
                    removeWaiter(q);
                    // 到达超时时间退出阻塞,返回当前状态
                    return state;
                }
                // 阻塞指定的时间,自动唤醒
                LockSupport.parkNanos(this, nanos);
            }
            else
                // 没有时间限制,一直阻塞当前线程
                // set、setException 还有 cancel 方法最后都会调用 finishCompletion 方法
                // 在 finishCompletion 方法里会唤醒该阻塞线程
                // 唤醒后继续执行该 for 循环,直到任务已结束,退出 awaitDone 方法
                // 退出 awaitDone 方法后,调用 get 方法获取执行结果
                LockSupport.park(this);
        }
    }
    
    removeWaiter
    // 移除等待节点,释放阻塞线程
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            // 当节点进来,先让节点的线程设置为空,表示该节点已经删除了
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                // 遍历 waiters
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    // 记录 q 的后继节点
                    s = q.next;
                    if (q.thread != null)
                        // 记录 q 的前驱节点
                        // 当遍历到 q 的下一个节点时,pred 就作为 q.next 的前序节点
                        pred = q;
                    // q 有前驱节点,将 q 从链表中剔除
                    else if (pred != null) {
                        // pred 是 q 的前序节点,s 是 q 的后继节点,这里是将 q 从链表中剔除
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    } 
                    // q 没有前驱节点,将 waiters 改为 q 的下一个节点 
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
                        continue retry;
                }
                break;
            }
        }
    }
    
    report
    private V report(int s) throws ExecutionException {
        // 赋值输出结果
        Object x = outcome;
        // 任务正常结束,返回指定的返回值
        if (s == NORMAL)
            return (V)x;
        // 如果任务是被删除的话,直接抛出 CancellationException 异常
        if (s >= CANCELLED)
            throw new CancellationException();
        // 都不满足抛出 ExecutionException 异常
        throw new ExecutionException((Throwable)x);
    }
    
    set
    // 将执行结果赋值给 outcome
    protected void set(V v) {
        // 如果 state 是 NEW,把 state 设置成 COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            // 将执行结果 v 赋值给 outcome 属性
            outcome = v;
            // 将 state 设置成 NORMAL
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    
    setException
    // 将异常信息赋值给 outcome
    protected void setException(Throwable t) {
        // 如果 state 是 NEW,把 state 设置成 COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            // 将异常对象 ex 赋值给 outcome 属性
            outcome = t;
            // 将 state 设置成 EXCEPTIONAL
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
    
    run
    public void run() {
        // 如果当前任务状态不为 NEW,说明任务已经在执行
        // 如果状态是 NEW,就会把当前执行任务的线程给 runner
        // 如果 runner 不为空,说明已经有线程在执行,退出执行
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            // 如果要执行的任务不为空,并且状态 new 就执行
            // 所以如果 cancel 方法的设置 state 代码在该行代码之后执行,那么是不影响 run 方法执行的
            // 如果 cancel 方法在 set 或者 setException 前将 state 改变了,那么会影响返回值设置,也就不能获取返回值
            // 如果 cancel 方法在 set 或者 setException 后将 state 改变了,那么不会影响返回值的设置,但是也不能获取返回值(report 方法会抛异常)
            // finishCompletion 执行完成后,会将 callable 设置为 null
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // 执行任务,并获取返回值
                    result = c.call();
                    // 如果没有发生意外就执行成功了
                    ran = true;
                } catch (Throwable ex) {
                    // 有异常
                    result = null;
                    ran = false;
                    // 设置异常
                    setException(ex);
                }
                // 执行成功了
                if (ran)
                    // 设置结果
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            // 不管是否执行成功,都把 runner 设置成 null
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            // 线程被取消或者被中断
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    

    FutureTask 并没有一个 RUNNING 的状态来标识该任务正在执行。正常的情况下,任务从开始创建直到运行完毕,这段过程的状态都是 NEW。

    runAndReset
    // 用于重复执行任务,返回 boolean 类型
    // 方法和 run() 方法类似
    // 这个方法设计为实现任务可以被重复执行的情况,SchduledThreadPoolExecutor 中使用的 ScheduledFutureTask 即为 FutureTask 的子类,其 run() 方法中判断如果是周期任务就调用此方法来执行任务。
    protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    // 和 run 方法不同,这里只执行任务,不获取返回值
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        // 已成功执行完成,并且状态还是 NEW,则返回 true
        return ran && s == NEW;
    }
    
    handlePossibleCancellationInterrupt
    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            // 当前任务状态为 INTERRUPTING,将当前线程占用的 CPU 资源释放
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt
    
        // assert state == INTERRUPTED;
    
        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // Thread.interrupted();
    }
    
  • 相关阅读:
    BZOJ1217: [HNOI2003]消防局的设立
    BZOJ3925: [Zjoi2015]地震后的幻想乡
    BZOJ2328: [HNOI2011]赛车游戏
    BZOJ1011: [HNOI2008]遥远的行星
    BZOJ2007: [Noi2010]海拔
    BZOJ1833: [ZJOI2010]count 数字计数
    BZOJ1026: [SCOI2009]windy数
    BZOJ1196: [HNOI2006]公路修建问题
    BZOJ1076: [SCOI2008]奖励关
    BZOJ2752: [HAOI2012]高速公路(road)
  • 原文地址:https://www.cnblogs.com/wu726/p/15609204.html
Copyright © 2011-2022 走看看