Dataflow: spout ----> split bolt ----> count bolt
spout
package cn.ljh.storm.wordcount;

import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

// Data source: the Spout component
public class WordCountSpout extends BaseRichSpout {
    // Simulated input data
    private String[] data = {"I love Beijing", "I love China", "Beijing is the capital of China"};

    // collector: this spout's collector, used to send the collected data to the next component
    private SpoutOutputCollector collector;

    @Override
    public void nextTuple() {
        // Called by the Storm engine; emit one sentence every 3 seconds
        Utils.sleep(3000);

        // Pick a random sentence (index 0..2)
        int random = (new Random()).nextInt(3);
        String value = data[random];

        // Print the sentence being emitted
        System.out.println("Collected data: " + value);

        // Send it to the next component
        this.collector.emit(new Values(value));
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // collector: this spout's collector, used to send data to the next component;
        // initialize it in open()
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        // Declare the schema (field names) of the tuples sent to the next component
        declare.declare(new Fields("sentence"));
    }
}
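The spout above emits unanchored tuples, so Storm does not track whether each sentence is fully processed. If at-least-once processing were wanted, a message ID could be attached on emit and the ack/fail callbacks overridden. A minimal sketch, assuming a random UUID string as the message ID (this is an illustrative choice, not part of the original example; the downstream bolts would also need to anchor and ack their tuples for the guarantee to hold):

    // Sketch only: reliable emission with a message ID (not in the original example)
    @Override
    public void nextTuple() {
        Utils.sleep(3000);
        String value = data[new Random().nextInt(data.length)];
        // Attaching a message ID makes Storm call ack()/fail() for this tuple
        this.collector.emit(new Values(value), java.util.UUID.randomUUID().toString());
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("Tuple fully processed: " + msgId);
    }

    @Override
    public void fail(Object msgId) {
        System.out.println("Tuple failed, could re-emit here: " + msgId);
    }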
Split bolt
package cn.ljh.storm.wordcount;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// First bolt: split each sentence into words
public class WordCountSplitBolt extends BaseRichBolt {
    // collector: this bolt's collector, used to send processed data to the next bolt
    private OutputCollector collector;

    @Override
    public void execute(Tuple tuple) {
        // Process the data sent by the previous component, e.g. "I love Beijing"
        // String value = tuple.getStringByField("sentence");
        String value = tuple.getString(0);

        // Split the sentence into words
        String[] words = value.split(" ");

        // Emit each word with a count of 1
        for (String w : words) {
            collector.emit(new Values(w, 1));
        }
    }

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        // Initialize the collector
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        // Declare the schema of the tuples sent to the next component
        declare.declare(new Fields("word", "count"));
    }
}
Count bolt
package cn.ljh.storm.wordcount;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Second bolt: count the words
public class WordCountTotalBolt extends BaseRichBolt {
    // Store the running totals in a Map
    private Map<String, Integer> result = new HashMap<>();

    // collector: this bolt's collector, used to send processed data to the next component
    private OutputCollector collector;

    @Override
    public void execute(Tuple tuple) {
        // Read the incoming word and count
        String word = tuple.getStringByField("word");
        int count = tuple.getIntegerByField("count");

        // Accumulate
        if (result.containsKey(word)) {
            // Existing word: add to its total
            int total = result.get(word);
            result.put(word, total + count);
        } else {
            // New word
            result.put(word, count);
        }

        // Print the current totals
        System.out.println("Current result: " + result);

        // Emit the word and its running total to the next component
        this.collector.emit(new Values(word, result.get(word)));
    }

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        declare.declare(new Fields("word", "total"));
    }
}
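The count bolt emits ("word", "total") tuples, but nothing in this example consumes them. As an illustration of how that stream could be used, here is a hypothetical sink bolt (the class name WordCountReportBolt is an assumption, not part of the original code); it would also have to be wired into the topology, e.g. with builder.setBolt("myreport", new WordCountReportBolt()).shuffleGrouping("mywordcounttotal"):

    package cn.ljh.storm.wordcount;

    import java.util.Map;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;

    // Hypothetical third bolt: just prints the running totals emitted by WordCountTotalBolt
    public class WordCountReportBolt extends BaseRichBolt {

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            // No collector kept: this bolt is a sink and emits nothing downstream
        }

        @Override
        public void execute(Tuple tuple) {
            String word = tuple.getStringByField("word");
            int total = tuple.getIntegerByField("total");
            System.out.println("Report: " + word + " = " + total);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declare) {
            // Nothing emitted, so no fields to declare
        }
    }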
Topology wiring
package cn.ljh.storm.wordcount;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // The spout of the topology
        builder.setSpout("mywordcountspout", new WordCountSpout());

        // The first bolt, fed by the spout with shuffle grouping (random distribution)
        builder.setBolt("mywordcountsplit", new WordCountSplitBolt())
               .shuffleGrouping("mywordcountspout");

        // The second bolt, fed by the first bolt with fields grouping on "word",
        // so the same word is always routed to the same bolt task
        builder.setBolt("mywordcounttotal", new WordCountTotalBolt())
               .fieldsGrouping("mywordcountsplit", new Fields("word"));

        // Build the topology
        StormTopology job = builder.createTopology();

        Config conf = new Config();

        // A topology can run in two modes: 1. local mode  2. cluster mode
        // 1. Local mode
        LocalCluster localcluster = new LocalCluster();
        localcluster.submitTopology("MyWordCount", conf, job);

        // 2. Cluster mode: package the jar and submit it to a Storm cluster
        // StormSubmitter.submitTopology(args[0], conf, job);
    }
}
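In local mode the topology runs inside the current JVM and keeps emitting until the process is killed, since the example never shuts the LocalCluster down. For the commented-out cluster mode, StormSubmitter.submitTopology reads the topology name from args[0], so after packaging this project into a jar it would typically be submitted with the storm command line, roughly like the following (the jar name is illustrative, not from the original):

    storm jar wordcount.jar cn.ljh.storm.wordcount.WordCountTopology MyWordCount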