zoukankan      html  css  js  c++  java
  • Netty笔记(6)

    Netty 中 TCP 粘包拆包问题

    信息通过tcp传输过程中出现的状况 .

    TCP是个“流”协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送

    产生粘包和拆包问题的主要原因是,操作系统在发送TCP数据的时候,底层会有一个缓冲区,例如1024个字节大小,如果一次请求发送的数据量比较小,没达到缓冲区大小,TCP则会将多个请求合并为同一个请求进行发送,这就形成了粘包问题;如果一次请求发送的数据量比较大,超过了缓冲区大小,TCP就会将其拆分为多次发送,这就是拆包,也就是将一个大的包拆分为多个小包进行发送。

    入图所示:

    上图中演示了粘包和拆包的三种情况:

    • D1和D2两个包都刚好满足TCP缓冲区的大小,或者说其等待时间已经达到TCP等待时长,从而还是使用两个独立的包进行发送;
    • D1和D2两次请求间隔时间内较短,并且数据包较小,因而合并为同一个包发送给服务端;
    • 某一个包比较大,因而将其拆分为两个包D*_1和D*_2进行发送,而这里由于拆分后的某一个包比较小,其又与另一个包合并在一起发送。

    发生这种情况的代码:

    客户端发送数据 快速的发送 10条数据 :

    public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    
        private int count;
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            //使用客户端发送10条数据 hello,server 编号
            for(int i= 0; i< 10; ++i) {
                ByteBuf buffer = Unpooled.copiedBuffer("hello,server " +i, Charset.forName("utf-8"));
                ctx.writeAndFlush(buffer);
            }
        }
    
    }
    
    

    服务端接受打印:

    服务器接收到数据 hello,server 0
    服务器接收到数据 hello,server 1
    服务器接收到数据 hello,server 2hello,server 3
    服务器接收到数据 hello,server 4hello,server 5
    服务器接收到数据 hello,server 6
    服务器接收到数据 hello,server 7hello,server 8
    服务器接收到数据 hello,server 9
    

    很明显 其中有三条记录被粘在其他数据上,这就是TCP的粘包拆包现象

    怎么解决:

    1. Netty自带的 解决方案:

      • 固定长度的拆包器 FixedLengthFrameDecoder,每个应用层数据包的都拆分成都是固定长度的大小

      • 行拆包器 LineBasedFrameDecoder,每个应用层数据包,都以换行符作为分隔符,进行分割拆分

      • 分隔符拆包器 DelimiterBasedFrameDecoder,每个应用层数据包,都通过自定义的分隔符,进行分割拆分

      • 基于数据包长度的拆包器 LengthFieldBasedFrameDecoder,将应用层数据包的长度,作为接收端应用层数据包的拆分依据。按照应用层数据包的大小,拆包。这个拆包器,有一个要求,就是应用层协议中包含数据包的长度

    FixedLengthFrameDecoder 解码器

    服务端 添加 FixedLengthFrameDecoder 解码器 并指定长度

    public class EchoServer {
    
    
    
      public static void main(String[] args) throws InterruptedException {
    
          EventLoopGroup bossGroup = new NioEventLoopGroup();
          EventLoopGroup workerGroup = new NioEventLoopGroup();
          try {
              ServerBootstrap bootstrap = new ServerBootstrap();
              bootstrap.group(bossGroup, workerGroup)
                      .channel(NioServerSocketChannel.class)
                      .option(ChannelOption.SO_BACKLOG, 1024)
                      .childHandler(new ChannelInitializer<SocketChannel>() {
                          @Override
                          protected void initChannel(SocketChannel ch) throws Exception {
                              //指定长度为9 则每次截取长度为9的字节 
                              ch.pipeline().addLast(new FixedLengthFrameDecoder(9));
                             // 将 每次截取的字节编码为字符串
                              ch.pipeline().addLast(new StringDecoder());
    						//自定义处理类打印
                              ch.pipeline().addLast(new EchoServerHandler());
                          }
                      });
    
              ChannelFuture future = bootstrap.bind(8000).sync();
              future.channel().closeFuture().sync();
          } finally {
              bossGroup.shutdownGracefully();
              workerGroup.shutdownGracefully();
          }
      }
    }
    

    自定义服务端Handler 打印字符串:

    public class EchoServerHandler extends SimpleChannelInboundHandler<String> {
    
        @Override
      protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("message: " + msg.trim());
      }
    }
    

    客户端发送信息 并添加字符串编码器 将信息已字符串的形式编码:

    public class EchoClient {
    
    
    
      public static void main(String[] args) throws InterruptedException {
          EventLoopGroup group = new NioEventLoopGroup();
          try {
              Bootstrap bootstrap = new Bootstrap();
              bootstrap.group(group)
                      .channel(NioSocketChannel.class)
                      .option(ChannelOption.TCP_NODELAY, true)
                      .handler(new ChannelInitializer<SocketChannel>() {
                          @Override
                          protected void initChannel(SocketChannel ch) throws Exception {
                              
                              ch.pipeline().addLast(new StringEncoder());
                              ch.pipeline().addLast(new EchoClientHandler());
                          }
                      });
    
              ChannelFuture future = bootstrap.connect("127.0.0.1", 8000).sync();
              future.channel().closeFuture().sync();
          } finally {
              group.shutdownGracefully();
          }
      }
    }
    

    客户端Handler 发送信息 刚好长度为9 :

    public class EchoClientHandler extends SimpleChannelInboundHandler<String> {
    
      @Override
      public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush("123456789");
      }
    }
    

    总结: FixedLengthFrameDecoder 解码器 将按照指定长度截取字节 并添加到List中向后传递 , 以本案例为例,如果字节数刚好为9,则全部打印,如果 字节数为18, 则拆分打印两次,如果为19 则最后一个字节不打印,如果不足9 则什么都不打印.

    LineBasedFrameDecoder 行拆分器

    通过行换行符 或者 进行分割,

    将上面案例的FixedLengthFrameDecoder 解码器 换成 LineBasedFrameDecoder

    并指定 截取每段的最大长度 (超过报错 不往后传递)

    ...
        
    .childHandler(new ChannelInitializer<SocketChannel>() {
                          @Override
                          protected void initChannel(SocketChannel ch) throws Exception {
    
    //                          ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
                              ch.pipeline().addLast(new                       LineBasedFrameDecoder(5));
                              // 将前一步解码得到的数据转码为字符串
                              ch.pipeline().addLast(new StringDecoder());
    //                           最终的数据处理
                              ch.pipeline().addLast(new EchoServerHandler());
                          }
                      });
    
    ...
    

    客户端Handler 发送字符串, 最后的"1234" 不会打印,,

    @Override
      public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush("1
    123456
    1234");
      }
    

    服务端接收并打印结果 分别打印了 "1" 和 "1234" 而超过字节长度5 的 "123456"则报出TooLongFrameException错误

    server receives message: 1
    
    An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
    io.netty.handler.codec.TooLongFrameException: frame length (6) exceeds the allowed maximum (5)
    
    server receives message: 1234
    

    DelimiterBasedFrameDecoder 自定义分割符

    和行分割符类似, 此解码器可以自定义分割符,常用构造方法:

     public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters)
    

    接收一个最大长度,和 任意个数的 分隔符(用ByteBuf的形式传入),解码器识别到任意一个 分割符 都会进行拆分

    注册解码器:

    传入 "$" 和 "*" 作为分割符,并指定最大长度为 5个字节

    .childHandler(new ChannelInitializer<SocketChannel>() {
                          @Override
                          protected void initChannel(SocketChannel ch) throws Exception {
    
                              ch.pipeline().addLast(new DelimiterBasedFrameDecoder(5,
                                  Unpooled.wrappedBuffer("$".getBytes()),Unpooled.wrappedBuffer("*".getBytes())));
                              // 将前一步解码得到的数据转码为字符串
                              ch.pipeline().addLast(new StringDecoder());
                              
    //                           最终的数据处理
                              ch.pipeline().addLast(new EchoServerHandler());
                          }
                      });
    

    客户端 发送数据:

    @Override
      public void channelActive(ChannelHandlerContext ctx) throws Exception {
              ctx.writeAndFlush("1$123456*1234$789$");
      }
    

    服务端只打印了 "1" 当解析到 "123456" 时 就报错了 后面就没有再解析了,会缓存着 等到该通道关闭 或者有后续数据发送过来时 才继续解析

    LengthFieldBasedFrameDecoder

    自定义数据长度,发送的 字节数组中 包含 描述 数据长度的字段 和 数据本身,

    解码过程

    常用字段:

    • maxFrameLength:指定了每个包所能传递的最大数据包大小,(上图中的最大长度为11)
    • lengthFieldOffset:指定了长度字段在字节码中的偏移量;(11这个描述长度的数据是在数组的第几位开始)
    • lengthFieldLength:指定了长度字段所占用的字节长度;(11 占 1个字节)
    • lengthAdjustment: 长度域的偏移量矫正。 如果长度域的值,除了包含有效数据域的长度外,还包含了其他域(如长度域自身)长度,那么,就需要进行矫正。矫正的值为:包长 - 长度域的值 – 长度域偏移 – 长度域长。 ( 11 这个域 不光光描述 Hello,world, 一般设置为0,)
    • initialBytesToStrip : 丢弃的起始字节数。丢弃处于有效数据前面的字节数量。比如前面有1个节点的长度域,则它的值为1. ( 如果为0代表不丢弃,则将长度域也向后传递)

    服务端添加 解码器:

    • 最大长度 为 长度描述域 的值11 + 长度描述域本身占用的长度 1 = 12
    • 长度描述域放在数据包的第一位, 没有偏移 为0
    • 长度描述域 长度为1
    • 无需矫正
    • 一个字节也不丢弃
    .childHandler(new ChannelInitializer<SocketChannel>() {
              @Override
              protected void initChannel(SocketChannel ch) throws Exception {
                // 这里将FixedLengthFrameDecoder添加到pipeline中,指定长度为20
                ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(12,0,1,0,0));
                // 将前一步解码得到的数据转码为字符串
    
                ch.pipeline().addLast(new StringDecoder());
                // 最终的数据处理
                ch.pipeline().addLast(new EchoServerHandler());
              }
            });
    

    客户端发送数据 发送最Netty 底层操作 的ByteBuf对象 发送时 无需任何编码:

     @Override
      public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buffer = Unpooled.buffer();
        buffer.writeByte(11);
        buffer.writeBytes("Hello,World".getBytes());
        ctx.writeAndFlush(buffer);
      }
    

    服务端接收数据为 (11代表的制表符)Hello,World

    这样发送 每次都要计算 数据长度,并手动添加到 数据的前面,很不方便 配合LengthFieldPrepender 使用,这个编码码器可以计算 长度,并自动添加到 数据的前面

    改造客户端 先拦截数据按字符串编码,再计算字节长度 添加 长度描述字段 并占用一个字节 (这个长度要与客户端的解码器 lengthFieldLength参数 值保持一致) :

     .handler(new ChannelInitializer<SocketChannel>() {
              @Override
              protected void initChannel(SocketChannel ch) throws Exception {
    
                ch.pipeline().addLast(new LengthFieldPrepender(1));
                ch.pipeline().addLast(new StringEncoder());
                // 客户端发送消息给服务端,并且处理服务端响应的消息
                ch.pipeline().addLast(new EchoClientHandler());
              }
            });
    

    客户端发送 有字符串编码器 可以直接发送字符串:

      @Override
      public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush("Hello,World");
      }
    

    自定义协议

    上面介绍的 各种解码器 已经可以应付绝大多数场景, 如果遇到 特殊的状况 我们也可以自定义协议

    定义 协议对象:

    //协议包
    public class MessageProtocol {
        private int len; //关键
        private byte[] content;
    
        public int getLen() {
            return len;
        }
    
        public void setLen(int len) {
            this.len = len;
        }
    
        public byte[] getContent() {
            return content;
        }
    
        public void setContent(byte[] content) {
            this.content = content;
        }
    }
    
    

    客户端发送:

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
           
            for(int i = 0; i< 5; i++) {
                String mes = "Hello,World";
                byte[] content = mes.getBytes(Charset.forName("utf-8"));
                int length = mes.getBytes(Charset.forName("utf-8")).length;
    
                //创建协议包对象
                MessageProtocol messageProtocol = new MessageProtocol();
                messageProtocol.setLen(length);
                messageProtocol.setContent(content);
                ctx.writeAndFlush(messageProtocol);
    
            }
        }
    

    该协议的 自定义 编码器 将协议包发送出去:

    public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
        @Override
        protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
            System.out.println("MyMessageEncoder encode 方法被调用");
            out.writeInt(msg.getLen());
            out.writeBytes(msg.getContent());
        }
    }
    

    将客户端 发送数据的Handler 和 编码器 注册 这里就不写了

    服务端解码器 读取长度 并 判断可读数据的长度是否足够 :

    public class MyMessageDecoder extends ReplayingDecoder<Void> {
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            
            in.markReaderIndex();
            
            //读取长度
            int length = in.readInt();
            //如果可读长度大于 数据长度 说明数据完整
            if (in.readableBytes()>length){
                byte[] content = new byte[length];
                in.readBytes(content);
                //封装成 MessageProtocol 对象,放入 out, 传递下一个handler业务处理
                MessageProtocol messageProtocol = new MessageProtocol();
                messageProtocol.setLen(length);
                messageProtocol.setContent(content);
                out.add(messageProtocol);
            }else{
                //如果数据不够长 将已经读过的的int 数据还原回去 留下次读取
                in.resetReaderIndex();
            }
        }
    }
    

    服务端成功读取:

    本例中存在很多问题, 明白这个意思就行, 感兴趣的话 可以 自己动手优化

  • 相关阅读:
    FTPClient使用中的问题--获取当前工作目录为null
    MGR安装
    脚本在Shell可以执行成功,放到crontab里执行失败
    使用Python通过SMTP发送邮件
    MySQL Router
    事务管理(ACID)
    mysqldump使用
    MySQL InnoDB Cluster
    Linux LVM逻辑卷配置过程详解(创建、扩展、缩减、删除、卸载、快照创建)
    centos命令行控制电脑发出滴滴声
  • 原文地址:https://www.cnblogs.com/xjwhaha/p/13570339.html
Copyright © 2011-2022 走看看