package com.mengyao.storm;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.io.FileUtils;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/**
 * Word count in Storm. Topology wiring: InputSpout -> SplitBolt -> CountBolt
 * = WordCountTopology.
 *
 * @author mengyao
 */
@SuppressWarnings("all")
public class WordCountTopology {

    /**
     * Spout that scans {@code D:/} for {@code *.log} files, emits each line as a
     * tuple on field "line", then renames the consumed file to {@code *.tmp} so it
     * is not re-read on the next {@link #nextTuple()} call.
     */
    public static class InputSpout extends BaseRichSpout {

        private Map conf;
        private TopologyContext context;
        private SpoutOutputCollector collector;

        /**
         * One-time initialization when this spout instance is created, similar to
         * the setup() method of a MapReduce Mapper/Reducer.
         */
        @Override
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        /**
         * Called in a loop by Storm; emits every line of every pending log file.
         * When no files are pending, sleeps briefly instead of busy-spinning, as
         * recommended by the ISpout.nextTuple contract.
         */
        @Override
        public void nextTuple() {
            // Non-recursive scan of D:/ for files with the "log" extension.
            Collection<File> listFiles = FileUtils.listFiles(new File("D:/"), new String[]{"log"}, false);
            if (listFiles.isEmpty()) {
                // Nothing to emit: yield the worker thread so we don't spin at 100% CPU.
                Utils.sleep(100);
                return;
            }
            for (File file : listFiles) {
                try {
                    // NOTE(review): reads with the platform-default charset — confirm
                    // the log files' encoding before switching to an explicit one.
                    List<String> lines = FileUtils.readLines(file);
                    for (String line : lines) {
                        this.collector.emit(new Values(line));
                        System.err.println("==== InputSpout:" + line + " ====");
                    }
                    // Rename the consumed file so it is skipped by later scans.
                    // moveFile fails if "<name>.tmp" already exists.
                    FileUtils.moveFile(file, new File(file.getAbsoluteFile() + ".tmp"));
                } catch (IOException e) {
                    // Rethrow with the cause attached; Storm logs the failure itself,
                    // so no separate printStackTrace() is needed.
                    throw new RuntimeException(e);
                }
            }
        }

        /**
         * Declares the output field "line" for the downstream bolt to subscribe to.
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }

    }

    /**
     * Bolt that splits each incoming "line" tuple on single spaces and emits one
     * tuple per word on field "word".
     */
    public static class SplitBolt extends BaseRichBolt {

        private Map stormConf;
        private TopologyContext context;
        private OutputCollector collector;

        /**
         * One-time initialization when this bolt instance is created, similar to
         * the setup() method of a MapReduce Mapper/Reducer.
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.stormConf = stormConf;
            this.context = context;
            this.collector = collector;
        }

        /**
         * Emits one tuple per space-separated token of the incoming line.
         */
        @Override
        public void execute(Tuple input) {
            String line = input.getStringByField("line");
            String[] words = line.split(" ");
            for (String word : words) {
                this.collector.emit(new Values(word));
                System.err.println("==== SplitBolt:" + word + " ====");
            }
        }

        /**
         * Declares the output field "word" for the downstream bolt to subscribe to.
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

    }

    /**
     * Terminal bolt that keeps a running in-memory count per word and prints the
     * whole tally to stderr after every tuple. Declares no output fields.
     */
    public static class CountBolt extends BaseRichBolt {

        private Map stormConf;
        private TopologyContext context;
        private OutputCollector collector;
        // Running word -> occurrence count; confined to this bolt instance's
        // executor thread, so a plain HashMap is sufficient.
        HashMap<String, Long> map = new HashMap<String, Long>();

        /**
         * One-time initialization when this bolt instance is created, similar to
         * the setup() method of a MapReduce Mapper/Reducer.
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.stormConf = stormConf;
            this.context = context;
            this.collector = collector;
        }

        /**
         * Increments the counter for the incoming word and dumps the full tally.
         * Printing every entry per tuple is O(distinct words) — debug output only.
         */
        @Override
        public void execute(Tuple input) {
            String word = input.getStringByField("word");
            Long value = map.get(word);
            if (value == null) {
                value = 0L;
            }
            value++;
            map.put(word, value);
            for (Entry<String, Long> entry : map.entrySet()) {
                System.err.println("==== CountBolt:" + entry + " ====");
            }
        }

        /** Terminal bolt: nothing to declare. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }

    }

    /**
     * Builds the topology and submits it: to the cluster when CLI args are given
     * (production), otherwise to an in-process LocalCluster for ~100 seconds.
     *
     * @param args any argument switches to cluster-mode submission
     * @throws AlreadyAliveException   if a topology with this name is running
     * @throws InvalidTopologyException if the topology wiring is invalid
     */
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        String topologyName = WordCountTopology.class.getSimpleName();
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("input", new InputSpout());
        builder.setBolt("split", new SplitBolt()).shuffleGrouping("input");
        builder.setBolt("count", new CountBolt()).shuffleGrouping("split");

        Config config = new Config();
        config.setDebug(true);

        if (args != null && args.length > 0) { // production: submit to the cluster
            config.setNumWorkers(3);
            StormSubmitter.submitTopology(topologyName, config, builder.createTopology());
        } else { // otherwise run in local mode
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, config, builder.createTopology());
            Utils.sleep(1000 * 100);
            cluster.killTopology(topologyName);
            cluster.shutdown();
        }

    }
}

// NOTE: the original post listed the required dependency jars in an image,
// which cannot be reproduced here.