zoukankan      html  css  js  c++  java
  • Netty源码分析之客户端启动过程

    一、先来看一下客户端示例代码。

     1 public class NettyClientTest {
     2     public void connect(int port, String host) throws Exception {
     3         EventLoopGroup group = new NioEventLoopGroup();//与服务端不同,客户端只需要一个IO线程组
     4 
     5         try {
     6             Bootstrap b = new Bootstrap();
     7             b.group(group)
     8                     .option(ChannelOption.TCP_NODELAY, true)//禁用nagel算法
     9                     .channel(NioSocketChannel.class)//设置channel类型为NioSocketChannel
    10                     .handler(new ChannelInitializer<SocketChannel>() {//为channel设置初始化Handler
    11                         @Override
    12                         protected void initChannel(SocketChannel ch) throws Exception {
    13                             ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
    14                             ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
    15                             ch.pipeline().addLast(new StringDecoder());
    16                             ch.pipeline().addLast(new EchoClientHandler());
    17                         }
    18                     });
    19             ChannelFuture f = b.connect(host, port).sync();//等不等待连接结束
    20             f.channel().closeFuture().sync();//同步等待关闭
    21         }finally {
    22              group.shutdownGracefully();
    23         }
    24     }
    25    public static void main(String[] args) throws Exception{
    26        int port = 8082;
    27        new NettyClientTest().connect(port,"127.0.0.1");
    28    }
    29 }
    30 
    31 class EchoClientHandler extends ChannelInboundHandlerAdapter{
    32     private int count = 0;
    33     static final String ECHO_REQ = "HI , MY NAME IS CHENYANG.$_";
    34 
    35     @Override
    36     public void channelActive(ChannelHandlerContext ctx) throws Exception {
    37         for(int i = 0;i < 10;i++){
    38             ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
    39         }
    40     }
    41 
    42     @Override
    43     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    44         System.out.println("This is"+ ++count + "times receive server:[" + msg + "]");
    45         ctx.writeAndFlush(Unpooled.copiedBuffer("hehe.$_".getBytes()));
    46         ctx.fireChannelRead(msg);
    47     }
    48 
    49     @Override
    50     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    51         ctx.flush();
    52     }
    53 
    54     @Override
    55     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    56         cause.printStackTrace();
    57         ctx.close();
    58     }
    59 }

    二、启动过程分析

    由于客户端Bootstrap的配置过程和服务端ServerBootstrap配置过程原理相类似,此处不再单独讲解客户端的配置过程。接下来直接看客户端的connect过程。

    三、connect过程分析

           ChannelFuture f = b.connect(host, port).sync();

           connect代码如下:

    1    /**
    2      * Connect a {@link Channel} to the remote peer.
    3      */
    4     public ChannelFuture connect(String inetHost, int inetPort) {
    5         return connect(new InetSocketAddress(inetHost, inetPort));
    6     }

           继续深入

     1   /**
     2      * Connect a {@link Channel} to the remote peer.
     3      */
     4     public ChannelFuture connect(SocketAddress remoteAddress) {
     5         if (remoteAddress == null) {
     6             throw new NullPointerException("remoteAddress");
     7         }
     8 
     9         validate();
    10         return doConnect(remoteAddress, localAddress());
    11     }

           继续查看doConnect源码

     1  /**
     2      * @see {@link #connect()}
     3      */
     4     private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
     5         final ChannelFuture regFuture = initAndRegister();//与服务端的类似,负责初始化和注册这个channel
     6         final Channel channel = regFuture.channel();//获得创建的channel
     7         if (regFuture.cause() != null) {
     8             return regFuture;
     9         }
    10 
    11         final ChannelPromise promise = channel.newPromise();
    12         if (regFuture.isDone()) {
    13             doConnect0(regFuture, channel, remoteAddress, localAddress, promise);//连接
    14         } else {
    15             regFuture.addListener(new ChannelFutureListener() {
    16                 @Override
    17                 public void operationComplete(ChannelFuture future) throws Exception {
    18                     doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
    19                 }
    20             });
    21         }
    22 
    23         return promise;
    24     }

            看一下initAndRegister代码

     1  final ChannelFuture initAndRegister() {
     2         final Channel channel = channelFactory().newChannel();//调用之前设置的channel工厂,创建channel,此处就是NioSocketChannel
     3         try {
     4             init(channel);//初始化这个channel,这个针对客户端和服务端是不同的
     5         } catch (Throwable t) {
     6             channel.unsafe().closeForcibly();
     7             // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
     8             return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
     9         }
    10 
    11         ChannelFuture regFuture = group().register(channel);//向NioEventLoopGroup中注册这个channel
    12         if (regFuture.cause() != null) {
    13             if (channel.isRegistered()) {
    14                 channel.close();
    15             } else {
    16                 channel.unsafe().closeForcibly();
    17             }
    18         }
    19 
    20         // If we are here and the promise is not failed, it's one of the following cases:
    21         // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    22         //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    23         // 2) If we attempted registration from the other thread, the registration request has been successfully
    24         //    added to the event loop's task queue for later execution.
    25         //    i.e. It's safe to attempt bind() or connect() now:
    26         //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    27         //         because register(), bind(), and connect() are all bound to the same thread.
    28 
    29         return regFuture;
    30     }

          首先看一下针对客户端的init代码。

     1     @Override
     2     @SuppressWarnings("unchecked")
     3     void init(Channel channel) throws Exception {
     4         ChannelPipeline p = channel.pipeline();
     5         p.addLast(handler());//设置用户添加的handler,也就是初始化的handler
     6 
     7         final Map<ChannelOption<?>, Object> options = options();
     8         synchronized (options) {
     9             for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
    10                 try {
    11                     if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {//设置channel的配置选项
    12                         logger.warn("Unknown channel option: " + e);
    13                     }
    14                 } catch (Throwable t) {
    15                     logger.warn("Failed to set a channel option: " + channel, t);
    16                 }
    17             }
    18         }
    19 
    20         final Map<AttributeKey<?>, Object> attrs = attrs();
    21         synchronized (attrs) {
    22             for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
    23                 channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());//设置channel的属性
    24             }
    25         }
    26     }

           接下来看register过程,这个和服务端是一样的。(ChannelFuture regFuture = group().register(channel);)

    1   @Override
    2     public ChannelFuture register(Channel channel) {
    3         return next().register(channel);//next()会在Group中选出下一个NioEventLoop
    4     }
    1     @Override
    2     public ChannelFuture register(Channel channel) {
    3         return register(channel, new DefaultChannelPromise(channel, this));
    4     }
     1    @Override
     2     public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
     3         if (channel == null) {
     4             throw new NullPointerException("channel");
     5         }
     6         if (promise == null) {
     7             throw new NullPointerException("promise");
     8         }
     9 
    10         channel.unsafe().register(this, promise);//unsafe中执行真正的注册操作
    11         return promise;
    12     }
     1 @Override
     2         public final void register(EventLoop eventLoop, final ChannelPromise promise) {
     3             if (eventLoop == null) {
     4                 throw new NullPointerException("eventLoop");
     5             }
     6             if (isRegistered()) {
     7                 promise.setFailure(new IllegalStateException("registered to an event loop already"));
     8                 return;
     9             }
    10             if (!isCompatible(eventLoop)) {
    11                 promise.setFailure(
    12                         new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
    13                 return;
    14             }
    15 
    16             AbstractChannel.this.eventLoop = eventLoop;//设置该channel绑定的eventloop
    17 
    18             if (eventLoop.inEventLoop()) {//必须保证在eventloop线程中执行
    19                 register0(promise);//注册
    20             } else {
    21                 try {
    22                     eventLoop.execute(new OneTimeTask() {
    23                         @Override
    24                         public void run() {
    25                             register0(promise);
    26                         }
    27                     });
    28                 } catch (Throwable t) {
    29                     logger.warn(
    30                             "Force-closing a channel whose registration task was not accepted by an event loop: {}",
    31                             AbstractChannel.this, t);
    32                     closeForcibly();
    33                     closeFuture.setClosed();
    34                     safeSetFailure(promise, t);
    35                 }
    36             }
    37         }

         继续看register0代码

     1 private void register0(ChannelPromise promise) {
     2             try {
     3                 // check if the channel is still open as it could be closed in the mean time when the register
     4                 // call was outside of the eventLoop
     5                 if (!promise.setUncancellable() || !ensureOpen(promise)) {
     6                     return;
     7                 }
     8                 boolean firstRegistration = neverRegistered;
     9                 doRegister();//在selector上注册
    10                 neverRegistered = false;
    11                 registered = true;//设置已经注册标识
    12                 safeSetSuccess(promise);//设置注册成功
    13                 pipeline.fireChannelRegistered();//引发channelRegistered事件,这会导致初始化Handler的channelRegistered被调用
    14                 // Only fire a channelActive if the channel has never been registered. This prevents firing
    15                 // multiple channel actives if the channel is deregistered and re-registered.
    16                 if (firstRegistration && isActive()) {//如果channel可用,针对客户端,也就是connect成功
    17                     pipeline.fireChannelActive();//引发channelActive事件,最终注册read事件
    18                 }
    19             } catch (Throwable t) {
    20                 // Close the channel directly to avoid FD leak.
    21                 closeForcibly();
    22                 closeFuture.setClosed();
    23                 safeSetFailure(promise, t);
    24             }
    25         }

           看doRegister代码

     1 @Override
     2     protected void doRegister() throws Exception {
     3         boolean selected = false;
     4         for (;;) {
     5             try {
     6                 selectionKey = javaChannel().register(eventLoop().selector, 0, this);//注意,这里注册的op为0,不会监听任何事件
     7                 return;
     8             } catch (CancelledKeyException e) {
     9                 if (!selected) {
    10                     // Force the Selector to select now as the "canceled" SelectionKey may still be
    11                     // cached and not removed because no Select.select(..) operation was called yet.
    12                     eventLoop().selectNow();
    13                     selected = true;
    14                 } else {
    15                     // We forced a select operation on the selector before but the SelectionKey is still cached
    16                     // for whatever reason. JDK bug ?
    17                     throw e;
    18                 }
    19             }
    20         }
    21     }

           initAndRegister执行完成之后,继续看doConnect0代码

     1  private static void doConnect0(
     2             final ChannelFuture regFuture, final Channel channel,
     3             final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
     4 
     5         // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
     6         // the pipeline in its channelRegistered() implementation.
     7         channel.eventLoop().execute(new Runnable() {//接下来的代码实在eventloop中执行,而不是用户线程
     8             @Override
     9             public void run() {
    10                 if (regFuture.isSuccess()) {
    11                     if (localAddress == null) {
    12                         channel.connect(remoteAddress, promise);//执行connect
    13                     } else {
    14                         channel.connect(remoteAddress, localAddress, promise);
    15                     }
    16                     promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
    17                 } else {
    18                     promise.setFailure(regFuture.cause());
    19                 }
    20             }
    21         });
    22     }

          继续看connect代码,简单的调用了pipeline.connect

    1    @Override
    2     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    3         return pipeline.connect(remoteAddress, promise);
    4     }

          从tail开始     

    1    @Override
    2     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    3         return tail.connect(remoteAddress, promise);
    4     }

          最终会调用到head.connect()

    1   @Override
    2         public void connect(
    3                 ChannelHandlerContext ctx,
    4                 SocketAddress remoteAddress, SocketAddress localAddress,
    5                 ChannelPromise promise) throws Exception {
    6             unsafe.connect(remoteAddress, localAddress, promise);
    7         }
     1    @Override
     2         public final void connect(
     3                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
     4             if (!promise.setUncancellable() || !ensureOpen(promise)) {
     5                 return;
     6             }
     7 
     8             try {
     9                 if (connectPromise != null) {
    10                     throw new IllegalStateException("connection attempt already made");
    11                 }
    12 
    13                 boolean wasActive = isActive();
    14                 if (doConnect(remoteAddress, localAddress)) {
    15                     fulfillConnectPromise(promise, wasActive);//设置promise
    16                 } else {
    17                     connectPromise = promise;
    18                     requestedRemoteAddress = remoteAddress;
    19 
    20                     // Schedule connect timeout.
    21                     int connectTimeoutMillis = config().getConnectTimeoutMillis();//支持连接超时机制
    22                     if (connectTimeoutMillis > 0) {
    23                         connectTimeoutFuture = eventLoop().schedule(new OneTimeTask() {
    24                             @Override
    25                             public void run() {
    26                                 ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
    27                                 ConnectTimeoutException cause =
    28                                         new ConnectTimeoutException("connection timed out: " + remoteAddress);
    29                                 if (connectPromise != null && connectPromise.tryFailure(cause)) {
    30                                     close(voidPromise());
    31                                 }
    32                             }
    33                         }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
    34                     }
    35 
    36                     promise.addListener(new ChannelFutureListener() {
    37                         @Override
    38                         public void operationComplete(ChannelFuture future) throws Exception {
    39                             if (future.isCancelled()) {
    40                                 if (connectTimeoutFuture != null) {
    41                                     connectTimeoutFuture.cancel(false);
    42                                 }
    43                                 connectPromise = null;
    44                                 close(voidPromise());
    45                             }
    46                         }
    47                     });
    48                 }
    49             } catch (Throwable t) {
    50                 promise.tryFailure(annotateConnectException(t, remoteAddress));
    51                 closeIfClosed();
    52             }
    53         }

      客户端的isActive()

    1     @Override
    2     public boolean isActive() {
    3         SocketChannel ch = javaChannel();
    4         return ch.isOpen() && ch.isConnected();
    5     }

      服务端的isActive()

    1   @Override
    2     public boolean isActive() {
    3         return javaChannel().socket().isBound();
    4     }

       看一下doConnect代码

     1 @Override
     2     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
     3         if (localAddress != null) {
     4             javaChannel().socket().bind(localAddress);
     5         }
     6 
     7         boolean success = false;
     8         try {
     9             boolean connected = javaChannel().connect(remoteAddress);//执行真正的异步connect
    10             if (!connected) {
    11                 selectionKey().interestOps(SelectionKey.OP_CONNECT);//如果没有注册成功,就注册OP_CONNECT事件
    12             }
    13             success = true;
    14             return connected;
    15         } finally {
    16             if (!success) {
    17                 doClose();
    18             }
    19         }
    20     }
     1 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
     2             if (promise == null) {
     3                 // Closed via cancellation and the promise has been notified already.
     4                 return;
     5             }
     6 
     7             // trySuccess() will return false if a user cancelled the connection attempt.
     8             boolean promiseSet = promise.trySuccess();
     9 
    10             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
    11             // because what happened is what happened.
    12             if (!wasActive && isActive()) {//如果connect成功
    13                 pipeline().fireChannelActive();//最终会注册read事件,细节如下
    14             }
    15 
    16             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
    17             if (!promiseSet) {
    18                 close(voidPromise());
    19             }
    20         }
     1   @Override
     2     public ChannelPipeline fireChannelActive() {
     3         head.fireChannelActive();
     4 
     5         if (channel.config().isAutoRead()) {
     6             channel.read();//pipeline.read()-->tail.read()-->****-->head.read()-->unsafe.beginRead()-->doBeginRead()-->real操作
     7         }
     8 
     9         return this;
    10     }

    四、看一下如何获取异步连接结果的

      在NioEventLoop的循环中,可以看到如下代码:

    1  if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    2                 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
    3                 // See https://github.com/netty/netty/issues/924
    4                 int ops = k.interestOps();
    5                 ops &= ~SelectionKey.OP_CONNECT;
    6                 k.interestOps(ops);
    7 
    8                 unsafe.finishConnect();
    9             }

           当发生OP_CONNECT事件时,最终会调用unsafe.finishConnect,代码如下

     1 @Override
     2         public final void finishConnect() {
     3             // Note this method is invoked by the event loop only if the connection attempt was
     4             // neither cancelled nor timed out.
     5 
     6             assert eventLoop().inEventLoop();//确保该操作是在eventLoop线程中的
     7 
     8             try {
     9                 boolean wasActive = isActive();
    10                 doFinishConnect();
    11                 fulfillConnectPromise(connectPromise, wasActive);
    12             } catch (Throwable t) {
    13                 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
    14             } finally {
    15                 // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
    16                 // See https://github.com/netty/netty/issues/1770
    17                 if (connectTimeoutFuture != null) {
    18                     connectTimeoutFuture.cancel(false);
    19                 }
    20                 connectPromise = null;
    21             }
    22         }
    1   @Override
    2     protected void doFinishConnect() throws Exception {
    3         if (!javaChannel().finishConnect()) {//判断JDK的SocketChannel连接结果,返回true表示连接成功
    4             throw new Error();
    5         }
    6     }

     判断JDK的SocketChannel连接结果,返回true表示连接成功

      1  public boolean finishConnect() throws IOException {
      2         Object var1 = this.readLock;
      3         synchronized(this.readLock) {
      4             Object var2 = this.writeLock;
      5             synchronized(this.writeLock) {
      6                 Object var3 = this.stateLock;
      7                 boolean var10000;
      8                 synchronized(this.stateLock) {
      9                     if(!this.isOpen()) {
     10                         throw new ClosedChannelException();
     11                     }
     12 
     13                     if(this.state == 2) {
     14                         var10000 = true;
     15                         return var10000;
     16                     }
     17 
     18                     if(this.state != 1) {
     19                         throw new NoConnectionPendingException();
     20                     }
     21                 }
     22 
     23                 int var41 = 0;
     24 
     25                 Object var4;
     26                 try {
     27                     label525: {
     28                         boolean var29 = false;
     29 
     30                         boolean var6;
     31                         label506: {
     32                             try {
     33                                 var29 = true;
     34                                 this.begin();
     35                                 synchronized(this.blockingLock()) {
     36                                     label480: {
     37                                         label494: {
     38                                             Object var5 = this.stateLock;
     39                                             synchronized(this.stateLock) {
     40                                                 if(!this.isOpen()) {
     41                                                     var6 = false;
     42                                                     break label494;
     43                                                 }
     44 
     45                                                 this.readerThread = NativeThread.current();
     46                                             }
     47 
     48                                             if(!this.isBlocking()) {
     49                                                 do {
     50                                                     var41 = checkConnect(this.fd, false, this.readyToConnect);
     51                                                 } while(var41 == -3 && this.isOpen());
     52                                             } else {
     53                                                 do {
     54                                                     while(true) {
     55                                                         var41 = checkConnect(this.fd, true, this.readyToConnect);
     56                                                         if(var41 == 0) {
     57                                                             continue;
     58                                                         }
     59                                                         break;
     60                                                     }
     61                                                 } while(var41 == -3 && this.isOpen());
     62                                             }
     63 
     64                                             var29 = false;
     65                                             break label480;
     66                                         }
     67 
     68                                         var29 = false;
     69                                         break label506;
     70                                     }
     71                                 }
     72                             } finally {
     73                                 if(var29) {
     74                                     Object var13 = this.stateLock;
     75                                     synchronized(this.stateLock) {
     76                                         this.readerThread = 0L;
     77                                         if(this.state == 3) {
     78                                             this.kill();
     79                                             var41 = 0;
     80                                         }
     81                                     }
     82 
     83                                     this.end(var41 > 0 || var41 == -2);
     84 
     85                                     assert IOStatus.check(var41);
     86 
     87                                 }
     88                             }
     89 
     90                             var4 = this.stateLock;
     91                             synchronized(this.stateLock) {
     92                                 this.readerThread = 0L;
     93                                 if(this.state == 3) {
     94                                     this.kill();
     95                                     var41 = 0;
     96                                 }
     97                             }
     98 
     99                             this.end(var41 > 0 || var41 == -2);
    100 
    101                             assert IOStatus.check(var41);
    102                             break label525;
    103                         }
    104 
    105                         Object var7 = this.stateLock;
    106                         synchronized(this.stateLock) {
    107                             this.readerThread = 0L;
    108                             if(this.state == 3) {
    109                                 this.kill();
    110                                 var41 = 0;
    111                             }
    112                         }
    113 
    114                         this.end(var41 > 0 || var41 == -2);
    115 
    116                         assert IOStatus.check(var41);
    117 
    118                         return var6;
    119                     }
    120                 } catch (IOException var38) {
    121                     this.close();
    122                     throw var38;
    123                 }
    124 
    125                 if(var41 > 0) {
    126                     var4 = this.stateLock;
    127                     synchronized(this.stateLock) {
    128                         this.state = 2;
    129                         if(this.isOpen()) {
    130                             this.localAddress = Net.localAddress(this.fd);
    131                         }
    132                     }
    133 
    134                     var10000 = true;
    135                     return var10000;
    136                 } else {
    137                     var10000 = false;
    138                     return var10000;
    139                 }
    140             }
    141         }
    142     }

        fulfillConnectPromise会出发链接激活事件

     1    private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
     2             if (promise == null) {
     3                 // Closed via cancellation and the promise has been notified already.
     4                 return;
     5             }
     6 
     7             // trySuccess() will return false if a user cancelled the connection attempt.
     8             boolean promiseSet = promise.trySuccess();
     9 
    10             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
    11             // because what happened is what happened.
    12             if (!wasActive && isActive()) {
    13                 pipeline().fireChannelActive();//参照前面的说明
    14             }
    15 
    16             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
    17             if (!promiseSet) {
    18                 close(voidPromise());
    19             }
    20         }

     五、write过程

      由于在服务端启动过程中已经多次分析了channel的read执行过程,因此在这里单独分析一下channel的write过程。首先看一下channe接口中关于write方法的定义:

    1 /**
    2      * Request to write a message via this {@link Channel} through the {@link ChannelPipeline}.
    3      * This method will not request to actual flush, so be sure to call {@link #flush()}
    4      * once you want to request to flush all pending data to the actual transport.
    5      */
    6     ChannelFuture write(Object msg);

        其在AbstractChannel中的实现为:

    1  @Override
    2     public ChannelFuture write(Object msg) {
    3         return pipeline.write(msg);
    4     }

        继续深入

    1    @Override
    2     public ChannelFuture write(Object msg) {
    3         return tail.write(msg);
    4     }

        事件进入pipeline之后,会从tail  context开始向前传播(因为write是个outbound事件)

    1    @Override
    2     public ChannelFuture write(Object msg) {
    3         return write(msg, newPromise());
    4     }

       继续

     1  @Override
     2     public ChannelFuture write(final Object msg, final ChannelPromise promise) {
     3         if (msg == null) {
     4             throw new NullPointerException("msg");
     5         }
     6 
     7         if (!validatePromise(promise, true)) {
     8             ReferenceCountUtil.release(msg);
     9             // cancelled
    10             return promise;
    11         }
    12         write(msg, false, promise);//false表示不flush缓冲区的意思
    13 
    14         return promise;
    15     }

      继续

     1  private void write(Object msg, boolean flush, ChannelPromise promise) {
     2 
     3         AbstractChannelHandlerContext next = findContextOutbound();
     4         EventExecutor executor = next.executor();
     5         if (executor.inEventLoop()) {
     6             next.invokeWrite(msg, promise);
     7             if (flush) {
     8                 next.invokeFlush();
     9             }
    10         } else {
    11             int size = channel.estimatorHandle().size(msg);
    12             if (size > 0) {
    13                 ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
    14                 // Check for null as it may be set to null if the channel is closed already
    15                 if (buffer != null) {
    16                     buffer.incrementPendingOutboundBytes(size);
    17                 }
    18             }
    19             Runnable task;
    20             if (flush) {
    21                 task = WriteAndFlushTask.newInstance(next, msg, size, promise);
    22             }  else {
    23                 task = WriteTask.newInstance(next, msg, size, promise);
    24             }
    25             safeExecute(executor, task, promise, msg);
    26         }
    27     }

        看一下findContextOutbound的实现

    1   private AbstractChannelHandlerContext findContextOutbound() {
    2         AbstractChannelHandlerContext ctx = this;
    3         do {
    4             ctx = ctx.prev;
    5         } while (!ctx.outbound);
    6         return ctx;
    7     }

        找到下一个OutBound类型的Context之后,会调用Context中的Handler

    1    private void invokeWrite(Object msg, ChannelPromise promise) {
    2         try {
    3             ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    4         } catch (Throwable t) {
    5             notifyOutboundHandlerException(t, promise);
    6         }
    7     }

         继续看handler的write实现

     1  /**
     2      * Calls {@link ChannelHandlerContext#write(Object)} to forward
     3      * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
     4      *
     5      * Sub-classes may override this method to change behavior.
     6      */
     7     @Override
     8     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
     9         ctx.write(msg, promise);
    10     }

         可以看到,默认的实现是将事件继续沿pipeline向前传播,最终会传到head Context

    1   @Override
    2         public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    3             unsafe.write(msg, promise);
    4         }

         unsafe会执行真正的IO操作

     1  @Override
     2         public final void write(Object msg, ChannelPromise promise) {
     3             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
     4             if (outboundBuffer == null) {
     5                 // If the outboundBuffer is null we know the channel was closed and so
     6                 // need to fail the future right away. If it is not null the handling of the rest
     7                 // will be done in flush0()
     8                 // See https://github.com/netty/netty/issues/2362
     9                 safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
    10                 // release message now to prevent resource-leak
    11                 ReferenceCountUtil.release(msg);
    12                 return;
    13             }
    14 
    15             int size;
    16             try {
    17                 msg = filterOutboundMessage(msg);
    18                 size = estimatorHandle().size(msg);
    19                 if (size < 0) {
    20                     size = 0;
    21                 }
    22             } catch (Throwable t) {
    23                 safeSetFailure(promise, t);
    24                 ReferenceCountUtil.release(msg);
    25                 return;
    26             }
    27 
    28             outboundBuffer.addMessage(msg, size, promise);
    29         }

        可以看到,unsafe的write操作并不是真正的将数据发送出去,而是在环形缓冲区中进行缓存。当channel调用flush时,最终会执行unsafe的flush

     1  @Override
     2         public final void flush() {
     3             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
     4             if (outboundBuffer == null) {
     5                 return;
     6             }
     7 
     8             outboundBuffer.addFlush();
     9             flush0();
    10         }

         addFlush仅仅是对之前缓存的Message进行标记

     1   /**
     2      * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
     3      * and so you will be able to handle them.
     4      */
     5     public void addFlush() {
     6         // There is no need to process all entries if there was already a flush before and no new messages
     7         // where added in the meantime.
     8         //
     9         // See https://github.com/netty/netty/issues/2577
    10         Entry entry = unflushedEntry;
    11         if (entry != null) {
    12             if (flushedEntry == null) {
    13                 // there is no flushedEntry yet, so start with the entry
    14                 flushedEntry = entry;
    15             }
    16             do {
    17                 flushed ++;
    18                 if (!entry.promise.setUncancellable()) {
    19                     // Was cancelled so make sure we free up memory and notify about the freed bytes
    20                     int pending = entry.cancel();
    21                     decrementPendingOutboundBytes(pending, false, true);
    22                 }
    23                 entry = entry.next;
    24             } while (entry != null);
    25 
    26             // All flushed so reset unflushedEntry
    27             unflushedEntry = null;
    28         }
    29     }

        接下来看一下真正的flush操作

     1  protected void flush0() {
     2             if (inFlush0) {
     3                 // Avoid re-entrance
     4                 return;
     5             }
     6 
     7             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
     8             if (outboundBuffer == null || outboundBuffer.isEmpty()) {
     9                 return;
    10             }
    11 
    12             inFlush0 = true;
    13 
    14             // Mark all pending write requests as failure if the channel is inactive.
    15             if (!isActive()) {
    16                 try {
    17                     if (isOpen()) {
    18                         outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION, true);
    19                     } else {
    20                         // Do not trigger channelWritabilityChanged because the channel is closed already.
    21                         outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
    22                     }
    23                 } finally {
    24                     inFlush0 = false;
    25                 }
    26                 return;
    27             }
    28 
    29             try {
    30                 doWrite(outboundBuffer);
    31             } catch (Throwable t) {
    32                 boolean close = t instanceof IOException && config().isAutoClose();
    33                 // We do not want to trigger channelWritabilityChanged event if the channel is going to be closed.
    34                 outboundBuffer.failFlushed(t, !close);
    35                 if (close) {
    36                     close(voidPromise());
    37                 }
    38             } finally {
    39                 inFlush0 = false;
    40             }
    41         }

         doWrite执行真正的写操作

     1  @Override
     2     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
     3         for (;;) {
     4             int size = in.size();
     5             if (size == 0) {
     6                 // All written so clear OP_WRITE
     7                 clearOpWrite();
     8                 break;
     9             }
    10             long writtenBytes = 0;
    11             boolean done = false;
    12             boolean setOpWrite = false;
    13 
    14             // Ensure the pending writes are made of ByteBufs only.
    15             ByteBuffer[] nioBuffers = in.nioBuffers();
    16             int nioBufferCnt = in.nioBufferCount();
    17             long expectedWrittenBytes = in.nioBufferSize();
    18             SocketChannel ch = javaChannel();
    19 
    20             // Always us nioBuffers() to workaround data-corruption.
    21             // See https://github.com/netty/netty/issues/2761
    22             switch (nioBufferCnt) {
    23                 case 0:
    24                     // We have something else beside ByteBuffers to write so fallback to normal writes.
    25                     super.doWrite(in);
    26                     return;
    27                 case 1:
    28                     // Only one ByteBuf so use non-gathering write
    29                     ByteBuffer nioBuffer = nioBuffers[0];
    30                     for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
    31                         final int localWrittenBytes = ch.write(nioBuffer);
    32                         if (localWrittenBytes == 0) {
    33                             setOpWrite = true;
    34                             break;
    35                         }
    36                         expectedWrittenBytes -= localWrittenBytes;
    37                         writtenBytes += localWrittenBytes;
    38                         if (expectedWrittenBytes == 0) {
    39                             done = true;
    40                             break;
    41                         }
    42                     }
    43                     break;
    44                 default:
    45                     for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
    46                         final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
    47                         if (localWrittenBytes == 0) {
    48                             setOpWrite = true;
    49                             break;
    50                         }
    51                         expectedWrittenBytes -= localWrittenBytes;
    52                         writtenBytes += localWrittenBytes;
    53                         if (expectedWrittenBytes == 0) {
    54                             done = true;
    55                             break;
    56                         }
    57                     }
    58                     break;
    59             }
    60 
    61             // Release the fully written buffers, and update the indexes of the partially written buffer.
    62             in.removeBytes(writtenBytes);
    63 
    64             if (!done) {
    65                 // Did not write all buffers completely.
    66                 incompleteWrite(setOpWrite);
    67                 break;
    68             }
    69         }
    70     }
     1    protected final void clearOpWrite() {
     2         final SelectionKey key = selectionKey();
     3         // Check first if the key is still valid as it may be canceled as part of the deregistration
     4         // from the EventLoop
     5         // See https://github.com/netty/netty/issues/2104
     6         if (!key.isValid()) {
     7             return;
     8         }
     9         final int interestOps = key.interestOps();
    10         if ((interestOps & SelectionKey.OP_WRITE) != 0) {
    11             key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
    12         }
    13     }

      

     1 @Override
     2     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
     3         int writeSpinCount = -1;
     4 
     5         for (;;) {
     6             Object msg = in.current();
     7             if (msg == null) {
     8                 // Wrote all messages.
     9                 clearOpWrite();
    10                 break;
    11             }
    12 
    13             if (msg instanceof ByteBuf) {
    14                 ByteBuf buf = (ByteBuf) msg;
    15                 int readableBytes = buf.readableBytes();
    16                 if (readableBytes == 0) {
    17                     in.remove();
    18                     continue;
    19                 }
    20 
    21                 boolean setOpWrite = false;
    22                 boolean done = false;
    23                 long flushedAmount = 0;
    24                 if (writeSpinCount == -1) {
    25                     writeSpinCount = config().getWriteSpinCount();
    26                 }
    27                 for (int i = writeSpinCount - 1; i >= 0; i --) {
    28                     int localFlushedAmount = doWriteBytes(buf);
    29                     if (localFlushedAmount == 0) {
    30                         setOpWrite = true;
    31                         break;
    32                     }
    33 
    34                     flushedAmount += localFlushedAmount;
    35                     if (!buf.isReadable()) {
    36                         done = true;
    37                         break;
    38                     }
    39                 }
    40 
    41                 in.progress(flushedAmount);
    42 
    43                 if (done) {
    44                     in.remove();
    45                 } else {
    46                     incompleteWrite(setOpWrite);
    47                     break;
    48                 }
    49             } else if (msg instanceof FileRegion) {
    50                 FileRegion region = (FileRegion) msg;
    51                 boolean done = region.transfered() >= region.count();
    52                 boolean setOpWrite = false;
    53 
    54                 if (!done) {
    55                     long flushedAmount = 0;
    56                     if (writeSpinCount == -1) {
    57                         writeSpinCount = config().getWriteSpinCount();
    58                     }
    59 
    60                     for (int i = writeSpinCount - 1; i >= 0; i--) {
    61                         long localFlushedAmount = doWriteFileRegion(region);
    62                         if (localFlushedAmount == 0) {
    63                             setOpWrite = true;
    64                             break;
    65                         }
    66 
    67                         flushedAmount += localFlushedAmount;
    68                         if (region.transfered() >= region.count()) {
    69                             done = true;
    70                             break;
    71                         }
    72                     }
    73 
    74                     in.progress(flushedAmount);
    75                 }
    76 
    77                 if (done) {
    78                     in.remove();
    79                 } else {
    80                     incompleteWrite(setOpWrite);
    81                     break;
    82                 }
    83             } else {
    84                 // Should not reach here.
    85                 throw new Error();
    86             }
    87         }
    88     }
    1    @Override
    2     protected int doWriteBytes(ByteBuf buf) throws Exception {
    3         final int expectedWrittenBytes = buf.readableBytes();
    4         return buf.readBytes(javaChannel(), expectedWrittenBytes);
    5     }
     1    /**
     2      * Notify the {@link ChannelPromise} of the current message about writing progress.
     3      */
     4     public void progress(long amount) {
     5         Entry e = flushedEntry;
     6         assert e != null;
     7         ChannelPromise p = e.promise;
     8         if (p instanceof ChannelProgressivePromise) {
     9             long progress = e.progress + amount;
    10             e.progress = progress;
    11             ((ChannelProgressivePromise) p).tryProgress(progress, e.total);
    12         }
    13     }
     1 @Override
     2     public boolean tryProgress(long progress, long total) {
     3         if (total < 0) {
     4             total = -1;
     5             if (progress < 0 || isDone()) {
     6                 return false;
     7             }
     8         } else if (progress < 0 || progress > total || isDone()) {
     9             return false;
    10         }
    11 
    12         notifyProgressiveListeners(progress, total);
    13         return true;
    14     }
  • 相关阅读:
    Sql Server 跨服务器连接
    ASCII码与16进制的互相转换(表)
    c#多线程 Invoke方法的使用
    登陆时验证码的制作(asp.net)
    jQ&js给label
    IT行业的一些专业术语
    html div 加边框样式
    分布式技术 memcached
    分布式技术 webservice
    MVC 绑定 下拉框数据
  • 原文地址:https://www.cnblogs.com/chenyangyao/p/5796867.html
Copyright © 2011-2022 走看看