zoukankan      html  css  js  c++  java
  • NioEventLoopGroup

    NioEventLoopGroup

    服务端demo

     1 public class MyNettyServer {
     2     public static void main(String[] args) throws Exception {
     3         EventLoopGroup bossGroup = new NioEventLoopGroup(1);
     4         EventLoopGroup workerGroup = new NioEventLoopGroup();
     5 
     6         try {
     7             ServerBootstrap bootstrap = new ServerBootstrap();
     8             bootstrap.group(bossGroup, workerGroup)
     9                     .channel(NioServerSocketChannel.class)
    10                     .childHandler(new ChannelInitializer<SocketChannel>() {
    11                         @Override
    12                         protected void initChannel(SocketChannel ch) throws Exception {
    13                             ch.pipeline().addLast(new ServerHandler());
    14                         }
    15                     });
    16             ChannelFuture cf = bootstrap.bind(8888).sync();
    17             cf.addListener((ChannelFutureListener) future -> {
    18                 if (cf.isSuccess()) {
    19                     System.out.println("监听端口 8888 成功");
    20                 } else {
    21                     System.out.println("监听端口 8888 失败");
    22                 }
    23             });
    24             cf.channel().closeFuture().sync();
    25         } finally {
    26             bossGroup.shutdownGracefully();
    27             workerGroup.shutdownGracefully();
    28         }
    29     }
    30 }

    业务处理类

     1 public class ServerHandler extends ChannelInboundHandlerAdapter {
     2     @Override
     3     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
     4         System.out.println("msg:" + msg);
     5         ctx.writeAndFlush("hello world");
     6     }
     7 
     8     @Override
     9     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    10         System.out.println("error:" + cause.getMessage());
    11         ctx.close();
    12     }
    13 }

    流程图

    配置线程池

    Netty 是采用 Reactor 模型进行开发的,可以非常容易切换三种 Reactor 模式:单线程模式、多线程模式、主从多线程模式。

    单线程模式

     

    Reactor 单线程模型所有 I/O 操作(包括连接建立、数据读写、事件分发等),都由一个线程完成,所以只需要启动一个 EventLoopGroup 即可。

    EventLoopGroup group = new NioEventLoopGroup(1);
    
    ServerBootstrap b = new ServerBootstrap();
    
    b.group(group)

    单线程模型逻辑简单,缺陷也十分明显:

    • 一个线程支持处理的连接数非常有限,CPU 很容易打满,性能方面有明显瓶颈;
    • 当多个事件被同时触发时,只要有一个事件没有处理完,其他后面的事件就无法执行,这就会造成消息积压及请求超时;
    • 线程在处理 I/O 事件时,Select 无法同时处理连接建立、事件分发等操作;
    • 如果 I/O 线程一直处于满负荷状态,很可能造成服务端节点不可用。

    多线程模式

     

    Reactor 单线程模型有非常严重的性能瓶颈,因此 Reactor 多线程模型出现了。Reactor 多线程模型将业务逻辑交给多个线程进行处理。除此之外,多线程模型其他的操作与单线程模型是类似的,例如读取数据依然保留了串行化的设计。当客户端有数据发送至服务端时,Select 会监听到可读事件,数据读取完毕后提交到业务线程池中并发处理。在 Netty 中使用 Reactor 多线程模型与单线程模型非常相似,区别是 NioEventLoopGroup 可以不需要任何参数,它默认会启动 2 倍 CPU 核数的线程。当然,你也可以自己手动设置固定的线程数。

    EventLoopGroup group = new NioEventLoopGroup();
    
    ServerBootstrap b = new ServerBootstrap();
    
    b.group(group)

    主从多线程模式

     主从多线程模型由多个 Reactor 线程组成,每个 Reactor 线程都有独立的 Selector 对象。MainReactor 仅负责处理客户端连接的 Accept 事件,连接建立成功后将新创建的连接对象注册至 SubReactor。再由 SubReactor 分配线程池中的 I/O 线程与其连接绑定,它将负责连接生命周期内所有的 I/O 事件。在大多数场景下,我们采用的都是主从多线程 Reactor 模型。

    Netty 推荐使用主从多线程模型,Boss 是主 Reactor,Worker 是从 Reactor。它们分别使用不同的 NioEventLoopGroup,主 Reactor 负责处理 Accept,然后把 Channel 注册到从 Reactor 上,从 Reactor 主要负责 Channel 生命周期内的所有 I/O 事件。

    EventLoopGroup bossGroup = new NioEventLoopGroup();
    
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    
    ServerBootstrap b = new ServerBootstrap();
    
    b.group(bossGroup, workerGroup)

    Netty 线程模型的可定制化程度很高。它只需要简单配置不同的参数,便可启用不同的 Reactor 线程模型,而且无需变更其他的代码,很大程度上降低了用户开发和调试的成本。

    Reactor 线程模型运行机制的四个步骤,分别为连接注册、事件轮询、事件分发、任务处理,如下图所示。

    • 连接注册:Channel 建立后,注册至 Reactor 线程中的 Selector 选择器。
    • 事件轮询:轮询 Selector 选择器中已注册的所有 Channel 的 I/O 事件。
    • 事件分发:为准备就绪的 I/O 事件分配相应的处理线程。
    • 任务处理:Reactor 线程还负责任务队列中的非 I/O 任务,每个 Worker 线程从各自维护的任务队列中取出任务异步执行。

    netty组件 

    Netty中主要组件包括:

    • - Channel:代表了一个链接,与EventLoop一起用来参与IO处理。
    • - ChannelHandler:为了支持各种协议和处理数据的方式,便诞生了Handler组件。Handler主要用来处理各种事件,这里的事件很广泛,比如可以是连接、数据接收、异常、数据转换等。
    • - ChannelPipeline:提供了 ChannelHandler 链的容器,并定义了用于在该链上传播入站和出站事件流的 API。
    • - EventLoop:Channel处理IO操作,一个EventLoop可以为多个Channel服务。
    • - EventLoopGroup:会包含多个EventLoop。

    上述组件的关系结构如下图所示:

    • 一个 EventLoopGroup 包含一个或者多个 EventLoop;
    • 一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;
    • 所有由 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理;
    • 一个 Channel 在它的生命周期内只注册于一个 EventLoop;
    • 一个 EventLoop 可能会被分配给一个或多个 Channel。

    组件说明
    Channel
    在Java中(Socket类),基本的 I/O 操作(bind()、 connect()、 read()和 write())依赖于底层网络传输所提供的功能。 Netty 的 Channel 接口所提供的 API降低了直接使用 Socket 类的复杂性。channel是Netty网络通信的组件,用于网络IO操作。通过Channel可以获得当前王略连接的通道的状态与网络配置参数。

    Netty中也提供了使用多种方式连接的Channel:

    • - EmbeddedChannel;
    • - LocalServerChannel;
    • - NioDatagramChannel;
    • - NioSctpChannel;
    • - NioServerSocketChannel;
    • - NioSocketChannel。

    channel生命周期

    • ChannelUnregistered :Channel 已经被创建,但还未注册EventLoop
    • ChannelRegistered :Channel 已经被注册到了 EventLoop
    • ChannelActive :Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了
    • ChannelInactive :Channel 没有连接到远程节点

    当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给 ChannelPipeline中的 ChannelHandler,其可以随后对它们做出响应。

    EventLoop&EventLoopGroup

     

    NioEventLoopGroup可以理解为线程池,NioEventLoop理解为一个线程,每个EventLoop对应一个Selector和任务队列 taskQueue,负责处理多个Channel上的事件以及普通任务和定时任务。EventLoop主要用于处理连接的生命周期中所发生的事件,它实现了Netty的线程模型部分的功能。EventLoop充当任务调度丶线程管理丶线程分配的重要对象。

    EventLoop 通用的运行模式。每当事件发生时,应用程序都会将产生的事件放入事件队列当中,然后 EventLoop 会轮询从队列中取出事件执行或者将事件分发给相应的事件监听者执行。事件执行的方式通常分为立即执行、延后执行、定期执行几种。

    EventLoop具有任务调度,充当线程池的作用,一个 EventLoop 将由一个永远都不会改变的 Thread 驱动.根据配置和可用核心的不同,可能会创建多个 EventLoop 实例用以优化资源的使用,且单个 EventLoop可指派用于服务多个 Channel(处理多个网络连接)。

    Netty 的 EventLoop 在继承了ScheduledExecutorService,可调度一个任务以便稍后(延迟)执行或者周期性地执行。
    比如,想要注册一个在客户端已经连接了 5 分钟之后触发的任务。一个常见的做法是,发送心跳消息到远程节点,检查连接是否还活着。如果没有响应,你便知道可以关闭该 Channel了。

    线程管理

    在内部,提交任务,如果(当前)调用线程正是支撑 EventLoop 的线程,那么所提交的代码块将会被(直接)执行。否则,EventLoop 将调度该任务以便稍后执行,并将它放入到内部队列中。当 EventLoop 下次处理它的事件时,它会执行队列中的那些任务/事件。

     

    线程分配

    服务于 Channel 的 I/O 和事件的 EventLoop 则包含在 EventLoopGroup 中。

    IO多路复用:在当前的线程模型中,它们可能会被多个 Channel 所共享。这使得可以通过尽可能少量的 Thread 来支撑大量的 Channel,而不是每个 Channel 分配一个 Thread。

    分配EventLoop:EventLoopGroup 负责为每个新创建的 Channel 分配一个 EventLoop。在当前实现中,使用顺序循环(round-robin)的方式进行分配以获取一个均衡的分布,并且相同的 EventLoop 可能会被分配给多个 Channel。

    线程安全:一旦一个 Channel 被分配给一个 EventLoop,它将在它的整个生命周期中都使用这个EventLoop(以及相关联的 Thread)。请牢记这一点,因为它可以使你从担忧你的ChannelHandler 实现中的线程安全和同步问题中解脱出来。

    注意:需要注意,EventLoop对 ThreadLocal的使用的影响。因为一个 EventLoop通常会用于支撑多个 Channel.

    所以对于所有相关联的 Channel 来说若使用它来实现状态追踪则会有线程安全问题。但是在一些无状态的上下文中,它仍然可以被用于在多个 Channel 之间共享一些重度的或者代价昂贵的对象,甚至是事件。


    ChannelHandler&ChannelPipeline
    ChannelHandler

    对于开发一个Netty的应用而言,主要开发的组件可能就是 ChannelHandler, 它主要负责处理接收和发送数据的的业务逻辑,根据数据流向的不同,ChannelHandler可以分为ChannelInboundHandler以及ChannelOutboundHandler。从一个客户端应用程序的角度来看,如果事件的运动方向是从客户端到服务器端,那么我们称这些事件为Outbound的,反之则称为Inbound的。需要注意的是对于inbound操作而言,处理的顺序为从头到尾,outbound的处理顺序为从尾到头。Netty中可以注册多个handler,以链式的方式进行处理,根据继承接口的不同,实现的顺序也不同。

    1、ChannelInboundHandler:对接收的信息进行处理。一般用来执行解码、读取客户端数据、进行业务处理等。如ByteToMessageDecoder;

    主要方法如下:

    • channelRegistered 当 Channel 已经注册到它的 EventLoop 并且能够处理 I/O 时被调用
    • channelUnregistered 当 Channel 从它的 EventLoop 注销并且无法处理任何 I/O 时被调用
    • channelActive 当 Channel 处于活动状态,可以被读写时被调用;Channel 已经连接/绑定并且已经就绪
    • channelInactive 当 Channel 离开活动状态并且不再连接它的远程节点时被调用
    • channelReadComplete 当 Channel 上的一个读操作完成时被调用
    • channelRead 当从 Channel 读取数据时被调用

    2、ChannelOutboundHandler:对发送的信息进行处理,一般用来进行编码、发送报文到客户端。如MessageToByteEncoder;

    主要方法如下:

    • bind(ChannelHandlerContext,SocketAddress,ChannelPromise)当请求将 Channel 绑定到本地地址时被调用
    • connect(ChannelHandlerContext,SocketAddress,SocketAddress,ChannelPromise)当请求将 Channel 连接到远程节点时被调用
    • disconnect(ChannelHandlerContext,ChannelPromise)当请求将 Channel 从远程节点断开时被调用
    • close(ChannelHandlerContext,ChannelPromise) 当请求关闭 Channel 时被调用
    • deregister(ChannelHandlerContext,ChannelPromise)当请求将 Channel 从它的 EventLoop 注销时被调用
    • read(ChannelHandlerContext) 当请求从 Channel 读取更多的数据时被调用
    • flush(ChannelHandlerContext) 当请求通过 Channel 将入队数据冲刷到远程节点时被调用
    • write(ChannelHandlerContext,Object,ChannelPromise) 当请求通过 Channel 将数据写到远程节点时被调用。

    当你要给连接的客户端发送响应时,也可以从 ChannelInboundHandler 直接冲刷数据然后输出到对端,即调用writeAndFlush或flush时则会经过出站处理器(常实现channelOutbountHandler子接口)

    而ChannelPipeline为ChannelHandler链提供了容器。ChannelHandler在ChannelPipeline里工作,执行顺序是由它们被添加的顺序来决定的,它们是在应用程序的初始化或者引导阶段被安装的。这些对象接收事件、执行它们所实现的处理逻辑,并将数据传递给链中的下一个 ChannelHandler。

    入站运动:如果一个消息或者任何其他的入站事件被读取,那么它会从 ChannelPipeline 的头部开始流动,最终,数据将会到达 ChannelPipeline 的尾端,届时,所有处理就都结束了。

    出站运动:数据的出站运动(即正在被写的数据)在概念上也是一样的。在这种情况下,数据将从ChannelOutboundHandler 链的尾端开始流动,直到它到达链的头部为止。在这之后,出站数据将会到达网络传输层,这里显示为 Socket。通常情况下,这将触发一个写操作。

    ChannelPipeline 的双向链表分别维护了 HeadContext 和 TailContext 的头尾节点。我们自定义的 ChannelHandler 会插入到 Head 和 Tail 之间,这两个节点在 Netty 中已经默认实现了,它们在 ChannelPipeline 中起到了至关重要的作用。

    HeadContext 既是 Inbound 处理器,也是 Outbound 处理器。它分别实现了 ChannelInboundHandler 和 ChannelOutboundHandler。网络数据写入操作的入口就是由 HeadContext 节点完成的。HeadContext 作为 Pipeline 的头结点负责读取数据并开始传递 InBound 事件,当数据处理完成后,数据会反方向经过 Outbound 处理器,最终传递到 HeadContext,所以 HeadContext 又是处理 Outbound 事件的最后一站。此外 HeadContext 在传递事件之前,还会执行一些前置操作。

    TailContext 只实现了 ChannelInboundHandler 接口。它会在 ChannelInboundHandler 调用链路的最后一步执行,主要用于终止 Inbound 事件传播,例如释放 Message 数据资源等。TailContext 节点作为 OutBound 事件传播的第一站,仅仅是将 OutBound 事件传递给上一个节点。

    从整个 ChannelPipeline 调用链路来看,如果由 Channel 直接触发事件传播,那么调用链路将贯穿整个 ChannelPipeline。然而也可以在其中某一个 ChannelHandlerContext 触发同样的方法,这样只会从当前的 ChannelHandler 开始执行事件传播,该过程不会从头贯穿到尾,在一定场景下,可以提高程序性能。

    当 ChannelHandler 被添加到 ChannelPipeline 时,它将会被分配一个 ChannelHandlerContext,其代表了 ChannelHandler 和 ChannelPipeline 之间的绑定。虽然这个对象可以被用于获取底层的 Channel,但是它主要还是被用于写出站数据。

    在 Netty 中,有两种发送消息的方式。你可以直接写到 Channel 中,也可以 写到和 ChannelHandler相关联的ChannelHandlerContext对象中。前一种方式将会导致消息从ChannelPipeline 的尾端开始流动,而后者将导致消息从 ChannelPipeline 中的下一个 ChannelHandler 开始流动。

    主要方法如下:

    • addFirst、addBefore、addAfter、addLast将一个 ChannelHandler 添加到 ChannelPipeline 中
    • remove 将一个 ChannelHandler 从 ChannelPipeline 中移除
    • replace 将 ChannelPipeline 中的一个 ChannelHandler 替换为另一个 ChannelHandler
    • get 通过类型或者名称返回 ChannelHandler
    • context 返回和 ChannelHandler 绑定的 ChannelHandlerContext
    • names 返回 ChannelPipeline 中所有 ChannelHandler 的名称

    ChannelHandler的类结构

    首先要说明的是ChannelInboundHandle 和ChannelOutboundHandle 都继承自ChannelHandler,将两个类别的 ChannelHandler都混合添加到同一个 ChannelPipeline 中时,Netty 能区分 ChannelInboundHandler 实现和 ChannelOutboundHandler 实现,并确保数据只会在具有相同定向类型的两个 ChannelHandler 之间传递。

    对于ChannelHandler的实现类而言,可能不需要关注事件处理周期的每个环节,如果要把Inbound或是Outboud接口的每个方法都实现,就会额外的带来很多的工作量,Netty对于该种情况提供了几种Adapter的解决方案:

    ChannelHandlerAdapter
    ChannelInboundHandlerAdapter
    ChannelOutboundHandlerAdapter
    ChannelDuplexHandler

     ChannelHandler生命周期

    接口 ChannelHandler 定义的生命周期操作,在ChannelHandler被添加到ChannelPipeline中或者被从 ChannelPipeline 中移除时会调用这些操作。这些方法中的每一个都接受一个ChannelHandlerContext 参数。

    • handlerAdded: 当把 ChannelHandler 添加到 ChannelPipeline 中时被调用
    • handlerRemoved: 当从 ChannelPipeline 中移除 ChannelHandler 时被调用
    • exceptionCaught: 当处理过程中在 ChannelPipeline 中有错误产生时被调用

    ChannelHandler异常传播机制

    ChannelPipeline 事件传播的实现采用了经典的责任链模式,调用链路环环相扣。如果有一个节点处理逻辑出现异常,会将异常按顺序从 Head 节点传播到 Tail 节点。如果用户没有对异常进行拦截处理,最后将由 Tail 节点统一处理。虽然 Netty 中 TailContext 提供了兜底的异常处理逻辑,但是在很多场景下,并不能满足我们的需求。我们应该在 ChannelPipeline 自定义处理器的末端添加统一的异常处理器。即在 Tail 节点的前一个处理器添加。异常事件的处理顺序与 ChannelHandler 的添加顺序相同,会依次向后传播,与 Inbound 事件和 Outbound 事件无关。

    ChannelPipeline

    ChannelPipeline 提供了 ChannelHandler 链的容器,并定义了用于在该链上传播入站和出站事件流的 API。当 Channel 被创建时,它会被自动地分配到它专属的 ChannelPipeline。每个Channel都有且只有一个ChannelPipeline与之对应。

    ChannelHandler 安装到 ChannelPipeline 中的过程如下所示:
    1. 一个ChannelInitializer被注册到了ServerBootstrap中;
    2. 当 ChannelInitializer.initChannel()方法被调用时, ChannelInitializer
    将在 ChannelPipeline 中安装一组自定义的 ChannelHandler;
    3. ChannelInitializer 将它自己从 ChannelPipeline 中移除

    一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler。

    ChannelHandlerContext

    通道处理器上下文接口,保存Channel相关的所有上下文信息,同时关联一个ChannelHandler。ChannelHandlerContext 则包含了 ChannelHandler 生命周期的所有事件,如 connect、bind、read、flush、write、close 等。可以试想一下,如果没有 ChannelHandlerContext 的这层封装,那么我们在做 ChannelHandler 之间传递的时候,前置后置的通用逻辑就要在每个 ChannelHandler 里都实现一份。这样虽然能解决问题,但是代码结构的耦合,会非常不优雅。

     

    Bootstrap与ServerBootstrap

    Netty 的引导类为应用程序的网络层配置提供了容器,这涉及将一个进程绑定到某个指定的端口,或者将一个进程连接到另一个运行在某个指定主机的指定端口上的进程。

    有两种类型的引导:
    1. 用于客户端(Bootstrap)
    2. 用于服务器(ServerBootstrap)。

    类继承关系:

     

    需要注意的是,引导一个客户端只需要一个 EventLoopGroup,但是一个ServerBootstrap 则需要两个(也可以是同一个实例)

    第一个EventLoopGroup用来专门负责绑定到端口监听连接事件,而把第二个EventLoopGroup用来处理每个接收到的连接。

    对于Server端,如果仅由一个EventLoopGroup处理所有请求和连接的话,在并发量很大的情况下,这个EventLoopGroup有可能会忙于处理已经接收到的连接而不能及时处理新的连接请求,用两个的话,会有专门的线程来处理连接请求,不会导致请求超时的情况,大大提高了并发处理能力。

    最佳实践方案

    1. 网络连接建立过程中三次握手、安全认证的过程会消耗不少时间。这里建议采用 Boss 和 Worker 两个 EventLoopGroup,有助于分担 Reactor 线程的压力。
    2. 由于 Reactor 线程模式适合处理耗时短的任务场景,对于耗时较长的 ChannelHandler 可以考虑维护一个业务线程池,将编解码后的数据封装成 Task 进行异步处理,避免 ChannelHandler 阻塞而造成 EventLoop 不可用。
    3. 如果业务逻辑执行时间较短,建议直接在 ChannelHandler 中执行。例如编解码操作,这样可以避免过度设计而造成架构的复杂性。
    4. 不宜设计过多的 ChannelHandler。对于系统性能和可维护性都会存在问题,在设计业务架构的时候,需要明确业务分层和 Netty 分层之间的界限。不要一味地将业务逻辑都添加到 ChannelHandler 中。

    NioEventLoopGroup

    构造方法

     1    public NioEventLoopGroup() {
     2         this(0);
     3     }
     4 
     5     public NioEventLoopGroup(int nThreads) {
     6         this(nThreads, (Executor) null);
     7     }
     8 
     9     public NioEventLoopGroup(ThreadFactory threadFactory) {
    10         this(0, threadFactory, SelectorProvider.provider());//选择器提供器
    11     }
    12     
    13     ......
    14     
    15     public NioEventLoopGroup(
    16             int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
    17         this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);//选择器的策略工厂
    18     }
    19 
    20     public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
    21         final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
    22         super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());//拒绝处理器,跟线程池一样的,饱和了要拒绝
    23     }
    24 
    25     public NioEventLoopGroup(
    26             int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    27         this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    28     }
    29 
    30     public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
    31                              final SelectStrategyFactory selectStrategyFactory) {
    32         super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    33     }
    34 
    35     public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
    36                              final SelectorProvider selectorProvider,
    37                              final SelectStrategyFactory selectStrategyFactory) {
    38         super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
    39                 RejectedExecutionHandlers.reject());
    40     }
    41 
    42     public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
    43                              final SelectorProvider selectorProvider,
    44                              final SelectStrategyFactory selectStrategyFactory,
    45                              final RejectedExecutionHandler rejectedExecutionHandler) {
    46         super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
    47     }
    48 
    49     public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
    50                              final SelectorProvider selectorProvider,
    51                              final SelectStrategyFactory selectStrategyFactory,
    52                              final RejectedExecutionHandler rejectedExecutionHandler,
    53                              final EventLoopTaskQueueFactory taskQueueFactory) {
    54         super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
    55                 rejectedExecutionHandler, taskQueueFactory);
    56     }

    追踪构造方法,调用了父类MultithreadEventLoopGroup的构造方法

     1     protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
     2         super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
     3     }//如果默认不传是0,他会用CPU核数*2
     4     
     5     //静态代码块
     6     static {
     7         DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
     8                 "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
     9 
    10         if (logger.isDebugEnabled()) {
    11             logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    12         }
    13     }

    继续追踪,调用父类MultithreadEventExecutorGroup的构造方法

     1    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
     2         this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
     3     }//这里又创建了执行器选择工厂,也可以说是负载均衡吧,这个就是说如何选择执行器来做事,默认是可以从头到尾轮着来,就是取模
     4     
     5     protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
     6                                             EventExecutorChooserFactory chooserFactory, Object... args) {
     7         if (nThreads <= 0) {
     8             throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
     9         }
    10 
    11         if (executor == null) {
    12             executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    13         }
    14 
    15         children = new EventExecutor[nThreads];
    16 
    17         for (int i = 0; i < nThreads; i ++) {
    18             boolean success = false;
    19             try {
    20                 children[i] = newChild(executor, args);
    21                 success = true;
    22             } catch (Exception e) {
    23                 // TODO: Think about if this is a good exception type
    24                 throw new IllegalStateException("failed to create a child event loop", e);
    25             } finally {
    26                 if (!success) {
    27                     for (int j = 0; j < i; j ++) {
    28                         children[j].shutdownGracefully();
    29                     }
    30 
    31                     for (int j = 0; j < i; j ++) {
    32                         EventExecutor e = children[j];
    33                         try {
    34                             while (!e.isTerminated()) {
    35                                 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
    36                             }
    37                         } catch (InterruptedException interrupted) {
    38                             // Let the caller handle the interruption.
    39                             Thread.currentThread().interrupt();
    40                             break;
    41                         }
    42                     }
    43                 }
    44             }
    45         }
    46 
    47         chooser = chooserFactory.newChooser(children);
    48 
    49         final FutureListener<Object> terminationListener = new FutureListener<Object>() {
    50             @Override
    51             public void operationComplete(Future<Object> future) throws Exception {
    52                 if (terminatedChildren.incrementAndGet() == children.length) {
    53                     terminationFuture.setSuccess(null);
    54                 }
    55             }
    56         };
    57 
    58         for (EventExecutor e: children) {
    59             e.terminationFuture().addListener(terminationListener);
    60         }
    61 
    62         Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    63         Collections.addAll(childrenSet, children);
    64         readonlyChildren = Collections.unmodifiableSet(childrenSet);//创建不可变集合
    65     }

    newDefaultThreadFactory()

    调用MultithreadEventLoopGroup的newDefaultThreadFactory,创建线程工厂,跟线程池的线程工厂一样

     1     @Override
     2     protected ThreadFactory newDefaultThreadFactory() {
     3         return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
     4     }
     5     
     6     //最终调用
     7     public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
     8         ObjectUtil.checkNotNull(poolName, "poolName");
     9 
    10         if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
    11             throw new IllegalArgumentException(
    12                     "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
    13         }
    14 
    15         prefix = poolName + '-' + poolId.incrementAndGet() + '-';
    16         this.daemon = daemon;
    17         this.priority = priority;
    18         this.threadGroup = threadGroup;
    19     }

    debug,发现是nioEventLoopGroup-2-,因为1已经在MultithreadEventExecutorGroup的变量初始化时使用了,有个叫全局事件执行器GlobalEventExecutor的变量要初始化,根据类加载的流程,他会在构造函数之前初始化的

     1     //MultithreadEventExecutorGroup类
     2     private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
     3         
     4     //GlobalEventExecutor类
     5     public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();
     6 
     7     private GlobalEventExecutor() {
     8         scheduledTaskQueue().add(quietPeriodTask);
     9         threadFactory = ThreadExecutorMap.apply(new DefaultThreadFactory(
    10                 DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null), this);
    11     }

    ThreadPerTaskExecutor(ThreadFactory threadFactory)

    设置了一个线程工厂,有任务就创建一个线程执行

     1     public final class ThreadPerTaskExecutor implements Executor {
     2     private final ThreadFactory threadFactory;
     3 
     4     public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
     5         this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
     6     }
     7 
     8     @Override
     9     public void execute(Runnable command) {
    10         threadFactory.newThread(command).start();
    11     }
    12 }

     

    初始化事件执行器

    根据nThreads创建相应个数的EventExecutor

     1     ...
     2      children = new EventExecutor[nThreads];
     3 
     4         for (int i = 0; i < nThreads; i ++) {
     5             boolean success = false;
     6             try {
     7                 children[i] = newChild(executor, args);
     8                 success = true;
     9             } catch (Exception e) {
    10                 // TODO: Think about if this is a good exception type
    11                 throw new IllegalStateException("failed to create a child event loop", e);
    12             } finally {
    13                 ...
    14             }
    15         }

    newChild(executor, args)

    调用了NioEventLoopGroup的newChild方法

    1     @Override
    2     protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    3         EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
    4         return new NioEventLoop(this, executor, (SelectorProvider) args[0],
    5             ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    6     }

    判断参数是否有四个,有的话就拿出第4个参数EventLoopTaskQueueFactory类型,否则返回null

    返回实例化对象NioEventLoop

     1    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
     2     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
     3     EventLoopTaskQueueFactory queueFactory) {
     4       super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
     5       rejectedExecutionHandler);
     6       this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");//选择器提供器
     7       this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");//选择器策略,有可能可以推迟select方法而先去执行任务
     8       final SelectorTuple selectorTuple = openSelector();
     9       this.selector = selectorTuple.selector;//包装后的选择器
    10       this.unwrappedSelector = selectorTuple.unwrappedSelector;//原始NIO的选择器
    11     }

    newTaskQueue(queueFactory)

     1     private static Queue<Runnable> newTaskQueue(
     2             EventLoopTaskQueueFactory queueFactory) {
     3         if (queueFactory == null) {
     4             return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
     5         }
     6         return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
     7     }
     8     
     9     private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
    10         // This event loop never calls takeTask()
    11         return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
    12                 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
    13     }

    PlatformDependent类就是根据不同的操作系统创建不同的数据,创建的是jctools.queues包下的,是个高性能队列,里面比较复杂,是AbstractQueue得子类,暂时知道是个队列就好。

    NioEventLoop父类SingleThreadEventLoop构造方法

    1   protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
    2                                     boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
    3                                     RejectedExecutionHandler rejectedExecutionHandler) {
    4         super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
    5         tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");//尾队列
    6     }

    SingleThreadEventLoop的父类SingleThreadEventExecutor构造方法

     1   protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
     2                                         boolean addTaskWakesUp, Queue<Runnable> taskQueue,
     3                                         RejectedExecutionHandler rejectedHandler) {
     4         super(parent);
     5         this.addTaskWakesUp = addTaskWakesUp;
     6         this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
     7         this.executor = ThreadExecutorMap.apply(executor, this);//包装过的执行器
     8         this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");//任务队列
     9         this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");//拒绝策略
    10     }

    最终调用AbstractEventExecutor构造方法

    1   protected AbstractEventExecutor(EventExecutorGroup parent) {
    2         this.parent = parent;
    3     }

    SingleThreadEventExecutor构造函数中

    1 this.executor = ThreadExecutorMap.apply(executor, this); //this指代NioEventLoop

    里面把刚才创建好的ThreadPerTaskExecutor和NioEventLoop包装了下,返回是ThreadExecutorMap内部匿名对象executor

    只是里面是用ThreadPerTaskExecutor来执行任务

     1   public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
     2         ObjectUtil.checkNotNull(executor, "executor");
     3         ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
     4         return new Executor() {
     5             @Override
     6             public void execute(final Runnable command) {
     7                 executor.execute(apply(command, eventExecutor));
     8             }
     9         };
    10     }

    里面还有一层apply,任务真正运行之前会设置setCurrentEventExecutor当前的eventExecutor也就是NioEventLoop,里面用了ThreadLocal,当前线程独有的,任务运行完了就设置空了

     1     public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
     2         ObjectUtil.checkNotNull(command, "command");
     3         ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
     4         return new Runnable() {
     5             @Override
     6             public void run() {
     7                 setCurrentEventExecutor(eventExecutor);
     8                 try {
     9                     command.run();
    10                 } finally {
    11                     setCurrentEventExecutor(null);
    12                 }
    13             }
    14         };
    15     }

    现在我们知道SingleThreadEventExecutor构造函数设置了执行器ThreadExecutorMap 内部executor,里面是封装了ThreadPerTaskExecutorNioEventLoop

    设置了任务队列taskQueue,拒绝处理器rejectedExecutionHandler。而SingleThreadEventLoop构造函数设置了tailTasks,这个后面会讲,也就是前面创建的两个队列中的一个。

    最终回到NioEventLoop构造函数设置了选择器提供器provider,选择器的策略selectStrategy,通过openSelector()方法获得SelectorTuple

    里面含有未包装的获得包装的unwrappedSelector和包装的SelectedSelectionKeySetSelector

    chooserFactory.newChooser(children)

    选择执行器,默认从头到尾往复循环

     1   @Override
     2     public EventExecutorChooser newChooser(EventExecutor[] executors) {
     3         if (isPowerOfTwo(executors.length)) {//根据执行器的长度是否是2的幂次选择不同的计算方式
     4             return new PowerOfTwoEventExecutorChooser(executors);
     5         } else {
     6             return new GenericEventExecutorChooser(executors);
     7         }
     8     }
     9     
    10     private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    11         private final AtomicInteger idx = new AtomicInteger();
    12         private final EventExecutor[] executors;
    13 
    14         PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
    15             this.executors = executors;
    16         }
    17 
    18         @Override
    19         public EventExecutor next() {
    20             return executors[idx.getAndIncrement() & executors.length - 1];
    21         }
    22     }
    23 
    24     private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    25         // Use a 'long' counter to avoid non-round-robin behaviour at the 32-bit overflow boundary.
    26         // The 64-bit long solves this by placing the overflow so far into the future, that no system
    27         // will encounter this in practice.
    28         private final AtomicLong idx = new AtomicLong();
    29         private final EventExecutor[] executors;
    30 
    31         GenericEventExecutorChooser(EventExecutor[] executors) {
    32             this.executors = executors;
    33         }
    34 
    35         @Override
    36         public EventExecutor next() {
    37             return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
    38         }
    39     }

    terminationListener设置终止监听器

     1     final FutureListener<Object> terminationListener = new FutureListener<Object>() {
     2             @Override
     3             public void operationComplete(Future<Object> future) throws Exception {
     4                 if (terminatedChildren.incrementAndGet() == children.length) {
     5                     terminationFuture.setSuccess(null);
     6                 }
     7             }
     8         };
     9 
    10         for (EventExecutor e: children) {//添加终止事件
    11             e.terminationFuture().addListener(terminationListener);
    12         }

    至此,NioEventLoopGroup初始化完成。

  • 相关阅读:
    Centos系统下载
    Centos7 安装 docker
    Centos6 安装 docker
    yum提示Another app is currently holding the yum lock; waiting for it to exit...
    Oracle 时间字段显示不正确,类型错误
    Android 报错:error: too many padding sections on bottom border
    vs文件上传失败--超过最大字符限制
    ButterKnife 牛油刀使用
    VS项目发布到本地IIS
    Visual Studio无法调试
  • 原文地址:https://www.cnblogs.com/xiaojiesir/p/14684433.html
Copyright © 2011-2022 走看看