1 import java.io.File; 2 import java.io.IOException; 3 import java.util.Collection; 4 import java.util.HashMap; 5 import java.util.List; 6 import java.util.Map; 7 import java.util.Map.Entry; 8 9 import org.apache.commons.io.FileUtils; 10 11 import backtype.storm.Config; 12 import backtype.storm.LocalCluster; 13 import backtype.storm.spout.SpoutOutputCollector; 14 import backtype.storm.task.OutputCollector; 15 import backtype.storm.task.TopologyContext; 16 import backtype.storm.topology.OutputFieldsDeclarer; 17 import backtype.storm.topology.TopologyBuilder; 18 import backtype.storm.topology.base.BaseRichBolt; 19 import backtype.storm.topology.base.BaseRichSpout; 20 import backtype.storm.tuple.Fields; 21 import backtype.storm.tuple.Tuple; 22 import backtype.storm.tuple.Values; 23 import backtype.storm.utils.Utils; 24 25 /** 26 * 单词计数 27 * 监控d:\test目录下面的文件,统计单词出现的总次数 28 * 当有新文件出现的时候,也要能解析出来 29 * 30 * @author Administrator 31 * 32 */ 33 public class LocalTopologyWordCount { 34 35 36 /** 37 * spout需要继承baserichspout,实现未实现的方法 38 * @author Administrator 39 * 40 */ 41 public static class DataSourceSpout extends BaseRichSpout{ 42 private Map conf; 43 private TopologyContext context; 44 private SpoutOutputCollector collector; 45 46 /** 47 * 初始化方法,只会执行一次 48 * 在这里面可以写一个初始化的代码 49 * Map conf:其实里面保存的是topology的一些配置信息 50 * TopologyContext context:topology的上下文,类似于servletcontext 51 * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple) 52 */ 53 @Override 54 public void open(Map conf, TopologyContext context, 55 SpoutOutputCollector collector) { 56 this.conf = conf; 57 this.context = context; 58 this.collector = collector; 59 } 60 61 /** 62 * 这个方法是spout中最重要的方法, 63 * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内 64 * 每调用一次,会向外发射一条数据 65 */ 66 @Override 67 public void nextTuple() { 68 //获取指定目录下面的新文件, 69 Collection<File> listFiles = FileUtils.listFiles(new File("d:\test"), new String[]{"txt"}, true); 70 //分别读取每个文件 71 for (File file : listFiles) { 72 try { 73 List<String> readLines = FileUtils.readLines(file); 74 for (String line : readLines) { 75 //把每一行封装成一个tuple,发射出去 76 this.collector.emit(new Values(line)); 77 } 78 FileUtils.moveFile(file, new File(file.getAbsolutePath()+System.currentTimeMillis()));//给文件该名字,否则会一直处理这个文件. 79 } catch (IOException e) { 80 e.printStackTrace(); 81 } 82 } 83 } 84 85 /** 86 * 声明输出字段 87 */ 88 @Override 89 public void declareOutputFields(OutputFieldsDeclarer declarer) { 90 //给values中的数据起个名字,方便后面的bolt从这个values中取数据 91 //fields中定义的参数和values中传递的数值是一一对应的 92 declarer.declare(new Fields("line")); 93 } 94 95 } 96 97 98 /** 99 * 自定义bolt需要实现baserichbolt 100 * @author Administrator 101 * 102 */ 103 public static class SplitBolt extends BaseRichBolt{ 104 private Map stormConf; 105 private TopologyContext context; 106 private OutputCollector collector; 107 108 /** 109 * 和spout中的open方法意义一样 110 */ 111 @Override 112 public void prepare(Map stormConf, TopologyContext context, 113 OutputCollector collector) { 114 this.stormConf = stormConf; 115 this.context = context; 116 this.collector = collector; 117 } 118 119 /** 120 * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理 121 */ 122 @Override 123 public void execute(Tuple input) { 124 //获取每一行数据进行切割 125 String line = input.getStringByField("line"); 126 String[] splits = line.split(" "); 127 //把切割出来的单词一个一个发射出去 128 for (String word : splits) { 129 this.collector.emit(new Values(word)); 130 } 131 132 } 133 134 //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。 135 //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明 136 /** 137 * 声明输出字段 138 */ 139 @Override 140 public void declareOutputFields(OutputFieldsDeclarer declarer) { 141 declarer.declare(new Fields("word")); 142 } 143 144 } 145 146 147 148 /** 149 * 自定义bolt需要实现baserichbolt 150 * @author Administrator 151 * 152 */ 153 public static class CountBolt extends BaseRichBolt{ 154 private Map stormConf; 155 private TopologyContext context; 156 private OutputCollector collector; 157 158 /** 159 * 和spout中的open方法意义一样 160 */ 161 @Override 162 public void prepare(Map stormConf, TopologyContext context, 163 OutputCollector collector) { 164 this.stormConf = stormConf; 165 this.context = context; 166 this.collector = collector; 167 } 168 169 HashMap<String, Integer> hashMap = new HashMap<String, Integer>(); 170 /** 171 * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理 172 */ 173 @Override 174 public void execute(Tuple input) { 175 //获取每一个单词 176 String word = input.getStringByField("word"); 177 //在map中进行统计 178 Integer integer = hashMap.get(word); 179 if(integer==null){ 180 integer=0; 181 } 182 integer++; 183 hashMap.put(word, integer); 184 //把这个统计结果打印到控制台 185 Utils.sleep(1000); 186 System.out.println("========================================="); 187 for (Entry<String, Integer> entry : hashMap.entrySet()) { 188 System.out.println(entry); 189 } 190 } 191 192 //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。 193 //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明 194 /** 195 * 声明输出字段 196 */ 197 @Override 198 public void declareOutputFields(OutputFieldsDeclarer declarer) { 199 } 200 201 } 202 /** 203 * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的 204 * @param args 205 */ 206 public static void main(String[] args) { 207 //组装topology 208 TopologyBuilder topologyBuilder = new TopologyBuilder(); 209 topologyBuilder.setSpout("spout1", new DataSourceSpout()); 210 //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple 211 topologyBuilder.setBolt("bolt1", new SplitBolt()).shuffleGrouping("spout1"); 212 topologyBuilder.setBolt("bolt2", new CountBolt()).shuffleGrouping("bolt1"); 213 214 //创建本地storm集群 215 LocalCluster localCluster = new LocalCluster(); 216 localCluster.submitTopology("wordCountTopology", new Config(), topologyBuilder.createTopology()); 217 } 218 }