zoukankan      html  css  js  c++  java
  • Netty之Promise-netty学习笔记(13)-20210823

    Promise是可写的future,从future的分析中能够发如今其中没有写操做的接口,netty特地使promise扩展了future,能够对异步操做结果进行设置。

    (一)defaultpromise

    包含的字段:

    //原子保存异步操做结果
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
    AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
    //异步操做结果
    private volatile Object result;
    private final EventExecutor executor;

    操做结果通知

    为了可以支持异步地获取操做结果,netty中用通知的方式来对后续的listener中的操做,操做结果等进行控制。通知的前提包括success,fail,cancel三种状态。数组

    1. success状态:setsuccess()方法

    • 第一步:异步操做结束调用setSuccess(V result)或trySuccess(V result)

    方法,将操做结果当作参数传入,来通知能够对结果进行使用。

    public Promise<V> setSuccess(V result) {
       if (setSuccess0(result)) {
       //触发listener中的operationcomplete()方法
           notifyListeners();
           return this;
       }
       throw new IllegalStateException("complete already: " + this);
      }
    • 第二步:首先调用setsuccess0()方法对result变量进行保存,若是保存成功则经过notifyListeners()触发listener中的operationcomplete()方法。
    #setsuccess0()方法:
      private boolean setSuccess0(V result) {
       return setValue0(result == null ? SUCCESS : result);
      }
    
      #setvalue0()方法:
      private boolean setValue0(Object objResult) {
       if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
           RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
           checkNotifyWaiters();
           return true;
       }
       return false;
       }

    注意:在success状态下保存结果时,若是result(异步操做结果)为null,则将promise内部的result设置为常量SUCCESS。再者,在promise的result中,只容许保存一次,因此netty采用cas保证结果只保存一遍,若结果保存出错返回false。

    #SUCCESS常量
      private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class.getName() + ".SUCCESS");
    • 第三步:保存完结果后,通知全部同步等待异步操做结果的线程。
    private synchronized void checkNotifyWaiters() {
       if (waiters > 0) {
           notifyAll();
       }
      }

    2. success状态:trysuccess()方法源码分析

    trysuccess()方法与setsuccess()方法大同小异,只不过在保存结果出错的时候,返回false,而setsuccess()抛出一个异常信息。

    public boolean trySuccess(V result) {
       if (setSuccess0(result)) {
           notifyListeners();
           return true;
       }
       return false;
    }

    3.fail状态:线程

    fail状态下通知机制和success几乎相同,区别在于保存异步操做结果的时候,fail状态保存的是使用CauseHolder进行封装的异常信息对象。

    private boolean setFailure0(Throwable cause) {
       return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
    }

    4.cancel状态:
    cancel状态,表示异步操做的时候,对promise对象进行了cancel操做。

    public boolean cancel(boolean mayInterruptIfRunning) {
       if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
           checkNotifyWaiters();
           notifyListeners();
           return true;
       }
       return false;
    }

    一样的,cancel后也和success和fail同样,对result进行了设置。在success的时候,容许初始值为null和UNCANCELLABLE常量(表示不容许cancel),在cancel状态只容许为null。

    # CANCELLATION_CAUSE_HOLDER
      //封装了CancellationException异常。
     private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
    new CancellationException(), DefaultPromise.class, "cancel(...)"));

    5.操做结果通知总结:

    首先,经过调用setsuccess()等方法,启动通知机制;

    而后,将异步操做结果进行保存,仅容许保存一次,不然会返回false.

    保存好信息后,触发listener中的操做,还会通知全部同步等待异步操做结果的线程。

    添加监听者

    接下来分析promise如何添加监听者。

    (一). 首先来看一下用来保存监听者对象的字段。

    /**
     * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
     * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
      *
      * Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
       */
        private Object listeners;

    从注释能够总结出那么几个信息:

    1.这个object类型的listeners字段,能够是GenericFutureListener类型,也能够是link DefaultFutureListeners(用来保存GenericFutureListener的数组)。
     ps:这样设计的好处,多数状况下,listener只有一个,用集合或者数组会形成浪费,只有真正须要多个监听者的时候,才使用数组
     2.若是listeners为null,表示还未添加监听者或者已经触发过了(一旦触发就会将listeners清空)
     3.能够在外部添加监听者,因此使用加锁的形式(synchronized(this))添加监听者。

    (二). 添加监听器的过程

    @Override
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        checkNotNull(listener, "listener");
    
        synchronized (this) {
            addListener0(listener);
        }
    
        if (isDone()) {
            notifyListeners();
        }
    
        return this;
    }

    用加锁的方式添加监听器,添加完成后,若是promise的状态为isdone,就会当即触发Listener。接下来看看addlistener0()是如何添加的。

    private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
        if (listeners == null) {
            listeners = listener;
        } else if (listeners instanceof DefaultFutureListeners) {
            ((DefaultFutureListeners) listeners).add(listener);
        } else {
            listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
        }
    }

    listeners字段只多是GenericFutureListener类型,或者DefaultFutureListeners类型。因此若是为Null,直接保存;若是已是DefaultFutureListeners(数组形式),就让其再添加一个listener;若是是GenericFutureListener类型,就建立一个数组。

    (三)DefaultFutureListeners类分析:

    首先看看它的构造方法:

    DefaultFutureListeners(
            GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
        listeners = new GenericFutureListener[2];
        listeners[0] = first;
        listeners[1] = second;
        size = 2;
        if (first instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
        if (second instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
    }

    能够看出,从构造之初,他就是size为2的数组。

    再来看看它是如何在后续过程当中添加元素的:

    public void add(GenericFutureListener<? extends Future<?>> l) {
        GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
        final int size = this.size;
        if (size == listeners.length) {
            this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
        }
        listeners[size] = l;
        this.size = size + 1;
    
        if (l instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
      }

    若是数组容量未满,就继续添加元素;若是数组容量已满,就将容量翻倍,将原数组内容复制拷贝到新数组中。

    (四)添加监听者总结:

    1.添加完监听者,就会尝试去触发listener中的操做。

    2.promise内部用来保存监听者的listeners只会是两种类型,GenericFutureListener类型和link DefaultFutureListeners。

    触发监听者

    在前面的setsuccess()和addlistener()等方法中均可以看到notifylisteners()方法,这就是触发监听者的起点。

    private void notifyListeners() {
        EventExecutor executor = executor();
        if (executor.inEventLoop()) {
               ……
                    notifyListenersNow();
               ……
        }
    
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListenersNow();
            }
        });
    }
    
    private static void safeExecute(EventExecutor executor, Runnable task) {
            executor.execute(task);
        ……
    }

    在notifylisteners()方法中,能够看到,listener中触发的异步操做要求是在线程组中执行的,若是是在线程组外部提交的任务,会将任务封装成runnable提交到任务队列中等待执行。

    接下来看看notifynow()方法中作了什么。

    private void notifyListenersNow() {
        Object listeners;
        synchronized (this) {
            // Only proceed if there are listeners to notify and we are not already notifying listeners.
            if (notifyingListeners || this.listeners == null) {
                return;
            }
            notifyingListeners = true;
            listeners = this.listeners;
            this.listeners = null;
        }
        for (;;) {
            if (listeners instanceof DefaultFutureListeners) {
                notifyListeners0((DefaultFutureListeners) listeners);
            } else {
                notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
            }
            synchronized (this) {
                if (this.listeners == null) {
                    // Nothing can throw from within this method, so setting notifyingListeners back to false does not
                    // need to be in a finally block.
                    notifyingListeners = false;
                    return;
                }
                listeners = this.listeners;
                this.listeners = null;
            }
        }
    }

    在该方法中,首先将Listeners取出来,而后将其清空(每次触发完listeners都会将原来的listeners清空),而后执行listener中具体的操做,执行完操做,会再次检查是否又有listeners添加进来,确保无误后,从方法中退出。

    private static void notifyListener0(Future future, GenericFutureListener l) {
        try {
            l.operationComplete(future);
        } catch (Throwable t) {
            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
        }
    }

    触发监听者总结:

    1.触发的listeners中具体的操做是在线程池中进行

    2.触发完毕的listeners会将其清空。

    同步等待

    netty还提供了接口能够同步等待异步操做结果,使用到的是await()和sync()方法。

    public Promise<V> await() throws InterruptedException {
        if (isDone()) {
            return this;
        }
    
        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }
    
        checkDeadLock();
    
        synchronized (this) {
            while (!isDone()) {
                incWaiters();
                try {
                    wait();
                } finally {
                    decWaiters();
                }
            }
        }
        return this;
     }

    原理很简单,就是让线程在promise对象上等待通知。若是是isdone状态,就直接返回。

    sync()方法是在await()方法的基础上添加了额外的功能,区别只是sync()调用,若是异步操做失败,则会抛出异常。

    public Promise<V> sync() throws InterruptedException {
        await();
        rethrowIfFailed();
        return this;
    }

    (二)defaultchannelpromise

    ChannelPromise 接口在 Netty 中使用得比较多,它综合了 ChannelFuture 和 Promise 中的方法,只不过经过覆写将返回值都变为 ChannelPromise 了而已,没有增长什么新的功能。

    从defaultpromise的分析能够得知,listener中的操做是由线程池来执行。但注意到defaultpromise的其中一个权限为protected的构造方法不须要传入eventexecutor,这可能致使出现nullpoint异常。

    因此出现了另外一个扩展类,defaultchannelpromise。

    public DefaultChannelPromise(Channel channel) {
        this.channel = checkNotNull(channel, "channel");
    }
    
    public DefaultChannelPromise(Channel channel, EventExecutor executor) {
        super(executor);
        this.channel = checkNotNull(channel, "channel");
    }

    defaultchannelpromise类有两个构造方法,一个为父类传入eventexecutor,一个调用的是上面提到的父类中protected的构造方法。

    protected EventExecutor executor() {
        EventExecutor e = super.executor();
        if (e == null) {
            return channel().eventLoop();
        } else {
            return e;
        }
    }

    能够看到,当eventexecutor为Null时,保存的是channel中的eventexecutor。

     参考:

    https://segmentfault.com/a/1190000013016083

    https://segmentfault.com/a/1190000022519340

  • 相关阅读:
    d3js 获取元素以及设置属性
    javascript 转义函数
    es6 对象浅拷贝的2种方法
    SwitchyOmega 代理设置
    table 设置边框
    Highcharts 配置选项详细说明
    windows环境下生成ssh keys
    vue 给组件绑定原生事件
    Vue 字面量语法 vs 动态语法
    Vue 2.0 v-for 响应式key, index及item.id参数对v-bind:key值造成差异研究
  • 原文地址:https://www.cnblogs.com/sfnz/p/15175980.html
Copyright © 2011-2022 走看看