zoukankan      html  css  js  c++  java
  • Netty中的ChannelFuture和ChannelPromise

    在Netty使用ChannelFuture和ChannelPromise进行异步操作的处理

    这是官方给出的ChannelFutur描述

     1  *                                      | Completed successfully    |
     2  *                                      +---------------------------+
     3  *                                 +---->      isDone() = true      |
     4  * +--------------------------+    |    |   isSuccess() = true      |
     5  * |        Uncompleted       |    |    +===========================+
     6  * +--------------------------+    |    | Completed with failure    |
     7  * |      isDone() = false    |    |    +---------------------------+
     8  * |   isSuccess() = false    |----+---->      isDone() = true      |
     9  * | isCancelled() = false    |    |    |       cause() = non-null  |
    10  * |       cause() = null     |    |    +===========================+
    11  * +--------------------------+    |    | Completed by cancellation |
    12  *                                 |    +---------------------------+
    13  *                                 +---->      isDone() = true      |
    14  *                                      | isCancelled() = true      |
    15  *                                      +---------------------------+

    由图可以知道ChannelFutur有四种状态:Uncompleted、Completed successfully、Completed with failure、Completed by cancellation,这几种状态是由isDone、isSuccess、isCancelled、cause这四种方法的返回值决定的。

    ChannelFutur接口的定义如下:

     1 public interface ChannelFuture extends Future<Void> {
     2     Channel channel();
     3 
     4     ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> var1);
     5 
     6     ChannelFuture addListeners(GenericFutureListener... var1);
     7 
     8     ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> var1);
     9 
    10     ChannelFuture removeListeners(GenericFutureListener... var1);
    11 
    12     ChannelFuture sync() throws InterruptedException;
    13 
    14     ChannelFuture syncUninterruptibly();
    15 
    16     ChannelFuture await() throws InterruptedException;
    17 
    18     ChannelFuture awaitUninterruptibly();
    19 
    20     boolean isVoid();
    21 }

    继承自Netty的Future:

     1 public interface Future<V> extends java.util.concurrent.Future<V> {
     2     boolean isSuccess();
     3 
     4     boolean isCancellable();
     5 
     6     Throwable cause();
     7 
     8     Future<V> addListener(GenericFutureListener<? extends Future<? super V>> var1);
     9 
    10     Future<V> addListeners(GenericFutureListener... var1);
    11 
    12     Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1);
    13 
    14     Future<V> removeListeners(GenericFutureListener... var1);
    15 
    16     Future<V> sync() throws InterruptedException;
    17 
    18     Future<V> syncUninterruptibly();
    19 
    20     Future<V> await() throws InterruptedException;
    21 
    22     Future<V> awaitUninterruptibly();
    23 
    24     boolean await(long var1, TimeUnit var3) throws InterruptedException;
    25 
    26     boolean await(long var1) throws InterruptedException;
    27 
    28     boolean awaitUninterruptibly(long var1, TimeUnit var3);
    29 
    30     boolean awaitUninterruptibly(long var1);
    31 
    32     V getNow();
    33 
    34     boolean cancel(boolean var1);
    35 }

    Netty的Future又继承自JDK的Future:

     1 public interface Future<V> {
     2 
     3     boolean cancel(boolean mayInterruptIfRunning);
     4     
     5     boolean isCancelled();
     6 
     7     boolean isDone();
     8     
     9     V get() throws InterruptedException, ExecutionException;
    10 
    11     V get(long timeout, TimeUnit unit)
    12         throws InterruptedException, ExecutionException, TimeoutException;
    13 }


    ChannelPromise继承了ChannelFuture:

     1 public interface ChannelPromise extends ChannelFuture, Promise<Void> {
     2     Channel channel();
     3 
     4     ChannelPromise setSuccess(Void var1);
     5 
     6     ChannelPromise setSuccess();
     7 
     8     boolean trySuccess();
     9 
    10     ChannelPromise setFailure(Throwable var1);
    11 
    12     ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> var1);
    13 
    14     ChannelPromise addListeners(GenericFutureListener... var1);
    15 
    16     ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> var1);
    17 
    18     ChannelPromise removeListeners(GenericFutureListener... var1);
    19 
    20     ChannelPromise sync() throws InterruptedException;
    21 
    22     ChannelPromise syncUninterruptibly();
    23 
    24     ChannelPromise await() throws InterruptedException;
    25 
    26     ChannelPromise awaitUninterruptibly();
    27 
    28     ChannelPromise unvoid();
    29 }

    其中Promise接口定义如下:

     1 public interface Promise<V> extends Future<V> {
     2     Promise<V> setSuccess(V var1);
     3 
     4     boolean trySuccess(V var1);
     5 
     6     Promise<V> setFailure(Throwable var1);
     7 
     8     boolean tryFailure(Throwable var1);
     9 
    10     boolean setUncancellable();
    11 
    12     Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> var1);
    13 
    14     Promise<V> addListeners(GenericFutureListener... var1);
    15 
    16     Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1);
    17 
    18     Promise<V> removeListeners(GenericFutureListener... var1);
    19 
    20     Promise<V> await() throws InterruptedException;
    21 
    22     Promise<V> awaitUninterruptibly();
    23 
    24     Promise<V> sync() throws InterruptedException;
    25 
    26     Promise<V> syncUninterruptibly();
    27 }


    在Netty中,无论是服务端还是客户端,在Channel注册时都会为其绑定一个ChannelPromise,默认实现是DefaultChannelPromise

    DefaultChannelPromise定义如下:

      1 public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
      2 
      3     private final Channel channel;
      4     private long checkpoint;
      5 
      6     public DefaultChannelPromise(Channel channel) {
      7         this.channel = checkNotNull(channel, "channel");
      8     }
      9     
     10     public DefaultChannelPromise(Channel channel, EventExecutor executor) {
     11         super(executor);
     12         this.channel = checkNotNull(channel, "channel");
     13     }
     14 
     15     @Override
     16     protected EventExecutor executor() {
     17         EventExecutor e = super.executor();
     18         if (e == null) {
     19             return channel().eventLoop();
     20         } else {
     21             return e;
     22         }
     23     }
     24 
     25     @Override
     26     public Channel channel() {
     27         return channel;
     28     }
     29 
     30     @Override
     31     public ChannelPromise setSuccess() {
     32         return setSuccess(null);
     33     }
     34 
     35     @Override
     36     public ChannelPromise setSuccess(Void result) {
     37         super.setSuccess(result);
     38         return this;
     39     }
     40 
     41     @Override
     42     public boolean trySuccess() {
     43         return trySuccess(null);
     44     }
     45 
     46     @Override
     47     public ChannelPromise setFailure(Throwable cause) {
     48         super.setFailure(cause);
     49         return this;
     50     }
     51 
     52     @Override
     53     public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
     54         super.addListener(listener);
     55         return this;
     56     }
     57 
     58     @Override
     59     public ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
     60         super.addListeners(listeners);
     61         return this;
     62     }
     63 
     64     @Override
     65     public ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
     66         super.removeListener(listener);
     67         return this;
     68     }
     69 
     70     @Override
     71     public ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
     72         super.removeListeners(listeners);
     73         return this;
     74     }
     75 
     76     @Override
     77     public ChannelPromise sync() throws InterruptedException {
     78         super.sync();
     79         return this;
     80     }
     81 
     82     @Override
     83     public ChannelPromise syncUninterruptibly() {
     84         super.syncUninterruptibly();
     85         return this;
     86     }
     87 
     88     @Override
     89     public ChannelPromise await() throws InterruptedException {
     90         super.await();
     91         return this;
     92     }
     93 
     94     @Override
     95     public ChannelPromise awaitUninterruptibly() {
     96         super.awaitUninterruptibly();
     97         return this;
     98     }
     99 
    100     @Override
    101     public long flushCheckpoint() {
    102         return checkpoint;
    103     }
    104 
    105     @Override
    106     public void flushCheckpoint(long checkpoint) {
    107         this.checkpoint = checkpoint;
    108     }
    109 
    110     @Override
    111     public ChannelPromise promise() {
    112         return this;
    113     }
    114 
    115     @Override
    116     protected void checkDeadLock() {
    117         if (channel().isRegistered()) {
    118             super.checkDeadLock();
    119         }
    120     }
    121 
    122     @Override
    123     public ChannelPromise unvoid() {
    124         return this;
    125     }
    126 
    127     @Override
    128     public boolean isVoid() {
    129         return false;
    130     }
    131 }

    可以看到这个DefaultChannelPromise仅仅是将Channel封装了,而且其基本上所有方法的实现都依赖于父类DefaultPromise

    DefaultPromise中的实现是整个ChannelFuture和ChannelPromise的核心所在:

    DefaultPromise中有如下几个状态量:

    1 private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
    2             SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
    3 private static final Object SUCCESS = new Object();
    4 private static final Object UNCANCELLABLE = new Object();
    5 private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
    6             new CancellationException(), DefaultPromise.class, "cancel(...)"));
    7 private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
    8             AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");         

    MAX_LISTENER_STACK_DEPTH: 表示最多可执行listeners的数量,默认是8
    SUCCESS :表示异步操作正常完成
    UNCANCELLABLE:表示异步操作不可取消,并且尚未完成
    CANCELLATION_CAUSE_HOLDER:表示异步操作取消监听,用于cancel操作,
    而CauseHolder 的实例对象是用来表示异步操作异常结束,同时保存异常信息:

    1 private static final class CauseHolder {
    2     final Throwable cause;
    3     CauseHolder(Throwable cause) {
    4         this.cause = cause;
    5     }
    6 }


    RESULT_UPDATER:是一个原子更新器,通过CAS操作,原子化更新 DefaultPromise对象的名为result的成员,这个result成员是其异步操作判断的关键所在

    DefaultPromise的成员及构造方法定义:

     1 public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
     2     private volatile Object result;
     3     private final EventExecutor executor;
     4     private Object listeners;
     5     private short waiters;
     6     private boolean notifyingListeners;
     7     
     8     public DefaultPromise(EventExecutor executor) {
     9         this.executor = checkNotNull(executor, "executor");
    10     }
    11 }

    result:就是前面说的,判断异步操作状态的关键
    result的取值有:SUCCESS 、UNCANCELLABLE、CauseHolder以及null (其实还可以是泛型V类型的任意对象,这里暂不考虑)
    executor:就是Channel绑定的NioEventLoop,在我之前的博客说过,Channel的异步操作都是在NioEventLoop的线程中完成的([Netty中NioEventLoopGroup的创建源码分析](https://blog.csdn.net/Z_ChenChen/article/details/90567863))
    listeners:通过一个Object保存所有对异步操作的监听,用于异步操作的回调
    waiters:记录阻塞中的listeners的数量
    notifyingListeners:是否需要唤醒的标志

    首先来看isDone方法,通过之前的图可以知道,
    isDone为false对应了Uncompleted状态,即异步操作尚未完成;
    isDone为true则代表了异步操作完成,但是还是有三种完成情况,需要结合别的判断方法才能具体知道是哪种情况;

    isDone方法:

    1 @Override
    2 public boolean isDone() {
    3     return isDone0(result);
    4 }

    调用isDone0:

    1 private static boolean isDone0(Object result) {
    2     return result != null && result != UNCANCELLABLE;
    3 }

    有如下几种情况:
    result等于null,result没有赋值,表示异步操作尚未完成(从这里就能想到异步操作完成,需要调用某个set方法来改变result的状态)
    result是UNCANCELLABLE状态,表示执行中的异步操作不可取消,当然也就是异步操作尚未完成
    result不等于null,且不等于UNCANCELLABLE,就表示异步操作完成(包括正常完成,以及异常结束,需要由cause方法进一步判断)

    isSuccess方法:

    1 @Override
    2 public boolean isSuccess() {
    3     Object result = this.result;
    4     return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
    5 }

    由这里可以知道当且仅当result 为SUCCESS状态时,才返回true(其余除UNCANCELLABLE和null的值其实也可以,这里暂不考虑)

    isCancelled方法:

    1 @Override
    2 public boolean isCancelled() {
    3     return isCancelled0(result);
    4 }

    调用isCancelled0方法:

    1 private static boolean isCancelled0(Object result) {
    2     return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
    3 }

    只有当result是CancellationException的实例时,表示取消异步操作

    接着来看cause方法:

    1 @Override
    2 public Throwable cause() {
    3     Object result = this.result;
    4     return (result instanceof CauseHolder) ? ((CauseHolder) result).cause : null;
    5 }

    和上面同理,通过判别resul是否是CauseHolder的实现类,若是,将CauseHolder保存的异常返回。

    几种状态的判别说完了,下面看一下如何设置这几种状态的:
    setSuccess方法:

    1 @Override
    2 public Promise<V> setSuccess(V result) {
    3     if (setSuccess0(result)) {
    4         notifyListeners();
    5         return this;
    6     }
    7     throw new IllegalStateException("complete already: " + this);
    8 }

    首先调用setSuccess0方法,其中result的泛型通过DefaultChannelPromise可知是Void,在DefaultChannelPromise中所有的set和try操作参数都是null,这里的result也就不去考虑:

    1 private boolean setSuccess0(V result) {
    2     return setValue0(result == null ? SUCCESS : result);
    3 }

    继续调用setValue0方法:

    1 private boolean setValue0(Object objResult) {
    2     if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
    3         RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
    4         checkNotifyWaiters();
    5         return true;
    6     }
    7     return false;
    8 }

    通过CAS操作,将result状态变为SUCCESS

    其中checkNotifyWaiters方法:

    1 private synchronized void checkNotifyWaiters() {
    2     if (waiters > 0) {
    3         notifyAll();
    4     }
    5 }

    检查waiters的个数,唤醒所有阻塞中的this,sync方法会引起阻塞

    回到setSuccess方法中,setSuccess0通过CAS操作,将result状态更新为SUCCESS成功后,调用
    notifyListeners方法,唤醒所有listener完成对异步操作的回调

    listeners是通过addListener方法添加的,用来对异步操作进行侦听:
    看到addListener方法:

     1 @Override
     2 public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
     3     checkNotNull(listener, "listener");
     4     
     5     synchronized (this) {
     6     addListener0(listener);
     7     }
     8     
     9     if (isDone()) {
    10     notifyListeners();
    11     }
    12     
    13     return this;
    14 }
    15 
    16 @Override
    17 public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
    18     checkNotNull(listeners, "listeners");
    19 
    20     synchronized (this) {
    21         for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
    22             if (listener == null) {
    23                 break;
    24             }
    25             addListener0(listener);
    26         }
    27     }
    28 
    29     if (isDone()) {
    30         notifyListeners();
    31     }
    32 
    33     return this;
    34 }

    其中GenericFutureListener接口定义如下:

    1 public interface GenericFutureListener<F extends Future<?>> extends EventListener {
    2      /**
    3      * Invoked when the operation associated with the {@link Future} has been completed.
    4      *
    5      * @param future  the source {@link Future} which called this callback
    6      */
    7     void operationComplete(F future) throws Exception;
    8 }

    可以看到listener其实就是通过operationComplete方法,来完成回调,对Future对象进行处理,由注释可知operationComplete方法是在future操作完成时调用

    addListeners方法的实现比较简单,实现核心是在addListener0中:

    1 private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
    2     if (listeners == null) {
    3         listeners = listener;
    4     } else if (listeners instanceof DefaultFutureListeners) {
    5         ((DefaultFutureListeners) listeners).add(listener);
    6     } else {
    7         listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
    8     }
    9 }

    其中DefaultFutureListeners是将GenericFutureListener对象封装的一个数组:

     1 final class DefaultFutureListeners {
     2 
     3     private GenericFutureListener<? extends Future<?>>[] listeners;
     4     private int size;
     5     private int progressiveSize;
     6 
     7     @SuppressWarnings("unchecked")
     8     DefaultFutureListeners(
     9             GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
    10         listeners = new GenericFutureListener[2];
    11         listeners[0] = first;
    12         listeners[1] = second;
    13         size = 2;
    14         if (first instanceof GenericProgressiveFutureListener) {
    15             progressiveSize ++;
    16         }
    17         if (second instanceof GenericProgressiveFutureListener) {
    18             progressiveSize ++;
    19         }
    20     }
    21 
    22     public void add(GenericFutureListener<? extends Future<?>> l) {
    23         GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
    24         final int size = this.size;
    25         if (size == listeners.length) {
    26             this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
    27         }
    28         listeners[size] = l;
    29         this.size = size + 1;
    30 
    31         if (l instanceof GenericProgressiveFutureListener) {
    32             progressiveSize ++;
    33         }
    34     }
    35 
    36     public void remove(GenericFutureListener<? extends Future<?>> l) {
    37         final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
    38         int size = this.size;
    39         for (int i = 0; i < size; i ++) {
    40             if (listeners[i] == l) {
    41                 int listenersToMove = size - i - 1;
    42                 if (listenersToMove > 0) {
    43                     System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
    44                 }
    45                 listeners[-- size] = null;
    46                 this.size = size;
    47 
    48                 if (l instanceof GenericProgressiveFutureListener) {
    49                     progressiveSize --;
    50                 }
    51                 return;
    52             }
    53         }
    54     }
    55 
    56     public GenericFutureListener<? extends Future<?>>[] listeners() {
    57         return listeners;
    58     }
    59 
    60     public int size() {
    61         return size;
    62     }
    63 
    64     public int progressiveSize() {
    65         return progressiveSize;
    66     }
    67 }

    size:记录listeners的个数
    progressiveSize:记录GenericProgressiveFutureListener类型的listeners的个数
    DefaultFutureListeners 中对数组的操作比较简单,
    add方法,当size达到数组长度时,进行二倍扩容,

    其中GenericProgressiveFutureListener继承自GenericFutureListener:

     1 public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> {
     2     /**
     3      * Invoked when the operation has progressed.
     4      *
     5      * @param progress the progress of the operation so far (cumulative)
     6      * @param total the number that signifies the end of the operation when {@code progress} reaches at it.
     7      *              {@code -1} if the end of operation is unknown.
     8      */
     9     void operationProgressed(F future, long progress, long total) throws Exception;
    10 }

    由注释可知operationProgressed是在future操作进行时调用,这里不对GenericProgressiveFutureListener过多讨论

    回到addListener0方法,由DefaultFutureListeners就可以知道,实际上通过DefaultFutureListeners管理的一维数组来保存listeners

    在addListener方法完成对listener的添加后,还会调用isDone方法检查当前异步操作是否完成,若是完成需要调用notifyListeners,直接唤醒所有listeners完后对异步操作的回调

    有add就有remove,removeListener方法:

     1 @Override
     2 public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
     3     checkNotNull(listener, "listener");
     4 
     5     synchronized (this) {
     6         removeListener0(listener);
     7     }
     8 
     9     return this;
    10 }
    11 
    12 @Override
    13 public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
    14     checkNotNull(listeners, "listeners");
    15 
    16     synchronized (this) {
    17         for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
    18             if (listener == null) {
    19                 break;
    20             }
    21             removeListener0(listener);
    22         }
    23     }
    24 
    25     return this;
    26 }

    还是由removeListener0来实现:

    1 private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
    2     if (listeners instanceof DefaultFutureListeners) {
    3         ((DefaultFutureListeners) listeners).remove(listener);
    4     } else if (listeners == listener) {
    5         listeners = null;
    6     }
    7 }

    看过之前的内容在看这里就比较简单了,通过DefaultFutureListeners去删除

    notifyListeners方法:

     1 private void notifyListeners() {
     2     EventExecutor executor = executor();
     3     if (executor.inEventLoop()) {
     4         final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
     5         final int stackDepth = threadLocals.futureListenerStackDepth();
     6         if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
     7             threadLocals.setFutureListenerStackDepth(stackDepth + 1);
     8             try {
     9                 notifyListenersNow();
    10             } finally {
    11                 threadLocals.setFutureListenerStackDepth(stackDepth);
    12             }
    13             return;
    14         }
    15     }
    16 
    17     safeExecute(executor, new Runnable() {
    18         @Override
    19         public void run() {
    20             notifyListenersNow();
    21         }
    22     });
    23 }

    其中executor方法:

    1 protected EventExecutor executor() {
    2     return executor;
    3 }

    用来获取executor轮询线程对象

    判断executor是否处于轮询,否则需要通过safeExecute方法处理listeners的侦听,
    safeExecute方法:

    1 private static void safeExecute(EventExecutor executor, Runnable task) {
    2     try {
    3         executor.execute(task);
    4     } catch (Throwable t) {
    5         rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
    6     }
    7 }

    这里保证了listeners的侦听回调是异步执行

    InternalThreadLocalMap在我之前的博客中说过,是Netty使用的ThreadLocal (Netty中FastThreadLocal源码分析

    去线程本地变量中找futureListenerStackDepth(默认为0),判断stackDepth是否小于MAX_LISTENER_STACK_DEPTH,否则也要通过safeExecute方法处理listeners的侦听
    核心都是调用notifyListenersNow方法:

     1 private void notifyListenersNow() {
     2     Object listeners;
     3     synchronized (this) {
     4         // Only proceed if there are listeners to notify and we are not already notifying listeners.
     5         if (notifyingListeners || this.listeners == null) {
     6             return;
     7         }
     8         notifyingListeners = true;
     9         listeners = this.listeners;
    10         this.listeners = null;
    11     }
    12     for (;;) {
    13         if (listeners instanceof DefaultFutureListeners) {
    14             notifyListeners0((DefaultFutureListeners) listeners);
    15         } else {
    16             notifyListener0(this, (GenericFutureListener<?>) listeners);
    17         }
    18         synchronized (this) {
    19             if (this.listeners == null) {
    20                 // Nothing can throw from within this method, so setting notifyingListeners back to false does not
    21                 // need to be in a finally block.
    22                 notifyingListeners = false;
    23                 return;
    24             }
    25             listeners = this.listeners;
    26             this.listeners = null;
    27         }
    28     }
    29 }

    先检查是否需要监听,满足条件后,判断listeners是否是DefaultFutureListeners,即包装后的数组
    notifyListeners0方法:

    1 private void notifyListeners0(DefaultFutureListeners listeners) {
    2    GenericFutureListener<?>[] a = listeners.listeners();
    3    int size = listeners.size();
    4    for (int i = 0; i < size; i ++) {
    5        notifyListener0(this, a[i]);
    6    }
    7 }

    遍历这个数组,实则调用notifyListener0方法:

    1 private static void notifyListener0(Future future, GenericFutureListener l) {
    2     try {
    3         l.operationComplete(future);
    4     } catch (Throwable t) {
    5         if (logger.isWarnEnabled()) {
    6             logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
    7         }
    8     }
    9 }

    这里就可以看到,完成了对operationComplete的回调,处理future

    setSuccess结束,再来看trySuccess方法:

    1 @Override
    2 public boolean trySuccess(V result) {
    3     if (setSuccess0(result)) {
    4         notifyListeners();
    5         return true;
    6     }
    7     return false;
    8 }

    对比setSuccess来看,只有返回值不一样

    setFailure方法:

     1 @Override
     2 public Promise<V> setFailure(Throwable cause) {
     3     if (setFailure0(cause)) {
     4         notifyListeners();
     5         return this;
     6     }
     7     throw new IllegalStateException("complete already: " + this, cause);
     8 }
     9 
    10 private boolean setFailure0(Throwable cause) {
    11     return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
    12 }
    13 
    14 private boolean setValue0(Object objResult) {
    15     if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
    16         RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
    17         checkNotifyWaiters();
    18         return true;
    19     }
    20     return false;
    21 }

    和setSuccess逻辑一样,只不过CAS操作将状态变为了CauseHolder对象,成功后唤醒listeners对异步操作的回调

    tryFailure方法:

    1 @Override
    2 public boolean tryFailure(Throwable cause) {
    3     if (setFailure0(cause)) {
    4         notifyListeners();
    5         return true;
    6     }
    7     return false;
    8 }

    也都是一个逻辑

    还有一个setUncancellable方法:

    1 @Override
    2 public boolean setUncancellable() {
    3     if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
    4         return true;
    5     }
    6     Object result = this.result;
    7     return !isDone0(result) || !isCancelled0(result);
    8 }

    若是result状态为null,异步操作尚未结束,直接通过CAS操作将状态变为UNCANCELLABLE
    否则若是根据状态来判断


    下来看到cancel方法:

     1 /**
     2  * {@inheritDoc}
     3  *
     4  * @param mayInterruptIfRunning this value has no effect in this implementation.
     5  */
     6 @Override
     7 public boolean cancel(boolean mayInterruptIfRunning) {
     8     if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
     9         checkNotifyWaiters();
    10         notifyListeners();
    11         return true;
    12     }
    13     return false;
    14 }

    mayInterruptIfRunning正如注释中所说,在这里没有什么作用
    还是通过CAS操作,将状态变为CANCELLATION_CAUSE_HOLDER,调用checkNotifyWaiters唤醒因sync阻塞的线程,notifyListeners方法回调listeners的侦听


    最后看到sync方法:

    1 @Override
    2 public Promise<V> sync() throws InterruptedException {
    3     await();
    4     rethrowIfFailed();
    5     return this;
    6 }

    先调用await方法:

     1 @Override
     2 public Promise<V> await() throws InterruptedException {
     3     if (isDone()) {
     4         return this;
     5     }
     6 
     7     if (Thread.interrupted()) {
     8         throw new InterruptedException(toString());
     9     }
    10 
    11     checkDeadLock();
    12 
    13     synchronized (this) {
    14         while (!isDone()) {
    15             incWaiters();
    16             try {
    17                 wait();
    18             } finally {
    19                 decWaiters();
    20             }
    21         }
    22     }
    23     return this;
    24 }

    先判断能否执行(异步操作尚未结束,当前线程没有被中断),然后调用checkDeadLock方法:

    1 protected void checkDeadLock() {
    2     EventExecutor e = executor();
    3     if (e != null && e.inEventLoop()) {
    4         throw new BlockingOperationException(toString());
    5     }
    6 }

    检查轮询线程是否在工作

    在synchronized块中以自身为锁,自旋等待异步操作的完成,若是没完成,调用incWaiters方法:

    1 private void incWaiters() {
    2     if (waiters == Short.MAX_VALUE) {
    3         throw new IllegalStateException("too many waiters: " + this);
    4     }
    5     ++waiters;
    6 }

    在小于Short.MAX_VALUE的情况下,对waiters自增,
    然后使用wait将自身阻塞,等待被唤醒
    所以在之前setValue0时,checkNotifyWaiters操作会notifyAll,
    由此可以知道sync方法的作用:在某一线程中调用sync方法会使得当前线程被阻塞,只有当异步操作执完毕,通过上面的set方法改变状态后,才会调用checkNotifyWaiters方法唤醒当前线程。

    当从阻塞中被唤醒后调用decWaiters方法:

    1 private void decWaiters() {
    2     --waiters;
    3 }

    使得waiters自减
    通过这样一种自旋方式,一直等到isDone成立,结束自旋,进而结束await方法,然后调用rethrowIfFailed方法:

    1 private void rethrowIfFailed() {
    2     Throwable cause = cause();
    3     if (cause == null) {
    4         return;
    5     }
    6 
    7     PlatformDependent.throwException(cause);
    8 }

    根据异步操作是否有异常,进而使用PlatformDependent抛出异常。


    至此Netty中的ChannelFuture和ChannelPromise分析到此全部结束。

  • 相关阅读:
    农夫安全第二季课程-3.vmware ESXIv2
    六、表达式:前缀&&后缀
    五、数据类型(1):整数&&带小数点的数
    四、变量和常量
    三、简单的输入输出
    二、第一个C程序:Hello World!
    一、环境的安装Dev-C++
    .Net基础之5——复杂数据类型
    .Net基础之4——流程控制
    .Net基础之3——运算符
  • 原文地址:https://www.cnblogs.com/a526583280/p/10965537.html
Copyright © 2011-2022 走看看