关于storm的启动,我这里不多说了。
见博客
storm的3节点集群详细启动步骤(非HA和HA)(图文详解)
建立stormDemo项目
Group Id : zhouls.bigdata
Artifact Id : stormDemo
Package : stormDemo
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <!-- Project coordinates: must match the values entered when creating the project. -->
  <groupId>zhouls.bigdata</groupId>
  <artifactId>stormDemo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>stormDemo</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!--
      Pin the Java level instead of relying on the maven-compiler-plugin default,
      which differs between plugin versions. 1.7 is used as the baseline for
      Storm 1.x; raise it only if every cluster node runs a newer JVM.
    -->
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>

  <dependencies>
    <!-- Unit testing only; not packaged. -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>

    <!-- Storm API; the version matches the apache-storm-1.0.2 cluster installation. -->
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>1.0.2</version>
    </dependency>
  </dependencies>
</project>
编写代码StormTopology.java
以下是数字累加求和的例子
spout一直产生从1开始的递增数字
bolt进行汇总打印
package zhouls.bigdata.stormDemo;

import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
 * Running-sum demo topology: {@link MySpout} emits 1, 2, 3, ... and
 * {@link MyBolt} prints the running total of everything it has received.
 *
 * <p>Run with no arguments to execute in an in-process {@link LocalCluster};
 * run with any argument to submit the topology to the real cluster.
 */
public class StormTopology {

    /** Emits an ever-increasing integer, one per second, in the field {@code "num"}. */
    public static class MySpout extends BaseRichSpout {
        private static final long serialVersionUID = 1L;

        private SpoutOutputCollector collector;

        /** Last number emitted; the first emitted value is 1. */
        int num = 0;

        /** Keeps the collector so {@link #nextTuple()} can emit through it. */
        @Override
        @SuppressWarnings("rawtypes") // raw Map is dictated by the Storm 1.x ISpout signature
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        /** Emits the next number, logs it, then pauses for one second. */
        @Override
        public void nextTuple() {
            num++;
            System.out.println("spout:" + num);
            this.collector.emit(new Values(num));
            Utils.sleep(1000);
        }

        /** Declares the single output field {@code "num"}. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("num"));
        }
    }

    /** Adds up every {@code "num"} value it receives and prints the running total. */
    public static class MyBolt extends BaseRichBolt {
        private static final long serialVersionUID = 1L;

        private OutputCollector collector;

        /**
         * Running total. Held as a {@code long}: 1 + 2 + ... + n no longer fits
         * in an {@code int} once n reaches 65536, i.e. after roughly 18 hours
         * at one tuple per second.
         */
        long sum = 0;

        /** Keeps the collector so {@link #execute(Tuple)} can ack through it. */
        @Override
        @SuppressWarnings("rawtypes") // raw Map is dictated by the Storm 1.x IBolt signature
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        /** Adds the tuple's {@code "num"} field to the total and prints it. */
        @Override
        public void execute(Tuple input) {
            Integer num = input.getIntegerByField("num");
            sum += num;
            System.out.println("sum=" + sum);
            // BaseRichBolt does not ack on its own, so acknowledge explicitly.
            this.collector.ack(input);
        }

        /** This bolt is a sink: it emits nothing, so no fields are declared. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }

    /**
     * Builds the spout -> bolt topology and runs it.
     *
     * @param args no arguments: run in a local in-process cluster;
     *             any argument: submit to the cluster. The process exits with
     *             status 1 if the cluster rejects the submission.
     */
    public static void main(String[] args) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        String spoutId = MySpout.class.getSimpleName();
        String boltId = MyBolt.class.getSimpleName();
        topologyBuilder.setSpout(spoutId, new MySpout());
        topologyBuilder.setBolt(boltId, new MyBolt()).shuffleGrouping(spoutId);

        Config config = new Config();
        String topologyName = StormTopology.class.getSimpleName();

