zoukankan      html  css  js  c++  java
  • Netty源码学习系列之5-NioEventLoop的run方法

    前言

        NioEventLoop的run方法,是netty中最核心的方法,没有之一。在该方法中,完成了对已注册的channel上来自底层操作系统的socket事件的处理(在服务端时事件包括客户端的连接事件和读写事件,在客户端时是读写事件)、单线程任务队列的处理(服务端的注册事件、客户端的connect事件等),当然还包括对NIO空轮询的规避、消息的编解码等。下面一起来探究一番,首先奉上run方法的源码:

     1 protected void run() {
     2         for (;;) {
     3             try {
     4                 try {
     5                     // 1、确定处理策略
     6                     switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
     7                     case SelectStrategy.CONTINUE:
     8                         continue;
     9                     case SelectStrategy.BUSY_WAIT:
    10                     case SelectStrategy.SELECT:
    11                         // 2、表示有socket事件,需要进行处理
    12                         select(wakenUp.getAndSet(false));
    13                         if (wakenUp.get()) {
    14                             selector.wakeup();
    15                         }
    16                     default:
    17                     }
    18                 } catch (IOException e) {
    19                     // selector有异常,则重新创建一个
    20                     rebuildSelector0();
    21                     handleLoopException(e);
    22                     continue;
    23                 }
    24                 cancelledKeys = 0;
    25                 needsToSelectAgain = false;
    26                 final int ioRatio = this.ioRatio;
    27                 if (ioRatio == 100) {
    28                     try {
    29                         // 3、处理来自客户端或者服务端的socket事件
    30                         processSelectedKeys();
    31                     } finally {
    32                         // 4、处理队列中的task任务
    33                         runAllTasks();
    34                     }
    35                 } else {
    36                     final long ioStartTime = System.nanoTime();
    37                     try {
    38                         // 3、处理来自客户端或者服务端的socket事件
    39                         processSelectedKeys();
    40                     } finally {
    41                         final long ioTime = System.nanoTime() - ioStartTime;
    42                         // 4、处理队列中的task任务
    43                         runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    44                     }
    45                 }
    46             } catch (Throwable t) {
    47                 handleLoopException(t);
    48             }
    49             // 执行shutdown后的善后逻辑
    50             try {
    51                 if (isShuttingDown()) {
    52                     closeAll();
    53                     if (confirmShutdown()) {
    54                         return;
    55                     }
    56                 }
    57             } catch (Throwable t) {
    58                 handleLoopException(t);
    59             }
    60         }
    61     }

        run方法中有四个主要的方法,已在上面注释中标出,主要逻辑概括起来就是:先通过select方法探知是否当前channel上有就绪的事件(方法1和方法2),然后处理这些事件(方法3),最后再处理队列中的任务(方法4)。

    一、selectStrategy.calculateStrategy方法

         selectStrategy只有一个默认实现类DefaultSelectStrategy,实现方法如下,如果判断有任务,则走selectSupplier.get()方法,否则直接返回SELECT -1,进入方法2-select方法。

    1 public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    2         return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    3     }

        然后看一下匿名类selectSupplier.get方法中的逻辑,如下,可以看到它直接调的非阻塞select方法。

    1 private final IntSupplier selectNowSupplier = new IntSupplier() {
    2         @Override
    3         public int get() throws Exception {
    4             return selectNow();
    5         }
    6     };

        总结一下calculateStrategy方法这么做的用意。从run方法的整体顺序中可以看到,每次循环中都是先执行方法3处理channel事件,再执行方法4处理队列中的任务,即处理channel事件的优先级更高。但如果队列中有任务待处理,那么为提高框架处理性能,就不允许执行阻塞的select方法,而是执行非阻塞的selectNow方法,这样就能快速处理完channel事件后去处理队列中的任务。

    二、select(boolean)方法

        要理解该方法,需先理解wakenUp变量和wakeup方法的作用。wakenUp是AtomicBoolean类型的变量,如果是true,则表示最近调用过wakeup方法,如果是false,则表示最近未调用wakeup方法,另外每次进入select(boolean)方法都会将wakenUp置为false。而wakeup方法是针对selector.select方法设计的,如果调用wakeup方法时处于selector.select阻塞方法中,则会直接唤醒处于selector.select阻塞中的线程,而如果调用wakeup方法时selector不处于selector.select阻塞方法中,则效果是在下一次调selector.select方法时不阻塞(有点像LockSupport.park/unpark的效果)。下面是select(boolean)方法逻辑:

     1 private void select(boolean oldWakenUp) throws IOException {
     2         Selector selector = this.selector;
     3         try {
     4             int selectCnt = 0;
     5             long currentTimeNanos = System.nanoTime();
     6             long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
     7             for (;;) {
     8                 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
     9                 if (timeoutMillis <= 0) {
    10                     if (selectCnt == 0) {
    11                         selector.selectNow();
    12                         selectCnt = 1;
    13                     }
    14                     break;
    15                 }
    16                 // 重点1:在调用阻塞的select方法前再判断一遍是否有任务需要处理,此处逻辑虽然不多,但有深意  ***
    17                 if (hasTasks() && wakenUp.compareAndSet(false, true)) {
    18                     selector.selectNow();
    19                     selectCnt = 1;
    20                     break;
    21                 }
    22                 // 调用阻塞的select方法,但设置了超时时间
    23                 int selectedKeys = selector.select(timeoutMillis);
    24                 selectCnt ++;
    25 
    26                 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
    27                     // 有事件;wakenUp之前是true(说明有新任务进入了队列中);wakenUp现在是true(说明有新任务在本方法执行的过程中进来了),有任务   满足以上任意一个都退出循环
    28                     break;
    29                 }
    30                 if (Thread.interrupted()) {
    31                     // 省略异常日志打印
    32                     selectCnt = 1;
    33                     break;
    34                 }
    35 
    36                 long time = System.nanoTime();
    37                 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
    38                     // timeoutMillis elapsed without anything selected.
    39                     selectCnt = 1;
    40                 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
    41                         selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
    42                     // 重点2: 说明触发了空轮训,需要做处理
    43                     selector = selectRebuildSelector(selectCnt);
    44                     selectCnt = 1;
    45                     break;
    46                 }
    47                 currentTimeNanos = time;
    48             }
    49              // catch 异常处理
    50     }    

        该方法有两处重点,均已标出。

    重点1

        该处逻辑需结合wakenUp变量和wakeup方法来理解。

        首先,对wakenUp变量的操作除了run方法外,还有SingleThreadEventExecutor的execute方法。execute中添加完task后,会调用NioEventLoop中的重写方法wakeup:

    1 protected void wakeup(boolean inEventLoop) {
    2         if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
    3             selector.wakeup();
    4         }
    5     }

        注:selector.wakenUp方法用于唤醒被selector.select()或者selector.select(long time)阻塞的selector,让其立马返回key的数量。

        它做了两件事,1是通过cas将wakenUp由false变为true,2是调用selector.wakeup方法。

        再来看select(boolean)方法的入口处,通过wakenUp.getAndSet(false)方法将wakenUp设为false,然后将原值作为入参传入select(boolean)方法。

        一切条件就绪,然后再回过头看重点1(如下)。它想实现的功能就是如果队列中有新的任务来了,能不调selector.select的阻塞方法,有任务等待执行时能不阻塞就不阻塞,提高效率。

    1 if (hasTasks() && wakenUp.compareAndSet(false, true)) {
    2                     selector.selectNow();
    3                     selectCnt = 1;
    4                     break;
    5                 }

        但细究一下会发现这个方法的两个判断逻辑存在一个矛盾,首先进入当前select(boolean)方法时,wakenUp被置为false,而在添加完任务后,NioEventLoop中的wakeup方法又会将wakenUp置为true,即如果hasTasks()方法返回true时,因为wakenUp已经被置为true了所以第二个条件肯定判断为false,那if里面的逻辑什么场景下才会走到呢?

        不知道各位园友们走到这里的时候会不会有这样的疑问,反正博主刚开始是被自己难倒了,后来又重新分析了下才找到原因。其实博主刚才对矛盾点的描述就未分清时间先后。因为有新任务来的时候,是先往队列中添加任务,再将wakenUp置为true(selector.wakeup()方法可以认为与置为true是同时发生的),即如果添加了task但还没来得及将wakenUp置为true时才会进入这个if中。

        那么新的问题来了,为什么将wakenUp置为true了就不用进if中呢?是因为如果wakenUp已经是true了,那么可以认为已经执行了selector.wakeup方法了,既然如此,selector.select虽然是阻塞方法也就不会再阻塞了,而是直接返回结果,所以没必要再进if中。

        此处还有一个容易让人迷糊的地方就是下面的四个或的逻辑判断:

    1 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
    2                     break;
    3                 }

        即满足这四个条件中的任意一个就退出循环,这四个条件各代表什么意思?

        第一个:channel中有socket事件需处理,这个肯定是要跳出循环处理的;

        第二个:oldWakenUp为true,即进select(boolean)方法之前wakenUp为true,说明队列中有新任务来了,所以也要跳出循环,出去处理;

        第三个:wakenUp现在为true,说明在进入select(boolean)方法之后队列中有新任务来了,需跳出循环处理;

        第四/五个:两个队列中有任务,需出去处理。

        其实就是说,如果当前没有事件过来,队列中又没有任务处理,那么就继续走select(boolean)的无限for循环(反正没事做),否则说明来菜了需要跳出循环出去处理。

    重点2:

        对于空轮训的处理其实没有太多花哨的地方,netty开发者设置了一个阈值512,如果selectCnt计数达到了512,说明触发了空轮训,此时 selectRebuildSelector 方法会创建一个新的selector,将原selector上的全部事件重新注册到新selector上。

        注:空轮训即调select(time)/select()阻塞方法的时候,由于出现了bug导致不阻塞而是直接返回空结果,并且后面每次都这样,仿佛螺丝滑了丝一般顺滑,,,

    三、processSelectedKeys()方法

        点进去看到里面的逻辑,第一个方法是优化之后的处理,第二个是未优化的处理,一般都是走优化的逻辑。

    private void processSelectedKeys() {
            if (selectedKeys != null) {
                processSelectedKeysOptimized();
            } else {
                processSelectedKeysPlain(selector.selectedKeys());
            }
        }

        processSelectedKeysOptimized方法如下:

     1 private void processSelectedKeysOptimized() {
     2         for (int i = 0; i < selectedKeys.size; ++i) {
     3             final SelectionKey k = selectedKeys.keys[i];
     4             selectedKeys.keys[i] = null;
     5             final Object a = k.attachment();
     6             if (a instanceof AbstractNioChannel) {
     7                 processSelectedKey(k, (AbstractNioChannel) a); // 从attachment中取出之前放入的AbstractNioChannel对象,进行处理
     8             } else {
     9                 @SuppressWarnings("unchecked")
    10                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
    11                 processSelectedKey(k, task);
    12             }
    13             if (needsToSelectAgain) {
    14                 selectedKeys.reset(i + 1);
    15                 selectAgain();
    16                 i = -1;
    17             }
    18         }
    19     }

        继续跟进针对单个SelectionKey的处理:

     1 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
     2         final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
     3         if (!k.isValid()) {
     4             // 针对无效key的处理
     5         }
     6 
     7         try {
     8             int readyOps = k.readyOps(); // 获取已经就绪的操作类型
     9             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    10                 // 1、针对连接事件的处理
    11                 int ops = k.interestOps();
    12                 ops &= ~SelectionKey.OP_CONNECT;
    13                 k.interestOps(ops);
    14                 unsafe.finishConnect();
    15             }
    16 
    17             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    18                 // 2、针对写事件的处理
    19                 ch.unsafe().forceFlush();
    20             }
    21 
    22             ///3、针对读事件/接受连接事件的处理
    23             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    24                 unsafe.read();
    25             }
    26         } catch (CancelledKeyException ignored) {
    27             unsafe.close(unsafe.voidPromise());
    28         }
    29     }

        可以看到,在此方法中按不同的事件类型调用unsafe方法对其进行处理,再往后追溯就是pipeline的相关处理了,具体内容较多,有兴趣可自行查看,后面有机会博主也会继续更新。

    有一点需要着重提的是对ACCEPT事件的处理(服务端在接收到客户端的连接请求时触发该事件),因为是服务端,所以进入AbstractNioMessageChannel.NioMessageUnsafe#read方法,

        可以看到有段do/while循环,如下:

     1 do {
     2                         int localRead = doReadMessages(readBuf);
     3                         if (localRead == 0) {
     4                             break;
     5                         }
     6                         if (localRead < 0) {
     7                             closed = true;
     8                             break;
     9                         }
    10 
    11                         allocHandle.incMessagesRead(localRead);
    12                     } while (allocHandle.continueReading());

        doReadMessages方法的实现位于NioServerSocketChannel中,可以看到第五行往buf中添加了一个NioSocketChannel对象。

     1 protected int doReadMessages(List<Object> buf) throws Exception {
     2         SocketChannel ch = SocketUtils.accept(javaChannel());
     3         try {
     4             if (ch != null) {
     5                 buf.add(new NioSocketChannel(this, ch));
     6                 return 1;
     7             }
     8         } catch (Throwable t) {
     9             logger.warn("Failed to create a new channel from an accepted socket.", t);
    10             try {
    11                 ch.close();
    12             } catch (Throwable t2) {
    13                 logger.warn("Failed to close a socket.", t2);
    14             }
    15         }
    16         return 0;
    17     }

        再跳出来回到read方法,往下看有个for循环,开始了pipeline的调用,结合前面【https://www.cnblogs.com/zzq6032010/p/13034608.html】bind方法的博文可以知道,此时pipeline中除了头尾两个节点以外,还有一个ServerBootstrapAcceptor,此处最终就会调到ServerBootstrapAcceptor的channelRead方法,该方法很重要,最终将上面生成的NioSocketChannel中的pipeline、channelOption、attr初始化,然后注册到childGroup上。至此,服务端具备了与客户端通信的能力,可正常处理read、write事件了。

    1 int size = readBuf.size();
    2 for (int i = 0; i < size; i ++) {
    3     readPending = false;
    4     pipeline.fireChannelRead(readBuf.get(i));
    5 }

    四、runAllTasks()

        再粘贴一下runAllTasks附近的代码:

    1 final long ioStartTime = System.nanoTime();
    2 try {
    3     processSelectedKeys();
    4 } finally {
    5     // Ensure we always run tasks.
    6     final long ioTime = System.nanoTime() - ioStartTime;
    7     runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    8 }

        首先说一下ioRatio变量,此变量控制的是当前线程中处理channel事件和处理任务队列所用的时间比,如果为50(即50%),则二者用的时间相同,从上面代码中可以看出,ioTime即处理channel事件所用的时间,当ioRatio=50时,runAllTasks的入参就是ioTime;而如果ioRatio=10,则runAllTasks入参为9*ioTime,即处理任务队列的最大时间是处理channel事件的9倍。

        下面是runAllTasks方法代码:

     1 protected boolean runAllTasks(long timeoutNanos) {
     2         fetchFromScheduledTaskQueue();
     3         Runnable task = pollTask();
     4         if (task == null) {
     5             afterRunningAllTasks();
     6             return false;
     7         }
     8         final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
     9         long runTasks = 0;
    10         long lastExecutionTime;
    11         for (;;) {
    12             safeExecute(task);
    13             runTasks ++;
    14             if ((runTasks & 0x3F) == 0) { // 每隔64次计算一下超时时间
    15                 lastExecutionTime = ScheduledFutureTask.nanoTime();
    16                 if (lastExecutionTime >= deadline) {
    17                     break;
    18                 }
    19             }
    20             task = pollTask();
    21             if (task == null) {
    22                 lastExecutionTime = ScheduledFutureTask.nanoTime();
    23                 break;
    24             }
    25         }
    26         afterRunningAllTasks();
    27         this.lastExecutionTime = lastExecutionTime;
    28         return true;
    29     }

        整体逻辑不难,用一个for循环来依次取出任务处理,并且为了提高效率,每隔64次计算一下超时时间(对netty开发者来说,获取系统纳秒时间也是一笔性能开支,能少获取就少获取)。

     总结

        netty中最核心的run方法就介绍到这里,至此,netty进行数据传输前的准备工作都已经过了一遍,但对于netty具体发送、接收数据的流程还未涉及到。netty具体发送、接收数据是借助pipeline和在childHandler中添加的处理器完成的,这部分将不定期的在后面博文中讲述,具体看缘分吧。

  • 相关阅读:
    Window 窗口类
    使用 Bolt 实现 GridView 表格控件
    lua的table库
    Windows编程总结之 DLL
    lua 打印 table 拷贝table
    使用 xlue 实现简单 listbox 控件
    使用 xlue 实现 tips
    extern “C”
    COleVariant如何转换为int double string cstring
    原来WIN32 API也有GetOpenFileName函数
  • 原文地址:https://www.cnblogs.com/zzq6032010/p/13122483.html
Copyright © 2011-2022 走看看