zoukankan      html  css  js  c++  java
  • Nimbus<三>Storm源码分析--Nimbus启动过程

    Nimbus server, 首先从启动命令开始, 同样是使用storm命令"storm nimbus”来启动
    看下源码, 此处和上面client不同, jvmtype="-server", 最终调用"backtype.storm.daemon.nimbus"的main
    nimbus是用clojure实现的, 但是clojure是基于JVM的, 所以在最终发布的时候会产生nimbus.class,
    所以在用户使用的时候完全可以不知道clojure, 看上去所有都是Java. clojure只是用于提高开发效率而已.

    1. Nimbus启动过程

    bin/storm

    def nimbus(klass="backtype.storm.daemon.nimbus"):
        """Syntax: [storm nimbus]
        Launches the nimbus daemon. This command should be run under 
        supervision with a tool like daemontools or monit. 
        See Setting up a Storm cluster for more information.
        """
        cppaths = [CLUSTER_CONF_DIR]
        jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [
            "-Dlogfile.name=nimbus.log",
            "-Dlogback.configurationFile=" + STORM_DIR + "/logback/cluster.xml",
        ]
        exec_storm_class(klass, jvmtype="-server", extrajars=cppaths, jvmopts=jvmopts)
    

    storm-core/backtype/storm/daemon/nimbus.clj

    ;; 启动nimbus的主方法
    (defn -main []                      ;; main前面加上-, 表示是public的. 所以bin/storm能直接调用nimbus.clj的main方法
      (-launch (standalone-nimbus)))        ;; 同样launch也是一个public方法. standalone-nimbus是一个方法, clojure对于没有参数的方法可以省略()
    
    (defn -launch [nimbus]              ;; launch的参数是一个Nimbus对象, 所以上面standalone-nimbus方法的返回值是Nimbus
      (launch-server! (read-storm-config) nimbus))
    

    注意在clojure中的函数命名规范,-functionname表示该函数是public的,如上面的-main,调用该函数的时候,不需要加-,使用main即可。
    而与此相对的是defn-,这个表示该函数是私有函数,不可在外部调用。

    1) standalone-nimbus

    nimbus的main, 最终会调到launch-server!, conf参数是调用read-storm-config读出的配置参数,
    而nimbus是INimbus接口(backtype.storm.scheduler.INimbus)的实现, 可以参考standalone-nimbus.
    storm-core/backtype/storm/scheduler/INimbus.java

    public interface INimbus {
        void prepare(Map stormConf, String schedulerLocalDir);
        /**Returns all slots that are available for the next round of scheduling.在下一次调度中可用的槽位
         * A slot is available for scheduling 如果槽位是空闲的且可以被分配的, 或者虽然被使用但可以被重新分配的. 都是可以被调度的
         * if it is free and can be assigned to, or if it is used and can be reassigned. */
        Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> existingSupervisors, Topologies topologies, Set<String> topologiesMissingAssignments);
    
        // this is called after the assignment is changed in ZK
        void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> newSlotsByTopologyId);
    
        // map from node id to supervisor details
        String getHostName(Map<String, SupervisorDetails> existingSupervisors, String nodeId);    
        IScheduler getForcedScheduler(); 
    }
    
    ;; 返回一个实现了INimbus接口的对象. 由于不想创建这种类型, 使用reify匿名对象
    (defn standalone-nimbus []                  ;; 没有参数. clojure中[]使用的地方有: let绑定, 方法的参数, vector
      (reify INimbus                                ;; reify: 具体化匿名数据类型: 需要一个实现了某一协议/接口的对象,但是不想创建一个命名的数据类型. 匿名类
        ;; 下面的方式都是INimbus接口的实现方法
        (prepare [this conf local-dir])                 ;; this可以看做是一个隐式参数, prepare方法实际只有2个参数的
        (allSlotsAvailableForScheduling [this supervisors topologies topologies-missing-assignments]
          (->> supervisors
               (mapcat (fn [^SupervisorDetails s]
                         (for [p (.getMeta s)]
                           (WorkerSlot. (.getId s) p))))
               set ))
        (assignSlots [this topology slots])
        (getForcedScheduler [this] nil )
        (getHostName [this supervisors node-id]
          (if-let [^SupervisorDetails supervisor (get supervisors node-id)]
            (.getHost supervisor)))
        ))
    

    这里面有好几个语法点: ->>, mapcat if-let
    mapcat, (mapcat f & colls) 和普通map不同的是, 会对map执行的结果执行concat操作等于(concat (map f &colls))
    依次对colls中的每个集合运用函数f, 最后将每个结果合并起来. (mapcat f collections)的map不是数据结构意义的映射. 而是一个遍历操作.
    普通的map版本是: (map f collection), 用java来描述就是for(Object o : collection) func(o). 集合中的每个元素会作为函数f的参数.
    上面的(mapcat (fn [s] ...))并没有看到collections. 这个要结合->> supervisors来一起分析.
    ->> supervisors (mapcat fun) 实际上等价于(mapcat fun supervisors). 由于mapcat的返回值是map,根据接口的定义返回值是一个集合Collection
    所以(mapcat)表达式后面的set的意思是将(mapcat)表达式的返回值转换为set, (mapcat)表达式的返回值会跟在set后面作为最后一个Item.
    达到连续调用的功能. ->>和->的区别是->是将返回值作为下一个表达式的第二个Item, 而->>是作为下一个表达式的最后一个Item.

    supervisors不是Supervisor列表, 其类型是SupervisorDetails. mapcat后面紧跟的函数的参数类型对应的是collections=supervisors的类型.
    WorkerSlot需要两个参数id和port. 所以这个方法返回的是Collection, 对应接口INimbus的返回类型.

    getHostName的参数supervisors和allSlotsAvailableForScheduling的supervisors是一样的.
    通过supervisors.get(node-id)获取对应的supervisor. 所以我们可以猜测supervisors是一个Map.
    storm-core/backtype/storm/scheduler/SupervisorDetails.java

    public class SupervisorDetails {
        String id;
        String host;                // hostname of this supervisor
        Object meta;
        Object schedulerMeta;       // meta data configured for this supervisor
        Set<Integer> allPorts;      // all the ports of the supervisor  
    }
    

    Nimbus要分配任务给Supervisor上的Worker进行工作, 而每个Supervisor会有多个worker. 配置文件中可以为一个supervisor配置多个slot port.

    2) read-storm-config

    阅读源码其实都会遵循一个范式,那就是程序的入口在哪,配置文件是在什么时候读入的。那么好,现在就来讲配置参数的读入,在上面的-launch函数中,
    已经可以见到用以读取配置文件的函数了,那就是read-storm-config。非常狗血的是, 在 nimbus.clj 中有一个名称非常类似的函数称为read-storm-conf,这个可不是来读取storm cluster的配置信息,它其实是用来读取Topology的配置内容的。read-storm-config定义于config.clj中,此时你会说等等,没见到有地方
    import或是use backtype.storm.config啊。这一切都被包装了,它们统统被放到bootstrap.clj中了。注意到这行没 (bootstrap)
    好了, 上述有关文件引用的疑问解决之后, 还是回到正题, 看看read-storm-config的定义吧。storm默认的配置文件使用的是yaml格式,一定要找到使用yaml parser的地方。
    storm-core/backtype/storm/config.clj

    (defn read-storm-config []
      (let [conf (clojurify-structure (Utils/readStormConfig))] ;; let中参数conf被赋值为右侧的表达式的值. 和java方法参数不同, let中参数可以被计算
        (validate-configs-with-schemas conf)                ;; 对conf进行验证
        conf))                                      ;; 返回这个conf 
    

    真正实现对配置文件storm.yaml进行读取的是由java代码来实现的,readStormConfig定义于Utils.java中。
    storm-core/backtype/storm/utils/Utils.java

        public static Map readDefaultConfig() {
            return findAndReadConfigFile("defaults.yaml", true);
        }
        public static Map readStormConfig() {
            Map ret = readDefaultConfig();              // 首先读取defaults.yaml的配置
            String confFile = System.getProperty("storm.conf.file");
            Map storm;
            if (confFile==null || confFile.equals("")) {
                storm = findAndReadConfigFile("storm.yaml", false); 
            } else {
                storm = findAndReadConfigFile(confFile, true);
            }
            ret.putAll(storm);                          // 其次读取storm.yaml中的配置
            ret.putAll(readCommandLineOpts());          // 最后是命令行的参数, 这个优先级更高
            return ret;
        }
        public static Map findAndReadConfigFile(String name, boolean mustExist) {
                HashSet<URL> resources = new HashSet<URL>(findResources(name));
                URL resource = resources.iterator().next();
                Yaml yaml = new Yaml();
                Map ret = (Map) yaml.load(new InputStreamReader(resource.openStream()));
                if(ret==null) ret = new HashMap();
                return new HashMap(ret);                // 解析storm.yaml文件, 返回HashMap
        }
        public static List<URL> findResources(String name) {
                Enumeration<URL> resources = Thread.currentThread().getContextClassLoader().getResources(name);
                List<URL> ret = new ArrayList<URL>();
                while(resources.hasMoreElements()) {
                    ret.add(resources.nextElement());   
                }
                return ret;
        }
    

    终于看到神秘的Yaml了,那么Yaml这个类又是由谁提供的呢,看看Utils.java的 开头部分有这么一句话: import org.yaml.snakeyaml.Yaml;
    再看看在storm-core/project.clj中定义的dependencies: [org.yaml/snakeyaml "1.11"]
    至此,yaml文件的解析及其依赖关系的解决探索完毕。在新版本的storm中使用了maven管理. 可以查看pom.xml

    3) storm.yaml

    conf/storm.yaml

    # storm.zookeeper.servers:
    #     - "server1"
    #     - "server2"
    # 
    # nimbus.host: "nimbus"
    # 
    # ##### These may optionally be filled in:
    #    
    ## List of custom serializations
    # topology.kryo.register:
    #     - org.mycompany.MyType
    #     - org.mycompany.MyType2: org.mycompany.MyType2Serializer
    #
    ## List of custom kryo decorators
    # topology.kryo.decorators:
    #     - org.mycompany.MyDecorator
    #
    ## Locations of the drpc servers
    # drpc.servers:
    #     - "server1"
    #     - "server2"
    
    ## Metrics Consumers
    # topology.metrics.consumer.register:
    #   - class: "backtype.storm.metrics.LoggingMetricsConsumer"
    #     parallelism.hint: 1
    #   - class: "org.mycompany.MyMetricsConsumer"
    #     parallelism.hint: 1
    #     argument:
    #       - endpoint: "metrics-collector.mycompany.org"
    storm.zookeeper.servers:
       - 127.0.0.1
    storm.zookeeper.port: 2181
    nimbus.host: "127.0.0.1"
    storm.local.dir: "/home/hadoop/data/storm"
    supervisor.slots.ports:
     - 6700
     - 6701
     - 6702
     - 6703
    

    在配置文件中需要至少回答以下三个问题
    1. zookeeper server在哪台机器上运行,具体就来说就是ip地址啦
    2. nimbus在哪运行,可以填写ip地址或域名
    3. 在每台supervisor运行的机器上可以启几个slot,指定这些slot监听的端 口号
    2. thrift RPC
    1) thrift
    网络结点之间的消息交互一般会牵涉到两个基本的问题,
    • 消息通道的建立
    • 消息的编解码
    如果每变化一个需求就手工来重写一次,一是繁琐,二是易错。为了一劳永逸的解决此类问题,神一样的工具就出现了,如google protolbuffer,如thrift.
    thrift的使用步骤如下

    编写后缀名为thrift的文件,使用工具生成对应语言的源码,thrift支持的语言很多的,什么c,c++,java,python等,统统不是问题。
    实现thrift client
    实现thrift server
    thrift server需要实现thrift文件中定义的service接口。更为具体的信息可以通过阅读官方文档来获得。这里有个thrift java的示例.

    (1). 编写thrift文件:add.thrift

    namespace java com.zqh.code.thrift.server   // defines the namespace
    typedef i32 int                         // typedefs to get convenient names for your types
    service AdditionService {               // defines the service to add two numbers
            int add(1:int n1, 2:int n2),            // defines a method
    }
    

    (2). 编译:thrift --gen java add.thrift 会在当前目录生成gen-java/$namespace$/AdditionService
    (3). Service:Interface的实现类

    public class AdditionServiceHandler implements AdditionService.Iface {
        public int add(int n1, int n2) throws TException { return n1 + n2; }
    }
    

    实现类具体实现了thrift文件定义的接口方法.
    (4). Server

    public class MyServer {
        public static void start(AdditionService.Processor<AdditionServiceHandler> processor) {
                TServerTransport serverTransport = new TServerSocket(9090);     // 服务端Socket
                TServer server = new TSimpleServer(new Args(serverTransport).processor(processor));
                System.out.println("Starting the simple server...");
                server.serve();
        }
        public static void main(String[] args) {
            start(new AdditionService.Processor<AdditionServiceHandler>(new AdditionServiceHandler()));
        }
    }
    

    服务端通过TServerSocket暴露出服务端口, 客户端要通过这个端口连接.
    实现类Handler的实例要作为生成的AdditionService.Processor的参数.
    Args需要TServerTransport作为参数, 然后调用processor方法, 该方法需要AdditionServiceProcessor参数.
    这个过程类似于将自定义实现类Handler注册到服务端上. 接着启动服务器.
    (5). Client

    public class AdditionClient {
        public static void main(String[] args) {
                TTransport transport = new TSocket("localhost", 9090);
                transport.open();
                TProtocol protocol = new TBinaryProtocol(transport);
                AdditionService.Client client = new AdditionService.Client(protocol);
                System.out.println(client.add(100, 200));
                transport.close();
        }
    }
    

    客户端要建立到服务端的连接, 需要提供Server的host和port. 根据TTransport构造出和服务端进行通讯的一个协议.
    这个协议传给自动生成的AdditionService的Client内部类, 会生成一个类似服务端的代理对象.
    接着就可以使用这个代理对象调用thrift协议提供的方法.

    分布式测试: 可以在两台机器上测试. 第一二步都需要在两台机器上操作: 编写thrift文件, 编译.
    然后在第一台机器操作3: 自定义实现类; 4: Server. 在第二台机器上操作5: Client. 最后分别运行两台机器的Server和Client.

    2) nimbus thrift server

    有了thrift这个背景,我们再重新拾起上述的代码执行路径。上头讲到程序执行至

    (defn -launch [nimbus]                                  ;; launch的参数是一个Nimbus对象, 所以上面standalone-nimbus方法的返回值是Nimbus
      (launch-server! (read-storm-config) nimbus))
    
    (defn launch-server! [conf nimbus]                      ;; 让nimbus作为一个thrift server运行起来
      (validate-distributed-mode! conf)                     ;; 分布式模式下才会启动thrift server
      (let [service-handler (service-handler conf nimbus)           ;; 自定义实现类, 实现storm.thrift中service Nimbus定义的接口方法
            options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT))) ;; 服务端的ServerSocket
                        (THsHaServer$Args.)             ;; TServerSocket作为TServer.Args内部类的参数. 创建了Args args对象 ->表示插入第二个位置
                        (.workerThreads 64)                 ;; 上面new Args(TServerSocket)会作为这里的第二个位置, 即args.workerThreads(64)
                        (.protocolFactory (TBinaryProtocol$Factory. false true (conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)))
                        (.processor (Nimbus$Processor. service-handler))    ;; args作为这里的第二个位置,即调用了args.processor
                                                        ;; new Nimbus.Processor(service-handler), 自定义实现类作为Nimbus.Processor的参数,
                                                        ;; processor会作为参数再传给args.processor()
                        )                               ;; 最终返回的是TServer.AbstractServerArgs, 会作为TServer构造函数的参数
           server (THsHaServer. (do (set! (. options maxReadBufferBytes)(conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)) options))]
        (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
        (log-message "Starting Nimbus server...")               ;; 上面添加了一个关闭钩子. 类似回调函数. 当关闭Nimbus的thrift服务时, 会触发这个函数执行
        (.serve server)))                               ;; 启动TServer, 即启动Nimbus的thrift服务
    

    launch-server!说白了,就是让nimbus作为一个thrift server运行起来, 那么storm.thrift中service指定的各个接口函数实现在service-handler中完成。
    对比clojure版本的创建thrift server的过程, 其实和上面java示例是一样的, 只不过换了不同的实现类. 以下是java-clojure的代码对比.
    new AdditionServiceHandler() (service-handler conf nimbus)
    new AdditionService.Processor(new AdditionServiceHandler()) (Nimbus$Processor. service-handler)
    TServerTransport serverTransport = new TServerSocket(9090); (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
    new Args(serverTransport) -> (TNonblockingServerSocket...) (THsHaServer$Args.)
    new Args(serverTransport).processor(processor) -> (TNonblockingServerSocket...) (THsHaServer$Args.) (.processor (Nimbus$Processor. ..))
    TServer server = new TSimpleServer(new Args(serverTransport).processor(processor)); server (THsHaServer… options)
    server.serve(); (.serve server)

    service-handler可是一个大家伙。对比一下 service-handler可以发现,在storm.thrift中的定义的Nimbus服务,
    其接口在 service-handler中一一得以实现。 以下是storm.thrift中关于service Nimbus的声明。
    storm-core/storm.thrift

    namespace java backtype.storm.generated
    service Nimbus {
      void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
      void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws ...;
      void killTopology(1: string name) throws (1: NotAliveException e);
      void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);
      void activate(1: string name) throws (1: NotAliveException e);
      void deactivate(1: string name) throws (1: NotAliveException e);
      void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite);
    
      // need to add functions for asking about status of storms, what nodes they're running on, looking at task logs
      string beginFileUpload();
      void uploadChunk(1: string location, 2: binary chunk);
      void finishFileUpload(1: string location);
      string beginFileDownload(1: string file);
      binary downloadChunk(1: string id);       //can stop downloading chunks when receive 0-length byte array back
    
      string getNimbusConf();                   // returns json
      ClusterSummary getClusterInfo();          // stats functions  
      TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e);
      string getTopologyConf(1: string id) throws (1: NotAliveException e); //returns json
      StormTopology getTopology(1: string id) throws (1: NotAliveException e);
      StormTopology getUserTopology(1: string id) throws (1: NotAliveException e);
    }
    

    这个文件还指定了其他一些struct结构的数据类型, 比如StormTopology, TopologySummary, ClusterSummary, TopologyInfo等.
    编译storm.thrift文件生成的代码在namespace指定的位置: backtype.storm.generated
    storm-core/genthrift.sh

    thrift7 --gen java:beans,hashcode,nocamel --gen py:utf8strings storm.thrift
    

    现在来回顾下storm的thrift RPC的整体流程.
    1. 编写 storm.thrift
    2. 编译 genthrift.sh, 会在backtype.storm.generated生成Nimbus.java接口类. 其中含有内部类Iface(Service), Processor(Server), Client(Client)
    3. Service服务类: nimbus.clj中的service-handler方法的返回值. 其应该实现Nimbus.Iface接口. 所以service-handler使用reify Nimbus$Iface
    4. Server服务端: launch-server!中创建thrift的TServer, 并启动. 使用了Nimbus.Processor, 传入service-handler自定义服务实现类
    5. Client客户端: StormSubmitter中localNimbus!=null时, 使用NimbusClient即Nimbus.Client调用RPC定义的接口方法

    注意: 对于本地模式, 在StormSubmitter中直接使用Nimbus.Iface localNimbus对象. 这个对象的实现类应该就是service-handler.
    对于分布式模式, StormSubmitter作为客户端, 会通过client调用RPC定义的接口方法. 即storm.thrift中定义的方法. 所以service-handler要实现这些方法!

  • 相关阅读:
    从开发人员角度对软件测试的些许理解
    ObjectiveC的语法
    HttpModule与HttpHandler使用
    我为什么学习HASKELL?
    Linux下C语言编程环境Make命令和Makefile
    一个简单的验证框架
    程序员之路
    ObjectiveC语法之ObjectiveC语言和IOS系统(简介,语法,系统结构)
    Teamcity
    Python进阶 错误处理
  • 原文地址:https://www.cnblogs.com/catkins/p/5252480.html
Copyright © 2011-2022 走看看