zoukankan      html  css  js  c++  java
  • libthrift0.9.0解析(二)之TSimpleServer

    TSimpleServer简单实现Tserver,代码如下。

    /**
     * Simple singlethreaded server for testing.
     *
     */
    public class TSimpleServer extends TServer {
    
      private static final Logger LOGGER = LoggerFactory.getLogger(TSimpleServer.class.getName());
    
      private boolean stopped_ = false;
    
      public TSimpleServer(AbstractServerArgs args) {
        super(args);
      }
    
      public void serve() {
        stopped_ = false;
        try {
          serverTransport_.listen(); // 开启监听
        } catch (TTransportException ttx) {
          LOGGER.error("Error occurred during listening.", ttx);
          return;
        }
    
        // Run the preServe event
        if (eventHandler_ != null) {
          eventHandler_.preServe(); // 前处理切入
        }
    
        setServing(true);
    
        while (!stopped_) {
          TTransport client = null;
          TProcessor processor = null;
          TTransport inputTransport = null;
          TTransport outputTransport = null;
          TProtocol inputProtocol = null;
          TProtocol outputProtocol = null;
          ServerContext connectionContext = null;
          try {
            client = serverTransport_.accept(); // 获得一个client transport
            if (client != null) { // 以下代码根据获取的client transport层层封装
              processor = processorFactory_.getProcessor(client); // 根据client获取processor
              inputTransport = inputTransportFactory_.getTransport(client); // 根据client获取input transport
              outputTransport = outputTransportFactory_.getTransport(client); // 根据client获取output transport
              inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); // 根据input transport获取input protocol
              outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); // 根据output transport获取output protocol
              if (eventHandler_ != null) {// 处理前切入eventHandler根据input/output protocol生成connectionContext
                connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
              }
              while (true) {// 一直循环处理同一个用户的请求,processor.process永远返回true,只有抛出异常时(即用户端主动关闭连接)才会继续接收下一个用户的请求。
                if (eventHandler_ != null) {// 切入eventHandler处理前面生成的connectionContext
                  eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
                }
                if(!processor.process(inputProtocol, outputProtocol)) {// 调用processor对input/output protocol进行处理
                  break;
                }
              }
            }
          } catch (TTransportException ttx) {
            // Client died, just move on
          } catch (TException tx) {
            if (!stopped_) {
              LOGGER.error("Thrift error occurred during processing of message.", tx);
            }
          } catch (Exception x) {
            if (!stopped_) {
              LOGGER.error("Error occurred during processing of message.", x);
            }
          }
    
          if (eventHandler_ != null) {
            eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);
          }
    
          if (inputTransport != null) {
            inputTransport.close();
          }
    
          if (outputTransport != null) {
            outputTransport.close();
          }
    
        }
        setServing(false);
      }
    
      public void stop() {
        stopped_ = true;
        serverTransport_.interrupt();
      }
    }
    

    首先,开启监听,然后在处理之前面向切面切入前处理流程 eventHandler_.preServe();最后进入接收处理流程。

    处理时关键看第54行processor.process()函数。

    TProcessor为一单接口,仅含process方法。其子类thrift不再提供,而是根据用户自定义的thrift IDL文件自动生成。然而thrift提供了一个抽象类TBaseProcessor<I>封装了一些常用的实现,该类实现了TProcessor接口,框架自动生成的Processor都继承了该基类。

    TBaseProcessor<I>代码如下:

    public abstract class TBaseProcessor<I> implements TProcessor {
      private final I iface; // 真正的处理函数实现
      private final Map<String,ProcessFunction<I, ? extends TBase>> processMap;
    
      protected TBaseProcessor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processFunctionMap) {
        this.iface = iface;
        this.processMap = processFunctionMap;
      }
    
      public Map<String,ProcessFunction<I, ? extends TBase>> getProcessMapView() {
        return Collections.unmodifiableMap(processMap);
      }
    
      @Override
      public boolean process(TProtocol in, TProtocol out) throws TException {
        TMessage msg = in.readMessageBegin(); // 获取请求信息,主要包含函数名信息。
        ProcessFunction fn = processMap.get(msg.name); // 根据函数名信息获取相应的处理函数。
        if (fn == null) { // 若没有相关的处理函数,写入异常并返回。
          TProtocolUtil.skip(in, TType.STRUCT);
          in.readMessageEnd();
          TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
          out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
          x.write(out);
          out.writeMessageEnd();
          out.getTransport().flush();
          return true;
        }
        fn.process(msg.seqid, in, out, iface); // 处理请求
        return true;
      }
    }
    

    ProcessFunction为一抽象类,其子类也是根据IDL自动生成。与IDL中的函数一一对应,为代理处理器。

    TBase为IDL中定义的types的基类。T封装了处理函数的参数信息,其子类也是框架根据IDL自动生成。

    public abstract class ProcessFunction<I, T extends TBase> {
      private final String methodName;
    
      private static final Logger LOGGER = LoggerFactory.getLogger(ProcessFunction.class.getName());
    
      public ProcessFunction(String methodName) {
        this.methodName = methodName;
      }
    
      public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException {
        T args = getEmptyArgsInstance(); // 获取一个空的参数封装
        try {
          args.read(iprot); // 从input protol中读取所有参数到args中
        } catch (TProtocolException e) {
          iprot.readMessageEnd();
          TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
          oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.EXCEPTION, seqid));
          x.write(oprot);
          oprot.writeMessageEnd();
          oprot.getTransport().flush();
          return;
        }
        iprot.readMessageEnd(); // 读取参数完毕
        TBase result = null;
    
        try {
          result = getResult(iface, args); // 获取处理结果
        } catch(Throwable th) {
          LOGGER.error("Internal error processing " + getMethodName(), th);
          TApplicationException x = new TApplicationException(TApplicationException.INTERNAL_ERROR, 
            "Internal error processing " + getMethodName());
          oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.EXCEPTION, seqid));
          x.write(oprot);
          oprot.writeMessageEnd();
          oprot.getTransport().flush();
          return;
        }
    
        if(!isOneway()) {// true表示单向操作不许要返回信息,即函数为void类型。false表示需要返回值,则启用output protol写入结果。
          oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.REPLY, seqid));
          result.write(oprot);
          oprot.writeMessageEnd();
          oprot.getTransport().flush();
        }
      }
    
      protected abstract boolean isOneway();
    
      public abstract TBase getResult(I iface, T args) throws TException;
    
      public abstract T getEmptyArgsInstance();
    
      public String getMethodName() {
        return methodName;
      }
    }
    

     到此为止,基本了解了libthrift的大致框架。在处理函数中主要是与protocol打交道,protocol又是根据input/output transport生成,而input/output transport又是根据client transport生成。总形成三层结构,底层是client transport由server transport的accept函数获得,中层是input/output transport ,顶层是input/output protocol。然而三层之间并不是完全透明不可见的,典型如oprot.getTransport().flush() 对上层暴露了底层,也无伤大雅。

    下节介绍protocol以及transport。

  • 相关阅读:
    tcpdump命令详解
    Python isdecimal()方法
    Python-Tkinter几何布局管理
    Python choice() 函数
    Python中的join()函数的用法
    PLSQL连接虚拟机中的Oracle数据库
    卸载oracle
    teradata学习
    teradata在虚拟机安装客户端sql Assistant
    oracle面试
  • 原文地址:https://www.cnblogs.com/wuseguang/p/4031173.html
Copyright © 2011-2022 走看看