zoukankan      html  css  js  c++  java
  • Netty学习之粘包、半包

    一、什么是粘包、半包

      1、粘包半包场景重现

      来做这么一个测试,在Netty客户端发送数据到Netty服务端,代码如下:

        public void channelActive(ChannelHandlerContext ctx) throws Exception {
    
            for (int i = 0; i < 500; i++) {
                ByteBuf outBuffer = Unpooled.copiedBuffer("hello netty server"+i, CharsetUtil.UTF_8);
                ctx.writeAndFlush(outBuffer);
            }
        }

      在服务端进行信息接收:

        @Override
        public void channelRead(ChannelHandlerContext ctx,Object msg) {
    
            ByteBuf byteBuf=(ByteBuf)msg;
            byte[] bytes=new byte[byteBuf.readableBytes()];
            byteBuf.readBytes(bytes);
            System.out.println("Server Accept:"+new String(bytes,CharsetUtil.UTF_8));
            ctx.channel().write(msg);
        }

      得到打印结果如下:

            

       如①处就是一个粘包的例子,因为多次请求都在服务端一次读取中得到,②处就是一个半包的例子,因为一次发送应该是hello client,可是这次数据读取只拿到了一次数据发送的后半段,上面这个例子就同时展示了粘包半包。

      如下图假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。

    1. 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;
    2. 服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包;
    3. 服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包;
    4. 服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余内容D1_2和D2包的整包。

      如果此时服务端TCP接收滑窗非常小,而数据包D1和D2比较大,很有可能会发生第五种可能,即服务端分多次才能将D1和D2包接收完全,期间发生多次拆包。

      如下图所示大致有这么四个情况:

                

      2、粘包半包形成原因

      由于TCP协议本身的机制(面向连接的可靠地协议-三次握手机制)客户端与服务器会维持一个连接(Channel),数据在连接不断开的情况下,可以持续不断地将多个数据包发往服务器,但是如果发送的网络数据包太小,那么他本身会启用Nagle算法(可配置是否启用)对较小的数据包进行合并(基于此,TCP的网络延迟要UDP的高些)然后再发送(超时或者包大小足够)。那么这样的话,服务器在接收到消息(数据流)的时候就无法区分哪些数据包是客户端自己分开发送的,这样产生了粘包;服务器在接收到数据库后,放到缓冲区中,如果消息没有被及时从缓存区取走,下次在取数据的时候可能就会出现一次取出多个数据包的情况,造成粘包现象。

      而对于UDP,本身作为无连接的不可靠的传输协议(适合频繁发送较小的数据包),他不会对数据包进行合并发送(也就没有Nagle算法之说了),他直接是一端发送什么数据,直接就发出去了,既然他不会对数据合并,每一个数据包都是完整的(数据+UDP头+IP头等等发一次数据封装一次)也就没有粘包一说了。

      分包产生的原因就简单的多:可能是IP分片传输导致的,也可能是传输过程中丢失部分包导致出现的半包,还有可能就是一个包可能被分成了两次传输,在取数据的时候,先取到了一部分(还可能与接收的缓冲区大小有关系),总之就是一个数据包被分成了多次接收。

      拆包更具体的原因有三个,分别如下。

    • 应用程序写入数据的字节大小大于套接字发送缓冲区的大小。
    • 进行MSS大小的TCP分段。MSS是最大报文段长度的缩写。MSS是TCP报文段中的数据字段的最大长度。数据字段加上TCP首部才等于整个的TCP报文段。所以MSS并不是TCP报文段的最大长度,而是:MSS=TCP报文段长度-TCP首部长度
    • 以太网的payload大于MTU进行IP分片。MTU指:一种通信协议的某一层上面所能通过的最大数据包大小。如果IP层有一个数据包要传,而且数据的长度比链路层的MTU大,那么IP层就会进行分片,把数据包分成托干片,让每一片都不超过MTU。注意,IP分片可以发生在原始发送端主机上,也可以发生在中间路由器上。

      总结可知发生TCP粘包或拆包有很多原因,但是常见原因无非就是:

      1、要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包。

      2、待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包。

      3、要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包。

      4、接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包。

    二、粘包、拆包解决办法

      由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下。

      ①分隔符:在包尾增加分割符,比如回车换行符进行分割,例如FTP协议;linebase包和delimiter包下,分别使用 LineBasedFrameDecoder和DelimiterBasedFrameDecode。

          加分割符的缺点是需要保证消息体内容不能有对应分隔符,否则会导致解析错误。

      ②定长:消息定长,例如每个报文的大小为固定长度200字节,如果不够,空位补空格;fixed包下,使用FixedLengthFrameDecoder。

          消息定长的缺点是,即使消息体很少也需要有固定的长度去接收,存在浪费的问题

      ③消息头:将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段,通常设计思路为消息头的第一个字段使用int32来表示消息的总长度,使用LengthFieldBasedFrameDecoder。

      因此有了以上思路,就可以进行实现尝试。 

      1、固定数据包长度FixedLengthFrameDecoder

      客户端代码:

    public class FixNettyClient {
            private ChannelFuture f;
    
        public void connect(int port, String host) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
            try {
                Bootstrap b = new Bootstrap();//客户端启动程序
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(18));
                                socketChannel.pipeline().addLast(new FixNettyClientHandler());
                            }
                        });
    
                f = b.connect(host, port).sync();/*连接到远程节点,阻塞等待直到连接完成*/
                f.channel().closeFuture().sync();/*阻塞,直到channel关闭*/
            } finally {
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            FixNettyClient client = new FixNettyClient();
            client.connect(port, "localhost");
            ChannelFuture cf = client.getF();
        }
    
        public ChannelFuture getF() {
            return f;
        }
    
        public void setF(ChannelFuture f) {
            this.f = f;
        }
    }
    public class FixNettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    
        private static final Logger logger = Logger.getLogger(FixNettyClientHandler.class.getName());
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ByteBuf msg = null;
            String request = "hello netty server123";
            for (int i = 0; i < 100; i++) {
                msg = Unpooled.buffer(request.length());
                msg.writeBytes(request.getBytes());
                ctx.writeAndFlush(msg);
            }
        }
    @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("accept msg:" + msg.toString(CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

      服务端代码:

    public class FixNettyServer {
        public void bind(int port)throws Exception{
            FixNettyServerHandler serverHandler= new FixNettyServerHandler();
    
            EventLoopGroup bossGroup=new NioEventLoopGroup();//selector[]
            EventLoopGroup workGroup=new NioEventLoopGroup();
            try {
                ServerBootstrap b=new ServerBootstrap();//服务端引导程序
                b.group(bossGroup ,workGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>(){
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(21));   
                                socketChannel.pipeline().addLast(new FixNettyServerHandler());
                            }
                        });
                ChannelFuture f=b.bind(port).sync();
                System.out.println("server start");
                f.channel().closeFuture().sync();
            }catch (Exception e){
    
            }finally {
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
    
        }
    
        public static void main(String[] args) throws Exception{
            int port=8080;
            new FixNettyServer().bind(port);
        }
    }
    
    public class FixNettyServerHandler extends ChannelInboundHandlerAdapter {
    
        private AtomicInteger readCount =new AtomicInteger(0);
        private AtomicInteger completeCout =new AtomicInteger(0);
    
        public FixNettyServerHandler(){
    
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx,Object msg)
        throws  Exception{
            ByteBuf byteBuf= (ByteBuf)msg;
            byteBuf.touch();
            ReferenceCountUtil.touch(msg);
            byte[] bytes=new byte[byteBuf.readableBytes()];
            byteBuf.readBytes(bytes);
            System.out.println("Server Accept:["+new String(bytes,CharsetUtil.UTF_8)+"]"+
                    readCount.incrementAndGet());
    
            ByteBuf response= Unpooled.copiedBuffer(("hello netty client").getBytes());
            ctx.writeAndFlush(response);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx){
            System.out.println("inHandler channelReadComplete:"+ completeCout.incrementAndGet());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
            cause.printStackTrace();
            ctx.close();
        }
    
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            super.channelRegistered(ctx);
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
        }
    }

      2、分隔符

      由于其余代码和以上内容均一致,因此只展示出初始化的差异性代码。

      client初始化代码(自定义分隔符):

    public void connect(int port, String host) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
            try {
                Bootstrap b = new Bootstrap();//客户端启动程序
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ByteBuf delimiter = Unpooled.copiedBuffer(DelimiterNettyClientHandler.SYMBOL.getBytes());
                                socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
                                socketChannel.pipeline().addLast(new DelimiterNettyClientHandler());
                            }
                        });
    
                f = b.connect(host, port).sync();/*连接到远程节点,阻塞等待直到连接完成*/
                f.channel().closeFuture().sync();/*阻塞,直到channel关闭*/
            } finally {
                group.shutdownGracefully();
            }
        }

      服务端初始化代码(自定义分隔符):

    public void bind(int port)throws Exception{
            DelimiterNettyServerHandler serverHandler= new DelimiterNettyServerHandler();
    
            EventLoopGroup  bossGroup=new NioEventLoopGroup();//selector[]
            EventLoopGroup workGroup=new NioEventLoopGroup();
            try {
                ServerBootstrap b=new ServerBootstrap();//服务端引导程序
                b.group(bossGroup ,workGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>(){
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ByteBuf delimiter = Unpooled.copiedBuffer(DelimiterNettyClientHandler.SYMBOL.getBytes());
                                socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
                                socketChannel.pipeline().addLast(new DelimiterNettyServerHandler());
                            }
                        });
                ChannelFuture f=b.bind(port).sync();
                System.out.println("server start");
                f.channel().closeFuture().sync();
            }catch (Exception e){
    
            }finally {
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
    
        }

      还有一种系统默认分隔符的方式,以客户端为例:

    public void connect(int port, String host) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
            try {
                Bootstrap b = new Bootstrap();//客户端启动程序
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                                socketChannel.pipeline().addLast(new LineNettyClientHandler());
                            }
                        });
    
                f = b.connect(host, port).sync();/*连接到远程节点,阻塞等待直到连接完成*/
                f.channel().closeFuture().sync();/*阻塞,直到channel关闭*/
            } finally {
                group.shutdownGracefully();
            }
        }

      3、消息头设置

      客户端代码如下:

    public void connect(int port, String host) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
            try {
                Bootstrap b = new Bootstrap();//客户端启动程序
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast("frameEncoder,new LengthFieldPrepender(2));
                                socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,0,2,0,2));
                                socketChannel.pipeline().addLast(new FieldNettyClientHandler());
                            }
                        });
    
                f = b.connect(host, port).sync();/*连接到远程节点,阻塞等待直到连接完成*/
                f.channel().closeFuture().sync();/*阻塞,直到channel关闭*/
            } finally {
                group.shutdownGracefully();
            }
        }

      参数如下:

            

      服务端如下:

    public void bind(int port)throws Exception{
            FieldNettyServerHandler serverHandler= new FieldNettyServerHandler();
    
            EventLoopGroup  bossGroup=new NioEventLoopGroup();//selector[]
            EventLoopGroup workGroup=new NioEventLoopGroup();
            try {
                ServerBootstrap b=new ServerBootstrap();//服务端引导程序
                b.group(bossGroup ,workGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>(){
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2));
                                socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,0,2,0,2));
                                socketChannel.pipeline().addLast(new FieldNettyServerHandler());
                            }
                        });
                ChannelFuture f=b.bind(port).sync();
                System.out.println("server start");
                f.channel().closeFuture().sync();
            }catch (Exception e){
    
            }finally {
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
    
        }

      同时可参考博客:https://blog.csdn.net/fgx_123456/article/details/80031821

  • 相关阅读:
    iOS 手势操作:拖动、捏合、旋转、点按、长按、轻扫、自定义
    一个基于MVVM的TableView组件化实现方案
    代码审查和不良编程习惯
    十二步创建你的第一个JavaScript库
    可简单避免的三个 JavaScript 发布错误
    巧用Javascript将相对路径地址转换为绝对路径
    jquery 事件对象属性小结
    26个Jquery使用小技巧
    应用于网站导航中的 12 个 jQuery 插件
    使用 jQuery 避免鼠标双击
  • 原文地址:https://www.cnblogs.com/jing99/p/12554739.html
Copyright © 2011-2022 走看看