zoukankan      html  css  js  c++  java
  • netty源码解析(4.0)-15 Channel NIO实现:写数据

      写数据是NIO Channel实现的另一个比较复杂的功能。每一个channel都有一个outboundBuffer,这是一个输出缓冲区。当调用channel的write方法写数据时,这个数据被一系列ChannelOutboundHandler处理之后,它被放进这个缓冲区中,并没有真正把数据写到socket channel中。然后再调用channel的flush方法,flush会把outboundBuffer中数据真正写到socket channel。正常情况下flush之后,数据已经真正写完了。但使用Selector加非阻塞socket的方式写数据,让写操作变得复杂了。操作系统为每个socket维护了一个数据发送缓冲区,它的长度SO_SNDBUF, 每次发送数据,先把数据写到这个缓冲区中,操作系统负责把这个发送缓冲区中的数据发送出去,并清理这个缓冲区。当向缓冲区写的速率大于系统的发送速率时,它会被填满,在非阻塞模式下的表现为: 调用socket的write方法写入长度为n数据,实际写入的数据长度m的范围是:0=<m<n。这个时候还剩下长度为n-m的数据没有写入到socket,而数据必须以正确的顺序完整地写入到socket中。 outboundBuffer正是为解决这个问题而设计的,没写进socket的剩余数据会以正确的顺序保存在outboundBuffer中,当发送缓冲区中有空间可以写时,可以从outboundBuffer中取出剩余的数据继续写入到socket中。

      

      Channel write实现: 把数据写到outboundBuffer中

      write调用栈:

    1 io.netty.channel.AbstractChannel#write(java.lang.Object)
    2 io.netty.channel.DefaultChannelPipeline#write(java.lang.Object)
    3 io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object)
    4 io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)
    5 io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)
    6 io.netty.channel.AbstractChannelHandlerContext#invokeWrite
    7 io.netty.channel.DefaultChannelPipeline.HeadContext#write
    8 io.netty.channel.AbstractChannel.AbstractUnsafe#write

      write的主要逻辑在io.netty.channel.AbstractChannel.AbstractUnsafe#write中实现,这个方法把要写的数据msg对象放到outboundBuffer中。在执行close时,netty不希望有希望写新的数据,避免引起不可预料的错误,因此会把outboundBuffer置为null。这里在向outboundBuffer写数据之前会把对它进行检查,如果是null就抛出错误。下面是这个write方法的实现。

     1 @Override
     2 public final void write(Object msg, ChannelPromise promise) {
     3     assertEventLoop();
     4 
     5     ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
     6     if (outboundBuffer == null) {
     7         safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
     8         ReferenceCountUtil.release(msg);
     9         return;
    10     }
    11 
    12     int size;
    13     try {
    14         msg = filterOutboundMessage(msg);
    15         size = pipeline.estimatorHandle().size(msg);
    16         if (size < 0) {
    17             size = 0;
    18         }
    19     } catch (Throwable t) {
    20         safeSetFailure(promise, t);
    21         ReferenceCountUtil.release(msg);
    22         return;
    23     }
    24 
    25     outboundBuffer.addMessage(msg, size, promise);
    26 }

      第5-9行,对outboudBuffer进行检查,如果是null抛出错误。这个里有个小细节,用一个局部变量引用outboundBuffer,避免由其他线程对this.outboundBuffer置空引发错误。

      14行,调用filterOutboundMessage对msg进行过滤。这是一个protected方法,默认实现是什么都没做,返回输入的msg参数。子类可以覆盖这个方法,把msg转换成期望的类型。

      15行,计算msg的长度。

      25行,把放入到outboundBuffer中。

       

      Channel flush实现:把数据真正写到channel

      flush调用栈:

     1 io.netty.channel.AbstractChannel#flush
     2 io.netty.channel.DefaultChannelPipeline#flush
     3 io.netty.channel.AbstractChannelHandlerContext#flush
     4 io.netty.channel.AbstractChannelHandlerContext#invokeFlush
     5 io.netty.channel.DefaultChannelPipeline.HeadContext#flush
     6 io.netty.channel.AbstractChannel.AbstractUnsafe#flush
     7 io.netty.channel.AbstractChannel.AbstractUnsafe#flush0
     8 io.netty.channel.socket.nio.NioSocketChannel#doWrite
     9 io.netty.channel.nio.AbstractNioByteChannel#doWrite
    10 io.netty.channel.socket.nio.NioSocketChannel#doWriteBytes

       以上是io.netty.channel.socket.nio.NioSocketChannel的flush调用栈,对于io.netty.channel.socket.nio.NioDatagramChannel来说,从第8行开始变得不同:

    7 io.netty.channel.AbstractChannel.AbstractUnsafe#flush0
    8 io.netty.channel.nio.AbstractNioMessageChannel#doWrite
    9 io.netty.channel.socket.nio.NioDatagramChannel#doWriteMessage

      

      把Byte数据流写入channel

      io.netty.channel.socket.nio.NioSocketChannel#doWrite是Byte数据流的写逻辑,io.netty.channel.nio.AbstractNioByteChannel#doWrite也是,这两者不同的地方在于前者是在outboundBuffer可以转换成java.nio.ByteBuffer的情况下执行,后者是在outboundBuffer中的msg是ByteBuf或FileRegin类型时执行。除此之外其他逻辑都一样:

    1. 尽量把outboundBuffer中的数据写到channel中。
    2. 如果channel无法写入数据,在channel的SelectionKey上注册OP_WRITE事件,等channel可写的时候再继续写入。
    3. 如写入次数超过限制,把flush操作包装成task放到eventLoop排队,等待再次执行。

      下面来看看io.netty.channel.socket.nio.NioSocketChannel#doWrite的实现代码:

     1 @Override
     2 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
     3     for (;;) {
     4         int size = in.size();
     5         if (size == 0) {
     6             // All written so clear OP_WRITE
     7             clearOpWrite();
     8             break;
     9         }
    10         long writtenBytes = 0;
    11         boolean done = false;
    12         boolean setOpWrite = false;
    13 
    14         // Ensure the pending writes are made of ByteBufs only.
    15         ByteBuffer[] nioBuffers = in.nioBuffers();
    16         int nioBufferCnt = in.nioBufferCount();
    17         long expectedWrittenBytes = in.nioBufferSize();
    18         SocketChannel ch = javaChannel();
    19 
    20         // Always us nioBuffers() to workaround data-corruption.
    21         // See https://github.com/netty/netty/issues/2761
    22         switch (nioBufferCnt) {
    23             case 0:
    24                 // We have something else beside ByteBuffers to write so fallback to normal writes.
    25                 super.doWrite(in);
    26                 return;
    27             case 1:
    28                 // Only one ByteBuf so use non-gathering write
    29                 ByteBuffer nioBuffer = nioBuffers[0];
    30                 for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
    31                     final int localWrittenBytes = ch.write(nioBuffer);
    32                     if (localWrittenBytes == 0) {
    33                         setOpWrite = true;
    34                         break;
    35                     }
    36                     expectedWrittenBytes -= localWrittenBytes;
    37                     writtenBytes += localWrittenBytes;
    38                     if (expectedWrittenBytes == 0) {
    39                         done = true;
    40                         break;
    41                     }
    42                 }
    43                 break;
    44             default:
    45                 for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
    46                     final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
    47                     if (localWrittenBytes == 0) {
    48                         setOpWrite = true;
    49                         break;
    50                     }
    51                     expectedWrittenBytes -= localWrittenBytes;
    52                     writtenBytes += localWrittenBytes;
    53                     if (expectedWrittenBytes == 0) {
    54                         done = true;
    55                         break;
    56                     }
    57                 }
    58                 break;
    59         }
    60 
    61         // Release the fully written buffers, and update the indexes of the partially written buffer.
    62         in.removeBytes(writtenBytes);
    63 
    64         if (!done) {
    65             // Did not write all buffers completely.
    66             incompleteWrite(setOpWrite);
    67             break;
    68         }
    69     }
    70 }

      第5-7行,如果outboundBuffer中已经没有数据了,调用clearOpWrite方法清除channel SelectionKey上的OP_WRITE事件。

      第15-17行,把outboundBuffer转换成ByteBuffer类型,并得到数据长度。

      25行,outboundBuffer不能转换成ByteBuffer, 调用io.netty.channel.nio.AbstractNioByteChannel#doWrite执行写操作。

      29-42,45-57的逻辑基本已经,都是尽量把ByteBuffer中的数据写到channel中,满足下列条件中的任意一个时,结束本次写操作:

        1. ByteBuffer中的数据已经写完,正常结束。

        2. channel已经不能写入数据,需要在channel可以写是继续执行写操作。

        3. 者超过channel config中写入次数限制,需要选择合适的实际继续执行写操作。

      62行,把已经写入到channel的数据从outboundBuffer中删除。

      64-66行, 如果数据没写完,调用incompleteWrite处理没写完的情况。当setOpWrite==true时,在channel的SelectionKey上设置OP_WRITE事件,等eventLoop触发这个事件时再继续执行flush操作。否则,把flush包装成task放到eventLoop中排队执行。

      当NioEventLoop检测到OP_WRITE事件时,会调用processSelectedKey方法处理:

    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        ch.unsafe().forceFlush();
    }

      forceFlush的调用栈如下:

    1 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#forceFlush
    2 io.netty.channel.AbstractChannel.AbstractUnsafe#flush0
    3 io.netty.channel.socket.nio.NioSocketChannel#doWrite
    4 io.netty.channel.nio.AbstractNioByteChannel#doWrite
    5 io.netty.channel.socket.nio.NioSocketChannel#doWriteBytes

      把数据写入UDP类型的channel

      io.netty.channel.nio.AbstractNioMessageChannel#doWrite是数据报的写逻辑。相较于Byte流类型的数据,数据报的写逻辑简单一些。它只是把outboundBuffer中的数据报依次写入到channel中,如果channel写满了,在channel的SelectionKey上设置OP_WRITE事件随后退出,其后OP_WRITE事件处理逻辑和Byte流写逻辑一样。 真正的写操作在io.netty.channel.socket.nio.NioDatagramChannel#doWriteMessage中实现,这个方法的实现如下:

     1 @Override
     2 protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
     3     final SocketAddress remoteAddress;
     4     final ByteBuf data;
     5     if (msg instanceof AddressedEnvelope) {
     6         @SuppressWarnings("unchecked")
     7         AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) msg;
     8         remoteAddress = envelope.recipient();
     9         data = envelope.content();
    10     } else {
    11         data = (ByteBuf) msg;
    12         remoteAddress = null;
    13     }
    14 
    15     final int dataLen = data.readableBytes();
    16     if (dataLen == 0) {
    17         return true;
    18     }
    19 
    20     final ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), dataLen);
    21     final int writtenBytes;
    22     if (remoteAddress != null) {
    23         writtenBytes = javaChannel().send(nioData, remoteAddress);
    24     } else {
    25         writtenBytes = javaChannel().write(nioData);
    26     }
    27     return writtenBytes > 0;
    28 }

      5-9行,处理AddressedEnvelope类型的数据报,得到数据报的远程地址和数据。

      10-12行,发送的是一个ByteBuf。没有指定远程地址。这种情况下需要先调用channel的connect方法。

      20-26行,分别针对两种情况发送数据报. 23行指定了远程地址,25行没有指定远程地址,但调用过了connect方法。

      

  • 相关阅读:
    如何使用IntelliJ IDEA 14创建基于Maven3的Java Web Project
    一些python语法的合集
    thuwc2019滚粗记
    noip2018总结
    树链剖分
    NOIp2017游记(滚粗之旅)
    使用Redux管理你的React应用
    校招面试中积累的前端问题
    WebService 生成类的命令语句
    Request 请求页面的地址路径获取
  • 原文地址:https://www.cnblogs.com/brandonli/p/10314962.html
Copyright © 2011-2022 走看看