zoukankan      html  css  js  c++  java
  • 线程基础知识09-FutureTask详解

    FutureTask详解

         作用:用于等待一个线程执行完毕后再执行另一个线程任务。一般用于Executors框架中,最常使用的是再ThreadPoolExecutor中,进行多线程任务;

    注意:

    • JDK1.8不在使用AQS进行线程管理;

    • 取而代之的是通过CAS进行状态的切换,waiter线程节点由堆栈完成操作;

    • 每次执行完或者有异常后,都会调用方法,重新唤醒后继线程进行锁竞争;

    UML图

    FutureTask几种状态变更

    几种状态

    • 初始状态是NEW;

    • 任务执行中的状态是COMPLETING;

    • 调用cancel方法,会取消任务,调用cancel(true)方法会中断任务执行;

    几种状态的切换如下:

    • NEW -> COMPLETING -> NORMAL

    • NEW -> COMPLETING -> EXCEPTIONAL

    • NEW -> CANCELLED

    • NEW -> INTERRUPTING -> INTERRUPTED

    状态码的值大小

    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
    

    源码查看

    run 方法

    • 如果当前线程占有锁并且是初始状态,则进行任务执行,并返回结果;

    • 如果出现错误,则更改状态,并唤醒所有后继线程;

    public void run() {
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))//判断状态是否为NEW或者锁占有线程是否当前线程
                return;
            try {
                Callable<V> c = callable;//获取当前callbale任务
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        /**
                         * 如果实现的是Runnable接口,要预设返回的result;
                         */       
                        result = c.call();//执行当前线程任务
                        ran = true;//任务执行成功
                    } catch (Throwable ex) {//任务执行失败
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)//如果任务执行成功返回执行结果
                        set(result);
                }
            } finally {
                /**
                 * 将当前线程设置为空
                 * 如果当前线程是正在终端状态,调用Thread.yeild()进行锁释放重竞争
                 */     
                runner = null;
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
    /**
     * 本质还是调用Runnable的run方法,返回预给定的返回值
     */
    static final class RunnableAdapter<T> implements Callable<T> {
            final Runnable task;
            final T result;
            RunnableAdapter(Runnable task, T result) {
                this.task = task;
                this.result = result;
            }
            public T call() {
                task.run();//相当于还是调用线程的run方法进行执行,没什么差别
                return result;
            }
        }
    /**
     * 返回错误信息,更改错误状态,唤醒所有后继线程
     */
     protected void setException(Throwable t) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = t;
                UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
                finishCompletion();
            }
        }
    
    

    set方法

    • 更改当前节点状态,并设置返回值;

    • 唤醒所有后继节点线程

      protected void set(V v) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//更改状态为COMPLETING执行中
                outcome = v;//增加返回值
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 更改状态为NORMAL,执行完成
                finishCompletion();
            }
        }
    
    
    /**
     * 作用:唤起所有后继节点的线程
     */
      private void finishCompletion() {
            // assert state > COMPLETING;
            for (WaitNode q; (q = waiters) != null;) {//有后继节点的时候
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//后继节点替换当前节点
                    for (;;) {
                        Thread t = q.thread;//获取节点线程
                        if (t != null) {//线程不为空时
                            q.thread = null;
                            LockSupport.unpark(t);//唤醒线程
                        }
                        WaitNode next = q.next;//当前节点的后继节点
                        if (next == null)
                            break;
                        q.next = null; // GC回收节点
                        q = next;//进行节点替换
                    }
                    break;
                }
            }
           /**
            * 这个方法是继承类去实现,完成线程任务或者取消线程任务
            */ 
            done();
            callable = null; 
        }
    

    get方法

    • 作用:获取当前线程返回的数据;

    • 方法执行的过程:

      • 如果是初始状态NEW,则创建当前线程为新的节点,并关联后继节点。

      • 阻塞节点直到节点线程任务完成,或者抛出异常;

      • 返回当前线程执行数据;

     public V get() throws InterruptedException, ExecutionException {
          int s = state;
          if (s <= COMPLETING)//如果是初始状态时
              s = awaitDone(false, 0L);//任务执行,并返回执行状态
          return report(s);//用于返回数据
      }
      /**
       * 执行的过程时:
       * 1.判断当前节点的状态如果是终端,则移除当前节点抛出异常 ;
       * 2.如果节点执行完成,则将线程任务回收,并返回任务执行状态;  
       * 3.如果当前节点为空,则创建节点
       * 4.如果当前节点没有在队列中,则将当前节点加入到队列中
       * 5.有超时限制的话,判断是否超时,超时则移出,否则进行阻塞
       * 6.非以上情况,则进行阻塞,直到被唤醒。
       */             
      private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;//是否设置超时时间
            WaitNode q = null;
            boolean queued = false;
            for (;;) {
                if (Thread.interrupted()) {//如果线程已经终端,移除节点,并抛出异常
                    removeWaiter(q);
                    throw new InterruptedException();
                }
    
                int s = state;
                if (s > COMPLETING) {//如果大于COMPLETING说明任务已经执行
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                else if (s == COMPLETING) // 如果状态时COMPLETING,让出CPU时间,并重新尝试获得
                    Thread.yield();
                else if (q == null)//如果为空,则创建新的节点
                    q = new WaitNode();
                else if (!queued)//如果没有后继节点,当前节点通过unsafe设置当前节点的后继节点
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                         q.next = waiters, q);
                else if (timed) {//设置了超时的化,超时后,将当前节点进行移除
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        removeWaiter(q);
                        return state;
                    }
                    LockSupport.parkNanos(this, nanos);//如果没有超时,则阻塞当前节点固定的时间
                }
                else
                    LockSupport.park(this);//阻塞当前节点
            }
        }
    /**
     * 移除等待的节点
     */  
      private void removeWaiter(WaitNode node) {
            if (node != null) {
                node.thread = null;
                retry:
                for (;;) {          // restart on removeWaiter race
                    for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                        s = q.next;
                        if (q.thread != null)
                            pred = q;
                        else if (pred != null) {
                            pred.next = s;
                            if (pred.thread == null) // check for race
                                continue retry;
                        }
                        else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                              q, s))
                            continue retry;
                    }
                    break;
                }
            }
        }
    
    
    /**
     * 作用:用于返回结果
     */
       private V report(int s) throws ExecutionException {
            Object x = outcome;//返回的值
            if (s == NORMAL)//正常返回
                return (V)x;
            if (s >= CANCELLED)//返回异常
                throw new CancellationException();
            throw new ExecutionException((Throwable)x);
        }
    

    cancel方法

    • 初始状态,根据传入的boolean值,如果不传值,默认取消当前节点,唤起后继线程

    • 如果传值true,表示中断,中断任务后,唤起后继节点线程;

       public boolean cancel(boolean mayInterruptIfRunning) {
            if (!(state == NEW &&
                  UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                      mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))//
                return false;
            try {    // in case call to interrupt throws exception
                if (mayInterruptIfRunning) {//表示中断当前线程任务
                    try {
                        Thread t = runner;
                        if (t != null)
                            t.interrupt();
                    } finally { // final state
                        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);//更改节点状态
                    }
                }
            } finally {
                finishCompletion();//唤醒后继节点线程
            }
            return true;
        }
    

    总结

    1.所以为什么不用AQS而改用CAS?

    其实注解已经给出了答案:
    Revision notes: This differs from previous versions of this
    class that relied on AbstractQueuedSynchronizer, mainly to
    avoid surprising users about retaining interrupt status during
    cancellation races.
    
    我们看一下AQS中断式获取锁的方法;
    
     - 假设第一次获取锁,并没有获取到;
     - 加到同步队列的过程中中断异常了;
     - 获取中断状态(会清空当前线程中断标识),又抛出中断异常
     - 如果再来获取一次,又会重复一次。这种容易对开发人员造成困惑。所以改CAS方式了;
    
    
     public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())//判断如果中断了
                throw new InterruptedException();//再抛出中断异常
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);
        }
    
    
     private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                //。。。。。。。。。。
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();//抛出中断异常
                }
            } finally {
               //。。。。。
            }
        }
    
    
  • 相关阅读:
    编译不通过:提示XXXX不是类或命名空间名 的解决办法
    nginx应用总结(2)--突破高并发的性能优化
    nginx应用总结(1)--基础认识和应用配置
    springboot内置tomcat验证授权回调页面域名
    MySQL实现类似Oracle中的nextval和currval
    Notepad++中删除连续的任意n行
    Spring Boot系列二 Spring @Async异步线程池用法总结
    Spring线程池配置
    Spring异步方法注解 @Async
    异步任务spring @Async注解源码解析
  • 原文地址:https://www.cnblogs.com/perferect/p/13666702.html
Copyright © 2011-2022 走看看