zoukankan      html  css  js  c++  java
  • TCP粘包拆包基本解决方案

    上个小节我们浅析了在Netty的使用的时候TCP的粘包和拆包的现象,Netty对此问题提供了相对比较丰富的解决方案

    Netty提供了几个常用的解码器,帮助我们解决这些问题,其实上述的粘包和拆包的问题,归根结底的解决方案就是发送端给远程端一个标记,告诉远程端,每个信息的结束标志是什么,这样,远程端获取到数据后,根据跟发送端约束的标志,将接收的信息分切或者合并成我们需要的信息,这样我们就可以获取到正确的信息了

    例如,我们刚才的例子中,我们可以在发送的信息中,加一个结束标志,例如两个远程端规定以行来切分数据,那么发送端,就需要在每个信息体的末尾加上行结束的标志,部分代码如下:

    修改BaseClientHandler的req的构造:

    [java] view plain copy
     
    1. public BaseClientHandler() {  
    2. //        req = ("BazingaLyncc is learner").getBytes();  
    3.         req = ("In this chapter you general, we recommend Java Concurrency in Practice by Brian Goetz. His book w"  
    4.                 + "ill give We’ve reached an exciting point—in the next chapter we’ll discuss bootstrapping, the process "  
    5.                 + "of configuring and connecting all of Netty’s components to bring your learned about threading models in ge"  
    6.                 + "neral and Netty’s threading model in particular, whose performance and consistency advantages we discuss"  
    7.                 + "ed in detail In this chapter you general, we recommend Java Concurrency in Practice by Brian Goetz. Hi"  
    8.                 + "s book will give We’ve reached an exciting point—in the next chapter we’ll discuss bootstrapping, the"  
    9.                 + " process of configuring and connecting all of Netty’s components to bring your learned about threading "  
    10.                 + "models in general and Netty’s threading model in particular, whose performance and consistency advantag"  
    11.                 + "es we discussed in detailIn this chapter you general, we recommend Java Concurrency in Practice by Bri"  
    12.                 + "an Goetz. His book will give We’ve reached an exciting point—in the next chapter;the counter is: 1 2222"  
    13.                 + "sdsa ddasd asdsadas dsadasdas" + System.getProperty("line.separator")).getBytes();  
    14.     }  

    我们在我们巨长的req中末尾加了System.getProperty("line.separator"),这样相当于给req打了一个标记

    打完标记,其实我们这个示例中的server中还不知道是以行为结尾的,所以我们需要修改server的handler链,在inbound链中加一个额外的处理链,判断一下,获取的信息按照行来切分,我们很庆幸,这样枯燥的代码Netty已经帮我们完美地完成了,Netty提供了一个LineBasedFrameDecoder这个类,顾名思义,这个类名字中有decoder,说明是一个解码器,我们再看看它的详细声明:

    [java] view plain copy
     
    1. /** 
    2.  * A decoder that splits the received {@link ByteBuf}s on line endings. 
    3.  * <p> 
    4.  * Both {@code " "} and {@code " "} are handled. 
    5.  * For a more general delimiter-based decoder, see {@link DelimiterBasedFrameDecoder}. 
    6.  */  
    7. public class LineBasedFrameDecoder extends ByteToMessageDecoder {  
    8.   
    9.     /** Maximum length of a frame we're willing to decode.  */  
    10.     private final int maxLength;  
    11.     /** Whether or not to throw an exception as soon as we exceed maxLength. */  
    12.     private final boolean failFast;  
    13.     private final boolean stripDelimiter;  
    14.   
    15.     /** True if we're discarding input because we're already over maxLength.  */  
    16.     private boolean discarding;  
    17.     private int discardedBytes;  

    它是继承ByteToMessageDecoder的,是将byte类型转化成Message的,所以我们应该将这个解码器放在inbound处理器链的第一个,所以我们修改一下Server端的启动代码:

    [java] view plain copy
     
    1. package com.lyncc.netty.stickpackage.myself;  
    2.   
    3. import io.netty.bootstrap.ServerBootstrap;  
    4. import io.netty.channel.ChannelFuture;  
    5. import io.netty.channel.ChannelInitializer;  
    6. import io.netty.channel.ChannelOption;  
    7. import io.netty.channel.EventLoopGroup;  
    8. import io.netty.channel.nio.NioEventLoopGroup;  
    9. import io.netty.channel.socket.SocketChannel;  
    10. import io.netty.channel.socket.nio.NioServerSocketChannel;  
    11. import io.netty.handler.codec.LineBasedFrameDecoder;  
    12. import io.netty.handler.codec.string.StringDecoder;  
    13.   
    14. import java.net.InetSocketAddress;  
    15.   
    16. public class BaseServer {  
    17.   
    18.     private int port;  
    19.       
    20.     public BaseServer(int port) {  
    21.         this.port = port;  
    22.     }  
    23.       
    24.     public void start(){  
    25.         EventLoopGroup bossGroup = new NioEventLoopGroup(1);  
    26.         EventLoopGroup workerGroup = new NioEventLoopGroup();  
    27.         try {  
    28.             ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))  
    29.                     .childHandler(new ChannelInitializer<SocketChannel>() {  
    30.                           
    31.                         protected void initChannel(SocketChannel ch) throws Exception {  
    32.                             ch.pipeline().addLast(new LineBasedFrameDecoder(2048));  
    33.                             ch.pipeline().addLast(new StringDecoder());  
    34.                             ch.pipeline().addLast(new BaseServerHandler());  
    35.                         };  
    36.                           
    37.                     }).option(ChannelOption.SO_BACKLOG, 128)     
    38.                     .childOption(ChannelOption.SO_KEEPALIVE, true);  
    39.              // 绑定端口,开始接收进来的连接  
    40.              ChannelFuture future = sbs.bind(port).sync();    
    41.                
    42.              System.out.println("Server start listen at " + port );  
    43.              future.channel().closeFuture().sync();  
    44.         } catch (Exception e) {  
    45.             bossGroup.shutdownGracefully();  
    46.             workerGroup.shutdownGracefully();  
    47.         }  
    48.     }  
    49.       
    50.     public static void main(String[] args) throws Exception {  
    51.         int port;  
    52.         if (args.length > 0) {  
    53.             port = Integer.parseInt(args[0]);  
    54.         } else {  
    55.             port = 8080;  
    56.         }  
    57.         new BaseServer(port).start();  
    58.     }  
    59. }  

    这样,我们只是在initChannel方法中增加了一个LineBasedFrameDecoder这个类,其中2048是规定一行数据最大的字节数

    我们再次运行,我们再看看效果:

    可以看到客户端发送的两次msg,被服务器端成功地两次接收了,我们要的效果达到了

    我们将LineBasedFrameDecoder中的2048参数,缩小一半,变成1024,我们再看看效果:

    出现了异常,这个异常时TooLongFrameException,这个异常在Netty in Action中介绍过,帧的大小太大,在我们这个场景中,就是我们发送的一行信息大小是1076,大于了我们规定的1024所以报错了

    我们再解决另一个粘包的问题,我们可以看到上节中介绍的那个粘包案例中,我们发送了100次的信息“BazingaLyncc is learner”,这个案例很特殊,这个信息是一个特长的数据,字节长度是23,所以我们可以使用Netty为我们提供的FixedLengthFrameDecoder这个解码器,看到这个名字就明白了大半,定长数据帧的解码器,所以我们修改一下代码:

    BaseClientHandler:

    [java] view plain copy
     
    1. package com.lyncc.netty.stickpackage.myself;  
    2.   
    3. import io.netty.buffer.ByteBuf;  
    4. import io.netty.buffer.Unpooled;  
    5. import io.netty.channel.ChannelHandlerContext;  
    6. import io.netty.channel.ChannelInboundHandlerAdapter;  
    7.   
    8. public class BaseClientHandler extends ChannelInboundHandlerAdapter{  
    9.       
    10.     private byte[] req;  
    11.       
    12.     public BaseClientHandler() {  
    13.         req = ("BazingaLyncc is learner").getBytes();  
    14. //        req = ("In this chapter you general, we recommend Java Concurrency in Practice by Brian Goetz. His book w"  
    15. //                + "ill give We’ve reached an exciting point—in the next chapter we’ll discuss bootstrapping, the process "  
    16. //                + "of configuring and connecting all of Netty’s components to bring your learned about threading models in ge"  
    17. //                + "neral and Netty’s threading model in particular, whose performance and consistency advantages we discuss"  
    18. //                + "ed in detail In this chapter you general, we recommend Java Concurrency in Practice by Brian Goetz. Hi"  
    19. //                + "s book will give We’ve reached an exciting point—in the next chapter we’ll discuss bootstrapping, the"  
    20. //                + " process of configuring and connecting all of Netty’s components to bring your learned about threading "  
    21. //                + "models in general and Netty’s threading model in particular, whose performance and consistency advantag"  
    22. //                + "es we discussed in detailIn this chapter you general, we recommend Java Concurrency in Practice by Bri"  
    23. //                + "an Goetz. His book will give We’ve reached an exciting point—in the next chapter;the counter is: 1 2222"  
    24. //                + "sdsa ddasd asdsadas dsadasdas" + System.getProperty("line.separator")).getBytes();  
    25.     }  
    26.       
    27.       
    28.     @Override  
    29.     public void channelActive(ChannelHandlerContext ctx) throws Exception {  
    30.         ByteBuf message = null;  
    31.         for (int i = 0; i < 100; i++) {  
    32.             message = Unpooled.buffer(req.length);  
    33.             message.writeBytes(req);  
    34.             ctx.writeAndFlush(message);  
    35.         }  
    36. //        message = Unpooled.buffer(req.length);  
    37. //        message.writeBytes(req);  
    38. //        ctx.writeAndFlush(message);  
    39. //        message = Unpooled.buffer(req.length);  
    40. //        message.writeBytes(req);  
    41. //        ctx.writeAndFlush(message);  
    42.     }  
    43.   
    44.     @Override  
    45.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  
    46.         ctx.close();  
    47.     }  
    48.   
    49. }  

    BaseServer:

    [java] view plain copy
     
    1. package com.lyncc.netty.stickpackage.myself;  
    2.   
    3. import io.netty.bootstrap.ServerBootstrap;  
    4. import io.netty.channel.ChannelFuture;  
    5. import io.netty.channel.ChannelInitializer;  
    6. import io.netty.channel.ChannelOption;  
    7. import io.netty.channel.EventLoopGroup;  
    8. import io.netty.channel.nio.NioEventLoopGroup;  
    9. import io.netty.channel.socket.SocketChannel;  
    10. import io.netty.channel.socket.nio.NioServerSocketChannel;  
    11. import io.netty.handler.codec.FixedLengthFrameDecoder;  
    12. import io.netty.handler.codec.string.StringDecoder;  
    13.   
    14. import java.net.InetSocketAddress;  
    15.   
    16. public class BaseServer {  
    17.   
    18.     private int port;  
    19.       
    20.     public BaseServer(int port) {  
    21.         this.port = port;  
    22.     }  
    23.       
    24.     public void start(){  
    25.         EventLoopGroup bossGroup = new NioEventLoopGroup(1);  
    26.         EventLoopGroup workerGroup = new NioEventLoopGroup();  
    27.         try {  
    28.             ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))  
    29.                     .childHandler(new ChannelInitializer<SocketChannel>() {  
    30.                           
    31.                         protected void initChannel(SocketChannel ch) throws Exception {  
    32.                             ch.pipeline().addLast(new FixedLengthFrameDecoder(23));  
    33.                             ch.pipeline().addLast(new StringDecoder());  
    34.                             ch.pipeline().addLast(new BaseServerHandler());  
    35.                         };  
    36.                           
    37.                     }).option(ChannelOption.SO_BACKLOG, 128)     
    38.                     .childOption(ChannelOption.SO_KEEPALIVE, true);  
    39.              // 绑定端口,开始接收进来的连接  
    40.              ChannelFuture future = sbs.bind(port).sync();    
    41.                
    42.              System.out.println("Server start listen at " + port );  
    43.              future.channel().closeFuture().sync();  
    44.         } catch (Exception e) {  
    45.             bossGroup.shutdownGracefully();  
    46.             workerGroup.shutdownGracefully();  
    47.         }  
    48.     }  
    49.       
    50.     public static void main(String[] args) throws Exception {  
    51.         int port;  
    52.         if (args.length > 0) {  
    53.             port = Integer.parseInt(args[0]);  
    54.         } else {  
    55.             port = 8080;  
    56.         }  
    57.         new BaseServer(port).start();  
    58.     }  
    59. }  

    我们就是在channelhandler链中,加入了FixedLengthFrameDecoder,且参数是23,告诉Netty,获取的帧数据有23个字节就切分一次

    运行结果:

    可以看见,我们获取到了我们想要的效果

    当然Netty还提供了一些其他的解码器,有他们自己的使用场景,例如有按照某个固定字符切分的DelimiterBasedFrameDecoder的解码器

    我们再次修改代码:

    BaseClientHandler.java

    [html] view plain copy
     
    1. package com.lyncc.netty.stickpackage.myself;  
    2.   
    3. import io.netty.buffer.ByteBuf;  
    4. import io.netty.buffer.Unpooled;  
    5. import io.netty.channel.ChannelHandlerContext;  
    6. import io.netty.channel.ChannelInboundHandlerAdapter;  
    7.   
    8. public class BaseClientHandler extends ChannelInboundHandlerAdapter{  
    9.       
    10.     private byte[] req;  
    11.       
    12.     public BaseClientHandler() {  
    13. //        req = ("BazingaLyncc is learner").getBytes();  
    14.         req = ("In this chapter you general, we recommend Java Concurrency in Practice by Brian Goetz. $$__ His book w"  
    15.                 + "ill give We’ve reached an exciting point—in the next chapter we’ll $$__ discuss bootstrapping, the process "  
    16.                 + "of configuring and connecting all of Netty’s components to bring $$__ your learned about threading models in ge"  
    17.                 + "neral and Netty’s threading model in particular, whose performance $$__ and consistency advantages we discuss"  
    18.                 + "ed in detail In this chapter you general, we recommend Java  $$__Concurrency in Practice by Brian Goetz. Hi"  
    19.                 + "s book will give We’ve reached an exciting point—in the next $$__ chapter we’ll discuss bootstrapping, the"  
    20.                 + " process of configuring and connecting all of Netty’s components $$__ to bring your learned about threading "  
    21.                 + "models in general and Netty’s threading model in particular, $$__ whose performance and consistency advantag"  
    22.                 + "es we discussed in detailIn this chapter you general, $$__ we recommend Java Concurrency in Practice by Bri"  
    23.                 + "an Goetz. His book will give We’ve reached an exciting $$__ point—in the next chapter;the counter is: 1 2222"  
    24.                 + "sdsa ddasd asdsadas dsadasdas" + System.getProperty("line.separator")).getBytes();  
    25.     }  
    26.       
    27.       
    28.     @Override  
    29.     public void channelActive(ChannelHandlerContext ctx) throws Exception {  
    30.         ByteBuf message = null;  
    31. //        for (int i = 0; i 100; i++) {  
    32. //            message = Unpooled.buffer(req.length);  
    33. //            message.writeBytes(req);  
    34. //            ctx.writeAndFlush(message);  
    35. //        }  
    36.         message = Unpooled.buffer(req.length);  
    37.         message.writeBytes(req);  
    38.         ctx.writeAndFlush(message);  
    39.         message = Unpooled.buffer(req.length);  
    40.         message.writeBytes(req);  
    41.         ctx.writeAndFlush(message);  
    42.     }  
    43.   
    44.     @Override  
    45.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  
    46.         ctx.close();  
    47.     }  
    48.   
    49. }  

    我们在req的字符串中增加了“$$__”这样的切割符,然后再Server中照例增加一个DelimiterBasedFrameDecoder,来切割字符串:

    [html] view plain copy
     
    1. ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))  
    2.                     .childHandler(new ChannelInitializer<SocketChannel>() {  
    3.                           
    4.                         protected void initChannel(SocketChannel ch) throws Exception {  
    5.                             ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer("$$__".getBytes())));  
    6.                             ch.pipeline().addLast(new StringDecoder());  
    7.                             ch.pipeline().addLast(new BaseServerHandler());  
    8.                         };  
    9.                           
    10.                     }).option(ChannelOption.SO_BACKLOG, 128)     
    11.                     .childOption(ChannelOption.SO_KEEPALIVE, true);  

    我们在initChannel中第一个inbound中增加了DelimiterBasedFrameDecoder,且规定切割符就是“$$__”,这样就能正常切割了,我们看看运行效果:

    可以看到被分了20次读取,我们可以这样理解,客户端发送了2次req字节,每个req中有10个“$$__”,这样就是第11次切割的时候其实发送了粘包,第一个req中末尾部分和第二次的头部粘在了一起,作为第11部分的内容

    而最后一部分的内容因为没有"$$__"切割,所以没有打印在控制台上~

    其实这类的Handler还是相对比较简单的,真实的生产环境这些decoder只是作为比较基本的切分类,但是这些decoder还是很好用的~

    希望讲的对您有所帮助~END~

  • 相关阅读:
    Qt拖放功能
    CUDA笔记 -- 1
    c/c++笔记--5
    SpringBoot注解大全
    Java集合图谱
    sql优化的几种方法
    Spring 体系结构详解
    2019年 Java 面试题解析
    IntelliJ IDEA 快捷键大全
    java集合继承关系
  • 原文地址:https://www.cnblogs.com/duan2/p/8858138.html
Copyright © 2011-2022 走看看