  • Netty-NioEventLoop

    The main logic of Netty's event loop lives in NioEventLoop.run, and most of the I/O work happens in the processSelectedKeys function it calls.

    protected void run() {
        // The main loop keeps polling for I/O events and tasks, because EventLoop is also
        // an implementation of java.util.concurrent.ScheduledExecutorService.
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));

                        // ...

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                // ioRatio: percentage of the total execution time given to I/O events
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
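The expression passed to runAllTasks in the else branch turns the time just spent on I/O into a time budget for queued tasks. A minimal sketch of that arithmetic follows. The class IoRatioBudget and the method taskBudgetNanos are hypothetical names introduced here for illustration and are not part of Netty.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Netty: reproduces the arithmetic used in run().
final class IoRatioBudget {
    private IoRatioBudget() { }

    /**
     * Returns the time budget for tasks, in nanoseconds, given the time just spent on I/O.
     * ioRatio must be in 1..99; the ioRatio == 100 branch in run() runs all tasks without a budget.
     */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 1..99)");
        }
        // Same formula as in run(): ioTime * (100 - ioRatio) / ioRatio
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = TimeUnit.MILLISECONDS.toNanos(10);
        for (int ratio : new int[] {50, 80, 20}) {
            System.out.println("ioRatio=" + ratio + " -> task budget "
                    + taskBudgetNanos(ioTime, ratio) + " ns");
        }
    }
}
```

With ioRatio at 50 the task budget equals the I/O time. A higher ratio shrinks the budget (at 80, tasks get a quarter of the I/O time) and a lower ratio grows it.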

    When processSelectedKeys runs, it first decides whether to take the optimized path, that is, it checks whether the SelectedSelectionKeySet field is null.

    Whether the optimization is enabled depends on the system property io.netty.noKeySetOptimization. It defaults to false, which means the optimization is on.

    private static final boolean DISABLE_KEYSET_OPTIMIZATION =
                SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
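As a rough illustration of how such a flag is read, the sketch below uses only the JDK. KeySetOptimizationFlag and isDisabled are hypothetical names. Netty's own SystemPropertyUtil does additional parsing that is not reproduced here, so treat this as an approximation.

```java
// Sketch using only the JDK; Netty's SystemPropertyUtil is not reproduced here.
final class KeySetOptimizationFlag {
    static final String PROPERTY = "io.netty.noKeySetOptimization";

    private KeySetOptimizationFlag() { }

    /** Returns true when the key-set optimization would be disabled; defaults to false. */
    static boolean isDisabled() {
        // Boolean.parseBoolean yields true only for the string "true" (case-insensitive).
        return Boolean.parseBoolean(System.getProperty(PROPERTY, "false"));
    }

    public static void main(String[] args) {
        System.out.println("optimization disabled: " + isDisabled());
    }
}
```

Because it is a system property, it is set on the JVM command line, for example with -Dio.netty.noKeySetOptimization=true, rather than through the shell environment.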

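    The optimization works by using reflection to replace the selectedKeys and publicSelectedKeys fields of the selector bound to the EventLoop with a SelectedSelectionKeySet. The benefit is that the loop no longer has to iterate over selector.selectedKeys(); it walks the set's backing array directly.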

    The injection happens when the EventLoop is initialized.

    private SelectorTuple openSelector() {
        ........
        // injection logic
        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField);
                    if (cause != null) {
                        return cause;
                    }
                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField);
                    if (cause != null) {
                        return cause;
                    }

                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
                }
            }
        });

        ........
    }
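The same field-swapping technique can be tried in isolation. The sketch below is a simplified model and not Netty's code: FakeSelector stands in for the JDK selector implementation, ArrayKeySet loosely imitates an array-backed key set, keys are plain strings instead of SelectionKey objects, and every class and method name is hypothetical. It runs against its own classes because recent JDKs may refuse reflective access to the real selector internals.

```java
import java.lang.reflect.Field;
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

final class InjectionDemo {
    /** Hypothetical stand-in for the selector implementation; it only ever calls add(). */
    static class FakeSelector {
        private Set<String> selectedKeys = new HashSet<>();

        void select(String... readyKeys) {
            for (String k : readyKeys) {
                selectedKeys.add(k);
            }
        }
    }

    /** Array-backed set: add() appends, and the owner reads keys[0..size) with an index loop. */
    static final class ArrayKeySet extends AbstractSet<String> {
        String[] keys = new String[4];
        int size;

        @Override
        public boolean add(String key) {
            if (key == null) {
                return false;
            }
            if (size == keys.length) {
                keys = Arrays.copyOf(keys, size << 1); // double the array when it is full
            }
            keys[size++] = key;
            return true;
        }

        void reset() {
            Arrays.fill(keys, 0, size, null); // drop references so they can be collected
            size = 0;
        }

        @Override
        public int size() {
            return size;
        }

        @Override
        public Iterator<String> iterator() {
            return Arrays.asList(keys).subList(0, size).iterator();
        }
    }

    /** Replaces the private selectedKeys field via reflection and returns the injected set. */
    static ArrayKeySet inject(FakeSelector selector) {
        try {
            ArrayKeySet keySet = new ArrayKeySet();
            Field field = FakeSelector.class.getDeclaredField("selectedKeys");
            field.setAccessible(true);
            field.set(selector, keySet);
            return keySet;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("injection failed", e);
        }
    }

    public static void main(String[] args) {
        FakeSelector selector = new FakeSelector();
        ArrayKeySet keys = inject(selector);
        selector.select("key-a", "key-b");
        for (int i = 0; i < keys.size; i++) { // no iterator, no hashing
            System.out.println(keys.keys[i]);
        }
        keys.reset();
    }
}
```

After the swap, the selector keeps writing into what it believes is its own set, while the event loop holds a direct reference to the array behind it.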

    I/O events are handled mainly in processSelectedKey, which deals with read, write and connect events in turn.

     
    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.keys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                // Handle the events of each channel in turn
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);

                selectAgain();
                i = -1;
            }
        }
    }

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                // Handle the connect event
                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write

                // Flush the buffers that are waiting to be written
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                // Fires fireChannelRead and fireChannelReadComplete on the pipeline, which calls back
                // channelRead and channelReadComplete on the ChannelInboundHandlers in it
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
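The order of the checks in processSelectedKey can be explored without a real channel. The sketch below maps a readyOps bit mask to the names of the unsafe calls listed above, in the order they would run. ReadyOpsDispatch and actionsFor are hypothetical names; only the SelectionKey constants come from the JDK.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: mirrors the bit tests of processSelectedKey, nothing more.
final class ReadyOpsDispatch {
    private ReadyOpsDispatch() { }

    /** Returns the actions for the given ready set, in the order processSelectedKey checks them. */
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // readyOps == 0 is also routed to read(), matching the JDK-bug workaround in the original
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(actionsFor(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
        System.out.println(actionsFor(SelectionKey.OP_ACCEPT));
        System.out.println(actionsFor(0));
    }
}
```

A key that is ready for both reading and writing is flushed before it is read, and OP_ACCEPT goes through the same read branch as OP_READ.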

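    Note: NioServerSocketChannel and NioSocketChannel go through the same processing logic. The difference lies in the events they are interested in: the former registers only for OP_ACCEPT, which is handled in the same unsafe.read() branch as OP_READ, while the latter deals with OP_READ, OP_WRITE and OP_CONNECT.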

    When an OP_ACCEPT event fires on a NioServerSocketChannel, the call chain is AbstractNioChannel.NioUnsafe.read -> NioServerSocketChannel.doReadMessages(List<Object>) -> ServerBootstrapAcceptor.channelRead, which registers the accepted NioSocketChannel with a child EventLoop.
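The hand-off at the end of that chain can be pictured with a toy model. Everything in the sketch below is hypothetical: ChildLoop, ChildGroup and the static channelRead only imitate the roles of a child EventLoop, the child group and ServerBootstrapAcceptor.channelRead, channels are plain strings, and round-robin selection is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model with hypothetical names; it mirrors the hand-off, not Netty's real classes.
final class AcceptHandoffSketch {
    /** Stand-in for a child EventLoop that owns the channels registered with it. */
    static final class ChildLoop {
        final String name;
        final List<String> channels = new ArrayList<>();

        ChildLoop(String name) {
            this.name = name;
        }

        void register(String channel) {
            channels.add(channel);
        }
    }

    /** Stand-in for the child group; picks loops in round-robin order (an assumption). */
    static final class ChildGroup {
        private final List<ChildLoop> loops;
        private int index;

        ChildGroup(List<ChildLoop> loops) {
            this.loops = loops;
        }

        ChildLoop next() {
            ChildLoop loop = loops.get(index);
            index = (index + 1) % loops.size();
            return loop;
        }
    }

    /** Plays the role of the acceptor's channelRead: the message is the accepted channel. */
    static void channelRead(ChildGroup group, String acceptedChannel) {
        group.next().register(acceptedChannel);
    }

    public static void main(String[] args) {
        ChildLoop a = new ChildLoop("child-0");
        ChildLoop b = new ChildLoop("child-1");
        ChildGroup group = new ChildGroup(Arrays.asList(a, b));
        for (String ch : new String[] {"conn-1", "conn-2", "conn-3"}) {
            channelRead(group, ch);
        }
        System.out.println(a.name + " " + a.channels);
        System.out.println(b.name + " " + b.channels);
    }
}
```

The point of the model is that the accepting loop never reads from the new connection itself. It only passes the connection on, and all later I/O for it runs on the child loop that received it.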

  • Original post: https://www.cnblogs.com/ironroot/p/8575134.html