zoukankan      html  css  js  c++  java
  • Thrift笔记(三)--Thrift框架通信源码分析

    Thrift 客户端调用RPC的Demo

    public static void main(String[] args) throws Exception {
            TTransport transport = new TSocket("127.0.0.1", 7912);
            TProtocol protocol = new TBinaryProtocol(transport);
            // 创建client
            com.gxf.thrift.HelloWordService.Client client = new com.gxf.thrift.HelloWordService.Client(protocol);
            transport.open();  // 建立连接
            // 第一种请求类型
            com.gxf.thrift.Request request = new com.gxf.thrift.Request()
                    .setType(com.gxf.thrift.RequestType.SAY_HELLO).setName("guanxiangfei").setAge(24);
            System.out.println(client.doAction(request));
            // 第二种请求类型
            request.setType(com.gxf.thrift.RequestType.QUERY_TIME).setName("guanxiangfei");
            System.out.println(client.doAction(request));
    
            transport.close();  // 请求结束,断开连接
        }

    这里可以很清楚看到分层设计,以及层层封装。transport, proctol这里网上有人说是用了装饰模式

    跟进transport.open()代码

      /**
       * Opens the transport for reading/writing.
       *
       * @throws TTransportException if the transport could not be opened
       */
      public abstract void open()
        throws TTransportException;

    这里是个抽象类TTransport的抽象方法,看下实现类TSocekt实现方法

    /**
       * Connects the socket, creating a new socket object if necessary.
       */
      public void open() throws TTransportException {
        if (isOpen()) {
          throw new TTransportException(TTransportException.ALREADY_OPEN, "Socket already connected.");
        }
    
        if (host_ == null || host_.length() == 0) {
          throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null host.");
        }
        if (port_ <= 0 || port_ > 65535) {
          throw new TTransportException(TTransportException.NOT_OPEN, "Invalid port " + port_);
        }
    
        if (socket_ == null) {
          initSocket();
        }
    
        try {
          socket_.connect(new InetSocketAddress(host_, port_), connectTimeout_);
          inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);
          outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
        } catch (IOException iox) {
          close();
          throw new TTransportException(TTransportException.NOT_OPEN, iox);
        }
      }

    这里可以看出,用socket建立了连接,为发送序列化数据做准备

    继续跟进代码

    client.doAction(request)

    继续

    public String doAction(Request request) throws RequestException, org.apache.thrift.TException
        {
          send_doAction(request);
          return recv_doAction();
        }

    这里是thrift编译器或者说代码生成器,生成的框架代码

    第一个应该是发送序列化数据,第二个方法应该是接收服务端数据。这里分析第一个方法,发送序列化数据。继续跟进代码

    public void send_doAction(Request request) throws org.apache.thrift.TException
        {
          doAction_args args = new doAction_args();
          args.setRequest(request);
          sendBase("doAction", args);
        }

    这里request实体被设置到了args里面,继续跟进sendBase

    protected void sendBase(String methodName, TBase<?,?> args) throws TException {
        sendBase(methodName, args, TMessageType.CALL);
      }

    继续跟进

    private void sendBase(String methodName, TBase<?,?> args, byte type) throws TException {
        oprot_.writeMessageBegin(new TMessage(methodName, type, ++seqid_));
        args.write(oprot_);
        oprot_.writeMessageEnd();
        oprot_.getTransport().flush();
      }

    这是TServerClient.java不是生成代码,是thrift框架代码。这里可以看到RPC方法,args,以及请求类型应该需要被发送给服务端

    这里可以看出,先会写一个消息开始的TMessage开始符号, 然后写args,最后写TMessage结束标识,刷新缓存。这应该是thrift发送消息的协议,先发送什么,后发送什么

    继续跟进writeMessageBegin()方法

    这里跟进的是TBinaryProtocol的实现

    public void writeMessageBegin(TMessage message) throws TException {
        if (strictWrite_) {
          int version = VERSION_1 | message.type;
          writeI32(version);
          writeString(message.name);
          writeI32(message.seqid);
        } else {
          writeString(message.name);
          writeByte(message.type);
          writeI32(message.seqid);
        }
      }

    这里可以看出写入消息Begin的内容,version, name, seqid等

    继续跟进writeI32(version)看下 一个整型传输的协议

    public void writeI32(int i32) throws TException {
        inoutTemp[0] = (byte)(0xff & (i32 >> 24));
        inoutTemp[1] = (byte)(0xff & (i32 >> 16));
        inoutTemp[2] = (byte)(0xff & (i32 >> 8));
        inoutTemp[3] = (byte)(0xff & (i32));
        trans_.write(inoutTemp, 0, 4);
      }

    这里可以看出,用了4个byte来存放一个int。然后调用tranport发送。这里Demo里用的应该是TSocket发送。int用4个byte发送应该也是正常的,一个int占4个字节

    接着跟进tran_.write()方法。TSocket的父类是TIOStreamTransport。进入TIOStreamTransport

    /**
       * Writes to the underlying output stream if not null.
       */
      public void write(byte[] buf, int off, int len) throws TTransportException {
        if (outputStream_ == null) {
          throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to null outputStream");
        }
        try {
          outputStream_.write(buf, off, len);
        } catch (IOException iox) {
          throw new TTransportException(TTransportException.UNKNOWN, iox);
        }
      }

    这里的outputStream在new TSocket()的时候初始化了,这里也可以看出Demo用的是socket这种阻塞io发送的。当然thrift也支持非阻塞的ChannelSocekt,文件等。

    回到写args这一块我还要在看看,好像要跟到生成的代码中。

    总结:

    1. 底层发送序列化文件用的是socket, channelsocket等

    2. thrift有一套自己的协议,如先发送messagebegin标识, int占几个字节等

  • 相关阅读:
    lr 增强窗格中,如何生成调试信息?
    lr 自带的例子,如何进行关联,通过代码的函数进行实现
    lr11 录制脚本时候,无法自动启动ie,查了网上很多方法都未解决?
    loadrunner11 录制脚步不成功,在录制概要出现“No Events were detected”,浮动窗口总是显示“0 Events”,解决办法
    loadrunner11 安装及破解教程来自百度文库
    安装loadrunner11 ,出现如下错误如何解决?
    回收站数据删除了,如何进行恢复?
    网管工作方面——————打印机删除了然后开机重启他依然存在,如何解决
    Windows 不能在 本地计算机 启动 SQL Server 服务 错误代码126
    Sorry, the page you are looking for is currently unavailable. Please try again later. Nginx
  • 原文地址:https://www.cnblogs.com/luckygxf/p/9216772.html
Copyright © 2011-2022 走看看