zoukankan      html  css  js  c++  java
  • 并发编程(四)TaskFuture

    并发编程(四)TaskFuture

    ExecutorService executorService = Executors.newSingleThreadExecutor();
    Future<Object> future = executorService.submit(() -> {
        TimeUnit.SECONDS.sleep(5);
        return 5;
    });
    Object result = future.get();
    

    ExecutorService 异步执行任务返回一个 Future,本节重点分析 Future 的 get 方法是如何拿到返回结果的呢?

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    

    下面我们重点分析 FutureTask 类

    FutureTask

    一、基本变量

    (1) 核心成员变量

    // 1. 执行的回调方法。如果是 Runnable 就通过 Executors#callable 包装成一个 Callable
    private Callable<V> callable;
    
    // 2. 保存计算结果或者异常信息。non-volatile, protected by state reads/writes
    private Object outcome;
    
    // 3. 执行 callable 的线程,run 方法中通过 CAS 保证原子性操作
    private volatile Thread runner;
    
    // 4. 等待结果的线程队列,eg: 不同的线程同时调用 get()
    //    这个队列使用 Treiber stack(可以理解为基于 CAS 的无锁的栈,先进后出)
    private volatile WaitNode waiters;
    

    (2) 状态变化

    /* 
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
    

    任务执行正常结束前,state 会被设置成 COMPLETING,代表任务即将完成,接下来很快就会被设置为 NARMAL 或者 EXCEPTIONAL,这取决于调用 Runnable 中的 call() 方法是否抛出了异常。有异常则后者,反之前者。

    任务提交后、任务结束前取消任务,那么有可能变为 CANCELLED 或者 INTERRUPTED。在调用 cancel 方法时,如果传入 false 表示不中断线程,state 会被置为 CANCELLED,反之 state 先被变为 INTERRUPTING,后变为 INTERRUPTED。

    总结下,FutureTask 的状态流转过程,可以出现以下四种情况:

    1. 任务正常执行并返回。 NEW -> COMPLETING -> NORMAL
    2. 执行中出现异常。NEW -> COMPLETING -> EXCEPTIONAL
    3. 任务执行过程中被取消,并且不响应中断。NEW -> CANCELLED
    4. 任务执行过程中被取消,并且响应中断。 NEW -> INTERRUPTING -> INTERRUPTED 

    补充:Unsafe

    Unsafe 是 JDK 底层的类库,位于 sun.misc.Unsafe 中,在 java.util.concurrent 广泛使用。

    private static final UNSAFE = sun.misc.Unsafe.getUnsafe();
    private static final long stateOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("state"));
    
    // 更新 state 状态
    UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)
    

    二、run

    /**
     * run 方法执行有两个条件:1. state=NEW; 2. runner=null
     * 1. 执行前 state=NEW & runner=null
     * 2. 执行中 state=NEW & runner=Thread.currentThread()
     * 3. 执行后 state!=NEW & runner=null,根据是否有异常执行 set(result) 或 setException(ex),无论执行成功与否都会更新 state 状态
     * 因此,多个线程同时调用 run 方法的情况 callable 也只会执行一次
     */
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                // set 方法会回调钩子方法 done(),可能抛出异常
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
    
            // 等待调用 cancel(true) 的线程完成中断,防止中断操作逃逸出 run 或者 runAndReset 方法,影响后续操作
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    
    protected void set(V v) {
        // 通过 CAS 状态来确认计算没有被取消,而且线程只执行了一次
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
    
    private void finishCompletion() {
        for (WaitNode q; (q = waiters) != null;) {
            // 必须将栈顶 CAS 为 null,否则重读栈顶并重试。
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                // 遍历并唤醒栈中等待的线程
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    // 将 next 域置为 null,这样对 GC 友好
                    q.next = null; 
                    q = next;
                }
                break;
            }
        }
    
        /*
         * done 方法是暴露给子类的一个钩子方法。
         * 这个方法在 ExecutorCompletionService.QueueingFuture 中的 override 实现是把结果加到阻塞队列里。
         */
        done();
    
        callable = null;
    }
    
    private void handlePossibleCancellationInterrupt(int s) {
        /*
         * 等待调用 cancel(true) 的线程完成中断,防止中断操作逃逸出 run 或者 runAndReset 方法,影响后续操作
         *
         * 实际上,当前调用 cancel 方法的线程不一定能够中断到本线程。
         * 有可能 cancel 方法里读到 runner 是 null,甚至有可能是其它并发调用 run/runAndReset 方法的线程。
         * 但是也没办法判断另一个线程在 cancel 方法中读到的 runner 到底是什么,所以索性自旋让出 CPU 时间片也没事。
         */
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield();
    }
    

    三、get

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        // 如果线程已经执行完成直接返回
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    
    /**
     * 等待任务执行完毕,如果任务取消或者超时则停止
     * @param timed 为 true 表示设置超时时间
     * @param nanos 超时时间
     * @return 任务完成时的状态
     * @throws InterruptedException
     */
    private int awaitDone(boolean timed, long nanos) throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
    
            int s = state;
            // 1. callable 已执行完成,无论成功或失败直接返回执行结果
            if (s > COMPLETING) {
                // 已执行完,为了 GC 需要清 q.thread
                if (q != null)
                    q.thread = null;
                return s;
            }
            // 2. COMPLETING 是一个很短暂的状态,调用 Thread.yield 期望让出时间片,之后重试循环
            else if (s == COMPLETING)
                Thread.yield();
            // 3. 初始化节点,重试一次循环
            else if (q == null)
                q = new WaitNode();
            // 4. queued 记录是否已经入栈,此处准备将节点压栈
            else if (!queued)
                /*
                 *  这是 Treiber Stack 算法入栈的逻辑。
                 *  Treiber Stack 是一个基于 CAS 的无锁并发栈实现
                 *  更多可以参考https://en.wikipedia.org/wiki/Treiber_Stack
                 */
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                        q.next = waiters, q);
            // 5. 如果有时限,判断是否超时,未超时则park剩下的时间。
            else if (timed) {
                nanos = deadline - System.nanoTime();
                // 超时,移除栈中节点
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
    
    /**
     * 清理用于保存等待线程栈里的无效节点,所谓节点无效就是内部的 thread 为 null(类比 ThreadLocalMap)
     *   
     * 一般有以下几种情况:
     * 1. 节点调用 get 超时。
     * 2. 节点调用 get 中断。
     * 3. 节点调用 get 拿到 task 的状态值(> COMPLETING)。
     *
     * 此方法干了两件事情:
     * 1. 置标记参数 node 的 thread 为 null
     * 2. 清理栈中的无效节点
     *
     * 如果在遍历过程中发现有竞争则重新遍历栈。
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                // pre -> current -> next,如果 current 无效就把 pre.next=next
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    // 1. 如果当前节点仍有效,则置 pred 为当前节点,继续遍历
                    if (q.thread != null)
                        pred = q;
    
                    // 2. 当前节点已无效且有前驱,则将前驱的后继置为当前节点的后继实现删除节点。
                    //    如果前驱节点已无效,则重新遍历 waiters 栈。
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null)
                            continue retry;
                    }
                    // 3. 当前节点已无效,且当前节点没有前驱,则将栈顶置为当前节点的后继。
                    //    失败的话重新遍历 waiters 栈。
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
                        continue retry;
                }
                break;
            }
        }
    }
    
    /**
     * 导出结果。
     */
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        // 1. 正常执行完计算任务
        if (s == NORMAL)
            return (V)x;
        // 2. 取消
        if (s >= CANCELLED)
            throw new CancellationException();
        // 3. 执行计算任务时发生异常
        throw new ExecutionException((Throwable)x);
    }
    

    四、cancal

    /**
     * mayInterruptIfRunning=false 时,不允许在线程运行时中断,设成 true 的话就允许但不保证一定会中断线程。
     * 1. true 时,将状态修改成 INTERRUPTING,执行 thread.interrupt()
     * 2. false 时,将状态修改成 CANCELLED
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                        mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally {
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }
    

    参考:

    1. 《FutureTask 源码解读》:http://www.cnblogs.com/micrari/p/7374513.html

    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    PAT 甲级 1126 Eulerian Path (25 分)
    PAT 甲级 1126 Eulerian Path (25 分)
    PAT 甲级 1125 Chain the Ropes (25 分)
    PAT 甲级 1125 Chain the Ropes (25 分)
    PAT 甲级 1124 Raffle for Weibo Followers (20 分)
    PAT 甲级 1124 Raffle for Weibo Followers (20 分)
    PAT 甲级 1131 Subway Map (30 分)
    PAT 甲级 1131 Subway Map (30 分)
    AcWing 906. 区间分组 区间贪心
    AcWing 907. 区间覆盖 区间贪心
  • 原文地址:https://www.cnblogs.com/binarylei/p/10068923.html
Copyright © 2011-2022 走看看