  • Storm Source Code Analysis: How Thrift Is Used

    1 IDL

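    First there is storm.thrift, the IDL file that defines the data structures and services Storm uses.
    Then there is the backtype.storm.generated package, which holds the Java code that Thrift generates automatically from that IDL.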

    Take the Nimbus service as an example.
    Its definition in the IDL is:

    service Nimbus {
      void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
      void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
      void killTopology(1: string name) throws (1: NotAliveException e);
      void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);
      void activate(1: string name) throws (1: NotAliveException e);
      void deactivate(1: string name) throws (1: NotAliveException e);
      void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite);
    
      // need to add functions for asking about status of storms, what nodes they're running on, looking at task logs
      string beginFileUpload();
      void uploadChunk(1: string location, 2: binary chunk);
      void finishFileUpload(1: string location);
      
      string beginFileDownload(1: string file);
      //can stop downloading chunks when receive 0-length byte array back
      binary downloadChunk(1: string id);
    
      // returns json
      string getNimbusConf();
      // stats functions
      ClusterSummary getClusterInfo();
      TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e);
      //returns json
      string getTopologyConf(1: string id) throws (1: NotAliveException e);
      StormTopology getTopology(1: string id) throws (1: NotAliveException e);
      StormTopology getUserTopology(1: string id) throws (1: NotAliveException e);
    }

    The corresponding Java code in Nimbus.java is:

    public class Nimbus {
    
      public interface Iface {
    
        public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;
    
        public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;
    
        public void killTopology(String name) throws NotAliveException, org.apache.thrift7.TException;
    
        public void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException, org.apache.thrift7.TException;
    
        public void activate(String name) throws NotAliveException, org.apache.thrift7.TException;
    
        public void deactivate(String name) throws NotAliveException, org.apache.thrift7.TException;
    
        public void rebalance(String name, RebalanceOptions options) throws NotAliveException, InvalidTopologyException, org.apache.thrift7.TException;
    
        public String beginFileUpload() throws org.apache.thrift7.TException;
    
        public void uploadChunk(String location, ByteBuffer chunk) throws org.apache.thrift7.TException;
    
        public void finishFileUpload(String location) throws org.apache.thrift7.TException;
    
        public String beginFileDownload(String file) throws org.apache.thrift7.TException;
    
        public ByteBuffer downloadChunk(String id) throws org.apache.thrift7.TException;
    
        public String getNimbusConf() throws org.apache.thrift7.TException;
    
        public ClusterSummary getClusterInfo() throws org.apache.thrift7.TException;
    
        public TopologyInfo getTopologyInfo(String id) throws NotAliveException, org.apache.thrift7.TException;
    
        public String getTopologyConf(String id) throws NotAliveException, org.apache.thrift7.TException;
    
        public StormTopology getTopology(String id) throws NotAliveException, org.apache.thrift7.TException;
    
        public StormTopology getUserTopology(String id) throws NotAliveException, org.apache.thrift7.TException;
    
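      }

      // ... the rest of the generated class (Client, Processor, argument and result types) is omitted here
    }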
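
    The comment on downloadChunk says a caller can stop once it receives a 0-length byte array. A minimal sketch of that loop follows. FileDownloader is a hypothetical stand-in for the two download methods of Nimbus.Iface (without TException), and fakeServer is an in-memory fake invented for illustration, not part of Storm.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-in for the two download methods of Nimbus.Iface.
// The generated interface also declares org.apache.thrift7.TException.
interface FileDownloader {
    String beginFileDownload(String file);
    ByteBuffer downloadChunk(String id);
}

final class ChunkedDownloadSketch {

    // Pulls chunks until the server answers with a 0-length buffer.
    static byte[] downloadAll(FileDownloader nimbus, String file) {
        String id = nimbus.beginFileDownload(file);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while (true) {
            ByteBuffer chunk = nimbus.downloadChunk(id);
            if (!chunk.hasRemaining()) {
                break; // an empty chunk marks the end of the file
            }
            byte[] bytes = new byte[chunk.remaining()];
            chunk.get(bytes);
            out.write(bytes, 0, bytes.length);
        }
        return out.toByteArray();
    }

    // In-memory fake (an assumption for this sketch) that serves `data`
    // in pieces of at most `chunkSize` bytes, then empty buffers.
    static FileDownloader fakeServer(byte[] data, int chunkSize) {
        return new FileDownloader() {
            private int offset = 0;

            public String beginFileDownload(String file) {
                offset = 0;
                return "download-1";
            }

            public ByteBuffer downloadChunk(String id) {
                int end = Math.min(data.length, offset + chunkSize);
                ByteBuffer chunk = ByteBuffer.wrap(Arrays.copyOfRange(data, offset, end));
                offset = end;
                return chunk;
            }
        };
    }

    public static void main(String[] args) {
        byte[] data = "storm.jar contents".getBytes(StandardCharsets.UTF_8);
        byte[] copy = downloadAll(fakeServer(data, 5), "storm.jar");
        System.out.println(Arrays.equals(data, copy));
    }
}
```

    The loop tests hasRemaining() rather than comparing against null, because the IDL type binary maps to ByteBuffer in the generated Java, and the end of the file is signalled by an empty buffer.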

    2 Client

    1. First, get a client:

    NimbusClient client = NimbusClient.getConfiguredClient(conf);

    Look at the logic of NimbusClient.getConfiguredClient in backtype.storm.utils:
    it simply reads the Nimbus host and port from the configuration and creates a new NimbusClient.

        public static NimbusClient getConfiguredClient(Map conf) {
            try {
                String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
                int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
                return new NimbusClient(conf, nimbusHost, nimbusPort);
            } catch (TTransportException ex) {
                throw new RuntimeException(ex);
            }
        }

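    NimbusClient extends ThriftClient: public class NimbusClient extends ThriftClient
    So what does ThriftClient do? The key questions are how data is serialized and how it is sent to the remote side.
    This is where Thrift's Transport and Protocol abstractions show up.
    The Transport is essentially a wrapper around a socket, created with TSocket(host, port) and then handed to the transport plugin.
    The Protocol is TBinaryProtocol by default; the constructor below always creates one on top of the transport.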

        public ThriftClient(Map storm_conf, String host, int port, Integer timeout) throws TTransportException {
            try {
                //locate login configuration 
                Configuration login_conf = AuthUtils.GetConfiguration(storm_conf);
    
                //construct a transport plugin
                ITransportPlugin  transportPlugin = AuthUtils.GetTransportPlugin(storm_conf, login_conf);
    
                //create a socket with server
                if(host==null) {
                    throw new IllegalArgumentException("host is not set");
                }
                if(port<=0) {
                    throw new IllegalArgumentException("invalid port: "+port);
                }            
                TSocket socket = new TSocket(host, port);
                if(timeout!=null) {
                    socket.setTimeout(timeout);
                }
                final TTransport underlyingTransport = socket;
    
                //establish client-server transport via plugin
                _transport =  transportPlugin.connect(underlyingTransport, host); 
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
            _protocol = null;
            if (_transport != null)
                _protocol = new  TBinaryProtocol(_transport);
        }
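
    The split between the two layers can be sketched without Thrift at all. In the sketch below a plain ByteBuffer plays the transport, and four static helpers play the protocol. The encoding (big-endian i32, strings as an i32 byte length followed by UTF-8 bytes) is loosely modeled on how TBinaryProtocol lays out these two types. All names are hypothetical, and the host and port are example values.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class LayeringSketch {

    // "Transport" stand-in: the ByteBuffer only moves bytes. In ThriftClient
    // this role is played by the TSocket wrapped by the transport plugin.
    // "Protocol" stand-in: the helpers below decide how typed values
    // become bytes on that transport.

    static void writeI32(ByteBuffer transport, int value) {
        transport.putInt(value); // ByteBuffer is big-endian by default
    }

    static void writeString(ByteBuffer transport, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        transport.putInt(utf8.length); // length prefix
        transport.put(utf8);           // payload
    }

    static int readI32(ByteBuffer transport) {
        return transport.getInt();
    }

    static String readString(ByteBuffer transport) {
        byte[] utf8 = new byte[transport.getInt()];
        transport.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer transport = ByteBuffer.allocate(64);
        writeString(transport, "nimbus-host"); // example value
        writeI32(transport, 6627);             // example value
        transport.flip(); // switch the buffer from writing to reading
        System.out.println(readString(transport) + ":" + readI32(transport));
    }
}
```

    Because the helpers only see a ByteBuffer, the byte layout could be changed without touching whatever produces or consumes the buffer, which is the same separation ThriftClient gets from keeping _transport and _protocol apart.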

    2. Call any RPC
    Take submitTopologyWithOpts as an example:

    client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts); 

    This method is declared in the Nimbus interface shown above. Thrift does not just generate the Java interface; it also generates the complete client-side RPC implementation:

        public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException
        {
          send_submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, options);
          recv_submitTopologyWithOpts();
        }
    The call takes two steps.
    First, send_submitTopologyWithOpts calls sendBase.
    Then, recv_submitTopologyWithOpts calls receiveBase.
      protected void sendBase(String methodName, TBase args) throws TException {
        oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));
        args.write(oprot_);
        oprot_.writeMessageEnd();
        oprot_.getTransport().flush();
      }
    
      protected void receiveBase(TBase result, String methodName) throws TException {
        TMessage msg = iprot_.readMessageBegin();
        if (msg.type == TMessageType.EXCEPTION) {
          TApplicationException x = TApplicationException.read(iprot_);
          iprot_.readMessageEnd();
          throw x;
        }
        if (msg.seqid != seqid_) {
          throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
        }
        result.read(iprot_);
        iprot_.readMessageEnd();
      }
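
    The sequence-id check in receiveBase can be illustrated with a small self-contained sketch. It is not the Thrift implementation: SeqIdSketch, its byte layout, the CALL and REPLY tags, and fakeServer are all assumptions made for illustration. The sketch keeps only the idea that the client stamps each call with ++seqid and rejects a reply carrying a different id.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class SeqIdSketch {
    static final byte CALL = 1;  // hypothetical type tags for this sketch
    static final byte REPLY = 2;

    private int seqid = 0;

    private static void putString(ByteBuffer buf, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        buf.putInt(utf8.length);
        buf.put(utf8);
    }

    private static String getString(ByteBuffer buf) {
        byte[] utf8 = new byte[buf.getInt()];
        buf.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    // Like sendBase: header (method name, CALL, ++seqid), then the argument.
    ByteBuffer send(String method, String arg) {
        ByteBuffer buf = ByteBuffer.allocate(256);
        putString(buf, method);
        buf.put(CALL);
        buf.putInt(++seqid);
        putString(buf, arg);
        buf.flip();
        return buf;
    }

    // Like receiveBase: read the header and reject a reply whose seqid
    // differs from the one used by the most recent send.
    String receive(ByteBuffer reply, String method) {
        getString(reply); // method name echoed by the server
        reply.get();      // message type, ignored in this sketch
        int replySeqid = reply.getInt();
        if (replySeqid != seqid) {
            throw new IllegalStateException(method + " failed: out of sequence response");
        }
        return getString(reply);
    }

    // Fake server: echoes the argument back in upper case. A non-zero
    // seqidShift simulates a reply that belongs to a different call.
    static ByteBuffer fakeServer(ByteBuffer request, int seqidShift) {
        String method = getString(request);
        request.get();
        int id = request.getInt();
        String arg = getString(request);
        ByteBuffer buf = ByteBuffer.allocate(256);
        putString(buf, method);
        buf.put(REPLY);
        buf.putInt(id + seqidShift);
        putString(buf, arg.toUpperCase());
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        SeqIdSketch client = new SeqIdSketch();
        ByteBuffer reply = fakeServer(client.send("getTopologyConf", "topo-1"), 0);
        System.out.println(client.receive(reply, "getTopologyConf"));
    }
}
```

    Passing a non-zero seqidShift to fakeServer makes receive throw, which corresponds to the BAD_SEQUENCE_ID branch in receiveBase.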
    This shows how Thrift encapsulates the protocol: the caller never handles serialization itself and only calls the protocol's interface.

    3 Server

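    Thrift's strength is that it implements the entire protocol stack rather than just translating the IDL, and it ships several server implementations as well.
    Now look at the Nimbus server side, which is written in Clojure.
    It uses the Thrift-provided TNonblockingServerSocket, THsHaServer, TBinaryProtocol, and Processor, and the code is very short.
    The processor hands the data it receives to service-handler, so as a user you only need to implement Nimbus$Iface in service-handler; everything else on the server side is already wrapped up by Thrift. The generated code used here also comes from backtype.storm.generated: because Clojure runs on the JVM, the IDL only needs to be compiled to Java.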

    (defn launch-server! [conf nimbus]
      (validate-distributed-mode! conf)
      (let [service-handler (service-handler conf nimbus)
            options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
                        (THsHaServer$Args.)
                        (.workerThreads 64)
                        (.protocolFactory (TBinaryProtocol$Factory.))
                        (.processor (Nimbus$Processor. service-handler)))
            server (THsHaServer. options)]
        (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
        (log-message "Starting Nimbus server...")
        (.serve server)))
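
    How a processor can route an incoming call to the handler is sketched below in plain Java. This is a simplified illustration, not the generated Nimbus.Processor: NimbusLike is a hypothetical two-method slice of Nimbus.Iface, and arguments and results are reduced to strings so the example needs no Thrift classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical two-method slice of Nimbus.Iface, without Thrift exceptions.
interface NimbusLike {
    String getNimbusConf();
    String getTopologyConf(String id);
}

final class ProcessorSketch {
    // Method name -> function that invokes the matching handler method.
    private final Map<String, Function<String, String>> processMap = new HashMap<>();

    ProcessorSketch(NimbusLike handler) {
        processMap.put("getNimbusConf", arg -> handler.getNimbusConf());
        processMap.put("getTopologyConf", handler::getTopologyConf);
    }

    // Looks up the method named in the incoming message and delegates to the handler.
    String process(String method, String arg) {
        Function<String, String> fn = processMap.get(method);
        if (fn == null) {
            throw new IllegalArgumentException("Invalid method name: '" + method + "'");
        }
        return fn.apply(arg);
    }

    public static void main(String[] args) {
        ProcessorSketch processor = new ProcessorSketch(new NimbusLike() {
            public String getNimbusConf() { return "{}"; }
            public String getTopologyConf(String id) { return "{\"topology.id\": \"" + id + "\"}"; }
        });
        System.out.println(processor.process("getTopologyConf", "topo-1"));
    }
}
```

    The handler passed to the constructor plays the role of service-handler in launch-server!: it contains only business logic, while lookup and dispatch stay in the processor.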
  • Original article: https://www.cnblogs.com/fxjwind/p/3117385.html