zoukankan      html  css  js  c++  java
  • Netty5客户端源码解析

    Netty5客户端源码解析

    今天来分析下netty5的客户端源码,示例代码如下:

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    /**
     * Created by yaojiafeng on 16/1/17.
     */
    public class SimpleClient {
    
        public void connect(int port, String host) throws Exception {
            // 配置客户端NIO线程组
            EventLoopGroup group = new NioEventLoopGroup(1);
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch)
                                    throws Exception {
                                ch.pipeline().addLast(new SimpleClientHandler());
                            }
                        });
    
                // 发起异步连接操作
                ChannelFuture f = b.connect(host, port).sync();
    
                // 当代客户端链路关闭
                f.channel().closeFuture().sync();
            } finally {
                // 优雅退出,释放NIO线程组
                group.shutdownGracefully();
            }
        }
    
        /**
         * @param args
         * @throws Exception
         */
        public static void main(String[] args) throws Exception {
            int port = 8081;
            if (args != null && args.length > 0) {
                try {
                    port = Integer.valueOf(args[0]);
                } catch (NumberFormatException e) {
                    // 采用默认值
                }
            }
            new SimpleClient().connect(port, "127.0.0.1");
        }
    }
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    /**
     * Created by yaojiafeng on 16/1/17.
     */
    public class SimpleClientHandler extends ChannelHandlerAdapter {
    
    
        /**
         * Creates a client-side handler.
         */
        public SimpleClientHandler() {
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            ByteBuf message = Unpooled.copiedBuffer("hello world".getBytes());
    
            ctx.writeAndFlush(message);
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            ByteBuf body = (ByteBuf) msg;
    
            byte[] bytes = new byte[body.readableBytes()];
            body.readBytes(bytes);
            System.out.println(new String(bytes));
        }
    
    }
    
    1. 构造Bootstrap对象
      设置事件循环组为NioEventLoopGroup,设置channel为NioSocketChannel,设置一些socket配置项,比如ChannelOption.TCP_NODELAY,设置自定义的ChannelHandler

    2. 发起异步连接

      // 发起异步连接操作
                  ChannelFuture f = b.connect(host, port).sync();
      

      内部具体操作如下:

      2.1 参数校验
      执行validate方法,EventLoopGroup、channelFactory、ChannelHandler这几个基本字段不能为空

      2.2 initAndRegister方法
      异步执行初始化和注册方法,反射构造初始化构造Bootstrap时,设置的NioSocketChannel对象,NioSocketChannel包含的具体字段上篇Netty5服务端源码解析有讲解。构造NioSocketChannel后,调用init方法初始化NioSocketChannel,包括设置ChannelHandler到管道里,设置ChannelOption到socket。然后异步注册NioSocketChannel到NioEventLoopGroup里的NioEventLoop。

      2.2.1 异步注册NioSocketChannel到NioEventLoop
      最终委派调用到

      channel.unsafe().register(this, promise);
      

      NioSocketChannel里的内部类Unsafe的register方法,然后启动NioEventLoop单线程事件循环内部获取Task并执行,register0方法,内部会调用doRegister方法,注册SocketChannel到NioSocketChannel关联的唯一NioEventLoop的selector上,并加上att为NioSocketChannel。

                      selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this);
      

      2.2.2 safeSetSuccess方法激活doConnect0方法
      刚开始进入的doResolveAndConnect方法会调用doConnect方法,如果异步注册已经完成则直接调用doConnect0方法,否则给ChannelFuture增加监听方法,由channel注册完成后驱动调用doConnect0方法,一般情况下都是通过监听器驱动的。接下来分析doConnect0方法。

      private static void doConnect0(
                  final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelFuture regFuture,
                  final ChannelPromise connectPromise) {
      
              // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
              // the pipeline in its channelRegistered() implementation.
              final Channel channel = connectPromise.channel();
              channel.eventLoop().execute(new Runnable() {
                  @Override
                  public void run() {
                      if (regFuture.isSuccess()) {
                          if (localAddress == null) {
                              channel.connect(remoteAddress, connectPromise);
                          } else {
                              channel.connect(remoteAddress, localAddress, connectPromise);
                          }
                          connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                      } else {
                          connectPromise.setFailure(regFuture.cause());
                      }
                  }
              });
          }
      

      这个方法也是增加task到NioEventLoop里,内部执行逻辑为,注册成功的情况下调用channel.connect(remoteAddress, connectPromise)方法,它会通过管道链,一路串行调用到unsafe.connect(remoteAddress, localAddress, promise)方法。调用doConnect方法

      protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
              if (localAddress != null) {
                  javaChannel().socket().bind(localAddress);
              }
      
              boolean success = false;
              try {
                  boolean connected = javaChannel().connect(remoteAddress);
                  if (!connected) {
                      selectionKey().interestOps(SelectionKey.OP_CONNECT);
                  }
                  success = true;
                  return connected;
              } finally {
                  if (!success) {
                      doClose();
                  }
              }
          }   
      

      调用JDK NIO的API,因为是异步连接,返回的connected是false,所以后面会设置SelectionKey的感兴趣事件为SelectionKey.OP_CONNECT,为了后续的NioEventLoop的事件循环可以获取CONNECT激活的SelectionKey做后续的连接操作。

      2.2.3 完成连接
      从SingleThreadEventExecutor的asRunnable到processSelectedKeys到processSelectedKeysOptimized到processSelectedKey方法,判断激活的操作为SelectionKey.OP_CONNECT。

       if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                      // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                      // See https://github.com/netty/netty/issues/924
                      int ops = k.interestOps();
                      ops &= ~SelectionKey.OP_CONNECT;
                      k.interestOps(ops);
      
                      unsafe.finishConnect();
                  }    
      

      重新设置感兴趣事件为0,并且调用unsafe.finishConnect方法,内部调用doFinishConnect方法完成最终连接。

       protected void doFinishConnect() throws Exception {
              if (!javaChannel().finishConnect()) {
                  throw new Error();
              }
          }
      

      然后再调用fulfillConnectPromise方法,内部继续调用pipeline().fireChannelActive()方法

      public ChannelPipeline fireChannelActive() {
              head.fireChannelActive();
      
              if (channel.config().isAutoRead()) {
                  channel.read();
              }
      
              return this;
          }
      

      head.fireChannelActive走管道链,channel.read()方法也走管道链,最终调用unsafe.beginRead()方法,然后调用doBeginRead方法,重新设置感兴趣事件为SelectionKey.OP_READ(NioSocketChannel的默认感兴趣事件),至此客户端启动完成。

    3. 自行操作channel发送消息
      客户端启动完成获取channel我们可以调用writeAndFlush发送消息。当然服务端返回的消息,NioEventLoop会感知到,并通过管道链回调到自定义的channelRead方法进行读取。

  • 相关阅读:
    Jane Austen【简·奥斯汀】
    I Like for You to Be Still【我会一直喜欢你】
    Dialogue between Jack and Rose【jack 和 Rose的对话】
    git删除远程.idea目录
    码云初次导入项目(Idea)
    DelayQueue 订单限时支付实例
    eclipse安装spring的插件
    redis安装命令
    log4j详解
    jstree API
  • 原文地址:https://www.cnblogs.com/yaojf/p/8179608.html
Copyright © 2011-2022 走看看