zoukankan      html  css  js  c++  java
  • NIO框架之MINA源码解析(四):粘包与断包处理及编码与解码

    1、粘包与段包

    粘包:指TCP协议中,发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾。
    造成的可能原因:

        发送端需要等缓冲区满才发送出去,造成粘包

        接收方不及时接收缓冲区的包,造成多个包接收

    断包:也就是数据不全,比如包太大,就把包分解成多个小包,多次发送,导致每次接收数据都不全。

    2、消息传输的格式

    消息长度+消息头+消息体  即前N个字节用于存储消息的长度,用于判断当前消息什么时候结束。

    消息头+消息体    即固定长度的消息,前几个字节为消息头,后面的是消息头。

    在MINA中用的是

    消息长度+消息体 即前4个字节用于存储消息的长度,用于判断当前消息什么时候结束。

    3、编码与解码

       在网络中,信息的传输都是通过字节的形式传输的,而我们在编写自己的代码时,则都是具体的对象,那么要想我们的对象能够在网络中传输,就需要编码与解码。

       编码:即把我们的消息编码成二进制形式,能以字节的形式在网络中传输。

       解码:即把我们收到的字节解码成我们代码中的对象。

       在MINA中对象的编码与解码用的都是JDK提供的ObjectOutputStream来实现的。

    4、MINA中消息的处理实现

    消息的接受处理,我们常用的是TCP协议,而TCP协议会分片的,在下面的代码中,具体功能就是循环从通道里面读取数据,直到没有数据可读,或者buffer满了,然后就把接受到的数据发给解码工厂进行处理。

    4.1、消息的接收

    [java] view plain copy
     
     print?
    1. //class AbstractPollingIoProcessor  
    2. private void read(S session) {  
    3.         IoSessionConfig config = session.getConfig();  
    4.         int bufferSize = config.getReadBufferSize();  
    5.         IoBuffer buf = IoBuffer.allocate(bufferSize);  
    6.   
    7.         final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();  
    8.   
    9.         try {  
    10.             int readBytes = 0;  
    11.             int ret;  
    12.   
    13.             try {  
    14.                 //是否有分片 tcp传输会有分片,即把大消息分片成多个小消息再传输  
    15.                 if (hasFragmentation) {  
    16.             //read方法非阻塞,没有读到数据的时候返回0  
    17.                     while ((ret = read(session, buf)) > 0) {  
    18.                         readBytes += ret;  
    19.                         //buffer 满了  
    20.                         if (!buf.hasRemaining()) {  
    21.                             break;  
    22.                         }  
    23.                     }  
    24.                 } else {  
    25.                     ret = read(session, buf);  
    26.   
    27.                     if (ret > 0) {  
    28.                         readBytes = ret;  
    29.                     }  
    30.                 }  
    31.             } finally {  
    32.                 buf.flip();  
    33.             }  
    34.   
    35.             if (readBytes > 0) {  
    36.                 IoFilterChain filterChain = session.getFilterChain();  
    37.         //处理消息  
    38.                 filterChain.fireMessageReceived(buf);  
    39.                 buf = null;  
    40.   
    41.                 if (hasFragmentation) {  
    42.                     if (readBytes << 1 < config.getReadBufferSize()) {  
    43.                         session.decreaseReadBufferSize();  
    44.                     } else if (readBytes == config.getReadBufferSize()) {  
    45.                         session.increaseReadBufferSize();  
    46.                     }  
    47.                 }  
    48.             }  
    49.   
    50.             if (ret < 0) {  
    51.                 scheduleRemove(session);  
    52.             }  
    53.         } catch (Throwable e) {  
    54.             if (e instanceof IOException) {  
    55.                 if (!(e instanceof PortUnreachableException)  
    56.                         || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())  
    57.                         || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {  
    58.                     scheduleRemove(session);  
    59.                 }  
    60.             }  
    61.   
    62.             IoFilterChain filterChain = session.getFilterChain();  
    63.             filterChain.fireExceptionCaught(e);  
    64.         }  
    65.     }  



    4.2、解码与编码

    [java] view plain copy
     
     print?
    1. //class AbstractIoBuffer  
    2.  public Object getObject(final ClassLoader classLoader) throws ClassNotFoundException {  
    3.     //首先判断当前buffer中消息长度是否完整,不完整的话直接返回  
    4.         if (!prefixedDataAvailable(4)) {  
    5.             throw new BufferUnderflowException();  
    6.         }  
    7.   
    8.     //消息长度  
    9.         int length = getInt();  
    10.         if (length <= 4) {  
    11.             throw new BufferDataException("Object length should be greater than 4: " + length);  
    12.         }  
    13.   
    14.         int oldLimit = limit();  
    15.     //limit到消息结尾处  
    16.         limit(position() + length);  
    17.         try {  
    18.             ObjectInputStream in = new ObjectInputStream(asInputStream()) {  
    19.                 @Override  
    20.                 protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {  
    21.                     int type = read();  
    22.                     if (type < 0) {  
    23.                         throw new EOFException();  
    24.                     }  
    25.                     switch (type) {  
    26.                     case 0: // NON-Serializable class or Primitive types  
    27.                         return super.readClassDescriptor();  
    28.                     case 1: // Serializable class  
    29.                         String className = readUTF();  
    30.                         Class<?> clazz = Class.forName(className, true, classLoader);  
    31.                         return ObjectStreamClass.lookup(clazz);  
    32.                     default:  
    33.                         throw new StreamCorruptedException("Unexpected class descriptor type: " + type);  
    34.                     }  
    35.                 }  
    36.   
    37.                 @Override  
    38.                 protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {  
    39.                     String name = desc.getName();  
    40.                     try {  
    41.                         return Class.forName(name, false, classLoader);  
    42.                     } catch (ClassNotFoundException ex) {  
    43.                         return super.resolveClass(desc);  
    44.                     }  
    45.                 }  
    46.             };  
    47.             return in.readObject();  
    48.         } catch (IOException e) {  
    49.             throw new BufferDataException(e);  
    50.         } finally {  
    51.             limit(oldLimit);  
    52.         }  
    53.     }  
    54.   
    55. //判断当前消息是否完整   
    56. public boolean prefixedDataAvailable(int prefixLength, int maxDataLength) {  
    57.         if (remaining() < prefixLength) {  
    58.             return false;  
    59.         }  
    60.   
    61.         int dataLength;  
    62.         switch (prefixLength) {  
    63.         case 1:  
    64.             dataLength = getUnsigned(position());  
    65.             break;  
    66.         case 2:  
    67.             dataLength = getUnsignedShort(position());  
    68.             break;  
    69.         case 4:  
    70.             dataLength = getInt(position());  
    71.             break;  
    72.         default:  
    73.             throw new IllegalArgumentException("prefixLength: " + prefixLength);  
    74.         }  
    75.   
    76.         if (dataLength < 0 || dataLength > maxDataLength) {  
    77.             throw new BufferDataException("dataLength: " + dataLength);  
    78.         }  
    79.     //判断当前消息是否完整   
    80.         return remaining() - prefixLength >= dataLength;  
    81.     }  
    82.   
    83. //编码  
    84.  public IoBuffer putObject(Object o) {  
    85.         int oldPos = position();  
    86.         skip(4); // Make a room for the length field.预留4个字节用于存储消息长度  
    87.         try {  
    88.             ObjectOutputStream out = new ObjectOutputStream(asOutputStream()) {  
    89.                 @Override  
    90.                 protected void writeClassDescriptor(ObjectStreamClass desc) throws IOException {  
    91.                     try {  
    92.                         Class<?> clz = Class.forName(desc.getName());  
    93.                         if (!Serializable.class.isAssignableFrom(clz)) { // NON-Serializable class  
    94.                             write(0);  
    95.                             super.writeClassDescriptor(desc);  
    96.                         } else { // Serializable class  
    97.                             write(1);  
    98.                             writeUTF(desc.getName());  
    99.                         }  
    100.                     } catch (ClassNotFoundException ex) { // Primitive types  
    101.                         write(0);  
    102.                         super.writeClassDescriptor(desc);  
    103.                     }  
    104.                 }  
    105.             };  
    106.             out.writeObject(o);  
    107.             out.flush();  
    108.         } catch (IOException e) {  
    109.             throw new BufferDataException(e);  
    110.         }  
    111.   
    112.         // Fill the length field  
    113.         int newPos = position();  
    114.         position(oldPos);  
    115.     //存储消息长度  
    116.         putInt(newPos - oldPos - 4);  
    117.         position(newPos);  
    118.         return this;  
    119.     }  



    4.3、断包与粘包处理

    [java] view plain copy
     
     print?
    1. // class CumulativeProtocolDecoder  
    2.  public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {  
    3.     //是否有分片,tcp 有分片  
    4.         if (!session.getTransportMetadata().hasFragmentation()) {  
    5.             while (in.hasRemaining()) {  
    6.                 if (!doDecode(session, in, out)) {  
    7.                     break;  
    8.                 }  
    9.             }  
    10.   
    11.             return;  
    12.         }  
    13.   
    14.     // 1、断包处理  
    15.     // 2、处理粘包  
    16.         boolean usingSessionBuffer = true;  
    17.     //session中是否有断包情况(上次处理后),断包保存在session中  
    18.         IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);  
    19.         // If we have a session buffer, append data to that; otherwise  
    20.         // use the buffer read from the network directly.  
    21.         if (buf != null) {//有断包,则把当前包拼接到断包里面  
    22.             boolean appended = false;  
    23.             // Make sure that the buffer is auto-expanded.  
    24.             if (buf.isAutoExpand()) {  
    25.                 try {  
    26.                     buf.put(in);  
    27.                     appended = true;  
    28.                 } catch (IllegalStateException e) {  
    29.                     // A user called derivation method (e.g. slice()),  
    30.                     // which disables auto-expansion of the parent buffer.  
    31.                 } catch (IndexOutOfBoundsException e) {  
    32.                     // A user disabled auto-expansion.  
    33.                 }  
    34.             }  
    35.   
    36.             if (appended) {  
    37.                 buf.flip();  
    38.             } else {  
    39.                 // Reallocate the buffer if append operation failed due to  
    40.                 // derivation or disabled auto-expansion.  
    41.                 buf.flip();  
    42.                 IoBuffer newBuf = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);  
    43.                 newBuf.order(buf.order());  
    44.                 newBuf.put(buf);  
    45.                 newBuf.put(in);  
    46.                 newBuf.flip();  
    47.                 buf = newBuf;  
    48.   
    49.                 // Update the session attribute.  
    50.                 session.setAttribute(BUFFER, buf);  
    51.             }  
    52.         } else {  
    53.             buf = in;  
    54.             usingSessionBuffer = false;  
    55.         }  
    56.   
    57.     //2 粘包处理,可能buffer中有多个消息,需要多次处理(解码)每个消息,直到消息处理完,或者剩下的消息不是一个完整的消息或者buffer没有数据了  
    58.   
    59.         for (;;) {  
    60.             int oldPos = buf.position();  
    61.             boolean decoded = doDecode(session, buf, out);  
    62.             if (decoded) {//解码 成功  
    63.                 if (buf.position() == oldPos) {  
    64.                     throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");  
    65.                 }  
    66.         //buffer空了  
    67.                 if (!buf.hasRemaining()) {//buffer没有数据了  
    68.                     break;  
    69.                 }  
    70.             } else {//剩下的消息不是一个完整的消息,断包出现了  
    71.                 break;  
    72.             }  
    73.         }  
    74.   
    75.         // if there is any data left that cannot be decoded, we store  
    76.         // it in a buffer in the session and next time this decoder is  
    77.         // invoked the session buffer gets appended to  
    78.         if (buf.hasRemaining()) {//剩下的消息不是一个完整的消息,断包出现了  
    79.         //如果断包已经保存在session中,则更新buffer,没有的话,就把剩下的断包保存在session中  
    80.             if (usingSessionBuffer && buf.isAutoExpand()) {  
    81.                 buf.compact();  
    82.             } else {  
    83.                 storeRemainingInSession(buf, session);  
    84.             }  
    85.         } else {  
    86.             if (usingSessionBuffer) {  
    87.                 removeSessionBuffer(session);  
    88.             }  
    89.         }  
    90.     }  



    [java] view plain copy
     
     print?
      1. //class  ObjectSerializationDecoder  
      2.  protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {  
      3.     //首先判断当前buffer中消息长度是否完整,不完整的话直接返回  
      4.         if (!in.prefixedDataAvailable(4, maxObjectSize)) {  
      5.             return false;  
      6.         }  
      7.   
      8.         out.write(in.getObject(classLoader));  
      9.         return true;  
      10.     }  
  • 相关阅读:
    vscode常用插件列表
    使用docker构建supervisor全步骤
    docker删除虚悬镜像(临时镜像文件)
    消息队列的对比
    ECharts使用:this.dom.getContext is not a function
    curl命令行请求
    工作工具清单
    《SQL优化入门》讲座总结
    初始化git库并配置自动部署
    php代码进行跨域请求处理
  • 原文地址:https://www.cnblogs.com/duanxz/p/6754610.html
Copyright © 2011-2022 走看看