zoukankan      html  css  js  c++  java
  • netty权威指南学习笔记三——TCP粘包/拆包之粘包现象

      TCP是个流协议,流没有一定界限。TCP底层不了解业务,他会根据TCP缓冲区的实际情况进行包划分,在业务上,一个业务完整的包,可能会被TCP底层拆分为多个包进行发送,也可能多个小包组合成一个大的数据包进行发送,这就是TCP的拆包和粘包。

      产生问题的原因

    •   应用程序write写入的字节大小大于套接字缓冲区大小
    •   进行MSS大小的TCP分割;
    •   以太网帧的payload大于MTU进行IP分片

    关于MSS和MTU的分段问题可以参考https://www.cnblogs.com/yuyutianxia/p/8120741.html学习。

      解决策略

      由于底层的TCP无法理解上层的业务数据,所以在底层无法保证数据包不被拆分和重组,要解决这个问题就需要应用层进行人为干预,主要方案有:

    •   消息定长,例如每个报文大小为固定长度200字节,如果不够,空位补空格;
    •   在包尾增加回车换行符进行分割,例如FTP协议;
    •   将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段,通常涉及思路为消息头的第一个字段使用int32来表示消息的总长度;
    •   其他更复杂的应用层协议。

    下面模拟粘包的场景,即在服务端去掉分隔符,而客户端发送大量信息过来

    TimeServer 服务端代码

     1 package com.StickyUnpack;
     2 
     3 import io.netty.bootstrap.ServerBootstrap;
     4 import io.netty.channel.Channel;
     5 import io.netty.channel.ChannelFuture;
     6 import io.netty.channel.ChannelInitializer;
     7 import io.netty.channel.ChannelOption;
     8 import io.netty.channel.nio.NioEventLoopGroup;
     9 import io.netty.channel.socket.nio.NioServerSocketChannel;
    10 
    11 public class TimeServer {
    12     public void bind(int port) throws InterruptedException {
    13         NioEventLoopGroup bossGroup = new NioEventLoopGroup();//创建接收请求的线程组
    14         NioEventLoopGroup workGroup = new NioEventLoopGroup();//创建处理IO的线程组
    15         try {
    16             ServerBootstrap b = new ServerBootstrap();//创建NIO服务启动辅助类
    17             b.group(bossGroup,workGroup)//将两个线程组传递到NIO服务辅助启动类中
    18                     .channel(NioServerSocketChannel.class)//通过反射创建1一个severSocketChannel对象
    19                     .option(ChannelOption.SO_BACKLOG,1024)//BACKLOG用于构造服务端套接字ServerSocket对象,
    20                     // 标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。
    21                     // 如果未设置或所设置的值小于1,Java将使用默认值50。
    22                     .childHandler(new ChildHandler());//设置绑定的IO处理类,用来处理网络IO事件
    23 //          绑定端口,同步等待成功,返回ChannelFutrue,用于异步操作通知回调
    24             ChannelFuture f = b.bind(port).sync();
    25 //          等待服务端监听端口关闭
    26             f.channel().closeFuture().sync();
    27         }finally {
    28 //            优雅的退出,释放线程池资源
    29             bossGroup.shutdownGracefully();
    30             workGroup.shutdownGracefully();
    31         }
    32 
    33     }
    34 //    IO处理类的初始化
    35     private class ChildHandler extends ChannelInitializer {
    36         @Override
    37         protected void initChannel(Channel channel) throws Exception {
    38             channel.pipeline().addLast(new TimeServerHandler());
    39         }
    40     }
    41 
    42     public static void main(String[] args) throws InterruptedException {
    43         int port = 8080;
    44         try {
    45             if(args.length>0&&args[0]!=null){
    46                 System.out.println(args[0]);
    47                 port = Integer.valueOf(args[0]);
    48             }
    49         } catch (Exception e) {
    50             port = 8080;
    51         }
    52         new TimeServer().bind(port);
    53     }
    54 }

    服务端处理IO代码

     1 package com.StickyUnpack;
     2 
     3 import io.netty.buffer.ByteBuf;
     4 import io.netty.buffer.Unpooled;
     5 import io.netty.channel.ChannelHandlerContext;
     6 import io.netty.channel.ChannelInboundHandlerAdapter;
     7 
     8 import java.util.Date;
     9 
    10 public class TimeServerHandler extends ChannelInboundHandlerAdapter {
    11     int count;
    12     @Override
    13     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    14         ByteBuf byteBuf = (ByteBuf) msg;//将请求转为ByteBuf缓冲区
    15         byte[] req = new byte[byteBuf.readableBytes()];//获取byteBuf的可读字节数
    16         byteBuf.readBytes(req);//将缓冲区字节数组复制到req数组中
    17         String body = new String(req,"utf-8")//转换为字符串
    18                 //改造去掉客户端传递过来的换行符号,模拟故障造成粘包问题
    19              .substring(0,req.length-System.lineSeparator().length());
    20         System.out.println("the time server receive order:"+body+"the count is:"+ ++count);
    21 //      处理IO内容
    22         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
    23                 ?new Date(System.currentTimeMillis()).toString():"BAD ORDER";
    24         currentTime = currentTime+System.getProperty("line.separator");
    25         ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());//返回客户端的消息转化为ByteBuf对象
    26         ctx.write(resp);//将待应答消息放入缓冲数组中
    27     }
    28 
    29     @Override
    30     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    31         ctx.flush();//将应答消息写入SocketChannel中
    32     }
    33 
    34     @Override
    35     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    36         ctx.close();
    37     }
    38 }

    客户发送请求端代码

     1 package com.StickyUnpack;
     2 
     3 import io.netty.bootstrap.Bootstrap;
     4 import io.netty.channel.ChannelFuture;
     5 import io.netty.channel.ChannelInitializer;
     6 import io.netty.channel.ChannelOption;
     7 import io.netty.channel.nio.NioEventLoopGroup;
     8 import io.netty.channel.socket.SocketChannel;
     9 import io.netty.channel.socket.nio.NioSocketChannel;
    10 
    11 public class TimeClient {
    12     public void connect(String host,int port){
    13         NioEventLoopGroup workGroup = new NioEventLoopGroup();
    14         Bootstrap b = new Bootstrap();
    15         try {
    16         b.group(workGroup).channel(NioSocketChannel.class)
    17                 .option(ChannelOption.TCP_NODELAY,true)
    18                 .handler(new ClientChildHandler());
    19         ChannelFuture f = b.connect(host,port).sync();
    20             f.channel().closeFuture().sync();
    21         } catch (InterruptedException e) {
    22             e.printStackTrace();
    23         }finally {
    24             workGroup.shutdownGracefully();
    25         }
    26     }
    27     public class ClientChildHandler extends ChannelInitializer<SocketChannel>{
    28         @Override
    29         protected void initChannel(SocketChannel socketChannel) throws Exception {
    30             socketChannel.pipeline().addLast(new TimeClientHandler());
    31         }
    32     }
    33     public static void main(String[] args) {
    34         int port = 8080;
    35         if (args.length>0&&args!=null){
    36             try {
    37                 port = Integer.parseInt(args[0]);
    38             } catch (Exception e) {
    39                 e.printStackTrace();
    40             }
    41         }
    42         new TimeClient().connect("127.0.0.1",port);
    43     }
    44 }

    客户端处理IO代码

     1 package com.StickyUnpack;
     2 
     3 import io.netty.buffer.ByteBuf;
     4 import io.netty.buffer.Unpooled;
     5 import io.netty.channel.ChannelHandlerContext;
     6 import io.netty.channel.ChannelInboundHandlerAdapter;
     7 
     8 public class TimeClientHandler extends ChannelInboundHandlerAdapter {
     9     int count;
    10     private byte[] req;
    11 //    private final ByteBuf firstMessage;
    12     public TimeClientHandler() {
    13         /*byte[] req = "QUERY TIME ORDER".getBytes();
    14         firstMessage = Unpooled.buffer(req.length);
    15         firstMessage.writeBytes(req);*/
    16 //        改造代码
    17         req = ("QUERY TIME ORDER"+System.getProperty("line.separator")).getBytes();
    18     }
    19 
    20     @Override
    21     public void channelActive(ChannelHandlerContext ctx) throws Exception {
    22         ByteBuf message = null;
    23         for(int i=0;i<100;i++){
    24             message = Unpooled.buffer(req.length);
    25             message.writeBytes(req);
    26             ctx.writeAndFlush(message);
    27         }
    28     }
    29 
    30     @Override
    31     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    32         ByteBuf byteBuf = (ByteBuf) msg;
    33         byte[] req = new byte[byteBuf.readableBytes()];
    34         byteBuf.readBytes(req);
    35         String body = new String(req,"utf-8");
    36         System.out.println("Now is:"+body+";The client count is:"+ ++count);
    37     }
    38 
    39     @Override
    40     public void exceptionCaught(ChannelHandlerContext chc, Throwable throwable) throws Exception {
    41         chc.close();
    42     }
    43 }

    运行结果

    客户端运行结果

    1 10:38:35.073 [nioEventLoopGroup-2-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16
    2 10:38:35.073 [nioEventLoopGroup-2-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
    3 Now is:BAD ORDER
    4 BAD ORDER
    5 ;The client count is:1

    服务端运行结果

      1 10:38:35.129 [nioEventLoopGroup-3-1] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@6751e055
      2 the time server receive order:QUERY TIME ORDER
      3 QUERY TIME ORDER
      4 QUERY TIME ORDER
      5 QUERY TIME ORDER
      6 QUERY TIME ORDER
      7 QUERY TIME ORDER
      8 QUERY TIME ORDER
      9 QUERY TIME ORDER
     10 QUERY TIME ORDER
     11 QUERY TIME ORDER
     12 QUERY TIME ORDER
     13 QUERY TIME ORDER
     14 QUERY TIME ORDER
     15 QUERY TIME ORDER
     16 QUERY TIME ORDER
     17 QUERY TIME ORDER
     18 QUERY TIME ORDER
     19 QUERY TIME ORDER
     20 QUERY TIME ORDER
     21 QUERY TIME ORDER
     22 QUERY TIME ORDER
     23 QUERY TIME ORDER
     24 QUERY TIME ORDER
     25 QUERY TIME ORDER
     26 QUERY TIME ORDER
     27 QUERY TIME ORDER
     28 QUERY TIME ORDER
     29 QUERY TIME ORDER
     30 QUERY TIME ORDER
     31 QUERY TIME ORDER
     32 QUERY TIME ORDER
     33 QUERY TIME ORDER
     34 QUERY TIME ORDER
     35 QUERY TIME ORDER
     36 QUERY TIME ORDER
     37 QUERY TIME ORDER
     38 QUERY TIME ORDER
     39 QUERY TIME ORDER
     40 QUERY TIME ORDER
     41 QUERY TIME ORDER
     42 QUERY TIME ORDER
     43 QUERY TIME ORDER
     44 QUERY TIME ORDER
     45 QUERY TIME ORDER
     46 QUERY TIME ORDER
     47 QUERY TIME ORDER
     48 QUERY TIME ORDER
     49 QUERY TIME ORDER
     50 QUERY TIME ORDER
     51 QUERY TIME ORDER
     52 QUERY TIME ORDER
     53 QUERY TIME ORDER
     54 QUERY TIME ORDER
     55 QUERY TIME ORDER
     56 QUERY TIME ORDER
     57 QUERY TIME ORDER
     58 QUERY TIME ORDthe count is:1
     59 the time server receive order:
     60 QUERY TIME ORDER
     61 QUERY TIME ORDER
     62 QUERY TIME ORDER
     63 QUERY TIME ORDER
     64 QUERY TIME ORDER
     65 QUERY TIME ORDER
     66 QUERY TIME ORDER
     67 QUERY TIME ORDER
     68 QUERY TIME ORDER
     69 QUERY TIME ORDER
     70 QUERY TIME ORDER
     71 QUERY TIME ORDER
     72 QUERY TIME ORDER
     73 QUERY TIME ORDER
     74 QUERY TIME ORDER
     75 QUERY TIME ORDER
     76 QUERY TIME ORDER
     77 QUERY TIME ORDER
     78 QUERY TIME ORDER
     79 QUERY TIME ORDER
     80 QUERY TIME ORDER
     81 QUERY TIME ORDER
     82 QUERY TIME ORDER
     83 QUERY TIME ORDER
     84 QUERY TIME ORDER
     85 QUERY TIME ORDER
     86 QUERY TIME ORDER
     87 QUERY TIME ORDER
     88 QUERY TIME ORDER
     89 QUERY TIME ORDER
     90 QUERY TIME ORDER
     91 QUERY TIME ORDER
     92 QUERY TIME ORDER
     93 QUERY TIME ORDER
     94 QUERY TIME ORDER
     95 QUERY TIME ORDER
     96 QUERY TIME ORDER
     97 QUERY TIME ORDER
     98 QUERY TIME ORDER
     99 QUERY TIME ORDER
    100 QUERY TIME ORDER
    101 QUERY TIME ORDER
    102 QUERY TIME ORDERthe count is:2

    从服务端运行结果来看我们期待服务端接收到100条信息,但是运行结果显示只接收到了2条信息,发生了粘包现象,同样客户端也应该接收到100条相应信息,但因为服务端只接收到2条信息,故而只返回两条相应均为BAD ORDER,但是从服务端运行结果看,却只收到1条应答,说明在服务端返回客户端的两条信息也发生了粘包现象。

  • 相关阅读:
    HDU-1754 I Hate It (树状数组模板题——单点更新,区间查询最大值)
    HDU-1166 敌兵布阵 (树状数组模板题——单点更新,区间求和)
    JavaScript dotAll模式
    ECMAScript6补全字符串长度方法padStart()和padEnd()
    ECMAScript6重复字符串方法repeat()
    JavaScript确定一个字符串是否包含在另一个字符串中的四种方法
    JavaScript中label与break配合使用
    JavaScript数据结构与算法-集合练习
    JavaScript数据结构与算法-散列练习
    JavaScript数据结构与算法-字典练习
  • 原文地址:https://www.cnblogs.com/xiaoyao-001/p/9345448.html
Copyright © 2011-2022 走看看