1. vim /etc/hosts
ssh免密登录
192.168.132.154 c0
192.168.132.156 c1
192.168.132.155 c2
storm集群:
192.168.132.154 c0
192.168.132.156 c1
192.168.132.155 c2
2. 请在 Storm 官网下载安装包(apache-storm-1.2.3.tar.gz)并解压,文末附有文件下载地址。
tar -zxvf apache-storm-1.2.3.tar.gz
配置环境变量(c0,c1,c2都要修改)
vim /etc/profile
# storm
export STORM_HOME=/home/xiaozw/soft/java/storm
export PATH=$PATH:${JAVA_PATH}:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$ZK_HOME/bin:$SPARK_HOME/bin:${STORM_HOME}/bin
刷新生效
source /etc/profile
3. 修改storm/conf/storm.yaml配置文件
# ZooKeeper servers used by the Storm cluster (hosts c0, c1, c2).
storm.zookeeper.servers:
  - "192.168.132.154"
  - "192.168.132.156"
  - "192.168.132.155"
# Nimbus seed host; Nimbus is started on c0.
nimbus.seeds: ["192.168.132.154"]
# Local working directory for Storm; it must be created on every node.
# NOTE: YAML requires a space after the colon, otherwise the entry is not parsed as key/value.
storm.local.dir: "/home/xiaozw/soft/tmp/storm"
# Ports available to worker processes on each supervisor node.
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703
创建storm目录(c0,c1,c2都要创建)
mkdir /home/xiaozw/soft/tmp/storm
4. 拷贝storm文件夹到集群的其它节点上(c1,c2)
scp -r /home/xiaozw/soft/java/storm root@c1:/home/xiaozw/soft/java/
scp -r /home/xiaozw/soft/java/storm root@c2:/home/xiaozw/soft/java/
先启动zookeeper,安装zookeeper请查看其它文档。
zkServer.sh start
5. 启动storm
c0,nimbus上启动
./storm nimbus >> /dev/null 2>&1 &
启动界面
storm ui >> /dev/null 2>&1 &
c1,c2 supervisor上启动
./storm supervisor >> /dev/null 2>&1 &
界面查看,http://192.168.132.154:8080
6. java wordcount程序。
package com.xiaozw.demo4.storm; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; import java.util.HashMap; import java.util.Map; import java.util.Random; public class WordCountTopology { /** * spout 继承一个基类,实现接口,这个里面主要是负责从数据源获取数据。 * 简化从内部发射数据。 */ public static class RandomSentenceSpout extends BaseRichSpout{ private static final long serialVersionUID = -8017609899644290351L; private SpoutOutputCollector collector; private Random random; /** * 对spout初始化,创建线程,数据库连接 * @param conf * @param topologyContext * @param collector */ @Override public void open(Map conf, TopologyContext topologyContext, SpoutOutputCollector collector) { //初始化数据,SpoutOutputCollector用来发射数据出去, this.collector=collector; this.random=new Random(); } /** * 最终运行在task中,某个worker进程的某个executor线程内部。 * 某个task负责无限循环调用nextTuple方法, * 形成数据流。 */ @Override public void nextTuple() { Utils.sleep(100); String[] sentences = new String[] { "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" }; String sentence = sentences[random.nextInt(sentences.length)]; System.err.println("【发射句子】sentence=" + sentence); collector.emit(new Values(sentence)); } /** * 发射出去每个tuple中field名称是什么。 * @param declarer */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence")); } } /** * 
每个bolt同样是发送到worker某个executor的task中执行 * * @author Administrator * */ public static class SplitSentence extends BaseRichBolt { private static final long serialVersionUID = -1863792429350238883L; private OutputCollector collector; /** * 对于bolt来说,第一个方法就是prepare()方法。 */ @Override @SuppressWarnings("rawtypes") public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } /** * 每接收到一条数据后,就会交给executor方法来执行 */ @Override public void execute(Tuple tuple) { String sentence = tuple.getStringByField("sentence"); if (sentence != null && "".equals(sentence) == false) { String[] words = sentence.split(" "); for (String word : words) { collector.emit(new Values(word)); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } /** * 单词计数bolt * * @author Administrator * */ public static class WordCount extends BaseRichBolt { private static final long serialVersionUID = -8940950046975910504L; //private static final Logger LOGGER = LoggerFactory.getLogger(WordCount.class); private OutputCollector collector; private Map<String, Integer> wordCounts = new HashMap<String, Integer>(); @Override @SuppressWarnings("rawtypes") public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple input) { String word = input.getStringByField("word"); Integer count = wordCounts.get(word); if (count == null) { count = 0; } wordCounts.put(word, ++count); System.err.println("【单词计数】" + word + "出现的次数是" + count); collector.emit(new Values(word, count)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } } public static void main(String[] args) throws Exception{ // 在main方法中,会去将spout和bolts组合起来,构建成一个拓扑 TopologyBuilder builder = new TopologyBuilder(); // 第一个参数的意思,就是给这个spout设置一个名字 // 第二个参数的意思,就是创建一个spout的对象 // 
第三个参数的意思,就是设置spout的executor有几个 builder.setSpout("RandomSentence", new RandomSentenceSpout(), 5); builder.setBolt("SplitSentence", new SplitSentence(), 10).setNumTasks(20) .shuffleGrouping("RandomSentence"); // 这里设置fieldsGrouping很重要,相同的单词从SplitSentence发射出来时,一定会进入到下游的指定的同一个task中 // 只有这样子,才能准确的统计出每个单词的数量 builder.setBolt("WordCount", new WordCount(), 10).setNumTasks(20).fieldsGrouping( "SplitSentence", new Fields("word")); Config config = new Config(); if(args!=null && args.length>0){ config.setNumWorkers(3); try{ StormSubmitter.submitTopologyWithProgressBar(args[0],config,builder.createTopology()); } catch (Exception e){ e.printStackTrace(); } } else{ config.setMaxTaskParallelism(20); // 在eclipse本地运行 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("WordCountTopology", config, builder.createTopology()); Utils.sleep(60000); cluster.shutdown(); } } }
打包 mvn clean install -DskipTests
上传jar包到测试服务器
执行命令:
storm jar demo4-1.0-SNAPSHOT.jar com.xiaozw.demo4.storm.WordCountTopology WordCountTopology
查看UI界面 http://192.168.132.154:8080/index.html
7. 源码和文件下载地址
链接:https://pan.baidu.com/s/1RmBlhZ_p-30clHoUhxycBg
提取码:ik6g