zoukankan      html  css  js  c++  java
  • 并发编程(四)TaskFuture

    并发编程(四)TaskFuture

    ExecutorService executorService = Executors.newSingleThreadExecutor();
    Future<Object> future = executorService.submit(() -> {
        TimeUnit.SECONDS.sleep(5);
        return 5;
    });
    Object result = future.get();
    

    ExecutorService 异步执行任务返回一个 Future,本节重点分析 Future 的 get 方法是如何拿到返回结果的呢?

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    

    下面我们重点分析 FutureTask 类

    FutureTask

    一、基本变量

    (1) 核心成员变量

    // 1. 执行的回调方法。如果是 Runnable 就通过 Executors#callable 包装成一个 Callable
    private Callable<V> callable;
    
    // 2. 保存计算结果或者异常信息。non-volatile, protected by state reads/writes
    private Object outcome;
    
    // 3. 执行 callable 的线程,run 方法中通过 CAS 保证原子性操作
    private volatile Thread runner;
    
    // 4. 等待结果的线程队列,eg: 不同的线程同时调用 get()
    //    这个队列使用 Treiber stack(可以理解为基于 CAS 的无锁的栈,先进后出)
    private volatile WaitNode waiters;
    

    (2) 状态变化

    /* 
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
    

    任务执行正常结束前,state 会被设置成 COMPLETING,代表任务即将完成,接下来很快就会被设置为 NARMAL 或者 EXCEPTIONAL,这取决于调用 Runnable 中的 call() 方法是否抛出了异常。有异常则后者,反之前者。

    任务提交后、任务结束前取消任务,那么有可能变为 CANCELLED 或者 INTERRUPTED。在调用 cancel 方法时,如果传入 false 表示不中断线程,state 会被置为 CANCELLED,反之 state 先被变为 INTERRUPTING,后变为 INTERRUPTED。

    总结下,FutureTask 的状态流转过程,可以出现以下四种情况:

    1. 任务正常执行并返回。 NEW -> COMPLETING -> NORMAL
    2. 执行中出现异常。NEW -> COMPLETING -> EXCEPTIONAL
    3. 任务执行过程中被取消,并且不响应中断。NEW -> CANCELLED
    4. 任务执行过程中被取消,并且响应中断。 NEW -> INTERRUPTING -> INTERRUPTED 

    补充:Unsafe

    Unsafe 是 JDK 底层的类库,位于 sun.misc.Unsafe 中,在 java.util.concurrent 广泛使用。

    private static final UNSAFE = sun.misc.Unsafe.getUnsafe();
    private static final long stateOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("state"));
    
    // 更新 state 状态
    UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)
    

    二、run

    /**
     * run 方法执行有两个条件:1. state=NEW; 2. runner=null
     * 1. 执行前 state=NEW & runner=null
     * 2. 执行中 state=NEW & runner=Thread.currentThread()
     * 3. 执行后 state!=NEW & runner=null,根据是否有异常执行 set(result) 或 setException(ex),无论执行成功与否都会更新 state 状态
     * 因此,多个线程同时调用 run 方法的情况 callable 也只会执行一次
     */
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                // set 方法会回调钩子方法 done(),可能抛出异常
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
    
            // 等待调用 cancel(true) 的线程完成中断,防止中断操作逃逸出 run 或者 runAndReset 方法,影响后续操作
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    
    protected void set(V v) {
        // 通过 CAS 状态来确认计算没有被取消,而且线程只执行了一次
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
    
    private void finishCompletion() {
        for (WaitNode q; (q = waiters) != null;) {
            // 必须将栈顶 CAS 为 null,否则重读栈顶并重试。
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                // 遍历并唤醒栈中等待的线程
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    // 将 next 域置为 null,这样对 GC 友好
                    q.next = null; 
                    q = next;
                }
                break;
            }
        }
    
        /*
         * done 方法是暴露给子类的一个钩子方法。
         * 这个方法在 ExecutorCompletionService.QueueingFuture 中的 override 实现是把结果加到阻塞队列里。
         */
        done();
    
        callable = null;
    }
    
    private void handlePossibleCancellationInterrupt(int s) {
        /*
         * 等待调用 cancel(true) 的线程完成中断,防止中断操作逃逸出 run 或者 runAndReset 方法,影响后续操作
         *
         * 实际上,当前调用 cancel 方法的线程不一定能够中断到本线程。
         * 有可能 cancel 方法里读到 runner 是 null,甚至有可能是其它并发调用 run/runAndReset 方法的线程。
         * 但是也没办法判断另一个线程在 cancel 方法中读到的 runner 到底是什么,所以索性自旋让出 CPU 时间片也没事。
         */
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield();
    }
    

    三、get

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        // 如果线程已经执行完成直接返回
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    
    /**
     * 等待任务执行完毕,如果任务取消或者超时则停止
     * @param timed 为 true 表示设置超时时间
     * @param nanos 超时时间
     * @return 任务完成时的状态
     * @throws InterruptedException
     */
    private int awaitDone(boolean timed, long nanos) throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
    
            int s = state;
            // 1. callable 已执行完成,无论成功或失败直接返回执行结果
            if (s > COMPLETING) {
                // 已执行完,为了 GC 需要清 q.thread
                if (q != null)
                    q.thread = null;
                return s;
            }
            // 2. COMPLETING 是一个很短暂的状态,调用 Thread.yield 期望让出时间片,之后重试循环
            else if (s == COMPLETING)
                Thread.yield();
            // 3. 初始化节点,重试一次循环
            else if (q == null)
                q = new WaitNode();
            // 4. queued 记录是否已经入栈,此处准备将节点压栈
            else if (!queued)
                /*
                 *  这是 Treiber Stack 算法入栈的逻辑。
                 *  Treiber Stack 是一个基于 CAS 的无锁并发栈实现
                 *  更多可以参考https://en.wikipedia.org/wiki/Treiber_Stack
                 */
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                        q.next = waiters, q);
            // 5. 如果有时限,判断是否超时,未超时则park剩下的时间。
            else if (timed) {
                nanos = deadline - System.nanoTime();
                // 超时,移除栈中节点
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
    
    /**
     * 清理用于保存等待线程栈里的无效节点,所谓节点无效就是内部的 thread 为 null(类比 ThreadLocalMap)
     *   
     * 一般有以下几种情况:
     * 1. 节点调用 get 超时。
     * 2. 节点调用 get 中断。
     * 3. 节点调用 get 拿到 task 的状态值(> COMPLETING)。
     *
     * 此方法干了两件事情:
     * 1. 置标记参数 node 的 thread 为 null
     * 2. 清理栈中的无效节点
     *
     * 如果在遍历过程中发现有竞争则重新遍历栈。
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                // pre -> current -> next,如果 current 无效就把 pre.next=next
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    // 1. 如果当前节点仍有效,则置 pred 为当前节点,继续遍历
                    if (q.thread != null)
                        pred = q;
    
                    // 2. 当前节点已无效且有前驱,则将前驱的后继置为当前节点的后继实现删除节点。
                    //    如果前驱节点已无效,则重新遍历 waiters 栈。
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null)
                            continue retry;
                    }
                    // 3. 当前节点已无效,且当前节点没有前驱,则将栈顶置为当前节点的后继。
                    //    失败的话重新遍历 waiters 栈。
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
                        continue retry;
                }
                break;
            }
        }
    }
    
    /**
     * 导出结果。
     */
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        // 1. 正常执行完计算任务
        if (s == NORMAL)
            return (V)x;
        // 2. 取消
        if (s >= CANCELLED)
            throw new CancellationException();
        // 3. 执行计算任务时发生异常
        throw new ExecutionException((Throwable)x);
    }
    

    四、cancal

    /**
     * mayInterruptIfRunning=false 时,不允许在线程运行时中断,设成 true 的话就允许但不保证一定会中断线程。
     * 1. true 时,将状态修改成 INTERRUPTING,执行 thread.interrupt()
     * 2. false 时,将状态修改成 CANCELLED
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                        mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally {
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }
    

    参考:

    1. 《FutureTask 源码解读》:http://www.cnblogs.com/micrari/p/7374513.html

    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    负载均衡之加权轮询算法(转)
    go 指南学习笔记
    select限制之文件描述符限制
    select的限制
    select实现超时(套接字IO超时设置)
    如何在CentOS 8上安装Puppet
    如何在Ubuntu 20.04 / 18.04或更老版本中安装ifconfig
    关于Ubuntu的Apt安装与使用介绍
    如何在CentOS 8上安装Suricata?
    如何在Ubuntu 20.04上安装PHP Composer
  • 原文地址:https://www.cnblogs.com/binarylei/p/10068923.html
Copyright © 2011-2022 走看看