zoukankan      html  css  js  c++  java
  • Netty5服务端源码解析

    Netty5源码解析

    今天让我来总结下netty5的服务端代码。

    1. 服务端(ServerBootstrap)
      示例代码如下:

      import io.netty.bootstrap.ServerBootstrap;
      import io.netty.channel.ChannelFuture;
      import io.netty.channel.ChannelInitializer;
      import io.netty.channel.ChannelOption;
      import io.netty.channel.EventLoopGroup;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioServerSocketChannel;
      
      /**
       * Created by yaojiafeng on 16/1/17.
       */
      public class SimpleServer {
      
          public void bind(int port) throws Exception {
              // 配置服务端的NIO线程组
              EventLoopGroup bossGroup = new NioEventLoopGroup(1);
              EventLoopGroup workerGroup = new NioEventLoopGroup(1);
              try {
                  ServerBootstrap b = new ServerBootstrap();
                  b.group(bossGroup, workerGroup)
                          .channel(NioServerSocketChannel.class)
                          .option(ChannelOption.SO_BACKLOG, 1024)
                          .childHandler(new ChildChannelHandler());
                  // 绑定端口,同步等待成功
                  ChannelFuture f = b.bind(port).sync();
      
                  // 等待服务端监听端口关闭
                  f.channel().closeFuture().sync();
              } finally {
                  // 优雅退出,释放线程池资源
                  bossGroup.shutdownGracefully();
                  workerGroup.shutdownGracefully();
              }
          }
      
          private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
              @Override
              protected void initChannel(SocketChannel arg0) throws Exception {
                  arg0.pipeline().addLast(new SimpleServerHandler());
              }
          }
      
          /**
           * @param args
           * @throws Exception
           */
          public static void main(String[] args) throws Exception {
              int port = 8081;
              if (args != null && args.length > 0) {
                  try {
                      port = Integer.valueOf(args[0]);
                  } catch (NumberFormatException e) {
                      // 采用默认值
                  }
              }
              new SimpleServer().bind(port);
          }
      }
      

      1.1. 设置EventLoopGroup
      首先创建2个EventLoopGroup,一个parentGroup(用于接受新连接),childGroup(用于执行读写事件),NioEventLoopGroup内部根据设置的nEventLoops参数创建对应大小的NioEventLoop数组,并且每个NioEventLoop默认使用ForkJoinPool的一个线程,所以NioEventLoop称为单线程事件循环。

      1.2. 构造ServerBootstrap
      构造ServerBootstrap对象,并设置EventLoopGroup,channel(NioServerSocketChannel服务端套接字),一些option例如ChannelOption.SO_BACKLOG,childHandler(客户端连接后在管道链设置的ChannelHandler)

      1.3. 同步绑定端口

      b.bind(port).sync()
      

      1.3.1 validate方法
      validate方法验证parentGroup和channelFactory不能为null

      1.3.2 initAndRegister方法
      刚方法内部使用channelFactory通过反射构造NioServerSocketChannel的实例对象,NioServerSocketChannel实例对象构造内部主要包含ServerSocketChannel,DefaultChannelId(标识唯一性),Unsafe(所有IO操作都在这个类里),DefaultChannelPipeline(通道处理器管道链,自定义的ChannelHandler都在这里),NioServerSocketChannelConfig(一些配置信息)。
      构造完调用init初始化NioServerSocketChannel,包括设置自定义的ChannelHandler,ServerBootstrapAcceptor(专门用于接受客户端新连接时,初始化NioSocketChannel并注册进childGroup进行读写监听)。

      ChannelFuture regFuture = group().register(channel)
      

      异步注册NioServerSocketChannel到parentGroup里的NioEventLoop。
      因为注册过程是在NioEventLoop异步执行的,这里直接先分析register方法

      1.3.3 异步register

      channel.unsafe().register(this, promise)
      

      注册的时候会调用以上方法,委派给Unsafe的register方法,内部会给NioServerSocketChannel的字段eventLoop初始化(NioServerSocketChannel关联唯一的一个NioEventLoop),然后会调用

      eventLoop.execute(new OneTimeTask() {
                              @Override
                              public void run() {
                                  register0(promise);
                              }
                          });
      

      这个会开启NioEventLoop的事件循环线程,并放task到taskQueue里,作为异步执行register0方法。

      register0方法会调用外部类(NioServerSocketChannel)的doRegister方法,

      protected void doRegister() throws Exception {
              boolean selected = false;
              for (;;) {
                  try {
                      selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this);
                      return;
                  } catch (CancelledKeyException e) {
                      if (!selected) {
                          // Force the Selector to select now as the "canceled" SelectionKey may still be
                          // cached and not removed because no Select.select(..) operation was called yet.
                          ((NioEventLoop) eventLoop().unwrap()).selectNow();
                          selected = true;
                      } else {
                          // We forced a select operation on the selector before but the SelectionKey is still cached
                          // for whatever reason. JDK bug ?
                          throw e;
                      }
                  }
              }
          }
      

      这里使用了NIO的API,把NioServerSocketChannel里的ServerSocketChannel注册到NioServerSocketChannel关联的NioEventLoop里的selector。
      接下来的safeSetSuccess会把Main线程设置的监听器,设置bind任务。

      1.3.4 执行bind操作

      private static void doBind0(
                  final ChannelFuture regFuture, final Channel channel,
                  final SocketAddress localAddress, final ChannelPromise promise) {
      
              // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
              // the pipeline in its channelRegistered() implementation.
              channel.eventLoop().execute(new Runnable() {
                  @Override
                  public void run() {
                      if (regFuture.isSuccess()) {
                          channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                      } else {
                          promise.setFailure(regFuture.cause());
                      }
                  }
              });
          }
      

      注册成功的情况下,执行bind操作(NioServerSocketChannel的bind方法),一路追踪到

      unsafe.bind(localAddress, promise);
      

      unsafe的bind方法,内部调用NioServerSocketChannel的doBind方法

      protected void doBind(SocketAddress localAddress) throws Exception {
              javaChannel().socket().bind(localAddress, config.getBacklog());
          }
      

      并且设置pipeline.fireChannelActive()任务,fireChannelActive任务会调用channel.read()方法,内部会调用到unsafe.beginRead()方法,最终调用的是NioServerSocketChannel的doBeginRead方法,重新设置SelectionKey的感兴趣的事件readInterestOp(NioServerSocketChannel构造的时候确定的为SelectionKey.OP_ACCEPT),开始接收新连接。

      protected void doBeginRead() throws Exception {
              // Channel.read() or ChannelHandlerContext.read() was called
              if (inputShutdown) {
                  return;
              }
      
              final SelectionKey selectionKey = this.selectionKey;
              if (!selectionKey.isValid()) {
                  return;
              }
      
              readPending = true;
      
              final int interestOps = selectionKey.interestOps();
              if ((interestOps & readInterestOp) == 0) {
                  selectionKey.interestOps(interestOps | readInterestOp);
              }
          }
      

      1.4. NioEventLoop事件循环接受新连接
      NioEventLoop不停的通过ForkJoinPool执行它的asRunnable任务(通过每次执行任务将要完成时,重新把asRunnable设置到ForkJoinPool里)。
      从asRunnable的run方法开始,内部先执行selector的select操作,然后先调用processSelectedKeys()方法,获取到激活的selectedKeys数组,这里如果有新连接进来,那么就有一个SelectionKey,获取它的attachment(NioServerSocketChannel),然后调用processSelectedKey方法。

      private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
              final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
              if (!k.isValid()) {
                  // close the channel if the key is not valid anymore
                  unsafe.close(unsafe.voidPromise());
                  return;
              }
      
              try {
                  int readyOps = k.readyOps();
                  // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
                  // to a spin loop
                  if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                      unsafe.read();
                      if (!ch.isOpen()) {
                          // Connection already closed - no need to handle write.
                          return;
                      }
                  }
                  if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                      // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                      ch.unsafe().forceFlush();
                  }
                  if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                      // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                      // See https://github.com/netty/netty/issues/924
                      int ops = k.interestOps();
                      ops &= ~SelectionKey.OP_CONNECT;
                      k.interestOps(ops);
      
                      unsafe.finishConnect();
                  }
              } catch (CancelledKeyException ignored) {
                  unsafe.close(unsafe.voidPromise());
              }
          }
      

      1.4.1 执行获取新连接方法

      if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                      unsafe.read();
                      if (!ch.isOpen()) {
                          // Connection already closed - no need to handle write.
                          return;
                      }
                  }
      

      当readyOps等于SelectionKey.OP_ACCEPT调用unsafe.read(),这里调用到了AbstractNioMessageChannel的内部类NioMessageUnsafe的read方法。
      read方法会循环接受新连接,一次默认能接受16个连接,具体调用doReadMessages方法。

      protected int doReadMessages(List<Object> buf) throws Exception {
              SocketChannel ch = javaChannel().accept();
      
              try {
                  if (ch != null) {
                      buf.add(new NioSocketChannel(this, ch));
                      return 1;
                  }
              } catch (Throwable t) {
                  logger.warn("Failed to create a new channel from an accepted socket.", t);
      
                  try {
                      ch.close();
                  } catch (Throwable t2) {
                      logger.warn("Failed to close a socket.", t2);
                  }
              }
      
              return 0;
          }
      

      这里调用NIO的API,accept方法获取SocketChannel,并封装成NioSocketChannel(NioSocketChannel的构造字段和NioServerSocketChannel类似,只是NioSocketChannel默认的感兴趣事件为SelectionKey.OP_READ)。
      接受连接完成,循环调用pipeline.fireChannelRead()方法。

      1.4.2 ServerBootstrapAcceptor的channelRead方法
      上面的管道调用fireChannelRead方法,通过责任链方式依次调用ChannelHandler的channelRead方法,最重要的就是ServerBootstrapAcceptor的channelRead方法。
      它这个方法设置了childHandler到NioSocketChannel(新连接)的管道链里,然后又是异步注册NioSocketChannel到childGroup里的NioEventLoop里,注册过程和前面1.3章节的大体一致,也是启动了childGroup里的NioEventLoop的事件循环异步注册。只是因为是NioSocketChannel一些实现的方法不一样,执行的代码有点差别,最终注册完成也会调用pipeline的fireChannelActive()方法。

      1.4.3 fireChannelActive方法

      public ChannelPipeline fireChannelActive() {
              head.fireChannelActive();
      
              if (channel.config().isAutoRead()) {
                  channel.read();
              }
      
              return this;
          }
      

      channel的read方法最终委派调用到unsafe.beginRead()方法,然后又是NioSocketChannel的doBeginRead方法,重新设置SelectionKey的感兴趣事件为SelectionKey.OP_READ(NioSocketChannel的默认值)。到这里连接已经建立,并且开启了客户端连接读事件的监听。

    PS:上面的SimpleServerHandler代码如下:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    /**
     * Created by yaojiafeng on 16/1/17.
     */
    public class SimpleServerHandler extends ChannelHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            ByteBuf body = (ByteBuf) msg;
            byte[] bytes = new byte[body.readableBytes()];
            body.readBytes(bytes);
            System.out.println(new String(bytes));
    
            ByteBuf resp = Unpooled.copiedBuffer(bytes);
            ctx.writeAndFlush(resp);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            ctx.close();
        }
    }
    
  • 相关阅读:
    随笔
    洛谷
    洛谷
    洛谷
    (水题)洛谷
    洛谷
    (水题)洛谷
    洛谷
    (水题)洛谷
    (水题)洛谷
  • 原文地址:https://www.cnblogs.com/yaojf/p/8127198.html
Copyright © 2011-2022 走看看