一. TCP粘包问题
-
实际发送的消息, 可能会被TCP拆分成很多数据包发送, 也可能把很多消息组合成一个数据包发送
-
粘包拆包发生的原因
(1) 应用程序一次写的字节大小超过socket发送缓冲区大小
(2) 数据长度超多MSS大小进行分片
> MSS : Maximum Segment Size 最大报文段长度, 是TCP数据包数据段的最大长度
MSS值等于收发双方提供的MSS值的最小值, 等于TCP报文长度-TCP首部长度
(3) 以太网帧的payload大于MTU进行IP分片
MTU : 硬件线路上可以传输的最大字节数
二. Netty的解决方法
-
LineBasedFrameDecoder:
依次遍历ByteBuf中的可读字节, 发现有" "或者" " , 就把可读位置到该位置的字节看做一条消息.他是用换行符作为分隔符的解码器. 支持配置单行消息最大长度, 若达到最大长度还没出现换行符, 会抛出异常, 并忽略之前的异常数据 -
StringDecoder: 把接收到的数据流按照编码格式转换成字符串
public class TimeServer4 { public static void main(String[] args) { new TimeServer4().bind(8112); } public void bind(int port){ // 配置服务端线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 接受线程组 EventLoopGroup workerGroup = new NioEventLoopGroup(); // 处理请求的线程组 try { ServerBootstrap sbs = new ServerBootstrap(); sbs.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,1024) // 链接的缓冲队列 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 若在1024个字节内没有发现 或 报错,然后忽略之前的数据流 ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); ch.pipeline().addLast(new StringDecoder(Charset.forName("utf-8"))); ch.pipeline().addLast(new TimeServerHandler()); } }); //绑定端口,同步等待成功 ChannelFuture f = sbs.bind(port).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private class TimeServerHandler extends ChannelHandlerAdapter{ private int counter; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("链接开启"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws /* 不加StringDecoder,需要手动转化ByteBuf写到字节数组 ByteBuf buf = (ByteBuf) msg; byte[] bytes = new byte[buf.readableBytes()]; buf.readBytes(bytes); String body = new String(bytes); */ String body = (String)msg; System.out.println("recieve order:"+body+"; counter is:"+ ++counter); if("query time".equalsIgnoreCase(body)){ //客户端也注册了LineNasedFrameDecoder,所以服务端发送的消息也要以" "或" "结尾 String nowTime = new SimpleDateFormat("yyyy-MM-dd").format(new Date()) + System.getProperty("line.separator"); ByteBuf byteBuf = Unpooled.copiedBuffer(nowTime.getBytes()); ctx.writeAndFlush(byteBuf); }else{ ByteBuf byteBuf = Unpooled.copiedBuffer("bad order".getBytes()); ctx.writeAndFlush(byteBuf); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } }
public class TimeClient { public static void main(String[] args) { new TimeClient().connect(8112,"localhost"); } public void connect(int port,String host){ EventLoopGroup group = new NioEventLoopGroup(); try{ Bootstrap bs = new Bootstrap(); bs.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY,true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new TimeClientHandler()); } }); //发起异步操作链接 ChannelFuture f = bs.connect(host,port).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } private class TimeClientHandler extends ChannelHandlerAdapter{ private int counter; // 发出的请求报文必须带有" "或" ",否则服务端的LineBasedFrameDecoder无法解析 private byte[] req = ("query time"+ System.getProperty("line.separator")).getBytes(); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println("Now : "+ body + "the counter is "+ ++counter); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf msg = null; for (int i = 0; i < 100; i++) { msg = Unpooled.buffer(req.length); // 创建指定长度的buf msg.writeBytes(req); ctx.writeAndFlush(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } }