zoukankan      html  css  js  c++  java
  • Netty3:分隔符和定长解码器

    回顾TCP粘包/拆包问题解决方案

    上文详细说了TCP粘包/拆包问题产生的原因及解决方式,并以LineBasedFrameDecoder为例演示了粘包/拆包问题的实际解决方案,本文再介绍两种粘包/拆包问题的解决方案:分隔符和定长解码器。在开始本文之前,先回顾一下解决粘包/拆包问题的几个方式:

    • 消息长度固定,累计读取到长度总和为定长LEN的报文后,就认为读取到了一个完整的消息,将计数器重置,重新读取下一个消息
    • 将回车换行符作为消息结束符,例如FTP协议,这种方式在文本协议中应用比较广泛
    • 将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符
    • 通过在消息头中定义长度字段来标志消息的总长度

    Netty对上面4种方式做了统一的抽象,提供了4种解码器来解决对应的问题,使用起来非常方便,有了这些解码器,用户不需要自己对读取的报文进行人工解码,也不需要考虑TCP的粘包和拆包。

    基于DelimiterBasedFrameDecoder的TCP粘包/拆包解决方案

    使用DelimiterBasedFrameDecoder,我们可以自动完成以分隔符作为码流结束标志的消息的解码,下面通过例子来学习下DelimiterBasedFrameDecoder的使用。

    首先写一个EchoServer,和TimeServer差不多,主要是加上了DelimiterBasedFrameDecoder,分隔符定为"¥_":

     1 public class EchoServer {
     2 
     3     public void bind(int port) throws Exception {
     4         // NIO线程组
     5         EventLoopGroup bossGroup = new NioEventLoopGroup();
     6         EventLoopGroup workerGroup = new NioEventLoopGroup();
     7         
     8         try {
     9             ServerBootstrap b = new ServerBootstrap();
    10             b.group(bossGroup, workerGroup)
    11                 .channel(NioServerSocketChannel.class)
    12                 .option(ChannelOption.SO_BACKLOG, 1024)
    13                 .childHandler(new ChildChannelHandler());
    14             
    15             // 绑定端口,同步等待成功
    16             ChannelFuture f = b.bind(port).sync();
    17             // 等待服务端监听端口关闭
    18             f.channel().closeFuture().sync();
    19         } finally {
    20             // 优雅退出,释放线程池资源
    21             bossGroup.shutdownGracefully();
    22             workerGroup.shutdownGracefully();
    23         }
    24     }
    25     
    26     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
    27         @Override
    28         protected void initChannel(SocketChannel arg0) throws Exception {
    29             ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
    30             
    31             arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
    32             arg0.pipeline().addLast(new StringDecoder());
    33             arg0.pipeline().addLast(new EchoServerHandler());
    34         }
    35     }
    36     
    37 }

    接着是EchoServerHandler,将接收到的消息打印出来,并记录收到的消息次数:

     1 public class EchoServerHandler extends ChannelHandlerAdapter {
     2 
     3     private int counter = 0;
     4     
     5     @Override
     6     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
     7         String body = (String)msg;
     8         System.out.println("This is " + ++counter + " times receive client:[" + body + "]");
     9         
    10         body += "$_";
    11         ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
    12         ctx.writeAndFlush(echo);
    13     }
    14     
    15     @Override
    16     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    17         cause.printStackTrace();
    18         ctx.close();
    19     }
    20     
    21 }

    收到消息之后,将消息加上"$_"回给客户端。看下客户端EchoClient的写法,也是一样,加上DelimiterBasedFrameDecoder:

     1 public class EchoClient {
     2 
     3     public void connect(int port, String host) throws Exception {
     4         EventLoopGroup group = new NioEventLoopGroup();
     5         try {
     6             Bootstrap b = new Bootstrap();
     7             
     8             b.group(group)
     9                 .channel(NioSocketChannel.class)
    10                 .option(ChannelOption.TCP_NODELAY, true)
    11                 .handler(new ChannelInitializer<SocketChannel>() {
    12                     protected void initChannel(SocketChannel ch) throws Exception {
    13                         ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
    14                         
    15                         ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
    16                         ch.pipeline().addLast(new StringDecoder());
    17                         ch.pipeline().addLast(new EchoClientHandler());
    18                     };
    19                 });
    20             
    21             // 发起异步连接操作
    22             ChannelFuture f = b.connect(host, port).sync();
    23             // 等待客户端连接关闭
    24             f.channel().closeFuture().sync();
    25         } finally {
    26             // 优雅退出,释放NIO线程组
    27             group.shutdownGracefully();
    28         }
    29     }
    30     
    31 }

    写一个EchoClientHandler,发送10条消息到Server并记录从Server回来的数据:

     1 public class EchoClientHandler extends ChannelHandlerAdapter {
     2 
     3     private int counter;
     4     
     5     private static final String ECHO_REQ = "Hi, RickyXu, Welcome to Netty.$_";
     6     
     7     public EchoClientHandler() {
     8         
     9     }
    10     
    11     @Override
    12     public void channelActive(ChannelHandlerContext ctx) throws Exception {
    13         for (int i = 0; i < 10; i++) {
    14             ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
    15         }
    16     }
    17     
    18     @Override
    19     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    20         System.out.println("This is" + ++counter + " times receive server:[" + msg + "]");
    21     }
    22     
    23     @Override
    24     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    25         ctx.flush();
    26     }
    27     
    28     @Override
    29     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    30         cause.printStackTrace();
    31         ctx.close();
    32     }
    33     
    34 }

    先运行服务端代码再运行客户端代码,看下服务端收到的数据为:

    This is 1 times receive client:[Hi, RickyXu, Welcome to Netty.]
    This is 2 times receive client:[Hi, RickyXu, Welcome to Netty.]
    This is 3 times receive client:[Hi, RickyXu, Welcome to Netty.]
    This is 4 times receive client:[Hi, RickyXu, Welcome to Netty.]
    This is 5 times receive client:[Hi, RickyXu, Welcome to Netty.]
    This is 6 times receive client:[Hi, RickyXu, Welcome to Netty.]
    This is 7 times receive client:[Hi, RickyXu, Welcome to Netty.]
    This is 8 times receive client:[Hi, RickyXu, Welcome to Netty.]
    This is 9 times receive client:[Hi, RickyXu, Welcome to Netty.]
    This is 10 times receive client:[Hi, RickyXu, Welcome to Netty.]

    一模一样收到10条且末尾的分隔符被忽略,客户端收到的响应为:

    This is 1 times receive server:[Hi, RickyXu, Welcome to Netty.]
    This is 2 times receive server:[Hi, RickyXu, Welcome to Netty.]
    This is 3 times receive server:[Hi, RickyXu, Welcome to Netty.]
    This is 4 times receive server:[Hi, RickyXu, Welcome to Netty.]
    This is 5 times receive server:[Hi, RickyXu, Welcome to Netty.]
    This is 6 times receive server:[Hi, RickyXu, Welcome to Netty.]
    This is 7 times receive server:[Hi, RickyXu, Welcome to Netty.]
    This is 8 times receive server:[Hi, RickyXu, Welcome to Netty.]
    This is 9 times receive server:[Hi, RickyXu, Welcome to Netty.]
    This is 10 times receive server:[Hi, RickyXu, Welcome to Netty.]

    同样收到了10条数据且忽略了末尾的换行符。

    这样我们就通过一个示例演示了使用DelimiterBasedFrameDecoder解决TCP粘包/拆包,下面看一下使用FixedLengthFrameDecoder解决TCP粘包/拆包的示例。

    基于FixedLengthFrameDecoder的TCP粘包/拆包解决方案

    FixedLengthFrameDecoder是固定长度解码器,它能够按照指定的长度对消息进行自动解码,开发者不需要考虑TCP的粘包/拆包问题,非常实用,

    同样看一下例子,先写一个EchoServer,加入FixedLengthFrameDecoder:

    public class EchoServer {
    
        public void bind(int port) throws Exception {
            // NIO线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());
                
                // 绑定端口,同步等待成功
                ChannelFuture f = b.bind(port).sync();
                // 等待服务端监听端口关闭
                f.channel().closeFuture().sync();
            } finally {
                // 优雅退出,释放线程池资源
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
        
        private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
            @Override
            protected void initChannel(SocketChannel arg0) throws Exception {
                arg0.pipeline().addLast(new FixedLengthFrameDecoder(20));
                arg0.pipeline().addLast(new StringDecoder());
                arg0.pipeline().addLast(new EchoServerHandler());
            }
        }
        
    }

    接着下一下EchoServerHandler,EchoServerHandler比较简单:

    public class EchoServerHandler extends ChannelHandlerAdapter {
    
         @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("Receive client:[" + msg + "]");
        }
        
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
        
    }

    这里只打印接收到的数据,因为我们使用telnet来模拟发送请求而不是写一个EchoClient。首先telnet一下localhost 8080:

    接着使用"Ctrl+]":

    输入回车,这样就变成了回显模式,即输入什么看到什么,简单说一下telnet原理:

    1. 建立与服务器的TCP连接
    2. 从键盘上接收输入的字符
    3. 把输入的字符变成标准格式并发送给服务器
    4. 从服务器接收输出的信息
    5. 输出的信息显示在屏幕/控制台上

    注意一下第2点+第3点,这里是键盘上输入一个字符就会发送这个字符到服务端的,对这点有疑问的可以在FixedLengthFrameDecoder的decode方法上打断点,就可以看到每输入一个字符,断点就会进入一次。

    接着我们"0123456789"这样一直循环输入,FixedLengthFrameDecoder设置的Length是20,按照上面的解释,这样的话应该输入两轮"0123456789"即第二个9输入之后,控制台上就会打印了:

    看一下控制台:

    Receive client:[01234567890123456789]

    没毛病,证明了FixedLengthFrameDecoder确实是按照定长接收包的,如果收到的包是半包消息,FixedLengthFrameDecoder会缓存半包消息并等待下个包到达之后进行拼包,直到读取到一个完整的包

  • 相关阅读:
    ubuntu安装jdk的两种方法
    LeetCode 606. Construct String from Binary Tree (建立一个二叉树的string)
    LeetCode 617. Merge Two Binary Tree (合并两个二叉树)
    LeetCode 476. Number Complement (数的补数)
    LeetCode 575. Distribute Candies (发糖果)
    LeetCode 461. Hamming Distance (汉明距离)
    LeetCode 405. Convert a Number to Hexadecimal (把一个数转化为16进制)
    LeetCode 594. Longest Harmonious Subsequence (最长的协调子序列)
    LeetCode 371. Sum of Two Integers (两数之和)
    LeetCode 342. Power of Four (4的次方)
  • 原文地址:https://www.cnblogs.com/xrq730/p/8733888.html
Copyright © 2011-2022 走看看