zoukankan      html  css  js  c++  java
  • FutureTask源码分析

    FutureTask:表示的是异步计算的未来结果,本质就是多线程执行并拿到结果。
    get方法是阻塞的
    NEW表示是FutureTask刚刚创建好,是一个新的任务或者是还没有执行的任务,是初始状态。

    FutureTask只能执行一次,一次过后需要重新创建,因为futureTask的状态会停留到最后一步,所以第二次执行到run方法的时候会被过滤掉。
    FutureTask同时只能一个线程执行,在执行run方法的时候会被过滤掉。

    FutureTask实现了RunnableFuture接口

    其中包含Future接口

    public interface Future<V> {
        boolean cancel(boolean mayInterruptIfRunning);//用于取消异步执行,不一定成功。
        boolean isCancelled();//判断任务是否取消,true表示已经取消,false表示没有取消
        boolean isDone();//判断任务是否已经执行完成,true表示完成,false表示没有完成
        V get() throws InterruptedException, ExecutionException;//获取异步执行的结果
        V get(long timeout, TimeUnit unit)//获取异步执行的结果,这里是带超时时间的。
            throws InterruptedException, ExecutionException, TimeoutException;
    }

    构造函数

    public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW;//初始状态
    }
    
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);//通过工具类进行封装,把Runnable转为Callable
        this.state = NEW;//初始状态
    }

    FutureTask是有多种状态的

    private static final int NEW          = 0;//初始状态,在还没执行完成都是。
    private static final int COMPLETING   = 1;//call方法已经调用结束了,在调用set或者setException方法,但是还没有赋值给outcome。
    private static final int NORMAL       = 2;//正常执行完成,是调用set方法。
    private static final int EXCEPTIONAL  = 3;//异常执行完成,是调用的setException方法。
    private static final int CANCELLED    = 4;//调用了cancel(false)后的状态,是最终态。
    private static final int INTERRUPTING = 5;//调用了cancel(true)后的状态,是中间态。
    private static final int INTERRUPTED  = 6;//调用cancel的最终状态。

    run方法的执行逻辑

    public void run() {
        //判断当前线程是不是初始状态,这样可以保证执行过的线程会过滤掉。
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))//判断runner是否为null,这样可以保证同时只能一个线程执行。    
            return;
        try {
            //创建FutureTask时传入进来了
            Callable<V> c = callable;
            //初始状态且不为null
            if (c != null && state == NEW) {
                V result;
                //用于判断任务是否正确执行
                boolean ran;
                try {
                    //执行任务
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);//异常赋值
                }
                if (ran)
                    set(result);//正常赋值
            }
        } finally {
          //在状态成为最终态时要保证runner非空,因为这样可以避免并发调用run。 runner
    = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }

    set和setException方法

    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); //指定最终状态是NORMAL
            finishCompletion();
        }
    }
    
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); //指定最终状态是EXCEPTIONAL
            finishCompletion();
        }
    }

    finishCompletion方法

    //唤醒所有等待的线程
    //这些等待的线程是在调用get时产生的。
    //相当于在这里控制等待的线程get到结果。
    private void finishCompletion() {
        //唤醒所有等待的线程
        for (WaitNode q; (q = waiters) != null;) {//等待的线程
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);//唤醒等待的线程
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        done();
        callable = null;        // to reduce footprint
    }

    get方法

    //get方法是阻塞的
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)//如果没有执行完的
            s = awaitDone(false, 0L);
        return report(s);//返回结果
    }

    report方法

    //获取当前的值
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)//正常状态的结果
            return (V)x;
        if (s >= CANCELLED)//取消的结果
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);//异常结束的结果
    }

    awaitDone方法

    //当前线程阻塞的逻辑:  timed是否设置等待时间,nanos具体的等待时间
    //进入该方法的时候状态肯定是<=COMPLETING的,也就是NEW(执行未完成)和COMPLETING(执行完成未赋值)
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {//通过for死循环不断验证状态
            if (Thread.interrupted()) {//如果当前线程是被打断的,那么会从waiters中移除并抛出异常。
                removeWaiter(q);
                throw new InterruptedException();
            }
    
            int s = state;
            if (s > COMPLETING) {//run运行结束后,或者通过cancel调用导致状态变更。
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // run正在进行时,call方法已经执行了,还没有执行到赋值那一步
                Thread.yield();
            else if (q == null)//新创建等待节点
                q = new WaitNode();
            else if (!queued)//还没有入队列就把q放到waiters的头结点
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {//指定等待时间的情况下
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {//已经到时了
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);//线程阻塞
            }
            else
                LockSupport.park(this);//当前线程进行阻塞
        }
    }

    cancel方法

    //是否取消,如果为true,会打断run的线程,否则只是状态标记为CANCELLED
    ////该方法不会抛出异常的
    public boolean cancel(boolean mayInterruptIfRunning) { //状态不为new的时候,也就是到赋值阶段是不允许cancel的,基本上就是在run方法执行时可以。 if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion();//cancel会把等待线程都唤醒,然后结束。 } return true; }

    isCancelled方法和isDone方法

    //判断是否被取消
    public boolean isCancelled() {
        return state >= CANCELLED;//通过状态判断
    }
    
    //是否已经完成,也就是执行完run方法,或者被取消
    public boolean isDone() {
        return state != NEW;//通过状态判断
    }
  • 相关阅读:
    Mysql基础知识:操作数据库
    Mysql基础知识:数据类型
    Struts2下载与使用
    Open Live Writer安装使用
    博客园界面设置
    (3)break、continue
    (2)基础语法
    查找五:散列表查找
    查找三:平衡二叉树
    centos 安装 masscan
  • 原文地址:https://www.cnblogs.com/tp123/p/12093223.html
Copyright © 2011-2022 走看看