zoukankan      html  css  js  c++  java
  • Apache Thrift系列详解(三)

    前言

    Thrift支持二进制压缩格式,以及json格式数据的序列化反序列化。开发人员可以更加灵活的选择协议的具体形式。协议是可自由扩展的,新版本的协议,完全兼容老的版本!

    正文

    数据交换格式简介

    当前流行的数据交换格式可以分为如下几类:

    (一) 自解析型

    序列化的数据包含完整的结构, 包含了field名称value。比如xml/json/java serizable,大百度的mcpack/compack,都属于此类。即调整不同属性的顺序序列化/反序列化不造成影响。

    (二) 半解析型

    序列化的数据,丢弃了部分信息, 比如field名称, 但引入了index(常常是id+type的方式)来对应具体属性。这方面的代表有google protobuf/thrift也属于此类。

    (三) 无解析型

    传说中大百度的infpack实现,就是借助该种方式来实现,丢弃了很多有效信息性能/压缩比最好,不过向后兼容需要开发做一定的工作, 详情不知。

    交换格式类型优点缺点
    Xml 文本 易读 臃肿,不支持二进制数据类型
    JSON 文本 易读 丢弃了类型信息,比如"score":100,对score类型是int/double解析有二义性, 不支持二进制数据类型
    Java serizable 二进制 使用简单 臃肿,只限制在JAVA领域
    Thrift 二进制 高效 不易读,向后兼容有一定的约定限制
    Google Protobuf 二进制 高效 不易读,向后兼容有一定的约定限制

    Thrift的数据类型

    1. 基本类型:   bool: 布尔值   byte: 8位有符号整数   i16: 16位有符号整数   i32: 32位有符号整数   i64: 64位有符号整数   double: 64位浮点数   string: UTF-8编码的字符串   binary: 二进制串
    2. 结构体类型:   struct: 定义的结构体对象
    3. 容器类型:   list: 有序元素列表   set: 无序无重复元素集合   map: 有序的key/value集合
    4. 异常类型:   exception: 异常类型
    5. 服务类型:   service: 具体对应服务的类

    Thrift的序列化协议

    Thrift可以让用户选择客户端服务端之间传输通信协议的类别,在传输协议上总体划分为文本(text)和二进制(binary)传输协议。为节约带宽提高传输效率,一般情况下使用二进制类型的传输协议为多数,有时还会使用基于文本类型的协议,这需要根据项目/产品中的实际需求。常用协议有以下几种:

    • TBinaryProtocol:二进制编码格式进行数据传输
    • TCompactProtocol:高效率的、密集二进制编码格式进行数据传输
    • TJSONProtocol: 使用JSON文本的数据编码协议进行数据传输
    • TSimpleJSONProtocol:只提供JSON只写的协议,适用于通过脚本语言解析

    Thrift的序列化测试

    (a). 首先编写一个简单的thrift文件pair.thrift

    struct Pair {
        1: required string key
        2: required string value
    }
    复制代码

    这里标识了required的字段,要求在使用时必须正确赋值,否则运行时会抛出TProtocolException异常。缺省和指定为optional时,则运行时不做字段非空校验。

    (b). 编译并生成java源代码:

    thrift -gen java pair.thrift
    复制代码

    (c). 编写序列化和反序列化的测试代码:

    • 序列化测试,将Pair对象写入文件中
    private static void writeData() throws IOException, TException {
        Pair pair = new Pair();
        pair.setKey("key1").setValue("value1");
        FileOutputStream fos = new FileOutputStream(new File("pair.txt"));
        pair.write(new TBinaryProtocol(new TIOStreamTransport(fos)));
        fos.close();
    }
    复制代码
    • 反序列化测试,从文件中解析生成Pair对象
    private static void readData() throws TException, IOException {
        Pair pair = new Pair();
        FileInputStream fis = new FileInputStream(new File("pair.txt"));
        pair.read(new TBinaryProtocol(new TIOStreamTransport(fis)));
        System.out.println("key => " + pair.getKey());
        System.out.println("value => " + pair.getValue());
        fis.close();
    }
    复制代码

    (d) 观察运行结果,正常输出表明序列化反序列化过程正常完成。

    Thrift协议源码

    (一) writeData()分析

    首先查看thrift序列化机制,即数据写入实现,这里采用二进制协议TBinaryProtocol,切入点为pair.write(TProtocol)

    查看scheme()方法,决定采用元组计划(TupleScheme)还是标准计划(StandardScheme)来实现序列化,默认采用的是标准计划StandardScheme

    标准计划(StandardScheme)下的write()方法:

    这里完成了几步操作:

    (a). 根据Thrift IDL文件中定义了required的字段验证字段是否正确赋值。

    public void validate() throws org.apache.thrift.TException {
      // check for required fields
      if (key == null) {
        throw new org.apache.thrift.protocol.TProtocolException("Required field 'key' was not present! Struct: " + toString());
      }
      if (value == null) {
        throw new org.apache.thrift.protocol.TProtocolException("Required field 'value' was not present! Struct: " + toString());
      }
    }
    复制代码

    (b). 通过writeStructBegin()记录写入结构开始标记

    public void writeStructBegin(TStruct struct) {}
    复制代码

    (c). 逐一写入Pair对象的各个字段,包括字段字段开始标记字段的值字段结束标记

    if (struct.key != null) {
      oprot.writeFieldBegin(KEY_FIELD_DESC);
      oprot.writeString(struct.key);
      oprot.writeFieldEnd();
    }
    // 省略...
    复制代码

    (1). 首先是字段开始标记,包括typefield-idtype是字段的数据类型的标识号field-idThrift IDL定义的字段次序,比如说key为1,value为2。

    public void writeFieldBegin(TField field) throws TException {
      writeByte(field.type);
      writeI16(field.id);
    }
    复制代码

    Thrift提供了TType,对不同的数据类型(type)提供了唯一标识的typeID

    public final class TType {
        public static final byte STOP   = 0;   // 数据读写完成
        public static final byte VOID   = 1;   // 空值
        public static final byte BOOL   = 2;   // 布尔值
        public static final byte BYTE   = 3;   // 字节
        public static final byte DOUBLE = 4;   // 双精度浮点型
        public static final byte I16    = 6;   // 短整型
        public static final byte I32    = 8;   // 整型
        public static final byte I64    = 10;  // 长整型
        public static final byte STRING = 11;  // 字符串类型
        public static final byte STRUCT = 12;  // 引用类型
        public static final byte MAP    = 13;  // Map
        public static final byte SET    = 14;  // 集合
        public static final byte LIST   = 15;  // 列表
        public static final byte ENUM   = 16;  // 枚举
    }
    复制代码

    (2). 然后是写入字段的值,根据字段的数据类型又归纳为以下实现:writeByte()writeBool()writeI32()writeI64()writeDouble()writeString()writeBinary()方法。

    TBinaryProtocol通过一个长度为8byte字节数组缓存写入读取的临时字节数据。

    private final byte[] inoutTemp = new byte[8];
    复制代码

    **常识1:**16进制的介绍。以0x开始的数据表示16进制,0xff换成十进制为255。在16进制中,A、B、C、D、E、F这五个字母来分别表示10、11、12、13、14、15。

    16进制十进制:f表示15。第n位的权值为16的n次方,由右到左从0位起:0xff = 1516^1 + 1516^0 = 255 16进制二进制再变十进制:0xff = 1111 1111 = 2^8 - 1 = 255

    **常识2:**位运算符的使用。>>表示代表右移符号,如:int i=15; i>>2的结果是3,移出的部分将被抛弃。而<<表示左移符号,与>>刚好相反。

    转为二进制的形式可能更好理解,0000 1111(15)右移2位的结果是0000 0011(3),0001 1010(18)右移3位的结果是0000 0011(3)。

    • writeByte():写入单个字节数据。
    public void writeByte(byte b) throws TException {
      inoutTemp[0] = b;
      trans_.write(inoutTemp, 0, 1);
    }
    复制代码
    • writeBool():写入布尔值数据。
    public void writeBool(boolean b) throws TException {
      writeByte(b ? (byte)1 : (byte)0);
    }
    复制代码
    • writeI16():写入短整型short类型数据。
    public void writeI16(short i16) throws TException {
      inoutTemp[0] = (byte)(0xff & (i16 >> 8));
      inoutTemp[1] = (byte)(0xff & (i16));
      trans_.write(inoutTemp, 0, 2);
    }
    复制代码
    • writeI32():写入整型int类型数据。
    public void writeI32(int i32) throws TException {
      inoutTemp[0] = (byte)(0xff & (i32 >> 24));
      inoutTemp[1] = (byte)(0xff & (i32 >> 16));
      inoutTemp[2] = (byte)(0xff & (i32 >> 8));
      inoutTemp[3] = (byte)(0xff & (i32));
      trans_.write(inoutTemp, 0, 4);
    }
    复制代码
    • writeI64():写入长整型long类型数据。
    public void writeI64(long i64) throws TException {
      inoutTemp[0] = (byte)(0xff & (i64 >> 56));
      inoutTemp[1] = (byte)(0xff & (i64 >> 48));
      inoutTemp[2] = (byte)(0xff & (i64 >> 40));
      inoutTemp[3] = (byte)(0xff & (i64 >> 32));
      inoutTemp[4] = (byte)(0xff & (i64 >> 24));
      inoutTemp[5] = (byte)(0xff & (i64 >> 16));
      inoutTemp[6] = (byte)(0xff & (i64 >> 8));
      inoutTemp[7] = (byte)(0xff & (i64));
      trans_.write(inoutTemp, 0, 8);
    }
    复制代码
    • writeDouble():写入双浮点型double类型数据。
    public void writeDouble(double dub) throws TException {
      writeI64(Double.doubleToLongBits(dub));
    }
    复制代码
    • writeString():写入字符串类型,这里先写入字符串长度,再写入字符串内容
    public void writeString(String str) throws TException {
      try {
        byte[] dat = str.getBytes("UTF-8");
        writeI32(dat.length);
        trans_.write(dat, 0, dat.length);
      } catch (UnsupportedEncodingException uex) {
        throw new TException("JVM DOES NOT SUPPORT UTF-8");
      }
    }
    复制代码
    • writeBinary:写入二进制数组类型数据,这里数据输入是NIO中的ByteBuffer类型。
    public void writeBinary(ByteBuffer bin) throws TException {
      int length = bin.limit() - bin.position();
      writeI32(length);
      trans_.write(bin.array(), bin.position() + bin.arrayOffset(), length);
    }
    复制代码

    (3). 每个字段写入完成后,都需要记录字段结束标记

    public void writeFieldEnd() {}
    复制代码

    (d). 当所有的字段都写入以后,需要记录字段停止标记

    public void writeFieldStop() throws TException {
      writeByte(TType.STOP);
    }
    复制代码

    (e). 当所有数据写入完成后,通过writeStructEnd()记录写入结构完成标记

    public void writeStructEnd() {}
    复制代码

    (二) readData()分析

    查看thrift反序列化机制,即数据读取实现,同样采用二进制协议TBinaryProtocol,切入点为pair.read(TProtocol)

    数据读取数据写入一样,也是采用的标准计划StandardScheme标准计划(StandardScheme)下的read()方法:

    这里完成的几步操作:

    (a). 通过readStructBegin读取结构开始标记

    iprot.readStructBegin();
    复制代码

    (b). 循环读取结构中的所有字段数据Pair对象中,直到读取到org.apache.thrift.protocol.TType.STOP为止。iprot.readFieldBegin()指明开始读取下一个字段的前需要读取字段开始标记

    while (true) {
      schemeField = iprot.readFieldBegin();
      if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
        break;
      }
      // 字段的读取,省略...
    }
    复制代码

    (c). 根据Thrift IDL定义的field-id读取对应的字段,并赋值到Pair对象中,并设置Pair对象相应的字段为已读状态(前提:字段在IDL中被定义为required)。

    switch (schemeField.id) {
      case 1: // KEY
        if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
          struct.key = iprot.readString();
          struct.setKeyIsSet(true);
        } else {
          org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
        }
        break;
      case 2: // VALUE
        if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
          struct.value = iprot.readString();
          struct.setValueIsSet(true);
        } else {
          org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
        }
        break;
      default:
        org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
    }
    复制代码

    关于读取字段的值,根据字段的数据类型也分为以下实现:readByte()readBool()readI32()readI64()readDouble()readString()readBinary()方法。

    • readByte():读取单个字节数据。
    public byte readByte() throws TException {
      if (trans_.getBytesRemainingInBuffer() >= 1) {
        byte b = trans_.getBuffer()[trans_.getBufferPosition()];
        trans_.consumeBuffer(1);
        return b;
      }
      readAll(inoutTemp, 0, 1);
      return inoutTemp[0];
    }
    复制代码
    • readBool():读取布尔值数据。
    public boolean readBool() throws TException {
      return (readByte() == 1);
    }
    复制代码
    • readI16():读取短整型short类型数据。
    public short readI16() throws TException {
      byte[] buf = inoutTemp;
      int off = 0;
    
      if (trans_.getBytesRemainingInBuffer() >= 2) {
        buf = trans_.getBuffer();
        off = trans_.getBufferPosition();
        trans_.consumeBuffer(2);
      } else {
        readAll(inoutTemp, 0, 2);
      }
    
      return (short) (((buf[off] & 0xff) << 8) |
                     ((buf[off+1] & 0xff)));
    }
    复制代码
    • readI32():读取整型int类型数据。
    public int readI32() throws TException {
      byte[] buf = inoutTemp;
      int off = 0;
    
      if (trans_.getBytesRemainingInBuffer() >= 4) {
        buf = trans_.getBuffer();
        off = trans_.getBufferPosition();
        trans_.consumeBuffer(4);
      } else {
        readAll(inoutTemp, 0, 4);
      }
      return ((buf[off] & 0xff) << 24) |
             ((buf[off+1] & 0xff) << 16) |
             ((buf[off+2] & 0xff) <<  8) |
             ((buf[off+3] & 0xff));
    }
    复制代码
    • readI64():读取长整型long类型数据。
    public long readI64() throws TException {
      byte[] buf = inoutTemp;
      int off = 0;
    
      if (trans_.getBytesRemainingInBuffer() >= 8) {
        buf = trans_.getBuffer();
        off = trans_.getBufferPosition();
        trans_.consumeBuffer(8);
      } else {
        readAll(inoutTemp, 0, 8);
      }
    
      return ((long)(buf[off]   & 0xff) << 56) |
             ((long)(buf[off+1] & 0xff) << 48) |
             ((long)(buf[off+2] & 0xff) << 40) |
             ((long)(buf[off+3] & 0xff) << 32) |
             ((long)(buf[off+4] & 0xff) << 24) |
             ((long)(buf[off+5] & 0xff) << 16) |
             ((long)(buf[off+6] & 0xff) <<  8) |
             ((long)(buf[off+7] & 0xff));
    }
    复制代码
    • readDouble():读取双精度浮点double类型数据。
    public double readDouble() throws TException {
      return Double.longBitsToDouble(readI64());
    }
    复制代码
    • readString():读取字符串类型的数据,首先读取并校验4字节的字符串长度,然后检查NIO缓冲区中是否有对应长度的字节未消费。如果有,直接从缓冲区中读取;否则,从传输通道中读取数据。
    public String readString() throws TException {
      int size = readI32();
      checkStringReadLength(size);
    
      if (trans_.getBytesRemainingInBuffer() >= size) {
        try {
          String s = new String(trans_.getBuffer(), trans_.getBufferPosition(), size, "UTF-8");
          trans_.consumeBuffer(size);
          return s;
        } catch (UnsupportedEncodingException e) {
          throw new TException("JVM DOES NOT SUPPORT UTF-8");
        }
      }
    
      return readStringBody(size);
    }
    复制代码

    如果是从传输通道中读取数据,查看readStringBody()方法:

    public String readStringBody(int size) throws TException {
      try {
        byte[] buf = new byte[size];
        trans_.readAll(buf, 0, size);
        return new String(buf, "UTF-8");
      } catch (UnsupportedEncodingException uex) {
        throw new TException("JVM DOES NOT SUPPORT UTF-8");
      }
    }
    复制代码
    • readBinary():读取二进制数组类型数据,和字符串读取类似,返回一个ByteBuffer字节缓存对象。
    public ByteBuffer readBinary() throws TException {
      int size = readI32();
      checkStringReadLength(size);
    
      if (trans_.getBytesRemainingInBuffer() >= size) {
        ByteBuffer bb = ByteBuffer.wrap(trans_.getBuffer(), trans_.getBufferPosition(), size);
        trans_.consumeBuffer(size);
        return bb;
      }
    
      byte[] buf = new byte[size];
      trans_.readAll(buf, 0, size);
      return ByteBuffer.wrap(buf);
    }
    复制代码

    (d). 每个字段数据读取完成后,都需要再读取一个字段结束标记

    public void readFieldEnd() {}
    复制代码

    (e). 当所有字段读取完成后,需要通过readStructEnd()再读入一个结构完成标记

    public void readStructEnd() {}
    复制代码

    (f). 读取结束后,同样需要校验在Thrift IDL中定义为required的字段是否为空,是否合法。

    public void validate() throws org.apache.thrift.TException {
      // check for required fields
      if (key == null) {
        throw new org.apache.thrift.protocol.TProtocolException("Required field 'key' was not present! Struct: " + toString());
      }
      if (value == null) {
        throw new org.apache.thrift.protocol.TProtocolException("Required field 'value' was not present! Struct: " + toString());
      }
    }
    复制代码

    总结

    其实到这里,对于Thrift序列化机制反序列化机制具体实现高效性,相信各位已经有了比较深入的认识!




  • 相关阅读:
    mybaits 时间查询DATE_FORMAT
    spring AOP
    Sqlserver 分页
    @PathVariable注解使用
    easyui 无限级数tree[menulist1 = GetMenuList(sm2,menulist1);]
    查询重复数据group by menu_id having count(menu_id)>1
    SQL把a表字段数据存到b表字段 update,,insert
    毕向东讲解(摘)—12.线程通信,解决安全问题
    URL的加密解密方法
    web项目中的浏览器行为和服务器行为
  • 原文地址:https://www.cnblogs.com/sea520/p/12165790.html
Copyright © 2011-2022 走看看