zoukankan      html  css  js  c++  java
  • Netty中粘包和拆包的解决方案

    粘包和拆包是TCP网络编程中不可避免的,无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制。

    TCP粘包和拆包

    TCP是个“流”协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

    img

    如图所示,假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。

    1. 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;
    2. 服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包;
    3. 服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包
    4. 服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余内容D1_2和D2包的整包。

    如果此时服务端TCP接收滑窗非常小,而数据包D1和D2比较大,很有可能会发生第五种可能,即服务端分多次才能将D1和D2包接收完全,期间发生多次拆包。

    TCP粘包和拆包产生的原因

    数据从发送方到接收方需要经过操作系统的缓冲区,而造成粘包和拆包的主要原因就在这个缓冲区上。粘包可以理解为缓冲区数据堆积,导致多个请求数据粘在一起,而拆包可以理解为发送的数据大于缓冲区,进行拆分处理。

    img

    详细来说,造成粘包和拆包的原因主要有以下三个:

    1. 应用程序write写入的字节大小大于套接口发送缓冲区大小
    2. 进行MSS大小的TCP分段
    3. 以太网帧的payload大于MTU进行IP分片。

    img

    粘包和拆包的解决方法

    由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下。

    1. 消息长度固定,累计读取到长度和为定长LEN的报文后,就认为读取到了一个完整的信息
    2. 将回车换行符作为消息结束符
    3. 将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符
    4. 通过在消息头中定义长度字段来标识消息的总长度

    Netty中的粘包和拆包解决方案

    针对上一小节描述的粘包和拆包的解决方案,对于拆包问题比较简单,用户可以自己定义自己的编码器进行处理,Netty并没有提供相应的组件。对于粘包的问题,由于拆包比较复杂,代码比较处理比较繁琐,Netty提供了4种解码器来解决,分别如下:

    1. 固定长度的拆包器 FixedLengthFrameDecoder,每个应用层数据包的都拆分成都是固定长度的大小
    2. 行拆包器 LineBasedFrameDecoder,每个应用层数据包,都以换行符作为分隔符,进行分割拆分
    3. 分隔符拆包器 DelimiterBasedFrameDecoder,每个应用层数据包,都通过自定义的分隔符,进行分割拆分
    4. 基于数据包长度的拆包器 LengthFieldBasedFrameDecoder,将应用层数据包的长度,作为接收端应用层数据包的拆分依据。按照应用层数据包的大小,拆包。这个拆包器,有一个要求,就是应用层协议中包含数据包的长度

    以上解码器在使用时只需要添加到Netty的责任链中即可,大多数情况下这4种解码器都可以满足了,当然除了以上4种解码器,用户也可以自定义自己的解码器进行处理。具体可以参考以下代码示例:

    // Server主程序
    public class XNettyServer {
      public static void main(String[] args) throws Exception {
        // accept 处理连接的线程池
        NioEventLoopGroup acceptGroup = new NioEventLoopGroup();
        // read io 处理数据的线程池
        NioEventLoopGroup readGroup = new NioEventLoopGroup();
        try {
          ServerBootstrap serverBootstrap = new ServerBootstrap();
          serverBootstrap
              .group(acceptGroup, readGroup)
              .channel(NioServerSocketChannel.class)
              .childHandler(
                  new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                      ChannelPipeline pipeline = ch.pipeline();
    
                      // 增加解码器
                      pipeline.addLast(new XDecoder());
    
                      // 打印出内容 handdler
                      pipeline.addLast(new XHandler());
                    }
                  });
          System.out.println("启动成功,端口 7777");
          serverBootstrap.bind(7777).sync().channel().closeFuture().sync();
        } finally {
          acceptGroup.shutdownGracefully();
          readGroup.shutdownGracefully();
        }
      }
    }
    
    // 解码器
    public class XDecoder extends ByteToMessageDecoder {
    
      static final int PACKET_SIZE = 220;
    
      // 用来临时保留没有处理过的请求报文
      ByteBuf tempMsg = Unpooled.buffer();
    
      /**
       * @param ctx
       * @param in 请求的数据
       * @param out 将粘在一起的报文拆分后的结果保留起来
       * @throws Exception
       */
      @Override
      protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println(Thread.currentThread() + "收到了一次数据包,长度是:" + in.readableBytes());
    
        // 合并报文
        ByteBuf message = null;
        int tmpMsgSize = tempMsg.readableBytes();
        // 如果暂存有上一次余下的请求报文,则合并
        if (tmpMsgSize > 0) {
          message = Unpooled.buffer();
          message.writeBytes(tempMsg);
          message.writeBytes(in);
          System.out.println("合并:上一数据包余下的长度为:" + tmpMsgSize + ",合并后长度为:" + message.readableBytes());
        } else {
          message = in;
        }
    
        int size = message.readableBytes();
        int counter = size / PACKET_SIZE;
        for (int i = 0; i < counter; i++) {
          byte[] request = new byte[PACKET_SIZE];
          // 每次从总的消息中读取220个字节的数据
          message.readBytes(request);
    
          // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理
          out.add(Unpooled.copiedBuffer(request));
        }
    
        // 多余的报文存起来
        // 第一个报文: i+  暂存
        // 第二个报文: 1 与第一次
        size = message.readableBytes();
        if (size != 0) {
          System.out.println("多余的数据长度:" + size);
          // 剩下来的数据放到tempMsg暂存
          tempMsg.clear();
          tempMsg.writeBytes(message.readBytes(size));
        }
      }
    }
    
    // 处理器
    public class XHandler extends ChannelInboundHandlerAdapter {
    
      @Override
      public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
      }
    
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        byte[] content = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(content);
        System.out.println(Thread.currentThread() + ": 最终打印" + new String(content));
        ((ByteBuf) msg).release();
      }
    
      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
      }
    }
    

  • 相关阅读:
    《Linux内核设计与实现》读书笔记(二)- 内核开发的准备
    《Linux内核设计与实现》读书笔记(一)-内核简介
    Redis常用命令
    redis——学习之路五(简单的C#使用redis)
    Redis——学习之路四(初识主从配置)
    Redis——学习之路三(初识redis config配置)
    Redis——学习之路二(初识redis服务器命令)
    Redis——学习之路一(初识redis)
    SQL Server 查询分析器提供的所有快捷方式(快捷键)
    降维中的特征选择(转)
  • 原文地址:https://www.cnblogs.com/coding-diary/p/11650686.html
Copyright © 2011-2022 走看看