  • Storm Source Code Analysis: Scheduler (backtype.storm.scheduler)

    First, look at the definition of the IScheduler interface; it declares two methods to implement, prepare and schedule.

    The parameter comments on schedule are very clear:
    topologies contains the static information of all topologies, while cluster holds the topologies' runtime state.
    Together they are enough to decide how to do the assignment.

    package backtype.storm.scheduler;
    import java.util.Map;
    public interface IScheduler {   
        void prepare(Map conf);
        
    /**
     * Set assignments for the topologies which need scheduling. The new assignments are available
     * through <code>cluster.getAssignments()</code>
     *
     *@param topologies all the topologies in the cluster, some of them need scheduling. The Topologies object here
     *       only contains static information about topologies. Information like assignments and slots is all in
     *       the <code>cluster</code> object.
     *@param cluster the cluster these topologies are running in. <code>cluster</code> contains everything the user
     *       needs to develop a new scheduling logic, e.g. supervisor information, available slots, current
     *       assignments for all the topologies, etc. The user can set a new assignment for topologies using
     *       <code>cluster.setAssignmentById</code>
     */
        void schedule(Topologies topologies, Cluster cluster);
    }

    DefaultScheduler

    DefaultScheduler implements the backtype.storm.scheduler.IScheduler interface:

    (ns backtype.storm.scheduler.DefaultScheduler  
      (:gen-class
        :implements [backtype.storm.scheduler.IScheduler]))
    (defn -prepare [this conf]
      )
    (defn -schedule [this ^Topologies topologies ^Cluster cluster]
      (default-schedule topologies cluster))

    Now let's see what default-schedule does:

    (defn default-schedule [^Topologies topologies ^Cluster cluster]
      (let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)] ;; 1. fetch the topologies that need scheduling
        (doseq [^TopologyDetails topology needs-scheduling-topologies
                :let [topology-id (.getId topology)
                      available-slots (->> (.getAvailableSlots cluster)
                                           (map #(vector (.getNodeId %) (.getPort %))))
                      all-executors (->> topology
                                         .getExecutors
                                         (map #(vector (.getStartTask %) (.getEndTask %)))
                                         set)
                      alive-assigned (EvenScheduler/get-alive-assigned-node+port->executors cluster topology-id)
                      alive-executors (->> alive-assigned vals (apply concat) set)
                      can-reassign-slots (slots-can-reassign cluster (keys alive-assigned))
                      total-slots-to-use (min (.getNumWorkers topology)
                                              (+ (count can-reassign-slots) (count available-slots)))
                      bad-slots (if (or (> total-slots-to-use (count alive-assigned)) 
                                        (not= alive-executors all-executors))
                                    (bad-slots alive-assigned (count all-executors) total-slots-to-use)
                                    [])]]
          (.freeSlots cluster bad-slots)
          (EvenScheduler/schedule-topologies-evenly (Topologies. {topology-id topology}) cluster))))
     

    1. Fetch the topologies that need scheduling (cluster.needsScheduling)

    A topology needs scheduling when either condition holds (or):
    the number of currently assigned workers is smaller than the configured worker count (a slot died, or there were not enough available slots at the last assignment);
    all executors > assigned executors (a new topology, or an assigned executor died, i.e. stopped heartbeating).
    public boolean needsScheduling(TopologyDetails topology) {
        int desiredNumWorkers = topology.getNumWorkers();
        int assignedNumWorkers = this.getAssignedNumWorkers(topology);
        if (desiredNumWorkers > assignedNumWorkers) {
            return true;
        }
        return this.getUnassignedExecutors(topology).size() > 0;
    }

    2. For each topology that needs scheduling

    2.1 Find all available slots in the cluster, reading the available slots from each SupervisorDetails (i.e. the supervisor's assignable ports minus its used ports)

    available-slots, ([node1 port2] [node2 port2])

    2.2 Read all of the topology's executors
    all-executors, ([1 3] [4 4] [5 7])

    2.3 Read the topology's current assignment from the cluster

    Because only the assignments of alive executors are recorded in the cluster, alive-executors can be derived from alive-assigned.
    alive-assigned, node+port->executors, {[node1 port1] [1 3], [node2 port1] [5 7]}
    alive-executors, ([1 3] [5 7])
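    The derivation in 2.3 can be sketched as a standalone Java helper (simplified types; the class and method names are illustrative, not Storm's API): flatten the values of the alive-assigned map and collect them into a set, mirroring (->> alive-assigned vals (apply concat) set).

```java
import java.util.*;

public class AliveExecutors {
    // alive-assigned maps a slot [node, port] to the executors running on it;
    // the set of alive executors is the union of all the map's values.
    public static Set<List<Integer>> aliveExecutors(
            Map<List<String>, List<List<Integer>>> aliveAssigned) {
        Set<List<Integer>> result = new HashSet<>();
        for (List<List<Integer>> executors : aliveAssigned.values()) {
            result.addAll(executors);
        }
        return result;
    }
}
```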

    2.4 Find which of the slots the topology currently runs on are still usable (slots-can-reassign)
    Alive executors may be running on dead slots, so not every slot that hosts alive executors is usable.
    The reassign conditions: the node is not in the cluster's blacklist, and the port is among the supervisor's allPorts (i.e. not a dead port), which means the slot is usable.
    Usable slots can then be reused for reassignment.
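    The reassign condition in 2.4 can be sketched as a plain Java filter (a hypothetical standalone version; the real slots-can-reassign reads the blacklist and supervisor ports from the Cluster object):

```java
import java.util.*;

public class ReassignableSlots {
    // A slot [node, port] can be reassigned when its node is not blacklisted
    // and its port is still one of the supervisor's live ports.
    public static List<List<String>> slotsCanReassign(
            Collection<List<String>> assignedSlots,
            Set<String> blacklist,
            Map<String, Set<String>> nodeToAllPorts) {
        List<List<String>> result = new ArrayList<>();
        for (List<String> slot : assignedSlots) {
            String node = slot.get(0), port = slot.get(1);
            if (!blacklist.contains(node)
                    && nodeToAllPorts.getOrDefault(node, Collections.emptySet()).contains(port)) {
                result.add(slot);
            }
        }
        return result;
    }
}
```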

    2.5 total-slots-to-use should equal (available-slots + can-reassign-slots)
    Of course, the slot count cannot exceed the topology's configured worker number; and when there are not enough usable slots, it may be smaller than that number.

    2.6 Find the bad slots
    For unreasonable or bad slot assignments, find the corresponding slots,
    so that the next step can release these unreasonable assignments.
    There are generally two cases: the previous assignment fell short of the configured count because available slots were insufficient; or a slot died during use, reducing the number of alive slots.

    (if (or (> total-slots-to-use (count alive-assigned)) ;; the number of usable slots exceeds the currently assigned slots
            (not= alive-executors all-executors)) ;; some executors died (so there must be bad slots), or it is a new topology not yet assigned
      (bad-slots alive-assigned (count all-executors) total-slots-to-use)
      [])
    (defn- bad-slots [existing-slots num-executors num-workers]
      (if (= 0 num-workers)
        '()
        (let [distribution (atom (integer-divided num-executors num-workers))
              keepers (atom {})]
          (doseq [[node+port executor-list] existing-slots :let [executor-count (count executor-list)]]
            (when (pos? (get @distribution executor-count 0)) ;; is this executor count still available in the ideal distribution? if so, the slot's assignment is reasonable and should be kept
              (swap! keepers assoc node+port executor-list) ;; reasonable assignment, so add it to keepers
              (swap! distribution update-in [executor-count] dec) ;; and decrement the remaining quota for this share size
              ))
          (->> @keepers
               keys
               (apply dissoc existing-slots) ;; removing the keepers from existing-slots leaves exactly the bad slots
               keys
               (map (fn [[node port]]
                      (WorkerSlot. node port)))))))

    Example: with 7 executors and 3 workers, the ideal distribution is ((2 2) (3 1)): one share of 3 executors and two shares of 2.
    So all existing assignments are checked, and slots matching the ideal proportions are added to keepers; in this case a slot with 1 executor, or two slots with 3 executors, would not fit the distribution.
    Such slots are all considered bad slots.
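    The keeper/bad-slot bookkeeping can be mirrored in plain Java (a sketch with simplified types; integerDivided imitates Storm's integer-divided helper, which splits num-executors into num-workers near-equal shares):

```java
import java.util.*;

public class BadSlots {
    // Split sum into numPieces near-equal shares, returned as
    // {executors-per-slot -> how many slots may have that count}.
    public static Map<Integer, Integer> integerDivided(int sum, int numPieces) {
        int base = sum / numPieces;
        int numBigger = sum % numPieces;          // this many pieces get one extra executor
        Map<Integer, Integer> distribution = new HashMap<>();
        if (numBigger > 0) distribution.put(base + 1, numBigger);
        distribution.merge(base, numPieces - numBigger, Integer::sum);
        return distribution;
    }

    // A slot is "bad" when its executor count no longer fits the ideal
    // distribution; returns the node+port keys that should be freed.
    public static List<List<String>> badSlots(Map<List<String>, List<int[]>> existingSlots,
                                              int numExecutors, int numWorkers) {
        if (numWorkers == 0) return Collections.emptyList();
        Map<Integer, Integer> distribution = integerDivided(numExecutors, numWorkers);
        List<List<String>> bad = new ArrayList<>();
        for (Map.Entry<List<String>, List<int[]>> e : existingSlots.entrySet()) {
            int count = e.getValue().size();
            if (distribution.getOrDefault(count, 0) > 0) {
                distribution.merge(count, -1, Integer::sum);  // keep this slot
            } else {
                bad.add(e.getKey());                          // to be freed later
            }
        }
        return bad;
    }
}
```

    For the 7-executor/3-worker example, integerDivided returns {3=1, 2=2}, so one slot with 3 executors and two slots with 2 executors are kept, and anything else is reported as bad.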

    3. Free the bad slots

    "Freeing" means that in SchedulerAssignmentImpl, all executors on the bad slots are removed from executorToSlot.
    A slot is free as long as no executor occupies it.

        Map<ExecutorDetails, WorkerSlot> executorToSlot; 
        /**
         * Release the slot occupied by this assignment.
         * @param slot
         */
        public void unassignBySlot(WorkerSlot slot) {
            List<ExecutorDetails> executors = new ArrayList<ExecutorDetails>();
            for (ExecutorDetails executor : this.executorToSlot.keySet()) {
                WorkerSlot ws = this.executorToSlot.get(executor);
                if (ws.equals(slot)) {
                    executors.add(executor);
                }
            }       
            // remove
            for (ExecutorDetails executor : executors) {
                this.executorToSlot.remove(executor);
            }
        }

    4. EvenScheduler/schedule-topologies-evenly

    This function is a classic use of doseq, with two nested doseq loops:
    the body of the outer doseq is itself a doseq,
    and the body of the inner doseq calls .assign.

    (defn schedule-topologies-evenly [^Topologies topologies ^Cluster cluster]
      (let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)]
        (doseq [^TopologyDetails topology needs-scheduling-topologies
                :let [topology-id (.getId topology)
                      new-assignment (schedule-topology topology cluster)
                      node+port->executors (reverse-map new-assignment)]]
          (doseq [[node+port executors] node+port->executors
                  :let [^WorkerSlot slot (WorkerSlot. (first node+port) (last node+port))
                        executors (for [[start-task end-task] executors]
                                    (ExecutorDetails. start-task end-task))]]
            (.assign cluster slot topology-id executors)))))


    4.1 Call schedule-topology

    (defn- schedule-topology [^TopologyDetails topology ^Cluster cluster]
      (let [topology-id (.getId topology)
            available-slots (->> (.getAvailableSlots cluster)
                                 (map #(vector (.getNodeId %) (.getPort %))))
            all-executors (->> topology
                              .getExecutors
                              (map #(vector (.getStartTask %) (.getEndTask %)))
                              set)
            alive-assigned (get-alive-assigned-node+port->executors cluster topology-id) ;; must be recomputed, because the bad-slot assignments were just freed
            total-slots-to-use (min (.getNumWorkers topology)
                                    (+ (count available-slots) (count alive-assigned)))
            reassign-slots (take (- total-slots-to-use (count alive-assigned)) ;; the bad slots were freed in the previous step, so a slot still alive does not need to be reassigned
                                 (sort-slots available-slots))
            reassign-executors (sort (set/difference all-executors (set (apply concat (vals alive-assigned)))))
            reassignment (into {}
                               (map vector
                                    reassign-executors
                                    ;; for some reason it goes into infinite loop without limiting the repeat-seq
                                    (repeat-seq (count reassign-executors) reassign-slots)))]
        (when-not (empty? reassignment)
          (log-message "Available slots: " (pr-str available-slots))
          )
        reassignment))

    reassign-slots,
    a. Compute the number of slots now usable for assignment; available-slots is not used directly because of the worker-number cap, so this may be smaller than available-slots.
    b. sort-slots, sort the slots by port.

    (defn sort-slots [all-slots] ;'(["n1" "p1"] ["n1" "p2"] ["n1" "p3"] ["n2" "p1"] ["n3" "p1"] ["n3" "p2"])
      (let [split-up (vals (group-by first all-slots))]
        (apply interleave-all split-up) ;'(["n1" "p1"] ["n2" "p1"] ["n3" "p1"] ["n1" "p2"] ["n3" "p2"] ["n1" "p3"])
        ))
    c. Take the first (a.) slots from the sorted list; sorting by port first spreads the executors across as many nodes as possible.
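    sort-slots can be sketched in Java as a group-by-node followed by a round-robin interleave (simplified types; this mirrors the group-by and interleave-all in the Clojure code above):

```java
import java.util.*;

public class SortSlots {
    // Group slots by node (preserving encounter order), then take one slot
    // from each node per round, so a prefix of the result spreads across nodes.
    public static List<List<String>> sortSlots(List<List<String>> allSlots) {
        Map<String, List<List<String>>> byNode = new LinkedHashMap<>();
        for (List<String> slot : allSlots) {
            byNode.computeIfAbsent(slot.get(0), k -> new ArrayList<>()).add(slot);
        }
        List<List<String>> result = new ArrayList<>();
        List<Iterator<List<String>>> iters = new ArrayList<>();
        for (List<List<String>> slots : byNode.values()) iters.add(slots.iterator());
        boolean any = true;
        while (any) {
            any = false;
            for (Iterator<List<String>> it : iters) {
                if (it.hasNext()) { result.add(it.next()); any = true; }
            }
        }
        return result;
    }
}
```

    Because each round takes at most one slot per node, taking a prefix of the result touches as many distinct nodes as possible.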

    reassign-executors, the executors that are currently unassigned

    The assignment process is very simple: just map vector over reassign-executors and reassign-slots. The code comment explains why the count is passed to repeat-seq; in theory it should not be needed, because map stops once the shorter collection is exhausted, but for some reason it apparently does not stop here without the limit.
    repeat-seq is needed because there are usually more executors than slots.

    (map vector #{[1,3] [4,4] [5,6]}(repeat-seq 3 '(["n1" "p1"] ["n1" "p2"])))
    ([[4 4] ["n1" "p1"]] [[5 6] ["n1" "p2"]] [[1 3] ["n1" "p1"]])
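    The pairing itself can be sketched in Java as a cyclic zip (illustrative; the modulo index plays the role of repeat-seq, cycling through the slots when executors outnumber them):

```java
import java.util.*;

public class RoundRobinAssign {
    // Pair each unassigned executor with a slot, cycling through the slots
    // when there are more executors than slots.
    public static Map<List<Integer>, List<String>> assignEvenly(
            List<List<Integer>> executors, List<List<String>> slots) {
        Map<List<Integer>, List<String>> assignment = new LinkedHashMap<>();
        if (slots.isEmpty()) return assignment;
        for (int i = 0; i < executors.size(); i++) {
            assignment.put(executors.get(i), slots.get(i % slots.size()));
        }
        return assignment;
    }
}
```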

    4.2 Wrap the new assignment into WorkerSlot and ExecutorDetails objects

    4.3 Finally, put the new assignment result into SchedulerAssignmentImpl's executorToSlot

        /**
         * Assign the slot to executors.
         * @param slot
         * @param executors
         */
        public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors) {
            for (ExecutorDetails executor : executors) {
                this.executorToSlot.put(executor, slot);
            }
        }
  • Original article: https://www.cnblogs.com/fxjwind/p/3136008.html