本实例为入门篇无可靠性保证实例,关于storm的介绍,以及一些术语名词等,可以参考Storm介绍(一)、Storm介绍(二)。
本案例是基于storm0.9.3版本
1.案例结构
案例:Word Count案例
语句Spout --> 语句分隔Bolt --> 单词计数Bolt --> 上报Bolt
2.语句生成Spout - SentenceSpout
作为入门案例,我们直接从一个数组中不断读取语句,作为数据来源。
SentenceSpout不断读取语句将其作为数据来源,组装成单值tuple(键名sentence,键值为祖父穿格式的语句)向后发射。
{"sentence":"i am so shuai!"}
3.代码结构
话不多说,上代码:
1 import backtype.storm.Config; 2 import backtype.storm.LocalCluster; 3 import backtype.storm.generated.StormTopology; 4 import backtype.storm.topology.TopologyBuilder; 5 import backtype.storm.tuple.Fields; 6 7 public class WCTopologyDriver { 8 public static void main(String[] args) throws Exception { 9 //1.创建组件 10 SentenceSpout sentenceSpout = new SentenceSpout(); 11 SplitSentenceBolt splitSentenceBolt = new SplitSentenceBolt(); 12 WordCountBolt wordCountBolt = new WordCountBolt(); 13 ReportBolt reportBolt = new ReportBolt(); 14 15 //2.创建构建者 16 TopologyBuilder builder = new TopologyBuilder(); 17 18 //3.向构建者描述拓扑结构 19 builder.setSpout("Sentence_Spout", sentenceSpout); 20 builder.setBolt("Split_Sentence_Bolt", splitSentenceBolt) 21 .shuffleGrouping("Sentence_Spout"); 22 builder.setBolt(" ", wordCountBolt) 23 .fieldsGrouping("Split_Sentence_Bolt", new Fields("word")); 24 builder.setBolt("Report_Bolt", reportBolt) 25 .shuffleGrouping("Word_Count_Bolt"); 26 27 //4.通过构建者创建拓扑 28 StormTopology topology = builder.createTopology(); 29 30 //5.将拓扑提交到集群中运行 31 //Config conf = new Config(); 32 //StormSubmitter.submitTopology("WC_Topology", conf, topology); 33 34 //5.创建本地集群 模拟运行拓扑 35 LocalCluster cluster = new LocalCluster(); 36 Config conf = new Config(); 37 cluster.submitTopology("WC_Topology", conf, topology); 38 39 Thread.sleep(10 * 1000); 40 cluster.killTopology("WC_Topology"); 41 cluster.shutdown(); 42 } 43 }
1 import java.util.Map; 2 3 import backtype.storm.spout.SpoutOutputCollector; 4 import backtype.storm.task.TopologyContext; 5 import backtype.storm.topology.OutputFieldsDeclarer; 6 import backtype.storm.topology.base.BaseRichSpout; 7 import backtype.storm.tuple.Fields; 8 import backtype.storm.tuple.Values; 9 10 public class SentenceSpout extends BaseRichSpout { 11 12 private String [] sentences = { 13 "my name is park", 14 "i am so shuai", 15 "do you like me", 16 "are you sure you do not like me", 17 "ok i am sure" 18 }; 19 20 private SpoutOutputCollector collector = null; 21 22 /** 23 * 初始化的方法 24 * 当前组件初始化时 调用 执行初始化操作 25 * conf:代表当前topology相关配置信息 26 * context:代表上下文环境 可以用来获取 任务id 组件id 输入输出相关信息 等信息 27 * collector:代表发送者 可以用来发送 拓扑 可以在任何时候发送 此对象线程安全 可以放心的保存在类的内部作为类的成员 28 */ 29 @Override 30 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 31 this.collector = collector; 32 } 33 34 /** 35 * storm会在一个单一线程中不停的调用此方法 要求发送tuple 36 * 如果有数据要发 直接发 如果没有数据要发 也不要阻塞这个方法 而是直接返回即可 37 * 如果真的没有数据要发送 最好睡上一个很短的时间 以便释放cpu 不至于浪费过多资源 38 */ 39 private int index = 0; 40 @Override 41 public void nextTuple() { 42 if(index < sentences.length){ 43 collector.emit(new Values(sentences[index])); 44 index++; 45 }else{ 46 try { 47 Thread.sleep(1); 48 } catch (InterruptedException e) { 49 e.printStackTrace(); 50 } 51 return; 52 } 53 } 54 55 /** 56 * 用来声明输出信息 57 * declarer:声明输出的流的编号 输出的tuple中的字段 以及是否是一个指向性的流 58 * 要注意 组件发送的tuple的结构 都要现在此方法中声明 59 */ 60 @Override 61 public void declareOutputFields(OutputFieldsDeclarer declarer) { 62 declarer.declare(new Fields("sentence")); 63 } 64 65 }
1 import java.util.Map; 2 3 import backtype.storm.task.OutputCollector; 4 import backtype.storm.task.TopologyContext; 5 import backtype.storm.topology.OutputFieldsDeclarer; 6 import backtype.storm.topology.base.BaseRichBolt; 7 import backtype.storm.tuple.Fields; 8 import backtype.storm.tuple.Tuple; 9 import backtype.storm.tuple.Values; 10 11 public class SplitSentenceBolt extends BaseRichBolt{ 12 13 private OutputCollector collector = null; 14 15 /** 16 * 初始化的方法 17 * 当前组件初始化时 调用 执行初始化操作 18 * conf:代表当前topology相关配置信息 19 * context:代表上下文环境 可以用来获取 任务id 组件id 输入输出相关信息 等信息 20 * collector:代表发送者 可以用来发送 拓扑 可以在任何时候发送 此对象线程安全 可以放心的保存在类的内部作为类的成员 21 */ 22 @Override 23 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 24 this.collector = collector; 25 } 26 27 /** 28 * 对于输入的tuple 一个tuple触发一次此方法 29 * 在这个方法中对tuple进行处理 30 */ 31 @Override 32 public void execute(Tuple input) { 33 String sentence = input.getStringByField("sentence"); 34 String [] words = sentence.split(" "); 35 for(String word : words){ 36 collector.emit(new Values(word)); 37 } 38 } 39 40 /** 41 * 用来声明输出信息 42 * declarer:声明输出的流的编号 输出的tuple中的字段 以及是否是一个指向性的流 43 * 要注意 组件发送的tuple的结构 都要现在此方法中声明 44 */ 45 @Override 46 public void declareOutputFields(OutputFieldsDeclarer declarer) { 47 declarer.declare(new Fields("word")); 48 } 49 50 }
1 import java.util.HashMap; 2 import java.util.Map; 3 4 import backtype.storm.task.OutputCollector; 5 import backtype.storm.task.TopologyContext; 6 import backtype.storm.topology.OutputFieldsDeclarer; 7 import backtype.storm.topology.base.BaseRichBolt; 8 import backtype.storm.tuple.Fields; 9 import backtype.storm.tuple.Tuple; 10 import backtype.storm.tuple.Values; 11 12 public class WordCountBolt extends BaseRichBolt { 13 14 private OutputCollector collector = null; 15 16 @Override 17 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 18 this.collector = collector; 19 } 20 21 private Map<String,Integer> map = new HashMap<>(); 22 @Override 23 public void execute(Tuple input) { 24 String word = input.getStringByField("word"); 25 map.put(word, map.containsKey(word) ? map.get(word)+1 : 1); 26 collector.emit(new Values(word,map.get(word))); 27 } 28 29 @Override 30 public void declareOutputFields(OutputFieldsDeclarer declarer) { 31 declarer.declare(new Fields("word","count")); 32 } 33 34 }
1 import java.util.Map; 2 3 import backtype.storm.task.OutputCollector; 4 import backtype.storm.task.TopologyContext; 5 import backtype.storm.topology.OutputFieldsDeclarer; 6 import backtype.storm.topology.base.BaseRichBolt; 7 import backtype.storm.tuple.Tuple; 8 9 public class ReportBolt extends BaseRichBolt { 10 11 @Override 12 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 13 14 } 15 16 @Override 17 public void execute(Tuple input) { 18 String word = input.getStringByField("word"); 19 int count = input.getIntegerByField("count"); 20 System.out.println("--单词数量发生变化:"+word+"~"+count+"--"); 21 } 22 23 @Override 24 public void declareOutputFields(OutputFieldsDeclarer declarer) { 25 26 } 27 28 }
运行结果:
补充,以下是本文案例用到的jar包,由于太大,没有上传,下载0.9.3的storm源码,解压后文件夹中的lib下的所有jar包: