zoukankan      html  css  js  c++  java
  • ByteBuf使用实例

      之前我们有个netty5的拆包解决方案(参加netty5拆包问题解决实例),现在我们采用另一种思路,不需要新增LengthFieldBasedFrameDecoder,直接修改NettyMessageDecoder:

    package com.wlf.netty.nettyapi.msgpack;
    
    import com.wlf.netty.nettyapi.constant.Delimiter;
    import com.wlf.netty.nettyapi.javabean.Header;
    import com.wlf.netty.nettyapi.javabean.NettyMessage;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    
    import java.util.List;
    
    public class NettyMessageDecoder extends ByteToMessageDecoder {
    
        /**
         * 消息体字节大小:分割符字段4字节+长度字段4字节+请求类型字典1字节+预留字段1字节=10字节
         */
        private static final int HEAD_LENGTH = 10;
    
        @Override
        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
    
            while (true) {
    
                // 标记字节流开始位置
                byteBuf.markReaderIndex();
    
                // 若读取到分割标识,说明读取当前字节流开始位置了
                if (byteBuf.readInt() == Delimiter.DELIMITER) {
                    break;
                }
    
                // 重置读索引为0
                byteBuf.resetReaderIndex();
    
                // 长度校验,字节流长度至少10字节,小于10字节则等待下一次字节流过来
                if (byteBuf.readableBytes() < HEAD_LENGTH) {
                    byteBuf.resetReaderIndex();
                    return;
                }
            }
    
            // 2、获取data的字节流长度
            int dataLength = byteBuf.readInt();
    
            // 校验数据包是否全部发送过来,总字节流长度(此处读取的是除去delimiter和length之后的总长度)-
            // type和reserved两个字节=data的字节流长度
            int totalLength = byteBuf.readableBytes();
            if ((totalLength - 2) < dataLength) {
    
                // 长度校验,字节流长度少于数据包长度,说明数据包拆包了,等待下一次字节流过来
                byteBuf.resetReaderIndex();
                return;
            }
    
            // 3、请求类型
            byte type = byteBuf.readByte();
    
            // 4、预留字段
            byte reserved = byteBuf.readByte();
    
    
            // 5、数据包内容
            byte[] data = null;
            if (dataLength > 0) {
                data = new byte[dataLength];
                byteBuf.readBytes(data);
            }
    
            NettyMessage nettyMessage = new NettyMessage();
            Header header = new Header();
            header.setDelimiter(Delimiter.DELIMITER);
            header.setLength(dataLength);
            header.setType(type);
            header.setReserved(reserved);
            nettyMessage.setHeader(header);
            nettyMessage.setData(data);
    
            list.add(nettyMessage);
    
            // 回收已读字节
            byteBuf.discardReadBytes();
        }
    }

      我们的改动很小,只不过将原来的读索引改为标记索引,然后在拆包时退出方法前重置读索引,这样下次数据包过来,我们的读索引依然从0开始,delimiter的标记就可以读出来,而不会陷入死循环了。

      ByteBuf是ByteBuffer的进化版,ByteBuffer(参见ByteBuffer使用实例)才一个索引,读写模式需要通过flip来转换,而ByteBuf有两个索引,readerIndex读索引和writerIndex写索引,读写转换无缝连接,青出于蓝而胜于蓝:

          +-------------------+------------------+------------------+
          | discardable bytes |  readable bytes  |  writable bytes  |
          |                           |     (CONTENT)    |                         |
          +-------------------+------------------+------------------+
          |                           |                            |                         |
          0      <=      readerIndex   <=   writerIndex    <=    capacity

      既然有两个索引,那么标记mask、重置reset必然也是两两对应,上面的代码中我们只需要用到读标记和读重置。

      我们把客户端handler也修改下,先把LengthFieldBasedFrameDecoder去掉:

    // channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 4, 4, 2, 0));

      再让数据包更大一些:

        /**
         * 构造PCM请求消息体
         *
         * @return
         */
        private byte[] buildPcmData() throws Exception {
            byte[] resultByte = longToBytes(System.currentTimeMillis());
    
            // 读取一个本地文件
            String AUDIO_PATH = "D:\input\test_1.pcm";
            try (RandomAccessFile raf = new RandomAccessFile(AUDIO_PATH, "r")) {
    
                int len = -1;
                byte[] content = new byte[1024];
                while((len = raf.read(content)) != -1)
                {
                   resultByte = addAll(resultByte, content);
                }
            }
    
            return resultByte;
        }
                

      再debug下看看,第一次解析客户端发送的数据,读取1024字节,我们可以看到读索引是8(delimiter+length=8),写索引就是1024,我们的大包里有3939116个字节,去掉10个字节的header,剩下小包是3939106::

       第二次再读1024,代码已经执行reset重置读索引了,所以读索引由8改为0,写索引累增到2048:

       第三次再读1024,写索引继续累增到3072:

       最后一次发1024,写索引已经到达3939116,大包传输结束了:

       从上面看出,我们对ByteBuf的capacity一直在翻倍,读指针一直标记在大包的起始位置0,这样做的目的是每次都能读取小包的长度length(3939106),拿来跟整个ByteBuf的长度作比较,只要它取到的小包没到达到length,我们就继续接受新包,写索引不停的累加,直到整个大包长度>=3939116(也就是小包>=3939106),这时我们开始移动读索引,将字节流写入对象,最后回收已读取的字节(调用discardReaderBytes方法):

      BEFORE discardReadBytes()

          +-------------------+------------------+------------------+
          | discardable bytes |  readable bytes  |  writable bytes  |
          +-------------------+------------------+------------------+
          |                         |                      |                            |
          0      <=      readerIndex   <=   writerIndex    <=    capacity


      AFTER discardReadBytes()

          +------------------+--------------------------------------+
          |  readable bytes  |    writable bytes (got more space)   |
          +------------------+--------------------------------------+
          |                        |                                               |
    readerIndex (0) <= writerIndex (decreased)        <=        capacity

      其他方法参见测试类:

    package com.wlf.netty.nettyserver;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import org.junit.Assert;
    import org.junit.Test;
    
    public class ByteBufTest {
        @Test
        public void byteBufTest() {
            ByteBuf byteBuf = Unpooled.buffer(10);
            byteBuf.writeInt(0xabef0101);
            byteBuf.writeInt(1024);
            byteBuf.writeByte((byte) 1);
            byteBuf.writeByte((byte) 0);
    
            // 开始读取
            printDelimiter(byteBuf);
            printLength(byteBuf);
    
            // 派生一个ByteBuf,取剩下2个字节,但读索引不动
            ByteBuf duplicatBuf = byteBuf.duplicate();
            printByteBuf(byteBuf);
    
            // 派生一个ByteBuf,取剩下2个字节,读索引动了
            ByteBuf sliceBuf = byteBuf.readSlice(2);
            printByteBuf(byteBuf);
    
            // 两个派生的对象其实是一样的
            Assert.assertEquals(duplicatBuf, sliceBuf);
        }
    
        private void printDelimiter(ByteBuf buf) {
            int newDelimiter = buf.readInt();
            System.out.printf("delimeter: %s
    ", Integer.toHexString(newDelimiter));
            printByteBuf(buf);
        }
    
        private void printLength(ByteBuf buf) {
            int length = buf.readInt();
            System.out.printf("length: %d
    ", length);
            printByteBuf(buf);
        }
    
        private void printByteBuf(ByteBuf buf) {
            System.out.printf("reader Index: %d, writer Index: %d, capacity: %d
    ", buf.readerIndex(), buf.writerIndex(), buf.capacity());
        }
    }

      输出:

    delimeter: abef0101
    reader Index: 4, writer Index: 10, capacity: 10
    length: 1024
    reader Index: 8, writer Index: 10, capacity: 10
    reader Index: 8, writer Index: 10, capacity: 10
    reader Index: 10, writer Index: 10, capacity: 10
  • 相关阅读:
    编码原则 之 Once and Only Once
    编码原则 之 Stable Dependencies
    分布式锁
    DTS(待了解)
    BPMN(待了解)
    criteo marketing api 相关
    enum & json 之间的转换
    bootstrap:modal & iframe
    记Ubuntu Mongodb 和 Mysql的安装与使用
    齐次和非齐次线性方程组的解法
  • 原文地址:https://www.cnblogs.com/wuxun1997/p/11737415.html
Copyright © 2011-2022 走看看