zoukankan      html  css  js  c++  java
  • FutureTask源码完整解读

    1 简介

    上一篇博客“异步任务服务简介”对FutureTask做过简要介绍与分析,这里再次对FutureTask做一次深入的分析(基于JDK1.8)。

    FutureTask同时实现了Future 、Runnable接口,因此它可以交给执行器Executor去执行这个任务,也可以由调用线程直接执行run方法。

    根据FutureTask.run方法的执行状态,可将其分为以下3种状态

    ①未启动: run方法还未被执行,FutureTask处于未启动状态。

    ②已启动: run方法在执行过程中,FutureTask处于已启动状态

    ③已完成:run方法正常完成返回或被取消或执行过程中因异常抛出而非正常结束,FutureTask处于已完成状态。

    当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞;当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛出异常。

    当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会被执行;当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断执行此任务线程的方式来试图停止任务;当FutureTask处于已启动状态时,执行 FutureTask.cancel(false)方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成);当FutureTask处于已完成状态时,执行FutureTask.cancel方法将返回false (已完成的任务任务无法取消)。

    2 用法示例

    FutureTask因其自身继承于Runnable接口,因此它可以交给执行器Executor去执行;另外它也代表异步任务结果,它还可以通过ExecutorService.submit返回一个FutureTask。另外FutureTask也可单独使用。为了更好的理解FutureTask ,下面结合ConcurrentHashMap演示一个任务缓存。缓存中有多个任务,使用多线程去执行这些任务,一个任务最多被一个线程消费,若多个线程试图执行这一个任务,只允许一个线程来执行,其他线程必须等待它执行完成。

    import java.util.concurrent.*;
    
    public class FutureTaskTest {
        private final ConcurrentMap<String, Future<String>> taskCache = new ConcurrentHashMap<>();
        public  String executionTask(final String taskName)
                throws ExecutionException, InterruptedException {
            while (true) {
                Future<String> future = taskCache.get(taskName);// 从缓存中获取任务
                if (future == null) {//不存在此任务,新构建一个任务放入缓存,并启动这个任务
                    Callable<String> task = () ->{
                        System.out.println("执行的任务名是"+taskName);
                        return taskName;
                    } ; // 1.2创建任务
                    FutureTask<String> futureTask = new FutureTask<String>(task);
                    future = taskCache.putIfAbsent(taskName, futureTask);// 尝试将任务放入缓存中
                    if (future == null) {
                        future = futureTask;
                        futureTask.run();//执行任务
                    }
                }
                try { //若任务在缓存中了,可以直接等待任务的完成
                    return future.get();// 等待任务执行完成
                } catch (CancellationException e) {
                    taskCache.remove(taskName, future);
                }
            }
        }
    
        public static void main(String[] args)    {
         final   FutureTaskTest taskTest = new FutureTaskTest();
            for (int i = 0; i < 7; i++) {
                int finalI = i;
                new Thread(()->{
                    try {
                        taskTest.executionTask("taskName" + finalI);
                    } catch (ExecutionException | InterruptedException e) {
                        e.printStackTrace();
                    }
                }).start();
                new Thread(()->{
                    try {
                        taskTest.executionTask("taskName" + finalI);
                        taskTest.executionTask("taskName" + finalI);
                    } catch (ExecutionException | InterruptedException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    }

    打印输出

    执行的任务名是taskName0

    执行的任务名是taskName5

    执行的任务名是taskName4

    执行的任务名是taskName6

    执行的任务名是taskName3

    执行的任务名是taskName1

    执行的任务名是taskName2

    3 实现原理

    1) 成员变量

    它有一个成员变量state表示状态

    private volatile int state;

    它有这些可能取值

    private static final int NEW          = 0;//刚开始的状态或任务在运行中
    private static final int COMPLETING   = 1;//临时状态,任务即将结束,正在设置结果
    private static final int NORMAL       = 2;//任务正常完成
    private static final int EXCEPTIONAL  = 3;//因抛出异常而结束任务
    private static final int CANCELLED    = 4;//任务被取消
    private static final int INTERRUPTING = 5;//任务正在被中断
    private static final int INTERRUPTED  = 6;//任务被中断(中断的最终状态)

    state可能有这几种状态转换

    /** NEW -> COMPLETING -> NORMAL      正常结束任务时的状态转换流程
         * NEW -> COMPLETING -> EXCEPTIONAL   任务执行过程中抛出了异常时的状态转换流程
         * NEW -> CANCELLED                 任务被取消时的状态转换流程
         * NEW -> INTERRUPTING -> INTERRUPTED  任务执行过程中出现中断时的状态转换流程
         */

    其他成员变量

    private Callable<V> callable;
    private Object outcome; // non-volatile, protected by state reads/writes
    private volatile Thread runner;
    private volatile WaitNode waiters;

    成员变量callable表示要执行的任务,

    成员变量outcome表示任务的结果或任务非正常结束的异常

    成员变量runner表示执行此任务的线程

    成员变量waiter表示等待任务执行结果的等待栈(数据结构是单向链表) 。WaitNode是一个简单的静态内部,一个成员变量thread表示等待结果的线程,另一个成员变量next表示下一个等待节点(线程)。

    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

    2) 构造方法

    FutureTask的构造方法会初始化callable和state ,它有两个构造方法, 分别接受Callable和Runnable类型的待执行任务。但对于Runnable类型参数,它会调用Executors.callable将Runnable转换为Callable类型实例,以便于统一处理。

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

    Executors.callable方法也很简单,它就返回了一个Callable的实现类RunnableAdapter类型的对象。

    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

    3) 主要API

    (1) run与runAndReset

    run方法是Funture最重要的方法,FutureTask的一切都是从run方法开始的,它是执行callable任务的方法。

    public void run() {
            if (state != NEW ||
                    !UNSAFE.compareAndSwapObject(this, runnerOffset,
                            null, Thread.currentThread()))
                //将当前线程设置为执行任务的线程,CAS失败就直接返回
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();//执行任务
                        ran = true;
                    } catch (Throwable ex) {
                        //运行时有异常,设置异常
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);//设置结果
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null; //state已是最终状态,不再变化,将runer设为null,防止run方法被并发调用
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state; //清空运行线程runner后再重新获取state,防止遗漏掉对中断的处理
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }

    其主要逻辑是:

    ①检查状态,设置运行任务的线程

    ②调用callable的call方法去执行任务,并捕获运行中可能出现的异常

    ③如果任务正常完成,调用set设置任务的结果,将state设为NORMAL, 将结果保存到outcome ,唤醒所有等待结果的线程

    ④若执行任务过程中发生了异常,调用setException设置异常,将state设为EXCEPTIONAL ,将此异常也保存到outcome ,唤醒所有等待结果的线程

    ⑤最后将运行线程runner清空,若状态可能是任务被取消的中断还要处理此中断。

    set 、setException方法分别用来设置结果、设置异常,但这仅是它们的主要逻辑,它们还会进行其他的处理。

    它们会将结果或异常设置到成员变量outcome上,还会更新状态state,最后调用finishCompletion从等待栈表中移除并唤醒所有(节点)线程(任务已完成,无需要等待,可以直接获取结果,等待栈已没有存在的意义了)。

    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

    run方法中有对中断的处理,我们来看看handlePossibleCancellationInterrupt方法怎么处理中断的.

    这里就是简单地使当前线程让出时间片,让其他线程先执行任务,即线程礼让。

    private void handlePossibleCancellationInterrupt(int s) {
            if (s == INTERRUPTING)
                while (state == INTERRUPTING)
                    Thread.yield(); // wait out pending interrupt
        }

    runAndReset方法是FutureTask类自己添加的protected级别的方法(供子类调用), 这个方法主要用来执行可多次执行且不需要结果的任务,只有在任务运行和重设成功时才返回true 。定时任务执行器ScheduledThreadPoolExecutor的静态内部ScheduledFutureTask的run方法调用了这个API.

    和run方法相比,runAndSet方法与之逻辑大致相同,只是runAndSet没用调用set方法设置结果(本身不需要结果,也是出于防止state被修改的目的)

    protected boolean runAndReset() {
            if (state != NEW ||
                    !UNSAFE.compareAndSwapObject(this, runnerOffset,
                            null, Thread.currentThread()))
                //任务已启动或CAS设置运行任务的的线程失败,直接返回false
                return false;
            boolean ran = false;
            int s = state;
            try {
                Callable<V> c = callable;
                if (c != null && s == NEW) {
                    try {
                        c.call(); // don't set result  没有调用set(V)方法,不设置结束
                        ran = true;
                    } catch (Throwable ex) {
                        setException(ex);
                    }
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
            return ran && s == NEW; //任务成功运行且state还是NEW时返回true,反之返回false
        }

    (2) get方法

    get方法用于获取任务的最终结果,它有两个版本,其中一个是超时版本。两个版本的最主要的区别在于,非超时版本可以不限时长地等待结果返回 ,另外非超时版本不会抛出TimeoutException超时异常。get方法超时版本的基本逻辑:若任务未完成就等待任务完成,最后调用report报告结果,report会根据状态返回结果或抛出异常。

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);//awaitDone第一个参数为false,表示可以无限时长等待
        return report(s);
    }
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&//还未完成
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)//等待完成
            throw new TimeoutException();//到了限定时间,任务仍未完成,抛出超时异常TimeoutException
        return report(s);//报告结果
    }

    get方法的核心实现在于调用awaitDone方法,awaitDone用于等待任务的结果,若任务未完成awaitDone会阻塞当前线程。

    awaitDone方法的基本逻辑:①若执行任务时出现了中断,则抛出InterruptedException异常;②若此时任务已完成,就返回最新的state,③若任务即将完成就使当前线程让出CPU时间片,让其他线程先执行;④若任务还在执行中,就将当前线程加入到等待栈中,然后让当前线程休眠直到超出限定时间或等待任务完成时run方法调用finishCompletion唤醒线程(run方法中的set或setException调用finishCompletion,而finishCompletion又会调用LockSupport.unpark).

    private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) { //被中断了,就在等待栈表中移除这个线程,并抛出中断异常
                removeWaiter(q);
                throw new InterruptedException();
            }
    
            int s = state;
            if (s > COMPLETING) { //任务完成了,将当前线程从等待队列中清空,返回最新的状态
                if (q != null)
                    q.thread = null;
                return s;
            }
            //任务即将完成,当前线程礼让,让其他线程执行
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null) 
                //初始化当前线程对应的节点
                q = new WaitNode();
            else if (!queued)
                //如果之前入栈失败,再次尝试入栈(CAS更新),将当前节点设为等待栈的栈顶
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                        q.next = waiters, q);
            else if (timed) { //如果设置了超时时间
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) { 
                    //如果任务执行时长已经超出了给定的时间,从等待栈中移除当前节点(线程)
                    removeWaiter(q);
                    return state;
                }
                //让当前线程休眠等待给定的时间(或等到run方法中的set或setException调用finishCompletion来唤醒)
                LockSupport.parkNanos(this, nanos);
            }
            else//未设置超时时间
          //让当前线程无限时长休眠等待,直到任务完成时run方法中的set或setException调用finishCompletion来唤醒此线程
                LockSupport.park(this);
        }
    }

    上面的awaitDone方法中调用removeWaiter来移除等待栈表的中断和超时节点。

    其内部实现不容易理解,但主要逻辑还是很清楚的:从头到尾遍历链表,将链表中的中断/超时节点移除出链表,若有线程竞争就重头开始再次遍历链表检查并移除无效节点。

    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null; //先将节点对应的线程清空,下面的"q.thread != null"就能判断节点是否超时或中断节点。
            retry:
            for (;;) {          // restart on removeWaiter race
                //q表示当前遍历到的节点,pred表示q的前驱节点,s表示q的后继节点 
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {//遍历完链表才能退出内循环
                    s = q.next;
                    //q.thread!=null 表示这不是超时或中断的节点,它是效节点,不能被从栈表中移除
                    //(removeWaiter的开头将超时或中断的节点在thread赋空,可见node.thread=null代码)
                    if (q.thread != null)
                        pred = q; //得到下次循环时q的前驱节点
                    else if (pred != null) { //q.thread== null 且pred!=null,需要将无效节点q从栈表中移除
                        //将q的前驱、后继节点直接链接在一起,q本身被移除出栈表了
                        pred.next = s;
                        //这里是从前向后遍历链表的,无竞争情况下,不可能没检查到当前节点的前面还有无效节点,
                        //那么一定有其他线程修改了当前节点q的前驱,些时有线程竞争,需要从链表的头部重新遍历检查
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    // pred==null且q.thread=null
                    //q的前驱节点为空,表明q是链表的头节点
                    //q.thread==null,表明q是无效节点
                    //无效节点不能作为链表的头节点,所以要更新头节点,将q的后继节点s作为链表新的头节点
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,  //CAS更新头节点
                            q, s))
                        //CAS更新失败,重试
                        continue retry;
                }
                break;
            }
        }
    }

    get方法需要调用report方法来报告结果,而report方法的基本逻辑也简单:若是任务正常结束就返回这个任务的结果,若是任务被取消,就抛出CancellationException异常,若是在执行任务过程中发生了异常就将其统一封装成ExecutionException并抛出。

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

    (3) cancel方法

    cancel方法用于取消任务,我们可以看看cancel(boolean)方法如何实现的

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
                UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                        mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            //①不是NEW状态,表示任务至少是COMPLETING(即将结束)状态,返回false
            //②CAS更新state为INTERRUPTING或CANCELLED失败,返回false
            //只有state状态更新成功,才能取消任务(防止被并发调用)
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {//允许中断就设置中断标志
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();//设置中断标志
                } finally { // final state 设置中断的最终状态
                    //INTERRUPTING -> INTERRUPTED ,将state由“正在中断”更新为”已经中断“
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            //从等待栈上唤醒并移除所有的线程(节点)
            finishCompletion();
        }
        return true;
    }

    其基本逻辑:

    ①任务已结束或被取消,返回false

    ②若mayInterruptIfRunning为true,调用interrupt设置中断标志,将state设置为INTERRUPTED,若mayInterruptIfRunning为false,将state设为CANCELLED.

    ③调用finishCompletion唤醒并移除等待栈中的所有线程

    finishCompletion()主要是处理任务结束后的扫尾工作,其主要逻辑是:将等待栈waiters赋空,唤醒并移除等待栈上的所有节点(线程),最后再将任务callable赋空。

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            //任务取消后,等待栈表没有存在的意义了,将等待栈waiters赋为null,
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) { //遍历链表(栈),将所有节点移除,并唤醒节点对应的线程
                    Thread t = q.thread;
                    if (t != null) {
                    //将节点上的线程清空
                        q.thread = null; 
                        LockSupport.unpark(t);//唤醒此线程
                    }
                    WaitNode next = q.next;
                    if (next == null)//链表到尾了,退出遍历
                        break;
                    q.next = null; // unlink to help gc 将节点next属性清空,方便垃圾回收
                    q = next;//向后移动一个节点
                }
                break;
            }
        }
        done();//空方法,留给子类重写
        callable = null; //赋空,减少痕迹       // to reduce footprint 
    }
     

    (4) 其他辅助方法

    isCancelled方法返回任务是否被取消的布尔值

    isDone方法返回任务是否完成的布尔值(非正常结束也行)

    isCancelled 、isDone都是直接根据state确定任务的状态。

    public boolean isCancelled() {
        return state >= CANCELLED;
    }
    
    public boolean isDone() {
        return state != NEW;
    }
  • 相关阅读:
    无法重用Linq2Entity Query
    The Joel Test
    MSBuilder directly instead of default VSComplie with keyborad shotcut 原创
    客户端缓存(Client Cache)
    关于代码重构和UT的一些想法,求砖头
    ExtJS2.0实用简明教程 应用ExtJS
    Perl information,doc,module document and FAQ.
    使用 ConTest 进行多线程单元测试 为什么并行测试很困难以及如何使用 ConTest 辅助测试
    史上最简单的Hibernate入门简介
    汽车常识全面介绍 传动系统
  • 原文地址:https://www.cnblogs.com/gocode/p/analysis-source-code-of-FutureTask.html
Copyright © 2011-2022 走看看