        if (args.length == 0) {
            // Local mode
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology(topologyName, config, topologyBuilder.createTopology());
        } else {
            // Cluster mode
            try {
                StormSubmitter.submitTopology(topologyName, config, topologyBuilder.createTopology());
            } catch (AlreadyAliveException e) {
                exitAfterFailedSubmit(topologyName, e);
            } catch (InvalidTopologyException e) {
                exitAfterFailedSubmit(topologyName, e);
            } catch (AuthorizationException e) {
                exitAfterFailedSubmit(topologyName, e);
            }
        }
    }

    /**
     * Reports a rejected submission and terminates with a non-zero status so
     * that shells and scripts invoking {@code storm jar} can detect the failure.
     */
    private static void exitAfterFailedSubmit(String topologyName, Exception cause) {
        System.err.println("Failed to submit topology " + topologyName);
        cause.printStackTrace();
        System.exit(1);
    }
}
[hadoop@master apache-storm-1.0.2]$ pwd /home/hadoop/app/apache-storm-1.0.2 [hadoop@master apache-storm-1.0.2]$ ll total 204 drwxrwxr-x 2 hadoop hadoop 4096 May 21 17:18 bin -rw-r--r-- 1 hadoop hadoop 82317 Jul 27 2016 CHANGELOG.md drwxrwxr-x 2 hadoop hadoop 4096 Jul 27 20:12 conf drwxrwxr-x 3 hadoop hadoop 4096 Jul 27 2016 examples drwxrwxr-x 17 hadoop hadoop 4096 May 21 17:18 external drwxrwxr-x 2 hadoop hadoop 4096 Jul 27 2016 extlib drwxrwxr-x 2 hadoop hadoop 4096 Jul 27 2016 extlib-daemon drwxrwxr-x 2 hadoop hadoop 4096 May 21 17:18 lib -rw-r--r-- 1 hadoop hadoop 32101 Jul 27 2016 LICENSE drwxrwxr-x 2 hadoop hadoop 4096 May 21 17:18 log4j2 drwxrwxr-x 2 hadoop hadoop 4096 May 21 19:05 logs -rw-r--r-- 1 hadoop hadoop 981 Jul 27 2016 NOTICE drwxrwxr-x 6 hadoop hadoop 4096 May 21 17:18 public -rw-r--r-- 1 hadoop hadoop 15287 Jul 27 2016 README.markdown -rw-r--r-- 1 hadoop hadoop 6 Jul 27 2016 RELEASE -rw-r--r-- 1 hadoop hadoop 23774 Jul 27 2016 SECURITY.md [hadoop@master apache-storm-1.0.2]$ mkdir jar [hadoop@master apache-storm-1.0.2]$ cd jar/ [hadoop@master jar]$ pwd /home/hadoop/app/apache-storm-1.0.2/jar [hadoop@master jar]$ ll total 0 [hadoop@master jar]$ rz [hadoop@master jar]$ ll total 8 -rw-r--r-- 1 hadoop hadoop 4869 Jul 27 22:17 StormTopology.jar [hadoop@master jar]$
提交作业之前
[hadoop@master apache-storm-1.0.2]$ pwd /home/hadoop/app/apache-storm-1.0.2 [hadoop@master apache-storm-1.0.2]$ ll total 208 drwxrwxr-x 2 hadoop hadoop 4096 May 21 17:18 bin -rw-r--r-- 1 hadoop hadoop 82317 Jul 27 2016 CHANGELOG.md drwxrwxr-x 2 hadoop hadoop 4096 Jul 27 20:12 conf drwxrwxr-x 3 hadoop hadoop 4096 Jul 27 2016 examples drwxrwxr-x 17 hadoop hadoop 4096 May 21 17:18 external drwxrwxr-x 2 hadoop hadoop 4096 Jul 27 2016 extlib drwxrwxr-x 2 hadoop hadoop 4096 Jul 27 2016 extlib-daemon drwxrwxr-x 2 hadoop hadoop 4096 Jul 27 22:18 jar drwxrwxr-x 2 hadoop hadoop 4096 May 21 17:18 lib -rw-r--r-- 1 hadoop hadoop 32101 Jul 27 2016 LICENSE drwxrwxr-x 2 hadoop hadoop 4096 May 21 17:18 log4j2 drwxrwxr-x 2 hadoop hadoop 4096 May 21 19:05 logs -rw-r--r-- 1 hadoop hadoop 981 Jul 27 2016 NOTICE drwxrwxr-x 6 hadoop hadoop 4096 May 21 17:18 public -rw-r--r-- 1 hadoop hadoop 15287 Jul 27 2016 README.markdown -rw-r--r-- 1 hadoop hadoop 6 Jul 27 2016 RELEASE -rw-r--r-- 1 hadoop hadoop 23774 Jul 27 2016 SECURITY.md [hadoop@master apache-storm-1.0.2]$ bin/storm jar jar/StormTopology.jar zhouls.bigdata.stormDemo.StormTopology aaa Running: /home/hadoop/app/jdk/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/home/hadoop/app/apache-storm-1.0.2 -Dstorm.log.dir=/home/hadoop/app/apache-storm-1.0.2/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp 
/home/hadoop/app/apache-storm-1.0.2/lib/log4j-api-2.1.jar:/home/hadoop/app/apache-storm-1.0.2/lib/kryo-3.0.3.jar:/home/hadoop/app/apache-storm-1.0.2/lib/storm-rename-hack-1.0.2.jar:/home/hadoop/app/apache-storm-1.0.2/lib/log4j-core-2.1.jar:/home/hadoop/app/apache-storm-1.0.2/lib/slf4j-api-1.7.7.jar:/home/hadoop/app/apache-storm-1.0.2/lib/minlog-1.3.0.jar:/home/hadoop/app/apache-storm-1.0.2/lib/objenesis-2.1.jar:/home/hadoop/app/apache-storm-1.0.2/lib/clojure-1.7.0.jar:/home/hadoop/app/apache-storm-1.0.2/lib/servlet-api-2.5.jar:/home/hadoop/app/apache-storm-1.0.2/lib/log4j-slf4j-impl-2.1.jar:/home/hadoop/app/apache-storm-1.0.2/lib/log4j-over-slf4j-1.6.6.jar:/home/hadoop/app/apache-storm-1.0.2/lib/storm-core-1.0.2.jar:/home/hadoop/app/apache-storm-1.0.2/lib/disruptor-3.3.2.jar:/home/hadoop/app/apache-storm-1.0.2/lib/asm-5.0.3.jar:/home/hadoop/app/apache-storm-1.0.2/lib/reflectasm-1.10.1.jar:jar/StormTopology.jar:/home/hadoop/app/apache-storm-1.0.2/conf:/home/hadoop/app/apache-storm-1.0.2/bin -Dstorm.jar=jar/StormTopology.jar zhouls.bigdata.stormDemo.StormTopology aaa 16503 [main] INFO o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -5252258187769573644:-8540038416575654367 17093 [main] INFO o.a.s.s.a.AuthUtils - Got AutoCreds [] 18654 [main] INFO o.a.s.StormSubmitter - Uploading topology jar jar/StormTopology.jar to assigned location: /home/hadoop/data/storm/nimbus/inbox/stormjar-cf402e8a-abf7-46bc-a452-14b53aa6b25e.jar 18939 [main] INFO o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /home/hadoop/data/storm/nimbus/inbox/stormjar-cf402e8a-abf7-46bc-a452-14b53aa6b25e.jar 18940 [main] INFO o.a.s.StormSubmitter - Submitting topology StormTopology in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-5252258187769573644:-8540038416575654367"} 23899 [main] INFO o.a.s.StormSubmitter - Finished submitting topology: StormTopology [hadoop@master 
apache-storm-1.0.2]$
然后,查看storm 的ui界面
为什么会是如上的数字呢?大家要学,就要深入去学和理解。
因为默认的Worker数是1,所以结果如上图所示。
点击进去
由此可见,
StormTopology默认只有1个Worker、3个executors、3个tasks(1个spout、1个bolt,再加上默认随Worker数启动的1个acker,各占1个executor和1个task)。