  • Storm Source Code Analysis - Topology Submit - Supervisor

    mk-supervisor

    (defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor]
      (log-message "Starting Supervisor with conf " conf)
      (.prepare isupervisor conf (supervisor-isupervisor-dir conf)) ;; initialize supervisor-id and store it in local state (see the ISupervisor implementation)
      (FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf))) ;; empty the supervisor's tmp directory on this machine
      (let [supervisor (supervisor-data conf shared-context isupervisor)
            ;; create two event-managers, used to run functions in the background
            [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
            sync-processes (partial sync-processes supervisor) ;; partially apply sync-processes to supervisor
            ;; mk-synchronize-supervisor does the main work of mk-supervisor, see below
            synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager)
            ;; function that produces the supervisor heartbeat
            heartbeat-fn (fn [] (.supervisor-heartbeat!
                                   (:storm-cluster-state supervisor)
                                   (:supervisor-id supervisor)
                                   (SupervisorInfo. (current-time-secs)
                                                    (:my-hostname supervisor)
                                                    (:assignment-id supervisor)
                                                    (keys @(:curr-assignment supervisor)) ;; used ports
                                                    (.getMetadata isupervisor)
                                                    (conf SUPERVISOR-SCHEDULER-META)
                                                    ((:uptime supervisor)))))]
        ;; first call heartbeat-fn once to send a supervisor heartbeat,
        ;; then use schedule-recurring to call heartbeat-fn periodically and refresh it
        (heartbeat-fn)
        ;; should synchronize supervisor so it doesn't launch anything after being down (optimization)
        (schedule-recurring (:timer supervisor)
                            0
                            (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
                            heartbeat-fn))
     

    mk-synchronize-supervisor

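    The supervisor is simple. It is responsible for two things:
    When the assignment changes, sync the topology code from nimbus to the local machine.
    When the assignment changes, check the workers' state and make sure every assigned worker is valid.

    This gives two requirements:
    1. The work must be triggered whenever the assignment changes.
        For how a ZooKeeper watcher is used to get this repeated triggering, see
    Storm Source Code Analysis - Use of Zookeeper in Storm

    2. Because the work is time-consuming, it must run in the background.
        Two event-managers are created: one runs the function returned by mk-synchronize-supervisor in the background, the other runs sync-processes.

    What is unusual about mk-synchronize-supervisor is that it wraps its body in a named anonymous function called this.
    That looks surprising at first, but the purpose is to let sync-callback add this very function to the event-manager.
    A ZooKeeper watcher fires only once, so every time the function runs it has to register sync-callback with zk again to make sure it will be triggered next time.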
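
    The self-re-registering pattern can be sketched without Storm or ZooKeeper. The following is only an illustration under stated assumptions: store, read-and-watch, write!, event-queue, run-pending!, sync-log and mk-sync are all hypothetical names invented here, the atom stands in for ZooKeeper with one-shot watchers, and the queue is drained on the calling thread instead of by a background thread as a real event manager would do.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

;; Hypothetical stand-in for ZooKeeper: some data plus one-shot watchers.
(def store (atom {:data {} :watchers []}))

(defn read-and-watch
  "Registers callback as a one-shot watcher and returns the current data."
  [callback]
  (:data (swap! store update :watchers conj callback)))

(defn write!
  "Replaces the data, then fires and forgets every registered watcher."
  [data]
  (let [[old _] (swap-vals! store assoc :data data :watchers [])]
    (doseq [watcher (:watchers old)]
      (watcher))))

;; Hypothetical stand-in for an event manager: a queue of zero-arg functions.
(def ^LinkedBlockingQueue event-queue (LinkedBlockingQueue.))

(defn run-pending!
  "Runs every queued event on the calling thread; returns how many ran."
  []
  (loop [n 0]
    (if-let [event (.poll event-queue)]
      (do (event) (recur (inc n)))
      n)))

(def sync-log (atom []))

(defn mk-sync
  "Returns a function that reads the data, re-registers itself for the
  next change, and records the snapshot it saw."
  []
  (fn this []
    (let [callback (fn [& ignored] (.add event-queue this)) ; enqueue myself
          snapshot (read-and-watch callback)]               ; read + re-watch
      (swap! sync-log conj snapshot))))

(def sync! (mk-sync))
(sync!)            ; first run: sees {} and registers a watcher
(write! {:a 1})    ; watcher fires once and enqueues sync!
(run-pending!)     ; sync! runs again, sees {:a 1}, registers a new watcher
(write! {:a 2})
(run-pending!)
@sync-log          ; => [{} {:a 1} {:a 2}]
```

    Without the name this, the inner callback would have no way to refer to the enclosing function, and the second write! would go unnoticed because the first watcher has already been consumed.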

    (defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
      (fn this []
        (let [conf (:conf supervisor)
              storm-cluster-state (:storm-cluster-state supervisor)
              ^ISupervisor isupervisor (:isupervisor supervisor)
              ^LocalState local-state (:local-state supervisor) ;; local cache database
              sync-callback (fn [& ignored] (.add event-manager this)) ;; build the callback (runs this function again in the background)
              assignments-snapshot (assignments-snapshot storm-cluster-state sync-callback) ;; read the assignments and register the callback, fired when the assignments in zk change
              storm-code-map (read-storm-code-locations assignments-snapshot) ;; where to download each topology's code from
              downloaded-storm-ids (set (read-downloaded-storm-ids conf)) ;; topologies that are already downloaded
              all-assignment (read-assignments  ;; which executors are assigned to this supervisor's ports
                               assignments-snapshot
                               (:assignment-id supervisor)) ;; supervisor-id
              new-assignment (->> all-assignment ;; new = all, because confirmAssigned has no real implementation and always returns true
                                  (filter-key #(.confirmAssigned isupervisor %)))
              assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment) ;; set of topology ids assigned to this supervisor
              existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)] ;; read the currently saved local assignments from local-state

          ;; download the code of newly assigned topologies
          (doseq [[storm-id master-code-dir] storm-code-map]
            (when (and (not (downloaded-storm-ids storm-id))
                       (assigned-storm-ids storm-id))
              (download-storm-code conf storm-id master-code-dir)))
          (.put local-state     ;; save new-assignment to local-state
                LS-LOCAL-ASSIGNMENTS
                new-assignment)
          (reset! (:curr-assignment supervisor) new-assignment) ;; cache new-assignment in the supervisor object

          ;; delete topology code that is no longer needed
          ;; remove any downloaded code that's no longer assigned or active
          (doseq [storm-id downloaded-storm-ids]
            (when-not (assigned-storm-ids storm-id)
              (log-message "Removing code for storm id "
                           storm-id)
              (rmr (supervisor-stormdist-root conf storm-id))
              ))
          ;; run sync-processes in the background
          (.add processes-event-manager sync-processes)
          )))

    sync-processes

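    sync-processes manages the workers: it deals with abnormal or dead workers and creates new ones.
    It first reads the workers' heartbeats from local storage to judge their state, and shuts down every worker whose state is not valid.
    Then, for every slot that has an assignment but no valid worker, it creates a new worker.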
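
    The decision logic described above can be sketched with plain maps. This is a simplified illustration, not Storm's code: plan-workers and the sample data are hypothetical, and the real function also generates worker ids, updates local-state and launches processes.

```clojure
(defn plan-workers
  "Given allocated (worker-id -> [state heartbeat]) and assigned
  (port -> assignment), returns which workers to keep, which to shut
  down, and which ports need a fresh worker."
  [allocated assigned]
  (let [;; workers whose state is :valid
        keepers    (into {} (filter (fn [[_ [state _]]] (= state :valid)) allocated))
        ;; ports already served by a valid worker
        keep-ports (set (map (fn [[_ [_ hb]]] (:port hb)) keepers))
        ;; assigned ports that have no valid worker yet
        reassign   (into {} (remove (fn [[port _]] (keep-ports port)) assigned))]
    {:keep     (set (keys keepers))
     :shutdown (set (for [[id [state _]] allocated
                          :when (not= state :valid)]
                      id))
     :launch   (set (keys reassign))}))

;; Hypothetical sample data.
(def allocated
  {"w1" [:valid      {:port 6700 :time-secs 100}]
   "w2" [:timed-out  {:port 6701 :time-secs 10}]
   "w3" [:disallowed {:port 6703 :time-secs 100}]})

(def assigned
  {6700 {:storm-id "topo-1"}
   6701 {:storm-id "topo-1"}
   6702 {:storm-id "topo-2"}})

(plan-workers allocated assigned)
;; => {:keep #{"w1"}, :shutdown #{"w2" "w3"}, :launch #{6701 6702}}
```

    Port 6701 appears in :launch even though a worker was running on it, because that worker timed out; port 6702 appears because it is newly assigned.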

    (defn sync-processes [supervisor]
      (let [conf (:conf supervisor)
            ^LocalState local-state (:local-state supervisor)
            assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {})
            now (current-time-secs)
            allocated (read-allocated-workers supervisor assigned-executors now) ;; 1. read the current state of the workers
            keepers (filter-val     ;; pick out the workers whose state is :valid
                     (fn [[state _]] (= state :valid))
                     allocated)
            keep-ports (set (for [[id [_ hb]] keepers] (:port hb))) ;; set of the keepers' ports
            ;; select-keys-pred (pred map) filters the keys of map with pred
            ;; find the ports in assigned-executors that are not in keep-ports,
            ;; i.e. workers that are newly assigned, or assigned but not valid (dead or not started)
            ;; these executors must be reassigned to new workers
            reassign-executors (select-keys-pred (complement keep-ports) assigned-executors)
            new-worker-ids (into
                            {}
                            (for [port (keys reassign-executors)] ;; generate a new worker-id for each port in reassign-executors
                              [port (uuid)]))
            ]
        ;; 1. to kill are those in allocated that are dead or disallowed
        ;; 2. kill the ones that should be dead
        ;;     - read pids, kill -9 and individually remove file
        ;;     - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log)
        ;; 3. of the rest, figure out what assignments aren't yet satisfied
        ;; 4. generate new worker ids, write new "approved workers" to LS
        ;; 5. create local dir for worker id
        ;; 5. launch new workers (give worker-id, port, and supervisor-id)
        ;; 6. wait for workers launch
        (doseq [[id [state heartbeat]] allocated]
          (when (not= :valid state) ;; shut down every worker whose state is not valid
            (shutdown-worker supervisor id)))
        (doseq [id (vals new-worker-ids)]
          (local-mkdirs (worker-pids-root conf id))) ;; create a directory for each new worker; they are added to LS-APPROVED-WORKERS in local-state below
        (.put local-state LS-APPROVED-WORKERS ;; updated approved workers = the valid ones + the new workers
              (merge
               (select-keys (.get local-state LS-APPROVED-WORKERS) ;; currently approved workers whose state is valid
                            (keys keepers))
               (zipmap (vals new-worker-ids) (keys new-worker-ids)) ;; new workers
               ))
        (wait-for-workers-launch ;; 2. wait-for-workers-launch
         conf
         (dofor [[port assignment] reassign-executors]
           (let [id (new-worker-ids port)]
             (launch-worker supervisor
                            (:storm-id assignment)
                            port
                            id)
             id)))
        ))

    1. read-allocated-workers

    (defn read-allocated-workers
      "Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead (timed out or never wrote heartbeat)"
      [supervisor assigned-executors now]
      (let [conf (:conf supervisor)
            ^LocalState local-state (:local-state supervisor)
            ;; read each worker's heartbeat from local-state; every worker process keeps updating its local heartbeat
            id->heartbeat (read-worker-heartbeats conf)
            approved-ids (set (keys (.get local-state LS-APPROVED-WORKERS)))] ;; read the approved workers from local-state
        (into
         {}
         (dofor [[id hb] id->heartbeat] ;; use the heartbeat to decide each worker's current state
                (let [state (cond
                             (or (not (contains? approved-ids id))
                                 (not (matches-an-assignment? hb assigned-executors)))
                               :disallowed  ;; not allowed
                             (not hb)
                               :not-started ;; no heartbeat, not started
                             (> (- now (:time-secs hb))
                                (conf SUPERVISOR-WORKER-TIMEOUT-SECS))
                               :timed-out  ;; timed out, dead
                             true
                               :valid)]
                  (log-debug "Worker " id " is " state ": " (pr-str hb) " at supervisor time-secs " now)
                  [id [state hb]] ;; return each worker's current state and heartbeat
                  ))
         )))

    2. wait-for-workers-launch

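    launch-worker is called for the new worker id of every entry in reassign-executors.

    Finally wait-for-workers-launch is called to wait until the workers have launched successfully.

    The logic is simple: check the heartbeat and, as long as there is none, keep sleeping until the timeout is reached, then log "failed to start".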
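
    The poll-sleep-timeout loop can be sketched on its own. This is a generic illustration and not Storm's implementation: wait-until, polls and started-after-3-polls? are hypothetical names, and the predicate stands in for "the worker has written a heartbeat".

```clojure
(defn wait-until
  "Polls ready? every interval-ms until it returns truthy or timeout-ms
  has elapsed. Returns true if ready? succeeded, false on timeout."
  [ready? timeout-ms interval-ms]
  (let [start (System/currentTimeMillis)]
    (loop []
      (cond
        (ready?) true
        (>= (- (System/currentTimeMillis) start) timeout-ms) false
        :else (do (Thread/sleep (long interval-ms))
                  (recur))))))

;; Hypothetical "worker" that counts as started on the third poll.
(def polls (atom 0))
(defn started-after-3-polls? [] (>= (swap! polls inc) 3))

(wait-until started-after-3-polls? 2000 10) ; => true, after three polls
(wait-until (constantly false) 30 10)       ; => false, timed out
```

    One difference from the listing below: wait-for-workers-launch takes start-time once and passes the same value to every wait-for-worker-launch call, so the start timeout is a budget shared by all newly launched workers rather than a separate timeout per worker.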

    (defn- wait-for-worker-launch [conf id start-time]
      (let [state (worker-state conf id)]    
        (loop []
          (let [hb (.get state LS-WORKER-HEARTBEAT)]
            (when (and
                   (not hb)
                   (<
                    (- (current-time-secs) start-time)
                    (conf SUPERVISOR-WORKER-START-TIMEOUT-SECS)
                    ))
              (log-message id " still hasn't started")
              (Time/sleep 500)
              (recur)
              )))
        (when-not (.get state LS-WORKER-HEARTBEAT)
          (log-message "Worker " id " failed to start")
          )))
    
    (defn- wait-for-workers-launch [conf ids]
      (let [start-time (current-time-secs)]
        (doseq [id ids]
          (wait-for-worker-launch conf id start-time))
        ))
  • Original article: https://www.cnblogs.com/fxjwind/p/3161241.html