zoukankan      html  css  js  c++  java
  • (一) netty 拆包,粘包处理(自定义协议)

    一、netty 为啥要进行拆包粘包处理

      简单点描述,netty底层通讯是走的TCP协议,接收到的都是字节流,然后以字节字节队列的形式存在缓存堆里面。而TCP协议每一次最大接收的字节长度是1024个字节,一旦超过这个长度,那么就会出现一下各种形式:

     

     所以在字节长度超过1024的时候,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

    二、解决方案
      netty基于以上问题也提供了一些组件,比如:

    1. 固定长度的拆包器 FixedLengthFrameDecoder,每个应用层数据包的都拆分成都是固定长度的大小
    2. 行拆包器 LineBasedFrameDecoder,每个应用层数据包,都以换行符作为分隔符,进行分割拆分
    3. 分隔符拆包器 DelimiterBasedFrameDecoder,每个应用层数据包,都通过自定义的分隔符,进行分割拆分
    4. 基于数据包长度的拆包器 LengthFieldBasedFrameDecoder,将应用层数据包的长度,作为接收端应用层数据包的拆分依据。按照应用层数据包的大小,拆包。这个拆包器,有一个要求,就是应用层协议中包含数据包的长度

      因为是自定义协议,在上面的方式不适用。所以在这里我基于LengthFieldBasedFrameDecoder实现的原理下,重新编写了一个实现方式。
      我们先看一下LengthFieldBasedFrameDecoder的代码:
      /*
       * Copyright 2012 The Netty Project
       *
       * The Netty Project licenses this file to you under the Apache License,
       * version 2.0 (the "License"); you may not use this file except in compliance
       * with the License. You may obtain a copy of the License at:
       *
       *   http://www.apache.org/licenses/LICENSE-2.0
       *
       * Unless required by applicable law or agreed to in writing, software
       * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
       * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
       * License for the specific language governing permissions and limitations
       * under the License.
       */
      package io.netty.handler.codec;
      
      import java.nio.ByteOrder;
      import java.util.List;
      
      import io.netty.buffer.ByteBuf;
      import io.netty.channel.ChannelHandlerContext;
      import io.netty.handler.codec.serialization.ObjectDecoder;
      
      /**
       * A decoder that splits the received {@link ByteBuf}s dynamically by the
       * value of the length field in the message.  It is particularly useful when you
       * decode a binary message which has an integer header field that represents the
       * length of the message body or the whole message.
       * <p>
       * {@link LengthFieldBasedFrameDecoder} has many configuration parameters so
       * that it can decode any message with a length field, which is often seen in
       * proprietary client-server protocols. Here are some example that will give
       * you the basic idea on which option does what.
       *
       * <h3>2 bytes length field at offset 0, do not strip header</h3>
       *
       * The value of the length field in this example is <tt>12 (0x0C)</tt> which
       * represents the length of "HELLO, WORLD".  By default, the decoder assumes
       * that the length field represents the number of the bytes that follows the
       * length field.  Therefore, it can be decoded with the simplistic parameter
       * combination.
       * <pre>
       * <b>lengthFieldOffset</b>   = <b>0</b>
       * <b>lengthFieldLength</b>   = <b>2</b>
       * lengthAdjustment    = 0
       * initialBytesToStrip = 0 (= do not strip header)
       *
       * BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
       * +--------+----------------+      +--------+----------------+
       * | Length | Actual Content |----->| Length | Actual Content |
       * | 0x000C | "HELLO, WORLD" |      | 0x000C | "HELLO, WORLD" |
       * +--------+----------------+      +--------+----------------+
       * </pre>
       *
       * <h3>2 bytes length field at offset 0, strip header</h3>
       *
       * Because we can get the length of the content by calling
       * {@link ByteBuf#readableBytes()}, you might want to strip the length
       * field by specifying <tt>initialBytesToStrip</tt>.  In this example, we
       * specified <tt>2</tt>, that is same with the length of the length field, to
       * strip the first two bytes.
       * <pre>
       * lengthFieldOffset   = 0
       * lengthFieldLength   = 2
       * lengthAdjustment    = 0
       * <b>initialBytesToStrip</b> = <b>2</b> (= the length of the Length field)
       *
       * BEFORE DECODE (14 bytes)         AFTER DECODE (12 bytes)
       * +--------+----------------+      +----------------+
       * | Length | Actual Content |----->| Actual Content |
       * | 0x000C | "HELLO, WORLD" |      | "HELLO, WORLD" |
       * +--------+----------------+      +----------------+
       * </pre>
       *
       * <h3>2 bytes length field at offset 0, do not strip header, the length field
       *     represents the length of the whole message</h3>
       *
       * In most cases, the length field represents the length of the message body
       * only, as shown in the previous examples.  However, in some protocols, the
       * length field represents the length of the whole message, including the
       * message header.  In such a case, we specify a non-zero
       * <tt>lengthAdjustment</tt>.  Because the length value in this example message
       * is always greater than the body length by <tt>2</tt>, we specify <tt>-2</tt>
       * as <tt>lengthAdjustment</tt> for compensation.
       * <pre>
       * lengthFieldOffset   =  0
       * lengthFieldLength   =  2
       * <b>lengthAdjustment</b>    = <b>-2</b> (= the length of the Length field)
       * initialBytesToStrip =  0
       *
       * BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
       * +--------+----------------+      +--------+----------------+
       * | Length | Actual Content |----->| Length | Actual Content |
       * | 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" |
       * +--------+----------------+      +--------+----------------+
       * </pre>
       *
       * <h3>3 bytes length field at the end of 5 bytes header, do not strip header</h3>
       *
       * The following message is a simple variation of the first example.  An extra
       * header value is prepended to the message.  <tt>lengthAdjustment</tt> is zero
       * again because the decoder always takes the length of the prepended data into
       * account during frame length calculation.
       * <pre>
       * <b>lengthFieldOffset</b>   = <b>2</b> (= the length of Header 1)
       * <b>lengthFieldLength</b>   = <b>3</b>
       * lengthAdjustment    = 0
       * initialBytesToStrip = 0
       *
       * BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
       * +----------+----------+----------------+      +----------+----------+----------------+
       * | Header 1 |  Length  | Actual Content |----->| Header 1 |  Length  | Actual Content |
       * |  0xCAFE  | 0x00000C | "HELLO, WORLD" |      |  0xCAFE  | 0x00000C | "HELLO, WORLD" |
       * +----------+----------+----------------+      +----------+----------+----------------+
       * </pre>
       *
       * <h3>3 bytes length field at the beginning of 5 bytes header, do not strip header</h3>
       *
       * This is an advanced example that shows the case where there is an extra
       * header between the length field and the message body.  You have to specify a
       * positive <tt>lengthAdjustment</tt> so that the decoder counts the extra
       * header into the frame length calculation.
       * <pre>
       * lengthFieldOffset   = 0
       * lengthFieldLength   = 3
       * <b>lengthAdjustment</b>    = <b>2</b> (= the length of Header 1)
       * initialBytesToStrip = 0
       *
       * BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
       * +----------+----------+----------------+      +----------+----------+----------------+
       * |  Length  | Header 1 | Actual Content |----->|  Length  | Header 1 | Actual Content |
       * | 0x00000C |  0xCAFE  | "HELLO, WORLD" |      | 0x00000C |  0xCAFE  | "HELLO, WORLD" |
       * +----------+----------+----------------+      +----------+----------+----------------+
       * </pre>
       *
       * <h3>2 bytes length field at offset 1 in the middle of 4 bytes header,
       *     strip the first header field and the length field</h3>
       *
       * This is a combination of all the examples above.  There are the prepended
       * header before the length field and the extra header after the length field.
       * The prepended header affects the <tt>lengthFieldOffset</tt> and the extra
       * header affects the <tt>lengthAdjustment</tt>.  We also specified a non-zero
       * <tt>initialBytesToStrip</tt> to strip the length field and the prepended
       * header from the frame.  If you don't want to strip the prepended header, you
       * could specify <tt>0</tt> for <tt>initialBytesToSkip</tt>.
       * <pre>
       * lengthFieldOffset   = 1 (= the length of HDR1)
       * lengthFieldLength   = 2
       * <b>lengthAdjustment</b>    = <b>1</b> (= the length of HDR2)
       * <b>initialBytesToStrip</b> = <b>3</b> (= the length of HDR1 + LEN)
       *
       * BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
       * +------+--------+------+----------------+      +------+----------------+
       * | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
       * | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
       * +------+--------+------+----------------+      +------+----------------+
       * </pre>
       *
       * <h3>2 bytes length field at offset 1 in the middle of 4 bytes header,
       *     strip the first header field and the length field, the length field
       *     represents the length of the whole message</h3>
       *
       * Let's give another twist to the previous example.  The only difference from
       * the previous example is that the length field represents the length of the
       * whole message instead of the message body, just like the third example.
       * We have to count the length of HDR1 and Length into <tt>lengthAdjustment</tt>.
       * Please note that we don't need to take the length of HDR2 into account
       * because the length field already includes the whole header length.
       * <pre>
       * lengthFieldOffset   =  1
       * lengthFieldLength   =  2
       * <b>lengthAdjustment</b>    = <b>-3</b> (= the length of HDR1 + LEN, negative)
       * <b>initialBytesToStrip</b> = <b> 3</b>
       *
       * BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
       * +------+--------+------+----------------+      +------+----------------+
       * | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
       * | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
       * +------+--------+------+----------------+      +------+----------------+
       * </pre>
       * @see LengthFieldPrepender
       */
      public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
      
          private final ByteOrder byteOrder;
          private final int maxFrameLength;
          private final int lengthFieldOffset;
          private final int lengthFieldLength;
          private final int lengthFieldEndOffset;
          private final int lengthAdjustment;
          private final int initialBytesToStrip;
          private final boolean failFast;
          private boolean discardingTooLongFrame;
          private long tooLongFrameLength;
          private long bytesToDiscard;
      
          /**
           * Creates a new instance.
           *
           * @param maxFrameLength
           *        the maximum length of the frame.  If the length of the frame is
           *        greater than this value, {@link TooLongFrameException} will be
           *        thrown.
           * @param lengthFieldOffset
           *        the offset of the length field
           * @param lengthFieldLength
           *        the length of the length field
           */
          public LengthFieldBasedFrameDecoder(
                  int maxFrameLength,
                  int lengthFieldOffset, int lengthFieldLength) {
              this(maxFrameLength, lengthFieldOffset, lengthFieldLength, 0, 0);
          }
      
          /**
           * Creates a new instance.
           *
           * @param maxFrameLength
           *        the maximum length of the frame.  If the length of the frame is
           *        greater than this value, {@link TooLongFrameException} will be
           *        thrown.
           * @param lengthFieldOffset
           *        the offset of the length field
           * @param lengthFieldLength
           *        the length of the length field
           * @param lengthAdjustment
           *        the compensation value to add to the value of the length field
           * @param initialBytesToStrip
           *        the number of first bytes to strip out from the decoded frame
           */
          public LengthFieldBasedFrameDecoder(
                  int maxFrameLength,
                  int lengthFieldOffset, int lengthFieldLength,
                  int lengthAdjustment, int initialBytesToStrip) {
              this(
                      maxFrameLength,
                      lengthFieldOffset, lengthFieldLength, lengthAdjustment,
                      initialBytesToStrip, true);
          }
      
          /**
           * Creates a new instance.
           *
           * @param maxFrameLength
           *        the maximum length of the frame.  If the length of the frame is
           *        greater than this value, {@link TooLongFrameException} will be
           *        thrown.
           * @param lengthFieldOffset
           *        the offset of the length field
           * @param lengthFieldLength
           *        the length of the length field
           * @param lengthAdjustment
           *        the compensation value to add to the value of the length field
           * @param initialBytesToStrip
           *        the number of first bytes to strip out from the decoded frame
           * @param failFast
           *        If <tt>true</tt>, a {@link TooLongFrameException} is thrown as
           *        soon as the decoder notices the length of the frame will exceed
           *        <tt>maxFrameLength</tt> regardless of whether the entire frame
           *        has been read.  If <tt>false</tt>, a {@link TooLongFrameException}
           *        is thrown after the entire frame that exceeds <tt>maxFrameLength</tt>
           *        has been read.
           */
          public LengthFieldBasedFrameDecoder(
                  int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
                  int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
              this(
                      ByteOrder.BIG_ENDIAN, maxFrameLength, lengthFieldOffset, lengthFieldLength,
                      lengthAdjustment, initialBytesToStrip, failFast);
          }
      
          /**
           * Creates a new instance.
           *
           * @param byteOrder
           *        the {@link ByteOrder} of the length field
           * @param maxFrameLength
           *        the maximum length of the frame.  If the length of the frame is
           *        greater than this value, {@link TooLongFrameException} will be
           *        thrown.
           * @param lengthFieldOffset
           *        the offset of the length field
           * @param lengthFieldLength
           *        the length of the length field
           * @param lengthAdjustment
           *        the compensation value to add to the value of the length field
           * @param initialBytesToStrip
           *        the number of first bytes to strip out from the decoded frame
           * @param failFast
           *        If <tt>true</tt>, a {@link TooLongFrameException} is thrown as
           *        soon as the decoder notices the length of the frame will exceed
           *        <tt>maxFrameLength</tt> regardless of whether the entire frame
           *        has been read.  If <tt>false</tt>, a {@link TooLongFrameException}
           *        is thrown after the entire frame that exceeds <tt>maxFrameLength</tt>
           *        has been read.
           */
          public LengthFieldBasedFrameDecoder(
                  ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
                  int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
              if (byteOrder == null) {
                  throw new NullPointerException("byteOrder");
              }
      
              if (maxFrameLength <= 0) {
                  throw new IllegalArgumentException(
                          "maxFrameLength must be a positive integer: " +
                          maxFrameLength);
              }
      
              if (lengthFieldOffset < 0) {
                  throw new IllegalArgumentException(
                          "lengthFieldOffset must be a non-negative integer: " +
                          lengthFieldOffset);
              }
      
              if (initialBytesToStrip < 0) {
                  throw new IllegalArgumentException(
                          "initialBytesToStrip must be a non-negative integer: " +
                          initialBytesToStrip);
              }
      
              if (lengthFieldOffset > maxFrameLength - lengthFieldLength) {
                  throw new IllegalArgumentException(
                          "maxFrameLength (" + maxFrameLength + ") " +
                          "must be equal to or greater than " +
                          "lengthFieldOffset (" + lengthFieldOffset + ") + " +
                          "lengthFieldLength (" + lengthFieldLength + ").");
              }
      
              this.byteOrder = byteOrder;
              this.maxFrameLength = maxFrameLength;
              this.lengthFieldOffset = lengthFieldOffset;
              this.lengthFieldLength = lengthFieldLength;
              this.lengthAdjustment = lengthAdjustment;
              lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
              this.initialBytesToStrip = initialBytesToStrip;
              this.failFast = failFast;
          }
      
          @Override
          protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
              Object decoded = decode(ctx, in);
              if (decoded != null) {
                  out.add(decoded);
              }
          }
      
          private void discardingTooLongFrame(ByteBuf in) {
              long bytesToDiscard = this.bytesToDiscard;
              int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
              in.skipBytes(localBytesToDiscard);
              bytesToDiscard -= localBytesToDiscard;
              this.bytesToDiscard = bytesToDiscard;
      
              failIfNecessary(false);
          }
      
          private static void failOnNegativeLengthField(ByteBuf in, long frameLength, int lengthFieldEndOffset) {
              in.skipBytes(lengthFieldEndOffset);
              throw new CorruptedFrameException(
                 "negative pre-adjustment length field: " + frameLength);
          }
      
          private static void failOnFrameLengthLessThanLengthFieldEndOffset(ByteBuf in,
                                                                            long frameLength,
                                                                            int lengthFieldEndOffset) {
              in.skipBytes(lengthFieldEndOffset);
              throw new CorruptedFrameException(
                 "Adjusted frame length (" + frameLength + ") is less " +
                    "than lengthFieldEndOffset: " + lengthFieldEndOffset);
          }
      
          private void exceededFrameLength(ByteBuf in, long frameLength) {
              long discard = frameLength - in.readableBytes();
              tooLongFrameLength = frameLength;
      
              if (discard < 0) {
                  // buffer contains more bytes then the frameLength so we can discard all now
                  in.skipBytes((int) frameLength);
              } else {
                  // Enter the discard mode and discard everything received so far.
                  discardingTooLongFrame = true;
                  bytesToDiscard = discard;
                  in.skipBytes(in.readableBytes());
              }
              failIfNecessary(true);
          }
      
          private static void failOnFrameLengthLessThanInitialBytesToStrip(ByteBuf in,
                                                                           long frameLength,
                                                                           int initialBytesToStrip) {
              in.skipBytes((int) frameLength);
              throw new CorruptedFrameException(
                 "Adjusted frame length (" + frameLength + ") is less " +
                    "than initialBytesToStrip: " + initialBytesToStrip);
          }
      
          /**
           * Create a frame out of the {@link ByteBuf} and return it.
           *
           * @param   ctx             the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
           * @param   in              the {@link ByteBuf} from which to read data
           * @return  frame           the {@link ByteBuf} which represent the frame or {@code null} if no frame could
           *                          be created.
           */
          protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
              if (discardingTooLongFrame) {
                  discardingTooLongFrame(in);
              }
      
              if (in.readableBytes() < lengthFieldEndOffset) {
                  return null;
              }
      
              int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
              long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
      
              if (frameLength < 0) {
                  failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
              }
      
              frameLength += lengthAdjustment + lengthFieldEndOffset;
      
              if (frameLength < lengthFieldEndOffset) {
                  failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
              }
      
              if (frameLength > maxFrameLength) {
                  exceededFrameLength(in, frameLength);
                  return null;
              }
      
              // never overflows because it's less than maxFrameLength
              int frameLengthInt = (int) frameLength;
              if (in.readableBytes() < frameLengthInt) {
                  return null;
              }
      
              if (initialBytesToStrip > frameLengthInt) {
                  failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
              }
              in.skipBytes(initialBytesToStrip);
      
              // extract frame
              int readerIndex = in.readerIndex();
              int actualFrameLength = frameLengthInt - initialBytesToStrip;
              ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
              in.readerIndex(readerIndex + actualFrameLength);
              return frame;
          }
      
          /**
           * Decodes the specified region of the buffer into an unadjusted frame length.  The default implementation is
           * capable of decoding the specified region into an unsigned 8/16/24/32/64 bit integer.  Override this method to
           * decode the length field encoded differently.  Note that this method must not modify the state of the specified
           * buffer (e.g. {@code readerIndex}, {@code writerIndex}, and the content of the buffer.)
           *
           * @throws DecoderException if failed to decode the specified region
           */
          protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
              buf = buf.order(order);
              long frameLength;
              switch (length) {
              case 1:
                  frameLength = buf.getUnsignedByte(offset);
                  break;
              case 2:
                  frameLength = buf.getUnsignedShort(offset);
                  break;
              case 3:
                  frameLength = buf.getUnsignedMedium(offset);
                  break;
              case 4:
                  frameLength = buf.getUnsignedInt(offset);
                  break;
              case 8:
                  frameLength = buf.getLong(offset);
                  break;
              default:
                  throw new DecoderException(
                          "unsupported lengthFieldLength: " + lengthFieldLength + " (expected: 1, 2, 3, 4, or 8)");
              }
              return frameLength;
          }
      
          private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {
              if (bytesToDiscard == 0) {
                  // Reset to the initial state and tell the handlers that
                  // the frame was too large.
                  long tooLongFrameLength = this.tooLongFrameLength;
                  this.tooLongFrameLength = 0;
                  discardingTooLongFrame = false;
                  if (!failFast || firstDetectionOfTooLongFrame) {
                      fail(tooLongFrameLength);
                  }
              } else {
                  // Keep discarding and notify handlers if necessary.
                  if (failFast && firstDetectionOfTooLongFrame) {
                      fail(tooLongFrameLength);
                  }
              }
          }
      
          /**
           * Extract the sub-region of the specified buffer.
           * <p>
           * If you are sure that the frame and its content are not accessed after
           * the current {@link #decode(ChannelHandlerContext, ByteBuf)}
           * call returns, you can even avoid memory copy by returning the sliced
           * sub-region (i.e. <tt>return buffer.slice(index, length)</tt>).
           * It's often useful when you convert the extracted frame into an object.
           * Refer to the source code of {@link ObjectDecoder} to see how this method
           * is overridden to avoid memory copy.
           */
          protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {
              return buffer.retainedSlice(index, length);
          }
      
          private void fail(long frameLength) {
              if (frameLength > 0) {
                  throw new TooLongFrameException(
                                  "Adjusted frame length exceeds " + maxFrameLength +
                                  ": " + frameLength + " - discarded");
              } else {
                  throw new TooLongFrameException(
                                  "Adjusted frame length exceeds " + maxFrameLength +
                                  " - discarding");
              }
          }
      }

      通过源码我们可以知道核心方法是:

       protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
              if (discardingTooLongFrame) {
                  discardingTooLongFrame(in);
              }
      
              if (in.readableBytes() < lengthFieldEndOffset) {
                  return null;
              }
      
              int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
              long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
      
              if (frameLength < 0) {
                  failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
              }
      
              frameLength += lengthAdjustment + lengthFieldEndOffset;
      
              if (frameLength < lengthFieldEndOffset) {
                  failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
              }
      
              if (frameLength > maxFrameLength) {
                  exceededFrameLength(in, frameLength);
                  return null;
              }
      
              // never overflows because it's less than maxFrameLength
              int frameLengthInt = (int) frameLength;
              if (in.readableBytes() < frameLengthInt) {
                  return null;
              }
      
              if (initialBytesToStrip > frameLengthInt) {
                  failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
              }
              in.skipBytes(initialBytesToStrip);
      
              // extract frame
              int readerIndex = in.readerIndex();
              int actualFrameLength = frameLengthInt - initialBytesToStrip;
              ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
              in.readerIndex(readerIndex + actualFrameLength);
              return frame;
          }

      所以根据自己得协议,写了一个实现方式:

      public class MessageEncoder extends ByteToMessageDecoder {
          
           @Override
              protected final void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
                  // 可读长度必须大于基本长度  
                  if (buffer.readableBytes() >= 12) {  
                            
                      // 记录包头开始的index  
                      int beginReader = buffer.readerIndex();  
                      // 标记包头开始的index  
                      buffer.markReaderIndex();  
                      // 读到了协议的开始标志,结束while循环  
                      String startDataStr = buffer.getByte(0)+""+buffer.getByte(1)+"";                    
                      if (startDataStr.equals("7188")) {  
      
                      } else {
                          buffer.skipBytes(buffer.readableBytes());  
                          return;
                      }  
            
                      // 消息的长度  
                      // String dataLen = buffer.getByte(10)+""+buffer.getByte(11)+"";
                      String commond = ByteBufUtil.hexDump(buffer);
                      String str = commond.substring(20,24);
                      Integer length = Integer.parseInt(str.substring(2,4)+str.substring(0,2),16); 
                      System.out.println("内容区数据包长度:"+length);
                      // 判断请求数据包数据是否到齐  
                      if (buffer.readableBytes() < length+14) {  
                          // 还原读指针  
                          buffer.readerIndex(beginReader);  
                          return;  
                      }  
                      // 读取data数据  
                      //buffer.readBytes(length+14);  
                      out.add(buffer.readSlice(length+14).retain());  
                  }  
              
              }
      
      }
      public void start() throws Exception {
              new Thread(new Runnable() {
      
                  @Override
                  public void run() {
      
                      EventLoopGroup group = new NioEventLoopGroup();
                      EventLoopGroup workerGroup = new NioEventLoopGroup();
                      try {
                          ServerBootstrap sb = new ServerBootstrap();
                          sb.group(group, workerGroup) // 绑定线程池
                                  .channel(NioServerSocketChannel.class) // 指定使用的channel
                                  .localAddress(port)// 绑定监听端口
                                  .childHandler(new ChannelInitializer<SocketChannel>() { // 绑定客户端连接时候触发操作
      
                                      @Override
                                      protected void initChannel(SocketChannel ch) throws Exception {
                                          System.out.println("connected...; Client:" + ch.remoteAddress());
                                          ch.pipeline()/*
                                                           * .addLast("logging",new
                                                           * LoggingHandler(LogLevel.
                                                           * ERROR ))
                                                           */
                                                  .addLast(new MessageEncoder())
                                                  .addLast(new EchoServerHandler()); // 客户端触发操作
                                      }
                                  });
                          ChannelFuture cf;
      
                          cf = sb.bind().sync();
                          cf.channel().closeFuture().sync(); // 关闭服务器通道
                          System.out.println(EchoServer.class + " started and listen on " + cf.channel().localAddress());
                      } catch (InterruptedException e) {
                          logger.error("listen Exception,the msg is >>"+e);
                          e.printStackTrace();
                      } // 服务器异步创建绑定
                      finally {
                          group.shutdownGracefully();
                          workerGroup.shutdownGracefully();
                      }
      
                  }
              }).start();
          }

      这个写法是基于自定义协议,然后又是非规范协议的操作来实现的。比如类似下面16进制组合ascii码的协议:
      475831393530303810d901000069da

  • 相关阅读:
    JSON
    必须使用角色管理工具 安装或配置microsoft.net framework 3.5
    Backbone.js之view篇(三)
    MSDN Webcast 资源
    JS获取select 当前选种值
    我理解的js闭包
    javascript基础温习(一)
    js动态添加删除行
    delphi 版 sqlHelper第二版
    2012末日没有到来,继续我们的2013
  • 原文地址:https://www.cnblogs.com/zcsheng/p/12910665.html
Copyright © 2011-2022 走看看