Storm Hello World
Overview
Next, let's look at how the code is implemented:
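- Write the data source class: the Spout. There are two ways to do this:
  - extend the BaseRichSpout class
  - implement the IRichSpout interface
  The main methods to implement or override are open, nextTuple, and declareOutputFields.
- Next, write the data processing class: the Bolt. There are two ways to do this:
  - extend the BaseBasicBolt class
  - implement the IRichBolt interface
  The key methods to implement or override are execute and declareOutputFields.
- Finally, write the main function (the Topology) that submits the job.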
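When running a Topology, the Storm framework offers two modes: local mode and cluster mode.
Local mode: (no Storm cluster required; it runs directly inside Java and is generally used during testing and development) just run the main function.
Cluster mode: (requires a Storm cluster; the Java program is packaged and the Topology is then submitted) package the application as a jar and use the storm command to submit the Topology to the cluster.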
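Switching between the two modes usually comes down to a single decision in main. One possible convention, which is an assumption here and not something the original code does, is to run locally when no arguments are given and to submit to the cluster when a topology name is passed. The sketch below shows only that decision in plain Java; `ModeSelectionSketch`, `Mode`, and `chooseMode` are hypothetical names, and the Storm calls are mentioned only in comments.

```java
public class ModeSelectionSketch {

    // Hypothetical enum, not part of Storm: the two ways a topology can run.
    public enum Mode { LOCAL, CLUSTER }

    // Assumed convention: no arguments -> local mode,
    // first argument (a topology name) present -> cluster mode.
    public static Mode chooseMode(String[] args) {
        return (args == null || args.length == 0) ? Mode.LOCAL : Mode.CLUSTER;
    }

    public static void main(String[] args) {
        if (chooseMode(args) == Mode.LOCAL) {
            // A real topology would call LocalCluster.submitTopology(...) here.
            System.out.println("local mode: run inside this JVM");
        } else {
            // A real topology would call StormSubmitter.submitTopology(...) here.
            System.out.println("cluster mode: submit topology " + args[0]);
        }
    }
}
```

With this convention the same jar can be run from the IDE during development and submitted with the storm command later, without editing the source between the two.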
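Hands-on
First, look at the code structure:
As the figure above shows, data flows from PWSpout to PrintBolt and finally to WriteBolt, which writes it to a file. Let's look at the code of each class in turn: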
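To make the data flow concrete before reading the Storm classes, here is a minimal sketch of the same three stages in plain Java with no Storm dependency. It is a simplification under stated assumptions: `PipelineSketch`, `printStep`, `writeStep`, and `run` are hypothetical names, a fixed word list replaces PWSpout's random words, a `StringWriter` replaces the output file, and everything runs sequentially in one thread, whereas Storm runs the components as separate tasks.

```java
import java.io.StringWriter;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PipelineSketch {

    // Stand-in for PrintBolt: log the word, then hand it on unchanged.
    static String printStep(String word) {
        System.out.println("[print]: " + word);
        return word;
    }

    // Stand-in for WriteBolt: append the word and a separator to the sink.
    static void writeStep(String word, StringWriter sink) {
        sink.write(word);
        sink.write(" ");
    }

    // Stand-in for the topology wiring: source -> printStep -> writeStep.
    static String run(List<String> words) {
        StringWriter sink = new StringWriter();      // replaces the output file
        Iterator<String> source = words.iterator();  // replaces PWSpout
        while (source.hasNext()) {
            writeStep(printStep(source.next()), sink);
        }
        return sink.toString();
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("java", "php", "groovy")));
    }
}
```

Each stage only knows about the value handed to it, which is the same separation the real Spout and Bolts have.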
First, the local mode version:
PWTopology1.java: building the topology
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import bhz.bolt.PrintBolt;
import bhz.bolt.WriteBolt;
import bhz.spout.PWSpout;

public class PWTopology1 {

    public static void main(String[] args) throws Exception {
        // Topology configuration
        Config cfg = new Config();
        cfg.setNumWorkers(2);
        cfg.setDebug(true);

        // Wire the components: spout -> print-bolt -> write-bolt
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new PWSpout());
        builder.setBolt("print-bolt", new PrintBolt()).shuffleGrouping("spout");
        builder.setBolt("write-bolt", new WriteBolt()).shuffleGrouping("print-bolt");

        // 1. Local mode: run for 10 seconds, then stop
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("top1", cfg, builder.createTopology());
        Thread.sleep(10000);
        cluster.killTopology("top1");
        cluster.shutdown();

        // 2. Cluster mode
        // StormSubmitter.submitTopology("top1", cfg, builder.createTopology());
    }
}
Code analysis:
Data source:
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class PWSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector collector;

    // The words this spout can emit, keyed by index
    private static final Map<Integer, String> map = new HashMap<Integer, String>();

    static {
        map.put(0, "java");
        map.put(1, "php");
        map.put(2, "groovy");
        map.put(3, "python");
        map.put(4, "ruby");
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // Initialize the spout
        this.collector = collector;
        //System.out.println(this.collector);
    }

    /**
     * Polls for the next tuple.
     * @see backtype.storm.spout.ISpout#nextTuple()
     */
    @Override
    public void nextTuple() {
        // Emit one randomly chosen word
        final Random r = new Random();
        int num = r.nextInt(5);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.collector.emit(new Values(map.get(num)));
    }

    /**
     * Declares the fields of the emitted data.
     * @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer)
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Declare the output field
        declarer.declare(new Fields("print"));
    }
}
Code walkthrough:
Overall structure
Detailed analysis
---------------------------- open method ----------------------------
---------------------------- nextTuple method ----------------------------
---------------------------- declareOutputFields method ----------------------------
Data processing
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class PrintBolt extends BaseBasicBolt {

    private static final Log log = LogFactory.getLog(PrintBolt.class);
    private static final long serialVersionUID = 1L;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Read the field declared by the previous component
        String print = input.getStringByField("print");
        log.info("【print】: " + print);
        //System.out.println("Name of input word is : " + word);
        // Pass the value on to the next bolt
        collector.emit(new Values(print));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("write"));
    }
}
Code analysis
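The part of PrintBolt that is easiest to get wrong is the field name: it reads the value under "print", the name PWSpout declared, and re-emits it under "write", the name it declares itself. The sketch below imitates that name-based lookup in plain Java. `FieldLookupSketch` and `SimpleTuple` are hypothetical stand-ins written for illustration, not Storm classes; only the method name `getStringByField` is borrowed from the listing above.

```java
import java.util.Arrays;
import java.util.List;

public class FieldLookupSketch {

    // Hypothetical stand-in for a tuple: parallel lists of field names and values.
    static final class SimpleTuple {
        private final List<String> fields;
        private final List<Object> values;

        SimpleTuple(List<String> fields, List<Object> values) {
            if (fields.size() != values.size()) {
                throw new IllegalArgumentException("field/value count mismatch");
            }
            this.fields = fields;
            this.values = values;
        }

        // Look a value up by the name the upstream component declared.
        String getStringByField(String name) {
            int index = fields.indexOf(name);
            if (index < 0) {
                throw new IllegalArgumentException("unknown field: " + name);
            }
            return (String) values.get(index);
        }
    }

    public static void main(String[] args) {
        // PWSpout declares "print" and emits one word per tuple.
        SimpleTuple fromSpout =
                new SimpleTuple(Arrays.asList("print"), Arrays.<Object>asList("java"));
        String word = fromSpout.getStringByField("print");

        // PrintBolt re-emits the same value under its own declared name, "write".
        SimpleTuple fromPrintBolt =
                new SimpleTuple(Arrays.asList("write"), Arrays.<Object>asList(word));
        System.out.println(fromPrintBolt.getStringByField("write"));
    }
}
```

In this stand-in, asking for a name the upstream component did not declare fails immediately, which is why the string passed to getStringByField in each bolt has to match the string in the previous component's declareOutputFields.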
import java.io.FileWriter;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WriteBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;
    private static final Log log = LogFactory.getLog(WriteBolt.class);
    private FileWriter writer;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Read the field declared by the previous component
        String text = input.getStringByField("write");
        try {
            // Open the output file on first use; the directory depends on the OS
            if (writer == null) {
                if (System.getProperty("os.name").equals("Windows 10")) {
                    writer = new FileWriter("D:\\099_test\\" + this);
                } else if (System.getProperty("os.name").equals("Windows 8.1")) {
                    writer = new FileWriter("D:\\099_test\\" + this);
                } else if (System.getProperty("os.name").equals("Windows 7")) {
                    writer = new FileWriter("D:\\099_test\\" + this);
                } else if (System.getProperty("os.name").equals("Linux")) {
                    System.out.println("----:" + System.getProperty("os.name"));
                    writer = new FileWriter("/usr/local/temp/" + this);
                }
            }
            log.info("【write】: writing to file");
            writer.write(text);
            writer.write(" ");
            writer.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Last component in the topology: no output fields to declare
    }
}
This class is very similar to PrintBolt: both simply process data, so it needs no further explanation.
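One detail in WriteBolt is still worth a note: it matches the OS name against three exact Windows versions, so on any other system no writer is opened and the write fails. A possible simplification is sketched below. It is only a sketch under assumptions: `OutputPathSketch` and `outputDir` are hypothetical names, and falling back to the Linux directory for every non-Windows system is a choice made here, not the original behavior.

```java
public class OutputPathSketch {

    // Picks the output directory from the OS name with a prefix check,
    // instead of comparing against each Windows version separately.
    static String outputDir(String osName) {
        if (osName != null && osName.startsWith("Windows")) {
            return "D:\\099_test\\";       // same Windows directory as WriteBolt
        }
        return "/usr/local/temp/";         // assumed fallback for everything else
    }

    public static void main(String[] args) {
        System.out.println(outputDir(System.getProperty("os.name")));
    }
}
```

Keeping the decision in a small method that takes the OS name as a parameter also makes it possible to check the logic without running on each operating system.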