zoukankan      html  css  js  c++  java
  • 架构师养成记--20.netty的tcp拆包粘包问题

    问题描述

    比如要发ABC DEFG HIJK 这一串数据,其中ABC是一个包,DEFG是一个包,HIJK是一个包。由于TCP是基于流发送的,所以有可能出现ABCD EFGH 这种情况,那么ABC和D就粘包了,DEFG被拆开了。

    解决方案

    1、消息定长,例如报文大小控制为200,如果不够就空位补全

    2、在包结尾加特殊字符进行分割,如$_

    3、消息分为消息头和消息体,在消息中包含消息长度等字段,然后进行消息逻辑处理。

    分隔符方案

    服务端

    import java.nio.ByteBuffer;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.FixedLengthFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    public class Server {
    
        public static void main(String[] args) throws Exception{
            //1 创建2个线程,一个是负责接收客户端的连接。一个是负责进行数据传输的
            EventLoopGroup pGroup = new NioEventLoopGroup();
            EventLoopGroup cGroup = new NioEventLoopGroup();
            
            //2 创建服务器辅助类
            ServerBootstrap b = new ServerBootstrap();
            b.group(pGroup, cGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 1024)
             .option(ChannelOption.SO_SNDBUF, 32*1024)
             .option(ChannelOption.SO_RCVBUF, 32*1024)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    //设置特殊分隔符
                    ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
                    sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
                    //设置字符串形式的解码
                    sc.pipeline().addLast(new StringDecoder());
                    sc.pipeline().addLast(new ServerHandler());
                }
            });
            
            //4 绑定连接
            ChannelFuture cf = b.bind(8765).sync();
            
            //等待服务器监听端口关闭
            cf.channel().closeFuture().sync();
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
            
        }
        
    }
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class ServerHandler extends ChannelHandlerAdapter {
    
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println(" server channel active... ");
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String request = (String)msg;
            System.out.println("Server :" + msg);
            String response = "服务器响应:" + msg + "$_";
            ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
            ctx.close();
        }
    
    
    
    
    }

    客户端

    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.FixedLengthFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    public class Client {
    
        public static void main(String[] args) throws Exception {
            
            EventLoopGroup group = new NioEventLoopGroup();
            
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    //
                    ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
                    sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
                    sc.pipeline().addLast(new StringDecoder());
                    sc.pipeline().addLast(new ClientHandler());
                }
            });
            
            ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
            
            cf.channel().writeAndFlush(Unpooled.wrappedBuffer("bbbb$_".getBytes()));
            cf.channel().writeAndFlush(Unpooled.wrappedBuffer("cccc$_".getBytes()));
            
            
            //等待客户端端口关闭
            cf.channel().closeFuture().sync();
            group.shutdownGracefully();
            
        }
    }
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;
    
    public class ClientHandler extends ChannelHandlerAdapter{
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client channel active... ");
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                String response = (String)msg;
                System.out.println("Client: " + response);
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    
    }
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;
    
    public class ClientHandler extends ChannelHandlerAdapter{
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client channel active... ");
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                String response = (String)msg;
                System.out.println("Client: " + response);
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    
    }
  • 相关阅读:
    mysql 远程登陆不上
    hdu 5339 Untitled【搜索】
    SqlServer 书目
    passwordauthentication yes
    oracle 11g RAC ocfs2
    Oracle 11g RAC database on ASM, ACFS or OCFS2
    CentOS ips bonding
    Oracle 11g RAC features
    openStack 王者归来之 trivial matters
    openstack windows 2008 img
  • 原文地址:https://www.cnblogs.com/sigm/p/6358423.html
Copyright © 2011-2022 走看看