zoukankan      html  css  js  c++  java
  • Netty(4)Stream by codec(粘包与拆包)

     TCP/IP,传输的是byte[],将byte[]放入队列中。可能会发生粘包和拆包。

    比如,客户端向服务端发送了2条消息,分别为D1,D2,可能产生的情况,如下图所示:

    情况一:正常的。

    情况二:粘包。

    情况三:拆包。即:部分数据不是一次完整发送的,而是分了至少2次发送。

    如本例,D2拆成了D2_1和D2_2,这是拆包。

    服务端分2次收到包,第一次收到了D1和D2_1包,这是粘包;服务端第二次收到了D2_2包,这是拆包。

    回到Time client例子,存在相同的问题。4字节的int很小,很少发生粘包或拆包。但是,如果并发量大时,可能会发生。
    最简单的方法是创建一个内部全局的(只为了多次接收放入相同buffer)buffer,等待,直到4个字节全部接受。以下修改了TimeClientHandler解决此问题。

    第一种解决方法:全局buffer,累积。

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import lombok.extern.slf4j.Slf4j;
    import cn.hutool.core.date.DateUtil;
    @Slf4j
    public class TimeClientHandler extends ChannelInboundHandlerAdapter {
        private ByteBuf buf;
        
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            buf = ctx.alloc().buffer(4);//(1)
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            buf.release();//(1)
            buf = null;
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf m = (ByteBuf) msg;
            buf.writeBytes(m);//(2)
            m.release();
            if (buf.readableBytes() >= 4) {//(3)
                long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L)*1000L;
                log.info("{}",DateUtil.date(currentTimeMillis));
                ctx.close();
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
    }

    1、ChannelHandler

    • ChannelHandler有2个监听器方法(生命周期):handlerAdded()和handlerRemoved()。当该handler被添加到pipeline中时,被触发;当该handler从pipeline删除时,被触发。该handler,执行顺序:channelAdd()--->channelRead()--->channelRemove()。
    • 只要任务不重,即不会阻塞很久,可以在这2个方法中做一些初始化的工作。比如本例,channelAdd()中初始分配一个4字节的ByteBuf,在channelRemove()中释放ByteBuf。

    2、首先,接收到的所有字节均被写入buf。
    3、然后,handler必须要检查是否够4个字节(本例),如果不够(拆包),则当有剩下的数据来时,Netty会再次调用该channelRead()方法,直到4个字节都接收到为止

    第二种解决方法:使用解码器

    尽管第一种方法解决了粘包和拆包问题,但是,代码臃肿。因为,可以向pipeline中添加多个handler,因此,我们可以将TimeClientHandler分割成2个handler:
    1)、TimeDecoder
    2)、上节里的TimeClientHandler版本。

    TimeDecoder.java

    import java.util.List;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    
    public class TimeDecoder extends ByteToMessageDecoder {//(1)
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {//(2)
            if (in.readableBytes() < 4) {
                return ;//(3)
            }
            out.add(in.readBytes(4));//(4)
        }
    }

    1、ByteToMessageDecoder 实现了ChannelInboundHandler


    2、ByteToMessageDecoder,当调用decode()时,其内部实现了“累积”功能的buffer,即不用自己在写全局buffer了,当接收到新数据时,会向该buffer中写入。
    3、decode():比如,一个包分2次传的,则会调用2次decode()。第一次即使不够4个字节,也会存入其内部“累积”buffer。我们的decode()方法中,return即可。
    4、decode()中,一旦添加了一个obj到“out”,意味着该decoder已经成功将消息解码了,即解决了粘包拆包问题。此时,ByteToMessageDecoder将会丢弃掉“累积”buffer中已读的消息。ByteToMessageDecoder将会不断调用decode(),直到添加“空”到out为止。

    TimeClientHandler.java(上节time中的TimeClientHandler)

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import lombok.extern.slf4j.Slf4j;
    import cn.hutool.core.date.DateUtil;
    @Slf4j
    public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf m = (ByteBuf) msg;
            try {
                long currentTimeMillis = (m.readUnsignedInt() - 2208988800L)*1000L;
                log.info("{}",DateUtil.date(currentTimeMillis));
                ctx.close();
            } finally {
                m.release();
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
    }

    然后,

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
         ch.pipeline().addLast(new TimeDecoder(),new TimeClientHandler());
    }

    执行顺序:TimeDecoder.decode(ctx, ByteBuf in, List<Object> out)--->TimeClientHandler.channelRead(ctx, Object msg)。

    上例中,从TimeDecoder传给TimeClientHandler的依然是ByteBuf,既然是int,那我们可以直接传递int吗?可以,如下:

    TimeDecoder.java,修改成

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {//(2)
            if (in.readableBytes() < 4) {
                return ;//(3)
            }
            //out.add(in.readBytes(4));//(4)
            out.add(in.readUnsignedInt());//(4)
        }

    TimeClientHandler.java,修改成

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            long m = (long) msg;
            long currentTimeMillis = (m - 2208988800L)*1000L;
            log.info("{}",DateUtil.date(currentTimeMillis));
            ctx.close();
    }

    ReplayingDecoder是一个更加简单的decoder。可以代替ByteToMessageDecoder,只需要修改TimeDecoder,如下:

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ReplayingDecoder;
    
    import java.util.List;
    
    public class TimeDecoder extends ReplayingDecoder<Void> {//(1)
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {//(2)
            out.add(in.readUnsignedInt());
        }
    }

    其他代码都不用动。

    最终输出:

    服务端:18:43:45.824 [nioEventLoopGroup-3-4] -549056671

    客户端:

    18:43:45.854 [nioEventLoopGroup-2-1] 2018-09-14 18:43:45
    18:43:45.929 [main] client channel is closed.

  • 相关阅读:
    Windows Server 2012 R2的安装(GUI桌面版本)
    CentOS安装-(CentOS7)最小化安装
    【转】Makefile步步为营
    【转载】人工智能必备数学知识
    【萌新向】cartographer_ros最新安装指南 2019-12
    Ubuntu 16.04 允许进行vnc远程控制【转】
    UVW平台运动控制算法以及matlab仿真
    Java语言基础13—IO
    Java数组元素去重(不使用集合)
    java基础12—集合类
  • 原文地址:https://www.cnblogs.com/yaoyuan2/p/9648324.html
Copyright © 2011-2022 走看看