zoukankan      html  css  js  c++  java
  • storm启动nimbus源码分析-nimbus.clj

    nimbus是storm集群的"控制器",是storm集群的重要组成部分。我们可以通用执行bin/storm nimbus >/dev/null 2>&1 &来启动nimbus。bin/storm是一个python脚本,在这个脚本中定义了一个nimbus函数:

    nimbus函数
    def nimbus(klass="backtype.storm.daemon.nimbus"):
       """Syntax: [storm nimbus]

       Launches the nimbus daemon. This command should be run under
       supervision with a tool like daemontools or monit.

       See Setting up a Storm cluster for more information.
       (https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
       """
       cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
       jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [
           "-Dlogfile.name=nimbus.log",
           "-Dlog4j.configuration=storm.log.properties",
       ]
       exec_storm_class(
           klass,
           jvmtype="-server",
           extrajars=cppaths,
           jvmopts=jvmopts)

    klass参数的默认值为backtype.storm.daemon.nimbus,backtype.storm.daemon.nimbus标识一个java类。STORM_DIR标识storm的安装目录,cppaths集合存放了log4j配置文件路径和storm配置文件storm.yaml路径,jvmopts存放传递给jvm的参数,包括log4j配文件路径、storm.yaml路径、log4j日志名称和log4j配置文件名称。

    exec_storm_class函数的逻辑比较简单,具体实现如下:

    exec_storm_class函数
    def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False):  
       global CONFFILE  
       all_args = [  
           "java", jvmtype, get_config_opts(),  
           "-Dstorm.home=" + STORM_DIR,  
           "-Djava.library.path=" + confvalue("java.library.path", extrajars),  
           "-Dstorm.conf.file=" + CONFFILE,  
           "-cp", get_classpath(extrajars),  
       ] + jvmopts + [klass] + list(args)  
       print "Running: " + " ".join(all_args)  
       if fork:  
           os.spawnvp(os.P_WAIT, "java", all_args)  
       else:  
           os.execvp("java", all_args) # replaces the current process and never returns

    get_config_opts()获取jvm的默认配置信息,confvalue("java.library.path", extrajars)获取storm使用的本地库JZMQ加载路径,get_classpath(extrajars)获取所有依赖jar包的完整路径,然后拼接一个java -cp命令运行klass的main方法。
    klass默认值为backtype.storm.daemon.nimbus,所以exec_storm_class函数最终调用backtype.storm.daemon.nimbus类的main方法。

    backtype.storm.daemon.nimbus类定义在nimbus.clj文件中,定义如下:

    backtype.storm.daemon.nimbus类
    (ns backtype.storm.daemon.nimbus
     (:import [org.apache.thrift.server THsHaServer THsHaServer$Args])
     (:import [org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory])
     (:import [org.apache.thrift.exception])
     (:import [org.apache.thrift.transport TNonblockingServerTransport TNonblockingServerSocket])
     (:import [java.nio ByteBuffer])
     (:import [java.io FileNotFoundException])
     (:import [java.nio.channels Channels WritableByteChannel])
     (:use [backtype.storm.scheduler.DefaultScheduler])
     (:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails
               Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails])
     (:use [backtype.storm bootstrap util])
     (:use [backtype.storm.config :only [validate-configs-with-schemas]])
     (:use [backtype.storm.daemon common])
     (:gen-class
       :methods [^{:static true} [launch [backtype.storm.scheduler.INimbus] void]]))
       ...
       ;; 其他方法
       ...
       (defn -main []
     (-launch (standalone-nimbus)))

    :gen-class指示Clojure生成Java类backtype.storm.daemon.nimbus,并且声明一个静态方法launch,launch方法接收一个实现backtype.storm.scheduler.INimbus接口的实例作为参数。launch函数的参数是由standalone-nimbus函数生成的。standalone-nimbus函数定义如下:返回一个实现INimbus接口的实例。

    standalone-nimbus函数
    (defn standalone-nimbus []
     ;; 实现INimbus接口
     (reify INimbus
       ;; prepare函数为空实现
       (prepare [this conf local-dir]
         )
       ;; allSlotsAvailableForScheduling获取所有可用的slot集合
       (allSlotsAvailableForScheduling [this supervisors topologies topologies-missing-assignments]
         ;; supervisors标识集群所有supervisor的详细信息对象SupervisorDetails的集合
         (->> supervisors
              ;; 遍历supervisors,为supervisor的每个port生成对应的WorkerSlot对象,WorkerSlot包含两个属性节点id和port
              (mapcat (fn [^SupervisorDetails s]
                        (for [p (.getMeta s)]
                          (WorkerSlot. (.getId s) p))))
              set ))
       (assignSlots [this topology slots]
         )
       (getForcedScheduler [this]
         nil )
       ;; 获取supervisor主机名
       (getHostName [this supervisors node-id]
         (if-let [^SupervisorDetails supervisor (get supervisors node-id)]
           (.getHost supervisor)))
       ))

    launch函数定义如下:

    launch函数
    (defn -launch [nimbus]
     ;;
     ;; read-storm-config函数用于读取storm集群的配置信息,参见其定义部分
     (launch-server! (read-storm-config) nimbus))
    launch-server!函数定义如下:  
    (defn launch-server! [conf nimbus]
     ;; 判断是否是分布式模式,如果是本地模式则抛出IllegalArgumentException
     (validate-distributed-mode! conf)
     ;; service-handler函数是由宏defserverfn定义的,返回一个实现了Nimbus类中的Iface接口的实例,Nimbus类是由thrift框架自动生成的,Iface接口封装了service Nimbus的全部接口。
     ;; nimbus thrift server端提供的接口服务都是由这个实例实现的。service-handler函数参见其定义部分,service Nimbus参见storm.thrift
     ;; service-handler绑定实现了Nimbus类中的Iface接口的实例
     (let [service-handler (service-handler conf nimbus)
           options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
                       (THsHaServer$Args.)
                       (.workerThreads 64)
                       (.protocolFactory (TBinaryProtocol$Factory. false true (conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)))
                       (.processor (Nimbus$Processor. service-handler))
                       )
          server (THsHaServer. (do (set! (. options maxReadBufferBytes)(conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)) options))]
       (add-shutdown-hook-with-force-kill-in-1-sec (fn []
                                                     (.shutdown service-handler)
                                                     (.stop server)))
       (log-message "Starting Nimbus server...")
       (.serve server)))

    read-storm-config定义如下: 

    read-storm-config函数
    (defn read-storm-config  
     []  
     ;; conf绑定storm集群配置信息
     (let [conf (clojurify-structure (Utils/readStormConfig))]  
       ;; validate-configs-with-schemas函数验证配置信息的正确性并删除不正确的配置信息
       (validate-configs-with-schemas conf)  
       conf))

    read-storm-config函数调用了backtype.storm.utils.Utils类的静态方法readStormConfig,如下:

    readStormConfig方法
    public static Map readStormConfig() {  
           // 调用readDefaultConfig从defaults.yaml配置文件读取默认配置信息存入ret
           Map ret = readDefaultConfig();
           // 获取用户自定义配置文件路径
           String confFile = System.getProperty("storm.conf.file");  
           Map storm;  
           if (confFile==null || confFile.equals("")) {  
               storm = findAndReadConfigFile("storm.yaml", false);  
           } else {
               // 读取用户自定义配置信息
               storm = findAndReadConfigFile(confFile, true);  
           }  
           // 将用户自定义的配置信息覆盖更新到ret中
           ret.putAll(storm);
           // 将命令行方式提供的配置信息覆盖更新到ret中
           ret.putAll(readCommandLineOpts());  
           // 返回覆盖更新后的配置信息ret
           return ret;  
       }

    service-handler函数定义如下:

    defserverfn是一个宏,(defserverfn service-handler [conf inimbus] ... )返回一个名字为service-handler函数。宏扩展是在编译时进行的

    service-handler函数
    (defserverfn service-handler [conf inimbus]
     ;; 调用inimbus的prepare方法,inimbus是standalone-nimbus函数返回的实现INimbus接口的实例,当前版本prepare方法为空实现
     (.prepare inimbus conf (master-inimbus-dir conf))
     ;; 打印日志信息
     (log-message "Starting Nimbus with conf " conf)
     ;; nimbus绑定了一个map,这个map保存了nimbus端所必需的"属性",详见nimbus-data函数定义部分
     (let [nimbus (nimbus-data conf inimbus)]
       ;; 调用nimbus这个map中保存的backtype.storm.nimbus.DefaultTopologyValidator对象的prepare方法,通过查看backtype.storm.nimbus.DefaultTopologyValidator类,我们可以发现prepare默认为空实现
       (.prepare ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) conf)
       ;; cleanup-corrupt-topologies!函数的主要功能就是将在nimbus服务器{storm.local.dir}/nimbus/stormdist/路径中不存在的topology id从zookeeper的/storms/路径中删除,即删除在nimbus服务器上缺失jar包、topology信息和配置信息的当前正在运行的topology,
       ;; cleanup-corrupt-topologies!函数参见其定义部分
       (cleanup-corrupt-topologies! nimbus)
       ;; 更新当前storm集群上topology的状态
       (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
         ;; transition!函数主要功能就是负责topology状态转换,规定了当topology由一种状态转换成另一种新状态时,需要做哪些处理操作,参见其定义部分
         (transition! nimbus storm-id :startup))
       ;; 通过schedule-recurring函数向storm定时器添加了一个"周期任务"检查心跳,重新分配任务,清理不活跃的topology,mk-assignments函数的主要功能就是检查心跳和重新分配任务。关于storm定时器详细分析请见"storm定时器timer源码分析";关于mk-assignments函数请见"storm任务分配源码分析"
       ;; do-cleanup函数主要功能就是清理不活跃的topology,请参加其定义部分
       (schedule-recurring (:timer nimbus)
                           0
                           (conf NIMBUS-MONITOR-FREQ-SECS)
                           (fn []
                             (when (conf NIMBUS-REASSIGN)
                               (locking (:submit-lock nimbus)
                                 (mk-assignments nimbus)))
                             (do-cleanup nimbus)
                             ))
       ;; Schedule Nimbus inbox cleaner
       ;; 通过schedule-recurring函数向storm定时器添加一个"周期任务"删除nimbus服务器上的过期jar包
       (schedule-recurring (:timer nimbus)
                           0
                           (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
                           (fn []
                             (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))
                             ))    
       (reify Nimbus$Iface
         ;; submitTopologyWithOpts函数负责topology的提交,有关该函数的详细分析请参见"storm源码分析之topology提交过程"
         (^void submitTopologyWithOpts
           [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
            ^SubmitOptions submitOptions]
           (try
             (assert (not-nil? submitOptions))
             (validate-topology-name! storm-name)
             (check-storm-active! nimbus storm-name false)
             (let [topo-conf (from-json serializedConf)]
               (try
                 (validate-configs-with-schemas topo-conf)
                 (catch IllegalArgumentException ex
                   (throw (InvalidTopologyException. (.getMessage ex)))))
               (.validate ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus)
                          storm-name
                          topo-conf
                          topology))
             (swap! (:submitted-count nimbus) inc)
             (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs))
                   storm-conf (normalize-conf
                               conf
                               (-> serializedConf
                                   from-json
                                   (assoc STORM-ID storm-id)
                                 (assoc TOPOLOGY-NAME storm-name))
                               topology)
                   total-storm-conf (merge conf storm-conf)
                   topology (normalize-topology total-storm-conf topology)
                   storm-cluster-state (:storm-cluster-state nimbus)]
               (system-topology! total-storm-conf topology) ;; this validates the structure of the topology
               (log-message "Received topology submission for " storm-name " with conf " storm-conf)
               ;; lock protects against multiple topologies being submitted at once and
               ;; cleanup thread killing topology in b/w assignment and starting the topology
               (locking (:submit-lock nimbus)
                 (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
                 (.setup-heartbeats! storm-cluster-state storm-id)
                 (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
                                                 TopologyInitialStatus/ACTIVE :active}]
                   (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions))))
                 (mk-assignments nimbus)))
             (catch Throwable e
               (log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
               (throw e))))
         ;; submitTopology函数调用了submitTopologyWithOpts函数
         (^void submitTopology
           [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
           (.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology
                                    (SubmitOptions. TopologyInitialStatus/ACTIVE)))
         ;; killTopology函数见名知意,调用了killTopologyWithOpts函数
         (^void killTopology [this ^String name]
           (.killTopologyWithOpts this name (KillOptions.)))
         ;; storm-name绑定kill的topology名称,KillOptions是一个thrift数据结构,只有个属性wait_secs,表示延迟多长时间执行kill
         (^void killTopologyWithOpts [this ^String storm-name ^KillOptions options]
           ;; check-storm-active!检查topology是否是"active",如果不活跃则抛出异常
           (check-storm-active! nimbus storm-name true)
           ;; 如果设置了延迟时间,wait-amt绑定延迟时间
           (let [wait-amt (if (.is_set_wait_secs options)
                            (.get_wait_secs options)                        
                            )]
             ;; transition-name!函数主要功能就是根据storm-name获取topology id,然后调用transition!函数,topology由当前状态转换到:kill状态,:kill状态是一个"临时状态",最终修改topology状态为:killed,:killed状态为"持久状态"
             ;; 通过state-transitions函数我们可以知道无论从哪种状态转换到:kill状态,都将调用kill-transition函数,kill-transition通过调用delay-event向storm定时器添加一个定时任务,这个定时任务的主要功能就是负责topology由:killed状态
             ;; 转换到:remove状态,这时将调用remove-storm!函数清理topology
             (transition-name! nimbus storm-name [:kill wait-amt] true)
             ))
         ;; rebalance函数可以重新设置topology的进程数和各个component的并行度,RebalanceOptions是thirft数据结构,有三个属性rebalance的延迟时间、新的进程数,新的并行度
         (^void rebalance [this ^String storm-name ^RebalanceOptions options]
           ;; check-storm-active!检查topology是否是"active",如果不活跃则抛出异常
           (check-storm-active! nimbus storm-name true)
           ;; 如果设置了延迟时间,wait-amt绑定延迟时间
           (let [wait-amt (if (.is_set_wait_secs options)
                            (.get_wait_secs options))
                 ;; 如果设置了新的进程数,num-workers绑定新进程数
                 num-workers (if (.is_set_num_workers options)
                               (.get_num_workers options))
                 ;; 如果设置了新的组件并行度,executor-overrides绑定新组件并行度
                 executor-overrides (if (.is_set_num_executors options)
                                      (.get_num_executors options)
                                      {})]
             (doseq [[c num-executors] executor-overrides]
               (when (<= num-executors 0)
                 (throw (InvalidTopologyException. "Number of executors must be greater than 0"))
                 ))
             ;; transition-name!函数主要功能就是根据storm-name获取topology id,然后调用transition!函数,topology由当前状态转换到:rebalance状态,:rebalance状态是一个"临时状态",最终修改topology状态为:rebalancing,:rebalancing状态为"持久状态"
             ;; 通过state-transitions函数我们可以知道只允许从:active和:inactive状态转换到:rebalance状态,并调用rebalance-transition函数,rebalance-transition通过调用delay-event向storm定时器添加一个定时任务,这个定时任务的主要功能就是负责topology由:rebalancing状态
             ;; 转换到:do-rebalance状态,并调用do-rebalance函数(重新设置topology的进程数和组件并行度,然后调用mk-assignments函数重新进行任务分配),然后将topology状态修改成:rebalancing的前一个状态
             (transition-name! nimbus storm-name [:rebalance wait-amt num-workers executor-overrides] true)
             ))
           ;; 激活topology,将topology状态修改成:active,处理过程与killTopologyWithOpts、rebalance相似
         (activate [this storm-name]
           (transition-name! nimbus storm-name :activate true)
           )
         ;; 将topology状态修改成:inactive,deactivate处理过程与activate相似
         (deactivate [this storm-name]
           (transition-name! nimbus storm-name :inactivate true))
         ;; beginFileUpload()函数获取nimbus存放jar的目录
         (beginFileUpload [this]
           (let [fileloc (str (inbox nimbus) "/stormjar-" (uuid) ".jar")]
             (.put (:uploaders nimbus)
                   fileloc
                   (Channels/newChannel (FileOutputStream. fileloc)))
             (log-message "Uploading file from client to " fileloc)
             fileloc
             ))
         ;; 上传jar包文件
         (^void uploadChunk [this ^String location ^ByteBuffer chunk]
           (let [uploaders (:uploaders nimbus)
                 ^WritableByteChannel channel (.get uploaders location)]
             (when-not channel
               (throw (RuntimeException.
                       "File for that location does not exist (or timed out)")))
             (.write channel chunk)
             (.put uploaders location channel)
             ))
         ;; 上传jar包完成,关闭Channel
         (^void finishFileUpload [this ^String location]
           (let [uploaders (:uploaders nimbus)
                 ^WritableByteChannel channel (.get uploaders location)]
             (when-not channel
               (throw (RuntimeException.
                       "File for that location does not exist (or timed out)")))
             (.close channel)
             (log-message "Finished uploading file from client: " location)
             (.remove uploaders location)
             ))
         ;; 获取文件输入流
         (^String beginFileDownload [this ^String file]
           (let [is (BufferFileInputStream. file)
                 id (uuid)]
             (.put (:downloaders nimbus) id is)
             id
             ))
         ;; 读取文件
         (^ByteBuffer downloadChunk [this ^String id]
           (let [downloaders (:downloaders nimbus)
                 ^BufferFileInputStream is (.get downloaders id)]
             (when-not is
               (throw (RuntimeException.
                       "Could not find input stream for that id")))
             (let [ret (.read is)]
               (.put downloaders id is)
               (when (empty? ret)
                 (.remove downloaders id))
               (ByteBuffer/wrap ret)
               )))
         ;; 获取storm集群配置信息
         (^String getNimbusConf [this]
           (to-json (:conf nimbus)))
         ;; 获取topology配置信息
         (^String getTopologyConf [this ^String id]
           (to-json (try-read-storm-conf conf id)))
         ;; 获取StormTopology
         (^StormTopology getTopology [this ^String id]
           (system-topology! (try-read-storm-conf conf id) (try-read-storm-topology conf id)))

         (^StormTopology getUserTopology [this ^String id]
           (try-read-storm-topology conf id))
               ;; 获取当前集群的汇总信息包括supervisor汇总信息,nimbus启动时间,所有活跃topology汇总信息
         (^ClusterSummary getClusterInfo [this]
           (let [storm-cluster-state (:storm-cluster-state nimbus)
                       ;; supervisor-infos绑定supervisor id->SupervisorInfo对象键值对的map
                 ;; SupervisorInfo定义:(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs])
                 supervisor-infos (all-supervisor-info storm-cluster-state)
                 ;; TODO: need to get the port info about supervisors...
                 ;; in standalone just look at metadata, otherwise just say N/A?
                 ;; 根据SupervisorInfo数据创建SupervisorSummary数据
                 supervisor-summaries (dofor [[id info] supervisor-infos]
                                             (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
                                                   ]
                                               (SupervisorSummary. (:hostname info)
                                                                   (:uptime-secs info)
                                                                   (count ports)
                                                                   (count (:used-ports info))
                                                                   id )
                                               ))
                 ;; nimbus-uptime绑定nimbus启动时间                              
                 nimbus-uptime ((:uptime nimbus))
                 ;; bases绑定集群上所有活跃topology的StormBase数据集合
                 bases (topology-bases storm-cluster-state)
                 ;; topology-summaries绑定活跃topology的TopologySummary数据
                 topology-summaries (dofor [[id base] bases]
                                           (let [assignment (.assignment-info storm-cluster-state id nil)]
                                             (TopologySummary. id
                                                               (:storm-name base)
                                                               (->> (:executor->node+port assignment)
                                                                    keys
                                                                    (mapcat executor-id->tasks)
                                                                    count)
                                                               (->> (:executor->node+port assignment)
                                                                    keys
                                                                    count)                                                            
                                                               (->> (:executor->node+port assignment)
                                                                    vals
                                                                    set
                                                                    count)
                                                               (time-delta (:launch-time-secs base))
                                                               (extract-status-str base))
                                             ))]
             ;; 创建ClusterSummary数据
             (ClusterSummary. supervisor-summaries
                              nimbus-uptime
                              topology-summaries)
             ))
         ;; 获取指定storm-id的topology的TopologyInfo数据
         (^TopologyInfo getTopologyInfo [this ^String storm-id]
           ;; storm-cluster-state绑定StormClusterState对象
           (let [storm-cluster-state (:storm-cluster-state nimbus)
                 ;; task->component绑定任务id->组件名称键值对的map,形如:{1 "boltA", 2 "boltA", 3 "boltA", 4 "boltA", 5 "boltB", 6 "boltB"}
                 task->component (storm-task-info (try-read-storm-topology conf storm-id) (try-read-storm-conf conf storm-id))
                 ;; bases绑storm-id的StormBase
                 base (.storm-base storm-cluster-state storm-id nil)
                 ;; assignment绑定该topology的AssignmentInfo信息,(defrecord Assignment [master-code-dir node->host executor->node+port executor->start-time-secs])
                 assignment (.assignment-info storm-cluster-state storm-id nil)
                 ;; beats绑定该topology所有executor-id->心跳信息的map
                 beats (.executor-beats storm-cluster-state storm-id (:executor->node+port assignment))
                 ;; all-components绑定该topology所有component-id集合
                 all-components (-> task->component reverse-map keys)
                 ;; errors绑定component-id->组件错误信息的map
                 errors (->> all-components
                             (map (fn [c] [c (get-errors storm-cluster-state storm-id c)]))
                             (into {}))
                 ;; executor-summaries绑定ExecutorSummary集合
                 executor-summaries (dofor [[executor [node port]] (:executor->node+port assignment)]
                                           (let [host (-> assignment :node->host (get node))
                                                 heartbeat (get beats executor)
                                                 stats (:stats heartbeat)
                                                 stats (if stats
                                                         (stats/thriftify-executor-stats stats))]
                                             (doto
                                                 (ExecutorSummary. (thriftify-executor-id executor)
                                                                   (-> executor first task->component)
                                                                   host
                                                                   port
                                                                   (nil-to-zero (:uptime heartbeat)))
                                               (.set_stats stats))
                                             ))
                 ]
             ;; 创建TopologyInfo对象
             (TopologyInfo. storm-id
                            (:storm-name base)
                            (time-delta (:launch-time-secs base))
                            executor-summaries
                            (extract-status-str base)
                            errors
                            )
             ))
         
         Shutdownable
         (shutdown [this]
           (log-message "Shutting down master")
           (cancel-timer (:timer nimbus))
           (.disconnect (:storm-cluster-state nimbus))
           (.cleanup (:downloaders nimbus))
           (.cleanup (:uploaders nimbus))
           (log-message "Shut down master")
           )
         DaemonCommon
         (waiting? [this]
           (timer-waiting? (:timer nimbus))))))

    nimbus-data函数定义如下:

    nimbus-data函数
    (defn nimbus-data [conf inimbus]
     (let [forced-scheduler (.getForcedScheduler inimbus)]
       ;; 保存storm集群的配置信息
       {:conf conf
        ;; 保存inimbus实例
        :inimbus inimbus
        ;; 初始化topology提交总数为0
        :submitted-count (atom 0)
        ;; 调用cluster.clj中的mk-storm-cluster-state函数创建StormClusterState实例,StormClusterState实例封装了与zookeeper交互的接口
        :storm-cluster-state (cluster/mk-storm-cluster-state conf)
        ;; 保存"提交锁",在topology提交时,需要先获取该锁,然后才能提交,这样可以防止一次提交多个topology,也保证了topology之间操作的互斥性
        :submit-lock (Object.)
        ;; 初始化心跳缓存
        :heartbeats-cache (atom {})
        ;; 创建下载TimeCacheMap缓存,关于TimeCacheMap缓存会在以后文章中单独分析,在此不做介绍
        :downloaders (file-cache-map conf)
        ;; 创建上传TimeCacheMap缓存
        :uploaders (file-cache-map conf)
        ;; 保存一个返回值为"当前时间"-"nimbus启动时间"的函数,调用该函数可以获取nimbus启动多长时间
        :uptime (uptime-computer)
        ;; 通过java反射创建一个NIMBUS-TOPOLOGY-VALIDATOR指定的validator对象,默认为backtype.storm.nimbus.DefaultTopologyValidator对象
        :validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))
        ;; mk-timer函数会创建一个"定时线程",关于定时线程会在以后文章中单位分析,在此不做介绍
        :timer (mk-timer :kill-fn (fn [t]
                                    (log-error t "Error when processing event")
                    ;; exit-process!函数通过调用java的Runtime类的exit(int status)方法终止进程,并传达状态码20
                                    (exit-process! 20 "Error when processing an event")
                                    ))
        ;; 由mk-scheduler函数创建scheduler调度器,通过分析mk-scheduler函数,可以发现在没有配置用户自定义的scheduler情况下,mk-scheduler函数默认返回DefaultScheduler,mk-scheduler函数参见其定义部分
        :scheduler (mk-scheduler conf inimbus)
        }))

    mk-scheduler函数定义如下:

    mk-scheduler函数
    (defn mk-scheduler [conf inimbus]
     ;; 当前版本getForcedScheduler函数返回nil
     (let [forced-scheduler (.getForcedScheduler inimbus)
           ;; scheduler绑定IScheduler接口的实现
       ;; cond等价于java中的switch,我们可以发现首先检查forced-scheduler,如果forced-scheduler为nil,则检查是否有用户自定义的scheduler,如果没有则
       ;; 使用默认的DefaultScheduler
           scheduler (cond
                       forced-scheduler
                       (do (log-message "Using forced scheduler from INimbus " (class forced-scheduler))
                           forced-scheduler)
       
                       (conf STORM-SCHEDULER)
                       (do (log-message "Using custom scheduler: " (conf STORM-SCHEDULER))
                           (-> (conf STORM-SCHEDULER) new-instance))
       
                       :else
                       (do (log-message "Using default scheduler")
                           (DefaultScheduler.)))]
       ;; 先调用prepare函数
       (.prepare scheduler conf)
       ;; 然后返回scheduler
       scheduler
       ))

    cleanup-corrupt-topologies!函数定义如下:

    cleanup-corrupt-topologies!函数
    (defn cleanup-corrupt-topologies! [nimbus]
     ;; 获取nimbus这个map中保存的StormCluterState实例
     (let [storm-cluster-state (:storm-cluster-state nimbus)
           ;; code-ids绑定了nimbus服务器上{storm.local.dir}/nimbus/stormdist/目录下所有子目录的名称,即提交给nimbus的所有topology的id
           code-ids (set (code-ids (:conf nimbus)))
       ;; active-topologies绑定zookeeper上/storms/目录中所有文件名称,即当前storm集群上正在运行的topology的id
           active-topologies (set (.active-storms storm-cluster-state))
       ;; corrupt-topologies绑定active-topologies和code-ids的差集,即当前正在运行的,但丢失jar包、topology信息和配置信息的topology的id
           corrupt-topologies (set/difference active-topologies code-ids)]
       ;; 将id包含在corrupt-topologies集合的topology的分配信息从zookeeper的/assignments目录删除,同时将StormBase信息从zookeeper的/storms目录删除
       (doseq [corrupt corrupt-topologies]
         (log-message "Corrupt topology " corrupt " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...")
         (.remove-storm! storm-cluster-state corrupt)
         )))

    transition!函数定义如下:

    transition!函数的作用十分重要,负责topology状态转换,在启动nimbus场景下,event的值为":startup"关键字,error-on-no-transition?的值为false。transition!函数有两个重载版本。

    transition!函数
    (defn transition!
     ([nimbus storm-id event]
        (transition! nimbus storm-id event false))
     ([nimbus storm-id event error-on-no-transition?]
        ;; 加锁
        (locking (:submit-lock nimbus)
          ;; system-events绑定一个集合#{:startup}
          (let [system-events #{:startup}
                ;; 在启动nimbus场景下,event绑定[:startup],event-args为nil
                [event & event-args] (if (keyword? event) [event] event)
                ;; 从zookeeper上获取topology的状态,一个map对象,绑定到status上
                status (topology-status nimbus storm-id)]
            ;; handles the case where event was scheduled but topology has been removed
            (if-not status
              ;; 如果status为nil则记录日志,transition!函数执行结束
              (log-message "Cannot apply event " event " to " storm-id " because topology no longer exists")
              ;; 如果status不为nil,get-event绑定一个函数
              (let [get-event (fn [m e]
                                (if (contains? m e)
                                  (m e)
                                  (let [msg (str "No transition for event: " event
                                                 ", status: " status,
                                                 " storm-id: " storm-id)]
                                    (if error-on-no-transition?
                                      (throw-runtime msg)
                                      (do (when-not (contains? system-events event)
                                            (log-message msg))
                                          nil))
                                    )))
                    ;; state-transitions函数返回一个状态转换映射map,这个map中规定了由一种状态可以转换到哪些状态,并且在状态转换后执行哪些处理(即调用哪个函数),参见其定义部分
                    ;; 通过分析state-transitions函数,我们可以发现只有当topology的当前状态为":killed"和":rebalancing"时,才允许转换到":startup"状态,如果当前状态是其他状态,transition将为nil
                    ;; 我们先讨论其他状态,这时transition为nil,接着transition通过if判断将绑定一个(fn [] nil)函数,这样new-status将为nil。所以在启动nimbus场景下,topology由其他状态转换到":startup"状态时,transition!函数什么都没做
                    transition (-> (state-transitions nimbus storm-id status)
                                   (get (:type status))
                                   (get-event event))
                    transition (if (or (nil? transition)
                                       (keyword? transition))
                                 (fn [] transition)
                                 transition)
                    new-status (apply transition event-args)
                    new-status (if (keyword? new-status)
                                 {:type new-status}
                                 new-status)]
                (when new-status
                  (set-topology-status! nimbus storm-id new-status)))))
          )))

    1、如果topology由":killed"转换到":startup"(kill topology的过程中,nimbus挂掉了,当重启nimbus时就有可能出现这种状态转换)时,transition将绑定

    (fn [] (delay-event nimbus
                       storm-id
                       (:kill-time-secs status)
                       :remove)
       nil)

    new-status值为transition绑定的函数的返回值nil。transition绑定的函数通过调用delay-event函数将#(transition! nimbus storm-id :remove false)函数添加到storm定时器中,然后由storm定时器执行该函数,该函数再次调用了transition!函数,不过这次是由":killed"转换到":remove", 调用函数

    (fn []
       (log-message "Killing topology: " storm-id)
       ;; 删除zookeeper上该topology的StormBase信息和分配信息
       (.remove-storm! (:storm-cluster-state nimbus)
                       storm-id)
       nil)

    2、如果topology由":rebalancing"转换到":startup"(rebalance topology的过程中,nimbus挂掉了,当重启nimbus时就有可能出现这种状态转换)时,transition将绑定

    (fn [] (delay-event nimbus
                   storm-id
                   (:delay-secs status)
                   :do-rebalance)
       nil)

    new-status值为transition绑定的函数的返回值nil。transition绑定的函数通过调用delay-event函数将#(transition! nimbus storm-id :do-rebalance false)函数添加到storm定时器中,然后由storm定时器执行该函数,该函数再次调用了transition!函数,不过这次是由":rebalancing"转换到":do-rebalance",调用函数

    (fn []
      (do-rebalance nimbus storm-id status)
      (:old-status status))

    由于这个函数返回:rebalancing状态的前一个状态,所以storm定时器所执行的定时任务会将topology的状态由:rebalancing修改成前一个状态。以上就是启动nimbus场景下,topology可能的状态转换处理过程。 delay-event函数定义如下:主要功能就是将#(transition! nimbus storm-id event false)函数作为"定时任务"添加到storm定时器中。

    (defn delay-event [nimbus storm-id delay-secs event]
     (log-message "Delaying event " event " for " delay-secs " secs for " storm-id)
     (schedule (:timer nimbus)
               delay-secs
               #(transition! nimbus storm-id event false)
               ))

    state-transitions函数定义如下:

    state-transitions函数
    (defn state-transitions [nimbus storm-id status]
     {:active {:inactivate :inactive            
               :activate nil
               :rebalance (rebalance-transition nimbus storm-id status)
               :kill (kill-transition nimbus storm-id)
               }
      :inactive {:activate :active
                 :inactivate nil
                 :rebalance (rebalance-transition nimbus storm-id status)
                 :kill (kill-transition nimbus storm-id)
                 }
      :killed {:startup (fn [] (delay-event nimbus
                                            storm-id
                                            (:kill-time-secs status)
                                            :remove)
                                nil)
               :kill (kill-transition nimbus storm-id)
               :remove (fn []
                         (log-message "Killing topology: " storm-id)
                         (.remove-storm! (:storm-cluster-state nimbus)
                                         storm-id)
                         nil)
               }
      :rebalancing {:startup (fn [] (delay-event nimbus
                                                 storm-id
                                                 (:delay-secs status)
                                                 :do-rebalance)
                                    nil)
                    :kill (kill-transition nimbus storm-id)
                    :do-rebalance (fn []
                                    (do-rebalance nimbus storm-id status)
                                    (:old-status status))
                    }})

    do-cleanup函数定义如下:

    do-cleanup函数
    (defn do-cleanup [nimbus]
     (let [storm-cluster-state (:storm-cluster-state nimbus)
           conf (:conf nimbus)
           submit-lock (:submit-lock nimbus)]
       ;; to-cleanup-ids绑定需要清理的topology的id,即不再活跃的topology的id,cleanup-storm-ids函数参见其定义部分
       (let [to-cleanup-ids (locking submit-lock
                              (cleanup-storm-ids conf storm-cluster-state))]
         (when-not (empty? to-cleanup-ids)
           (doseq [id to-cleanup-ids]
             (log-message "Cleaning up " id)
             ;; 从zookeeper上删除/workerbeats/{id}节点(清理其心跳信息)
             (.teardown-heartbeats! storm-cluster-state id)
             ;; 从zookeeper上删除/errors/{id}节点(清理其错误信息)
             (.teardown-topology-errors! storm-cluster-state id)
             ;; 从nimbus服务器上删除{storm.local.dir}/nimbus/stormdist/{id}目录(删除其jar包,topology信息,配置信息)
             (rmr (master-stormdist-root conf id))
             ;; 将该topology的心跳信息从nimbus的心跳缓存中删除
             (swap! (:heartbeats-cache nimbus) dissoc id))
           ))))

    cleanup-storm-ids函数定义如下:

    cleanup-storm-ids函数
    (defn cleanup-storm-ids [conf storm-cluster-state]
     ;; heartbeat-ids绑定有心跳的topology的id集合
     (let [heartbeat-ids (set (.heartbeat-storms storm-cluster-state))
           ;; error-ids绑定有错误信息的topology的id集合
           error-ids (set (.error-topologies storm-cluster-state))
           ;; code-ids绑定在nimbus服务器上有jar包的topology的id集合
           code-ids (code-ids conf)
           ;; assigned-ids绑定当前活跃的topology的id集合
           assigned-ids (set (.active-storms storm-cluster-state))]
       ;; heartbeat-ids、error-ids、code-ids的并集再与assigned-ids做差集就是不活跃的topology的id
       (set/difference (set/union heartbeat-ids error-ids code-ids) assigned-ids)
       ))

    以上就是nimbus启动源码分析。

  • 相关阅读:
    【ssh】端口转发
    【python】ftp连接,主被动,调试等级
    【ftp】主动模式和被动模式
    【python】声明编码的格式
    【python】gearman阻塞非阻塞,同步/异步,状态
    【mongo】centos6.9安装mongo2.6.3
    线性代数
    Improved GAN
    GAN (Generative Adversarial Network)
    Scaled Exponential Linear Unit
  • 原文地址:https://www.cnblogs.com/ierbar0604/p/3967300.html
Copyright © 2011-2022 走看看