zoukankan      html  css  js  c++  java
  • Storm系列(三)Topology提交过程

    提交示例代码:

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("random", new RandomWordSpout(), 2);
        builder.setBolt("transfer", new TransferBolt(), 4).shuffleGrouping("random");
        builder.setBolt("writer", new WriterBolt(), 4).fieldsGrouping("transfer", new Fields("word"));
        Config conf = new Config();
        conf.setNumWorkers(4);// 设置启动4个Worker
        conf.setNumAckers(1); // 设置一个ack线程
        conf.setDebug(true); // 设置打印所有发送的消息及系统消息
    10      StormSubmitter.submitTopology("test", conf, builder.createTopology());
    11  }
     

    1、构建 TopologyBuilder 对象 builder,主要用于对各个组件(bolt、spout)进行配置,TopologyBuilder主要属性字段定义如下:

    public class TopologyBuilder {
     
        // 所提交Topolog中所有的bolt将放入到_bolts中
        private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();
     
        // 所提交Topolog中所有的spout将放入到_spouts中
        private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();
     
        // 所提交Topolog中所有的spout和bolt都将放入_commons中
    10      private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();
    11   
    12      ....................................
    13  }
    14   

     

    2、以上提交代码中第三行,配置了一个id值为random,IRichSpout对象为RandomWordSpout,而并行度为2(两个线程里面跑两个任务)的spout, setSpout函数实现源码如下:

    public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) {
     
        validateUnusedId(id);
     
        initCommon(id, spout, parallelism_hint);
     
        _spouts.put(id, spout);
     
        return new SpoutGetter(id);
    10   
    11  }
    12   

    validateUnusedId:检测输入的id是不是唯一,若已经存在将抛出异常;

    initCommon:构建ComponentCommon对象并进行相应的初始化,最后放入到_commons(以上TopologyBuilder中定义的Map);

    initCommon函数实现源码:

    private void initCommon(String id, IComponent component, Number parallelism) {
     
        ComponentCommon common = new ComponentCommon();
     
        // 设置消息流的来源及分组方式
     
        common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
     
        if(parallelism!=null) 
    10   
    11      // 设置并行度
    12      common.set_parallelism_hint(parallelism.intValue());
    13   
    14      Map conf = component.getComponentConfiguration();
    15   
    16      if(conf!=null) 
    17   
    18      // 设置组件的配置参数
    19      common.set_json_conf(JSONValue.toJSONString(conf));
    20   
    21      _commons.put(id, common);
    22  }

    在ComponentCommon中主要对以下四个属性字段进行设置:

    GlobalStreamId:确定消息来源,其中componentId表示所属组件,streamId为消息流的标识符;

    Grouping:确定消息分组方式;

    private Map<GlobalStreamId,Grouping> inputs;

    StreamInfo表示输出的字段列表及是否为直接流

    private Map<String,StreamInfo> streams;

    private int parallelism_hint; // 设置并行度

    private String json_conf; // 其它配置参数设置(必须为JSON格式)

     

    3、SpoutGetter实现源码:

    protected class SpoutGetter extends ConfigGetter<SpoutDeclarer> implements SpoutDeclarer {
     
        public SpoutGetter(String id) {
            super(id);
        }
    }

    ConfigGetter、SpoutGetter的实现都是在TopologyBuilder中, ConfigGetter作用:设置程序中的配置项,覆盖默认的配置项,且配置项的格式为为JSON(本质上是改变对应ComponentCommon对象中json_conf的值);

     

    4、提交示例代码中的第四行定义了一个id为transfer,IRichSpout对象为TransferBolt,并行度为4的bolt

    setBolt实现源码:

    public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) {
     
        validateUnusedId(id);
     
        initCommon(id, bolt, parallelism_hint);
     
        _bolts.put(id, bolt);
     
        return new BoltGetter(id);
    10   
    11  }
    12   

    设置Bolt的函数与设置Spout函数的实现唯一的区别在返回结果;

    BoltGetter实现部分源码:

    protected class BoltGetter extends ConfigGetter<BoltDeclarer> implements BoltDeclarer {
     
        private String _boltId;
     
        public BoltGetter(String boltId) {
       
            super(boltId);
       
            _boltId = boltId;
    10     
    11      }
    12     
    13      public BoltDeclarer shuffleGrouping(String componentId) {
    14     
    15          return shuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID);
    16     
    17      }
    18     
    19      public BoltDeclarer fieldsGrouping(String componentId, Fields fields) {
    20     
    21          return fieldsGrouping(componentId, Utils.DEFAULT_STREAM_ID, fields);
    22     
    23      }
    24     
    25      public BoltDeclarer fieldsGrouping(String componentId, String streamId, Fields fields) {
    26     
    27          return grouping(componentId, streamId, Grouping.fields(fields.toList()));
    28     
    29      }
    30     
    31      public BoltDeclarer shuffleGrouping(String componentId, String streamId) {
    32     
    33          return grouping(componentId, streamId, Grouping.shuffle(new NullStruct()));
    34     
    35      }
    36     
    37      private BoltDeclarer grouping(String componentId, String streamId, Grouping grouping) {
    38     
    39          _commons.get(_boltId).put_to_inputs(new GlobalStreamId(componentId, streamId), grouping);
    40     
    41          return this;
    42     
    43      }
    44   
    45      .........................................
    46   
    47  }
    48   

    BoltGetter继承至ConfigGetter并实现了BoltDeclarer接口,并重载了BoltDeclarer(InputDeclarer)中各种分组方式(如:fieldsGrouping、shuffleGrouping),分组方式的实现本质上是在_commons中通过对用的boltId找到对应的ComponentCommon对象,对inputs属性进行设置;

     

    5、通过以上几步完成了bolt与spout的配置(对应提交示例代码中的2~5行),6~9行是对运行环境的配置,10行用于向集群提交执行任务,builder.createTopology用于构建StormTopology对象,createTopology实现源码如下:

    public StormTopology createTopology() {
     
        Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>();
       
        Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
       
        for(String boltId: _bolts.keySet()) {
       
            IRichBolt bolt = _bolts.get(boltId);
    10     
    11          ComponentCommon common = getComponentCommon(boltId, bolt);
    12     
    13          boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.serialize(bolt)), common));
    14     
    15      }
    16     
    17      for(String spoutId: _spouts.keySet()) {
    18     
    19          IRichSpout spout = _spouts.get(spoutId);
    20     
    21          ComponentCommon common = getComponentCommon(spoutId, spout);
    22     
    23          spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.serialize(spout)), common));
    24     
    25      }
    26     
    27      return new StormTopology(spoutSpecs,boltSpecs,new HashMap<String, StateSpoutSpec>());
    28   
    29  }
    30   
    31   

     

    以上源码实现中主要做了两件事:

    • 通过boltId从_bolts中获取到对应的bolt对象,再通过getComponentCommon方法设置对应ComponentCommon对象的streams(输出的字段列表及是否为直接流)属性值,最后将bolt和common一起 放入到boltSpecs集合中。
    • 通过spoutId从_spouts中获取到对应的spout对象,再通过getComponentCommon方法设置对应ComponentCommon对象的streams(输出的字段列表及是否为直接流)属性值,最后将spout和common一起 放入到boltSpecs集合中。
    • 通过以上两步使所设置的所有组件都封装到StormTopology对象中,最后提交的到集群中运行。
  • 相关阅读:
    hdu 1028 Ignatius and the Princess III (n的划分)
    CodeForces
    poj 3254 Corn Fields (状压DP入门)
    HYSBZ 1040 骑士 (基环外向树DP)
    PAT 1071 Speech Patterns (25)
    PAT 1077 Kuchiguse (20)
    PAT 1043 Is It a Binary Search Tree (25)
    PAT 1053 Path of Equal Weight (30)
    c++ 常用标准库
    常见数学问题
  • 原文地址:https://www.cnblogs.com/jianyuan/p/4440188.html
Copyright © 2011-2022 走看看