zoukankan      html  css  js  c++  java
  • FutureTask解析(转)

    站在使用者的角度,future是一个经常在多线程环境下使用的Runnable,使用它的好处有两个:
    1. 线程执行结果带有返回值
    2. 提供了一个线程超时的功能,超过超时时间抛出异常后返回。

    那,怎么实现future这种超时控制呢?来看看代码:

    F1

    FutureTask的实现只是依赖了一个内部类Sync实现的,Sync是AQS (AbstractQueuedSynchronizer)的子类,这个类承担了所有future的功能,AbstractQueuedSynchronizer的作者是大名鼎鼎的并发编程大师Doug Lea,它的作用远远不止实现一个Future这么简单,后面在说。

    下面,我们从一个future提交到线程池开始,直到future超时或者执行结束来看看future都做了些什么。怎么做的。
    首先,向线程池ThreadPoolExecutor提交一个future:

    F2

    ThreadPoolExecutor将提交的任务用FutureTask包装一下:

    F3

    F4

    然后尝试将包装后的Future用Thread类包装下后启动,

    红色标记的地方表示,当当前线程池的大小小于corePoolSize时,将任务提交,否则将该任务加入到workQueue中去,如果workQueue装满了,则尝试在线程数小于MaxPoolSize的条件下提交该任务。

    F5

    顺便说明下,我们使用线程池时,常常看到有关有界队列,无界队列作为工作队列的字眼:使用无界队列时,线程池的大小永远不大于corePoolSize,使用有界队列时的maxPoolSize才有效,原因就在这里,如果是
    无界队列,红框中的add永远为true 下方的addIfUnderMaximumPoolSize怎么也走不到了,也就不会有线程数量大于MaxPoolSize的情况。

    言归正传,看看addIfUnderCorePoolSize 中做了什么事:
    new了一个Thread,将我们提交的任务包装下后就直接启动了

    F6

    我们知道,线程的start方法会调用我们runnable接口的run方法,因此不难猜测FutureTask也是实现了Runnable接口的

    F7

    F8

    FutureTask的run()方法中是这么写:

    F9

    innerRun方法先使用原子方式更改了一下自己的一个标志位state(用于标示任务的执行情况)
    然后红色框的方法 实现回调函数call的调用,并且将返回值作为参数传递下去,放置在一个叫做result的泛型变量中,
    然后future只管等待一段时间后去拿result这个变量的值就可以了。 至于怎么实现的“等待一段时间再去拿” 后面马上说明。

    F10

    innerSet在经过一系列的状态判断后,最终将V这个call方法返回的值赋值给了result

    F11

    说到这里,我们知道,future是通过将call方法的返回值放在一个叫做result的变量中,经过一段时间的等待后再去拿出来返回就可以了。

    怎么实现这个 “等一段时间”呢?

    要从Sync的父类AbstractQueuedSynchronizer这个类说起:

    我们知道AbstractQueuedSynchronizer 后者的中文名字叫做 同步器,顾名思义,是用来控制资源占用的一种方式。对于FutureTask来说,“资源”就是result,线程执行的结果。思路就是通过控制对result这个资源的访问来决定是否需要马上去取得result这个结果,当超时时间未到,或者线程未执行结束时,是不能去取result的。当线程正常执行结束后,一系列的标志位会被修改,并告诉等待future执行结果的各个线程,可以来获取result了。

    这里会涉及到 独占锁和共享锁的概念。

    独占锁:同一时间只有一个线程获取锁。再有线程尝试加锁,将失败。 典型例子 reentrantLock
    共享锁:同一时间可以有多个线程获取锁。 典型例子,本例中的FutureTask

    为什么说他们?因为Sync本质上就是想完成一个共享锁的功能,所以Sync继承了AbstractQueuedSynchronizer 所以Sync的方法使用的是AbstractQueuedSynchronizer的共享锁的API

    首先,我们明白,future结束有两种状态:
    1. 线程正常执行完毕,通知等待结果的主线程对应于future.get()方法。
    2. 线程还未执行完毕,等待结果的主线程已经等不到了(超时),抛出一个TimeOutException后不再等待。对应于future.get(long timeout, TimeUnit unit)

    下面我们依次看看对于这两种状态,我们是怎么处理的:
    从上图中可以得知,线程在执行完毕后会将执行的结果放到result中, 红色框中同时提到了releaseShared 方法,我们从这里进入AbstractQueuedSynchronizer

    F12

    当result已经被赋值,或者FutureTask为cancel状态时,FutureTask会尝试去释放共享锁(可以同时有多个线程调用future.get() 方法,也就是会有多个线程在等待future执行结果,而furue在执行完毕后会依次唤醒各个线程)
    如果尝试成功,则开始真正的释放锁,这里是AbstractQueuedSynchronizer 比较精妙的地方, “尝试”动作都定义为抽象方法,交个各个子类去定义“尝试成功的含义” 而真正的释放则自己实现,这种复杂规则交个子类,流程交给自己的思路很值得借鉴。

    F13

    再看FutureTask的 “尝试释放”的规则:

    没啥好说,怎么尝试都成功

    F14

    接着AbstractQueuedSynchronizer 开始了真正的释放唤醒工作:

    private void doReleaseShared() {
     /*
    * Ensure that a release propagates, even if there are other
    * in-progress acquires/releases. This proceeds in the usual
    * way of trying to unparkSuccessor of head if it needs
    * signal. But if it does not, status is set to PROPAGATE to
    * ensure that upon release, propagation continues.
    * Additionally, we must loop in case a new node is added
    * while we are doing this. Also, unlike other uses of
    * unparkSuccessor, we need to know if CAS to reset status
    * fails, if so rechecking.
    */ for (;;) {
         Node h = head;//把头元素取出来,保持头元素的引用,防止head被更改      if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {//如果状态位为:需要一个信号去唤醒 注释原话:/** waitStatus value to                  indicate successor's thread needs unparking */         if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //修改状态位              continue; // loop to recheck cases         unparkSuccessor(h);//如果修改成功,则通过头元素找到一个线程,并且唤醒它(唤醒动作是通过JNI方法去调用的)         }
           else if (ws == 0 &&
                 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
           continue; // loop on failed CAS      }
         if (h == head) // loop if head changed          break;
       }
    }

    循环遍历后,知道已经没有结点需要唤醒则返回,依次return后,future的run方法执行完毕。

    以上是针对future线程的,我们知道,FutureTask已经将执行结果放在了result中,并且按等的先后顺序依唤醒了等待队列上的线程。
    那,猜测future.get方法就不难了,对于带超时的get方法:最大的可能性就是不断的检查future的一个状态位,看它是否执行完毕,执行完则获取结果返回,否则,再阻塞自己一段时间。
    对于不待超时的,就上来就先尝试获取结果,拿不到就阻塞自己,直到上述的innerSet方法唤醒它。
    究竟是不是这样呢?一起来看看:

    因为innerGet(long nanosTimeout) 和innerGet()流程大致相同,所以我们重点讲解innerGet(long nanosTimeout) ,在唯一一个有区别的地方说明下即可。

    如下图所示,对于innerGet(long nanosTimeout) 方法,FutureTask采用的方法是直接加锁或者每隔一段时间尝试加锁,如果成功,则返回true,则如上图所示,直接返回result,主线程拿到执行结果。
    否则,抛出超时异常。

    对于tryAcquireShared 方法,比较简单,直接看future是否执行完毕

    如果没有结束,则进入doAcquireSharedNanos方法:

    private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
      
        long lastTime = System.nanoTime();
        final Node node = addWaiter(Node.SHARED);//在队列尾部增加一个结点,我的理解是,用来标明这个队列是共享者队列还是独占队列     try {
            for (;;) {
                final Node p = node.predecessor();//拿出刚才新增结点的前一个结点:实际有效的队尾结点。             if (p == head) {
                    int r = tryAcquireShared(arg);//尝试获取锁。                 if (r >= 0) {//                     setHeadAndPropagate(node, r);//返回值大于1 对于FutureTask代表任务已经被cancel了,则更改队列头部结点。                 p.next = null; // help GC 将p结点脱离队列,帮助GC             return true;//返回true后 上述中可以知道当前线成会抛出超时异常 确定下会不会唤醒其他节点?         }
            }
            if (nanosTimeout <= 0) { //如果设置的超时时间小于等于0 则取消获取锁 cancelAcquire(node); return             false; } if (nanosTimeout > spinForTimeoutThreshold && //等待的时间必须大于一个自旋锁的周期时间             shouldParkAfterFailedAcquire(p, node)) // 遍历队列,找到需要沉睡的第一个节点             LockSupport.parkNanos(this, nanosTimeout); // 调用JNI方法,沉睡当前线程             long now = System.nanoTime();
                nanosTimeout -= now - lastTime; // 更新等待时间 循环遍历             lastTime = now;
                if (Thread.interrupted())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
            // Arrive here only if interrupted         cancelAcquire(node);
            throw new InterruptedException();
        }

    这样通过AQS的协作,所有调用future.get(long timeout, TimeUnit unit)的线程都会按顺序等待,直到线成执行完被唤醒或者超时时间到 主动抛出异常。

    总结

    至此为止FutureTask的解析已经基本结束了,可以看到。它依靠AQS的共享锁实现了对线程执行结果的访问控制。和我们通常意义上的访问控制(并发访问某个资源,获取失败时,沉睡自己等待唤醒或者超时后返回)基本是一致的,不外乎维护了一个等待资源的列表。将等待资源的线程通过链表的方式串了起来。

    当然AQS的功能远不仅如此,它还提供了一套独占锁的API,帮助使用者实现独占锁的功能。
    最常用的Reentrantlock就是使用这套API做的。
    有机会的话再和大家分享下它的实现。

    http://www.liuinsect.com/2014/02/17/futuretask-%e6%ba%90%e7%a0%81%e8%a7%a3%e6%9e%90/

  • 相关阅读:
    HashMap按键排序和按值排序
    LeetCode 91. Decode Ways
    LeetCode 459. Repeated Substring Pattern
    JVM
    LeetCode 385. Mini Parse
    LeetCode 319. Bulb Switcher
    LeetCode 343. Integer Break
    LeetCode 397. Integer Replacement
    LeetCode 3. Longest Substring Without Repeating Characters
    linux-网络数据包抓取-tcpdump
  • 原文地址:https://www.cnblogs.com/softidea/p/4819801.html
Copyright © 2011-2022 走看看