zoukankan      html  css  js  c++  java
  • Storm/JStorm之TopologyBuilder源码阅读

    在Storm/JStorm中有一个特别重要的类,主要用来构建Topology,这个类就是TopologyBuilder。
    咱先看一下简单的例子:

    /**
     * Example entry point: wires a spout to a bolt with shuffle grouping and
     * submits the resulting topology to an in-process {@code LocalCluster}.
     *
     * @param args command-line arguments (unused)
     * @throws AlreadyAliveException    declared for the topology submission call
     * @throws InvalidTopologyException declared for the topology submission call
     */
    public static void main(String[] args) throws AlreadyAliveException,
            InvalidTopologyException {
        TopologyBuilder builder = new TopologyBuilder();
        // Spout "input" with a parallelism hint of 2.
        builder.setSpout("input", new RandomSentenceSpout(), 2);
        // Bolt "bolt_sentence" (parallelism hint 2) consumes "input" via shuffle grouping.
        builder.setBolt("bolt_sentence", new SplitSentenceBolt(), 2)
                .shuffleGrouping("input");

        // Topology configuration passed to submitTopology. The original example
        // used `conf` without declaring it; an empty map is declared here so the
        // snippet compiles. No custom settings are added.
        java.util.Map<String, Object> conf = new java.util.HashMap<String, Object>();

        // Local mode: mainly intended for debugging.
        LocalCluster cluster = new LocalCluster();
        System.out.println("start wordcount");
        cluster.submitTopology("word count", conf, builder.createTopology());
    }

    在上面的main方法里先创建TopologyBuilder对象,然后设置好已创建的Spout节点和Bolt节点,并用随机分组(shuffleGrouping)将Spout和Bolt节点连接起来形成Topology。

    那TopologyBuilder是如何做的呢?请看下面TopologyBuilder源代码:

    /**
     * TopologyBuilder是一个用于构建Topology的工具类
     *
     */
    public class TopologyBuilder {
        /**
         * 定义了类成员变量_bolts,用来存放IRichBolt类型的所有Bolt对象
         */
        private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();
        /**
         * 定义了类成员变量_spouts,用来存放IRichSpout类型的所有Spout对象
         */
        private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();
        /**
         * 定义了类成员变量_commons,存放了所有的Bolt和Spout对象
         */
        private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();
    
        // private Map<String, Map<GlobalStreamId, Grouping>> _inputs = new HashMap<String, Map<GlobalStreamId, Grouping>>();
    
        private Map<String, StateSpoutSpec> _stateSpouts = new HashMap<String, StateSpoutSpec>();
        /**
         * 根据传入的Bolt和Spout对象构建StormTopology对象
         * @return
         */
    public StormTopology createTopology() {
            Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>();
            Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
            for (String boltId : _bolts.keySet()) {
                //根据boltId从_bolts中获取到对应的bolt对象
                IRichBolt bolt = _bolts.get(boltId);
                //设置对应ComponentCommon对象的streams(输出的字段列表以及是否是直接流)属性值
                ComponentCommon common = getComponentCommon(boltId, bolt);
                /**
                 * 先将Bolts对象序列化得到数组,再创建Bolt对象,所以所有在StormTopology中Bolts是对象序列化过后得到的字节数组.
                 */
                boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
            }
            for (String spoutId : _spouts.keySet()) {
                //根据spoutId从_spouts中获取到对应的spout对象
                IRichSpout spout = _spouts.get(spoutId);
                //设置对应ComponentCommon对象的streams(输出的字段列表以及是否是直接流)
                ComponentCommon common = getComponentCommon(spoutId, spout);
                /**
                 * 先将Spout对象序列化得到数组,再创建SpoutSpec对象,所以所有在StormTopology中Spouts是对象序列化过后得到的字节数组.
                 */
                spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
    
            }
            //将上述所设置的所有组件都封装到StormTopology对象中,最后提交到集群中运行
            return new StormTopology(spoutSpecs, boltSpecs, new HashMap<String, StateSpoutSpec>());
        }
        /**
         * 下面几个方法定义了setBolt方法以及它的重载方法
         */
        /**
         * 在这个topology中定义一个只有单线程并行度的新的bolt
         * 其它想要消耗这个bolt的输出的组件会引用这个id
         */
       public BoltDeclarer setBolt(String id, IRichBolt bolt) {
            return setBolt(id, bolt, null);
        }
    
        /**
         *  为这个topology定义一个指定数量的并行度的bolt
         */
        public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) {
            //检测传入的组件id是否唯一
            validateUnusedId(id);
            //生成common对象
            initCommon(id, bolt, parallelism_hint);
            _bolts.put(id, bolt);
            return new BoltGetter(id);
        }
    
    
        public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
            return setBolt(id, bolt, null);
        }
    
    
        public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) {
            /**
             * 该方法利用BasicBoltExecutor包装(封装)传入的IBasicBolt对象
             * 在BasicBoltExecutor中实现了对消息的追踪
             */
            return setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
        }
      /**
         * 下面几个方法定义了setSpout方法以及它的重载方法
         */
        public SpoutDeclarer setSpout(String id, IRichSpout spout) {
            return setSpout(id, spout, null);
        }
    
        public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) {
            //检测输入的id是否唯一,若已经存在将抛出异常
            validateUnusedId(id);
            /**
             * 构建ComponentCommon对象并进行相对应的初始化,最后放入到_commons(在上述中已经定义)
             */
            initCommon(id, spout, parallelism_hint);
            _spouts.put(id, spout);
            return new SpoutGetter(id);
        }
    
    
        public SpoutDeclarer setSpout(String id, IControlSpout spout) {
            return setSpout(id, spout, null);
        }
    
        public SpoutDeclarer setSpout(String id, IControlSpout spout, Number parallelism_hint) {
            return setSpout(id, new ControlSpoutExecutor(spout), parallelism_hint);
        }
    
        public BoltDeclarer setBolt(String id, IControlBolt bolt, Number parallelism_hint) {
            return setBolt(id, new ControlBoltExecutor(bolt), parallelism_hint);
        }
        public BoltDeclarer setBolt(String id, IControlBolt bolt) {
            return setBolt(id, bolt, null);
        }
    
        public void setStateSpout(String id, IRichStateSpout stateSpout) {
            setStateSpout(id, stateSpout, null);
        }
        public void setStateSpout(String id, IRichStateSpout stateSpout, Number parallelism_hint) {
            validateUnusedId(id);
            // TODO: finish
        }
        /**
         * 检测输入的id是否唯一
         * @param id
         */
        private void validateUnusedId(String id) {
            if (_bolts.containsKey(id)) {
                throw new IllegalArgumentException("Bolt has already been declared for id " + id);
            }
            if (_spouts.containsKey(id)) {
                throw new IllegalArgumentException("Spout has already been declared for id " + id);
            }
            if (_stateSpouts.containsKey(id)) {
                throw new IllegalArgumentException("State spout has already been declared for id " + id);
            }
        }
    
        private ComponentCommon getComponentCommon(String id, IComponent component) {
            ComponentCommon ret = new ComponentCommon(_commons.get(id));
    
            OutputFieldsGetter getter = new OutputFieldsGetter();
            component.declareOutputFields(getter);
            ret.set_streams(getter.getFieldsDeclaration());
            return ret;
        }
        /**
         * 定义了initCommon方法,用来初始化变量CommonentCommon对象,并给类成员变量_commons赋值
         * 初始化所做的工作:设置并行度还有一些其它配置
         * @param id
         * @param component
         * @param parallelism
         */
    private void initCommon(String id, IComponent component, Number parallelism) {
            ComponentCommon common = new ComponentCommon();
            //设置消息流的来源及分组方式
            common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
            if (parallelism != null) {
                //设置并行度
                common.set_parallelism_hint(parallelism.intValue());
            } else {
                //如果并行度没有手动设置则默认为1
                common.set_parallelism_hint(1);
            }
            Map conf = component.getComponentConfiguration();
            if (conf != null)
                //设置组件的配置参数
                common.set_json_conf(JSONValue.toJSONString(conf));
            _commons.put(id, common);
        }
    }

    从上面TopologyBuilder的类中可以看到这个类提供了创建StormTopology的方法以及一些数据源节点和处理节点的相关设置的方法,

    还有用来存储Bolt对象和Spout对象的成员变量(_bolts、_spouts),当然这里关于分组的代码没有写出来。事实上这个类就是用来设置Spout节点和Bolt节点,

    并通过分组方式将Spout和Bolt节点连接起来形成拓扑结构的。

  • 相关阅读:
    ACM-ICPC 2018 徐州赛区网络预赛 I. Characters with Hash
    hdu 5437
    poj 1502
    ACM-ICPC 2018 沈阳赛区网络预赛 K. Supreme Number
    ACM-ICPC 2018 沈阳赛区网络预赛 F. Fantastic Graph
    ACM-ICPC 2018 南京赛区网络预赛 B. The writing on the wall
    ACM-ICPC 2018 南京赛区网络预赛 J. Sum
    法里数列
    ACM-ICPC 2018 南京赛区网络预赛 L. Magical Girl Haze
    Hashtable 为什么不叫 HashTable?
  • 原文地址:https://www.cnblogs.com/RoseVorchid/p/5967041.html
Copyright © 2011-2022 走看看