zoukankan      html  css  js  c++  java
  • Thrift笔记(四)--Thrift client源码分析

    thrift文件

    namespace java com.gxf.demo
    namespace py tutorial
    
    typedef i32 int // We can use typedef to get pretty names for the types we are using
    // Arithmetic service. `int` is the i32 typedef declared above, so both
    // methods take two i32 arguments (field ids 1 and 2) and return an i32.
    service MultiplicationService
    {
            // Returns a value computed from n1 and n2 (the product, per the method name).
            int multiply(1:int n1, 2:int n2),
            // Returns a value computed from n1 and n2 (the sum, per the method name).
            int add(1:int n1, 2:int n2),
    }
    
    // Second, independent service with a single method taking two i32
    // arguments (field ids 1 and 2) and returning an i32.
    service SubService{
            int sub(1:int n1, 2:int n2),
    }
    
    // Struct with two fields; it is not referenced by either service in this file.
    struct User{
        // Field 1: 32-bit integer id with a declared default value of 0.
        1: i32 id = 0,
        // Field 2: string name, declared as required.
        2: required string name,
    }
    MultiplicationHandler.java
    import org.apache.thrift.TException;
    
    /**
     * Handler implementing both the MultiplicationService and SubService
     * interfaces. Each operation prints the call it received to standard
     * output and then returns the computed value.
     */
    public class MultiplicationHandler implements MultiplicationService.Iface, SubService.Iface {

        /** Prints one line of the form {@code op(a,b)} to standard output. */
        private static void trace(String op, int a, int b) {
            System.out.println(op + "(" + a + "," + b + ")");
        }

        /** Returns the product of {@code n1} and {@code n2}. */
        @Override
        public int multiply(int n1, int n2) throws TException {
            trace("Multiply", n1, n2);
            final int product = n1 * n2;
            return product;
        }

        /** Returns the sum of {@code n1} and {@code n2}. */
        @Override
        public int add(int n1, int n2) throws TException {
            trace("add", n1, n2);
            final int sum = n1 + n2;
            return sum;
        }

        /** Returns {@code n1} minus {@code n2}. */
        @Override
        public int sub(int n1, int n2) throws TException {
            trace("sub", n1, n2);
            final int difference = n1 - n2;
            return difference;
        }
    }
    MultiplicationServer.java
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.server.TServer.Args;
    import org.apache.thrift.server.TSimpleServer;
    import org.apache.thrift.transport.TServerSocket;
    import org.apache.thrift.transport.TServerTransport;
    
    /**
     * Entry point that serves MultiplicationService requests with a
     * TSimpleServer listening on port 9090.
     */
    public class MultiplicationServer {

        public static MultiplicationHandler handler;

        public static MultiplicationService.Processor processor;

        /**
         * Creates the handler and its processor, then runs the server loop on a
         * newly started thread. Any exception raised here is printed, not rethrown.
         */
        public static void main(String [] args) {
            try {
                handler = new MultiplicationHandler();
                processor = new MultiplicationService.Processor(handler);

                Thread serverThread = new Thread(() -> simple(processor));
                serverThread.start();
            } catch (Exception startupFailure) {
                startupFailure.printStackTrace();
            }
        }

        /**
         * Opens a server socket on port 9090, wraps it in a TSimpleServer that
         * dispatches to the given processor, and calls serve(). Any exception
         * is printed, not rethrown.
         *
         * @param processor processor that handles incoming calls
         */
        public static void simple(MultiplicationService.Processor processor) {
            try {
                TServerTransport listenTransport = new TServerSocket(9090);
                Args serverArgs = new Args(listenTransport);
                serverArgs.processor(processor);
                TServer simpleServer = new TSimpleServer(serverArgs);

                System.out.println("Starting the simple server...");
                simpleServer.serve();
            } catch (Exception serveFailure) {
                serveFailure.printStackTrace();
            }
        }
    }
    MultiplicationClient.java
    import org.apache.thrift.TException;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    
    /**
     * Demo client: connects to localhost:9090 over a TSocket using the binary
     * protocol, invokes multiply and add on MultiplicationService, and prints
     * the results.
     */
    public class MultiplicationClient {

        /**
         * Opens the transport, builds the service clients, runs the demo calls,
         * and always closes the transport afterwards. A TException is printed,
         * not rethrown.
         */
        public static void main(String [] args) {
            TTransport transport = null;
            try {
                transport = new TSocket("localhost", 9090);
                transport.open();
                TProtocol protocol = new TBinaryProtocol(transport);
                MultiplicationService.Client client = new MultiplicationService.Client(protocol);
                SubService.Client subClient = new SubService.Client(protocol);
                perform(client, subClient);
            } catch (TException x) {
                x.printStackTrace();
            } finally {
                // Close in finally so the socket is released even when open()
                // or one of the remote calls throws.
                if (transport != null) {
                    transport.close();
                }
            }
        }

        /**
         * Calls multiply(3, 5) and add(4, 6) on the multiplication client and
         * prints each result. The {@code subClient} argument is accepted but
         * not used by this method.
         *
         * @param client    client for MultiplicationService
         * @param subClient client for SubService (currently unused)
         * @throws TException if a remote call fails
         */
        private static void perform(MultiplicationService.Client client, SubService.Client subClient) throws TException
        {
            int product = client.multiply(3,5);
            System.out.println("3 * 5 = " + product);
            int sum = client.add(4, 6);
            System.out.println("4 + 6 = " + sum);
        }
    }

    跟进client代码

    thrift主要把方法名,参数发送给服务方

    跟进multiply(int n1, int n2)方法

    /**
     * Client stub for the multiply call: sends the request with
     * send_multiply, then returns whatever recv_multiply reads back.
     */
    public int multiply(int n1, int n2) throws org.apache.thrift.TException
        {
          send_multiply(n1, n2);
          return recv_multiply();
        }
    跟进send_multiply(n1, n2)
    /**
     * Packs n1 and n2 into a multiply_args struct and passes it to sendBase
     * under the method name "multiply".
     */
    public void send_multiply(int n1, int n2) throws org.apache.thrift.TException
        {
          multiply_args args = new multiply_args();
          args.setN1(n1);
          args.setN2(n2);
          sendBase("multiply", args);
        }

    跟进sendBase

    /** Sends the given arguments as a message of type TMessageType.CALL. */
    protected void sendBase(String methodName, TBase<?,?> args) throws TException {
        sendBase(methodName, args, TMessageType.CALL);
      }
    /** Sends the given arguments as a message of type TMessageType.ONEWAY. */
    protected void sendBaseOneway(String methodName, TBase<?,?> args) throws TException {
        sendBase(methodName, args, TMessageType.ONEWAY);
      }
    /**
     * Writes one complete outgoing message to the output protocol: the message
     * header (method name, message type, pre-incremented sequence id), the
     * argument struct, and the message end; then flushes the transport.
     */
    private void sendBase(String methodName, TBase<?,?> args, byte type) throws TException {
        // seqid_ is incremented before use, so each message gets a new id.
        oprot_.writeMessageBegin(new TMessage(methodName, type, ++seqid_));
        args.write(oprot_);
        oprot_.writeMessageEnd();
        oprot_.getTransport().flush();
      }

    methodName被封装到TMessage中,跟进writeMessageBegin

    /**
     * Writes the message header. The layout depends on strictWrite_:
     * when set, an i32 holding VERSION_1 OR-ed with the message type comes
     * first, followed by the name and the sequence id; otherwise the name is
     * written first, then the type as a single byte, then the sequence id.
     */
    public void writeMessageBegin(TMessage message) throws TException {
        if (strictWrite_) {
          // Version constant and message type share one 32-bit word.
          int version = VERSION_1 | message.type;
          writeI32(version);
          writeString(message.name);
          writeI32(message.seqid);
        } else {
          writeString(message.name);
          writeByte(message.type);
          writeI32(message.seqid);
        }
      }
    /**
     * Writes a 32-bit integer as four bytes, most significant byte first
     * (big-endian), by filling the first four slots of inoutTemp and passing
     * them to the transport.
     */
    public void writeI32(int i32) throws TException {
        inoutTemp[0] = (byte)(0xff & (i32 >> 24));
        inoutTemp[1] = (byte)(0xff & (i32 >> 16));
        inoutTemp[2] = (byte)(0xff & (i32 >> 8));
        inoutTemp[3] = (byte)(0xff & (i32));
        trans_.write(inoutTemp, 0, 4);
      }
    /**
     * Writes len bytes of buf, starting at off, to the underlying output stream.
     *
     * @throws TTransportException with type NOT_OPEN when there is no output
     *         stream, or with type UNKNOWN wrapping any IOException raised by
     *         the stream
     */
    public void write(byte[] buf, int off, int len) throws TTransportException {
        if (outputStream_ == null) {
          throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to null outputStream");
        }
        try {
          outputStream_.write(buf, off, len);
        } catch (IOException iox) {
          // Preserve the I/O failure as the cause of the transport exception.
          throw new TTransportException(TTransportException.UNKNOWN, iox);
        }
      }

    这里通过socket outputstream发送

    /**
     * Writes the message header (method name, message type, pre-incremented
     * sequence id), then the argument struct and the message end, and finally
     * flushes the transport.
     */
    private void sendBase(String methodName, TBase<?,?> args, byte type) throws TException {
        oprot_.writeMessageBegin(new TMessage(methodName, type, ++seqid_));
        // The argument struct serializes itself onto the same output protocol.
        args.write(oprot_);
        oprot_.writeMessageEnd();
        oprot_.getTransport().flush();
      }

    参数也是通过类似的方式写出并发送给服务端。发送完成后,接收服务端返回的结果:

    /**
     * Reads the reply to a multiply call into a multiply_result and returns
     * its success value.
     *
     * @throws org.apache.thrift.TApplicationException with type MISSING_RESULT
     *         when the reply does not have the success field set
     */
    public int recv_multiply() throws org.apache.thrift.TException
        {
          multiply_result result = new multiply_result();
          receiveBase(result, "multiply");
          if (result.isSetSuccess()) {
            return result.success;
          }
          throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "multiply failed: unknown result");
        }
    /**
     * Reads one reply message from the input protocol into result.
     *
     * @param result     struct that is populated from the reply body
     * @param methodName name used only in the out-of-sequence error message
     * @throws TException the TApplicationException carried by an EXCEPTION
     *         reply, or a BAD_SEQUENCE_ID TApplicationException when the
     *         reply's sequence id differs from seqid_
     */
    protected void receiveBase(TBase<?,?> result, String methodName) throws TException {
        TMessage msg = iprot_.readMessageBegin();
        if (msg.type == TMessageType.EXCEPTION) {
          // The body of an EXCEPTION reply is a TApplicationException; read it and rethrow.
          TApplicationException x = new TApplicationException();
          x.read(iprot_);
          iprot_.readMessageEnd();
          throw x;
        }
        // The reply must carry the sequence id of the most recently sent request.
        if (msg.seqid != seqid_) {
          throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID,
              String.format("%s failed: out of sequence response: expected %d but got %d", methodName, seqid_, msg.seqid));
        }
        result.read(iprot_);
        iprot_.readMessageEnd();
      }
    private static class multiply_resultStandardScheme extends org.apache.thrift.scheme.StandardScheme<multiply_result> {
    
          /**
           * Populates struct from the protocol by reading fields until a STOP
           * field is seen. Field id 0 with type I32 is stored as the success
           * value; any other field, or id 0 with a different type, is skipped.
           * Calls struct.validate() once the struct has been fully read.
           */
          public void read(org.apache.thrift.protocol.TProtocol iprot, multiply_result struct) throws org.apache.thrift.TException {
            org.apache.thrift.protocol.TField schemeField;
            iprot.readStructBegin();
            while (true)
            {
              schemeField = iprot.readFieldBegin();
              // A STOP field marks the end of the struct.
              if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
                break;
              }
              switch (schemeField.id) {
                case 0: // SUCCESS
                  if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
                    struct.success = iprot.readI32();
                    struct.setSuccessIsSet(true);
                  } else { 
                    // Unexpected wire type for this id: skip the value.
                    org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
                  }
                  break;
                default:
                  // Unknown field id: skip the value.
                  org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
              }
              iprot.readFieldEnd();
            }
            iprot.readStructEnd();
    
            // check for required fields of primitive type, which can't be checked in the validate method
            struct.validate();
          }

    由于方法返回类型是int,这里循环读取返回结构中的字段:id为0且类型为I32的字段就是返回值success,读到后设置"已赋值"标记;其他字段直接跳过;读到STOP字段时结束解析。

    后面再看下thrift服务端解析客户端请求的代码

  • 相关阅读:
    Windows 上运行 Zookeeper
    【Kubernetes】K8S的默认调度策略--如何保证POD调度按照提交顺序进行?
    rabbitmq crashdump分析
    java.sql.SQLRecoverableException: IO Error: SO Exception was generated
    常见的数据分析模型
    事实表设计
    PHP系列 | PHP curl报错:417
    工具系列 | Ubuntu18.04安装Openssl-1.1.1
    PHP系列 | PHP中使用gRPC extension 扩展安装
    云原生之容器安全实践
  • 原文地址:https://www.cnblogs.com/luckygxf/p/9379573.html
Copyright © 2011-2022 走看看