zoukankan      html  css  js  c++  java
  • Storm源码分析 Thrift的使用

    1 IDL

    首先是storm.thrift, 作为IDL里面定义了用到的数据结构和service
    然后backtype.storm.generated, 存放从IDL通过Thrift自动转化成的Java代码

    比如对于nimbus service
    在IDL的定义为,

    service Nimbus {
      void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
      void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
      void killTopology(1: string name) throws (1: NotAliveException e);
      void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);
      void activate(1: string name) throws (1: NotAliveException e);
      void deactivate(1: string name) throws (1: NotAliveException e);
      void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite);
    
      // need to add functions for asking about status of storms, what nodes they're running on, looking at task logs
      string beginFileUpload();
      void uploadChunk(1: string location, 2: binary chunk);
      void finishFileUpload(1: string location);
      
      string beginFileDownload(1: string file);
      //can stop downloading chunks when receive 0-length byte array back
      binary downloadChunk(1: string id);
    
      // returns json
      string getNimbusConf();
      // stats functions
      ClusterSummary getClusterInfo();
      TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e);
      //returns json
      string getTopologyConf(1: string id) throws (1: NotAliveException e);
      StormTopology getTopology(1: string id) throws (1: NotAliveException e);
      StormTopology getUserTopology(1: string id) throws (1: NotAliveException e);
    }

    而对应在Nimbus.java的Java代码如下,

    public class Nimbus {
    
      public interface Iface {
    
        public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;
    
        public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;
    
        public void killTopology(String name) throws NotAliveException, org.apache.thrift7.TException;
    
        public void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException, org.apache.thrift7.TException;
    
        public void activate(String name) throws NotAliveException, org.apache.thrift7.TException;
    
        public void deactivate(String name) throws NotAliveException, org.apache.thrift7.TException;
    
        public void rebalance(String name, RebalanceOptions options) throws NotAliveException, InvalidTopologyException, org.apache.thrift7.TException;
    
        public String beginFileUpload() throws org.apache.thrift7.TException;
    
        public void uploadChunk(String location, ByteBuffer chunk) throws org.apache.thrift7.TException;
    
        public void finishFileUpload(String location) throws org.apache.thrift7.TException;
    
        public String beginFileDownload(String file) throws org.apache.thrift7.TException;
    
        public ByteBuffer downloadChunk(String id) throws org.apache.thrift7.TException;
    
        public String getNimbusConf() throws org.apache.thrift7.TException;
    
        public ClusterSummary getClusterInfo() throws org.apache.thrift7.TException;
    
        public TopologyInfo getTopologyInfo(String id) throws NotAliveException, org.apache.thrift7.TException;
    
        public String getTopologyConf(String id) throws NotAliveException, org.apache.thrift7.TException;
    
        public StormTopology getTopology(String id) throws NotAliveException, org.apache.thrift7.TException;
    
        public StormTopology getUserTopology(String id) throws NotAliveException, org.apache.thrift7.TException;
    
      }

    2 Client

    1. 首先Get Client,

    NimbusClient client = NimbusClient.getConfiguredClient(conf);

    看看backtype.storm.utils下面的client.getConfiguredClient的逻辑,
    只是从配置中取出nimbus的host:port, 并new NimbusClient

        public static NimbusClient getConfiguredClient(Map conf) {
            try {
                String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
                int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
                return new NimbusClient(conf, nimbusHost, nimbusPort);
            } catch (TTransportException ex) {
                throw new RuntimeException(ex);
            }
        }

    NimbusClient 继承自ThriftClient, public class NimbusClient extends ThriftClient
    ThriftClient又做了什么? 关键是怎么进行数据序列化和怎么将数据传输到remote
    这里看出Thrift对Transport和Protocol的封装
    对于Transport, 其实就是对Socket的封装, 使用TSocket(host, port)
    然后对于protocol, 默认使用TBinaryProtocol, 如果你不指定的话

        public ThriftClient(Map storm_conf, String host, int port, Integer timeout) throws TTransportException {
            try {
                //locate login configuration 
                Configuration login_conf = AuthUtils.GetConfiguration(storm_conf);
    
                //construct a transport plugin
                ITransportPlugin  transportPlugin = AuthUtils.GetTransportPlugin(storm_conf, login_conf);
    
                //create a socket with server
                if(host==null) {
                    throw new IllegalArgumentException("host is not set");
                }
                if(port<=0) {
                    throw new IllegalArgumentException("invalid port: "+port);
                }            
                TSocket socket = new TSocket(host, port);
                if(timeout!=null) {
                    socket.setTimeout(timeout);
                }
                final TTransport underlyingTransport = socket;
    
                //establish client-server transport via plugin
                _transport =  transportPlugin.connect(underlyingTransport, host); 
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
            _protocol = null;
            if (_transport != null)
                _protocol = new  TBinaryProtocol(_transport);
        }

    2. 调用任意RPC
    那么就看看submitTopologyWithOpts

    client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts); 

    可以看出上面的Nimbus的interface里面有这个方法的定义, 而且Thrift不仅仅自动产生java interface, 而且还提供整个RPC client端的实现

        public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException
        {
          send_submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, options);
          recv_submitTopologyWithOpts();
        }
    分两步,
    首先send_submitTopologyWithOpts, 调用sendBase
    接着, recv_submitTopologyWithOpts, 调用receiveBase
      protected void sendBase(String methodName, TBase args) throws TException {
        oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));
        args.write(oprot_);
        oprot_.writeMessageEnd();
        oprot_.getTransport().flush();
      }
    
      protected void receiveBase(TBase result, String methodName) throws TException {
        TMessage msg = iprot_.readMessageBegin();
        if (msg.type == TMessageType.EXCEPTION) {
          TApplicationException x = TApplicationException.read(iprot_);
          iprot_.readMessageEnd();
          throw x;
        }
        if (msg.seqid != seqid_) {
          throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
        }
        result.read(iprot_);
        iprot_.readMessageEnd();
      }
    可以看出Thrift对protocol的封装, 不需要自己处理序列化, 调用protocol的接口搞定

    3 Server

    Thrift强大的地方是, 实现了整个协议栈而不光只是IDL的转化, 对于server也给出多种实现
    下面看看在nimbus server端, 是用clojure来写的
    可见其中使用Thrift封装的NonblockingServerSocket, THsHaServer, TBinaryProtocol, Proccessor, 非常简单
    其中processor会使用service-handle来处理recv到的数据, 所以作为使用者只需要在service-handle中实现Nimbus$Iface, 其他和server相关的, Thrift都已经帮你封装好了, 这里使用的IDL也在backtype.storm.generated, 因为clojure基于JVM所以IDL只需要转化成Java即可.

    (defn launch-server! [conf nimbus]
      (validate-distributed-mode! conf)
      (let [service-handler (service-handler conf nimbus)
            options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
                        (THsHaServer$Args.)
                        (.workerThreads 64)
                        (.protocolFactory (TBinaryProtocol$Factory.))
                        (.processor (Nimbus$Processor. service-handler))
                        )
           server (THsHaServer. options)]
        (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
        (log-message "Starting Nimbus server...")
        (.serve server)))
  • 相关阅读:
    GetArxPath
    动态链接库
    获取文件名称 消除前面的绝对地址路径
    arx 插入图片
    cstring to utf8
    map 用法
    异常处理
    面向对象 "一"
    configparser模块
    装饰器
  • 原文地址:https://www.cnblogs.com/fxjwind/p/3117385.html
Copyright © 2011-2022 走看看