zoukankan      html  css  js  c++  java
  • Storm系列(五)架构分析之Nimbus启动过程

    启动流程图

    image

    mk-assignments

    功能:对当前集群中所有Topology进行新一轮的任务调度。


    实现源码路径:
    apache-storm-0.9.4storm-coresrccljacktypestormdaemon nimbus.clj


    方法原型:

    defnk mk-assignments [nimbus :scratch-topology-id nil]
     
    方法说明:
    1. 参数nimbus为nimbus-data对象,scratch-topology-id为提交的Topology id.
    2. 从nimbus中依次获取conf、storm-cluster-state。
    3. 获取当前所有活跃的topology的id集合。
    4. 根据活跃的topology id调用read-topology-details方法,获取TopologyDetails信息并返回<storm-id,TopologyDetails>集合。
    5. 根据<storm-id,TopologyDetails>集合创建topologies对象。
    6. 获取已分配资源的Topology的id集合。
    7. 根据已分配资源的Topology id获取每个Topology的任务分配情况assigments,并返回<storm-id,Assigment>集合existing-assignments,除了scratch-topology-id指定的Topology不会获取它的Assigments。
    8. 调用compute-new-topology->executor->node+port方法获为所有Topology计算新的调度,返回topology->executor->node+port.
    9. 调用basic-supervisor-details-map从Zookeeper中获取所有SupervisorInfo信息,返回<supervisor-id,supervisorDetails>集合。
    10. 对第8步返回的结果集中的每一项进行遍历构造新的Assignment对象集合new-assignments,Assigmnet定义如下:
      (defrecord Assignent [master-code-dir node->host executor->node+port executor->start-time-secs])
      master-code-dir:Nimbus在本地保存Topology信息路劲,主要包括stormjar.jar、stormcode.ser、stormconf.ser.
      node->host:该Topology分配的<supervisor-id,hostname>集合.
      executor->node+port:该Topology中executor的分配情况,node为supervisor-id,port为端口号。
      executor->start-time-secs:该Topology对用的supervisor的启动时间.
    11. 比较new-assignments与existing-assignments中的每一项是否有差异,如果没有就打印一条提示信息,如果有就将该Topology在Zookeeper中保存的调度结果更新为new-assignments。
    12. 计算new-assignment中的每一项新增加的slot并进行分配。(新增的solt通过new-assignment中的node+port减去existing-assignment中的node+port得到,返回为<topology-id,WorkerSlot>集合)
      WorkerSlot格式为{ nodeId port }

    功能总结:
    获取已分配资源的Topology的任务分配情况<storm-id,Assigment>集合(existing-assignments),获取活跃的Topology信息<storm-id,TopologyDetails>集合创建topologies对象。然后调用compute-new-topology->executor->node+port方法获为所有Topology计算新的调度,返回topology->executor->node+port再构造Assigmnet对象集。

    compute-new-topology->executor->node+port

    函数原型:

    defn compute-new-topology->executor->node+port [nimbus existing-assignments topologies scratch-topology-id]

    参数说明:
    nimbus:nimbus-data对象。
    existing-assignments:当前已经分配的的任务,格式<topology-id,Assignment>。
    Topologies:当前活跃的Topology,格式<storm-id,TopologyDetails>.
    scratch-topology-id:需要重新调度的topology-id.

    1. 调用compute-topology->executors方法根据existing-assignments中的topology-id获取<topology-id,executors>集合,与调用compute-executors方法效果作用一样。
    2. 调用update-all-hearbeats!更新上一步中executor中的心跳信息.
    3. 调用compute-topology->alive-executors获取<topology-id,alive-executors>集合,每个topology-id对应的活跃的executor.
    4. 调用compute-supervisor->dead-ports获取<supervisor-id,dead-ports>集合。
    5. 调用compute-topology->scheduler-assignment获取<topology-id,Scheduler-AssignmentImpl>集合.(topology-id对用的任务分配情况Scheduler-AssignmentImpl == <ExecutorDetails,WorkerSlot>).
    6. 根据参数topologies中的topology-id进行条件过滤,该topology中所有executor为空或者topology中的所有executor不等于Topology中活跃的executor或者该Topology的num-use-workers小于其指定的num-worker,过滤后的结果集群赋值给missing-assignmnet-topologies.
    7. 调用all-scheduling-slots方法获取<node-id,port>集合。
    8. 调用read-all-supervisor-details方法获取<supervisor-id,supervisorDetails>集合。
    9. 根据参数nimbus、第5步、第8步的结果集构造Cluster对象。
    10. 调用nimbus中的scheduler方法进行任务调度。
    11. 从Cluster对象中获取重新调度完之后的所有Assignments作为new-scheduler-assignment,格式为<topology-id,SchedulerAssignment>集合。
    12. 调用compute-topology->executor->node+port将第11步的结果集转换为<topology-id,{executor[node port]}>集合。
    13. 调用basic-supervisor-details-map将Zookeeper中记录的所有SupervisorInfo都转换为SupervisorDetails,返回<supervisor-id,SuperviosrDetails>集合.

    流程图:

    image

    compute-executor

    函数原型:

    defn- compute-executors [nimbus storm-id]

    函数实现说明:

    1. 获取storm-id(topology-id)对用的stormBase中component-executors形象(每个组件的并行度)。
    2. 获取storm-id对应的storm-conf配置。
    3. 获取storm-id对应Topology.
    4. 调用storm-task-info获取<task-id,component-id>集合,其中task-id对该Topology的所有组件是全局递增的。
    5. 将第4步的结果集转换为<component-id,tasks>并按照升序排序。
    6. 将第1步的结果集<component-id,parallelism>与第5步的结果集进行join得到<component-id,[parallelism,tasks]>集合.
    7. 对第6步的结果集中的每一项进行处理,将tasks集合均匀分布到数目为parallelism的分区上。

    功能总结:

    获取storm-id对应Topology所有组件的并行度(线程数),获取该Topology中各组件TOPOLOGY_TASK信息,最后的结果使每个线程中均匀分布多个task运行。

  • 相关阅读:
    bugKu getshell
    XCTF 进阶区 CAT
    php弱类型比较
    XCTF command_execution
    关于错误 openssl/ssl.h:没有那个文件或目录的解决办法
    libffi-dev : 依赖: libffi6 (= 3.2.1-4) 但是 3.2.1-4kord 正要被安装
    如何查看 Ubuntu下已安装包版本号
    git 下载指定tag版本的源码
    ubuntu 环境 openstack 源码包制成 deb 包
    fedora 国内源
  • 原文地址:https://www.cnblogs.com/jianyuan/p/4794839.html
Copyright © 2011-2022 走看看