zoukankan      html  css  js  c++  java
  • Storm详解

    1、Storm并行度相关的概念

    Storm集群有很多节点,按照类型分为nimbus(主节点)、supervisor(从节点),在conf/storm.yaml中配置了一个supervisor,有多个槽(supervisor.slots.ports),每个槽就是一个JVM,就是一个worker(一个节点,运行一个worker),在每个worker里面可以运行多个线程叫做executor,在executor里运行一个topology的一个component(spout、bolt)叫做task。task  是storm中进行计算的最小的运行单位,表示是spout或者bolt的运行实例。

    总结一下,supervisor(节点)>worker(进程)>executor(线程)>task(实例)

    程序执行的最大粒度的运行单位是进程,刚才说的task也是需要有进程来运行它的,在supervisor中,运行task的进程称为worker
    Supervisor节点上可以运行非常多的worker进程,一般在一个进程中是可以启动多个线程的,所以我们可以在worker中运行多个线程,这些线程称为executor在executor中运行task。
     
     
    提高storm的并行度,可 考虑如下几点:
    worker(进程)>executor(线程)>task(实例)
    增加work进程,增加executor线程,增加task实例
     
    看下面的图:
    这表示是一个work进程,其实就是一个jvm虚拟机进程,在这个work进程里面有多个executor线程,每个executor线程会运行一个或多个task实例。一个task是最终完成数据处理的实体单元。(默认情况下一个executor运行一个task).
     
     
    worker,executor,task解释
     
    1个worker进程执行的是1个topology的子集(注:不会出现1个worker为多个topology服务)。1个worker进程会启动1个或多个executor线程来执行1个topology的component(spout或bolt)。因此,1个运行中的topology就是由集群中多台物理机上的多个worker进程组成的
     
    executor是1个被worker进程启动的单独线程。每个executor只会运行1个topology的1个component(spout或bolt)的task(注:task可以是1个或多个,storm默认是1个component只生成1个task,executor线程里会在每次循环里顺序调用所有task实例)。
     
    task是最终运行spout或bolt中代码的单元(注:1个task即为spout或bolt的1个实例,executor线程在执行期间会调用该task的nextTuple或execute方法)。topology启动后,1个component(spout或bolt)的task数目是固定不变的,但该component使用的executor线程数可以动态调整(例如:1个executor线程可以执行该component的1个或多个task实例)。这意味着,对于1个component存在这样的条件:#threads<=#tasks(即:线程数小于等于task数目)。默认情况下task的数目等于executor线程数目,即1个executor线程只运行1个task。 
     
     
     
    刚才从理论说明了如何提高集群的并行度,在这里我们就来看一下这些东西worker(进程)>executor(线程)>task(实例) 是如何设置的
    l  worker(进程):这个worker进程数量是在集群启动之前配置好的,在哪配置的呢?是在storm/conf/storm.yaml文件中,参数是supervisor.slots.port,如果我们不在这进行配置的话,这个参数也是有默认值的,在strom-0.9.3的压缩包中的lib目录下,有一个strom-core.jar,打开这个jar文件,在里面有一个defaults.yaml文件中是有一些默认配置的。
    默认情况下一个storm项目只使用一个work进程,也可以通过代码进行修改,通过config.setNumWorkers(workers)设置。(最好一台机器上的一个topology只使用一个worker,主要原因时减少了worker之间的数据传输)
     
    注意:如果worker使用完的话再提交topology就不会执行,因为没有可用的worker,只能处于等待状态,把之前运行的topology停止一个之后这个就会继续执行了,
     
    l executor(线程):默认情况下一个executor运行一个task,可以通过在代码中设置builder.setSpout(id,spout, parallelism_hint);或者builder.setBolt(id,bolt,parallelism_hint);来提高线程数的。
     
    l task(实例):通过boltDeclarer.setNumTasks(num);来设置实例的个数
     
    默认情况下,一个supervisor节点会启动4个worker进程。每个worker进程会启动1个executor,每个executor启动1个task。
     
    Ok,这几个参数都可以使用一些方法进行增加。
     
     
    下面来举个例子看一下对这些配置修改之后的效果
     
    l  worker(进程),通过在代码中设置,可以在ui界面上查看worker的总数,并且还可以在linux服务器上执行jps查看work进程。
    在代码中设置使用3个worker,查看ui界面,发现workers是3个,executors使用了5个,为什么呢?因为每一个worker默认都会占用一个executor(这个executor会启动一个acker任务),这样就会占用三个,剩下的两个是spout和bolt实例占用了。
     
    如果使用5个worker,executor会使用7个,因为worker本身就会占用5个,spout和bolt占用两个。
     
     
    Acker任务默认是每个worker进程启动一个executor线程来执行,,可以在topology中取消acker任务,这样的话就不会多出来一个executor和任务了。
    代码如下:
    实际上就是修改一个配置
    topology.acker.executors
    这样的话在页面查看就只有两个executor和2个task了。
     
     
    l  executor(线程),在spout和bolt中设置线程数,都设置为2个,查看ui界面
    现在使用的executor和tasks就是7个了,因为worker本身使用3个,spout和bolt分别使用2个。
     
     
    l  task(实例),在sum中设置实例个数为5,查看ui界面
     
    发现ui界面上显示的tasks是10,因为spout占用2个,bolt占用5个,剩下的3个由acker任务占用
     
    注意:虽然在这设置了多个task实例,但是并行度并没有很大提高,因为只有两个线程去运行这些实例,只有设置足够多的线程和实例才可以真正的提高并行度。
    在这设置多个实例主要是为了下面执行rebalance的时候用到,因为rebalance不需要修改代码,就可以动态修改topology的并行度,这样的话就必须提前配置好多个实例,在rebalance的时候主要是对之前设置多余的任务实例分配线程去执行。
     
    在命令行动态修改并行度
    除了使用代码进行调整,还可以在shell命令行下对并行度进行调整。
    storm rebalance mytopology -w 10 -n 2 -e spout=2 -e bolt=2
    表示 10秒之后对mytopology进行并行度调整。把spout调整为2个executor,把bolt调整为2个executor
    注意:并行度主要就是调整executor的数量,但是调整之后的executor的数量必须小于等于task的数量,如果分配的executor的线程数比task数量多的话也只能分配和task数量相等的executor。

    storm中共分为三种Scheduler:

    1. EvenScheduler 会将系统资源均匀的分配给多个Topology
    2. DefaultScheduler 首先释放掉其他Topology不需要的资源,之后调用EvenScheduler的方法进行资源分配
    3. IsolationScheduler 可以单独对某些Topology制定使用多少台机器,IsolationScheduler会优先对这些机器进行资源分配,这些Topology的资首先源分配完毕后再调用DefaultScheduler进行资源分配

    如何创建Scheduler

    还记得分布式启动nimbus时会创建standalone-nimbus,他实现了INimbus接口,不过在实现getForcedScheduler方法时把返回值设为Nil。

    不过,在mk-scheduler时会真正创建Scheduler:

    (defn mk-scheduler [conf inimbus]
      (let [forced-scheduler (.getForcedScheduler inimbus)
        scheduler (cond
                    forced-scheduler
                    (do (log-message "Using forced scheduler from INimbus " (class forced-scheduler))
                        forced-scheduler);;首先会尝试直接从INimbus中获取
                    (conf STORM-SCHEDULER);;如果INimbus中没有,则根据conf中的STORM-SCHEDULER配置向中创建
                    (do (log-message "Using custom scheduler: " (conf STORM-SCHEDULER))
                        (-> (conf STORM-SCHEDULER) new-instance))
                    :else
                    (do (log-message "Using default scheduler");;如果conf中也没有相关配置,则会使用默认的DefaultScheduler
                        (DefaultScheduler.)))]
    (.prepare scheduler conf);;而后执行schedulerprepare函数进行准备工作
    scheduler
    ))

    IScheduler

    IScheduler接口总共定义了两个方法:

    1. void prepare(Map conf); //准备工作
    2. void schedule(Topologies topologies, Cluster cluster); //为集群内的Topology分配资源

    EvenScheduler

    EvenScheduler是最基础的Scheduler,所以先来看看他是如何实现的。

    (defn -prepare [this conf]
      )

    prepare方法是空实现,说明EvenScheduler不用做任何准备工作

    (defn -schedule [this ^Topologies topologies ^Cluster cluster]
      (schedule-topologies-evenly topologies cluster))

    schedule方法会调用函数schedule-topologies-evenly:

    (defn schedule-topologies-evenly [^Topologies topologies ^Cluster cluster]
      (let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)];;从cluster中查找需要被调度的topology,分为两个方面:1.实际的worker数低于预想worker数。2.worker数满足要求,但是实际的executor数少于预想的executor
    (doseq [^TopologyDetails topology needs-scheduling-topologies
            :let [topology-id (.getId topology)
                  new-assignment (schedule-topology topology cluster);;调度topology并返回新分配的资源
                  node+port->executors (reverse-map new-assignment)]];;获取workerexecutor的对应关系
      (doseq [[node+port executors] node+port->executors
              :let [^WorkerSlot slot (WorkerSlot. (first node+port) (last node+port));;创建WorkerSlot
                    executors (for [[start-task end-task] executors]
                                (ExecutorDetails. start-task end-task))]];;创建ExecutorDetails
        (.assign cluster slot topology-id executors)))));;分配资源

    调度topology的方法:

    (defn- schedule-topology [^TopologyDetails topology ^Cluster cluster]
      (let [topology-id (.getId topology)
        available-slots (->> (.getAvailableSlots cluster);;获得所有可用的WorkerSlot
                             (map #(vector (.getNodeId %) (.getPort %))));;转成vector的格式
        all-executors (->> topology
                          .getExecutors
                          (map #(vector (.getStartTask %) (.getEndTask %)))
                          set);;获取executor并转为#{[startTask, endTask]}的格式
        alive-assigned (get-alive-assigned-node+port->executors cluster topology-id);;获取存活的node+portexecutor的对应关系
        total-slots-to-use (min (.getNumWorkers topology)
                                (+ (count available-slots) (count alive-assigned)));;获取Topology所能使用的全部slot
        reassign-slots (take (- total-slots-to-use (count alive-assigned))
                             (sort-slots available-slots));;将要分配的slot
        reassign-executors (sort (set/difference all-executors (set (apply concat (vals alive-assigned)))));;将要分配的executor
        reassignment (into {}
                           (map vector
                                reassign-executors
                                ;; for some reason it goes into infinite loop without limiting the repeat-seq
                                (repeat-seq (count reassign-executors) reassign-slots)))]
    (when-not (empty? reassignment)
      (log-message "Available slots: " (pr-str available-slots))
      )
    reassignment));;如果存在,则返回

    Cluster类中的assign方法如下:

    public void assign(WorkerSlot slot, String topologyId, Collection<ExecutorDetails> executors) {
        if (this.isSlotOccupied(slot)) {//判断slot是否已经被占
            throw new RuntimeException("slot: [" + slot.getNodeId() + ", " + slot.getPort() + "] is already occupied.");
        }
        SchedulerAssignmentImpl assignment = (SchedulerAssignmentImpl)this.getAssignmentById(topologyId);//获取这个topology的部署信息
        if (assignment == null) {//如果没有则创建
            assignment = new SchedulerAssignmentImpl(topologyId, new HashMap<ExecutorDetails, WorkerSlot>());
            this.assignments.put(topologyId, assignment);
        } else {//如果存在则检查要分配的这些executor是否已经被当前topology部署过了
            for (ExecutorDetails executor : executors) {
                 if (assignment.isExecutorAssigned(executor)) {
                     throw new RuntimeException("the executor is already assigned, you should unassign it before assign it to another slot.");
                 }
            }
        }
        assignment.assign(slot, executors);//部署
    }
    class SchedulerAssignmentImpl...
    public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors) {
        for (ExecutorDetails executor : executors) {//部署过程就是把executor和slot放入当前保存的状态中
            this.executorToSlot.put(executor, slot);
        }
    }

    DefaultScheduler

    DefaultScheduler作为默认的调度器他与EvenScheduler最大的区别就是会首先计算bad-slots,default-schedule函数与EvenScheduler中的schedule-topology几乎一样:

    (defn default-schedule [^Topologies topologies ^Cluster cluster]
      (let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)]
        (doseq [^TopologyDetails topology needs-scheduling-topologies
            :let [topology-id (.getId topology)
                  available-slots (->> (.getAvailableSlots cluster)
                                       (map #(vector (.getNodeId %) (.getPort %))))
                  all-executors (->> topology
                                     .getExecutors
                                     (map #(vector (.getStartTask %) (.getEndTask %)))
                                     set)
                  alive-assigned (EvenScheduler/get-alive-assigned-node+port->executors cluster topology-id)
                  alive-executors (->> alive-assigned vals (apply concat) set);;获取存活的executor
                  can-reassign-slots (slots-can-reassign cluster (keys alive-assigned));;通过slots-can-reassign方法获得可以重新分配的slot
                  total-slots-to-use (min (.getNumWorkers topology)
                                          (+ (count can-reassign-slots) (count available-slots)));;获取Topology所能使用的全部slot
                  bad-slots (if (or (> total-slots-to-use (count alive-assigned));;如果预设的slot大于存活的slot 
                                    (not= alive-executors all-executors));;或预设的executor不等于存活的executor
                                (bad-slots alive-assigned (count all-executors) total-slots-to-use);;执行bad-slots方法获取bad-slot
                                [])]]
      (.freeSlots cluster bad-slots);;把bad-slot释放掉
      (EvenScheduler/schedule-topologies-evenly (Topologies. {topology-id topology}) cluster))));;执行EvenScheduler中的分配资源方法

    如何获取可以被重新分配的slot:

    (defn slots-can-reassign [^Cluster cluster slots]
      (->> slots
      (filter
        (fn [[node port]]
          (if-not (.isBlackListed cluster node);;如果slot所在的node不在黑名单中
            (if-let [supervisor (.getSupervisorById cluster node)]
              (.contains (.getAllPorts supervisor) (int port));;并且portsupervisorport列表中
              ))))))

    bad-slots方法如下:

    (defn- bad-slots [existing-slots num-executors num-workers];;参数分别为现存的slot,预设的executor,预设的slot
      (if (= 0 num-workers)
    '()
    (let [distribution (atom (integer-divided num-executors num-workers));;integer-divided函数会把executor均匀的分配到worker上,并把预订方案赋值给distribution
          keepers (atom {})]
      (doseq [[node+port executor-list] existing-slots :let [executor-count (count executor-list)]]
        (when (pos? (get @distribution executor-count 0))
          (swap! keepers assoc node+port executor-list)
          (swap! distribution update-in [executor-count] dec)
          ));;这一步会把现存的existing-slots进行筛选,如果存在满足预订方案的现存部署,则从distribution中剔除,并放置到keepers中
      (->> @keepers
           keys
           (apply dissoc existing-slots);;从现存的existing-slots剔除keepers,剩下的就是不满足预订方案的部署,需要进行重新分配的
           keys
           (map (fn [[node port]]
                  (WorkerSlot. node port)))))))

    IsolationScheduler

    这个Scheduler在初始化时就与前两者不同,他创建了一个container并赋值到保存在state中,并在调用prepare函数时把conf放到container中。

    (defn -init []
      [[] (container)])
    (defn -prepare [this conf]
      (container-set! (.state this) conf))

    IsolationScheduler的schedule方法非常之长,让我们来一点一点进行分析:

    (defn -schedule [this ^Topologies topologies ^Cluster cluster]
      (let [conf (container-get (.state this));;获取conf        
        orig-blacklist (HashSet. (.getBlacklistedHosts cluster));;获取blacklist
        iso-topologies (isolated-topologies conf (.getTopologies topologies));;从confISOLATION-SCHEDULER-MACHINES中筛选出需要隔离的topology
        iso-ids-set (->> iso-topologies (map #(.getId ^TopologyDetails %)) set);;获取隔离的topology的id
        topology-worker-specs (topology-worker-specs iso-topologies);;获取隔离的topology对应的executor
        topology-machine-distribution (topology-machine-distribution conf iso-topologies);;的出machineworker的最佳分配
        host-assignments (host-assignments cluster)];;获取当前集群中机器资源的分配情况
    (doseq [[host assignments] host-assignments]
      (let [top-id (-> assignments first second);;从分配情况中获取topologyid
            distribution (get topology-machine-distribution top-id);;获取理想情况下topologymachineworker的最佳分配
            ^Set worker-specs (get topology-worker-specs top-id);;获取这个topology对应的executor信息
            num-workers (count assignments);;获取当前部署情况中的worker
            ]
        (if (and (contains? iso-ids-set top-id);;如果隔离topology包含这个topology
                 (every? #(= (second %) top-id) assignments);;并且当前部署的assignment都属于这个topology
                 (contains? distribution num-workers);;理想部署情况中包含当前部署的worker
                 (every? #(contains? worker-specs (nth % 2)) assignments));;executor也满足理想分配
          ;;这些条件都符合则进行一下操作:
          (do (decrement-distribution! distribution num-workers);;如果满足从理想部署情况,则把num-workers1
              (doseq [[_ _ executors] assignments] (.remove worker-specs executors));;从worker-specs中把理想部署情况的executor去除
              (.blacklistHost cluster host));;把host加入blacklist,不会对这台机器做任何新任务的分配
          ;;如果不满足条件
          (doseq [[slot top-id _] assignments];;便利这台机器上的所有topology
            (when (contains? iso-ids-set top-id);;如果topology不属于隔离的
              (.freeSlot cluster slot);;释放这个slot
              ))
          )))
    (let [host->used-slots (host->used-slots cluster);;hostused-slot的映射
          ^LinkedList sorted-assignable-hosts (host-assignable-slots cluster)];;可用的hostslot的映射
      ;; TODO: can improve things further by ordering topologies in terms of who needs the least workers
      (doseq [[top-id worker-specs] topology-worker-specs
              :let [amts (distribution->sorted-amts (get topology-machine-distribution top-id))]];;从理想配置中选出隔离的topID对应的Worker并按降序排列
        (doseq [amt amts;;从worker中选择一个
                :let [[host host-slots] (.peek sorted-assignable-hosts)]];;从可用的host中查找第一个
          (when (and host-slots (>= (count host-slots) amt));;如果可用的host-slot存在,并且数量大于理想中topology需要的worker
            (.poll sorted-assignable-hosts);;从可用的host中取出第一个
            (.freeSlots cluster (get host->used-slots host));;从cluster中释放这个host对应的slot
            (doseq [slot (take amt host-slots);;获取slot
                    :let [executors-set (remove-elem-from-set! worker-specs)]];;获取executor
              (.assign cluster slot top-id executors-set));;在cluster中设置topology对应的slotexecutor
            (.blacklistHost cluster host));;黑名单中去掉这个host
          )))
    (let [failed-iso-topologies (->> topology-worker-specs
                                  (mapcat (fn [[top-id worker-specs]]
                                    (if-not (empty? worker-specs) [top-id])
                                    )))];;得到隔离的topology中没有被分配完的topology
      (if (empty? failed-iso-topologies);;如果未被分配的隔离top为空
        ;; run default scheduler on non-isolated topologies
        (-<> topology-worker-specs
             allocated-topologies
             (leftover-topologies topologies <>)
             (DefaultScheduler/default-schedule <> cluster));;所有topology中除去隔离的topology,其他的使用DefaultScheduler进行资源分配
        ;;如果有未被分配的隔离top为空
        (do
          (log-warn "Unable to isolate topologies " (pr-str failed-iso-topologies) ". No machine had enough worker slots to run the remaining workers for these topologies. Clearing all other resources and will wait for enough resources for isolated topologies before allocating any other resources.")
          ;; clear workers off all hosts that are not blacklisted
          (doseq [[host slots] (host->used-slots cluster)]
            (if-not (.isBlacklistedHost cluster host)
              (.freeSlots cluster slots)
              )));;释放所有不在blacklist中的hostslot,为下一次资源调度做准备
        ))
    (.setBlacklistedHosts cluster orig-blacklist);;把blacklist恢复到未被分配前的状态
    ))

    正因为当初对未来做了太多的憧憬,所以对现在的自己尤其失望。生命中曾经有过的所有灿烂,终究都需要用寂寞来偿还。
  • 相关阅读:
    svn出现权限不足时的解决方法
    子线程简单实现(ZT)
    Ubuntu下安装Apache mysql php的命令
    修改主机名Ubuntu
    form:select form:options 标签数据回显
    form:select form:options 标签数据回显
    checkbox选择根据后台List数据进行回显
    checkbox选择根据后台List数据进行回显
    Java随机数
    Java随机数
  • 原文地址:https://www.cnblogs.com/candlia/p/11920292.html
Copyright © 2011-2022 走看看