zoukankan      html  css  js  c++  java
  • Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

    前期博客

    Storm编程入门API系列之Storm的Topology默认Workers、默认executors和默认tasks数目

    Storm编程入门API系列之Storm的Topology多个Workers数目控制实现

     继续编写

      StormTopologyMoreTask.java

    package zhouls.bigdata.stormDemo;
    
    import java.util.Map;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    
    public class StormTopologyMoreTask {
        
        public static class MySpout extends BaseRichSpout{
            private Map conf;
            private TopologyContext context;
            private SpoutOutputCollector collector;
            public void open(Map conf, TopologyContext context,
                    SpoutOutputCollector collector) {
                this.conf = conf;
                this.collector = collector;
                this.context = context;
            }
    
            int num = 0; 
            public void nextTuple() {
                num++;
                System.out.println("spout:"+num);
                this.collector.emit(new Values(num));
                Utils.sleep(1000);
            }
    
        
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("num"));
            }
            
        }
        
        
        
        public static class MyBolt extends BaseRichBolt{
            
            private Map stormConf;
            private TopologyContext context;
            private OutputCollector collector;
            public void prepare(Map stormConf, TopologyContext context,
                    OutputCollector collector) {
                this.stormConf = stormConf;
                this.context = context;
                this.collector = collector;
            }
            
            
            public void execute(Tuple input) {
                Integer num = input.getIntegerByField("num");
                System.out.println("线程id:"+Thread.currentThread().getId()+",接收的值为:"+num);
            }
    
            
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                
            }
            
        }
        
        
        
        public static void main(String[] args) {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            String spout_id = MySpout.class.getSimpleName();
            String bolt_id = MyBolt.class.getSimpleName();
            
            topologyBuilder.setSpout(spout_id, new MySpout());
            topologyBuilder.setBolt(bolt_id, new MyBolt()).setNumTasks(3).shuffleGrouping(spout_id);
            
            
            Config config = new Config();
            String topology_name = StormTopologyMoreTask.class.getSimpleName();
            if(args.length==0){
                //在本地运行
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology(topology_name, config, topologyBuilder.createTopology());
            }else{
                //在集群运行
                try {
                    StormSubmitter.submitTopology(topology_name, config, topologyBuilder.createTopology());
                } catch (AlreadyAliveException e) {
                    e.printStackTrace();
                } catch (InvalidTopologyException e) {
                    e.printStackTrace();
                } catch (AuthorizationException e) {
                    e.printStackTrace();
                }
            }
            
        }
    
    }

     打jar包

     

     

     

    [hadoop@master jar]$ pwd
    /home/hadoop/app/apache-storm-1.0.2/jar
    [hadoop@master jar]$ ll
    total 24
    -rw-r--r-- 1 hadoop hadoop 4869 Jul 27 22:17 StormTopology.jar
    -rw-r--r-- 1 hadoop hadoop 5091 Jul 27 23:00 StormTopologyMoreExecutor.jar
    -rw-r--r-- 1 hadoop hadoop 4992 Jul 27 22:39 StormTopologyMoreWorker.jar
    [hadoop@master jar]$ rz
    
    [hadoop@master jar]$ ll
    total 32
    -rw-r--r-- 1 hadoop hadoop 4869 Jul 27 22:17 StormTopology.jar
    -rw-r--r-- 1 hadoop hadoop 5091 Jul 27 23:00 StormTopologyMoreExecutor.jar
    -rw-r--r-- 1 hadoop hadoop 5105 Jul 27 23:20 StormTopologyMoreTask.jar
    -rw-r--r-- 1 hadoop hadoop 4992 Jul 27 22:39 StormTopologyMoreWorker.jar
    [hadoop@master jar]$ 

    提交作业之前

      

    为什么,会是如上的数字呢?大家要学,就要深入去学和理解。

       因为我之前运行的 StormTopologyMoreExecutor 拓扑还没有停掉,它占用的资源也被统计进了上面的数字里。

      

    为什么,会是如上的数字呢?大家要学,就要深入去学和理解。

  • 相关阅读:
    floating IP 原理分析
    创建 floating IP
    Why Namespace?
    虚拟 router 原理分析 - 每天5分钟玩转 OpenStack(101)
    链接脚本使用一例2---将二进制文件 如图片、MP3音乐、词典一类的东西作为目标文件中的一个段
    linux-2.6.26内核中ARM中断实现详解(转)
    有关Cache –(1) linux list之中的Prefetc
    Linux 内核中的 GCC 特性
    对entry-common.S和call.S的部分理解1
    kernel&uboot学习笔记
  • 原文地址:https://www.cnblogs.com/zlslch/p/7247940.html
Copyright © 2011-2022 走看看