zoukankan      html  css  js  c++  java
  • Tcp 粘包以及解决方法

    1. 简介

    1. TCP 是面向连接的,面向流的,提供可靠性服务,收发两端(客户端和服务器端) 都要有一一成对的Socket, 因此,发送端为了将多个发送给接收端的包更有效的发给对方,使用了优化算法(Nagle 算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包,这样虽然提高了效率,但是接收端就难于分辨出完整的数据包了。 因为面向流的通信是无消息保护边界的。

    2. 由于TCP 无消息保护边界,需要在接收端处理消息边界问题, 也就是我们所说的粘包拆包问题。

    2. 粘包问题演示

    TcpServer:

    package netty.tcp;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public class TcpServer {
    
        private static final Integer PORT = 6666;
    
        public static void main(String[] args) throws InterruptedException {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyNettyServerInitializer());
    
            ChannelFuture sync = serverBootstrap.bind(PORT).sync();
            sync.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        System.out.println("服务端启动成功,监听地址: " + PORT);
                    }
                }
            });
        }
    }

    MyNettyServerInitializer

    package netty.tcp;
    
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    
    public class MyNettyServerInitializer extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // 向管道加入处理器
            ChannelPipeline pipeline = ch.pipeline();
            // 1. 增加一个自定义的handler
            pipeline.addLast(new MyServerHandler());
    
            System.out.println("server is ok~~~~");
        }
    }

    MyServerHandler

    package netty.tcp;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.util.CharsetUtil;
    
    public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
    
        private int count;
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            int i = msg.readableBytes();
            System.out.println("读取到的字节数: " + i);
            // 回显一个消息给客户端
            String msgStr = msg.toString(CharsetUtil.UTF_8);
            String printMsg = "count: " + (++count) + "; msgStr:" + msgStr;
            System.out.println(printMsg);
    
            ctx.channel().writeAndFlush(Unpooled.copiedBuffer(printMsg, CharsetUtil.UTF_8));
        }
    }

    TcpClient

    package netty.tcp;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class TcpClient {
    
        private static final Integer PORT = 6666;
    
        public static void main(String[] args) throws InterruptedException {
            // 创建一个事件循环组
            EventLoopGroup eventExecutors = new NioEventLoopGroup();
            try {
                // 创建一个启动Bootstrap(注意是Netty包下的)
                Bootstrap bootstrap = new Bootstrap();
                // 链式设置参数
                bootstrap.group(eventExecutors) // 设置线程组
                        .channel(NioSocketChannel.class) // 设置通道class
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                // 1. 加入一个自定义的处理器
                                pipeline.addLast(new MyClientHandler());
                            }
                        });
                System.out.println("客户端is ok...");
    
                // 启动客户端连接服务器(ChannelFuture 是netty的异步模型)
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", PORT).sync();
                // 监听关闭通道
                channelFuture.channel().closeFuture().sync();
            } finally {
                // 关闭
                eventExecutors.shutdownGracefully();
            }
        }
    }

    MyClientHandler

    package netty.tcp;
    
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.util.CharsetUtil;
    
    /**
     * 自定义服务器端处理handler,需要继承netty定义的 ChannelInboundHandlerAdapter 类
     */
    public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    
        /**
         * 通道就绪事件
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // 循环发送十条消息
            int count = 10;
            String msg = "client msg ";
            for (int i = 0; i < count; i++) {
                ctx.writeAndFlush(Unpooled.copiedBuffer(msg + i, CharsetUtil.UTF_8));
            }
        }
    
        /**
         * 发生异常事件
         *
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            System.out.println("从客户端 " + ctx.channel().remoteAddress() + " 读取到的消息, long: " + msg);
            // 回显一个消息给客户端
            String msgStr = msg.toString(CharsetUtil.UTF_8);
            System.out.println(msgStr);
        }
    }

    启动服务器端,然后启动两个客户端,查看日志:

    (1) 服务器端:

    服务端启动成功,监听地址: 6666
    server is ok~~~~
    读取到的字节数: 120
    count: 1; msgStr:client msg 0client msg 1client msg 2client msg 3client msg 4client msg 5client msg 6client msg 7client msg 8client msg 9
    server is ok~~~~
    读取到的字节数: 12
    count: 1; msgStr:client msg 0
    读取到的字节数: 96
    count: 2; msgStr:client msg 1client msg 2client msg 3client msg 4client msg 5client msg 6client msg 7client msg 8
    读取到的字节数: 12
    count: 3; msgStr:client msg 9

    (2) 客户端1

    客户端is ok...
    从客户端 /127.0.0.1:6666 读取到的消息, long: PooledUnsafeDirectByteBuf(ridx: 0, widx: 137, cap: 1024)
    count: 1; msgStr:client msg 0client msg 1client msg 2client msg 3client msg 4client msg 5client msg 6client msg 7client msg 8client msg 9

    (3) 客户端2

    客户端is ok...
    从客户端 /127.0.0.1:6666 读取到的消息, long: PooledUnsafeDirectByteBuf(ridx: 0, widx: 171, cap: 1024)
    count: 1; msgStr:client msg 0count: 2; msgStr:client msg 1client msg 2client msg 3client msg 4client msg 5client msg 6client msg 7client msg 8count: 3; msgStr:client msg 9

      可以看到发生了消息错乱。

    3. 解决办法

    1. 加入一个消息包装类 TransferMsg

    package netty.tcp;
    
    import lombok.Data;
    
    @Data
    public class TransferMsg {
    
        private Integer length;
    
        private byte[] msg;
    }

    2. TransferMsgEncoder 编码器

    package netty.tcp;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    
    public class TransferMsgEncoder extends MessageToByteEncoder<TransferMsg> {
    
        @Override
        protected void encode(ChannelHandlerContext ctx, TransferMsg msg, ByteBuf out) throws Exception {
            System.out.println("netty.tcp.TransferMsgEncoder.encode 被调用");
            out.writeInt(msg.getLength());
            out.writeBytes(msg.getMsg());
        }
    }

    3. TransferMsgDecoder 解码器

    package netty.tcp;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ReplayingDecoder;
    
    import java.util.List;
    
    public class TransferMsgDecoder extends ReplayingDecoder<Void> {
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            System.out.println("netty.tcp.TransferMsgDecoder.decode 被调用");
            int count = in.readInt();
            byte[] bytes = new byte[count];
            in.readBytes(bytes);
    
            TransferMsg transferMsg = new TransferMsg();
            transferMsg.setMsg(bytes);
            transferMsg.setLength(count);
            out.add(transferMsg);
        }
    }

    4. 服务器端修改

    修改MyNettyServerInitializer 加入自己的解码器

    package netty.tcp;
    
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    
    public class MyNettyServerInitializer extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // 向管道加入处理器
            ChannelPipeline pipeline = ch.pipeline();
            // 1. 增加一个自定义的handler
            pipeline.addLast(new TransferMsgDecoder());
            pipeline.addLast(new MyServerHandler());
    
            System.out.println("server is ok~~~~");
        }
    }

    修改MyServerHandler

    package netty.tcp;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    public class MyServerHandler extends SimpleChannelInboundHandler<TransferMsg> {
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, TransferMsg msg) throws Exception {
            Integer length = msg.getLength();
            byte[] msg1 = msg.getMsg();
            String s = new String(msg1);
            System.out.println("读到消息,length: " + length + "	msg:" + s);
        }
    }

    2. 客户端修改

    TcpClient 加入自己的编码器

    package netty.tcp;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class TcpClient {
    
        private static final Integer PORT = 6666;
    
        public static void main(String[] args) throws InterruptedException {
            // 创建一个事件循环组
            EventLoopGroup eventExecutors = new NioEventLoopGroup();
            try {
                // 创建一个启动Bootstrap(注意是Netty包下的)
                Bootstrap bootstrap = new Bootstrap();
                // 链式设置参数
                bootstrap.group(eventExecutors) // 设置线程组
                        .channel(NioSocketChannel.class) // 设置通道class
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                // 1. 加入一个自定义的处理器
                                pipeline.addLast(new TransferMsgEncoder());
                                pipeline.addLast(new MyClientHandler());
                            }
                        });
                System.out.println("客户端is ok...");
    
                // 启动客户端连接服务器(ChannelFuture 是netty的异步模型)
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", PORT).sync();
                // 监听关闭通道
                channelFuture.channel().closeFuture().sync();
            } finally {
                // 关闭
                eventExecutors.shutdownGracefully();
            }
        }
    }

    MyClientHandler 修改发送的消息格式

    package netty.tcp;
    
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.util.CharsetUtil;
    
    /**
     * 自定义服务器端处理handler,需要继承netty定义的 ChannelInboundHandlerAdapter 类
     */
    public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    
        /**
         * 通道就绪事件
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // 循环发送十条消息
            int count = 10;
            String msg = "client msg ";
            String sendMsg = null;
            TransferMsg transferMsg = null;
            for (int i = 0; i < count; i++) {
                transferMsg = new TransferMsg();
                sendMsg = msg + i;
                transferMsg.setMsg(sendMsg.getBytes());
                transferMsg.setLength(sendMsg.getBytes().length);
                ctx.writeAndFlush(transferMsg);
            }
        }
    
        /**
         * 发生异常事件
         *
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            System.out.println("从客户端 " + ctx.channel().remoteAddress() + " 读取到的消息, long: " + msg);
            // 回显一个消息给客户端
            String msgStr = msg.toString(CharsetUtil.UTF_8);
            System.out.println(msgStr);
        }
    }

    测试: 启动一个服务器端,然后启动一个客户端,查看服务器日志如下:

    服务端启动成功,监听地址: 6666
    server is ok~~~~
    netty.tcp.TransferMsgDecoder.decode 被调用
    读到消息,length: 12    msg:client msg 0
    netty.tcp.TransferMsgDecoder.decode 被调用
    读到消息,length: 12    msg:client msg 1
    netty.tcp.TransferMsgDecoder.decode 被调用
    读到消息,length: 12    msg:client msg 2
    netty.tcp.TransferMsgDecoder.decode 被调用
    读到消息,length: 12    msg:client msg 3
    netty.tcp.TransferMsgDecoder.decode 被调用
    读到消息,length: 12    msg:client msg 4
    netty.tcp.TransferMsgDecoder.decode 被调用
    读到消息,length: 12    msg:client msg 5
    netty.tcp.TransferMsgDecoder.decode 被调用
    读到消息,length: 12    msg:client msg 6
    netty.tcp.TransferMsgDecoder.decode 被调用
    读到消息,length: 12    msg:client msg 7
    netty.tcp.TransferMsgDecoder.decode 被调用
    读到消息,length: 12    msg:client msg 8
    netty.tcp.TransferMsgDecoder.decode 被调用
    读到消息,length: 12    msg:client msg 9

      如果服务器向客户端返回相同的消息,在服务器端也需要加入自己的编码器;客户端加入自己的解码器。实测解决了粘包问题。

    总结: Netty 解决方法:

    1》 使用自定义协议 + 编解码器来解决

    2》 关键就是解决服务器每次读取数据长度的问题,这个问题解决,就不会出现服务器多读或者少读的问题,从而避免TCP的粘包、拆包

    【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】
  • 相关阅读:
    PowerDesigner小技巧(整理中)
    将日志(Microsoft.Extensions.Logging)添加到.NET Core控制台应用程序
    VMware Workstation Pro 15.5.0 官方版本及激活密钥
    Git 设置和取消代理(SOCKS5代理)
    笔记
    哈希表(Hash Table)与哈希算法
    Elasticsearch分词
    微服务理论
    Elasticsearch与Mysql数据同步
    go语言常用命令
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/14629571.html
Copyright © 2011-2022 走看看