  • Storm Source Code Analysis - Topology Submit - Task - TopologyContext (backtype.storm.task)

    1. GeneralTopologyContext

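    Records the basic information about a topology: the StormTopology and the StormConf,
    plus the information derived from them, namely the task/component mapping and each component's streams and inputs/outputs.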

    public class GeneralTopologyContext implements JSONAware {
        private StormTopology _topology; 
        private Map<Integer, String> _taskToComponent;
        private Map<String, List<Integer>> _componentToTasks;
        private Map<String, Map<String, Fields>> _componentToStreamToFields; //ComponentCommon.streams, map<string, StreamInfo>
        private String _stormId;   // topology id
        protected Map _stormConf;  
    
    }

    StormTopology: the worker reads it from stormcode.ser on disk.

    struct StormTopology {
      //ids must be unique across maps
      // #workers to use is in conf
      1: required map<string, SpoutSpec> spouts;
      2: required map<string, Bolt> bolts;
      3: required map<string, StateSpoutSpec> state_spouts;
    }

    StormConf: the worker reads it from stormconf.ser on disk.

    taskToComponent, componentToTasks: the mapping between tasks and components, in both directions.

    componentToStreamToFields: which streams each component declares, and which fields each stream contains.
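
    The two task maps are inverses of each other. The sketch below only illustrates that relationship; the class and method names are hypothetical, and it is not how Storm itself builds the maps (they are handed to the context ready-made by the worker).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Storm.
final class TaskMappingSketch {
    // Invert task id -> component id into component id -> sorted task ids.
    static Map<String, List<Integer>> invert(Map<Integer, String> taskToComponent) {
        Map<String, List<Integer>> componentToTasks = new TreeMap<>();
        // Copying into a TreeMap visits task ids in ascending order,
        // so every per-component list ends up sorted.
        for (Map.Entry<Integer, String> e : new TreeMap<>(taskToComponent).entrySet()) {
            componentToTasks
                .computeIfAbsent(e.getValue(), k -> new ArrayList<>())
                .add(e.getKey());
        }
        return componentToTasks;
    }

    public static void main(String[] args) {
        Map<Integer, String> taskToComponent = new TreeMap<>();
        taskToComponent.put(1, "spout");
        taskToComponent.put(2, "bolt");
        taskToComponent.put(3, "bolt");
        System.out.println(invert(taskToComponent)); // {bolt=[2, 3], spout=[1]}
    }
}
```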

    Besides the obvious accessors, the following methods return a component's inputs and outputs:

        /**
         * Gets the declared inputs to the specified component.
         *
         * @return A map from subscribed component/stream to the grouping subscribed with.
         */
        public Map<GlobalStreamId, Grouping> getSources(String componentId) {
            return getComponentCommon(componentId).get_inputs();  //ComponentCommon.inputs,map<GlobalStreamId, Grouping>
        }
        /**
         * Gets information about who is consuming the outputs of the specified component,
         * and how.
         *
         * @return Map from stream id to component id to the Grouping used.
         */
        public Map<String, Map<String, Grouping>> getTargets(String componentId) {
            Map<String, Map<String, Grouping>> ret = new HashMap<String, Map<String, Grouping>>();
            for(String otherComponentId: getComponentIds()) {  // for every component id in the topology
                Map<GlobalStreamId, Grouping> inputs = getComponentCommon(otherComponentId).get_inputs();  // fetch that component's inputs
                for(GlobalStreamId id: inputs.keySet()) {  // for every stream id among the inputs
                    if(id.get_componentId().equals(componentId)) {    // is the stream's source component the one we were asked about?
                        Map<String, Grouping> curr = ret.get(id.get_streamId());
                        if(curr==null) curr = new HashMap<String, Grouping>();
                        curr.put(otherComponentId, inputs.get(id));
                        ret.put(id.get_streamId(), curr);
                    }
                }
            }
            return ret; // [stream-id, [target-component-id, grouping]]
        }

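    The getComponentCommon and getComponentIds used here come from the ThriftTopologyUtils class.
    Don't be misled by the name: nothing is fetched from Nimbus through the Thrift API. The code only reads from the StormTopology object, whose class happens to be generated by Thrift.
    Thrift-generated classes carry a metaDataMap, so the implementation looks like this: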

        public static Set<String> getComponentIds(StormTopology topology) {
            Set<String> ret = new HashSet<String>();
            for(StormTopology._Fields f: StormTopology.metaDataMap.keySet()) {
                Map<String, Object> componentMap = (Map<String, Object>) topology.getFieldValue(f);
                ret.addAll(componentMap.keySet());
            }
            return ret;
        }
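    metaDataMap is used to find out which fields StormTopology has (spouts, bolts, state_spouts); the code then calls getFieldValue for each field and collects the key set of each value.
    The advantage is that this is dynamic: when fields are added to StormTopology, the code does not need to change. A plain Java class has no such metadata map, so the same effect would require reflection; in a dynamic language such as Python it is straightforward.
    Of course, ThriftTopologyUtils is not strictly necessary here; the code could also read directly from StormTopology.spouts… with the fields hard-coded.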
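
    To illustrate the metadata-driven idea without a Thrift dependency, here is a minimal sketch. The enum Field and the class Topology are hypothetical stand-ins for StormTopology._Fields and the generated StormTopology; only the shape of the loop mirrors getComponentIds above.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class MetaDataSketch {
    // Hypothetical stand-in for the generated _Fields enum.
    enum Field { SPOUTS, BOLTS, STATE_SPOUTS }

    // Hypothetical stand-in for the generated struct:
    // every field holds a map keyed by component id.
    static final class Topology {
        private final Map<Field, Map<String, Object>> values = new EnumMap<>(Field.class);

        Topology() {
            for (Field f : Field.values()) {
                values.put(f, new HashMap<>());
            }
        }

        Map<String, Object> getFieldValue(Field f) {
            return values.get(f);
        }
    }

    // Walks whatever fields exist instead of naming them one by one,
    // so adding a constant to Field needs no change here.
    static Set<String> getComponentIds(Topology topology) {
        Set<String> ret = new TreeSet<>();
        for (Field f : Field.values()) {
            ret.addAll(topology.getFieldValue(f).keySet());
        }
        return ret;
    }

    public static void main(String[] args) {
        Topology t = new Topology();
        t.getFieldValue(Field.SPOUTS).put("sentences", new Object());
        t.getFieldValue(Field.BOLTS).put("split", new Object());
        System.out.println(getComponentIds(t)); // [sentences, split]
    }
}
```

    The hard-coded alternative would call one getter per field and would have to be edited whenever a new component map is added to the struct.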

     

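    Look at the definition of ComponentCommon in storm.thrift and the two methods above become easy to follow.
    The implementation of getTargets deserves a closer look, because it derives the outputs from the inputs.
    ComponentCommon records only the stream id and fields of each output; it does not say which component the stream is sent to.
    For inputs, however, the key is a GlobalStreamId, which holds not only the stream id but also the component id of the source component.
    So the targets can be derived in reverse: whenever the source component of an input is the component in question, the component declaring that input is one of its targets.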

    struct ComponentCommon {
      1: required map<GlobalStreamId, Grouping> inputs;
      2: required map<string, StreamInfo> streams; //key is stream id, outputs
      3: optional i32 parallelism_hint; //how many threads across the cluster should be dedicated to this component
      4: optional string json_conf;
    }
    
    struct SpoutSpec {
      1: required ComponentObject spout_object;
      2: required ComponentCommon common;
      // can force a spout to be non-distributed by overriding the component configuration
      // and setting TOPOLOGY_MAX_TASK_PARALLELISM to 1
    }
    
    struct Bolt {
      1: required ComponentObject bolt_object;
      2: required ComponentCommon common;
    }
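
    The reverse derivation in getTargets can be tried out on a toy topology. This is a simplified sketch, not Storm code: StreamId is a hypothetical stand-in for GlobalStreamId, groupings are plain strings, and the inputs of all components are passed in as one map instead of being read from ComponentCommon.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

final class TargetsSketch {
    // Hypothetical stand-in for GlobalStreamId: source component plus stream id.
    static final class StreamId {
        final String componentId;
        final String streamId;

        StreamId(String componentId, String streamId) {
            this.componentId = componentId;
            this.streamId = streamId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof StreamId)) return false;
            StreamId other = (StreamId) o;
            return componentId.equals(other.componentId) && streamId.equals(other.streamId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(componentId, streamId);
        }
    }

    // inputsByComponent: consumer component id -> (subscribed stream -> grouping name).
    // Returns: stream id -> (target component id -> grouping name).
    static Map<String, Map<String, String>> getTargets(
            String componentId, Map<String, Map<StreamId, String>> inputsByComponent) {
        Map<String, Map<String, String>> ret = new TreeMap<>();
        for (Map.Entry<String, Map<StreamId, String>> consumer : inputsByComponent.entrySet()) {
            for (Map.Entry<StreamId, String> input : consumer.getValue().entrySet()) {
                // The consumer is a target only if this input originates from componentId.
                if (input.getKey().componentId.equals(componentId)) {
                    ret.computeIfAbsent(input.getKey().streamId, k -> new TreeMap<>())
                       .put(consumer.getKey(), input.getValue());
                }
            }
        }
        return ret;
    }

    public static void main(String[] args) {
        Map<String, Map<StreamId, String>> inputs = new HashMap<>();
        Map<StreamId, String> splitInputs = new HashMap<>();
        splitInputs.put(new StreamId("sentences", "default"), "shuffle");
        inputs.put("split", splitInputs);
        Map<StreamId, String> countInputs = new HashMap<>();
        countInputs.put(new StreamId("split", "default"), "fields");
        inputs.put("count", countInputs);

        System.out.println(getTargets("sentences", inputs)); // {default={split=shuffle}}
        System.out.println(getTargets("split", inputs));     // {default={count=fields}}
    }
}
```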

     

    2. WorkerTopologyContext

    WorkerTopologyContext wraps some worker-related information.

    public class WorkerTopologyContext extends GeneralTopologyContext {
        public static final String SHARED_EXECUTOR = "executor";
        
        private Integer _workerPort;         // port of the worker process
        private List<Integer> _workerTasks;  // task ids contained in this worker
        private String _codeDir;             // code directory on the supervisor, stormdist/stormid
        private String _pidDir;              // directory holding the pids of the worker's processes (possibly several), workid/pids
        Map<String, Object> _userResources;
        Map<String, Object> _defaultResources;
    
    }

     

    3. TopologyContext

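    As the Javadoc says, a TopologyContext is passed as an argument to the prepare (or open) method of bolts and spouts,
    so openOrPrepareWasCalled indicates whether this TopologyContext has already been through a prepare/open call.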

    registerMetric registers metrics in _registeredMetrics.
    The registered structure is [timeBucketSizeInSecs, [taskId, [name, metric]]].
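
    A minimal sketch of how such a three-level registry can be filled. It rests on several assumptions not shown in this article: Metric is a hypothetical stand-in for IMetric, AtomicBoolean replaces the clojure.lang.Atom, and the two checks (no registration once the flag is set, no duplicate names) as well as the exception types are choices of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

final class MetricRegistrySketch {
    // Hypothetical stand-in for IMetric.
    interface Metric {
        Object getValueAndReset();
    }

    private final int taskId;
    // [timeBucketSizeInSecs, [taskId, [name, metric]]]
    private final Map<Integer, Map<Integer, Map<String, Metric>>> registeredMetrics;
    // Stands in for the clojure.lang.Atom held by TopologyContext.
    private final AtomicBoolean openOrPrepareWasCalled;

    MetricRegistrySketch(int taskId,
                         Map<Integer, Map<Integer, Map<String, Metric>>> registeredMetrics,
                         AtomicBoolean openOrPrepareWasCalled) {
        this.taskId = taskId;
        this.registeredMetrics = registeredMetrics;
        this.openOrPrepareWasCalled = openOrPrepareWasCalled;
    }

    <T extends Metric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
        // Assumption: registration is only allowed while prepare()/open() is still running.
        if (openOrPrepareWasCalled.get()) {
            throw new IllegalStateException("metrics must be registered inside prepare()/open()");
        }
        // Create the bucket level and the task level on demand.
        Map<String, Metric> byName = registeredMetrics
            .computeIfAbsent(timeBucketSizeInSecs, k -> new HashMap<>())
            .computeIfAbsent(taskId, k -> new HashMap<>());
        if (byName.containsKey(name)) {
            throw new IllegalArgumentException("metric registered twice: " + name);
        }
        byName.put(name, metric);
        return metric;
    }
}
```

    Because the registry map is passed in from outside, several task contexts can share one map and still keep their metrics apart through the taskId level.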

    _hooks holds the registered task hooks.

    /**
     * A TopologyContext is given to bolts and spouts in their "prepare" and "open"
     * methods, respectively. This object provides information about the component's
     * place within the topology, such as task ids, inputs and outputs, etc.
     *
     * <p>The TopologyContext is also used to declare ISubscribedState objects to
     * synchronize state with StateSpouts this object is subscribed to.</p>
     */
    public class TopologyContext extends WorkerTopologyContext implements IMetricsContext {
        private Integer _taskId;
        private Map<String, Object> _taskData = new HashMap<String, Object>();
        private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();
        private Map<String, Object> _executorData;
        private Map<Integer,Map<Integer, Map<String, IMetric>>> _registeredMetrics;
        private clojure.lang.Atom _openOrPrepareWasCalled;
        public TopologyContext(StormTopology topology, Map stormConf,
                Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks,
                Map<String, Map<String, Fields>> componentToStreamToFields,
                String stormId, String codeDir, String pidDir, Integer taskId,
                Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources,
                Map<String, Object> userResources, Map<String, Object> executorData, Map registeredMetrics,
                clojure.lang.Atom openOrPrepareWasCalled) {
            super(topology, stormConf, taskToComponent, componentToSortedTasks,
                    componentToStreamToFields, stormId, codeDir, pidDir,
                    workerPort, workerTasks, defaultResources, userResources);
            _taskId = taskId;
            _executorData = executorData;
            _registeredMetrics = registeredMetrics;
            _openOrPrepareWasCalled = openOrPrepareWasCalled;
        }

     

    4. Usage

    mk-task-data creates the topology context for each task:

    user-context (user-topology-context (:worker executor-data) executor-data task-id)
    (defn user-topology-context [worker executor-data tid]
      ((mk-topology-context-builder
        worker
        executor-data
        (:topology worker))
       tid))
    
    (defn mk-topology-context-builder [worker executor-data topology]
      (let [conf (:conf worker)]
        #(TopologyContext.
          topology
          (:storm-conf worker)
          (:task->component worker)
          (:component->sorted-tasks worker)
          (:component->stream->fields worker)
          (:storm-id worker)
          (supervisor-storm-resources-path
            (supervisor-stormdist-root conf (:storm-id worker)))
          (worker-pids-root conf (:worker-id worker))
          (int %)
          (:port worker)
          (:task-ids worker)
          (:default-shared-resources worker)
          (:user-shared-resources worker)
          (:shared-executor-data executor-data)
          (:interval->task->metric-registry executor-data)
          (:open-or-prepare-was-called? executor-data))))
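
    mk-topology-context-builder returns an anonymous function that closes over the worker-wide values and takes only the task id. The same pattern in Java, as a rough sketch: Context is a hypothetical class with just three of the constructor arguments, not the real TopologyContext.

```java
import java.util.function.IntFunction;

final class ContextBuilderSketch {
    // Hypothetical, heavily reduced stand-in for TopologyContext.
    static final class Context {
        final String stormId;
        final int workerPort;
        final int taskId;

        Context(String stormId, int workerPort, int taskId) {
            this.stormId = stormId;
            this.workerPort = workerPort;
            this.taskId = taskId;
        }
    }

    // Captures the values shared by all tasks of a worker once;
    // the returned function needs only the task id, like #(TopologyContext. ... (int %) ...).
    static IntFunction<Context> mkContextBuilder(String stormId, int workerPort) {
        return taskId -> new Context(stormId, workerPort, taskId);
    }

    public static void main(String[] args) {
        IntFunction<Context> builder = mkContextBuilder("topo-1", 6700);
        Context c = builder.apply(3);
        System.out.println(c.stormId + " " + c.workerPort + " " + c.taskId); // topo-1 6700 3
    }
}
```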
  • Original article: https://www.cnblogs.com/fxjwind/p/3217352.html