zoukankan      html  css  js  c++  java
  • netty源码解析(4.0)-29 Future模式的实现

      Future模式是一个重要的异步并发模式,在JDK有实现。但JDK实现的Future模式功能比较简单,使用起来比较复杂。Netty在JDK Future基础上,加强了Future的能力,具体体现在:

    1. 更加简单的结果返回方式。在JDK中,需要用户自己实现Future对象的执行及返回结果。而在Netty中可以使用Promise简单地调用方法返回结果。
    2. 更加灵活的结果处理方式。JDK中只提供了主动得到结果的get方法,要么阻塞,要么轮询。Netty除了支持主动get方法外,还可以使用Listener被动监听结果。
    3. 实现了进度监控。Netty提供了ProgressiveFuture、ProgressivePromise和GenericProgressiveFutureListener接口及其实现,支持对执行进程的监控。

      吹了那么多牛,有一个关键问题还没弄清楚:Future到底是干嘛的?io.netty.util.concurrent.Future代码的第一行注释简洁第回答了这个问题:Future就是异步操作的结果。这里面有三个关键字:异步,操作,结果。首先,Future首先是一个“结果”;其次这个结果产生于一个“操作”,操作具体是什么可以随便定义;最后这个操作是"异步"执行的,这就意味着“操作”可能在另一个线程中并发执行,也可能随后在同一个线程中执行,什么时候产生结果是一件不确定的事。

      异步调用过程的一般过程是:调用方唤起一个异步操作,在接下来的某个恰当的时间点得到的异步操作操作的结果。要正确地完成上述步骤,需要解决以下几个问题:

    • 怎样维护这个调用状态?
    • 如何获取异步操作的结果?
    • 何时处理结果?

      io.netty.util.concurrent.DefaultPromise是Future的默认实现,以上三个问题的答案都能在这个类的代码中找到。

    DefaultPromise的派生体系

      下面是DefaultPromis及其父类,接口的声明:

      public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> 
    
      public abstract class AbstractFuture<V> implements Future<V>
    
      public interface Promise<V> extends Future<V> 
    
      public interface Future<V> extends java.util.concurrent.Future<V> 

       可以看出,DefaultPromise派生自AbstractFuture类,并实现了Promise接口。抽象类型AbstractFuture派生自Future, 接口Promise派生自Future。Future派生自JDK的Future接口。

      和JDK的Future相比,Netty的Future接口增加一些自己的方法:

       /**
         当操作成功时返回true*/
        boolean isSuccess();
    
        /**
       只有当操作可以被取消时返回true
    */ boolean isCancellable(); /** 返回操作的异常*/ Throwable cause(); /** 添加一个监听器到future。当操作完成(成功或失败都算完成,此事isDone()返回true)时, 会通知这个监听器。如果添加时操作已经完成,
       这个监听器会立即被通知。
    */ Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); /** 和上个方法一样,可以同时添加多个监听器*/ Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); /** 删除指定的监听器, 如果这个监听器还没被通知的话。*/ Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); /** 功能和上个方法一样,可以同时删除多个监听器。*/ Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); /** 同步等待直到操作完成。会被打断。 */ Future<V> sync() throws InterruptedException; /**    同步等着知道操作完成。不会被打断。 */ Future<V> syncUninterruptibly(); /** 同sync*/ Future<V> await() throws InterruptedException; /** 同synUniterruptibliy*/ Future<V> awaitUninterruptibly(); /** 等待,直到操作完成或超过指定的时间。会被打断。*/ boolean await(long timeout, TimeUnit unit) throws InterruptedException; /** 同上*/ boolean await(long timeoutMillis) throws InterruptedException; /** 同上,不会被打断。*/ boolean awaitUninterruptibly(long timeout, TimeUnit unit); /** 同上。*/ boolean awaitUninterruptibly(long timeoutMillis); /** 立即得到结果,不会阻塞。如果操作没有完成或没有成功,返回null*/ V getNow();

      Netty的Future最大的特点是增加了Listener被动接收任务完成通知,下面是两个Listener接口的定义:

    public interface GenericFutureListener<F extends Future<?>> extends EventListener {
        void operationComplete(F future) throws Exception;
    }
    
    public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> {
        void operationProgressed(F future, long progress, long total) throws Exception;
    }

      把一个listener添加到future之后。当异步操作完成之后,listener会被通知一次,同时会回调operationComplete方法。参数future是当前通知的future,这意味这,一个listener可以被添加到多个future中。

      当异步操作进度发送变化时,listener会被通知,同时会回调operationProgressed方法。progress是当前进度,total是总进度。progress==total表示操作完成。如果不知道何时完成操作progress=-1。

      Promise定义的方法:

        /**
        设置结果。把这个future设置为success,通知所有的listener,
      如果这个future已经是success或failed(操作已经完成),会抛出IllegalStateException
    */ Promise<V> setSuccess(V result); /**
    同上。只有在操作没有完成的时候才会生效,且会返回true */ boolean trySuccess(V result); /** 设置异常。把这个future设置为failed状态,通知所有的listener.
    如果这个future已经完成,会抛出IllegalStateException
    */ Promise<V> setFailure(Throwable cause); /** 同上。只有在操作没有完成时才会生效,且返回ture */ boolean tryFailure(Throwable cause); /** 设置当前前future的操作不能被取消。这个future没有完成且可以设置成功或这个future已经完成,返回true。否则返回false */ boolean setUncancellable();

    DefaultPromise的设计

    关键属性

      volatile Object result;

      异步操作的结果。可以通过它的值知道当前future的状态。

      final EventExecutor executor;

      通知listener的线程。

      Object listeners;

      维护添加到当前future的listener对象。

      short waiters;

      记录当前真正等待结果的线程数量。

      boolean notifyingListeners;

      是否正在通知listener,防止多线程并发执行通知操作。

    状态管理

      future有4种状态: 未完成, 未完成-不能取消,完成-成功,完成-失败。使用isDone()判断是否完成,它代码如下:

    1     @Override
    2     public boolean isDone() {
    3         return isDone0(result);
    4     }
    5 
    6     private static boolean isDone0(Object result) {
    7         return result != null && result != UNCANCELLABLE;
    8     }

      第7行是判断当前完成状态的。result != null 且 result != UNCANCELLABLE,表示处于完成状态。

      result默认是null, 此时future处于未完成状态。可以使用setUncancellable方法把它设置成为完成-不能取消状态。

    1     @Override
    2     public boolean setUncancellable() {
    3         if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
    4             return true;
    5         }
    6         Object result = this.result;
    7         return !isDone0(result) || !isCancelled0(result);
    8     }

      第3行,使用原子操作设置result的值,只有result==null时才能把result设置成UNCANCELLABLE。当result==UNCANCELLABLE时,不允许取消异步操作。

      使用isSuccess方法判断future是否处于完成-成功状态。

    1     @Override
    2     public boolean isSuccess() {
    3         Object result = this.result;
    4         return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
    5     }

      第4行是完成-成功状态result的取值:除null, UNCANCELLABLE和CauseHolder对象的任何值。

      只有满足isDone() && !isSuccess()时,future处于完成失败状态,可以使用cause方法获取异常。

      调用setSuccess和trySuccess方法,能够把状态转换成完成-成功。

     1     @Override
     2     public Promise<V> setSuccess(V result) {
     3         if (setSuccess0(result)) {
     4             notifyListeners();
     5             return this;
     6         }
     7         throw new IllegalStateException("complete already: " + this);
     8     }
     9     
    10     private boolean setSuccess0(V result) {
    11         return setValue0(result == null ? SUCCESS : result);
    12     }

      第3行尝试把状态设置成完成-成功状态。如果可以,在第4行通知所有的listener。否则第7行抛出错误。第11行给出了成功的默认值SUCCESS。trySuccess少了第7行,不会抛出异常。

      调用setFailure和tryFailure方法,能够包状态转换成完成-失败状态。

     1     @Override
     2     public Promise<V> setFailure(Throwable cause) {
     3         if (setFailure0(cause)) {
     4             notifyListeners();
     5             return this;
     6         }
     7         throw new IllegalStateException("complete already: " + this, cause);
     8     }
     9 
    10     private boolean setFailure0(Throwable cause) {
    11         return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
    12     }

      第3行尝试把专题设置成完成-失败状态。如果可以,在第4行通知所有listener。否则在第7行抛出异常。第11行把异常包装成CauseHolder对象。tryFailure少了第7行,不会抛出异常。

    获取异步操作的结果

      当异步操作完成时,调用Promise提供的setSuccess和trySuccess设置成功的结果,调用setFailure和tryFailure设置异常结果。不论什么结果,都会使用setValue0方法保存到result属性上。

    1     private boolean setValue0(Object objResult) {
    2         if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
    3             RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
    4             checkNotifyWaiters();
    5             return true;
    6         }
    7         return false;
    8     }

      第2,3行,使用原子操作设置result的值,只有result==null或result==UNCANCELLABLE时,才能设置成功。如果设置成功,在第4行唤醒所有等待中的线程。可以使用get方法得到result值。如果isSucess()==true, result的值是SUCCESS或异步操作的结果。否则result的值是CauseHolder对象,此时可以调用cause方法得到异常对象。

      使用get或cause,只有在异步操作完成后才能顺利得到结果。可以使用listener,被动等待操作完成通知。

    使用listener异步通知处理结果

      Future的listener是必须实现GenericFutureListener接口,调用方法可以在operationComplete方法中处理异步操作的结果。

      listeners属性用来保存使用addListener,addListeners方法添加到future的listener。listeners可能使用一个GenericFutureListener对象,也可能是一个GenericFutureListener数组。所有添加listener方法都会调用addListener0方法添加listener。

    1     private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
    2         if (listeners == null) {
    3             listeners = listener;
    4         } else if (listeners instanceof DefaultFutureListeners) {
    5             ((DefaultFutureListeners) listeners).add(listener);
    6         } else {
    7             listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
    8         }
    9     }

      这段代码中使用了一个DefaultFutureListeners类,它内部维护了一个GenericFutureListener数组。

      当一次操作完成时,会调用notifyListeners方法通知listeners中所有的listener,并调用listener的operationComplete方法。只有当isDone()==true时才会调用notifyListeners方法。触发点在下面的一些方法中:

      addListener, addListeners。

      setSuccess, trySuccess。

      setFailure, tryFailure。

      notifyListeners的代码如下:

     1     private void notifyListeners() {
     2         EventExecutor executor = executor();
     3         if (executor.inEventLoop()) {
     4             final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
     5             final int stackDepth = threadLocals.futureListenerStackDepth();
     6             if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
     7                 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
     8                 try {
     9                     notifyListenersNow();
    10                 } finally {
    11                     threadLocals.setFutureListenerStackDepth(stackDepth);
    12                 }
    13                 return;
    14             }
    15         }
    16 
    17         safeExecute(executor, new Runnable() {
    18             @Override
    19             public void run() {
    20                 notifyListenersNow();
    21             }
    22         });
    23     }

      这段代码的作用是调用notifyListenersNow。如果当前线程就是executor的线程,在第9行直接调用notifyListenerNow,否则在第20行,把notifyListnerNow放在executor中执行。第4-7行和11行的作用是防止递归调用导致线程栈溢出,MAX_LISTENER_STACK_DEPTH就是listener递归调用的最大深度。

      notifyListenerNow的作用是,确保没有并发执行notifyListener0或notifyListners0方法,且所有的listener只能被通知一次。

     1     private void notifyListenersNow() {
     2         Object listeners;
     3         synchronized (this) {
     4             // Only proceed if there are listeners to notify and we are not already notifying listeners.
     5             if (notifyingListeners || this.listeners == null) {
     6                 return;
     7             }
     8             notifyingListeners = true;
     9             listeners = this.listeners;
    10             this.listeners = null;
    11         }
    12         for (;;) {
    13             if (listeners instanceof DefaultFutureListeners) {
    14                 notifyListeners0((DefaultFutureListeners) listeners);
    15             } else {
    16                 notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
    17             }
    18             synchronized (this) {
    19                 if (this.listeners == null) {
    20                     // Nothing can throw from within this method, so setting notifyingListeners back to false does not
    21                     // need to be in a finally block.
    22                     notifyingListeners = false;
    23                     return;
    24                 }
    25                 listeners = this.listeners;
    26                 this.listeners = null;
    27             }
    28         }
    29     }

      第3-11行的作用是防止多个线程并发执行11行之后的代码。

      结合第5,9,10行可知, listeners中的所有listener只能被通知一次。

      13-17行,通知所有listeners。notifyListener0通知一个listener,notifyListeners0通知所有的listener。

      最后,18-27行,检查在通知listeners的过程中,是否有新的listener被添加进来。如果有,25,26行得到所有新添加的listener并清空listeners属性,13-17行继续通知新添加的listener。否则,运行22,23行结束通知过程。

     1     private void notifyListeners0(DefaultFutureListeners listeners) {
     2         GenericFutureListener<?>[] a = listeners.listeners();
     3         int size = listeners.size();
     4         for (int i = 0; i < size; i ++) {
     5             notifyListener0(this, a[i]);
     6         }
     7     }
     8 
     9     @SuppressWarnings({ "unchecked", "rawtypes" })
    10     private static void notifyListener0(Future future, GenericFutureListener l) {
    11         try {
    12             l.operationComplete(future);
    13         } catch (Throwable t) {
    14             logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
    15         }
    16     }

      1-7行,notifyListeners0对每个listener调用一次notifyListener0,参数是当前的future。

      10-16,调用listener的operationComplete方法,捕获了所有的异常,确保接下来可以继续通知下一个listener。

    使用await机制同步等待结果

      可以使用一系列的await,awaitXXX方法同步等待结果。这些方法可以分为: 能被打断的,不能被打断的。一直等待的,有超时时间的。await0方法是最复杂的等待实现,所有带超时时间的await方法都会调用它。

     1     private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
     2         if (isDone()) {
     3             return true;
     4         }
     5 
     6         if (timeoutNanos <= 0) {
     7             return isDone();
     8         }
     9 
    10         if (interruptable && Thread.interrupted()) {
    11             throw new InterruptedException(toString());
    12         }
    13 
    14         checkDeadLock();
    15 
    16         long startTime = System.nanoTime();
    17         long waitTime = timeoutNanos;
    18         boolean interrupted = false;
    19         try {
    20             for (;;) {
    21                 synchronized (this) {
    22                     if (isDone()) {
    23                         return true;
    24                     }
    25                     incWaiters();
    26                     try {
    27                         wait(waitTime / 1000000, (int) (waitTime % 1000000));
    28                     } catch (InterruptedException e) {
    29                         if (interruptable) {
    30                             throw e;
    31                         } else {
    32                             interrupted = true;
    33                         }
    34                     } finally {
    35                         decWaiters();
    36                     }
    37                 }
    38                 if (isDone()) {
    39                     return true;
    40                 } else {
    41                     waitTime = timeoutNanos - (System.nanoTime() - startTime);
    42                     if (waitTime <= 0) {
    43                         return isDone();
    44                     }
    45                 }
    46             }
    47         } finally {
    48             if (interrupted) {
    49                 Thread.currentThread().interrupt();
    50             }
    51         }
    52     }

      这个方法返回的条件有: (1)isDone()==true;(2)允许被打断(interrupted==true)的情况下被打断;(3)已经超时。2-12行分别检查了这3种情况。

      25,35行管理waiters属性,这个属性用来记录当前正在等待的线程数。inWaiters方法正常情况下会把waiters加1,当检查到waiters==Short.MAX_VALUE时会抛出异常,防止过多的线程等待。

      27行,调用wait等待,经历waitTime后超时返回。在等待过程中,会被setValue0方法调用notifyAll唤醒。

      29-33行,处理被打断的异常,如果运行被打断,在30行抛出这个异常返回。

      38-45行,不论什么原因线程被唤醒,检查是否满足返回条件,如果不满足,继续循环等待。

      没有超时的wait方法实现要简单一些,只需判读返回条件(1)(2)。

    跟踪异步操作的执行进度

      如果想要跟踪异步操作的执行进度,future需要换成DefaultProgressivePromise对象,listener需要换成GenericProgressiveFutureListener类型。DefaultProgressivePromise派生自DefaultPromise同时实现了ProgressivePromise接口。GenericProgressiveFutureListener接口派生自GenericFutureListener接口。

      ProgressivePromise定义了setProgress和tryProgress方法用来更新进度,是不是很眼熟,和Promise接口定义返回结果的方法很类似。

    ProgressivePromise<V> setProgress(long progress, long total);
    boolean tryProgress(long progress, long total);

      GenericProgressiveFutureListener定义了operationProgressed方法用来处理进度更新通知。

         void operationProgressed(F future, long progress, long total) throws Exception;

      

      DefaultProgressivePromise自己只实现了setProgress和tryProgress方法,其它都是复用了DefaultPromise的实现。

     1     @Override
     2     public ProgressivePromise<V> setProgress(long progress, long total) {
     3         if (total < 0) {
     4             // total unknown
     5             total = -1; // normalize
     6             if (progress < 0) {
     7                 throw new IllegalArgumentException("progress: " + progress + " (expected: >= 0)");
     8             }
     9         } else if (progress < 0 || progress > total) {
    10             throw new IllegalArgumentException(
    11                     "progress: " + progress + " (expected: 0 <= progress <= total (" + total + "))");
    12         }
    13 
    14         if (isDone()) {
    15             throw new IllegalStateException("complete already");
    16         }
    17 
    18         notifyProgressiveListeners(progress, total);
    19         return this;
    20     }

      3-12行,检查progress和total的合法性。

      14行,如isDone()==true,抛出异常。只有在操作还没完成的是否更新进度才有意义。

      18行,调用notifyProgressiveListeners触发进度更新通知,这个方法在DefaultPromise中实现。

      notifyProgressiveListeners实现了触发进度更新通知的主要流程:

     1     void notifyProgressiveListeners(final long progress, final long total) {
     2         final Object listeners = progressiveListeners();
     3         if (listeners == null) {
     4             return;
     5         }
     6 
     7         final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this;
     8 
     9         EventExecutor executor = executor();
    10         if (executor.inEventLoop()) {
    11             if (listeners instanceof GenericProgressiveFutureListener[]) {
    12                 notifyProgressiveListeners0(
    13                         self, (GenericProgressiveFutureListener<?>[]) listeners, progress, total);
    14             } else {
    15                 notifyProgressiveListener0(
    16                         self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, progress, total);
    17             }
    18         } else {
    19             if (listeners instanceof GenericProgressiveFutureListener[]) {
    20                 final GenericProgressiveFutureListener<?>[] array =
    21                         (GenericProgressiveFutureListener<?>[]) listeners;
    22                 safeExecute(executor, new Runnable() {
    23                     @Override
    24                     public void run() {
    25                         notifyProgressiveListeners0(self, array, progress, total);
    26                     }
    27                 });
    28             } else {
    29                 final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
    30                         (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
    31                 safeExecute(executor, new Runnable() {
    32                     @Override
    33                     public void run() {
    34                         notifyProgressiveListener0(self, l, progress, total);
    35                     }
    36                 });
    37             }
    38         }
    39     }

      第3行,从listeners中选出GenericProgressiveFutureListener类型的listener。

      10-38行。调用notifyProgressiveListeners0, notifyProgressiveListener0通知进度跟新。11-17行,在当前线程中调用。

      19-37行,在executor中调用。notifyProgressiveListener0只是简单地调用listener的operationProgressed方法。notifyProgressiveListeners0是对每个listener调用一次notifyProgressiveListener0。

      和完成通知相比,进度更新通知要更加简单。进度更新通知没有处理并发问题,没有处理栈溢出问题。

      

  • 相关阅读:
    校门外的树
    年龄与疾病
    数组逆序重放
    计算书费
    陶陶摘苹果
    与指定数字相同的数的个数
    上学路线
    NOIP2011 普及組 統計單詞數
    [HAOI2012]音量调节
    [USACO11JAN]利润Profits
  • 原文地址:https://www.cnblogs.com/brandonli/p/11858862.html
Copyright © 2011-2022 走看看