1. Storm编程模型概要
消息源spout, 继承BaseRichSpout类 或 实现IRichSpout接口
1)BaseRichSpout类相对比较简单,需要覆写的方法较少,满足基本业务需求
2)IRichSpout接口,需要实现较多的框架定义好的抽象方法,部分方法和BaseRichSpout重叠,通常用于比较复杂的业务数据接入需求
重要方法
open方法: 初始化动作(传入参数,传入上下文),是spout第一个执行的方法,主类中的拓扑配置实例conf会将相关参数作为实参传入给open
nextTuple方法: 数据接入,消息tuple发射 (spout中最重要的方法)
ack方法:传入参数是Object类型的msgId,唯一标识一个tuple; 实现该tuple被成功处理后的逻辑,是storm消息保证机制的API接口。注意:只有在nextTuple中emit时指定了msgId,storm才会回调ack/fail
fail方法:与ack方法对应,实现tuple处理失败后的逻辑, storm消息保证的API接口
declareOutputFields方法:声明输出字段,storm中的每个tuple都要求设置输出字段
处理单元bolt: 继承BaseBasicBolt类,或者实现IRichBolt接口
重要方法
*prepare方法:bolt初始化(worker中该组件的每个task初始化时调用一次),传入配置参数和上下文,通常在此保存用于发射tuple的OutputCollector
*execute方法:接收1个tuple, 实现bolt处理逻辑,发射新tuple(最重要)
*cleanup方法:bolt关闭前调用,用于资源释放等,但不能保证该方法一定会被调用(worker进程出现问题等情况)
*declareOutputFields方法:声明输出tuple的每个字段,storm中的每个tuple都要求设置输出字段
2. wordcount代码架构
从拓扑的角度来看各个类间的关系
各个类主要功能
1. spout中随机发送内置的语句作为消息源
2. 第一组bolt进行语句切分,分离出单词,然后发送给下游bolt
3. 第二组bolt,基于字段分组策略,订阅(接收)切分好的单词,完成单词统计并将结果发送给下游
4. 最后1个bolt, 将结果打印到console
3. RandomSentenceSpout
import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; import java.util.Map; import java.util.Random; /** * * @Author: shayzhang * @Email: shayzhang@sina.com * @Blog: cn.blogs.com/shay-zhangjin * @Describe: select 1 random sentence and send to next-level bolt using field streaming groups * */ public class RandomSentenceSpout extends BaseRichSpout { SpoutOutputCollector spcollector; //for emit tuple Random random; //spout initialize, spout执行的第一个方法,主要是从conf自动传入spout发送数据的SpoutOutputCollector public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { spcollector = spoutOutputCollector; random = new Random(); } //generate tuple and emit public void nextTuple() { //每2秒发送一次tuple Utils.sleep(2000); String[] sentences = new String[]{ "And if the golden sun", "four score and seven years ago", "storm hadoop hbase spark", "storm is a real-time processing frame-work", "you make my night bright" }; // select random sentence String msg = sentences[random.nextInt(sentences.length)]; // emit tuple, doesn't assign message id spcollector.emit(new Values(msg.trim().toLowerCase()) ); // ack tuple, successfully sent } //每个tuple都要说明字段 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("sentence") ); } }
4. WordNormalizerBolt
import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import java.util.Map; /** * * @author: shayzhang * @email: shayzhang@sina.com * @blog: cn.blogs.com/shay-zhangjin * @description: split sentence into words and emit * */ public class WordNormalizerBolt implements IRichBolt { private OutputCollector wncollector; //bolt执行的第一个方法,主类中conf对象,将相关数据推送过来,重点是获取Bolt发送数据使用的OutputCollector public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { wncollector = outputCollector; } public void execute(Tuple tuple) { //获取整个句子 String sentence = tuple.getString(0); //分离出单词 String words[] = sentence.split(" "); //发送每个单词 for(String word: words){ wncollector.emit(new Values(word)); wncollector.ack(tuple); } } //Bolt退出会执行,但不能保证cleanup方法一定会被执行到(节点故障等) public void cleanup() { } //字段说明 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word")); //当下游bolt通过fieldsGrouping方式获取数据时尤为重要,要保障上下游使用的Fields能够对的上 } public Map<String, Object> getComponentConfiguration() { return null; } }
5. WordCountBolt
import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import java.util.HashMap; import java.util.Map; /** * * @author: shayzhang * @email: shayzhang@sina.com * @blog: cn.blogs.com/shay-zhangjin * @description: count word number, emit top N * */ public class WordCountBolt implements IRichBolt{ OutputCollector collector; //for emit Map<String,Integer> counters; //for word count //init bolt, bolt执行的第一个方法,主类中的config对象自动进行参数传递 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { collector = outputCollector; counters = new HashMap<String, Integer>(); } //统计每个单词的次数,并排序 public void execute(Tuple tuple) { String word = tuple.getString(0); //处理收到的每一个word if ( !counters.containsKey(word)){ counters.put(word,1); }else{ counters.put(word, counters.get(word)+1); } //实时输出 <word, count> Integer length = counters.keySet().size(); Integer count = 0; for(String key:counters.keySet()){ word = "[" + new String(key) + ": " + String.valueOf(counters.get(key)) + "]"; collector.emit(new Values(word)); collector.ack(tuple); count++; if(count==length){ break; } }//execute }//execute public void cleanup() { } public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("wordcount")); } public Map<String, Object> getComponentConfiguration() { return null; } }
6. PrintBolt
import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import java.util.Map; public class PrintBolt extends BaseBasicBolt{ OutputCollector collector; public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { collector = outputCollector; } public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { try { String msg = tuple.getString(0); if (msg != null) { System.out.println(msg); } } catch (Exception e) { e.printStackTrace(); } } public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("result")); } }
7. 主类:构建拓扑,根据是否传入参数决定运行本地模式、集群模式
import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; public class WordCountTopology { public static void main(String args[]) throws AlreadyAliveException, InvalidTopologyException { //Topology配置 Config config = new Config(); //Topology构造器 TopologyBuilder builder = new TopologyBuilder(); //使用builder构建Topology //spout, 2个executor builder.setSpout("RandomSentence", new RandomSentenceSpout(), 2); //bolt, 2个executor, 从id为RandomSentence的spout接收tuple, shuffle方式 builder.setBolt("WordNormalizer", new WordNormalizerBolt(), 2).shuffleGrouping("RandomSentence"); //从id为“RandomSentence”的spout获取数据 //bolt, 2个executor,从id为wordNormalizer的bolt接收tuple, fields方式: 根据‘word’字段, 进行单词接收 builder.setBolt("wordcount", new WordCountBolt(), 2).fieldsGrouping("WordNormalizer", new Fields("word")); //从id为WordNormalizser的bolt获取数据,并且按照word域进行分发 //bolt, 1个executor, 收集所有结果并打印, global方式接收所有id为wordcount的bolt发出的信息 builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("wordcount"); //从id为wordcount的bolt获取数据 //设置debug, true则打印节点间交换的所有信息 config.setDebug(false); //通过是否有参数,来控制启动本地模式,集群模式 if(args!=null && args.length>0){ //有提交参数,执行集群模式 config.setNumWorkers(1); //1个worker进程 try { //提交拓扑到storm集群, 拓扑名通过提交参数给定(args[0]) StormSubmitter.submitTopology(args[0], config, builder.createTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } }else{ //无提交参数,执行本地模式,调测用 //本地模式,通过LocalCluster实例提交拓扑任务 config.setMaxTaskParallelism(1); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("WC_Local", config, builder.createTopology()); //运行5分钟后,kill该topology任务 Utils.sleep(300000); cluster.killTopology("WC_Local"); cluster.shutdown(); } }//main 
}//WordCountTopology
8. 本地模式运行
在IDEA中选择主类,直接运行即可, 从IDE终端可以看到代码是否正常运行