zoukankan      html  css  js  c++  java
  • netty 对 protobuf 协议的解码与包装探究(2)

    netty 默认支持protobuf 的封装与解码,如果通信双方都使用netty则没有什么障碍,但如果客户端是其它语言(C#)则需要自己仿写与netty一致的方式(解码+封装),提前是必须很了解netty是如何进行封装与解码的。这里主要通过读源码主要类ProtobufVarint32FrameDecoder(解码)+ProtobufVarint32LengthFieldPrepender(封装) 来解析其原理与实现。

    文章来源http://www.cnblogs.com/tankaixiong

    一,支持protobuf 协议的默认实现

    //配置服务端NIO线程组  
            EventLoopGroup bossGroup = new NioEventLoopGroup();  
            EventLoopGroup workerGroup = new NioEventLoopGroup();  
            try{  
                ServerBootstrap b = new ServerBootstrap();  
                b.group(bossGroup, workerGroup)  
                    .channel(NioServerSocketChannel.class)  
                    .option(ChannelOption.SO_BACKLOG, 1024)  
                    .handler(new LoggingHandler(LogLevel.INFO))  
                    .childHandler(new ChannelInitializer<SocketChannel>() {  
      
                        @Override  
                        protected void initChannel(SocketChannel ch) throws Exception {  
                            ch.pipeline()  
                            .addLast(new ProtobufVarint32FrameDecoder())                          
                            .addLast(new ProtobufDecoder(  
                                    SubscribeReqProto.SubscribeReq.getDefaultInstance()))                         
                            .addLast(new ProtobufVarint32LengthFieldPrepender())                          
                            .addLast(new ProtobufEncoder())                       
                            .addLast(new SubReqServerHandler());                          
                        }  
                          
                    });  
                //绑定端口,同步等待成功  
                ChannelFuture f = b.bind(port).sync();  
                //等待服务端监听端口关闭  
                f.channel().closeFuture().sync();  
                  
            }finally{  
                //退出时释放资源  
                bossGroup.shutdownGracefully();  
                workerGroup.shutdownGracefully();  
            }

    以上是提供的默认实现。关键在于ProtobufVarint32FrameDecoder,ProtobufVarint32LengthFieldPrepender类。

    二,ProtobufVarint32LengthFieldPrepender 编码类

    An encoder that prepends the the Google Protocol Buffers 128 Varints integer length field.

    * BEFORE DECODE (300 bytes)       AFTER DECODE (302 bytes)
    * +---------------+               +--------+---------------+
    * | Protobuf Data |-------------->| Length | Protobuf Data |
    * |  (300 bytes)  |               | 0xAC02 |  (300 bytes)  |
    * +---------------+               +--------+---------------+

    从类的说明来看, proto 消息格式如:Length + Protobuf Data (消息头+消息数据) 方式,这里特别需要注意的是头长使用的是varints方式不是int ,消息头描述消息数据体的长度。为了更减少传输量,消息头采用的是varint 格式。

    什么是varint?

    文章来源http://www.cnblogs.com/tankaixiongVarint 是一种紧凑的表示数字的方法。它用一个或多个字节来表示一个数字,值越小的数字使用越少的字节数。这能减少用来表示数字的字节数。 Varint 中的每个 byte 的最高位 bit 有特殊的含义,如果该位为 1,表示后续的 byte 也是该数字的一部分,如果该位为 0,则结束。其他的 7 个 bit 都用来表示数字。因此小于 128 的数字都可以用一个 byte 表示。大于 128 的数字,会用两个字节。

    更多可参见我上篇文章

    最大的区别是消息头它不是固定长度(常见是的使用INT 4个字节固定长度),Varint它用一个或多个字节来表示一个数字决定它不是固定长度!

    ProtobufVarint32LengthFieldPrepender 类的主要方法如下:

    @Override
        protected void encode(
                ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
            int bodyLen = msg.readableBytes();
            int headerLen = CodedOutputStream.computeRawVarint32Size(bodyLen);
            out.ensureWritable(headerLen + bodyLen);
    
            CodedOutputStream headerOut =
                    CodedOutputStream.newInstance(new ByteBufOutputStream(out), headerLen);
            headerOut.writeRawVarint32(bodyLen);
            headerOut.flush();
    
            out.writeBytes(msg, msg.readerIndex(), bodyLen);
        }

    CodedOutputStream 主要是针对与varints相关操作类。 先看是如何写消息头的,得到bodyLen 消息体长度然后调用computeRawVarint32Size()计算需要多少个字节,

    public static int computeRawVarint32Size(final int value) {
        if ((value & (0xffffffff <<  7)) == 0) return 1;
        if ((value & (0xffffffff << 14)) == 0) return 2;
        if ((value & (0xffffffff << 21)) == 0) return 3;
        if ((value & (0xffffffff << 28)) == 0) return 4;
        return 5;
      }

    0xffffffff << 7 二进制表示11111111111111111111111110000000 ,当与value &计算=0则表示value最大只会是000000000000000000000001111111,一个字节足以。

    通过&运算得出使用多少个字节就可以表示当前数字。左移7位是与Varint定义相关,第一位需要保留给标识(1表示后续的 byte 也是该数字的一部分,0则结束)。要表示 int 32位 和多加的每个字节第一个标识位,多出了4位,所以就最大会有5个字节。

    得到了varints值,然后如何写入out? 再看关键方法writeRawVarint32()。

    public void writeRawVarint32(int value) throws IOException {
        while (true) {
          //0x7F为127
          if ((value & ~0x7F) == 0) {//是否小于127,小于则一个字节就可以表示了
            writeRawByte(value);
            return;
          } else {
            writeRawByte((value & 0x7F) | 0x80);//因不于小127,加一高位标识
            value >>>= 7;//右移7位,再递归
          }
        }
      }
        /** Write a single byte. */
      public void writeRawByte(final byte value) throws IOException {
        if (position == limit) {
          refreshBuffer();
        }
    
        buffer[position++] = value;
      }
      
      private void refreshBuffer() throws IOException {
        if (output == null) {
          // We're writing to a single buffer.
          throw new OutOfSpaceException();
        }
    
        // Since we have an output stream, this is our buffer
        // and buffer offset == 0
        output.write(buffer, 0, position);
        position = 0;
      }

    byte 的取值(-128~127) , 0x7F为127 , 0x80为128

    循环取后7位,如果小于127则结束,不小于第一位加标识位1。 因为循环右移所以,实际位置颠倒了,解码时需要倒过来再拼接。

    消息头因为是varint32可变字节,所以比较复杂些,消息体简单直接writeBytes即可。

    二,ProtobufVarint32FrameDecoder 解码类

    同样对应CodedOutputStream有CodedInputStream类,操作解码时的varints。

    @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            in.markReaderIndex();
            final byte[] buf = new byte[5];
            for (int i = 0; i < buf.length; i ++) {
                if (!in.isReadable()) {
                    in.resetReaderIndex();
                    return;
                }
    
                buf[i] = in.readByte();
                if (buf[i] >= 0) {
                    int length = CodedInputStream.newInstance(buf, 0, i + 1).readRawVarint32();
                    if (length < 0) {
                        throw new CorruptedFrameException("negative length: " + length);
                    }
    
                    if (in.readableBytes() < length) {
                        in.resetReaderIndex();
                        return;
                    } else {
                        out.add(in.readBytes(length));
                        return;
                    }
                }
            }
    
            // Couldn't find the byte whose MSB is off.
            throw new CorruptedFrameException("length wider than 32-bit");
        }

    前面说明了最大长度为5个字节所以这里声明了5个长度的字节来读取消息头。

    buf[i] >= 0 这里为什么是>0然后就可以解码了呢?

    还是这句话:varints第一位表示后续的byte是否是该数字的一部分!

    如果字节第一位为1则表示后续还有字节是表示消息头,当这个字节的第一位为1则这个字节肯定是负数(字节最高位表示正负),大于等于0表示描述消息体长度的数字已经读完了。

    然后调用readRawVarint32() 还原成int ,与之前 writeRawVarint32()反其道而行。

    public int readRawVarint32() throws IOException {
       byte tmp = readRawByte();
       if (tmp >= 0) {
         return tmp;
       }
       int result = tmp & 0x7f;
       if ((tmp = readRawByte()) >= 0) {
         result |= tmp << 7;
       } else {
         result |= (tmp & 0x7f) << 7;
         if ((tmp = readRawByte()) >= 0) {
           result |= tmp << 14;
         } else {
           result |= (tmp & 0x7f) << 14;
           if ((tmp = readRawByte()) >= 0) {
             result |= tmp << 21;
           } else {
             result |= (tmp & 0x7f) << 21;
             result |= (tmp = readRawByte()) << 28;
             if (tmp < 0) {
               // Discard upper 32 bits.
               for (int i = 0; i < 5; i++) {
                 if (readRawByte() >= 0) {
                   return result;
                 }
               }
               throw InvalidProtocolBufferException.malformedVarint();
             }
           }
         }
       }
       return result;
     }

    取第N字节左移7*N位或|第一个字节拼接,实现了倒序拼接,最后得到了消息体长度。然后根据得到的消息体长度读取数据,如果消息体长度不够则回滚到markReaderIndex,等待数据。

    四,总结

    文章来源http://www.cnblogs.com/tankaixiong本文主要详细介绍了netty 对 protobuf 协议的解码与包装。重点在消息头 varint32的 算法表示上进行了说明。了解了varint32在协议中的实现,方便应用在其语言对接。

     

     

  • 相关阅读:
    javascript数据类型判断
    Week04面向对象设计与继承
    201621044079《Java程序设计》第1周学习总结
    201621044079 week05继承、多态、抽象类与接口
    201621044079《Java程序设计》第二周学习总结
    Week03面向对象入门
    201621044079WEEK06接口、内部类
    202020211 20209320 《Linux内核原理与分析》第一周作业
    第二天:PowerShell别名
    第一天:powershell外部命令
  • 原文地址:https://www.cnblogs.com/tankaixiong/p/6366043.html
Copyright © 2011-2022 走看看