zoukankan      html  css  js  c++  java
  • RPC-Thrift(三)

      TProtocol

        TProtocol定义了消息怎么进行序列化和反序列化的。

        TProtocol的类结构图如下:

        

        TBinaryProtocol:二进制编码格式;

        TCompactProtocol:高效率,密集的二进制编码格式,使用了zigzag压缩算法,使用了类似于ProtocolBuffer的Variable-Length Quantity (VLQ) 编码方式;

        TTupleProtocol:继承于TCompactProtocol,C++中不支持,Java中支持;

        TJSONProtocol:JSON格式;

        TProtocolFactory接口:只有getProtocol一个方法,用于通过一个TTransport对象创建一个TProtocol对象;TProtocolFactory接口的实现类都在TProtocol子类中。

        TProtocol

          TProtocol是所有消息协议的父类,定义了公用的抽象方法。

    public abstract class TProtocol {
      @SuppressWarnings("unused")
      private TProtocol() {}
      protected TTransport trans_;//Transport
      protected TProtocol(TTransport trans) {
        trans_ = trans;
      }
      public TTransport getTransport() {
        return trans_;
      }
      /**
       * Writing methods.写方法
       */
      //TMessage表示一个请求,发送请求时调用的方法
      public abstract void writeMessageBegin(TMessage message) throws TException;
      public abstract void writeMessageEnd() throws TException;
      //TStruct表示一个对象,写对象时调用的方法
      public abstract void writeStructBegin(TStruct struct) throws TException;
      public abstract void writeStructEnd() throws TException;
      //TField表示一个字段,写字段时调用的方法
      public abstract void writeFieldBegin(TField field) throws TException;
      public abstract void writeFieldEnd() throws TException;
      //写字段结束标志调用的方法
      public abstract void writeFieldStop() throws TException;
      //写TMap时调用的方法
      public abstract void writeMapBegin(TMap map) throws TException;
      public abstract void writeMapEnd() throws TException;
      //写TList时调用的方法
      public abstract void writeListBegin(TList list) throws TException;
      public abstract void writeListEnd() throws TException;
      //写TSet时调用的方法
      public abstract void writeSetBegin(TSet set) throws TException;
      public abstract void writeSetEnd() throws TException;
      //String和基本数据类型的写
      public abstract void writeBool(boolean b) throws TException;
      public abstract void writeByte(byte b) throws TException;
      public abstract void writeI16(short i16) throws TException;
      public abstract void writeI32(int i32) throws TException;
      public abstract void writeI64(long i64) throws TException;
      public abstract void writeDouble(double dub) throws TException;
      public abstract void writeString(String str) throws TException;
      //将buf中的数据写出
      public abstract void writeBinary(ByteBuffer buf) throws TException;
    
      /**
       * Reading methods.读方法
       */
      public abstract TMessage readMessageBegin() throws TException;
      public abstract void readMessageEnd() throws TException;
      public abstract TStruct readStructBegin() throws TException;
      public abstract void readStructEnd() throws TException;
      public abstract TField readFieldBegin() throws TException;
      public abstract void readFieldEnd() throws TException;
      public abstract TMap readMapBegin() throws TException;
      public abstract void readMapEnd() throws TException;
      public abstract TList readListBegin() throws TException;
      public abstract void readListEnd() throws TException;
      public abstract TSet readSetBegin() throws TException;
      public abstract void readSetEnd() throws TException;
      public abstract boolean readBool() throws TException;
      public abstract byte readByte() throws TException;
      public abstract short readI16() throws TException;
      public abstract int readI32() throws TException;
      public abstract long readI64() throws TException;
      public abstract double readDouble() throws TException;
      public abstract String readString() throws TException;
      public abstract ByteBuffer readBinary() throws TException;
    
      public void reset() {}
      public Class<? extends IScheme> getScheme() {
        return StandardScheme.class;
      }
    }

        TBinaryProtocol

          TBinaryProtocol是二进制编码。

    public class TBinaryProtocol extends TProtocol {
      private static final TStruct ANONYMOUS_STRUCT = new TStruct();
      //版本号
      protected static final int VERSION_MASK = 0xffff0000;//掩码
      protected static final int VERSION_1 = 0x80010000;//版本号
    
      protected boolean strictRead_ = false;//是否严格读
      protected boolean strictWrite_ = true;//是否严格写
    
      protected int readLength_;
      protected boolean checkReadLength_ = false;
    
      //工厂类
      public static class Factory implements TProtocolFactory {
        protected boolean strictRead_ = false;
        protected boolean strictWrite_ = true;
        protected int readLength_;
        public Factory() {
          this(false, true);
        }
        public Factory(boolean strictRead, boolean strictWrite) {
          this(strictRead, strictWrite, 0);
        }
        public Factory(boolean strictRead, boolean strictWrite, int readLength) {
          strictRead_ = strictRead;
          strictWrite_ = strictWrite;
          readLength_ = readLength;
        }
        //通过TTransport实例获取一个TBinaryProtocol实例
        public TProtocol getProtocol(TTransport trans) {
          TBinaryProtocol proto = new TBinaryProtocol(trans, strictRead_, strictWrite_);
          if (readLength_ != 0) {
            proto.setReadLength(readLength_);
          }
          return proto;
        }
      }
      public TBinaryProtocol(TTransport trans) {
        this(trans, false, true);
      }
      public TBinaryProtocol(TTransport trans, boolean strictRead, boolean strictWrite) {
        super(trans);
        strictRead_ = strictRead;
        strictWrite_ = strictWrite;
      }
      /**
       * 写方法
       */
      //发送一个请求,最终转换为对基本数据类型的写
      public void writeMessageBegin(TMessage message) throws TException {
        if (strictWrite_) {//
          int version = VERSION_1 | message.type;//版本号和消息类型与运算
          writeI32(version);//调用writeI32写版本号
          writeString(message.name);//调用writeString写方法名
          writeI32(message.seqid);//调用writeI32写序列号
        } else {
          writeString(message.name);
          writeByte(message.type);
          writeI32(message.seqid);
        }
      }
      public void writeMessageEnd() {}
      public void writeStructBegin(TStruct struct) {}
      public void writeStructEnd() {}
      //写字段方法,最终转换为对基本数据类型的写
      public void writeFieldBegin(TField field) throws TException {
        writeByte(field.type);
        writeI16(field.id);
      }
      public void writeFieldEnd() {}
      public void writeFieldStop() throws TException {
        writeByte(TType.STOP);
      }
      //写Map
      public void writeMapBegin(TMap map) throws TException {
        writeByte(map.keyType);//写Key类型
        writeByte(map.valueType);//写value类型
        writeI32(map.size);//写map大小
      }
      public void writeMapEnd() {}
      //写List
      public void writeListBegin(TList list) throws TException {
        writeByte(list.elemType);//写元素类型
        writeI32(list.size);//写list大小
      }
      public void writeListEnd() {}
      //写Set
      public void writeSetBegin(TSet set) throws TException {
        writeByte(set.elemType);//写元素类型
        writeI32(set.size);//写Set大小
      }
      public void writeSetEnd() {}
      //写bool转换为写writeByte
      public void writeBool(boolean b) throws TException {
        writeByte(b ? (byte)1 : (byte)0);
      }
      private byte [] bout = new byte[1];
      public void writeByte(byte b) throws TException {
        bout[0] = b;
        trans_.write(bout, 0, 1);
      }
      private byte[] i16out = new byte[2];
      public void writeI16(short i16) throws TException {
        i16out[0] = (byte)(0xff & (i16 >> 8));
        i16out[1] = (byte)(0xff & (i16));
        trans_.write(i16out, 0, 2);
      }
      private byte[] i32out = new byte[4];
      public void writeI32(int i32) throws TException {
        i32out[0] = (byte)(0xff & (i32 >> 24));
        i32out[1] = (byte)(0xff & (i32 >> 16));
        i32out[2] = (byte)(0xff & (i32 >> 8));
        i32out[3] = (byte)(0xff & (i32));
        trans_.write(i32out, 0, 4);
      }
      private byte[] i64out = new byte[8];
      public void writeI64(long i64) throws TException {
        i64out[0] = (byte)(0xff & (i64 >> 56));
        i64out[1] = (byte)(0xff & (i64 >> 48));
        i64out[2] = (byte)(0xff & (i64 >> 40));
        i64out[3] = (byte)(0xff & (i64 >> 32));
        i64out[4] = (byte)(0xff & (i64 >> 24));
        i64out[5] = (byte)(0xff & (i64 >> 16));
        i64out[6] = (byte)(0xff & (i64 >> 8));
        i64out[7] = (byte)(0xff & (i64));
        trans_.write(i64out, 0, 8);
      }
      //写Double转换为writeI64
      public void writeDouble(double dub) throws TException {
        writeI64(Double.doubleToLongBits(dub));
      }
      //写String
      public void writeString(String str) throws TException {
        try {
          byte[] dat = str.getBytes("UTF-8");//转换为字节数组
          writeI32(dat.length);//写数组长度
          trans_.write(dat, 0, dat.length);//写数据
        } catch (UnsupportedEncodingException uex) {
          throw new TException("JVM DOES NOT SUPPORT UTF-8");
        }
      }
      //写ByteBuffer
      public void writeBinary(ByteBuffer bin) throws TException {
        int length = bin.limit() - bin.position();
        writeI32(length);
        trans_.write(bin.array(), bin.position() + bin.arrayOffset(), length);
      }
    
      /**
       * Reading methods.读方法
       */
      //读一个请求,与写请求对应
      public TMessage readMessageBegin() throws TException {
        int size = readI32();//读取消息的头部(4字节),可能是版本号和消息类型的组合,也可能直接是消息方法名
        if (size < 0) {
          //如果小于0,就是二进制为第一位以1开头,说明是带有版本号的
          //校验版本号是否正确
          int version = size & VERSION_MASK;
          if (version != VERSION_1) {
            throw new TProtocolException(TProtocolException.BAD_VERSION, "Bad version in readMessageBegin");
          }
          //三个参数依次为方法名、消息类型、消息序列号
          return new TMessage(readString(), (byte)(size & 0x000000ff), readI32());
        } else {
          if (strictRead_) {
            throw new TProtocolException(TProtocolException.BAD_VERSION, "Missing version in readMessageBegin, old client?");
          }
          //readStringBody(size)为方法名,readByte()为消息类型,readI32()为消息序列号
          return new TMessage(readStringBody(size), readByte(), readI32());
        }
      }
      public void readMessageEnd() {}
      public TStruct readStructBegin() {
        return ANONYMOUS_STRUCT;
      }
      public void readStructEnd() {}
      public TField readFieldBegin() throws TException {
        byte type = readByte();
        short id = type == TType.STOP ? 0 : readI16();
        return new TField("", type, id);
      }
      public void readFieldEnd() {}
      public TMap readMapBegin() throws TException {
        return new TMap(readByte(), readByte(), readI32());
      }
      public void readMapEnd() {}
      public TList readListBegin() throws TException {
        return new TList(readByte(), readI32());
      }
      public void readListEnd() {}
      public TSet readSetBegin() throws TException {
        return new TSet(readByte(), readI32());
      }
      public void readSetEnd() {}
      public boolean readBool() throws TException {
        return (readByte() == 1);
      }
      private byte[] bin = new byte[1];
      public byte readByte() throws TException {
        if (trans_.getBytesRemainingInBuffer() >= 1) {
          byte b = trans_.getBuffer()[trans_.getBufferPosition()];
          trans_.consumeBuffer(1);
          return b;
        }
        readAll(bin, 0, 1);
        return bin[0];
      }
      private byte[] i16rd = new byte[2];
      public short readI16() throws TException {
        byte[] buf = i16rd;
        int off = 0;
        if (trans_.getBytesRemainingInBuffer() >= 2) {
          buf = trans_.getBuffer();
          off = trans_.getBufferPosition();
          trans_.consumeBuffer(2);
        } else {
          readAll(i16rd, 0, 2);
        }
        return
          (short)
          (((buf[off] & 0xff) << 8) |
           ((buf[off+1] & 0xff)));
      }
      private byte[] i32rd = new byte[4];
      public int readI32() throws TException {
        byte[] buf = i32rd;
        int off = 0;
        if (trans_.getBytesRemainingInBuffer() >= 4) {
          buf = trans_.getBuffer();
          off = trans_.getBufferPosition();
          trans_.consumeBuffer(4);
        } else {
          readAll(i32rd, 0, 4);
        }
        return
          ((buf[off] & 0xff) << 24) |
          ((buf[off+1] & 0xff) << 16) |
          ((buf[off+2] & 0xff) <<  8) |
          ((buf[off+3] & 0xff));
      }
      private byte[] i64rd = new byte[8];
      public long readI64() throws TException {
        byte[] buf = i64rd;
        int off = 0;
        if (trans_.getBytesRemainingInBuffer() >= 8) {
          buf = trans_.getBuffer();
          off = trans_.getBufferPosition();
          trans_.consumeBuffer(8);
        } else {
          readAll(i64rd, 0, 8);
        }
        return
          ((long)(buf[off]   & 0xff) << 56) |
          ((long)(buf[off+1] & 0xff) << 48) |
          ((long)(buf[off+2] & 0xff) << 40) |
          ((long)(buf[off+3] & 0xff) << 32) |
          ((long)(buf[off+4] & 0xff) << 24) |
          ((long)(buf[off+5] & 0xff) << 16) |
          ((long)(buf[off+6] & 0xff) <<  8) |
          ((long)(buf[off+7] & 0xff));
      }
      public double readDouble() throws TException {
        return Double.longBitsToDouble(readI64());
      }
      public String readString() throws TException {
        int size = readI32();
        if (trans_.getBytesRemainingInBuffer() >= size) {
          try {
            String s = new String(trans_.getBuffer(), trans_.getBufferPosition(), size, "UTF-8");
            trans_.consumeBuffer(size);
            return s;
          } catch (UnsupportedEncodingException e) {
            throw new TException("JVM DOES NOT SUPPORT UTF-8");
          }
        }
        return readStringBody(size);
      }
      public String readStringBody(int size) throws TException {
        try {
          checkReadLength(size);
          byte[] buf = new byte[size];
          trans_.readAll(buf, 0, size);
          return new String(buf, "UTF-8");
        } catch (UnsupportedEncodingException uex) {
          throw new TException("JVM DOES NOT SUPPORT UTF-8");
        }
      }
      public ByteBuffer readBinary() throws TException {
        int size = readI32();
        checkReadLength(size);
        if (trans_.getBytesRemainingInBuffer() >= size) {
          ByteBuffer bb = ByteBuffer.wrap(trans_.getBuffer(), trans_.getBufferPosition(), size);
          trans_.consumeBuffer(size);
          return bb;
        }
        byte[] buf = new byte[size];
        trans_.readAll(buf, 0, size);
        return ByteBuffer.wrap(buf);
      }
      private int readAll(byte[] buf, int off, int len) throws TException {
        checkReadLength(len);
        return trans_.readAll(buf, off, len);
      }
      public void setReadLength(int readLength) {
        readLength_ = readLength;
        checkReadLength_ = true;
      }
      protected void checkReadLength(int length) throws TException {
        if (length < 0) {
          throw new TException("Negative length: " + length);
        }
        if (checkReadLength_) {
          readLength_ -= length;
          if (readLength_ < 0) {
            throw new TException("Message length exceeded: " + length);
          }
        }
      }
    }

          其中TMessage表示一个请求,看一下TMessage的结构。

    public final class TMessage {
      public TMessage() {
        this("", TType.STOP, 0);
      }
      public TMessage(String n, byte t, int s) {
        name = n;
        type = t;
        seqid = s;
      }
      public final String name;//方法名
      public final byte type;//消息类型
      public final int seqid;//消息序列号
      @Override
      public String toString() {
        return "<TMessage name:'" + name + "' type: " + type + " seqid:" + seqid + ">";
      }
      @Override
      public boolean equals(Object other) {
        if (other instanceof TMessage) {
          return equals((TMessage) other);
        }
        return false;
      }
      public boolean equals(TMessage other) {
        return name.equals(other.name) && type == other.type && seqid == other.seqid;
      }
    }

          消息类型定义如下:

    public final class TMessageType {
      public static final byte CALL  = 1;//客户端请求
      public static final byte REPLY = 2;//服务端响应
      public static final byte EXCEPTION = 3;//服务端返回异常
      public static final byte ONEWAY = 4;//单向RPC,客户端请求不要求服务端响应
    }

           TField表示一个字段,TField结构如下:

    public class TField {
      public TField() {
        this("", TType.STOP, (short)0);
      }
      public TField(String n, byte t, short i) {
        name = n;
        type = t;
        id = i;
      }
      public final String name;//字段名
      public final byte   type;//字段类型
      public final short  id;//该字段在对象中的序号,与Thrift文件中的序号一致
      public String toString() {
        return "<TField name:'" + name + "' type:" + type + " field-id:" + id + ">";
      }
      public boolean equals(TField otherField) {
        return type == otherField.type && id == otherField.id;
      }
    }

          Thrift定义的数据类型

    public final class TType {
      public static final byte STOP   = 0;
      public static final byte VOID   = 1;
      public static final byte BOOL   = 2;
      public static final byte BYTE   = 3;
      public static final byte DOUBLE = 4;
      public static final byte I16    = 6;
      public static final byte I32    = 8;
      public static final byte I64    = 10;
      public static final byte STRING = 11;
      public static final byte STRUCT = 12;
      public static final byte MAP    = 13;
      public static final byte SET    = 14;
      public static final byte LIST   = 15;
      public static final byte ENUM   = 16;
    }

      TProcessor

        TProcessor是服务端Thrift框架转入用户逻辑的关键。先看一下类结构图。TProcessor对TServer中一次请求的InputProtocol和OutputTProtocol进行操作,也就是从InputProtocol中读出Client的请求数据,向OutputProtcol中写入用户逻辑的返回值。

        

        TProcessor接口:只有一个抽象方法process();

        TBaseProcessor类:实现了TProcessor接口,给出了process()方法的具体实现;

        Processor类:通过thrift文件自动生成的代码,继承了TBaseProcessor类,在Processor类内部为每一个方法生成了一个类,如sayHello类;

        TProcessorFactory类:一个工厂类,返回一个TProcessor单例;

        ProcessFunction类:处理函数抽象类。

        HelloService类:由thrift文件自动生成的类,包含内部类Processor类、参数类、返回结果类等等。

        

        TProcessor

    public interface TProcessor {
      public boolean process(TProtocol in, TProtocol out)
        throws TException;
    }

        TBaseProcessor

    public abstract class TBaseProcessor<I> implements TProcessor {
      private final I iface;//业务逻辑实现的接口,接口中的方法即thrift文件中定义的方法
      private final Map<String,ProcessFunction<I, ? extends TBase>> processMap;//处理方法Map,key为方法名,Value为方法对象
      protected TBaseProcessor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processFunctionMap) {
        this.iface = iface;
        this.processMap = processFunctionMap;
      }
      @Override
      public boolean process(TProtocol in, TProtocol out) throws TException {
        TMessage msg = in.readMessageBegin();//读出客户端发的请求
        ProcessFunction fn = processMap.get(msg.name);//根据方法名从processMap中找到处理方法
        if (fn == null) {
          //如果找不到该方法,返回异常
          TProtocolUtil.skip(in, TType.STRUCT);
          in.readMessageEnd();
          TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
          out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
          x.write(out);
          out.writeMessageEnd();
          out.getTransport().flush();
          return true;
        }
        fn.process(msg.seqid, in, out, iface);//处理请求
        return true;
      }
    }

        ProcessFunction

           Thrift将方法抽象为ProcessFunction类,每一个方法都会生成一个ProcessFunction类的子类。

    public abstract class ProcessFunction<I, T extends TBase> {
      private final String methodName;//方法名
      public ProcessFunction(String methodName) {
        this.methodName = methodName;
      }
      //处理请求
      public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException {
        T args = getEmptyArgsInstance();//获取一个参数实例(TBase子类的参数实例),由子类实现
        try {
          args.read(iprot);//从iprot中读取参数,具体实现由thrift文件自动生成(见sayHello_args类)
        } catch (TProtocolException e) {
          //读取参数异常,返回异常
          iprot.readMessageEnd();
          TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
          oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.EXCEPTION, seqid));
          x.write(oprot);
          oprot.writeMessageEnd();
          oprot.getTransport().flush();
          return;
        }
        iprot.readMessageEnd();
        TBase result = getResult(iface, args);//获取处理结果,此时调用业务逻辑,具体实现由thrift文件自动生成(见sayHello类)
        oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.REPLY, seqid));
        result.write(oprot);//将result写到oprot,具体实现由thrift文件自动生成(见sayHello_result类)
        oprot.writeMessageEnd();
        oprot.getTransport().flush();
      }
      //获取处理结果,调用业务逻辑,由子类实现
      protected abstract TBase getResult(I iface, T args) throws TException;
      //获取一个参数实例,由子类实现
      protected abstract T getEmptyArgsInstance();
      public String getMethodName() {
        return methodName;
      }
    }

         Processor

      public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
        private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class.getName());
        public Processor(I iface) {
          super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
        }
        protected Processor(I iface, Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
          super(iface, getProcessMap(processMap));
        }
        //初始化processMap,在Processor初始化时会调用该方法
        private static <I extends Iface> Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> getProcessMap(Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
          processMap.put("sayHello", new sayHello());
          return processMap;
        }
        //每个方法生成一个类
        private static class sayHello<I extends Iface> extends org.apache.thrift.ProcessFunction<I, sayHello_args> {
          public sayHello() {
            super("sayHello");
          }
          //获取空参数实例
          protected sayHello_args getEmptyArgsInstance() {
            return new sayHello_args();
          }
          //获取返回结果
          protected sayHello_result getResult(I iface, sayHello_args args) throws org.apache.thrift.TException {
            sayHello_result result = new sayHello_result();
            result.success = iface.sayHello(args.paramJson);//调用业务逻辑接口的sayHello方法,
            return result;
          }
        }
      }

       

      最后总结一下TProcessor的处理流程:

        1)TServer接收到请求后,调用TProcessor的process(TProtocol in, TProtocol out)方法进行处理;

        2)TProcessor通过in.readMessageBegin()获取客户端请求,并根据请求方法名找到对应的ProcessFunction实例;

        3)调用ProcessFunction的process方法;

          首先从inTProtocol中读取参数,

          然后通过调用getResult(iface, args)方法调用业务逻辑,获取到返回结果,

          最后将返回结果写入到outTProtocol。

        

        

    参考资料

      Apache Thrift设计概要

      Thrift源码分析(二)-- 协议和编解码

      Thrift RPC详解

  • 相关阅读:
    SQL Server XML数据解析
    c# XML和实体类之间相互转换(序列化和反序列化)
    C#解析XML详解(XPath以及带命名空间NameSpace)
    Jquery实现按钮点击遮罩加载,处理完后恢复
    jquery控制div随滚动条滚动效果
    asp.net中利用Jquery+Ajax+Json实现无刷新分页(二)
    easyUI tree点击文字展开节点
    201805牛客模拟考
    策略模式
    非线程安全演变成线程安全---原子性与加锁机制
  • 原文地址:https://www.cnblogs.com/zaizhoumo/p/8252870.html
Copyright © 2011-2022 走看看