zoukankan      html  css  js  c++  java
  • netty源码解析(4.0)-12 Channel NIO实现:channel初始化

      创建一个channel实例,并把它register到eventLoopGroup中之后,这个channel然后处于inactive状态,仍然是不可用的。只有在bind或connect方法调用成功之后才能正常。因此bind或connect算是channel初始化的最后一步,本章这就重点分析这两个功能的实现。

      接下来的代码分析如果没有特别说明,都是以NioSocketChannel为例。

      bind实现

      bind方法的调用栈如下:

    io.netty.channel.AbstractChannel#bind(java.net.SocketAddress)
    io.netty.channel.DefaultChannelPipeline#bind(java.net.SocketAddress)
    io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress)  
    io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise) io.netty.channel.AbstractChannelHandlerContext#invokeBind io.netty.channel.DefaultChannelPipeline.HeadContext#bind io.netty.channel.AbstractChannel.AbstractUnsafe#bind io.netty.channel.socket.nio.NioSocketChannel#doBind io.netty.channel.socket.nio.NioSocketChannel#doBind0

      为了能简单明了地展示调用关系,这个调用栈忽略了一些调用。可能有多个AbstractChannelHandlerContext的方法在不同的线程中被调用。以后在描述调用栈时也会忽略这一点,不再赘述。

      io.netty.channel.AbstractChannel.AbstractUnsafe#bind执行了主要的bind逻辑,它会调用doBind, 然后在channel的状态从inactive变成active,就调用pipline的fireChannelActive方法触发channelActives事件。doBind是io.netty.channel.AbstractChannel定义的抽象方法。NioSocketChannel只需要实现这个方法,整个bind功能就完整了。

     1     @Override
     2     protected void doBind(SocketAddress localAddress) throws Exception {
     3         doBind0(localAddress);
     4     }
     5     private void doBind0(SocketAddress localAddress) throws Exception {
     6         if (PlatformDependent.javaVersion() >= 7) {
     7             SocketUtils.bind(javaChannel(), localAddress);
     8         } else {
     9             SocketUtils.bind(javaChannel().socket(), localAddress);
    10         }
    11     }

       SocketUtils封装了通过AccessController调用JDK的socket API接口,事实上还是调用Socket或SocketChannel的bind方法。Nio的三个Channel类实现doBind的代码几乎一样。

      connect实现

      connect的调用栈如下:

    io.netty.channel.AbstractChannel#connect(java.net.SocketAddress)
    io.netty.channel.DefaultChannelPipeline#connect(java.net.SocketAddress)
    io.netty.channel.AbstractChannelHandlerContext#connect(java.net.SocketAddress)
    io.netty.channel.AbstractChannelHandlerContext#connect(java.net.SocketAddress, io.netty.channel.ChannelPromise)
    io.netty.channel.AbstractChannelHandlerContext#connect(java.net.SocketAddress, java.net.SocketAddress, io.netty.channel.ChannelPromise)
    io.netty.channel.AbstractChannelHandlerContext#invokeConnect
    io.netty.channel.DefaultChannelPipeline.HeadContext#connect
    io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect
    io.netty.channel.socket.nio.NioSocketChannel#doConnect

      connect的主要逻辑在io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect中实现,它的流程是:

      1. 调用doConnect方法,这个方法是AbstractNioChanne定义的抽象方法。

      2. 如果doConnect成功,且channel的状态从inactive变成active,则调用pipeline的fireChannelActive方法触发channelActive事件。

      3. 如果doConnection失败,调用close关闭channel。

      io.netty.channel.socket.nio.NioSocketChannel#doConnect中是socket connect API的调用。下面是connect的关键代码。

     1 @Override
     2 public final void connect(
     3         final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
     4     if (!promise.setUncancellable() || !ensureOpen(promise)) {
     5         return;
     6     }
     7 
     8     try {
     9         if (connectPromise != null) {
    10             // Already a connect in process.
    11             throw new ConnectionPendingException();
    12         }
    13 
    14         boolean wasActive = isActive();
    15         if (doConnect(remoteAddress, localAddress)) {
    16             fulfillConnectPromise(promise, wasActive);
    17         } else {
    18             connectPromise = promise;
    19             requestedRemoteAddress = remoteAddress;
    20 
    21             // Schedule connect timeout.
    22             int connectTimeoutMillis = config().getConnectTimeoutMillis();
    23             if (connectTimeoutMillis > 0) {
    24                 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
    25                     @Override
    26                     public void run() {
    27                         ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
    28                         ConnectTimeoutException cause =
    29                                 new ConnectTimeoutException("connection timed out: " + remoteAddress);
    30                         if (connectPromise != null && connectPromise.tryFailure(cause)) {
    31                             close(voidPromise());
    32                         }
    33                     }
    34                 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
    35             }
    36 
    37             promise.addListener(new ChannelFutureListener() {
    38                 @Override
    39                 public void operationComplete(ChannelFuture future) throws Exception {
    40                     if (future.isCancelled()) {
    41                         if (connectTimeoutFuture != null) {
    42                             connectTimeoutFuture.cancel(false);
    43                         }
    44                         connectPromise = null;
    45                         close(voidPromise());
    46                     }
    47                 }
    48             });
    49         }
    50     } catch (Throwable t) {
    51         promise.tryFailure(annotateConnectException(t, remoteAddress));
    52         closeIfClosed();
    53     }
    54 }
    55 
    56 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
    57     if (promise == null) {
    58         return;
    59     }
    60     boolean active = isActive();
    61     boolean promiseSet = promise.trySuccess();
    62 
    63     if (!wasActive && active) {
    64         pipeline().fireChannelActive();
    65     }
    66     if (!promiseSet) {
    67         close(voidPromise());
    68     }
    69 }

      第14,15行和整个fulfillConnectPromise方法处理正常流程。

      第18-52行处理异常流程。代码虽然多,但总结起来就一句话: 设置promis返回错误,确保能够调用close方法

      io.netty.channel.socket.nio.NioSocketChannel#doConnect实现和doBind实现类似:

     1 @Override
     2 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
     3     if (localAddress != null) {
     4         doBind0(localAddress);
     5     }
     6 
     7     boolean success = false;
     8     try {
     9         boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
    10         if (!connected) {
    11             selectionKey().interestOps(SelectionKey.OP_CONNECT);
    12         }
    13         success = true;
    14         return connected;
    15     } finally {
    16         if (!success) {
    17             doClose();
    18         }
    19     }
    20 }

      在第11行,注册OP_CONNECT事件。由于channel在初始化是被设置成非阻塞模式,connect方法可能返回false, 如果返回false表示connect操作没有完成,需要通过selector关注OP_CONNECT事件,把connect变成一个异步过程。只有异步调用io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#finishConnect之后,connect才算完成。finishConnect在eventLoop中被调用:

    1 //io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
    2 if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    3     int ops = k.interestOps();
    4     ops &= ~SelectionKey.OP_CONNECT;
    5     k.interestOps(ops);
    6     unsafe.finishConnect();
    7 }

       finishConnection的实现如下:

     1 //io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#finishConnect
     2 @Override
     3 public final void finishConnect() {
     4     // Note this method is invoked by the event loop only if the connection attempt was
     5     // neither cancelled nor timed out.
     6 
     7     assert eventLoop().inEventLoop();
     8     try {
     9         boolean wasActive = isActive();
    10         doFinishConnect();
    11         fulfillConnectPromise(connectPromise, wasActive);
    12     } catch (Throwable t) {
    13         fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
    14     } finally {
    15         // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
    16         // See https://github.com/netty/netty/issues/1770
    17         if (connectTimeoutFuture != null) {
    18             connectTimeoutFuture.cancel(false);
    19         }
    20         connectPromise = null;
    21     }
    22 }
    23 
    24 //io.netty.channel.socket.nio.NioSocketChannel#doFinishConnect
    25 @Override
    26 protected void doFinishConnect() throws Exception {
    27     if (!javaChannel().finishConnect()) {
    28         throw new Error();
    29     }
    30 }

      9-11行是finishConnection的关键代码, 先调用doFinishConnect执行完成连接之后的操作,NioSocketChannel实现是检查连接是否真的已经完成(27-29行),然后调用fulfillConnectPromise触发事件,设置promise返回值。在前面分析netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect代码时,可以看到在doConnect调用成功以后会立即调用这个方法。这个方法被调用两次是为了确保channelActive事件一定会被触发一次。

      localAddress,remoteAddress实现:得到channel的本地和远程地址

      这个两个方法的实现几乎一样,这里只分析localAddress,它的调用栈如下:

    1 io.netty.channel.AbstractChannel#localAddress
    2 io.netty.channel.AbstractChannel.AbstractUnsafe#localAddress
    3 io.netty.channel.socket.nio.NioSocketChannel#localAddress0

      这个方法不会触发任何事件,因此没有通过pipline调用unsafe,它直接调用unsafe的方法:

     1 //io.netty.channel.AbstractChannel#localAddress
     2 @Override
     3 public SocketAddress localAddress() {
     4     SocketAddress localAddress = this.localAddress;
     5     if (localAddress == null) {
     6         try {
     7             this.localAddress = localAddress = unsafe().localAddress();
     8         } catch (Throwable t) {
     9             // Sometimes fails on a closed socket in Windows.
    10             return null;
    11         }
    12     }
    13     return localAddress;
    14 }

      在第7行直接调用unsafe的locallAddress方法,这个方法在AbstractUnsafe中实现,它调用了localAddress0,这一个protected的抽象方法,在NioSocketChannel中的实现是:

    1 @Override
    2 protected SocketAddress localAddress0() {
    3     return javaChannel().socket().getLocalSocketAddress();
    4 }

      

      

  • 相关阅读:
    【English】20190307
    【Teradata】四舍五入函数
    【Teradata】配置PE和AMP(congfig和reconfig工具、vprocmanager)
    【English】20190306
    【Teradata】数据库初始化(sysinit和dip工具)
    【Teradata】日期类型转换
    Optional常用操作
    Stream学习笔记
    拦截Restful API的三种方式
    maven之可执行jar包
  • 原文地址:https://www.cnblogs.com/brandonli/p/10277841.html
Copyright © 2011-2022 走看看