zoukankan      html  css  js  c++  java
  • Storm编程入门API系列之Storm的Topology多个Executors数目控制实现

    前期博客

    Storm编程入门API系列之Storm的Topology默认Workers、默认executors和默认tasks数目

    Storm编程入门API系列之Storm的Topology多个Workers数目控制实现

    继续编写

      StormTopologyMoreExecutor.java

    package zhouls.bigdata.stormDemo;
    
    import java.util.Map;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    
    public class StormTopologyMoreExecutor {
        
        public static class MySpout extends BaseRichSpout{
            private Map conf;
            private TopologyContext context;
            private SpoutOutputCollector collector;
            public void open(Map conf, TopologyContext context,
                    SpoutOutputCollector collector) {
                this.conf = conf;
                this.collector = collector;
                this.context = context;
            }
    
            int num = 0; 
            public void nextTuple() {
                num++;
                System.out.println("spout:"+num);
                this.collector.emit(new Values(num));
                Utils.sleep(1000);
            }
    
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("num"));
            }
            
        }
        
        
        
        public static class MyBolt extends BaseRichBolt{
            
            private Map stormConf;
            private TopologyContext context;
            private OutputCollector collector;
            public void prepare(Map stormConf, TopologyContext context,
                    OutputCollector collector) {
                this.stormConf = stormConf;
                this.context = context;
                this.collector = collector;
            }
            
            public void execute(Tuple input) {
                Integer num = input.getIntegerByField("num");
                System.out.println("线程id:"+Thread.currentThread().getId()+",接收的值为:"+num);
            }
    
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                
            }
            
        }
        
        
        
        public static void main(String[] args) {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            String spout_id = MySpout.class.getSimpleName();
            String bolt_id = MyBolt.class.getSimpleName();
            
            topologyBuilder.setSpout(spout_id, new MySpout());
            topologyBuilder.setBolt(bolt_id, new MyBolt(),3).shuffleGrouping(spout_id);
            
            
            Config config = new Config();
            String topology_name = StormTopologyMoreExecutor.class.getSimpleName();
            if(args.length==0){
                //在本地运行
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology(topology_name, config, topologyBuilder.createTopology());
            }else{
                //在集群运行
                try {
                    StormSubmitter.submitTopology(topology_name, config, topologyBuilder.createTopology());
                } catch (AlreadyAliveException e) {
                    e.printStackTrace();
                } catch (InvalidTopologyException e) {
                    e.printStackTrace();
                } catch (AuthorizationException e) {
                    e.printStackTrace();
                }
            }
            
        }
    
    }

       打jar包

     

     提交作业之前

     

    [hadoop@master apache-storm-1.0.2]$ pwd
    /home/hadoop/app/apache-storm-1.0.2
    [hadoop@master apache-storm-1.0.2]$ ll
    total 208
    drwxrwxr-x  2 hadoop hadoop  4096 May 21 17:18 bin
    -rw-r--r--  1 hadoop hadoop 82317 Jul 27  2016 CHANGELOG.md
    drwxrwxr-x  2 hadoop hadoop  4096 Jul 27 20:12 conf
    drwxrwxr-x  3 hadoop hadoop  4096 Jul 27  2016 examples
    drwxrwxr-x 17 hadoop hadoop  4096 May 21 17:18 external
    drwxrwxr-x  2 hadoop hadoop  4096 Jul 27  2016 extlib
    drwxrwxr-x  2 hadoop hadoop  4096 Jul 27  2016 extlib-daemon
    drwxrwxr-x  2 hadoop hadoop  4096 Jul 27 23:00 jar
    drwxrwxr-x  2 hadoop hadoop  4096 May 21 17:18 lib
    -rw-r--r--  1 hadoop hadoop 32101 Jul 27  2016 LICENSE
    drwxrwxr-x  2 hadoop hadoop  4096 May 21 17:18 log4j2
    drwxrwxr-x  2 hadoop hadoop  4096 May 21 19:05 logs
    -rw-r--r--  1 hadoop hadoop   981 Jul 27  2016 NOTICE
    drwxrwxr-x  6 hadoop hadoop  4096 May 21 17:18 public
    -rw-r--r--  1 hadoop hadoop 15287 Jul 27  2016 README.markdown
    -rw-r--r--  1 hadoop hadoop     6 Jul 27  2016 RELEASE
    -rw-r--r--  1 hadoop hadoop 23774 Jul 27  2016 SECURITY.md
    [hadoop@master apache-storm-1.0.2]$ bin/storm jar jar/StormTopologyMoreExecutor.jar zhouls.bigdata.stormDemo.StormTopologyMoreExecutor aaa
    Running: /home/hadoop/app/jdk/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/home/hadoop/app/apache-storm-1.0.2 -Dstorm.log.dir=/home/hadoop/app/apache-storm-1.0.2/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /home/hadoop/app/apache-storm-1.0.2/lib/log4j-api-2.1.jar:/home/hadoop/app/apache-storm-1.0.2/lib/kryo-3.0.3.jar:/home/hadoop/app/apache-storm-1.0.2/lib/storm-rename-hack-1.0.2.jar:/home/hadoop/app/apache-storm-1.0.2/lib/log4j-core-2.1.jar:/home/hadoop/app/apache-storm-1.0.2/lib/slf4j-api-1.7.7.jar:/home/hadoop/app/apache-storm-1.0.2/lib/minlog-1.3.0.jar:/home/hadoop/app/apache-storm-1.0.2/lib/objenesis-2.1.jar:/home/hadoop/app/apache-storm-1.0.2/lib/clojure-1.7.0.jar:/home/hadoop/app/apache-storm-1.0.2/lib/servlet-api-2.5.jar:/home/hadoop/app/apache-storm-1.0.2/lib/log4j-slf4j-impl-2.1.jar:/home/hadoop/app/apache-storm-1.0.2/lib/log4j-over-slf4j-1.6.6.jar:/home/hadoop/app/apache-storm-1.0.2/lib/storm-core-1.0.2.jar:/home/hadoop/app/apache-storm-1.0.2/lib/disruptor-3.3.2.jar:/home/hadoop/app/apache-storm-1.0.2/lib/asm-5.0.3.jar:/home/hadoop/app/apache-storm-1.0.2/lib/reflectasm-1.10.1.jar:jar/StormTopologyMoreExecutor.jar:/home/hadoop/app/apache-storm-1.0.2/conf:/home/hadoop/app/apache-storm-1.0.2/bin -Dstorm.jar=jar/StormTopologyMoreExecutor.jar zhouls.bigdata.stormDemo.StormTopologyMoreExecutor aaa
    2632 [main] INFO  o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -7700164527916050772:-9124174655622273375
    3011 [main] INFO  o.a.s.s.a.AuthUtils - Got AutoCreds []
    3598 [main] INFO  o.a.s.StormSubmitter - Uploading topology jar jar/StormTopologyMoreExecutor.jar to assigned location: /home/hadoop/data/storm/nimbus/inbox/stormjar-83ebee61-5051-4ab5-aff7-e9fcf4560f42.jar
    3711 [main] INFO  o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /home/hadoop/data/storm/nimbus/inbox/stormjar-83ebee61-5051-4ab5-aff7-e9fcf4560f42.jar
    3714 [main] INFO  o.a.s.StormSubmitter - Submitting topology StormTopologyMoreExecutor in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-7700164527916050772:-9124174655622273375"}
    5363 [main] INFO  o.a.s.StormSubmitter - Finished submitting topology: StormTopologyMoreExecutor
    [hadoop@master apache-storm-1.0.2]$ 

     

      

      为什么,会是如上的数字呢?大家要学,就要深入去学和理解。

       因为我之前运行的 StormTopologyMoreWorker 还没有停掉,所以上面的数字里也包含了它所占用的部分。

       为什么,会是如上的数字呢?大家要学,就要深入去学和理解。

       

      即,以下就是 MyBolt 的 3 个 Executor(对应代码中 setBolt(bolt_id, new MyBolt(), 3) 设置的并行度 3)

      

  • 相关阅读:
    07_zookeeper的客户端工具curator_基本api
    06_zookeeper原生Java API使用
    05_zookeeper的ACL
    04_zookeeper的watcher机制
    03_Zookeeper基本数据模型及基本命令操作
    02_zookeeper配置
    01_Zookeeper简述
    thinkphp之url的seo优化
    thinkphp Upload上传文件在客户端生成的临时文件$_FILES['file']['tmp_name']
    php过滤表单输入的emoji表情
  • 原文地址:https://www.cnblogs.com/zlslch/p/7247885.html
Copyright © 2011-2022 走看看