zoukankan      html  css  js  c++  java
  • FutureTask机制

    1. 继承体系

      FutureTask实现了Runnable接口,实现了run() 方法,可以交给线程/线程池执行。实现了Future接口,实现了get()方法,可以获取执行的结果。

    2. 重要属性

    /**
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    //volatile保持可见性
    private volatile int state;
    //初态:初始状态
    private static final int NEW          = 0;
    //中态:给计算结果赋值之前,会先CAS设置一下状态
    private static final int COMPLETING   = 1;
    //终态:结果赋值完成后,设置为该状态
    private static final int NORMAL       = 2;
    //终态:计算过程中出现异常,设置为该状态
    private static final int EXCEPTIONAL  = 3;
    //终态:任务被终止
    private static final int CANCELLED    = 4;
    //中态:计算过程中被中断
    private static final int INTERRUPTING = 5;
    //终态:将等待线程中断后,设置为该状态
    private static final int INTERRUPTED  = 6;
    
    /** 初始化时设置的任务 */
    private Callable<V> callable;
    /** 返回的结果,可能是计算结果,也可能是异常 */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** 执行任务的线程 */
    private volatile Thread runner;
    /** 等待结果的线程,是一个单向链表 */
    private volatile WaitNode waiters;

      任务执行的整个阶段会不停的修改state,而get方法会根据state的值做出不同的反应

    3. run方法

    public void run() {
        if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                        null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //执行方法
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    //出现异常,这里会先将state设置为COMPLETING,将异常赋值给outcome,再将state设置为EXCEPTIONAL。将所有阻塞的线程唤醒finishCompletion
                    setException(ex);
                }
                if (ran)
                    //计算成功的话,设置结果
                    //先将state设置为COMPLETING,将结果赋值给outcome,再将state设置为EXCEPTIONAL。同样也会唤醒所有阻塞的线程
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

      根据任务执行的结果,设置不同的状态,正常结束和异常退出,都会唤醒等待的线程。

    4. get方法

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            //根据状态判断,如果还没有到终态,尝试等待
            s = awaitDone(false, 0L);
        //已经有结果,返回结果
        return report(s);
    }

      先看下如何处理结果

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            //如果是正常计算结果,返回
            return (V)x;
        if (s >= CANCELLED)
            //如果被终止或中断了,抛出被终止异常
            throw new CancellationException();
        //执行过程遇到异常,抛出原异常
        throw new ExecutionException((Throwable)x);
    }

      根据不同的状态,返回结果或者抛出异常。再看下等待结果的处理:

    private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
        //如果设置超时,计算超时时间
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                //如果等待的线程被中断了,从等待链表中移除这个节点
                removeWaiter(q);
                throw new InterruptedException();
            }
    
            int s = state;
            if (s > COMPLETING) {
                //任务已结束
                if (q != null)
                    //解除关联,方便回收
                    q.thread = null;
                //返回执行状态
                return s;
            }
            else if (s == COMPLETING)
                //计算结果正在赋值,让出时间片,稍微等一下
                Thread.yield();
            else if (q == null)
                //新的线程调用get,初始化一个等待节点
                q = new WaitNode();
            else if (!queued)
                //将当前节点设置到等待链表,头插法,这里跟AQS里面等待队列节点的插入类似
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                        q.next = waiters, q);
            else if (timed) {
                //设置了超时时间,判断是否超时
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    //已经超时,移除当前节点
                    removeWaiter(q);
                    //返回任务状态
                    return state;
                }
                //未超时,在等待时间内阻塞
                LockSupport.parkNanos(this, nanos);
            }
            else
                //阻塞等待唤醒
                LockSupport.park(this);
        }
    }

      调用get的线程,都会在等待链表中阻塞,直到执行完毕,执行线程会唤醒等待线程,当然等待中可以响应中断,或者设置超时。

    5. sun.misc.Unsafe

      这个类提供了手动操作内存的能力,比如属性的读写,内存管理,数值和对象的CAS操作,线程调度,内存屏障等等。请参见:Java中的Unsafe

      下面我们说下源码中用到的交换方法,这个方法呢,直接操作内存地址,CAS替换对象中某个字段的值。

       public native boolean compareAndSwapObject(Object obj, long offset, Object expect, Object update); 

      obj:需要修改的对象。

      offset:字段在对象内存地址中的偏移量

      expect:字段期望的初始值

      update:更新的值

      如何比较?首先对象首地址+字段偏移量,找到对应字段的地址,然后跟expect指向对象的地址比较,如果一致,用update指向对象的地址来更新。参考

      回到源码, UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); 

      这里呢,就是替换当前FutureTask对象,字段偏移量为waitersOffset(就是waiters这个属性啦)的值,期望的初始值是旧的头节点waiters,需要更新成我们的新节点q。 q.next = waiters 这个操作将新节点的下位指向了旧的头结点,所以这里是头插法。

      

    人生就像蒲公英,看似自由,其实身不由己。
  • 相关阅读:
    基于Python的人脸动漫转换
    let 与 var的区别
    【LeetCode】汇总
    【HDU】4632 Palindrome subsequence(回文子串的个数)
    【算法】均匀的生成圆内的随机点
    【LeetCode】725. Split Linked List in Parts
    【LeetCode】445. Add Two Numbers II
    【LeetCode】437. Path Sum III
    【LeetCode】222. Count Complete Tree Nodes
    【LeetCode】124. Binary Tree Maximum Path Sum
  • 原文地址:https://www.cnblogs.com/walker993/p/14819004.html
Copyright © 2011-2022 走看看