zoukankan      html  css  js  c++  java
  • Thrift笔记(三)--Thrift框架通信源码分析

    Thrift 客户端调用RPC的Demo

    public static void main(String[] args) throws Exception {
            TTransport transport = new TSocket("127.0.0.1", 7912);
            TProtocol protocol = new TBinaryProtocol(transport);
            // 创建client
            com.gxf.thrift.HelloWordService.Client client = new com.gxf.thrift.HelloWordService.Client(protocol);
            transport.open();  // 建立连接
            // 第一种请求类型
            com.gxf.thrift.Request request = new com.gxf.thrift.Request()
                    .setType(com.gxf.thrift.RequestType.SAY_HELLO).setName("guanxiangfei").setAge(24);
            System.out.println(client.doAction(request));
            // 第二种请求类型
            request.setType(com.gxf.thrift.RequestType.QUERY_TIME).setName("guanxiangfei");
            System.out.println(client.doAction(request));
    
            transport.close();  // 请求结束,断开连接
        }

    这里可以很清楚看到分层设计,以及层层封装。transport, proctol这里网上有人说是用了装饰模式

    跟进transport.open()代码

      /**
       * Opens the transport for reading/writing.
       *
       * @throws TTransportException if the transport could not be opened
       */
      public abstract void open()
        throws TTransportException;

    这里是个抽象类TTransport的抽象方法,看下实现类TSocekt实现方法

    /**
       * Connects the socket, creating a new socket object if necessary.
       */
      public void open() throws TTransportException {
        if (isOpen()) {
          throw new TTransportException(TTransportException.ALREADY_OPEN, "Socket already connected.");
        }
    
        if (host_ == null || host_.length() == 0) {
          throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null host.");
        }
        if (port_ <= 0 || port_ > 65535) {
          throw new TTransportException(TTransportException.NOT_OPEN, "Invalid port " + port_);
        }
    
        if (socket_ == null) {
          initSocket();
        }
    
        try {
          socket_.connect(new InetSocketAddress(host_, port_), connectTimeout_);
          inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);
          outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
        } catch (IOException iox) {
          close();
          throw new TTransportException(TTransportException.NOT_OPEN, iox);
        }
      }

    这里可以看出,用socket建立了连接,为发送序列化数据做准备

    继续跟进代码

    client.doAction(request)

    继续

    public String doAction(Request request) throws RequestException, org.apache.thrift.TException
        {
          send_doAction(request);
          return recv_doAction();
        }

    这里是thrift编译器或者说代码生成器,生成的框架代码

    第一个应该是发送序列化数据,第二个方法应该是接收服务端数据。这里分析第一个方法,发送序列化数据。继续跟进代码

    public void send_doAction(Request request) throws org.apache.thrift.TException
        {
          doAction_args args = new doAction_args();
          args.setRequest(request);
          sendBase("doAction", args);
        }

    这里request实体被设置到了args里面,继续跟进sendBase

    protected void sendBase(String methodName, TBase<?,?> args) throws TException {
        sendBase(methodName, args, TMessageType.CALL);
      }

    继续跟进

    private void sendBase(String methodName, TBase<?,?> args, byte type) throws TException {
        oprot_.writeMessageBegin(new TMessage(methodName, type, ++seqid_));
        args.write(oprot_);
        oprot_.writeMessageEnd();
        oprot_.getTransport().flush();
      }

    这是TServerClient.java不是生成代码,是thrift框架代码。这里可以看到RPC方法,args,以及请求类型应该需要被发送给服务端

    这里可以看出,先会写一个消息开始的TMessage开始符号, 然后写args,最后写TMessage结束标识,刷新缓存。这应该是thrift发送消息的协议,先发送什么,后发送什么

    继续跟进writeMessageBegin()方法

    这里跟进的是TBinaryProtocol的实现

    public void writeMessageBegin(TMessage message) throws TException {
        if (strictWrite_) {
          int version = VERSION_1 | message.type;
          writeI32(version);
          writeString(message.name);
          writeI32(message.seqid);
        } else {
          writeString(message.name);
          writeByte(message.type);
          writeI32(message.seqid);
        }
      }

    这里可以看出写入消息Begin的内容,version, name, seqid等

    继续跟进writeI32(version)看下 一个整型传输的协议

    public void writeI32(int i32) throws TException {
        inoutTemp[0] = (byte)(0xff & (i32 >> 24));
        inoutTemp[1] = (byte)(0xff & (i32 >> 16));
        inoutTemp[2] = (byte)(0xff & (i32 >> 8));
        inoutTemp[3] = (byte)(0xff & (i32));
        trans_.write(inoutTemp, 0, 4);
      }

    这里可以看出,用了4个byte来存放一个int。然后调用tranport发送。这里Demo里用的应该是TSocket发送。int用4个byte发送应该也是正常的,一个int占4个字节

    接着跟进tran_.write()方法。TSocket的父类是TIOStreamTransport。进入TIOStreamTransport

    /**
       * Writes to the underlying output stream if not null.
       */
      public void write(byte[] buf, int off, int len) throws TTransportException {
        if (outputStream_ == null) {
          throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to null outputStream");
        }
        try {
          outputStream_.write(buf, off, len);
        } catch (IOException iox) {
          throw new TTransportException(TTransportException.UNKNOWN, iox);
        }
      }

    这里的outputStream在new TSocket()的时候初始化了,这里也可以看出Demo用的是socket这种阻塞io发送的。当然thrift也支持非阻塞的ChannelSocekt,文件等。

    回到写args这一块我还要在看看,好像要跟到生成的代码中。

    总结:

    1. 底层发送序列化文件用的是socket, channelsocket等

    2. thrift有一套自己的协议,如先发送messagebegin标识, int占几个字节等

  • 相关阅读:
    Golang初学者的资源整理
    Mesos和kubernetes
    Go by Example
    dbus 和 policykit 实例篇(python) ()转
    CentOS7卸载KDE桌面(转)
    Please read “Security” section of the manual to find out how to run mysqld as root!错误解决(转)
    yum使用总结(转)
    Linux rpm 命令参数使用详解[介绍和应用](转)
    nginx+lua+redis高并发应用建设
    python网络爬虫进入(一)——简单的博客爬行动物
  • 原文地址:https://www.cnblogs.com/luckygxf/p/9216772.html
Copyright © 2011-2022 走看看