zoukankan      html  css  js  c++  java
  • Storm中并行度原来是这样计算的(1.0.1版本)

    ==思考问题1==

    向集群提交一个拓扑的时候,Storm是如何计算Task数以及Executor数的?

    具体有多少个worker,多少个executor,每个executor负责多少个task?

    ==思考问题2:==

    构建拓扑的时候,有3个地方会影响并行度,这3个地方之间有什么关系?

    builder.setSpout("spout", new RandomSentenceSpout(), 5); //parallelism-hint
    builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTask(1);
    builder.setSpout("spout", new RandomSentenceSpout(), 5).setMaxTaskParallelism(1);

    ==3个参数的信息==

    1、parallelism-hint:

    构建拓扑时,可以通过setSpout或setBolt的函数参数中指定。为初始executor数

    如:builder.setSpout("spout", new RandomSentenceSpout(), 5);

    2、 TOPOLOGY-TASKS:

    构建拓扑时,通过Spout/Bolt的setNumTasks()方法来指定。为component的task数(Spout或Bolt)。

    如:builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTask(1);

    3、TOPOLOGY-MAX-TASK-PARALLELISM:

    构建拓扑时,通过Spout/Bolt的setMaxTaskParallelism()方法来指定。为component的最大并行度通常用于测试,在本地模式时使用。

    如:builder.setSpout("spout", new RandomSentenceSpout(), 5).setMaxTaskParallelism(1);

    ==结论1:Executor数是多少?==

    对应topology代码中, 为每个component指定的parallelism-hint数(通过setBolt和setSpout的参数)

    ==结论2:Task数是多少?==

    版本号:apache-storm-1.0.1

    代码路径:org/apache/storm/daemon/nimbus.clj

    这里有一个函数非常重要,看了之后上面的3个关系多少会清晰很多。

    该函数返回计算之后的真实的Task数

    (defn- component-parallelism [storm-conf component]
      (let [storm-conf (merge storm-conf (component-conf component))
            num-tasks (or (storm-conf TOPOLOGY-TASKS) (num-start-executors component))
            max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM)
            ]
        (if max-parallelism
          (min max-parallelism num-tasks)
          num-tasks)))

    这个代码是用clojure语言编写的,没有用过的人估计会非常蛋疼,

    为了方便理解,用伪代码(方便理解)翻译之后,大概思路是这个样子的:

    num-tasks = (TOPOLOGY-TASKS != null ? TOPOLOGY-TASKS : parallelism-hint);
    max-parallelism = TOPOLOGY-MAX-TASK-PARALLELISM;
        
    if (max-parallelism != null) {
        //取两者较小
        return min(num-tasks, max-parallelism);
    } else {    
        return num-tasks;
    }

    如果将3个参数进行排列组合之后,获得结果如下:

    简单理解来说:

    1、暂时不考虑TOPOLOGY-MAX-TASK-PARALLELIS。(测试用的玩意儿,弄出来影响思路)

    2、TOPOLOGY-TASKS优先于parallelism-hint。

    ==Executor与Task是如何匹配的?==

    下面的代码是分配的代码

    (defn- compute-executors [nimbus storm-id]
      (let [conf (:conf nimbus)
            blob-store (:blob-store nimbus)
            storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)
            component->executors (:component->executors storm-base)
            storm-conf (read-storm-conf-as-nimbus storm-id blob-store)
            topology (read-storm-topology-as-nimbus storm-id blob-store)
            task->component (storm-task-info topology storm-conf)]
        (->> (storm-task-info topology storm-conf)
             reverse-map
             (map-val sort)
             (join-maps component->executors)
             (map-val (partial apply partition-fixed))
             (mapcat second)
             (map to-executor-id)
             )))

    理解这个代码之前,我们首先把注意力放在storm-task-info这个函数上,看看它都干了些什么。

    代码位置:org/apache/storm/daemon/common.clj

    (defn storm-task-info
      "Returns map from task -> component id"
      [^StormTopology user-topology storm-conf]
      (->> (system-topology! storm-conf user-topology)
           all-components
           (map-val (comp #(get % TOPOLOGY-TASKS) component-conf))
           (sort-by first)
           (mapcat (fn [[c num-tasks]] (repeat num-tasks c)))
           (map (fn [id comp] [id comp]) (iterate (comp int inc) (int 1)))
           (into {})
           ))

    来看看广大网友的解读版。参考博客:https://www.cnblogs.com/ierbar0604/p/4386480.html

    这个函数, 首先读出所有components ,对每个component, 读出TOPOLOGY-TASKS(已经过标准化之后的TASK数,具体参照前面的内容),

    最后用递增序列产生taskid, 并最终生成component和task的对应关系。

    (如果不设置TOPOLOGY-TASKS,task数等于executor数,后面分配就很容易,否则就涉及task分配问题)

    storm-task-info函数的输出,是这个样子的:

    {1 "boltA", 2 "boltA", 3 "boltA", 4 "boltA", 5 "boltB", 6 "boltB"}

    然后,我们把注意力返回到compute-executors函数(调用storm-task-info函数的调用处)。

    还是用上面博客中,网友解读的版本来帮助我们理解。(注意:需要对照源码,确认当前版本代码是否有变化)

    ==我的笔记==

    最后,从程序与StormUI界面对比来看看并行度的分配结果。

    (拓扑程序)

     (UI界面)

    ==简单总结==

    1、有3个地方可以影响Task数,根据3个参数的结果决定Task数。

    2、executor数 = 所有组件的parallelism-hint总数。

    3、task数在生命周期内不变,executor数可能改变。

    ==rebalance命令==

    storm运行过程中,而已使用rebalance命令动态调整拓扑的worker数及并发度。

    命令模板:storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*  (*表示可以设置多个)

    ## 重新配置拓扑 "mytopology",使得该拓扑拥有 5 个 worker processes,
    ## 另外,配置名为 "blue-spout" 的 spout 使用 3 个 executor,
    ## 配置名为 "yellow-bolt" 的 bolt 使用 10 个 executor。
    
    $ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=1

    -w:标记覆盖Storm在禁用与关闭期间等待的时间长度。

     

    ==其他疑问==

    1、网上总是能看到,“不推荐使用setNumTasks”的方式来提高并发度。至于原因确实是一直没有搞明白。

    答:如果只单纯的使用setNumTasks,不调整parallelism-hint,会造成多个Task运行在1个executor的结果。并不一定能够提高性能。

    2、如果task数比executor数多,是否会有闲置executor?(需要用代码验证)

    答:不会有闲置executor。

    -------------

    参考博客:

    https://www.cnblogs.com/ierbar0604/p/4386480.html

    http://lib.csdn.net/article/60/42875

  • 相关阅读:
    spring aop
    Linux进程管理命令
    逻辑卷管理-LVM(Logical Volume Manager)
    Linux磁盘与文件系统管理(二)
    Linux磁盘与文件系统管理(一)
    Linux后台运行和关闭、查看后台任务
    Linux用户管理及用户信息查询
    文件备份与压缩
    Liunx信息显示与文件搜索
    文本处理三剑客之 awk
  • 原文地址:https://www.cnblogs.com/quchunhui/p/8271349.html
Copyright © 2011-2022 走看看