zoukankan      html  css  js  c++  java
  • 高性能NIO通信框架之Netty入门(二)-----TCP粘包/拆包

    一、TCP粘包/拆包解析

           TCP是个“流”协议,所谓流,就是没有界限的一串数据。大家可以想想河里的流水,是连成一片的,其间并没有分界线。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

     

    假设客户端分别发送了两个数据包D1D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。

    1)服务端分两次读取到了两个独立的数据包,分别是D1D2,没有粘包和拆包;

    2)服务端一次接收到了两个数据包,D1D2粘合在一起,被称为TCP粘包;

    3)服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包;

    4)服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余内容D1_2D2包的整包。

    如果此时服务端TCP接收滑窗非常小,而数据包D1D2比较大,很有可能会发生第五种可能,即服务端分多次才能将D1D2包接收完全,期间发生多次拆包。

    二、TCP粘包/拆包发生的原因

     问题产生的原因有三个,分别如下。

    1)应用程序write写入的字节大小大于套接口发送缓冲区大小;

    2)进行MSS大小的TCP分段;

    3)以太网帧的payload大于MTU进行IP分片。

     

    三、粘包问题的解决策略

    由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下。

    1)消息定长,例如每个报文的大小为固定长度200字节,如果不够,空位补空格;

    2)在包尾增加回车换行符进行分割,例如FTP协议;

    3)将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段,通常设计思路为消息头的第一个字段使用int32来表示消息的总长度;

    4)更复杂的应用层协议。

    四、Socket缓冲区与滑动窗口

          每个TCP socket在内核中都有一个发送缓冲区(SO_SNDBUF )和一个接收缓冲区(SO_RCVBUF),TCP的全双工的工作模式以及TCP的滑动窗口便是依赖于这两个独立的buffer的填充状态。

    SO_SNDBUF:

    进程发送的数据的时候假设调用了一个send方法,最简单情况(也是一般情况),将数据拷贝进入socket的内核发送缓冲区之中,然后send便会在上层返回。换句话说,send返回之时,数据不一定会发送到对端去(和write写文件有点类似),send仅仅是把应用层buffer的数据拷贝进socket的内核发送buffer中。

    SO_RCVBUF:

    把接受到的数据缓存入内核,应用进程一直没有调用read进行读取的话,此数据会一直缓存在相应socket的接收缓冲区内。再啰嗦一点,不管进程是否读取socket,对端发来的数据都会经由内核接收并且缓存到socket的内核接收缓冲区之中。read所做的工作,就是把内核缓冲区中的数据拷贝到应用层用户的buffer里面,仅此而已。

    滑动窗口:

    TCP连接在三次握手的时候,会将自己的窗口大小(window size)发送给对方,其实就是SO_RCVBUF指定的值。之后在发送数据的时,发送方必须要先确认接收方的窗口没有被填充满,如果没有填满,则可以发送。

    每次发送数据后,发送方将自己维护的对方的window size减小,表示对方的SO_RCVBUF可用空间变小。

    当接收方处理开始处理SO_RCVBUF 中的数据时,会将数据从socket 在内核中的接受缓冲区读出,此时接收方的SO_RCVBUF可用空间变大,即window size变大,接受方会以ack消息的方式将自己最新的window size返回给发送方,此时发送方将自己的维护的接受的方的window size设置为ack消息返回的window size。

    此外,发送方可以连续的给接受方发送消息,只要保证对方的SO_RCVBUF空间可以缓存数据即可,即window size>0。当接收方的SO_RCVBUF被填充满时,此时window size=0,发送方不能再继续发送数据,要等待接收方ack消息,以获得最新可用的window size。

    MSS/MTU分片

    MTU (Maxitum Transmission Unit,最大传输单元)是链路层对一次可以发送的最大数据的限制。MSS(Maxitum Segment Size,最大分段大小)是TCP报文中data部分的最大长度,是传输层对一次可以发送的最大数据的限制

     

    数据在传输过程中,每经过一层,都会加上一些额外的信息:

    • 应用层:只关心发送的数据DATA,将数据写入socket在内核中的缓冲区SO_SNDBUF即返回,操作系统会将SO_SNDBUF中的数据取出来进行发送。
    • 传输层:会在DATA前面加上TCP Header(20字节)
    • 网络层:会在TCP报文的基础上再添加一个IP Header,也就是将自己的网络地址加入到报文中。IPv4中IP Header长度是20字节,IPV6中IP Header长度是40字节。
    • 链路层:加上Datalink Header和CRC。会将SMAC(Source Machine,数据发送方的MAC地址),DMAC(Destination Machine,数据接受方的MAC地址 )和Type域加入。SMAC+DMAC+Type+CRC总长度为18字节。
    • 物理层:进行传输

    MTU是以太网传输数据方面的限制,每个以太网帧最大不能超过1518bytes。刨去以太网帧的帧头(DMAC+SMAC+Type域)14Bytes和帧尾(CRC校验)4Bytes,那么剩下承载上层协议的地方也就是Data域最大就只能有1500Bytes这个值 我们就把它称之为MTU

    MSS是在MTU的基础上减去网络层的IP Header和传输层的TCP Header的部分,这就是TCP协议一次可以发送的实际应用数据的最大大小。

    MSS = MTU(1500) -IP Header(20 or 40)-TCP Header(20)

     

    五、利用LineBasedFrameDecoder解决TCP粘包问题

     

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.LineBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    /**
     * Netty 时间服务器服务端TimeServer
     */
    public class TimeServer {
        public void bind(int port) {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler());
                //绑定端口,同步等待成功
                ChannelFuture f = b.bind(port).sync();
                //等待服务端监听端口关闭
                f.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //优雅退出,释放线程池资源
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
    
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                socketChannel.pipeline().addLast(new StringDecoder());
                socketChannel.pipeline().addLast(new TimeServerHandler());
            }
        }
    
        public static void main(String[] args) {
            int port = 8080;
            new TimeServer().bind(port);
        }
    
    }

     

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    import java.util.Date;
    
    
    public class TimeServerHandler extends ChannelHandlerAdapter{
        private int counter = 0;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String body = (String) msg;
            System.out.println("The time server receive order : "+body+" ; the counter is : "+ (++counter));
            String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new Date(System.currentTimeMillis()).toString():"BAD ORDER";
            currentTime = currentTime + System.getProperty("line.separator");
            ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
            ctx.writeAndFlush(resp);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LineBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class TimeClient {
        public void connect(int port,String host) {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true).handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                        socketChannel.pipeline().addLast(new StringDecoder());
                        socketChannel.pipeline().addLast(new TimeClientHandler());
                    }
                });
                //发起异步连接操作
                ChannelFuture f = b.connect(host,port).sync();
                //等待客户端连接操作
                f.channel().closeFuture().sync();
            }catch (Exception e) {
                e.printStackTrace();
            }finally {
                //优雅退出,释放NIO线程组
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) {
            int port = 8080;
            new TimeClient().connect(port,"127.0.0.1");
        }
    }

     

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    /**
     * Created by ThinkPad on 2019/6/17.
     */
    public class TimeClientHandler extends ChannelHandlerAdapter {
        private int counter;
        private byte[] req;
    
        public TimeClientHandler() {
             req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
            counter = 0;
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ByteBuf message = null;
            for (int i = 0; i < 100; i++) {
                message = Unpooled.buffer(req.length);
                message.writeBytes(req);
                ctx.writeAndFlush(message);
            }
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String body = (String) msg;
            System.out.println("Now is : " + body + "; the counter is : " + (++counter));
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }

    六、LineBasedFrameDecoder原理分析

    LineBasedFrameDecoder的工作原理是它依次遍历ByteBuf中的可读字节,判断看是否有“ ”或者“ ”,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。

    StringDecoder的功能非常简单,就是将接收到的对象转换成字符串,然后继续调用后面的handler。LineBasedFrameDecoder + StringDecoder组合就是按行切换的文本解码器,它被设计用来支持TCP的粘包和拆包。

    如果发送的消息不是以换行符结束的该怎么办呢?或者没有回车换行符,靠消息头中的长度字段来分包怎么办?是不是需要自己写半包解码器?答案是否定的,Netty提供了多种支持TCP粘包/拆包的解码器,用来满足用户的不同诉求。

     

  • 相关阅读:
    WeTypecho程序配置
    XX人事系统.nsi
    query-validate 插件
    数据库操作技巧 之 oracle连表update、跨库查询、恢复被删除数据、解决锁表
    Oracle中添加银行家四舍五入
    Java生成MD5的方法,简单封装并转为32位小写
    springMVC中使用oracle批量插入的书写方法
    前端ajax能访问到后台的controller中但是前端报错404
    远程连接Oracle 数据库连接报错ORA-12638身份检索失败
    SQL state [72000]; error code [1013]; ORA-03111: 通信通道收到中断; java.sql.SQLException: ORA-01745: 无效的主机/绑定变量名;java.sql.SQLException: ORA-01013: 用户请求取消当前的操作
  • 原文地址:https://www.cnblogs.com/lovegrace/p/11048435.html
Copyright © 2011-2022 走看看