zoukankan      html  css  js  c++  java
  • Netty系列-netty的Future 和 Promise

    首先我们来看看future和promise接口整体设计

    最顶层的future是jdk的,第二个是netty自定义的future,两个同名,继承关系

    看看jdk的future接口

    public interface Future<V> {
        // 取消任务
        boolean cancel(boolean mayInterruptIfRunning);
        // 任务是否取消
        boolean isCancelled();
        // 任务是否完成
        boolean isDone();
        // 阻塞的获取执行的结果
        V get() throws InterruptedException, ExecutionException;
        // 在一定时间内-超时-阻塞的获取执行的结果
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }

    瞅瞅netty的future接口

    public interface Future<V> extends java.util.concurrent.Future<V> {
        // 是否成功
        boolean isSuccess();
        // 是否取消
        boolean isCancellable();
        Throwable cause();
         // 添加listener进行回调
        Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
        Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
        Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
        Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    
        // 阻塞的等待任务执行,如果失败则抛出失败原因的异常
        Future<V> sync() throws InterruptedException;
        // 不响应中断等待异常
        Future<V> syncUninterruptibly();
    
        // 阻塞等待任务执行,失败不抛异常
        Future<V> await() throws InterruptedException;
        Future<V> awaitUninterruptibly();
        boolean await(long timeout, TimeUnit unit) throws InterruptedException;
        boolean await(long timeoutMillis) throws InterruptedException;
        boolean awaitUninterruptibly(long timeout, TimeUnit unit);
        boolean awaitUninterruptibly(long timeoutMillis);
    
        // 马上获取到任务的结果,不阻塞,而jdk的future是阻塞的
        V getNow();
    
    
        // 取消任务执行,如果取消成功,任务会因为 CancellationException 异常而导致失败
        //      也就是 isSuccess()==false,同时上面的 cause() 方法返回 CancellationException 的实例。
        // mayInterruptIfRunning 说的是:是否对正在执行该任务的线程进行中断(这样才能停止该任务的执行),
        //       似乎 Netty 中 Future 接口的各个实现类,都没有使用这个参数
        @Override
        boolean cancel(boolean mayInterruptIfRunning);
    }

    netty的future在jdk的基础上扩展了它需要的方法,sync和await的区别我们放到下面看实现类的时候说

    同时我们也可以看到,这个future接口跟io操作是无关的

    接下来我们看看ChannelFuture接口,接口注释上写的很清楚,我们来看看

    * The result of an asynchronous {@link Channel} I/O operation.
    * <p>
    * All I/O operations in Netty are asynchronous. It means any I/O calls will
    * return immediately with no guarantee that the requested I/O operation has
    * been completed at the end of the call. Instead, you will be returned with
    * a {@link ChannelFuture} instance which gives you the information about the
    * result or status of the I/O operation.
    * <p>
    * A {@link ChannelFuture} is either <em>uncompleted</em> or <em>completed</em>.
    * When an I/O operation begins, a new future object is created. The new future
    * is uncompleted initially - it is neither succeeded, failed, nor cancelled
    * because the I/O operation is not finished yet. If the I/O operation is
    * finished either successfully, with failure, or by cancellation, the future is
    * marked as completed with more specific information, such as the cause of the
    * failure. Please note that even failure and cancellation belong to the
    * completed state.
    所有io操作都是异步的,一个io操作的调用会立即返回一个带有结果或者状态的io实例。
    io操作要么是未完成的,要么是完成的。当它开始时,future会被创建,一开始是未完成的,未完成的时候没有成功、失败或者取消状态
    当它是完成的时候,可以是失败或者取消的,失败或者取消原因会被附加到future上。
    * <pre> * +---------------------------+ * | Completed successfully | * +---------------------------+ * +----> isDone() = true | * +--------------------------+ | | isSuccess() = true | * | Uncompleted | | +===========================+ * +--------------------------+ | | Completed with failure | * | isDone() = false | | +---------------------------+ * | isSuccess() = false |----+----> isDone() = true | * | isCancelled() = false | | | cause() = non-null | * | cause() = null | | +===========================+ * +--------------------------+ | | Completed by cancellation | * | +---------------------------+ * +----> isDone() = true | * | isCancelled() = true | * +---------------------------+ * </pre>

    上面那个状态迁移图很清楚了,在两种过程的时候会有什么状态,我们看看接口

    public interface ChannelFuture extends Future<Void> {
        // 返回future关联的channel
        Channel channel();
        // 重写下面几个方法,修改返回值为channelfuture
        @Override
        ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
        @Override
        ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
        @Override
        ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
        @Override
        ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
        @Override
        ChannelFuture sync() throws InterruptedException;
        @Override
        ChannelFuture syncUninterruptibly();
        @Override
        ChannelFuture await() throws InterruptedException;
        @Override
        ChannelFuture awaitUninterruptibly();
    
        /**
         * Returns {@code true} if this {@link ChannelFuture} is a void future and so not allow to call any of the
         * following methods:
         * <ul>
         *     <li>{@link #addListener(GenericFutureListener)}</li>
         *     <li>{@link #addListeners(GenericFutureListener[])}</li>
         *     <li>{@link #await()}</li>
         *     <li>{@link #await(long, TimeUnit)} ()}</li>
         *     <li>{@link #await(long)} ()}</li>
         *     <li>{@link #awaitUninterruptibly()}</li>
         *     <li>{@link #sync()}</li>
         *     <li>{@link #syncUninterruptibly()}</li>
         * </ul>
           标记该future是void的,使不能使用上面的方法
         */
        boolean isVoid();
    }

     netty其实是强烈建议直接通过添加监听器的方式来获取io操作结果,或者进行后续操作的,ChannelFuture可以增加或者删除一个多个 GenericFutureListener,它定义如下

    public interface GenericFutureListener<F extends Future<?>> extends EventListener {
        void operationComplete(F future) throws Exception;
    }

    执行完后会回调 operationComplete方法

    注意一点,不要在ChannelHandler中调用ChannelFuture的await方法,会导致死锁。这是因为发起io操作后,由io线程负责异步通知发起io操作的用户线程,如果io线程和用户线程是同一个的话,就会导致io线程等待自己通知操作完成,这就会导致死锁,自己挂死自己。

    我们继续看promise接口,

    public interface Promise<V> extends Future<V> {
    
        // 标记该future成功及设置结果,并通知所有listener
        // 如果失败的话抛异常
        Promise<V> setSuccess(V result);
    
         // 和setsuccess一样,只是失败的话返回false
        boolean trySuccess(V result);
    
         // 标记future失败,然后通知listener
        Promise<V> setFailure(Throwable cause);
        boolean tryFailure(Throwable cause);
    
        // 标记该future 不可被取消
        boolean setUncancellable();
    
        // 下面跟ChannelFuture一样,都是覆盖重写方法
        @Override
        Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
        @Override
        Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
        @Override
        Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
        @Override
        Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
        @Override
        Promise<V> await() throws InterruptedException;
        @Override
        Promise<V> awaitUninterruptibly();
        @Override
        Promise<V> sync() throws InterruptedException;
        @Override
        Promise<V> syncUninterruptibly();
    }

    Promise是可写的future,Future本身并没有写操作相关的接口,netty通过Promise对其进行扩展,用于设置io操作的结果。Promise 实例内部是一个任务,任务的执行往往是异步的,通常是一个线程池来处理任务。Promise 提供的 setSuccess(V result) 或 setFailure(Throwable t) 将来会被某个执行任务的线程在执行完成以后调用,同时那个线程在调用 setSuccess(result) 或 setFailure(t) 后会回调 listeners 的回调函数(当然,回调的具体内容不一定要由执行任务的线程自己来执行,它可以创建新的线程来执行,也可以将回调任务提交到某个线程池来执行)。而且,一旦 setSuccess(...) 或 setFailure(...) 后,那些 await() 或 sync() 的线程就会从等待中返回。

    接下来我们看看ChannelPromise

    public interface ChannelPromise extends ChannelFuture, Promise<Void> {
        @Override
        Channel channel();
    @Override ChannelPromise setSuccess(Void result); ChannelPromise setSuccess();
    boolean trySuccess(); @Override ChannelPromise setFailure(Throwable cause);
    @Override ChannelPromise addListener(GenericFutureListener
    <? extends Future<? super Void>> listener); @Override ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override ChannelPromise sync()
    throws InterruptedException; @Override ChannelPromise syncUninterruptibly(); @Override ChannelPromise await() throws InterruptedException; @Override ChannelPromise awaitUninterruptibly(); /** * Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself. */ ChannelPromise unvoid(); }

    看方法其实很清楚,基本都是覆写综合了ChannelFuture和Promise接口的,就返回值变了,看看我们一开始的类继承图,ChannelPromise 接口同时继承了 ChannelFuture 和 Promise,最终继承的都是Future接口,接下来我们看看具体的实现类DefaultPromise吧

    public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
    
        // 为了后面操作成功后通过cas来保存结果到result字段
        @SuppressWarnings("rawtypes")
        private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
                AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
        // result为null的时候默认值
        private static final Object SUCCESS = new Object();
        // 操作成功后cas比对的值
        private static final Object UNCANCELLABLE = new Object();
       
        // 保存执行的结果
        private volatile Object result;
        // 线程执行器
        private final EventExecutor executor;
        // 监听者
        private Object listeners;
         /**
         * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll().
         */
        // 等待这个 promise 的线程数(调用sync()/await()进行等待的线程数量)
        private short waiters;
    
        /**
         * Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the
         * executor changes.
         */
         // 是否唤醒正在等待线程,用于防止重复执行唤醒,不然会重复执行 listeners 的回调方法
        private boolean notifyingListeners;
    
        ....
     }

    属性看完了,我们可以看看它主要的方法

        @Override
        public Promise<V> setSuccess(V result) {
            if (setSuccess0(result)) {
                notifyListeners();
                return this;
            }
            throw new IllegalStateException("complete already: " + this);
        }
    
        @Override
        public boolean trySuccess(V result) {
            if (setSuccess0(result)) {
                notifyListeners();
                return true;
            }
            return false;
        }
    
        @Override
        public Promise<V> setFailure(Throwable cause) {
            if (setFailure0(cause)) {
                notifyListeners();
                return this;
            }
            throw new IllegalStateException("complete already: " + this, cause);
        }
    
        @Override
        public boolean tryFailure(Throwable cause) {
            if (setFailure0(cause)) {
                notifyListeners();
                return true;
            }
            return false;
        }

    set和try的区别就是返回值不一样而已,我们看看底层的方法 setSuccess0

        private boolean setSuccess0(V result) {
            return setValue0(result == null ? SUCCESS : result);
        }
    
        private boolean setValue0(Object objResult) {
            if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
                RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
                checkNotifyWaiters();
                return true;
            }
            return false;
        }

    就是通过cas来把objResult保存到result属性上,然后Notify其他线程。其他方法都差不多,可以比对看看

    我们再看个await方法

    public Promise<V> await() throws InterruptedException {
            if (isDone()) {
                return this;
            }
    
            if (Thread.interrupted()) {
                throw new InterruptedException(toString());
            }
    
            checkDeadLock();
    
            synchronized (this) {
                while (!isDone()) {
                    incWaiters();
                    try {
                        wait();
                    } finally {
                        decWaiters();
                    }
                }
            }
            return this;
        }

    如果当前Promise已被设置,则返回;如果碰到线程中断则响应中断;检查死锁,由于在IO线程中调用Promise的await方法或者sync方法会导致死锁,前面说过的,所以需要检验保护,判定当前线程是否是io线程;同步锁定当前Promise对象,循环判定是否设置完成,使用循环是避免伪唤醒,防止线程 被意外唤醒导致功能异常。

    接下来我们顺便也看下sync方法

    @Override
        public Promise<V> sync() throws InterruptedException {
            await();
            rethrowIfFailed();
            return this;
        }

    首先调用await方法,然后看是否需要抛出异常,如果任务失败的话就重新抛出异常,这也是两方法区别了。

    DefaultChannelPromise实现我们就不看了,基本都是基于DefaultPromise的,只是返回值都是 ChannelPromise而已。

    下面我们来写个例子吧

    public class ChannelPromiseExample extends Thread{
    
        private static final Object object = new Object();
    
        public static void main(String[] args) {
            final DefaultEventExecutor executor = new DefaultEventExecutor();
            final Promise<Integer> promise = executor.newPromise();
    
            // 任务seccess或者failure来回调operationComplete 方法
            promise.addListener(new GenericFutureListener<Future<? super Integer>>() {
                @Override
                public void operationComplete(Future<? super Integer> future)
                        throws Exception {
                    System.out.println(Thread.currentThread().getName() + " 第一个监听器");
                    if (future.isSuccess()) {
                        System.out.println("任务成功,result:" + future.get());
                    } else {
                        System.out.println("任务失败,result:" + future.cause());
                    }
                }
            }).addListener(new GenericFutureListener<Future<? super Integer>>() {
                @Override
                public void operationComplete(Future<? super Integer> future)
                        throws Exception {
                    System.out.println(Thread.currentThread().getName() + " 第二个监听器");
                }
            });
            // 提交任务
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //可以设置成功或者失败
                    //promise.setSuccess(1);
                    promise.setFailure(new Throwable("FAILURE"));
                }
            });
    
    
            try {
                System.out.println("promise wait begin");
                //promise.sync();
                promise.await();
                System.out.println("promise wait end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                executor.shutdown();
            }
        }
    }

    可以体会下await和sync的区别

  • 相关阅读:
    轻松自动化---selenium-webdriver(python) (八)
    Ubuntu 18.04 LTS 启用 WakeOnLAN
    lower_bound 和 upper_bound
    [LeetCode 201.] Bitwise AND of Numbers Range
    [LeetCode 162.] Find Peak Element
    [LeetCode 33. 81. 153. 154.] 旋转数组中的二分查找
    C++ unordered_map 的一个疑问
    [LintCode 386.] 最多有k个不同字符的最长子字符串
    [LintCode 550.] 最常使用的K个单词II
    [LintCode 1029.] 寻找最便宜的航行旅途(最多经过k个中转站)
  • 原文地址:https://www.cnblogs.com/myos/p/13199104.html
Copyright © 2011-2022 走看看