zoukankan      html  css  js  c++  java
  • netty源码解析(4.0)-18 ChannelHandler: codec--编解码框架

      编解码框架和一些常用的实现位于io.netty.handler.codec包中。

      编解码框架包含两部分:Byte流和特定类型数据之间的编解码,也叫序列化和反序列化。不类型数据之间的转换。

      下图是编解码框架的类继承体系:

      其中MessageToByteEncoder和ByteToMessageDecoder是实现了序列化和反序列化框架。 MessageToMessage是不同类型数据之间转换的框架。  

    序列化抽象实现: MessageToByteEncoder<I>

      序列化是把 类型的数据转换成Byte流。这个抽象类通过实现ChannelOutboundHandler的write方法在写数据时把 类型的数据转换成Byte流,下面是write方法的实现:

     1     @Override
     2     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
     3         ByteBuf buf = null;
     4         try {
     5             if (acceptOutboundMessage(msg)) {
     6                 @SuppressWarnings("unchecked")
     7                 I cast = (I) msg;
     8                 buf = allocateBuffer(ctx, cast, preferDirect);
     9                 try {
    10                     encode(ctx, cast, buf);
    11                 } finally {
    12                     ReferenceCountUtil.release(cast);
    13                 }
    14 
    15                 if (buf.isReadable()) {
    16                     ctx.write(buf, promise);
    17                 } else {
    18                     buf.release();
    19                     ctx.write(Unpooled.EMPTY_BUFFER, promise);
    20                 }
    21                 buf = null;
    22             } else {
    23                 ctx.write(msg, promise);
    24             }
    25         } catch (EncoderException e) {
    26             throw e;
    27         } catch (Throwable e) {
    28             throw new EncoderException(e);
    29         } finally {
    30             if (buf != null) {
    31                 buf.release();
    32             }
    33         }
    34     }

      5行,  检查msg的类型,如果是 I 类型返回true, 否则返回false。

      7-10行, 分配一块buffer, 并调用encode方法把msg编码成Byte流放进这个buffer中。

      15-19行,对含有Byte流程数据的buffer继续执行写操作。(不清楚写操作流程的可以参考<<netty源码解解析(4.0)-15 Channel NIO实现:写数据>>)

        23行,如果msg不是 I 类型,跳过这个Handler, 继续执行写操作。

      这里调用的encode方法是一个抽象方法,留给子类实现定制的序列化操作。

    反序列化抽象实现: ByteToMessageDecoder

      这个抽象类型解决的主要问题是从Byte流中提取数据包。数据包是指刚好可以反序列化成一个特定类型Message的Byte数组。但是在数据包长度不确定的情况下,没办法每次刚好从Byte流中刚好分离一个数据包。每次从Byte流中读取数据有多种可能:

    1.  刚好是一个或多个完整的数据包。
    2.  不足一个完整的数据包,或错误的数据。
    3.  包含一个或多个完整的数据包,但有多余的数据不足一个完整的数据包或错误的数据。  

      这个问题本质上和"TCP粘包"问题相同。解决这个问题有两个关键点:

    1.  能够确定数据包在Byte流中的开始位置和长度。
    2.  需要暂时缓存不完整的数据包,等待后续数据拼接完整。

      关于第(1)点,在这个抽象类中没有处理,只是定义了一个抽象方法decode,留给子类处理。关于第(2)点,这个类定义了一个Cumulator(堆积器)来处理,把不完整的数据包暂时堆积到Cumulator中。Cumulator有两个实现: MERGE_CUMULATOR(合并堆积器),COMPOSITE_CUMULATOR(组合堆积器)。默认使用的是MERGE_CUMULATOR。下面详细分析一下这两种Cumulator的实现。

      MERGE_CUMULATOR的实现

      这是一个合并堆积器,使用ByteBuf作为堆积缓冲区,把通过把数据写到堆积缓冲实现新旧数据合并堆积。

     1 @Override
     2         public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
     3             final ByteBuf buffer;
     4             if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
     5                     || cumulation.refCnt() > 1 || cumulation instanceof ReadOnlyByteBuf) {
     6                 // Expand cumulation (by replace it) when either there is not more room in the buffer
     7                 // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
     8                 // duplicate().retain() or if its read-only.
     9                 //
    10                 // See:
    11                 // - https://github.com/netty/netty/issues/2327
    12                 // - https://github.com/netty/netty/issues/1764
    13                 buffer = expandCumulation(alloc, cumulation, in.readableBytes());
    14             } else {
    15                 buffer = cumulation;
    16             }
    17             buffer.writeBytes(in);
    18             in.release();
    19             return buffer;
    20         }

      4-13行,如果当前的堆积缓冲区不能用了,分配一块新的,把旧缓冲区中的数据转移到新缓冲区中,并用新的替换旧的。当前堆积缓冲区不能用的条件是:

        cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes(): 容量不够

        或者 cumulation.refCnt() > 1 : 在其他地方本引用

                或者 cumulation instanceof ReadOnlyByteBuf 是只读的

      17行,把数据追加到堆积缓冲区中。

      

      COMPOSITE_CUMULATOR的实现

      这是一个合并堆积器,和MERGE_CUMULATOR不同的是他使用的是CompositeByteBuf作为堆积缓冲区。

     1        @Override
     2         public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
     3             ByteBuf buffer;
     4             if (cumulation.refCnt() > 1) {
     5                 // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the user
     6                 // use slice().retain() or duplicate().retain().
     7                 //
     8                 // See:
     9                 // - https://github.com/netty/netty/issues/2327
    10                 // - https://github.com/netty/netty/issues/1764
    11                 buffer = expandCumulation(alloc, cumulation, in.readableBytes());
    12                 buffer.writeBytes(in);
    13                 in.release();
    14             } else {
    15                 CompositeByteBuf composite;
    16                 if (cumulation instanceof CompositeByteBuf) {
    17                     composite = (CompositeByteBuf) cumulation;
    18                 } else {
    19                     composite = alloc.compositeBuffer(Integer.MAX_VALUE);
    20                     composite.addComponent(true, cumulation);
    21                 }
    22                 composite.addComponent(true, in);
    23                 buffer = composite;
    24             }
    25             return buffer;
    26         }

      4-13行,和MERGE_CUMULATOR一样。

      15-23行,如果当前的堆积缓冲区不是CompositeByteBuf类型,使用一个新的CompositeByteBuf类型的堆积缓冲区代替,并把数据转移的新缓冲区中。

      分离数据包的主流程

      ByteToMessageDecoder是ChannelInboundHandlerAdapter的派生类,它通过覆盖channelRead实现了反序列化的主流程。这个主流程主要是对堆积缓冲区cumulation的管理,主要步骤是:

    1. 把Byte流数据追加到cumulation中。
    2. 调用decode方法从cumulation中分离出完整的数据包,并把数据包反序列化成特定类型的数据,直到不能分离数据包为止。
    3. 检查cumulation,如果没有剩余数据,就销毁掉这个cumulation。否则,增加读计数。如果读计数超过丢弃阈值,丢掉部分数据,这一步是为了防止cumulation中堆积的数据过多。
    4. 把反序列化得到的Message List传递到pipeline中的下一个ChannelInboundHandler处理。

      由于使用了cumulation,ByteToMessageDecoder就变成了一个有状态的ChannelHandler, 它必须是独占的,不能使用ChannelHandler.@Sharable注解。

      在channelRead中,并没有直接调用decode方法,而是通过callDecode间接调用。而callDecdoe也不是直接调用,而是调用了decodeRemovalReentryProtection方法,这个方法只是对decode调用的简单封装。参数in是堆积缓冲区cumulation。 这个方法主要实现上面描述的第2个步骤。

     1 //在channelRead中调用方式:callDecode(ctx, cumulation, out);    
     2 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
     3         try {
     4             while (in.isReadable()) {
     5                 int outSize = out.size();
     6 
     7                 if (outSize > 0) {
     8                     fireChannelRead(ctx, out, outSize);
     9                     out.clear();
    10 
    11                     // Check if this handler was removed before continuing with decoding.
    12                     // If it was removed, it is not safe to continue to operate on the buffer.
    13                     //
    14                     // See:
    15                     // - https://github.com/netty/netty/issues/4635
    16                     if (ctx.isRemoved()) {
    17                         break;
    18                     }
    19                     outSize = 0;
    20                 }
    21 
    22                 int oldInputLength = in.readableBytes();
    23                 decodeRemovalReentryProtection(ctx, in, out);
    24 
    25                 // Check if this handler was removed before continuing the loop.
    26                 // If it was removed, it is not safe to continue to operate on the buffer.
    27                 //
    28                 // See https://github.com/netty/netty/issues/1664
    29                 if (ctx.isRemoved()) {
    30                     break;
    31                 }
    32 
    33                 if (outSize == out.size()) {
    34                     if (oldInputLength == in.readableBytes()) {
    35                         break;
    36                     } else {
    37                         continue;
    38                     }
    39                 }
    40 
    41                 if (oldInputLength == in.readableBytes()) {
    42                     throw new DecoderException(
    43                             StringUtil.simpleClassName(getClass()) +
    44                                     ".decode() did not read anything but decoded a message.");
    45                 }
    46 
    47                 if (isSingleDecode()) {
    48                     break;
    49                 }
    50             }
    51         } catch (DecoderException e) {
    52             throw e;
    53         } catch (Exception cause) {
    54             throw new DecoderException(cause);
    55         }
    56     }

      5-19行,如果已经成功分离出了至少一个数据包并成功反序列化,就调用fireChannelRead把得到的Message传递给pipeline中的下一个Handler处理。fireChannelRead会对out中的每一个Message调用一次ctx.fireChannelRead。

      22,23行,先记下in中的数据长度,再执行反序列化操作。

      33,39行,如果outSize == out.size()(没有反序列化到新的Message), 且oldInputLength == in.readableBytes()(in中的数据长度没有变化)表示in中的数据不足以完成一次反序列化操作,跳出循环。否则,继续。

        41行,出现了异常,完成了一次反序列化操作,但in中的数据没变化,凭空多了(或少了)一些反序列化的后Message。

     同时可以进行序列化和反序列化的抽象类: ByteToMessageCodec<I>

      这个类是ChannelDuplexHandler的派生类,可以同时序列化和反序列化操作。和前面两个类相比,它没什么特别是实现,内部使用MessageToByteEncoder<I>

    序列化,使用ByteToMessageDecoder反序列化。

    类型转换编码的抽象实现: MessageToMessageEncoder<I>

      这个类是ChannelOutboundHandlerAdapter的派生类,它在功能是在write过程中,把 I 类型的数据转换成另一种类型的数据。它定义了抽象方法encode,有子类负责实现具体的转换操作。

     1     @Override
     2     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
     3         CodecOutputList out = null;
     4         try {
     5             if (acceptOutboundMessage(msg)) {
     6                 out = CodecOutputList.newInstance();
     7                 @SuppressWarnings("unchecked")
     8                 I cast = (I) msg;
     9                 try {
    10                     encode(ctx, cast, out);
    11                 } finally {
    12                     ReferenceCountUtil.release(cast);
    13                 }
    14 
    15                 if (out.isEmpty()) {
    16                     out.recycle();
    17                     out = null;
    18 
    19                     throw new EncoderException(
    20                             StringUtil.simpleClassName(this) + " must produce at least one message.");
    21                 }
    22             } else {
    23                 ctx.write(msg, promise);
    24             }
    25         } catch (EncoderException e) {
    26             throw e;
    27         } catch (Throwable t) {
    28             throw new EncoderException(t);
    29         } finally {
    30             if (out != null) {
    31                 final int sizeMinusOne = out.size() - 1;
    32                 if (sizeMinusOne == 0) {
    33                     ctx.write(out.get(0), promise);
    34                 } else if (sizeMinusOne > 0) {
    35                     // Check if we can use a voidPromise for our extra writes to reduce GC-Pressure
    36                     // See https://github.com/netty/netty/issues/2525
    37                     ChannelPromise voidPromise = ctx.voidPromise();
    38                     boolean isVoidPromise = promise == voidPromise;
    39                     for (int i = 0; i < sizeMinusOne; i ++) {
    40                         ChannelPromise p;
    41                         if (isVoidPromise) {
    42                             p = voidPromise;
    43                         } else {
    44                             p = ctx.newPromise();
    45                         }
    46                         ctx.write(out.getUnsafe(i), p);
    47                     }
    48                     ctx.write(out.getUnsafe(sizeMinusOne), promise);
    49                 }
    50                 out.recycle();
    51             }
    52         }
    53     }

      6-12行,如果msg是 I 类型的数据,调用encode把它转换成另一种类型。

      16-20行,如果没有转换成功,抛出异常。

      23行, 如果msg不是 I 类型,跳过当前的Handler。

      31-50, 如果转换成功,把转换后的数据传到到下一个Handler处理。33行处理只有一个转换结果的情况。37-48行处理有多个转换结果的情况。

    类型转换解码的抽象实现: MessageToMessageDecoder<I>

      这个类是ChannelInboundHandlerAdapter的派生类,它的功能是在read的过程中,把 I 类型的数据转换成另一种类型的数据。它定义了抽象方法decode,有子类负责实现具体的转换操作。它的channelRead和上面的类实现相似,但更简单,这里就不再分析源码了。

    类型转换编解码的抽象实现: MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>

      这个类是ChannelDuplexHandler的派生类,它的功能是在write过程中把OUTBOUND_IN类型的数据转换成INBOUND_IN类型的数据,在read过程中进程相反的操作。它没有特别的实现,内部使用前面的两个类实现编解码。

      

  • 相关阅读:
    C#设计模式(6)——原型模式(Prototype Pattern)
    C#设计模式(4)——抽象工厂模式
    C#设计模式(3)——工厂方法模式
    C#设计模式(2)——简单工厂模式
    cmd 打 jar 包
    java eclipse 中给args 传递参数
    java 中值传递和引用传递(转)
    java unreachable code不可达代码
    java语言中if语句的用法
    java中 构造器与void
  • 原文地址:https://www.cnblogs.com/brandonli/p/11252772.html
Copyright © 2011-2022 走看看