zoukankan      html  css  js  c++  java
  • Netty拆包和粘包

    Netty拆包和粘包

    概述

    在基于流的传输里如TCP/IP,接收到的数据会被先存储到以恶搞socket接收缓冲里。不幸的是,基于流的传输并不是一个数据包队列,而是一个字节队列。即使发送了2个独立的数据包,操作系统也不会作为2个消息处理而仅仅是作为一连串的字节而言。因此这就不能保证远程写入的数据会被准确地读取。

    假设操作系统的TCP/TP协议栈已经接收了3个数据包:
    在这里插入图片描述
    由于基于流传输的协议的性质,在应用程序里读取数据的时候有很大的可能性被分成下面的片段
    在这里插入图片描述

    场景模拟

    Client端

    Client模拟

    public class MyClient {
        public static void main(String[] args) throws Exception {
            EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new MyClientHandler());
                    }
                });
                ChannelFuture channelFuture = bootstrap.connect("localhost", 9999).sync();
                channelFuture.channel().closeFuture().sync();
            } finally {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }
    

    MyClientHandler

    public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    
        private int count = 0;
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            //在这里因为是将ByteBuf缓冲区中所有的数据读取出来了,又因为TCP发送缓冲区可能等有
           //好几个数据包的时候才发送,所以这里会出现拆包粘包的现象
            byte[] buffer = new byte[msg.readableBytes()];
            msg.readBytes(buffer);
            String message = new String(buffer,Charset.forName("utf-8"));
            System.out.println("客户端接收到的消息内容: " + message);
            System.out.println("客户端接收到的消息数量:" + (++count));
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println(cause.getMessage());
            ctx.close();
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            //客户端一旦启动,发送10条数据
            for (int i = 0; i < 10; i++) {
                ByteBuf buffer = Unpooled.copiedBuffer("sent ffrom client ", Charset.forName("utf-8"));
    //            try {
    //                TimeUnit.SECONDS.sleep(2);
    //            }catch (Exception e){}
                ctx.writeAndFlush(buffer);
            }
        }
    }
    

    Server端

    public class MyServer {
        public static void main(String[] args) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup,workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .handler(new LoggingHandler(LogLevel.DEBUG))
                        .childHandler(
                        new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast("myhandler",new MyServerHandler());
                            }
                        });
                ChannelFuture channelFuture = serverBootstrap.bind(9999).sync();
    
                System.out.println("=================服务端开始工作================");
                channelFuture.channel().closeFuture().sync();
            }finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
    
        private int count = 0;
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            byte[] buffer = new byte[msg.readableBytes()];
            msg.readBytes(buffer);
    
            String message = new String(buffer, Charset.forName("utf-8"));
            System.out.println("服务端接收到的消息内容: " + message);
            System.out.println("服务端接收到消息的数量: " + (++count));
    
            ByteBuf response = Unpooled.copiedBuffer(UUID.randomUUID().toString() + "==", Charset.forName("utf-8"));
            ctx.writeAndFlush(response);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println(cause.getMessage());
            ctx.close();
        }
    }
    

    结果分析

    //第一次
    //服务端
    服务端接收到的消息内容: sent ffrom client sent ffrom client sent ffrom client sent ffrom client sent ffrom client sent ffrom client sent ffrom client sent ffrom client sent ffrom client sent ffrom client 
    服务端接收到消息的数量: 1
    
    //客户端
    客户端接收到的消息内容: 110785f4-7b6f-4f35-8037-409502b03975==
    客户端接收到的消息数量:1
    
    //重启客户端
    //服务端
    服务端接收到的消息内容: sent ffrom client 
    服务端接收到消息的数量: 1
    服务端接收到的消息内容: sent ffrom client 
    服务端接收到消息的数量: 2
    服务端接收到的消息内容: sent ffrom client sent ffrom client 
    服务端接收到消息的数量: 3
    服务端接收到的消息内容: sent ffrom client sent ffrom client sent ffrom client sent ffrom client 
    服务端接收到消息的数量: 4
    服务端接收到的消息内容: sent ffrom client sent ffrom client 
    服务端接收到消息的数量: 5
    
    //客户端
    客户端接收到的消息内容: 6dfd6a0e-e45d-4ccd-82e3-d98e4fb06d6c==48b2f586-8ebb-439d-bdd1-e0a8afd3fae0==186c775d-a6c6-41d2-9e1b-acdb5278c9cb==ed8dd522-a0e0-40b9-b5a9-c1589226aede==9a7bc9ff-d6ca-4017-b8b7-d4e1c8a83a35==
    客户端接收到的消息数量:1
    

    解决方案

    sleep

    在每次发送的数据时都sleep一小段实现

    for (int i = 0; i < 10; i++) {
        ByteBuf buffer = Unpooled.copiedBuffer("sent ffrom client ", Charset.forName("utf-8"));
        /*try {
            TimeUnit.SECONDS.sleep(1);
        }catch (Exception e){}*/
        ctx.writeAndFlush(buffer);
    }
    try {
       TimeUnit.MILLISECONDS.sleep(200);
    }catch (Exception e){}
    ctx.writeAndFlush(response);
    

    自定义解码器

    PersonProtocol

    public class PersonProtocol {
        private int length;
        private byte[] content;
        public int getLength() {
            return length;
        }
        public void setLength(int length) {
            this.length = length;
        }
        public byte[] getContent() {
            return content;
        }
        public void setContent(byte[] content) {
            this.content = content;
        }
    }
    
    public class MyPersonDecoder extends ReplayingDecoder<Void> {
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            System.out.println("MyPersonDecoder decode invoked");
            //获取消息的长度
            int length = in.readInt();
            byte[] content = new byte[length];
            in.readBytes(content);
    
            PersonProtocol personProtocol = new PersonProtocol();
            personProtocol.setLength(length);
            personProtocol.setContent(content);
            out.add(personProtocol);
        }
    }
    
    public class MyPersonEncoder extends MessageToByteEncoder<PersonProtocol> {
        @Override
        protected void encode(ChannelHandlerContext ctx, PersonProtocol msg, ByteBuf out) throws Exception {
            System.out.println("MyPersonEncoder encode invoked");
            out.writeInt(msg.getLength());
            out.writeBytes(msg.getContent());
        }
    }
    

    Client端

    //在MyClient中
     ch.pipeline().addLast(new MyPersonDecoder());
     ch.pipeline().addLast(new MyPersonEncoder());
    
    public class MyClientHandler extends SimpleChannelInboundHandler<PersonProtocol> {
    
        private int count = 0;
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, PersonProtocol msg) throws Exception {
            int length = msg.getLength();
            byte[] content = msg.getContent();
    
            System.out.println("客户端收到的消息: ");
            System.out.println("长度: " + length);
            System.out.println("内容: " + new String(content, Charset.forName("utf-8")));
            System.out.println("服务端接收到消息数量: " + (++count));
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("客户端被激活");
            for (int i = 0; i < 10; i++) {
                String message = "sent from client ";
                int length = message.getBytes("utf-8").length;
                byte[] content = message.getBytes("utf-8");
                PersonProtocol personProtocol = new PersonProtocol();
                personProtocol.setLength(length);
                personProtocol.setContent(content);
    
                ctx.writeAndFlush(personProtocol);
            }
        }
    }
    

    Server端

    //MyServer
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new MyPersonDecoder());
    pipeline.addLast(new MyPersonEncoder());
    
    public class MyServerHandler extends SimpleChannelInboundHandler<PersonProtocol> {
    
        private int count = 0;
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, PersonProtocol msg) throws Exception {
            int length = msg.getLength();
            byte[] content = msg.getContent();
    
            System.out.println("服务端接收到的数据: ");
            System.out.println("长度: " + length);
            System.out.println("内容: " + new String(content, Charset.forName("utf-8")));
    
            System.out.println("服务端接收到消息数量: " + (++count));
    
            String response = UUID.randomUUID().toString();
            int responseLength = response.getBytes("utf-8").length;
            byte[] responseContent = response.getBytes("utf-8");
    
            PersonProtocol personProtocol = new PersonProtocol();
            personProtocol.setLength(responseLength);
            personProtocol.setContent(responseContent);
    
            ctx.writeAndFlush(personProtocol);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
  • 相关阅读:
    mysqllog
    清理:db上面的过期的binlog,释放磁盘空间。 (转)
    linux下shell命令trap
    mvc
    uci随笔
    luci 随笔
    shell脚本 整数比较
    lua学习
    OPENWRT make menuconfig错误之一
    openwrt 中make的使用
  • 原文地址:https://www.cnblogs.com/liuligang/p/10544278.html
Copyright © 2011-2022 走看看