zoukankan      html  css  js  c++  java
  • netty权威指南学习笔记五——分隔符和定长解码器的应用

        TCP以流的方式进行数据传输,上层应用协议为了对消息进行区分,通常采用以下4中方式:

      1. 消息长度固定,累计读取到长度综合为定长LEN的报文后,就认为读取到了一个完整的消息,将计数器置位,重新开始读取下一个数据报;
      2. 将回车换行符作为消息结束符,例如FTP协议,这种方式在文本协议中应用比较广泛;
      3. 将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的分隔符;
      4. 通过在消息头中定义长度字段来标识消息的总长度。

      DelimiterBaseFrameDecoder——分隔符解码器,FixedLengthFrameDecoder——定长解码器

    下面我们采用#为分隔符进行代码练习运行。

    EchoServer服务端代码

     1 package com.decoder;
     2 
     3 import io.netty.bootstrap.ServerBootstrap;
     4 import io.netty.buffer.ByteBuf;
     5 import io.netty.buffer.Unpooled;
     6 import io.netty.channel.ChannelFuture;
     7 import io.netty.channel.ChannelInitializer;
     8 import io.netty.channel.ChannelOption;
     9 import io.netty.channel.nio.NioEventLoopGroup;
    10 import io.netty.channel.socket.SocketChannel;
    11 import io.netty.channel.socket.nio.NioServerSocketChannel;
    12 import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    13 import io.netty.handler.codec.string.StringDecoder;
    14 import io.netty.handler.logging.LogLevel;
    15 import io.netty.handler.logging.LoggingHandler;
    16 
    17 public class EchoServer {
    18     public void bind(int port) throws InterruptedException {
    19         NioEventLoopGroup bossGroup = new NioEventLoopGroup();
    20         NioEventLoopGroup workGroup = new NioEventLoopGroup();
    21         try {
    22             ServerBootstrap b = new ServerBootstrap();
    23             b.group(bossGroup,workGroup)
    24                     .channel(NioServerSocketChannel.class)
    25                     .option(ChannelOption.SO_BACKLOG,100)
    26                     .childHandler(new LoggingHandler(LogLevel.INFO))
    27                     .childHandler(new ChannelInitializer<SocketChannel>() {
    28                         @Override
    29                         protected void initChannel(SocketChannel socketChannel) throws Exception {
    30                             ByteBuf delimiter = Unpooled.copiedBuffer("#".getBytes());//创建一个分隔符,确定为结束标志
    31                             socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter))
    32                                     .addLast(new StringDecoder())
    33                                     .addLast(new EchoServerHandler());
    34                         }
    35                     });
    36 //          绑定端口,同步等待成功
    37             ChannelFuture f = b.bind(port).sync();
    38 //          等待服务端监听端口关闭
    39             f.channel().closeFuture().sync();
    40         } finally {
    41             bossGroup.shutdownGracefully();
    42             workGroup.shutdownGracefully();
    43         }
    44     }
    45     public static void main(String[] args) throws InterruptedException {
    46         int port = 8080;
    47         if(args.length>0&&args!=null){
    48             port = Integer.parseInt(args[0]);
    49         }
    50         new EchoServer().bind(port);
    51 
    52     }
    53 }

    服务端处理IO代码

     1 package com.decoder;
     2 
     3 import io.netty.buffer.ByteBuf;
     4 import io.netty.buffer.Unpooled;
     5 import io.netty.channel.ChannelHandlerContext;
     6 import io.netty.channel.ChannelInboundHandlerAdapter;
     7 
     8 public class EchoServerHandler extends ChannelInboundHandlerAdapter {
     9     int count;
    10     @Override
    11     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    12         String body = (String) msg;
    13         System.out.println("This is"+ ++count +" times server receive client request.");
    14         body += "#";
    15         ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
    16         ctx.writeAndFlush(echo);
    17     }
    18 
    19     @Override
    20     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    21         ctx.flush();
    22     }
    23 
    24     @Override
    25     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    26         ctx.close();
    27     }
    28 }

    客户端发送消息代码

     1 package com.decoder;
     2 
     3 import io.netty.bootstrap.Bootstrap;
     4 import io.netty.buffer.ByteBuf;
     5 import io.netty.buffer.Unpooled;
     6 import io.netty.channel.ChannelFuture;
     7 import io.netty.channel.ChannelInitializer;
     8 import io.netty.channel.ChannelOption;
     9 import io.netty.channel.nio.NioEventLoopGroup;
    10 import io.netty.channel.socket.SocketChannel;
    11 import io.netty.channel.socket.nio.NioSocketChannel;
    12 import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    13 import io.netty.handler.codec.string.StringDecoder;
    14 
    15 public class EchoClient {
    16     public void connection(int port,String host) throws InterruptedException {
    17         NioEventLoopGroup workGroup = new NioEventLoopGroup();
    18         try {
    19             Bootstrap b = new Bootstrap();
    20             b.group(workGroup)
    21                     .channel(NioSocketChannel.class)
    22                     .option(ChannelOption.TCP_NODELAY,true)
    23                     .handler(new ChannelInitializer<SocketChannel>() {
    24                         @Override
    25                         protected void initChannel(SocketChannel socketChannel) throws Exception {
    26                             ByteBuf delimiter = Unpooled.copiedBuffer("#".getBytes());
    27                             socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter))
    28                                     .addLast(new StringDecoder())
    29                                     .addLast(new EchoClientHandler());
    30 //
    31                         }
    32                     });
    33 //            发起异步连接操作
    34             ChannelFuture f = b.connect(host,port).sync();
    35 //                          等待客户端链路关闭
    36             f.channel().closeFuture().sync();
    37         } finally {
    38             workGroup.shutdownGracefully();
    39         }
    40     }
    41     public static void main(String[] args) throws InterruptedException {
    42         int port = 8080;
    43         if(args.length>0&&args!=null){
    44             System.out.println(args[0]);
    45             port = Integer.parseInt(args[0]);
    46         }
    47         new EchoClient().connection(port,"127.0.0.1");
    48     }
    49 }

    客户端处理IO代码

     1 package com.decoder;
     2 
     3 import io.netty.buffer.Unpooled;
     4 import io.netty.channel.ChannelHandlerContext;
     5 import io.netty.channel.ChannelInboundHandlerAdapter;
     6 
     7 public class EchoClientHandler extends ChannelInboundHandlerAdapter {
     8     private int count;
     9     static final String ECHO_REQ = "hello,zuixiaoyao,welcome here!#";
    10 
    11     @Override
    12     public void channelActive(ChannelHandlerContext ctx) throws Exception {
    13         for(int i=0;i<10;i++){
    14             ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
    15         }
    16     }
    17 
    18     @Override
    19     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    20         String body = (String) msg;
    21         System.out.println("this is client receive msg"+ ++count +"times:【"+body+"】");
    22     }
    23 
    24     @Override
    25     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    26         super.channelReadComplete(ctx);
    27     }
    28 
    29     @Override
    30     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    31         super.exceptionCaught(ctx, cause);
    32     }
    33 }

    运行结果

    服务端

    客户端

     若采用定长解码器,运行上面代码看看会发生什么,我们只需要对上面服务器中解码器换为定长解码器即可,解码器最大长度设置为20,看看

    修改的服务端代码如下:

     1  @Override
     2                         protected void initChannel(SocketChannel socketChannel) throws Exception {
     3                             ByteBuf delimiter = Unpooled.copiedBuffer("#".getBytes());//创建一个分隔符,确定为结束标志
     4                             socketChannel.pipeline()
     5 //                                    .addLast(new DelimiterBasedFrameDecoder(1024,delimiter))
     6 //                                    修改为定长解码器
     7                                     .addLast(new FixedLengthFrameDecoder(20))
     8                                     .addLast(new StringDecoder())
     9                                     .addLast(new EchoServerHandler());
    10                         }

    运行后如下:

    服务端结果

    客户端结果

    我们发现所有运行返回的代码都不超过20字符。这就是按照定长解析的,但是解析的比较乱,具体的原理还需深入学习后才知道,暂时不表。

    按书上的操作,输入一行超过20字符的命令请求时,只返回一个20字符定长的数据显示。

  • 相关阅读:
    Java8简单的本地缓存实现
    Java堆内存详解
    拖拽实现备忘:拖拽drag&拖放drop事件浅析
    微信小程序下拉刷新PullDownRefresh的一些坑
    ES6里let、const、var区别总结
    nodejs大文件分片加密解密
    node+js实现大文件分片上传
    大文件上传前台分片后后台合并的问题
    fs.appendFileSync使用说明,nodejs中appendFile与writeFile追加内容到文件区别
    JS中的单线程与多线程、事件循环与消息队列、宏任务与微任务
  • 原文地址:https://www.cnblogs.com/xiaoyao-001/p/9346628.html
Copyright © 2011-2022 走看看