zoukankan      html  css  js  c++  java
  • 1. Netty解决Tcp粘包拆包

    一. TCP粘包问题

    1. 实际发送的消息, 可能会被TCP拆分成很多数据包发送, 也可能把很多消息组合成一个数据包发送

    2. 粘包拆包发生的原因

      (1) 应用程序一次写的字节大小超过socket发送缓冲区大小
      (2) 数据长度超多MSS大小进行分片

     > MSS :  Maximum Segment Size 最大报文段长度, 是TCP数据包数据段的最大长度  
                       MSS值等于收发双方提供的MSS值的最小值, 等于TCP报文长度-TCP首部长度  
    
      (3) 以太网帧的payload大于MTU进行IP分片
    		MTU : 硬件线路上可以传输的最大字节数
    

    二. Netty的解决方法

    1. LineBasedFrameDecoder:
      依次遍历ByteBuf中的可读字节, 发现有" "或者" " , 就把可读位置到该位置的字节看做一条消息.他是用换行符作为分隔符的解码器. 支持配置单行消息最大长度, 若达到最大长度还没出现换行符, 会抛出异常, 并忽略之前的异常数据

    2. StringDecoder: 把接收到的数据流按照编码格式转换成字符串

      public class TimeServer4 {
      	public static void main(String[] args) {
      		new TimeServer4().bind(8112);
      	}
      
      	public void bind(int port){
      		// 配置服务端线程组
      		EventLoopGroup bossGroup = new NioEventLoopGroup();     // 接受线程组
      		EventLoopGroup workerGroup = new NioEventLoopGroup();   // 处理请求的线程组
      		try {
      			ServerBootstrap sbs = new ServerBootstrap();
      			sbs.group(bossGroup,workerGroup)
      					.channel(NioServerSocketChannel.class)
      					.option(ChannelOption.SO_BACKLOG,1024)  // 链接的缓冲队列
      					.childHandler(new ChannelInitializer<SocketChannel>() {
      						@Override
      						protected void initChannel(SocketChannel ch) throws Exception {
      							// 若在1024个字节内没有发现
      或
      报错,然后忽略之前的数据流
      							ch.pipeline().addLast(new LineBasedFrameDecoder(1024));   
      							ch.pipeline().addLast(new StringDecoder(Charset.forName("utf-8")));
      							ch.pipeline().addLast(new TimeServerHandler());
      						}
      					});
      
      			//绑定端口,同步等待成功
      			ChannelFuture f = sbs.bind(port).sync();
      			f.channel().closeFuture().sync();
      
      		} catch (InterruptedException e) {
      			e.printStackTrace();
      		} finally {
      			bossGroup.shutdownGracefully();
      			workerGroup.shutdownGracefully();
      		}
      	}
      
      	private class TimeServerHandler extends ChannelHandlerAdapter{
      		private int counter;
      
      		@Override
      		public void channelActive(ChannelHandlerContext ctx) throws Exception {
      			System.out.println("链接开启");
      		}
      
      		@Override
      		public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
      	   /*   不加StringDecoder,需要手动转化ByteBuf写到字节数组
      			ByteBuf buf = (ByteBuf) msg;
      			byte[] bytes = new byte[buf.readableBytes()]; 
      			buf.readBytes(bytes);
      			String body = new String(bytes); */
      			String body = (String)msg;    
      			System.out.println("recieve order:"+body+"; counter is:"+ ++counter);
      			if("query time".equalsIgnoreCase(body)){
      				//客户端也注册了LineNasedFrameDecoder,所以服务端发送的消息也要以"
      "或"
      "结尾 
      				String nowTime = new SimpleDateFormat("yyyy-MM-dd").format(new Date()) + System.getProperty("line.separator");
      				ByteBuf byteBuf = Unpooled.copiedBuffer(nowTime.getBytes());
      				ctx.writeAndFlush(byteBuf);
      			}else{
      				ByteBuf byteBuf = Unpooled.copiedBuffer("bad order".getBytes());
      				ctx.writeAndFlush(byteBuf);
      			}
      		}
      
      		@Override
      		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      			cause.printStackTrace();
      			ctx.close();
      		}
      	}
      }
      
      public class TimeClient {
      
      	public static void main(String[] args) {
      		new TimeClient().connect(8112,"localhost");
      	}
      
      	public void connect(int port,String host){
      		EventLoopGroup group = new NioEventLoopGroup();
      		try{
      			Bootstrap bs = new Bootstrap();
      			bs.group(group).channel(NioSocketChannel.class)
      					.option(ChannelOption.TCP_NODELAY,true)
      					.handler(new ChannelInitializer<SocketChannel>() {
      						@Override
      						protected void initChannel(SocketChannel ch) throws Exception {
      							ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
      							ch.pipeline().addLast(new StringDecoder());
      							ch.pipeline().addLast(new TimeClientHandler());
      						}
      					});
      			//发起异步操作链接
      			ChannelFuture f = bs.connect(host,port).sync();
      			f.channel().closeFuture().sync();
      		} catch (InterruptedException e) {
      			e.printStackTrace();
      		} finally {
      			group.shutdownGracefully();
      		}
      	}
      
      	private class TimeClientHandler extends ChannelHandlerAdapter{
      		private int counter;
      		// 发出的请求报文必须带有"
      "或"
      ",否则服务端的LineBasedFrameDecoder无法解析
      		private byte[] req = ("query time"+ System.getProperty("line.separator")).getBytes();
      
      		@Override
      		public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      			String body = (String) msg;
      			System.out.println("Now : "+ body + "the counter is "+ ++counter);
      		}
      
      		@Override
      		public void channelActive(ChannelHandlerContext ctx) throws Exception {
      			ByteBuf msg = null;
      			for (int i = 0; i < 100; i++) {
      				msg = Unpooled.buffer(req.length);  // 创建指定长度的buf
      				msg.writeBytes(req);
      				ctx.writeAndFlush(msg);
      			}
      		}
      
      		@Override
      		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      			cause.printStackTrace();
      			ctx.close();
      		}
      	}
      }
      
      
  • 相关阅读:
    Mybatis原理
    周六上课随记
    第一次外包面试
    复习所想
    如何解决高并发下的超卖问题
    Tomcat架构解析
    即将逝去的25岁
    go 刷算法第一题——反转字符串
    JavaScript杂货
    jdk17新特性
  • 原文地址:https://www.cnblogs.com/72808ljup/p/5257130.html
Copyright © 2011-2022 走看看