zoukankan      html  css  js  c++  java
  • Netty源码分析-NioEventLoop事件轮询

    NioEventLoop是Netty中用来接收客户端获取服务端请求的唯一入口,我们先来看一下NioEventLoop的构造函数

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                    SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                    EventLoopTaskQueueFactory queueFactory) {
       super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
             rejectedExecutionHandler);
       // 获取Selector对象的SelectorProvider对象
       this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
       // 作为Select选择策略以及一次select的个数
       this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
       // Selector的包装
       final SelectorTuple selectorTuple = openSelector();
       // Selector选择器,用来轮询IO事件
       this.selector = selectorTuple.selector;
       // 没有包装的原始Selector选择器
       this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }

    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                       boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
                                       RejectedExecutionHandler rejectedExecutionHandler) {
       super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
       // 尾部任务队列
       tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
    }

    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                           boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                           RejectedExecutionHandler rejectedHandler) {
       super(parent);
       // 新增任务时是否唤醒线程,默认为false
       this.addTaskWakesUp = addTaskWakesUp;
       // 最大任务等待数
       this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
       // 当前执行任务的线程池
       this.executor = ThreadExecutorMap.apply(executor, this);
       // 任务队列
       this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
       // 任务队列满时的异常处理器
       this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

    Netty是怎么实现一个Channel绑定一个线程,一个线程绑定多个Channel的呢?

    从上面的代码段可以看出,一个AbstractChannel会持有一个NioEventLoop对象,而一个NioEventLoop持有一个Selector对象。当使用SelectableChannel#register()API将Channel注册到Selector选择器中时,由于使用的是NioEventLoop中的Selector对象,所以当NioEventLoop进行轮询时,就可以只轮询当前线程绑定的所有Channel对象。

    接下来详细分析一下NioEventLoop#run()里的流程

    protected void run() {
       int selectCnt = 0;
       for (;;) {
           try {
               int strategy;
               try {
                   // 获取selector策略
                   strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                   switch (strategy) {
                       case SelectStrategy.CONTINUE:
                           continue;
                       case SelectStrategy.BUSY_WAIT:
                       case SelectStrategy.SELECT:
                           // 获取最近的定时任务开始时间,如果不存在定时任务,则为-1
                           long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                           if (curDeadlineNanos == -1L) {
                               curDeadlineNanos = NONE; // nothing on the calendar
                          }
                           nextWakeupNanos.set(curDeadlineNanos);
                           try {
                               if (!hasTasks()) {
                                   // 如果不存在异步任务,直接执行I/O事件轮询
                                   strategy = select(curDeadlineNanos);
                              }
                          } finally {
                               nextWakeupNanos.lazySet(AWAKE);
                          }
                       default:
                  }
              } catch (IOException e) {
                   // 当发生未知异常时,需重新生成一个新的Selector,替换老的Selector,并把注册到老的Selector上面的channel注册到新的Selector上去
                   rebuildSelector0();
                   selectCnt = 0;
                   handleLoopException(e);
                   continue;
              }
               // 省略部分代码
               if (ioRatio == 100) {
                   try {
                       if (strategy > 0) {
                           // 处理轮询到的I/O事件
                           processSelectedKeys();
                      }
                  } finally {
                       // 执行异步任务
                       ranTasks = runAllTasks();
                  }
              }
               // 省略部分代码
          } catch (CancelledKeyException e) {
               // 省略部分代码
          } catch (Throwable t) {
               handleLoopException(t);
          }
           // 省略部分代码
      }
    }

    获取SELECT策略时,需要用到一个hasTasks()方法,用来判断是否存在异步任务。如果存在异步任务,则会执行selector.selectNow()方法,该方法不会阻塞,接下来的switch就会跳到default分支。netty通过这种方式可以保证当存在异步任务时,优先执行异步任务。当不存在异步任务时,就会执行到selector.select()方法,在执行该方法之前,会通过nextScheduledTaskDeadlineNanos()方法获取最近的定时任务的开始时间curDeadlineNanos,如果不存在定时任务,则为Long类型的最大值。接下来会使用curDeadlineNanos作为select的超时时间,如果不存在定时任务,也就是curDeadlineNanos为Long的最大值,就会执行selector.select()方法,也就是不设置超时时间,直到有I/O事件为止才会返回。如果存在定时任务,则会通过定时任务的开始时间计算出select操作的超时时间。

    当有I/O事件响应时,则会通过processSelectedKeys()方法处理I/O事件,下面是代码段

    private void processSelectedKeys() {
       if (selectedKeys != null) {
           processSelectedKeysOptimized();
      } else {
           processSelectedKeysPlain(selector.selectedKeys());
      }
    }

    从代码可以看出,这里处理有两个分支,一个是处理优化过的SelectedKey,另一个分支是处理正常的SelectedKey,什么是优化过的key?这里就需要看一下selectedKeys变量的数据结构。跟踪代码可以看出,selectedKeys的数据类型是SelectedSelectionKeySetSelectedSelectionKeySet底层存储结构是用的数组,所以它的遍历效率比较高,原生的存储结构是HashSet,其实也就是HashMap,它的遍历效率没有数组高。当通过SelectorProvider#openSelector()方法获取一个selector时,这里有个特殊操作,就是一个优化开关,如果开关关闭,则使用原生的selectedKeys存储结构,也就是HashSet。如果开关打开,就会使用反射,将SelectorImpl类中的publicKeys与publicSelectedKeys字段替换成SelectedSelectionKeySet的存储类型。可以看下面的代码片段

    private SelectorTuple openSelector() {
       final Selector unwrappedSelector;
       try {
           // 获取一个Selector实例
           unwrappedSelector = provider.openSelector();
      } catch (IOException e) {
           throw new ChannelException("failed to open a new selector", e);
      }

       if (DISABLE_KEY_SET_OPTIMIZATION) {
           return new SelectorTuple(unwrappedSelector);
      }
    // 获取SelectorImpl的class
       Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
           @Override
           public Object run() {
               try {
                   return Class.forName(
                       "sun.nio.ch.SelectorImpl",
                       false,
                       PlatformDependent.getSystemClassLoader());
              } catch (Throwable cause) {
                   return cause;
              }
          }
      });
    // 省略部分代码
       final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
       final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
       Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
           @Override
           public Object run() {
               try {
                   // 反射获取selectedKey字段
                   Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                   Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    // 省略部分代码
                   Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                   if (cause != null) {
                       return cause;
                  }
                   cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                   if (cause != null) {
                       return cause;
                  }
                   // 反射方式设置selectedKeys为SelectedSelectionKeySet类型
                   selectedKeysField.set(unwrappedSelector, selectedKeySet);
                   // 反射方式设置publicSelectedKeys为SelectedSelectionKeySet类型
                   publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                   return null;
              } catch (NoSuchFieldException e) {
                   return e;
              } catch (IllegalAccessException e) {
                   return e;
              }
          }
      });
    // 省略部分代码
    }

    接下来再看一下processSelectedKeysOptimized()方法的具体逻辑

    private void processSelectedKeysOptimized() {
       for (int i = 0; i < selectedKeys.size; ++i) {
           final SelectionKey k = selectedKeys.keys[i];
           selectedKeys.keys[i] = null;
           // 获取I/O事件类型
           final Object a = k.attachment();
           if (a instanceof AbstractNioChannel) {
               // 处理I/O事件
               processSelectedKey(k, (AbstractNioChannel) a);
          } else {
               @SuppressWarnings("unchecked")
               NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
               // 处理异步任务
               processSelectedKey(k, task);
          }
    // 是否需要重新轮询
           if (needsToSelectAgain) {
               // 将i+1之前的I/O事件都置为空
               selectedKeys.reset(i + 1);
               //重新轮询
               selectAgain();
               // 将i置为-1,接下来会执行一次i++,然后i又会从0开始执行I/O事件
               i = -1;
          }
      }
    }

    这里开始处理I/O事件,大家还记得将channel注册到selector时,可以填写一个attach参数。在这就可以通过attachment()方法获取事件类型,如果是AbstractNioChannel类型,则处理I/O事件,否则处理异步任务。接下来有一个needsToSelectAgain标识,表示是否需要重新轮询一次I/O事件,为什么会有这个标识呢?我们先跟踪一下这个标识是在哪里写入的,最终发现,当channel从Selector取消注册超过256次时,就会将这个标志位置为true。

    接下来继续看处理I/O事件的详细逻辑processSelectedKey()

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
           final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
           // 省略部分代码
           try {
               int readyOps = k.readyOps();
               // 处理连接事件
               if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                   int ops = k.interestOps();
                   ops &= ~SelectionKey.OP_CONNECT;
                   k.interestOps(ops);
                   // 在ChinnelPipeline上传递Connect事件
                   unsafe.finishConnect();
              }

               // 处理可写事件
               if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                   ch.unsafe().forceFlush();
              }

               // 处理客户端的读事件与接收事件
               if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                   // 在ChinnelPipeline上传递Read事件
                   unsafe.read();
              }
          } catch (CancelledKeyException ignored) {
               unsafe.close(unsafe.voidPromise());
          }
      }

    Connect事件的传递比较简单,感兴趣的同学可以自己去看一下源码,这里我们主要看一下读写事件。

    首先看一下写事件,从下面的代码可以看出,主要是把buffer里的数据刷新到socket缓冲区,然后发送到客户端

    protected void flush0() {
       if (inFlush0) {
           // Avoid re-entrance
           return;
      }
       final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
       if (outboundBuffer == null || outboundBuffer.isEmpty()) {
           return;
      }
       inFlush0 = true;
    // 省略部分代码
       try {
           // 真正的将数据写到客户端,底层会调用Nio的SocketChannel#write()方法
           doWrite(outboundBuffer);
      } catch (Throwable t) {
          // 省略部分代码
      } finally {
           inFlush0 = false;
      }
    }

    下面我们再来看一下Netty是怎么处理读事件的

    public void read() {
       assert eventLoop().inEventLoop();
       final ChannelConfig config = config();
       final ChannelPipeline pipeline = pipeline();
       // 获取一个netty自己的内存分配器
       final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
       allocHandle.reset(config);
       boolean closed = false;
       Throwable exception = null;
       try {
           try {
               do {
                   // 将数据读取到readBuf中
                   int localRead = doReadMessages(readBuf);
                   if (localRead == 0) {
                       break;
                  }
                   if (localRead < 0) {
                       closed = true;
                       break;
                  }
    // 增加读的次数
                   allocHandle.incMessagesRead(localRead);
              } while (allocHandle.continueReading());
          } catch (Throwable t) {
               exception = t;
          }

           int size = readBuf.size();
           for (int i = 0; i < size; i ++) {
               readPending = false;
               // 在ChannelPipeline上传播read事件
               pipeline.fireChannelRead(readBuf.get(i));
          }
           // 清空读的缓冲区
           readBuf.clear();
           // 读取完成,记录读取总的字节数
           allocHandle.readComplete();
           // 传播一次读取完成的事件,一次读取完成可能包含多个SocketChannel
           pipeline.fireChannelReadComplete();

           if (exception != null) {
               closed = closeOnReadError(exception);

               pipeline.fireExceptionCaught(exception);
          }

           if (closed) {
               inputShutdown = true;
               if (isOpen()) {
                   close(voidPromise());
              }
          }
      } finally {
           if (!readPending && !config.isAutoRead()) {
               // 移除read 事件
               removeReadOp();
          }
      }
    }

    当Netty处理读事件时,首先会获取一个内存分配处理器,读取消息到分配的内存里(Netty内存分配器后面分析),然后在ChannelPipeline上传播读事件,最终会在channel上移除read事件。

    最后我们来看一下Netty是怎么处理异步任务的

    protected boolean runAllTasks(long timeoutNanos) {
       // 从定时任务队列中取出任务添加到普通任务队列中
       fetchFromScheduledTaskQueue();
       // 从任务列表中取出一个任务
       Runnable task = pollTask();
       if (task == null) {
           // 如果任务为空,表示没有普通任务可执行,直接执行tailTask任务队列中的任务
           afterRunningAllTasks();
           return false;
      }
    // 计算执行任务的截止时间(相对时间,当前时间减去服务启动时间再加上超时时间)
       final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
       long runTasks = 0;
       long lastExecutionTime;
       for (;;) {
           // 安全的执行任务
           safeExecute(task);
           runTasks ++;
           // 执行任务个数和0x3f(0011 1111)进行与运算,这里指的是当任务个数为64个时,进行一次截止时间判断
           if ((runTasks & 0x3F) == 0) {
               lastExecutionTime = ScheduledFutureTask.nanoTime();
               // 如果当前时间大于了截止时间,则中断执行任务,这是为了把时间留出一部分来执行I/O事件
               if (lastExecutionTime >= deadline) {
                   break;
              }
          }
           task = pollTask();
           // 如果任务为空,终止循环
           if (task == null) {
               lastExecutionTime = ScheduledFutureTask.nanoTime();
               break;
          }
      }
       // 最后执行尾部任务队列中的任务
       afterRunningAllTasks();
       this.lastExecutionTime = lastExecutionTime;
       return true;
    }

    首选执行异步任务有一个超时,通过这个超时时间来计算任务可以执行多久。首先netty会把定时任务合并到普通任务队列中,然后判断这个任务队列中是否有任务,如果没有任务,则先执行尾部任务队列,然后提前返回。如果有任务,则会计算任务的执行截止时间,每当任务执行了64个时,都会判断一下当前时间是否大于截止时间,如果大于了截止时间,则停止执行普通任务,然后再执行尾部任务队列。

  • 相关阅读:
    WCF中的序列化[下篇]
    WCF中的序列化[上篇]
    SET TRANSACTION ISOLATION LEVEL 详解
    深入探讨数据仓库建模与ETL的实践技巧
    用SQL语句添加删除修改字段等操作
    WCF数据契约与序列化
    在SQL Server中使用检查约束来验证数据
    C#线程同步的几种方法[转]
    Linq的Join用法
    测试wlm代码高亮插件
  • 原文地址:https://www.cnblogs.com/jhbbd/p/14310324.html
Copyright © 2011-2022 走看看