zoukankan      html  css  js  c++  java
  • Netty中粘包和拆包的解决方案

    粘包和拆包是TCP网络编程中不可避免的,无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制。

    TCP粘包和拆包

    TCP是个“流”协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

    img

    如图所示,假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。

    1. 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;
    2. 服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包;
    3. 服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包
    4. 服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余内容D1_2和D2包的整包。

    如果此时服务端TCP接收滑窗非常小,而数据包D1和D2比较大,很有可能会发生第五种可能,即服务端分多次才能将D1和D2包接收完全,期间发生多次拆包。

    TCP粘包和拆包产生的原因

    数据从发送方到接收方需要经过操作系统的缓冲区,而造成粘包和拆包的主要原因就在这个缓冲区上。粘包可以理解为缓冲区数据堆积,导致多个请求数据粘在一起,而拆包可以理解为发送的数据大于缓冲区,进行拆分处理。

    img

    详细来说,造成粘包和拆包的原因主要有以下三个:

    1. 应用程序write写入的字节大小大于套接口发送缓冲区大小
    2. 进行MSS大小的TCP分段
    3. 以太网帧的payload大于MTU进行IP分片。

    img

    粘包和拆包的解决方法

    由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下。

    1. 消息长度固定,累计读取到长度和为定长LEN的报文后,就认为读取到了一个完整的信息
    2. 将回车换行符作为消息结束符
    3. 将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符
    4. 通过在消息头中定义长度字段来标识消息的总长度

    Netty中的粘包和拆包解决方案

    针对上一小节描述的粘包和拆包的解决方案,对于拆包问题比较简单,用户可以自己定义自己的编码器进行处理,Netty并没有提供相应的组件。对于粘包的问题,由于拆包比较复杂,代码比较处理比较繁琐,Netty提供了4种解码器来解决,分别如下:

    1. 固定长度的拆包器 FixedLengthFrameDecoder,每个应用层数据包的都拆分成都是固定长度的大小
    2. 行拆包器 LineBasedFrameDecoder,每个应用层数据包,都以换行符作为分隔符,进行分割拆分
    3. 分隔符拆包器 DelimiterBasedFrameDecoder,每个应用层数据包,都通过自定义的分隔符,进行分割拆分
    4. 基于数据包长度的拆包器 LengthFieldBasedFrameDecoder,将应用层数据包的长度,作为接收端应用层数据包的拆分依据。按照应用层数据包的大小,拆包。这个拆包器,有一个要求,就是应用层协议中包含数据包的长度

    以上解码器在使用时只需要添加到Netty的责任链中即可,大多数情况下这4种解码器都可以满足了,当然除了以上4种解码器,用户也可以自定义自己的解码器进行处理。具体可以参考以下代码示例:

    // Server主程序
    public class XNettyServer {
      public static void main(String[] args) throws Exception {
        // accept 处理连接的线程池
        NioEventLoopGroup acceptGroup = new NioEventLoopGroup();
        // read io 处理数据的线程池
        NioEventLoopGroup readGroup = new NioEventLoopGroup();
        try {
          ServerBootstrap serverBootstrap = new ServerBootstrap();
          serverBootstrap
              .group(acceptGroup, readGroup)
              .channel(NioServerSocketChannel.class)
              .childHandler(
                  new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                      ChannelPipeline pipeline = ch.pipeline();
    
                      // 增加解码器
                      pipeline.addLast(new XDecoder());
    
                      // 打印出内容 handdler
                      pipeline.addLast(new XHandler());
                    }
                  });
          System.out.println("启动成功,端口 7777");
          serverBootstrap.bind(7777).sync().channel().closeFuture().sync();
        } finally {
          acceptGroup.shutdownGracefully();
          readGroup.shutdownGracefully();
        }
      }
    }
    
    // 解码器
    public class XDecoder extends ByteToMessageDecoder {
    
      static final int PACKET_SIZE = 220;
    
      // 用来临时保留没有处理过的请求报文
      ByteBuf tempMsg = Unpooled.buffer();
    
      /**
       * @param ctx
       * @param in 请求的数据
       * @param out 将粘在一起的报文拆分后的结果保留起来
       * @throws Exception
       */
      @Override
      protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println(Thread.currentThread() + "收到了一次数据包,长度是:" + in.readableBytes());
    
        // 合并报文
        ByteBuf message = null;
        int tmpMsgSize = tempMsg.readableBytes();
        // 如果暂存有上一次余下的请求报文,则合并
        if (tmpMsgSize > 0) {
          message = Unpooled.buffer();
          message.writeBytes(tempMsg);
          message.writeBytes(in);
          System.out.println("合并:上一数据包余下的长度为:" + tmpMsgSize + ",合并后长度为:" + message.readableBytes());
        } else {
          message = in;
        }
    
        int size = message.readableBytes();
        int counter = size / PACKET_SIZE;
        for (int i = 0; i < counter; i++) {
          byte[] request = new byte[PACKET_SIZE];
          // 每次从总的消息中读取220个字节的数据
          message.readBytes(request);
    
          // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理
          out.add(Unpooled.copiedBuffer(request));
        }
    
        // 多余的报文存起来
        // 第一个报文: i+  暂存
        // 第二个报文: 1 与第一次
        size = message.readableBytes();
        if (size != 0) {
          System.out.println("多余的数据长度:" + size);
          // 剩下来的数据放到tempMsg暂存
          tempMsg.clear();
          tempMsg.writeBytes(message.readBytes(size));
        }
      }
    }
    
    // 处理器
    public class XHandler extends ChannelInboundHandlerAdapter {
    
      @Override
      public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
      }
    
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        byte[] content = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(content);
        System.out.println(Thread.currentThread() + ": 最终打印" + new String(content));
        ((ByteBuf) msg).release();
      }
    
      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
      }
    }
    

  • 相关阅读:
    模板 无源汇上下界可行流 loj115
    ICPC2018JiaozuoE Resistors in Parallel 高精度 数论
    hdu 2255 奔小康赚大钱 最佳匹配 KM算法
    ICPC2018Beijing 现场赛D Frog and Portal 构造
    codeforce 1175E Minimal Segment Cover ST表 倍增思想
    ICPC2018Jiaozuo 现场赛H Can You Solve the Harder Problem? 后缀数组 树上差分 ST表 口胡题解
    luogu P1966 火柴排队 树状数组 逆序对 离散化
    luogu P1970 花匠 贪心
    luogu P1967 货车运输 最大生成树 倍增LCA
    luogu P1315 观光公交 贪心
  • 原文地址:https://www.cnblogs.com/coding-diary/p/11650686.html
Copyright © 2011-2022 走看看