zoukankan      html  css  js  c++  java
  • Netty中NioEventLoopGroup的创建源码分析

    NioEventLoopGroup的无参构造:

    1 public NioEventLoopGroup() {
    2     this(0);
    3 }

    调用了单参的构造:

    1 public NioEventLoopGroup(int nThreads) {
    2     this(nThreads, (Executor)null);
    3 }

    继续看到双参构造:

    1 public NioEventLoopGroup(int nThreads, Executor executor) {
    2     this(nThreads, executor, SelectorProvider.provider());
    3 }

    在这里是使用JDK中NIO的原生API:SelectorProvider的provider,产生了一个SelectorProvider对象调用,继续调用三参构造。
    关于SelectorProvider在我前面的博客中有介绍过:【Java】NIO中Selector的创建源码分析,在Windows下默认创建了WindowsSelectorProvider对象。

    继续看三参构造:

    1 public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
    2     this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    3 }

    在这里创建了一个单例的DefaultSelectStrategyFactory 对象:

     1 public final class DefaultSelectStrategyFactory implements SelectStrategyFactory {
     2     public static final SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory();
     3 
     4     private DefaultSelectStrategyFactory() {
     5     }
     6 
     7     public SelectStrategy newSelectStrategy() {
     8         return DefaultSelectStrategy.INSTANCE;
     9     }
    10 }

    DefaultSelectStrategyFactory实现的是SelectStrategyFactory 接口:

    1 public interface SelectStrategyFactory {
    2     SelectStrategy newSelectStrategy();
    3 }

    该接口提供一个用来产生Select策略的方法,SelectStrategy接口如下:

    1 public interface SelectStrategy {
    2     int SELECT = -1;
    3     int CONTINUE = -2;
    4 
    5     int calculateStrategy(IntSupplier var1, boolean var2) throws Exception;
    6 }

    根据IntSupplier 和一个boolean值为Select策略提供了一个计算策略的方法。
    在Netty中只提供了DefaultSelectStrategy这一种默认实现:

     1 final class DefaultSelectStrategy implements SelectStrategy {
     2     static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
     3 
     4     private DefaultSelectStrategy() {
     5     }
     6 
     7     public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
     8         return hasTasks ? selectSupplier.get() : -1;
     9     }
    10 }


    其中IntSupplier :

    1 public interface IntSupplier {
    2     int get() throws Exception;
    3 }

    结合上面的来看,最终的选择策略主要是根据IntSupplier的get值来得到的。

    再回到构造:

    1 public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, SelectorProvider selectorProvider, SelectStrategyFactory selectStrategyFactory) {
    2     super(nThreads, threadFactory, new Object[]{selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()});
    3 }

    这里产生了一个拒绝策略:

     1 public static RejectedExecutionHandler reject() {
     2     return REJECT;
     3 }
     4 
     5 private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
     6     public void rejected(Runnable task, SingleThreadEventExecutor executor) {
     7         throw new RejectedExecutionException();
     8     }
     9 };
    10 
    11 public interface RejectedExecutionHandler {
    12     void rejected(Runnable var1, SingleThreadEventExecutor var2);
    13 }

    将selectorProvider、selectStrategyFactory以及这个拒绝策略封装在一个Object数组里,再调用了父类MultithreadEventLoopGroup的构造:

    1 protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    2     super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
    3 }

    在这里对nThreads的大小进行了调整:

    1 private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    SystemPropertyUtil.getInt是根据key值"io.netty.eventLoopThreads",获取系统配置值,在没用设置时使用NettyRuntime.availableProcessors() * 2的值
    NettyRuntime的availableProcessors实现如下:

    1 synchronized int availableProcessors() {
    2     if (this.availableProcessors == 0) {
    3         int availableProcessors = SystemPropertyUtil.getInt("io.netty.availableProcessors", Runtime.getRuntime().availableProcessors());
    4         this.setAvailableProcessors(availableProcessors);
    5     }
    6 
    7     return this.availableProcessors;
    8 }

    还是一样,根据key值"io.netty.availableProcessors",获取系统配置值,在没用设置时使用Runtime.getRuntime().availableProcessors(),是用来获取处理器的个数。

    这样保证了在默认情况下nThreads的大小是总是cpu个数的2倍。

    继续回到构造,MultithreadEventLoopGroup继续调用父类MultithreadEventExecutorGroup的构造:

    1 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    2     this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    3 }

    在这里又初始化了一个单例的DefaultEventExecutorChooserFactory对象:

    1 public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    DefaultEventExecutorChooserFactory 实现的是EventExecutorChooserFactory接口:

    1 public interface EventExecutorChooserFactory {
    2     EventExecutorChooserFactory.EventExecutorChooser newChooser(EventExecutor[] var1);
    3 
    4     public interface EventExecutorChooser {
    5         EventExecutor next();
    6     }
    7 }

    DefaultEventExecutorChooserFactory 的具体实现:

    1 public EventExecutorChooser newChooser(EventExecutor[] executors) {
    2     return (EventExecutorChooser)(isPowerOfTwo(executors.length) ? new DefaultEventExecutorChooserFactory.PowerOfTwoEventExecutorChooser(executors) : new DefaultEventExecutorChooserFactory.GenericEventExecutorChooser(executors));
    3 }
    4 
    5 private static boolean isPowerOfTwo(int val) {
    6     return (val & -val) == val;
    7 }

    isPowerOfTwo是用来检查executors的大小是否是二的整数次方,若是二的整数次方,产生PowerOfTwoEventExecutorChooser,反之产生GenericEventExecutorChooser:

     1 private static final class GenericEventExecutorChooser implements EventExecutorChooser {
     2     private final AtomicInteger idx = new AtomicInteger();
     3     private final EventExecutor[] executors;
     4 
     5     GenericEventExecutorChooser(EventExecutor[] executors) {
     6         this.executors = executors;
     7     }
     8 
     9     public EventExecutor next() {
    10         return this.executors[Math.abs(this.idx.getAndIncrement() % this.executors.length)];
    11     }
    12 }
    13 
    14 private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    15     private final AtomicInteger idx = new AtomicInteger();
    16     private final EventExecutor[] executors;
    17 
    18     PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
    19         this.executors = executors;
    20     }
    21 
    22     public EventExecutor next() {
    23         return this.executors[this.idx.getAndIncrement() & this.executors.length - 1];
    24     }
    25 }

    这两种其实都是用了取模运算,只不过因为二的整数次方的特殊性而使用位运算。

    回到构造,MultithreadEventExecutorGroup继续调用本省的构造:

     1 private final EventExecutor[] children;
     2 private final Set<EventExecutor> readonlyChildren;
     3 private final AtomicInteger terminatedChildren;
     4 private final Promise<?> terminationFuture;
     5 private final EventExecutorChooser chooser;
     6 
     7 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
     8     this.terminatedChildren = new AtomicInteger();
     9     this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
    10     if (nThreads <= 0) {
    11         throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    12     } else {
    13         if (executor == null) {
    14             executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
    15         }
    16 
    17         this.children = new EventExecutor[nThreads];
    18 
    19         int j;
    20         for(int i = 0; i < nThreads; ++i) {
    21             boolean success = false;
    22             boolean var18 = false;
    23 
    24             try {
    25                 var18 = true;
    26                 this.children[i] = this.newChild((Executor)executor, args);
    27                 success = true;
    28                 var18 = false;
    29             } catch (Exception var19) {
    30                 throw new IllegalStateException("failed to create a child event loop", var19);
    31             } finally {
    32                 if (var18) {
    33                     if (!success) {
    34                         int j;
    35                         for(j = 0; j < i; ++j) {
    36                             this.children[j].shutdownGracefully();
    37                         }
    38 
    39                         for(j = 0; j < i; ++j) {
    40                             EventExecutor e = this.children[j];
    41 
    42                             try {
    43                                 while(!e.isTerminated()) {
    44                                     e.awaitTermination(2147483647L, TimeUnit.SECONDS);
    45                                 }
    46                             } catch (InterruptedException var20) {
    47                                 Thread.currentThread().interrupt();
    48                                 break;
    49                             }
    50                         }
    51                     }
    52 
    53                 }
    54             }
    55 
    56             if (!success) {
    57                 for(j = 0; j < i; ++j) {
    58                     this.children[j].shutdownGracefully();
    59                 }
    60 
    61                 for(j = 0; j < i; ++j) {
    62                     EventExecutor e = this.children[j];
    63 
    64                     try {
    65                         while(!e.isTerminated()) {
    66                             e.awaitTermination(2147483647L, TimeUnit.SECONDS);
    67                         }
    68                     } catch (InterruptedException var22) {
    69                         Thread.currentThread().interrupt();
    70                         break;
    71                     }
    72                 }
    73             }
    74         }
    75 
    76         this.chooser = chooserFactory.newChooser(this.children);
    77         FutureListener<Object> terminationListener = new FutureListener<Object>() {
    78             public void operationComplete(Future<Object> future) throws Exception {
    79                 if (MultithreadEventExecutorGroup.this.terminatedChildren.incrementAndGet() == MultithreadEventExecutorGroup.this.children.length) {
    80                     MultithreadEventExecutorGroup.this.terminationFuture.setSuccess((Object)null);
    81                 }
    82 
    83             }
    84         };
    85         EventExecutor[] var24 = this.children;
    86         j = var24.length;
    87 
    88         for(int var26 = 0; var26 < j; ++var26) {
    89             EventExecutor e = var24[var26];
    90             e.terminationFuture().addListener(terminationListener);
    91         }
    92 
    93         Set<EventExecutor> childrenSet = new LinkedHashSet(this.children.length);
    94         Collections.addAll(childrenSet, this.children);
    95         this.readonlyChildren = Collections.unmodifiableSet(childrenSet);
    96     }
    97 }

    首先是对terminatedChildren的初始化,没什么好说的,对terminationFuture的初始化使用DefaultPromise,用来异步处理终止事件。executor初始化产生一个线程池。

    接下来就是对children的操作,根据nThreads的大小,产生一个EventExecutor数组,然后遍历这个数组,调用newChild给每一个元素初始化。

    newChild是在NioEventLoopGroup中实现的:

    1 protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    2     return new NioEventLoop(this, executor, (SelectorProvider)args[0], ((SelectStrategyFactory)args[1]).newSelectStrategy(), (RejectedExecutionHandler)args[2]);
    3 }

    在这里直接使用executor,和之前放在args数组中的SelectorProvider、SelectStrategyFactory(newSelectStrategy方法产生DefaultSelectStrategy)和RejectedExecutionHandler产生了一个NioEventLoop对象:

     1 private Selector selector;
     2 private Selector unwrappedSelector;
     3 private SelectedSelectionKeySet selectedKeys;
     4 private final SelectorProvider provider;
     5 private final AtomicBoolean wakenUp = new AtomicBoolean();
     6 private final SelectStrategy selectStrategy;
     7 
     8 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
     9    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    10     if (selectorProvider == null) {
    11         throw new NullPointerException("selectorProvider");
    12     } else if (strategy == null) {
    13         throw new NullPointerException("selectStrategy");
    14     } else {
    15         this.provider = selectorProvider;
    16         NioEventLoop.SelectorTuple selectorTuple = this.openSelector();
    17         this.selector = selectorTuple.selector;
    18         this.unwrappedSelector = selectorTuple.unwrappedSelector;
    19         this.selectStrategy = strategy;
    20     }
    21 }

    NioEventLoop首先在继承链上调用父类的构造,都是一些成员的赋值操作,简单看一看:

     1 protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {
     2     super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
     3     this.tailTasks = this.newTaskQueue(maxPendingTasks);
     4 }
     5 
     6 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
     7     super(parent);
     8     this.threadLock = new Semaphore(0);
     9     this.shutdownHooks = new LinkedHashSet();
    10     this.state = 1;
    11     this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
    12     this.addTaskWakesUp = addTaskWakesUp;
    13     this.maxPendingTasks = Math.max(16, maxPendingTasks);
    14     this.executor = (Executor)ObjectUtil.checkNotNull(executor, "executor");
    15     this.taskQueue = this.newTaskQueue(this.maxPendingTasks);
    16     this.rejectedExecutionHandler = (RejectedExecutionHandler)ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    17 }
    18 
    19 protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
    20     super(parent);
    21 }
    22 
    23 protected AbstractEventExecutor(EventExecutorGroup parent) {
    24     this.selfCollection = Collections.singleton(this);
    25     this.parent = parent;
    26 }

    在经过这继承链上的一系列调用后,给provider成员赋值selectorProvider,就是之前创建好的WindowsSelectorProvider,然后使用openSelector方法,最终创建JDK原生的Selector:

     1 private NioEventLoop.SelectorTuple openSelector() {
     2     final AbstractSelector unwrappedSelector;
     3     try {
     4         unwrappedSelector = this.provider.openSelector();
     5     } catch (IOException var7) {
     6         throw new ChannelException("failed to open a new selector", var7);
     7     }
     8 
     9     if (DISABLE_KEYSET_OPTIMIZATION) {
    10         return new NioEventLoop.SelectorTuple(unwrappedSelector);
    11     } else {
    12         final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    13         Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
    14             public Object run() {
    15                 try {
    16                     return Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());
    17                 } catch (Throwable var2) {
    18                     return var2;
    19                 }
    20             }
    21         });
    22         if (maybeSelectorImplClass instanceof Class && ((Class)maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
    23             final Class<?> selectorImplClass = (Class)maybeSelectorImplClass;
    24             Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
    25                 public Object run() {
    26                     try {
    27                         Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
    28                         Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    29                         Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
    30                         if (cause != null) {
    31                             return cause;
    32                         } else {
    33                             cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
    34                             if (cause != null) {
    35                                 return cause;
    36                             } else {
    37                                 selectedKeysField.set(unwrappedSelector, selectedKeySet);
    38                                 publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
    39                                 return null;
    40                             }
    41                         }
    42                     } catch (NoSuchFieldException var4) {
    43                         return var4;
    44                     } catch (IllegalAccessException var5) {
    45                         return var5;
    46                     }
    47                 }
    48             });
    49             if (maybeException instanceof Exception) {
    50                 this.selectedKeys = null;
    51                 Exception e = (Exception)maybeException;
    52                 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
    53                 return new NioEventLoop.SelectorTuple(unwrappedSelector);
    54             } else {
    55                 this.selectedKeys = selectedKeySet;
    56                 logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
    57                 return new NioEventLoop.SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    58             }
    59         } else {
    60             if (maybeSelectorImplClass instanceof Throwable) {
    61                 Throwable t = (Throwable)maybeSelectorImplClass;
    62                 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
    63             }
    64 
    65             return new NioEventLoop.SelectorTuple(unwrappedSelector);
    66         }
    67     }
    68 }

    可以看到在一开始就使用provider的openSelector方法,即WindowsSelectorProvider的openSelector方法,创建了WindowsSelectorImpl对象(【Java】NIO中Selector的创建源码分析 )

    然后根据DISABLE_KEYSET_OPTIMIZATION判断:

    1 private static final boolean DISABLE_KEYSET_OPTIMIZATION = SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);

    可以看到这个系统配置在没有设置默认是false,如果设置了则直接创建一个SelectorTuple对象返回:

     1 private static final class SelectorTuple {
     2     final Selector unwrappedSelector;
     3     final Selector selector;
     4 
     5     SelectorTuple(Selector unwrappedSelector) {
     6         this.unwrappedSelector = unwrappedSelector;
     7         this.selector = unwrappedSelector;
     8     }
     9 
    10     SelectorTuple(Selector unwrappedSelector, Selector selector) {
    11         this.unwrappedSelector = unwrappedSelector;
    12         this.selector = selector;
    13     }
    14 }

    可以看到仅仅是将unwrappedSelector和selector封装了,unwrappedSelector对应的是JDK原生Selector没有经过更改的,而selector对应的是经过更改修饰操作的。

    在没有系统配置下,就对Selector进行更改修饰操作:
    首先创建SelectedSelectionKeySet对象,这个SelectedSelectionKeySet继承自AbstractSet:

    1 final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
    2     SelectionKey[] keys = new SelectionKey[1024];
    3     int size;
    4     
    5     SelectedSelectionKeySet() {
    6     }
    7     ......
    8 }

    后面是通过反射机制,使得WindowsSelectorImpl的selectedKeys和publicSelectedKeys成员直接赋值为SelectedSelectionKeySet对象。

    WindowsSelectorImpl的这两个成员是在SelectorImpl中定义的:

    1 protected Set<SelectionKey> selectedKeys = new HashSet();
    2 private Set<SelectionKey> publicSelectedKeys;

    从这里就可以明白,在JDK原生的Selector中,selectedKeys和publicSelectedKeys这两个Set的初始化大小都为0,而在这里仅仅就是使其初始化大小变为1024。
    后面就是对一些异常的处理,没什么好说的。

    openSelector结束后,就可以分别对包装过的Selector和未包装过的Selector,即selector和unwrappedSelector成员赋值,再由selectStrategy保存刚才产生的选择策略,用于Selector的轮询。

    回到MultithreadEventExecutorGroup的构造,在调用newChild方法时即NioEventLoop创建的过程中可能出现异常情况,就需要遍历children数组,将之前创建好的NioEventLoop使用shutdownGracefully优雅地关闭掉:
    shutdownGracefully在AbstractEventExecutor中实现:

    1 public Future<?> shutdownGracefully() {
    2     return this.shutdownGracefully(2L, 15L, TimeUnit.SECONDS);
    3 }

    这里设置了超时时间,继续调用SingleThreadEventExecutor的shutdownGracefully方法:

     1 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
     2     if (quietPeriod < 0L) {
     3         throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");
     4     } else if (timeout < quietPeriod) {
     5         throw new IllegalArgumentException("timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
     6     } else if (unit == null) {
     7         throw new NullPointerException("unit");
     8     } else if (this.isShuttingDown()) {
     9         return this.terminationFuture();
    10     } else {
    11         boolean inEventLoop = this.inEventLoop();
    12 
    13         while(!this.isShuttingDown()) {
    14             boolean wakeup = true;
    15             int oldState = this.state;
    16             int newState;
    17             if (inEventLoop) {
    18                 newState = 3;
    19             } else {
    20                 switch(oldState) {
    21                 case 1:
    22                 case 2:
    23                     newState = 3;
    24                     break;
    25                 default:
    26                     newState = oldState;
    27                     wakeup = false;
    28                 }
    29             }
    30 
    31             if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
    32                 this.gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
    33                 this.gracefulShutdownTimeout = unit.toNanos(timeout);
    34                 if (oldState == 1) {
    35                     try {
    36                         this.doStartThread();
    37                     } catch (Throwable var10) {
    38                         STATE_UPDATER.set(this, 5);
    39                         this.terminationFuture.tryFailure(var10);
    40                         if (!(var10 instanceof Exception)) {
    41                             PlatformDependent.throwException(var10);
    42                         }
    43 
    44                         return this.terminationFuture;
    45                     }
    46                 }
    47 
    48                 if (wakeup) {
    49                     this.wakeup(inEventLoop);
    50                 }
    51 
    52                 return this.terminationFuture();
    53             }
    54         }
    55 
    56         return this.terminationFuture();
    57     }
    58 }

    前三个判断没什么好说的,isShuttingDown判断:

    1 public boolean isShuttingDown() {
    2     return this.state >= 3;
    3 }

    在之前NioEventLoop创建的时候,调用了一系列的继承链,其中state是在SingleThreadEventExecutor的构造方法中实现的,初始值是1,state有如下几种状态:

    1 private static final int ST_NOT_STARTED = 1;
    2 private static final int ST_STARTED = 2;
    3 private static final int ST_SHUTTING_DOWN = 3;
    4 private static final int ST_SHUTDOWN = 4;
    5 private static final int ST_TERMINATED = 5;

    可见在NioEventLoop初始化后处于尚未启动状态,并没有Channel的注册,也就不需要轮询。

    isShuttingDown就必然是false,就进入了else块:
    首先得到inEventLoop的返回值,该方法在AbstractEventExecutor中实现:

    1 public boolean inEventLoop() {
    2     return this.inEventLoop(Thread.currentThread());
    3 }

    他传入了一个当前线程,接着调用inEventLoop的重载,这个方法是在SingleThreadEventExecutor中实现:

    1 public boolean inEventLoop(Thread thread) {
    2     return thread == this.thread;
    3 }

    通过观察之前的SingleThreadEventExecutor构造方法,发现并没有对thread成员初始化,此时的this.thread就是null,那么返回值就是false,即inEventLoop为false。

    在while循环中又对isShuttingDown进行了判断,shutdownGracefully当让不仅仅使用在创建NioEventLoop对象失败时才调用的,无论是在EventLoopGroup的关闭,还是Bootstrap的关闭,都会关闭绑定的NioEventLoop,所以在多线程环境中,有可能会被其他线程关闭。

    在while循环中,结合上面可知满足进入switch块,在switch块中令newState为3;
    然后调用STATE_UPDATER的compareAndSet方法,STATE_UPDATER是用来原子化更新state成员的:

    1 private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");

    所以这里就是使用CAS操作,原子化更新state成员为3,也就是使当前状态由ST_NOT_STARTED 变为了ST_SHUTTING_DOWN 状态。

    gracefulShutdownQuietPeriod和gracefulShutdownTimeout分别保存quietPeriod和timeout的纳秒级颗粒度。

    前面可知oldState使1,调用doStartThread方法:

      1 private void doStartThread() {
      2     assert this.thread == null;
      3 
      4     this.executor.execute(new Runnable() {
      5         public void run() {
      6             SingleThreadEventExecutor.this.thread = Thread.currentThread();
      7             if (SingleThreadEventExecutor.this.interrupted) {
      8                 SingleThreadEventExecutor.this.thread.interrupt();
      9             }
     10 
     11             boolean success = false;
     12             SingleThreadEventExecutor.this.updateLastExecutionTime();
     13             boolean var112 = false;
     14 
     15             int oldState;
     16             label1685: {
     17                 try {
     18                     var112 = true;
     19                     SingleThreadEventExecutor.this.run();
     20                     success = true;
     21                     var112 = false;
     22                     break label1685;
     23                 } catch (Throwable var119) {
     24                     SingleThreadEventExecutor.logger.warn("Unexpected exception from an event executor: ", var119);
     25                     var112 = false;
     26                 } finally {
     27                     if (var112) {
     28                         int oldStatex;
     29                         do {
     30                             oldStatex = SingleThreadEventExecutor.this.state;
     31                         } while(oldStatex < 3 && !SingleThreadEventExecutor.STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldStatex, 3));
     32 
     33                         if (success && SingleThreadEventExecutor.this.gracefulShutdownStartTime == 0L) {
     34                             SingleThreadEventExecutor.logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called before run() implementation terminates.");
     35                         }
     36 
     37                         try {
     38                             while(!SingleThreadEventExecutor.this.confirmShutdown()) {
     39                                 ;
     40                             }
     41                         } finally {
     42                             try {
     43                                 SingleThreadEventExecutor.this.cleanup();
     44                             } finally {
     45                                 SingleThreadEventExecutor.STATE_UPDATER.set(SingleThreadEventExecutor.this, 5);
     46                                 SingleThreadEventExecutor.this.threadLock.release();
     47                                 if (!SingleThreadEventExecutor.this.taskQueue.isEmpty()) {
     48                                     SingleThreadEventExecutor.logger.warn("An event executor terminated with non-empty task queue (" + SingleThreadEventExecutor.this.taskQueue.size() + ')');
     49                                 }
     50 
     51                                 SingleThreadEventExecutor.this.terminationFuture.setSuccess((Object)null);
     52                             }
     53                         }
     54 
     55                     }
     56                 }
     57 
     58                 do {
     59                     oldState = SingleThreadEventExecutor.this.state;
     60                 } while(oldState < 3 && !SingleThreadEventExecutor.STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, 3));
     61 
     62                 if (success && SingleThreadEventExecutor.this.gracefulShutdownStartTime == 0L) {
     63                     SingleThreadEventExecutor.logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called before run() implementation terminates.");
     64                 }
     65 
     66                 try {
     67                     while(!SingleThreadEventExecutor.this.confirmShutdown()) {
     68                         ;
     69                     }
     70 
     71                     return;
     72                 } finally {
     73                     try {
     74                         SingleThreadEventExecutor.this.cleanup();
     75                     } finally {
     76                         SingleThreadEventExecutor.STATE_UPDATER.set(SingleThreadEventExecutor.this, 5);
     77                         SingleThreadEventExecutor.this.threadLock.release();
     78                         if (!SingleThreadEventExecutor.this.taskQueue.isEmpty()) {
     79                             SingleThreadEventExecutor.logger.warn("An event executor terminated with non-empty task queue (" + SingleThreadEventExecutor.this.taskQueue.size() + ')');
     80                         }
     81 
     82                         SingleThreadEventExecutor.this.terminationFuture.setSuccess((Object)null);
     83                     }
     84                 }
     85             }
     86 
     87             do {
     88                 oldState = SingleThreadEventExecutor.this.state;
     89             } while(oldState < 3 && !SingleThreadEventExecutor.STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, 3));
     90 
     91             if (success && SingleThreadEventExecutor.this.gracefulShutdownStartTime == 0L) {
     92                 SingleThreadEventExecutor.logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called before run() implementation terminates.");
     93             }
     94 
     95             try {
     96                 while(!SingleThreadEventExecutor.this.confirmShutdown()) {
     97                     ;
     98                 }
     99             } finally {
    100                 try {
    101                     SingleThreadEventExecutor.this.cleanup();
    102                 } finally {
    103                     SingleThreadEventExecutor.STATE_UPDATER.set(SingleThreadEventExecutor.this, 5);
    104                     SingleThreadEventExecutor.this.threadLock.release();
    105                     if (!SingleThreadEventExecutor.this.taskQueue.isEmpty()) {
    106                         SingleThreadEventExecutor.logger.warn("An event executor terminated with non-empty task queue (" + SingleThreadEventExecutor.this.taskQueue.size() + ')');
    107                     }
    108 
    109                     SingleThreadEventExecutor.this.terminationFuture.setSuccess((Object)null);
    110                 }
    111             }
    112 
    113         }
    114     });
    115 }

    刚才说过this.thread并没有初始化,所以等于null成立,断言可以继续。

    然后直接使executor运行了一个线程,这个executor其实就是在刚才的MultithreadEventExecutorGroup中产生的ThreadPerTaskExecutor对象。

    在线程中,首先将SingleThreadEventExecutor的thread成员初始化为当前线程。

    在这里可能就有疑问了,为什么会在关闭时会调用名为doStartThread的方法,这个方法不因该在启动时调用吗?
    其实doStartThread在启动时是会被调用的,当在启动时被调用的话,每一个NioEventLoop都会被绑定一个线程用来处理真正的Selector操作,根据之前的说法就可以知道,每个EventLoopGroup在创建后都会被绑定cpu个数的二倍个NioEventLoop,而每个NioEventLoop都会绑定一个Selector对象,上面又说了在启动时SingleThreadEventExecutor绑定了一个线程,即NioEventLoop绑定了一个线程来处理其绑定的Selector的轮询。
    至于关闭时还会启动Selector的轮询,就是为了解决注册了的Channel没有被处理的情况。

    回到doStartThread方法,其实这个doStartThread方法的核心是SingleThreadEventExecutor.this.run(),这个方法就是正真的Selector的轮询操作,在NioEventLoop中实现:

     1 protected void run() {
     2     while(true) {
     3         while(true) {
     4             try {
     5                 switch(this.selectStrategy.calculateStrategy(this.selectNowSupplier, this.hasTasks())) {
     6                 case -2:
     7                     continue;
     8                 case -1:
     9                     this.select(this.wakenUp.getAndSet(false));
    10                     if (this.wakenUp.get()) {
    11                         this.selector.wakeup();
    12                     }
    13                 default:
    14                     this.cancelledKeys = 0;
    15                     this.needsToSelectAgain = false;
    16                     int ioRatio = this.ioRatio;
    17                     if (ioRatio == 100) {
    18                         try {
    19                             this.processSelectedKeys();
    20                         } finally {
    21                             this.runAllTasks();
    22                         }
    23                     } else {
    24                         long ioStartTime = System.nanoTime();
    25                         boolean var13 = false;
    26 
    27                         try {
    28                             var13 = true;
    29                             this.processSelectedKeys();
    30                             var13 = false;
    31                         } finally {
    32                             if (var13) {
    33                                 long ioTime = System.nanoTime() - ioStartTime;
    34                                 this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio);
    35                             }
    36                         }
    37 
    38                         long ioTime = System.nanoTime() - ioStartTime;
    39                         this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio);
    40                     }
    41                 }
    42             } catch (Throwable var21) {
    43                 handleLoopException(var21);
    44             }
    45 
    46             try {
    47                 if (this.isShuttingDown()) {
    48                     this.closeAll();
    49                     if (this.confirmShutdown()) {
    50                         return;
    51                     }
    52                 }
    53             } catch (Throwable var18) {
    54                 handleLoopException(var18);
    55             }
    56         }
    57     }
    58 }

    进入switch块,首先调用之前准备好的选择策略,其中this.selectNowSupplier在NioEventLoop创建的时候就被创建了:

    1 private final IntSupplier selectNowSupplier = new IntSupplier() {
    2     public int get() throws Exception {
    3         return NioEventLoop.this.selectNow();
    4     }
    5 };

    实际上调用了selectNow方法:

     1 int selectNow() throws IOException {
     2     int var1;
     3     try {
     4         var1 = this.selector.selectNow();
     5     } finally {
     6         if (this.wakenUp.get()) {
     7             this.selector.wakeup();
     8         }
     9 
    10     }
    11 
    12     return var1;
    13 }

    这里就直接调用了JDK原生的selectNow方法。
    之前说过的选择策略:

    1 public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    2     return hasTasks ? selectSupplier.get() : -1;
    3 }

    其中hasTasks是根据hasTasks方法来判断,而hasTasks方法就是判断任务队列是否为空,那么在一开始初始化,必然是空的,所以这里calculateStrategy的返回值就是-1;

    那么case为-1条件成立,执行this.select(this.wakenUp.getAndSet(false)),其中wakenUp是一个原子化的Boolean,用来表示是需要唤醒Selector的轮询阻塞,初始化是为true,这里通过CAS操作设置为false代表不需要唤醒,后面在select执行完后,又判断wakenUp是否需要唤醒,说明在select中对Selector的阻塞进行了检查,若是需要唤醒,就通过Selector的原生API完成唤醒【Java】NIO中Selector的select方法源码分析

    来看看这里的select实现:

     1 private void select(boolean oldWakenUp) throws IOException {
     2     Selector selector = this.selector;
     3 
     4     try {
     5         int selectCnt = 0;
     6         long currentTimeNanos = System.nanoTime();
     7         long selectDeadLineNanos = currentTimeNanos + this.delayNanos(currentTimeNanos);
     8 
     9         while(true) {
    10             long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    11             if (timeoutMillis <= 0L) {
    12                 if (selectCnt == 0) {
    13                     selector.selectNow();
    14                     selectCnt = 1;
    15                 }
    16                 break;
    17             }
    18 
    19             if (this.hasTasks() && this.wakenUp.compareAndSet(false, true)) {
    20                 selector.selectNow();
    21                 selectCnt = 1;
    22                 break;
    23             }
    24 
    25             int selectedKeys = selector.select(timeoutMillis);
    26             ++selectCnt;
    27             if (selectedKeys != 0 || oldWakenUp || this.wakenUp.get() || this.hasTasks() || this.hasScheduledTasks()) {
    28                 break;
    29             }
    30 
    31             if (Thread.interrupted()) {
    32                 if (logger.isDebugEnabled()) {
    33                     logger.debug("Selector.select() returned prematurely because Thread.currentThread().interrupt() was called. Use NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
    34                 }
    35 
    36                 selectCnt = 1;
    37                 break;
    38             }
    39 
    40             long time = System.nanoTime();
    41             if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
    42                 selectCnt = 1;
    43             } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
    44                 logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector);
    45                 this.rebuildSelector();
    46                 selector = this.selector;
    47                 selector.selectNow();
    48                 selectCnt = 1;
    49                 break;
    50             }
    51 
    52             currentTimeNanos = time;
    53         }
    54 
    55         if (selectCnt > 3 && logger.isDebugEnabled()) {
    56             logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector);
    57         }
    58     } catch (CancelledKeyException var13) {
    59         if (logger.isDebugEnabled()) {
    60             logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, var13);
    61         }
    62     }
    63 
    64 }

    这个方法虽然看着很长,但核心就是判断这个存放任务的阻塞队列是否还有任务,若是有,就直接调用Selector的selectNow方法获取就绪的文件描述符,若是没有就绪的文件描述符该方法也会立即返回;若是阻塞队列中没有任务,就调用Selector的select(timeout)方法,尝试在超时时间内取获取就绪的文件描述符。

    因为现在是在执行NioEventLoopGroup的创建,并没有Channel的注册,也就没有轮询到任何文件描述符就绪。
    在轮询结束后,回到run方法,进入default块:
    其中ioRatio是执行IO操作和执行任务队列的任务用时比率,默认是50。若是ioRatio设置为100,就必须等到tasks阻塞队列中的所有任务执行完毕才再次进行轮询;若是小于100,那么就根据(100 - ioRatio) / ioRatio的比值乘以ioTime计算出的超时时间让所有任务尝试在超时时间内执行完毕,若是到达超时时间还没执行完毕,就在下一轮的轮询中执行。

    processSelectedKeys方法就是获取Selector轮询的SelectedKeys结果:

    1 private void processSelectedKeys() {
    2     if (this.selectedKeys != null) {
    3         this.processSelectedKeysOptimized();
    4     } else {
    5         this.processSelectedKeysPlain(this.selector.selectedKeys());
    6     }
    7 
    8 }

    selectedKeys 在openSelector时被初始化过了,若是在openSelector中出现异常selectedKeys才会为null。

    processSelectedKeysOptimized方法:

     1 private void processSelectedKeysOptimized() {
     2     for(int i = 0; i < this.selectedKeys.size; ++i) {
     3         SelectionKey k = this.selectedKeys.keys[i];
     4         this.selectedKeys.keys[i] = null;
     5         Object a = k.attachment();
     6         if (a instanceof AbstractNioChannel) {
     7             this.processSelectedKey(k, (AbstractNioChannel)a);
     8         } else {
     9             NioTask<SelectableChannel> task = (NioTask)a;
    10             processSelectedKey(k, task);
    11         }
    12 
    13         if (this.needsToSelectAgain) {
    14             this.selectedKeys.reset(i + 1);
    15             this.selectAgain();
    16             i = -1;
    17         }
    18     }
    19 
    20 }

    这里就通过遍历在openSelector中注入进Selector的SelectedKeys,得到SelectionKey 对象。
    在这里可以看到Netty很巧妙地通过SelectionKey的attachment附件,将JDK中的Channel和Netty中的Channel联系了起来。
    根据得到的附件Channel的类型,执行不同的processSelectedKey方法,去处理IO操作。

    processSelectedKey(SelectionKey k, AbstractNioChannel ch)方法:

     1 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
     2     NioUnsafe unsafe = ch.unsafe();
     3     if (!k.isValid()) {
     4         NioEventLoop eventLoop;
     5         try {
     6             eventLoop = ch.eventLoop();
     7         } catch (Throwable var6) {
     8             return;
     9         }
    10 
    11         if (eventLoop == this && eventLoop != null) {
    12             unsafe.close(unsafe.voidPromise());
    13         }
    14     } else {
    15         try {
    16             int readyOps = k.readyOps();
    17             if ((readyOps & 8) != 0) {
    18                 int ops = k.interestOps();
    19                 ops &= -9;
    20                 k.interestOps(ops);
    21                 unsafe.finishConnect();
    22             }
    23 
    24             if ((readyOps & 4) != 0) {
    25                 ch.unsafe().forceFlush();
    26             }
    27 
    28             if ((readyOps & 17) != 0 || readyOps == 0) {
    29                 unsafe.read();
    30             }
    31         } catch (CancelledKeyException var7) {
    32             unsafe.close(unsafe.voidPromise());
    33         }
    34 
    35     }
    36 }

    这里的主要核心就是根据SelectedKey的readyOps值来判断,处理不同的就绪事件,有如下几种事件:

    1 public static final int OP_READ = 1 << 0;
    2 public static final int OP_WRITE = 1 << 2;
    3 public static final int OP_CONNECT = 1 << 3;
    4 public static final int OP_ACCEPT = 1 << 4;

    结合来看上面的判断就对应:连接就绪、写就绪、侦听或者读就绪(或者缺省状态0,该状态是未来注册时的默认状态,后续博客会介绍),交由Netty的AbstractNioChannel的NioUnsafe去处理不同事件的byte数据,NioUnsafe会将数据再交由ChannelPipeline双向链表去处理。
    关于ChannelPipeline会在后续的博客中详细介绍。

    processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task)这个方法的实现细节需要由使用者实现NioTask<SelectableChannel>接口,就不说了。

    回到processSelectedKeys方法,在this.selectedKeys等于null的情况下:

     1 private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
     2     if (!selectedKeys.isEmpty()) {
     3         Iterator i = selectedKeys.iterator();
     4 
     5         while(true) {
     6             SelectionKey k = (SelectionKey)i.next();
     7             Object a = k.attachment();
     8             i.remove();
     9             if (a instanceof AbstractNioChannel) {
    10                 this.processSelectedKey(k, (AbstractNioChannel)a);
    11             } else {
    12                 NioTask<SelectableChannel> task = (NioTask)a;
    13                 processSelectedKey(k, task);
    14             }
    15 
    16             if (!i.hasNext()) {
    17                 break;
    18             }
    19 
    20             if (this.needsToSelectAgain) {
    21                 this.selectAgain();
    22                 selectedKeys = this.selector.selectedKeys();
    23                 if (selectedKeys.isEmpty()) {
    24                     break;
    25                 }
    26 
    27                 i = selectedKeys.iterator();
    28             }
    29         }
    30 
    31     }
    32 }

    这是在openSelector中注入进Selector的SelectedKeys失败的情况下,直接遍历Selector本身的SelectedKeys,和processSelectedKeysOptimized没有差别。

    继续回到run方法,在调用完processSelectedKeys方法后,就需要调用runAllTasks处理任务队列中的任务:
    runAllTasks()方法:

     1 protected boolean runAllTasks() {
     2     assert this.inEventLoop();
     3 
     4     boolean ranAtLeastOne = false;
     5 
     6     boolean fetchedAll;
     7     do {
     8         fetchedAll = this.fetchFromScheduledTaskQueue();
     9         if (this.runAllTasksFrom(this.taskQueue)) {
    10             ranAtLeastOne = true;
    11         }
    12     } while(!fetchedAll);
    13 
    14     if (ranAtLeastOne) {
    15         this.lastExecutionTime = ScheduledFutureTask.nanoTime();
    16     }
    17 
    18     this.afterRunningAllTasks();
    19     return ranAtLeastOne;
    20 }

    首先调用fetchFromScheduledTaskQueue方法:

     1 private boolean fetchFromScheduledTaskQueue() {
     2     long nanoTime = AbstractScheduledEventExecutor.nanoTime();
     3 
     4     for(Runnable scheduledTask = this.pollScheduledTask(nanoTime); scheduledTask != null; scheduledTask = this.pollScheduledTask(nanoTime)) {
     5         if (!this.taskQueue.offer(scheduledTask)) {
     6             this.scheduledTaskQueue().add((ScheduledFutureTask)scheduledTask);
     7             return false;
     8         }
     9     }
    10 
    11     return true;
    12 }

    这里就是通过pollScheduledTask不断地从延时任务队列获取到期的任务,将到期的任务添加到taskQueue任务队列中,为上面的runAllTasksFrom执行做准备;若是添加失败,再把它放进延时任务队列。

    pollScheduledTask方法:

     1 protected final Runnable pollScheduledTask(long nanoTime) {
     2     assert this.inEventLoop();
     3 
     4     Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
     5     ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : (ScheduledFutureTask)scheduledTaskQueue.peek();
     6     if (scheduledTask == null) {
     7         return null;
     8     } else if (scheduledTask.deadlineNanos() <= nanoTime) {
     9         scheduledTaskQueue.remove();
    10         return scheduledTask;
    11     } else {
    12         return null;
    13     }
    14 }

    从延时任务队列中获取队首的任务scheduledTask,若是scheduledTask的deadlineNanos小于等于nanoTime,说明该任务到期。

    回到runAllTasks,将到期了的延时任务放在了任务队列,由runAllTasksFrom执行这些任务:

     1 protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
     2     Runnable task = pollTaskFrom(taskQueue);
     3     if (task == null) {
     4         return false;
     5     } else {
     6         do {
     7             safeExecute(task);
     8             task = pollTaskFrom(taskQueue);
     9         } while(task != null);
    10 
    11         return true;
    12     }
    13 }

    不断地从任务队列队首获取任务,然后执行,直到没有任务。

    pollTaskFrom是获取队首任务:

    1 protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
    2     Runnable task;
    3     do {
    4         task = (Runnable)taskQueue.poll();
    5     } while(task == WAKEUP_TASK);
    6 
    7     return task;
    8 }

    其中WAKEUP_TASK,是用来巧妙地控制循环:

    1 private static final Runnable WAKEUP_TASK = new Runnable() {
    2     public void run() {
    3     }
    4 };


    safeExecute是执行任务:

    1 protected static void safeExecute(Runnable task) {
    2     try {
    3         task.run();
    4     } catch (Throwable var2) {
    5         logger.warn("A task raised an exception. Task: {}", task, var2);
    6     }
    7 
    8 }

    实际上就是执行Runnable 的run方法。

    继续回到runAllTasks方法,当所有到期任务执行完毕后,根据ranAtLeastOne判断是否需要修改最后一次执行时间lastExecutionTime,最后调用afterRunningAllTasks方法,该方法是在SingleThreadEventLoop中实现的:

    1 protected void afterRunningAllTasks() {
    2     this.runAllTasksFrom(this.tailTasks);
    3 }

    这里就仅仅执行了tailTasks队列中的任务。runAllTasks到这里执行完毕。

    再来看看runAllTasks(timeoutNanos)方法:

     1 protected boolean runAllTasks(long timeoutNanos) {
     2     this.fetchFromScheduledTaskQueue();
     3     Runnable task = this.pollTask();
     4     if (task == null) {
     5         this.afterRunningAllTasks();
     6         return false;
     7     } else {
     8         long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
     9         long runTasks = 0L;
    10 
    11         long lastExecutionTime;
    12         while(true) {
    13             safeExecute(task);
    14             ++runTasks;
    15             if ((runTasks & 63L) == 0L) {
    16                 lastExecutionTime = ScheduledFutureTask.nanoTime();
    17                 if (lastExecutionTime >= deadline) {
    18                     break;
    19                 }
    20             }
    21 
    22             task = this.pollTask();
    23             if (task == null) {
    24                 lastExecutionTime = ScheduledFutureTask.nanoTime();
    25                 break;
    26             }
    27         }
    28 
    29         this.afterRunningAllTasks();
    30         this.lastExecutionTime = lastExecutionTime;
    31         return true;
    32     }
    33 }

    这个方法前面的runAllTasks方法类似,先通过fetchFromScheduledTaskQueue将所有到期了的延时任务放在taskQueue中,然后不断从taskQueue队首获取任务,但是,若是执行到了到超过了63个任务,判断是否达到了超时时间deadline,若是达到结束循环,留着下次执行,反之继续循环执行任务。

    回到run方法,在轮询完毕,并且执行完任务后,通过isShuttingDown判断当前状态,在之前的CAS操作中,state已经变为了3,所以isShuttingDown成立,就需要调用closeAll方法

     1 private void closeAll() {
     2     this.selectAgain();
     3     Set<SelectionKey> keys = this.selector.keys();
     4     Collection<AbstractNioChannel> channels = new ArrayList(keys.size());
     5     Iterator var3 = keys.iterator();
     6 
     7     while(var3.hasNext()) {
     8         SelectionKey k = (SelectionKey)var3.next();
     9         Object a = k.attachment();
    10         if (a instanceof AbstractNioChannel) {
    11             channels.add((AbstractNioChannel)a);
    12         } else {
    13             k.cancel();
    14             NioTask<SelectableChannel> task = (NioTask)a;
    15             invokeChannelUnregistered(task, k, (Throwable)null);
    16         }
    17     }
    18 
    19     var3 = channels.iterator();
    20 
    21     while(var3.hasNext()) {
    22         AbstractNioChannel ch = (AbstractNioChannel)var3.next();
    23         ch.unsafe().close(ch.unsafe().voidPromise());
    24     }
    25 
    26 }

    在这里首先调用selectAgain进行一次轮询:

     1 private void selectAgain() {
     2     this.needsToSelectAgain = false;
     3 
     4     try {
     5         this.selector.selectNow();
     6     } catch (Throwable var2) {
     7         logger.warn("Failed to update SelectionKeys.", var2);
     8     }
     9 
    10 }

    通过这次的轮询,将当前仍有事件就绪的JDK的SelectionKey中绑定的Netty的Channel添加到channels集合中,遍历这个集合,通过unsafe的close方法关闭Netty的Channel。

    之后调用confirmShutdown方法:

     1 protected boolean confirmShutdown() {
     2     if (!this.isShuttingDown()) {
     3         return false;
     4     } else if (!this.inEventLoop()) {
     5         throw new IllegalStateException("must be invoked from an event loop");
     6     } else {
     7         this.cancelScheduledTasks();
     8         if (this.gracefulShutdownStartTime == 0L) {
     9             this.gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
    10         }
    11 
    12         if (!this.runAllTasks() && !this.runShutdownHooks()) {
    13             long nanoTime = ScheduledFutureTask.nanoTime();
    14             if (!this.isShutdown() && nanoTime - this.gracefulShutdownStartTime <= this.gracefulShutdownTimeout) {
    15                 if (nanoTime - this.lastExecutionTime <= this.gracefulShutdownQuietPeriod) {
    16                     this.wakeup(true);
    17 
    18                     try {
    19                         Thread.sleep(100L);
    20                     } catch (InterruptedException var4) {
    21                         ;
    22                     }
    23 
    24                     return false;
    25                 } else {
    26                     return true;
    27                 }
    28             } else {
    29                 return true;
    30             }
    31         } else if (this.isShutdown()) {
    32             return true;
    33         } else if (this.gracefulShutdownQuietPeriod == 0L) {
    34             return true;
    35         } else {
    36             this.wakeup(true);
    37             return false;
    38         }
    39     }
    40 }

    首先调用cancelScheduledTasks,取消所有的延时任务:

     1 protected void cancelScheduledTasks() {
     2     assert this.inEventLoop();
     3 
     4     PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
     5     if (!isNullOrEmpty(scheduledTaskQueue)) {
     6         ScheduledFutureTask<?>[] scheduledTasks = (ScheduledFutureTask[])scheduledTaskQueue.toArray(new ScheduledFutureTask[scheduledTaskQueue.size()]);
     7         ScheduledFutureTask[] var3 = scheduledTasks;
     8         int var4 = scheduledTasks.length;
     9 
    10         for(int var5 = 0; var5 < var4; ++var5) {
    11             ScheduledFutureTask<?> task = var3[var5];
    12             task.cancelWithoutRemove(false);
    13         }
    14 
    15         scheduledTaskQueue.clearIgnoringIndexes();
    16     }
    17 }

    遍历scheduledTasks这个延时任务对立中所有的任务,通过cancelWithoutRemove将该任务取消。

    至此轮询的整个生命周期完成。

    回到SingleThreadEventExecutor的doStartThread方法,在run方法执行完毕后,说明Selector轮询结束,调用SingleThreadEventExecutor.this.cleanup()方法关闭Selector:

    1 protected void cleanup() {
    2     try {
    3         this.selector.close();
    4     } catch (IOException var2) {
    5         logger.warn("Failed to close a selector.", var2);
    6     }
    7 
    8 }

    这次终于可以回到MultithreadEventExecutorGroup的构造,在children创建完毕后,用chooserFactory根据children的大小创建chooser,前面说过。

    然后产生terminationListener异步中断监听对象,给每个NioEventLoop设置中断监听,然后对children进行了备份处理,通过readonlyChildren保存。


    至此NioEventLoopGroup的创建全部结束。

  • 相关阅读:
    eclipse下SpringMVC+Maven+Mybatis+MySQL项目搭建
    Springmvc UPDATE 数据时 ORA-01858:a non-numeric character was found where a numeric was expected
    新建 jsp异常,The superclass "javax.servlet.http.HttpServlet" was not found on the Java Build Path
    Spring MVC 单元测试异常 Caused by: org.springframework.core.NestedIOException: ASM ClassReader failed to parse class file
    UE添加鼠标右键打开
    mysql 组合索引
    mysql 查询条件中文问题
    sqlserver 游标
    sqlserver 在将 nvarchar 值 'XXX' 转换成数据类型 int 时失败
    过程需要类型为 'ntext/nchar/nvarchar' 的参数 '@statement'
  • 原文地址:https://www.cnblogs.com/a526583280/p/10927544.html
Copyright © 2011-2022 走看看