zoukankan      html  css  js  c++  java
  • FutureTask源码完整解读

    1 简介

    上一篇博客“异步任务服务简介”对FutureTask做过简要介绍与分析,这里再次对FutureTask做一次深入的分析(基于JDK1.8)。

    FutureTask同时实现了Future 、Runnable接口,因此它可以交给执行器Executor去执行这个任务,也可以由调用线程直接执行run方法。

    根据FutureTask.run方法的执行状态,可将其分为以下3种状态

    ①未启动: run方法还未被执行,FutureTask处于未启动状态。

    ②已启动: run方法在执行过程中,FutureTask处于已启动状态

    ③已完成:run方法正常完成返回或被取消或执行过程中因异常抛出而非正常结束,FutureTask处于已完成状态。

    当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞;当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛出异常。

    当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会被执行;当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断执行此任务线程的方式来试图停止任务;当FutureTask处于已启动状态时,执行 FutureTask.cancel(false)方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成);当FutureTask处于已完成状态时,执行FutureTask.cancel方法将返回false (已完成的任务任务无法取消)。

    2 用法示例

    FutureTask因其自身继承于Runnable接口,因此它可以交给执行器Executor去执行;另外它也代表异步任务结果,它还可以通过ExecutorService.submit返回一个FutureTask。另外FutureTask也可单独使用。为了更好的理解FutureTask ,下面结合ConcurrentHashMap演示一个任务缓存。缓存中有多个任务,使用多线程去执行这些任务,一个任务最多被一个线程消费,若多个线程试图执行这一个任务,只允许一个线程来执行,其他线程必须等待它执行完成。

    import java.util.concurrent.*;
    
    public class FutureTaskTest {
        private final ConcurrentMap<String, Future<String>> taskCache = new ConcurrentHashMap<>();
        public  String executionTask(final String taskName)
                throws ExecutionException, InterruptedException {
            while (true) {
                Future<String> future = taskCache.get(taskName);// 从缓存中获取任务
                if (future == null) {//不存在此任务,新构建一个任务放入缓存,并启动这个任务
                    Callable<String> task = () ->{
                        System.out.println("执行的任务名是"+taskName);
                        return taskName;
                    } ; // 1.2创建任务
                    FutureTask<String> futureTask = new FutureTask<String>(task);
                    future = taskCache.putIfAbsent(taskName, futureTask);// 尝试将任务放入缓存中
                    if (future == null) {
                        future = futureTask;
                        futureTask.run();//执行任务
                    }
                }
                try { //若任务在缓存中了,可以直接等待任务的完成
                    return future.get();// 等待任务执行完成
                } catch (CancellationException e) {
                    taskCache.remove(taskName, future);
                }
            }
        }
    
        public static void main(String[] args)    {
         final   FutureTaskTest taskTest = new FutureTaskTest();
            for (int i = 0; i < 7; i++) {
                int finalI = i;
                new Thread(()->{
                    try {
                        taskTest.executionTask("taskName" + finalI);
                    } catch (ExecutionException | InterruptedException e) {
                        e.printStackTrace();
                    }
                }).start();
                new Thread(()->{
                    try {
                        taskTest.executionTask("taskName" + finalI);
                        taskTest.executionTask("taskName" + finalI);
                    } catch (ExecutionException | InterruptedException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    }

    打印输出

    执行的任务名是taskName0

    执行的任务名是taskName5

    执行的任务名是taskName4

    执行的任务名是taskName6

    执行的任务名是taskName3

    执行的任务名是taskName1

    执行的任务名是taskName2

    3 实现原理

    1) 成员变量

    它有一个成员变量state表示状态

    private volatile int state;

    它有这些可能取值

    private static final int NEW          = 0;//刚开始的状态或任务在运行中
    private static final int COMPLETING   = 1;//临时状态,任务即将结束,正在设置结果
    private static final int NORMAL       = 2;//任务正常完成
    private static final int EXCEPTIONAL  = 3;//因抛出异常而结束任务
    private static final int CANCELLED    = 4;//任务被取消
    private static final int INTERRUPTING = 5;//任务正在被中断
    private static final int INTERRUPTED  = 6;//任务被中断(中断的最终状态)

    state可能有这几种状态转换

    /** NEW -> COMPLETING -> NORMAL      正常结束任务时的状态转换流程
         * NEW -> COMPLETING -> EXCEPTIONAL   任务执行过程中抛出了异常时的状态转换流程
         * NEW -> CANCELLED                 任务被取消时的状态转换流程
         * NEW -> INTERRUPTING -> INTERRUPTED  任务执行过程中出现中断时的状态转换流程
         */

    其他成员变量

    private Callable<V> callable;
    private Object outcome; // non-volatile, protected by state reads/writes
    private volatile Thread runner;
    private volatile WaitNode waiters;

    成员变量callable表示要执行的任务,

    成员变量outcome表示任务的结果或任务非正常结束的异常

    成员变量runner表示执行此任务的线程

    成员变量waiter表示等待任务执行结果的等待栈(数据结构是单向链表) 。WaitNode是一个简单的静态内部,一个成员变量thread表示等待结果的线程,另一个成员变量next表示下一个等待节点(线程)。

    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

    2) 构造方法

    FutureTask的构造方法会初始化callable和state ,它有两个构造方法, 分别接受Callable和Runnable类型的待执行任务。但对于Runnable类型参数,它会调用Executors.callable将Runnable转换为Callable类型实例,以便于统一处理。

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

    Executors.callable方法也很简单,它就返回了一个Callable的实现类RunnableAdapter类型的对象。

    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

    3) 主要API

    (1) run与runAndReset

    run方法是Funture最重要的方法,FutureTask的一切都是从run方法开始的,它是执行callable任务的方法。

    public void run() {
            if (state != NEW ||
                    !UNSAFE.compareAndSwapObject(this, runnerOffset,
                            null, Thread.currentThread()))
                //将当前线程设置为执行任务的线程,CAS失败就直接返回
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();//执行任务
                        ran = true;
                    } catch (Throwable ex) {
                        //运行时有异常,设置异常
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);//设置结果
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null; //state已是最终状态,不再变化,将runer设为null,防止run方法被并发调用
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state; //清空运行线程runner后再重新获取state,防止遗漏掉对中断的处理
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }

    其主要逻辑是:

    ①检查状态,设置运行任务的线程

    ②调用callable的call方法去执行任务,并捕获运行中可能出现的异常

    ③如果任务正常完成,调用set设置任务的结果,将state设为NORMAL, 将结果保存到outcome ,唤醒所有等待结果的线程

    ④若执行任务过程中发生了异常,调用setException设置异常,将state设为EXCEPTIONAL ,将此异常也保存到outcome ,唤醒所有等待结果的线程

    ⑤最后将运行线程runner清空,若状态可能是任务被取消的中断还要处理此中断。

    set 、setException方法分别用来设置结果、设置异常,但这仅是它们的主要逻辑,它们还会进行其他的处理。

    它们会将结果或异常设置到成员变量outcome上,还会更新状态state,最后调用finishCompletion从等待栈表中移除并唤醒所有(节点)线程(任务已完成,无需要等待,可以直接获取结果,等待栈已没有存在的意义了)。

    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

    run方法中有对中断的处理,我们来看看handlePossibleCancellationInterrupt方法怎么处理中断的.

    这里就是简单地使当前线程让出时间片,让其他线程先执行任务,即线程礼让。

    private void handlePossibleCancellationInterrupt(int s) {
            if (s == INTERRUPTING)
                while (state == INTERRUPTING)
                    Thread.yield(); // wait out pending interrupt
        }

    runAndReset方法是FutureTask类自己添加的protected级别的方法(供子类调用), 这个方法主要用来执行可多次执行且不需要结果的任务,只有在任务运行和重设成功时才返回true 。定时任务执行器ScheduledThreadPoolExecutor的静态内部ScheduledFutureTask的run方法调用了这个API.

    和run方法相比,runAndSet方法与之逻辑大致相同,只是runAndSet没用调用set方法设置结果(本身不需要结果,也是出于防止state被修改的目的)

    protected boolean runAndReset() {
            if (state != NEW ||
                    !UNSAFE.compareAndSwapObject(this, runnerOffset,
                            null, Thread.currentThread()))
                //任务已启动或CAS设置运行任务的的线程失败,直接返回false
                return false;
            boolean ran = false;
            int s = state;
            try {
                Callable<V> c = callable;
                if (c != null && s == NEW) {
                    try {
                        c.call(); // don't set result  没有调用set(V)方法,不设置结束
                        ran = true;
                    } catch (Throwable ex) {
                        setException(ex);
                    }
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
            return ran && s == NEW; //任务成功运行且state还是NEW时返回true,反之返回false
        }

    (2) get方法

    get方法用于获取任务的最终结果,它有两个版本,其中一个是超时版本。两个版本的最主要的区别在于,非超时版本可以不限时长地等待结果返回 ,另外非超时版本不会抛出TimeoutException超时异常。get方法超时版本的基本逻辑:若任务未完成就等待任务完成,最后调用report报告结果,report会根据状态返回结果或抛出异常。

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);//awaitDone第一个参数为false,表示可以无限时长等待
        return report(s);
    }
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&//还未完成
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)//等待完成
            throw new TimeoutException();//到了限定时间,任务仍未完成,抛出超时异常TimeoutException
        return report(s);//报告结果
    }

    get方法的核心实现在于调用awaitDone方法,awaitDone用于等待任务的结果,若任务未完成awaitDone会阻塞当前线程。

    awaitDone方法的基本逻辑:①若执行任务时出现了中断,则抛出InterruptedException异常;②若此时任务已完成,就返回最新的state,③若任务即将完成就使当前线程让出CPU时间片,让其他线程先执行;④若任务还在执行中,就将当前线程加入到等待栈中,然后让当前线程休眠直到超出限定时间或等待任务完成时run方法调用finishCompletion唤醒线程(run方法中的set或setException调用finishCompletion,而finishCompletion又会调用LockSupport.unpark).

    private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) { //被中断了,就在等待栈表中移除这个线程,并抛出中断异常
                removeWaiter(q);
                throw new InterruptedException();
            }
    
            int s = state;
            if (s > COMPLETING) { //任务完成了,将当前线程从等待队列中清空,返回最新的状态
                if (q != null)
                    q.thread = null;
                return s;
            }
            //任务即将完成,当前线程礼让,让其他线程执行
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null) 
                //初始化当前线程对应的节点
                q = new WaitNode();
            else if (!queued)
                //如果之前入栈失败,再次尝试入栈(CAS更新),将当前节点设为等待栈的栈顶
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                        q.next = waiters, q);
            else if (timed) { //如果设置了超时时间
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) { 
                    //如果任务执行时长已经超出了给定的时间,从等待栈中移除当前节点(线程)
                    removeWaiter(q);
                    return state;
                }
                //让当前线程休眠等待给定的时间(或等到run方法中的set或setException调用finishCompletion来唤醒)
                LockSupport.parkNanos(this, nanos);
            }
            else//未设置超时时间
          //让当前线程无限时长休眠等待,直到任务完成时run方法中的set或setException调用finishCompletion来唤醒此线程
                LockSupport.park(this);
        }
    }

    上面的awaitDone方法中调用removeWaiter来移除等待栈表的中断和超时节点。

    其内部实现不容易理解,但主要逻辑还是很清楚的:从头到尾遍历链表,将链表中的中断/超时节点移除出链表,若有线程竞争就重头开始再次遍历链表检查并移除无效节点。

    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null; //先将节点对应的线程清空,下面的"q.thread != null"就能判断节点是否超时或中断节点。
            retry:
            for (;;) {          // restart on removeWaiter race
                //q表示当前遍历到的节点,pred表示q的前驱节点,s表示q的后继节点 
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {//遍历完链表才能退出内循环
                    s = q.next;
                    //q.thread!=null 表示这不是超时或中断的节点,它是效节点,不能被从栈表中移除
                    //(removeWaiter的开头将超时或中断的节点在thread赋空,可见node.thread=null代码)
                    if (q.thread != null)
                        pred = q; //得到下次循环时q的前驱节点
                    else if (pred != null) { //q.thread== null 且pred!=null,需要将无效节点q从栈表中移除
                        //将q的前驱、后继节点直接链接在一起,q本身被移除出栈表了
                        pred.next = s;
                        //这里是从前向后遍历链表的,无竞争情况下,不可能没检查到当前节点的前面还有无效节点,
                        //那么一定有其他线程修改了当前节点q的前驱,些时有线程竞争,需要从链表的头部重新遍历检查
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    // pred==null且q.thread=null
                    //q的前驱节点为空,表明q是链表的头节点
                    //q.thread==null,表明q是无效节点
                    //无效节点不能作为链表的头节点,所以要更新头节点,将q的后继节点s作为链表新的头节点
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,  //CAS更新头节点
                            q, s))
                        //CAS更新失败,重试
                        continue retry;
                }
                break;
            }
        }
    }

    get方法需要调用report方法来报告结果,而report方法的基本逻辑也简单:若是任务正常结束就返回这个任务的结果,若是任务被取消,就抛出CancellationException异常,若是在执行任务过程中发生了异常就将其统一封装成ExecutionException并抛出。

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

    (3) cancel方法

    cancel方法用于取消任务,我们可以看看cancel(boolean)方法如何实现的

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
                UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                        mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            //①不是NEW状态,表示任务至少是COMPLETING(即将结束)状态,返回false
            //②CAS更新state为INTERRUPTING或CANCELLED失败,返回false
            //只有state状态更新成功,才能取消任务(防止被并发调用)
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {//允许中断就设置中断标志
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();//设置中断标志
                } finally { // final state 设置中断的最终状态
                    //INTERRUPTING -> INTERRUPTED ,将state由“正在中断”更新为”已经中断“
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            //从等待栈上唤醒并移除所有的线程(节点)
            finishCompletion();
        }
        return true;
    }

    其基本逻辑:

    ①任务已结束或被取消,返回false

    ②若mayInterruptIfRunning为true,调用interrupt设置中断标志,将state设置为INTERRUPTED,若mayInterruptIfRunning为false,将state设为CANCELLED.

    ③调用finishCompletion唤醒并移除等待栈中的所有线程

    finishCompletion()主要是处理任务结束后的扫尾工作,其主要逻辑是:将等待栈waiters赋空,唤醒并移除等待栈上的所有节点(线程),最后再将任务callable赋空。

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            //任务取消后,等待栈表没有存在的意义了,将等待栈waiters赋为null,
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) { //遍历链表(栈),将所有节点移除,并唤醒节点对应的线程
                    Thread t = q.thread;
                    if (t != null) {
                    //将节点上的线程清空
                        q.thread = null; 
                        LockSupport.unpark(t);//唤醒此线程
                    }
                    WaitNode next = q.next;
                    if (next == null)//链表到尾了,退出遍历
                        break;
                    q.next = null; // unlink to help gc 将节点next属性清空,方便垃圾回收
                    q = next;//向后移动一个节点
                }
                break;
            }
        }
        done();//空方法,留给子类重写
        callable = null; //赋空,减少痕迹       // to reduce footprint 
    }
     

    (4) 其他辅助方法

    isCancelled方法返回任务是否被取消的布尔值

    isDone方法返回任务是否完成的布尔值(非正常结束也行)

    isCancelled 、isDone都是直接根据state确定任务的状态。

    public boolean isCancelled() {
        return state >= CANCELLED;
    }
    
    public boolean isDone() {
        return state != NEW;
    }
  • 相关阅读:
    HDU 1286(欧拉函数||筛选法)
    因数打表(HDU1215)
    HDU 1003
    T行数据跟着N个数据
    15校赛
    HDU 1002
    简单大数相加
    (质因子打表记录素数的位置)HDU Largest prime factor
    HDU cake
    【转】 cin、cin.get()、cin.getline()、getline()、gets()等函数的用法
  • 原文地址:https://www.cnblogs.com/gocode/p/analysis-source-code-of-FutureTask.html
Copyright © 2011-2022 走看看