zoukankan      html  css  js  c++  java
  • Netty4 学习笔记之三:粘包和拆包

    前言

    上一篇Netty 心跳 demo 中,了解了Netty中的客户端和服务端之间的心跳。这篇就来讲讲Netty中的粘包和拆包以及相应的处理。

    名词解释

    粘包: 会将消息粘粘起来发送。类似吃米饭,一口吃多个饭粒,而不是一粒一粒的吃。
    拆包: 会将消息拆开,分为多次接受。类似喝饮料,一口一口的喝,而不是一口气喝完。

    简单的来说:
    多次发送较少内容,会发生粘包现象。
    单次发送内容过多,会发生拆包现象。

    我们使用简单的Netty的服务端和客户端demo用来测试粘包和拆包。
    Hello Netty 发送一百次,就会发送粘包现象;
    将《春江花月夜》和《行路难》发送一次就会发送拆包现象;

    示例图:

    粘包:

    这里写图片描述

    拆包:

    这里写图片描述

    解决粘包、拆包

    因为Netty已经提供了几个常用的解码器,帮助我们解决这些问题,所以我们不必再去造轮子了,直接拿来用就好了。

    解决粘包

    在Server服务端使用定长数据帧的解码器 FixedLengthFrameDecoder之后。
    可以明显看到数据已经按照我们所设定的大小分割了。
    这里写图片描述

    解决拆包

    在Server服务端使用字节解码器 LineBasedFrameDecoder 之后。
    由于字节已经超过我们设置的最大的字节数,所以报错了。
    这里写图片描述

    所以,我们发送的字节在设置的范围内的话,就可以看到拆包现象已经解决。
    这里写图片描述

    Netty还提供了一个HttpObjectAggregator类用于解决粘包、拆包现象。
    以下摘自Netty官方文档

    如果对于单条HTTP消息你不想处理多个消息对象,你可以传入 HttpObjectAggregator 到pipline中。HttpObjectAggregator 会将多个消息对象转变为单个 FullHttpRequest 或者 FullHttpResponse。

    使用HttpObjectAggregator 之后
    这里写图片描述

    这里写图片描述

    可以看到,粘包和拆包现象得到了改善。

    那么开始贴代码,几乎和之前的demo一样。

    服务端:

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    /**
     * 
    * Title: NettyServer
    * Description: Netty服务端
    * Version:1.0.0  
    * @author pancm
    * @date 2017年10月8日
     */
    public class NettyServer {
            private static final int port = 6789; //设置服务端端口
            private static  EventLoopGroup group = new NioEventLoopGroup();   // 通过nio方式来接收连接和处理连接   
            private static  ServerBootstrap b = new ServerBootstrap();
    
            /**
             * Netty创建全部都是实现自AbstractBootstrap。
             * 客户端的是Bootstrap,服务端的则是    ServerBootstrap。
             **/
            public static void main(String[] args) throws InterruptedException {
                try {
                    b.group(group);
                    b.channel(NioServerSocketChannel.class);
                    b.childHandler(new NettyServerFilter()); //设置过滤器
                    // 服务器绑定端口监听
                    ChannelFuture f = b.bind(port).sync();
                    System.out.println("服务端启动成功,端口是:"+port);
                    // 监听服务器关闭监听
                    f.channel().closeFuture().sync();
                }catch(Exception e){
                    e.printStackTrace();
                }
                finally {
                    group.shutdownGracefully(); //关闭EventLoopGroup,释放掉所有资源包括创建的线程  
                }
            }
    }
    mport io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.FixedLengthFrameDecoder;
    import io.netty.handler.codec.LineBasedFrameDecoder;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    /**
      * 
     * Title: HelloServerInitializer
     * Description: Netty 服务端过滤器
     * Version:1.0.0  
    * @author pancm
    * @date 2017年10月8日
      */
    public class NettyServerFilter extends ChannelInitializer<SocketChannel> {
    
         @Override
         protected void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline ph = ch.pipeline();
             // 解码和编码,应和客户端一致
    //       ph.addLast(new FixedLengthFrameDecoder(100));   //定长数据帧的解码器 ,每帧数据100个字节就切分一次。  用于解决粘包问题           
    //         ph.addLast(new LineBasedFrameDecoder(2048));     //字节解码器 ,其中2048是规定一行数据最大的字节数。  用于解决拆包问题
             ph.addLast("aggregator", new HttpObjectAggregator(10*1024*1024)); 
             ph.addLast("decoder", new StringDecoder());
             ph.addLast("encoder", new StringEncoder());
             ph.addLast("handler", new NettyServerHandler());// 服务端业务逻辑
         }
     }
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    import java.net.InetAddress;
    
    /**
     * 
    * Title: HelloServerHandler
    * Description:  服务端业务逻辑 粘包、拆包测试
    * Version:1.0.0  
    * @author pancm
    * @date 2017年10月8日
     */
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
        /** 条数 */
        private int count=0; 
        /**
         * 业务逻辑处理
         */
        @Override  
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
               String body = (String)msg;  
               System.out.println("接受的数据是: " + body + ";条数是: " + ++count); 
        }  
    
        /**
         * 建立连接时,返回消息
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress());
            ctx.writeAndFlush("客户端"+ InetAddress.getLocalHost().getHostName() + "成功与服务端建立连接! ");
            super.channelActive(ctx);
        }
    }

    客户端

    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.Channel;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    import java.io.IOException;
    /**
     * 
    * Title: NettyClient
    * Description: Netty客户端  粘包、拆包测试
    * Version:1.0.0  
    * @author pancm
    * @date 2017年10月16日
     */
    public class NettyClient {
    
        public static String host = "127.0.0.1";  //ip地址
        public static int port = 6789;          //端口
        /// 通过nio方式来接收连接和处理连接   
        private static EventLoopGroup group = new NioEventLoopGroup(); 
        private static  Bootstrap b = new Bootstrap();
        private static Channel ch;
    
        /**
         * Netty创建全部都是实现自AbstractBootstrap。
         * 客户端的是Bootstrap,服务端的则是    ServerBootstrap。
         **/
        public static void main(String[] args) throws InterruptedException, IOException { 
                System.out.println("客户端成功启动...");
                b.group(group);
                b.channel(NioSocketChannel.class);
                b.handler(new NettyClientFilter()); 
                // 连接服务端
                ch = b.connect(host, port).sync().channel();
                star(3);
        }
    
        public static void star(int i) throws IOException{
            String str="春江潮水连海平,海上明月共潮生。"
                    +"  滟滟随波千万里,何处春江无月明! "
                    +"  江流宛转绕芳甸,月照花林皆似霰;"
                    +"  空里流霜不觉飞,汀上白沙看不见。"
                    +"  江天一色无纤尘,皎皎空中孤月轮。"
                    +"  江畔何人初见月?江月何年初照人?"
                    +"  人生代代无穷已,江月年年望相似。"
                    +"  不知江月待何人,但见长江送流水。"
                    +"  白云一片去悠悠,青枫浦上不胜愁。"
                    +"  谁家今夜扁舟子?何处相思明月楼?"
                    +"  可怜楼上月徘徊,应照离人妆镜台。"
                    +"  玉户帘中卷不去,捣衣砧上拂还来。"
                    +"  此时相望不相闻,愿逐月华流照君。"
                    +"  鸿雁长飞光不度,鱼龙潜跃水成文。"
                    +"  昨夜闲潭梦落花,可怜春半不还家。"
                    +"  江水流春去欲尽,江潭落月复西斜。"
                    +"  斜月沉沉藏海雾,碣石潇湘无限路。"
                    +"  不知乘月几人归,落月摇情满江树。" 
                    +" 噫吁嚱,危乎高哉!蜀道之难,难于上青天!蚕丛及鱼凫,开国何茫然!尔来四万八千岁,不与秦塞通人烟。"
                    +" 西当太白有鸟道,可以横绝峨眉巅。地崩山摧壮士死,然后天梯石栈相钩连。上有六龙回日之高标,下有冲波逆折之回川。"
                    +" 黄鹤之飞尚不得过,猿猱欲度愁攀援。青泥何盘盘,百步九折萦岩峦。扪参历井仰胁息,以手抚膺坐长叹。"
                    +" 问君西游何时还?畏途巉岩不可攀。但见悲鸟号古木,雄飞雌从绕林间。又闻子规啼夜月,愁空山。"
                    +" 蜀道之难,难于上青天,使人听此凋朱颜!连峰去天不盈尺,枯松倒挂倚绝壁。飞湍瀑流争喧豗,砯崖转石万壑雷。"
                    +" 其险也如此,嗟尔远道之人胡为乎来哉!剑阁峥嵘而崔嵬,一夫当关,万夫莫开。"
                    +" 所守或匪亲,化为狼与豺。朝避猛虎,夕避长蛇;磨牙吮血,杀人如麻。锦城虽云乐,不如早还家。"
                    +" 蜀道之难,难于上青天,侧身西望长咨嗟!";
            if(i==1){
                for(int j=0;j<100;j++){
                    str="Hello Netty";
                    ch.writeAndFlush(str);
                }
            }else if(i==2){
                str+=str;
                ch.writeAndFlush(str);
            }else if(i==3){
                //System.getProperty("line.separator") 结束标记
                byte [] bt=(str+System.getProperty("line.separator")).getBytes();
                ByteBuf message = Unpooled.buffer(bt.length);  
                message.writeBytes(bt);  
                ch.writeAndFlush(message);
            }
    
    
            System.out.println("客户端发送数据:"+str+",发送数据的长度:"+str.length());
       }
    
    }
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    /**
     * 
    * Title: NettyClientFilter
    * Description: Netty客户端 过滤器
    * Version:1.0.0  
    * @author pancm
    * @date 2017年10月8日
     */
    public class NettyClientFilter extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline ph = ch.pipeline();
            /*
             * 解码和编码,应和服务端一致
             * */
            ph.addLast("decoder", new StringDecoder());
            ph.addLast("encoder", new StringEncoder());
            ph.addLast("handler", new NettyClientHandler()); //客户端的逻辑
        }
    }
    import java.util.Date;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    /**
     * 
    * Title: NettyClientHandler
    * Description: 客户端业务逻辑实现
    * Version:1.0.0  
    * @author pancm
    * @date 2017年10月8日
     */
    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    
         /**
         * 业务逻辑处理   
         */
        @Override  
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
            System.out.println("客户端接受的消息:"+msg);
        }  
        /**
         * 建立连接时
         */
        @Override  
        public void channelActive(ChannelHandlerContext ctx) throws Exception {  
            System.out.println("建立连接时:"+new Date());  
            ctx.fireChannelActive();  
        }  
    
         /**
          * 
          * 关闭连接时
          */
        @Override  
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {  
            System.out.println("关闭连接时:"+new Date());  
        }  
    }

    该项目我放在github上了,有兴趣的可以看看!https://github.com/xuwujing/Netty

  • 相关阅读:
    教你一招Linux下文本比对方法
    Linux下find与exec的联手干大事
    Linux下Shell日期的格式,你知道几种?
    Linux下Python3.6的安装及避坑指南
    多线程中使用CompletableFuture
    ElasticSearch7.6.2中语法使用(更新中)
    ElasticSearch7.6.2使用_update_by_query语法
    ElasticSearch7.6.2使用_delete_by_query产生版本冲突问题
    filebeat7.6.2进程运行一段时间后自动退出问题解决
    把本地项目提交到gitLab
  • 原文地址:https://www.cnblogs.com/xuwujing/p/7782704.html
Copyright © 2011-2022 走看看