本篇记录一下Storm的编程案例:将手机型号转换成大写,并且加上当前时间,再输出到文件。
1、所需jar包
解压安装包apache-storm-0.9.2-incubating.tar.gz,所需的jar包位于apache-storm-0.9.2-incubating/lib目录下
2、创建一个Spout类
负责实时读取数据,然后发送给后续的bolt组件进行处理
package demo;

import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/**
 * Spout that simulates a live data source: on every {@link #nextTuple()} call it
 * waits briefly, picks a random phone model name and emits it downstream as a
 * one-field tuple named {@code "phone-name"}.
 */
public class DataSouceSpout extends BaseRichSpout {

    /**
     * Simulated input data.
     * NOTE(review): "sunsumg" looks like a misspelling of "samsung"; it is kept
     * unchanged because it is part of the emitted data.
     */
    private final String[] phones = {"iphone", "xiaomi", "moto", "sunsumg", "mate", "huawei", "nokia"};

    /** Collector used to emit tuples; assigned in {@link #open}. */
    private SpoutOutputCollector collector;

    /** Random source, created once in {@link #open} instead of once per tuple. */
    private transient Random random;

    /**
     * Emits one randomly chosen phone model name.
     */
    @Override
    public void nextTuple() {
        // Simulate the latency of reading from an external source.
        Utils.sleep(500);

        // Bound the index by the array length so the data set can change safely.
        int index = random.nextInt(phones.length);
        String phone = phones[index];

        // Hand the value to the downstream bolt(s).
        this.collector.emit(new Values(phone));
    }

    /**
     * Initializes the spout by storing the collector and creating the random source.
     *
     * @param conf      topology configuration (unused here)
     * @param context   topology context (unused here)
     * @param collector collector used later to emit tuples
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    /**
     * Declares the single output field, {@code "phone-name"}.
     *
     * @param declare declarer that receives the output schema
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        declare.declare(new Fields("phone-name"));
    }
}
3、创建一个Bolt类
将Spout发送过来的数据,转成大写
package demo;

import java.util.Locale;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Bolt that upper-cases the string received in the first field of each input
 * tuple and emits it as a one-field tuple named {@code "upperphone"}.
 */
public class MyBoltA extends BaseBasicBolt {

    /**
     * Upper-cases the first field of the incoming tuple and emits the result.
     *
     * @param tuple     incoming tuple; its first field is read as a string
     * @param collector collector used to emit the upper-cased value
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // Read the value by position; the same value could be read by name
        // with tuple.getStringByField("phone-name").
        String phone = tuple.getString(0);

        // Locale.ROOT keeps the result independent of the JVM default locale
        // (e.g. under a Turkish locale "i" would otherwise become a dotted "İ").
        String upperPhone = phone.toUpperCase(Locale.ROOT);

        // Pass the result on to the next component.
        collector.emit(new Values(upperPhone));
    }

    /**
     * Declares the single output field, {@code "upperphone"}.
     *
     * @param declare declarer that receives the output schema
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        declare.declare(new Fields("upperphone"));
    }
}
4、创建另一个Bolt类
将BoltA发送过来的数据,加上时间,并且写到文件中
package demo;

import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

/**
 * Terminal bolt: appends the current timestamp to the string received in the
 * first field of each tuple and writes the result to a local file.
 */
public class MyBoltB extends BaseBasicBolt {

    /** Destination file for the processed records. */
    private static final String OUTPUT_PATH = "/usr/local/test/storm/mystormoutput.txt";

    /** Writer for the output file; opened in {@link #prepare}, closed in {@link #cleanup}. */
    private transient FileWriter fw = null;

    /**
     * Opens the output file.
     *
     * @param stormConf topology configuration (unused here)
     * @param context   topology context (unused here)
     * @throws IllegalStateException if the output file cannot be opened
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            fw = new FileWriter(OUTPUT_PATH);
        } catch (IOException e) {
            // Fail fast: continuing with a null writer would make every later
            // execute() call fail and silently drop all output.
            throw new IllegalStateException("Cannot open output file " + OUTPUT_PATH, e);
        }
    }

    /**
     * Writes the incoming value followed by the current time to the output file.
     *
     * @param tuple     incoming tuple; its first field is read as a string
     * @param collector unused, because this bolt emits nothing
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String upperPhoneName = tuple.getString(0);

        // A new formatter per call: SimpleDateFormat is not thread-safe.
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String result = upperPhoneName + " " + df.format(new Date());

        try {
            fw.write(result + " ");
            // Flush per record so the data is on disk even if cleanup() never runs.
            fw.flush();
        } catch (IOException e) {
            // Best effort, as before: report the failed write and keep processing.
            e.printStackTrace();
        }
    }

    /**
     * Closes the output file when the bolt is shut down.
     * NOTE(review): Storm's documentation says cleanup is not guaranteed to be
     * called on a cluster, which is why execute() flushes after every record.
     */
    @Override
    public void cleanup() {
        if (fw != null) {
            try {
                fw.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            fw = null;
        }
    }

    /**
     * Declares no output fields, since this is the last bolt in the topology.
     *
     * @param declare declarer that would receive the output schema (unused)
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        // Last bolt in the chain: nothing is emitted.
    }
}
5、创建一个组装类
组装各个组件,并且提交任务到Storm集群
package demo; import backtype.storm.Config; import backtype.storm.StormSubmitter; import backtype.storm.generated.StormTopology; import backtype.storm.topology.TopologyBuilder; //组装各个组件,并且提交任务到Storm集群 public class SubmitClient { public static void main(String[] args) throws Exception { //得到一个topology的构造器 TopologyBuilder builder = new TopologyBuilder(); //指定spout builder.setSpout("datasource-spout", new DataSouceSpout()); //指定Bolt组件,还需要指定数据的来源 builder.setBolt("boltA", new MyBoltA()).shuffleGrouping("datasource-spout"); builder.setBolt("boltB", new MyBoltB()).shuffleGrouping("boltA"); //生成一个具体的任务 StormTopology phoneTopo = builder.createTopology(); //指明任务的一些参数 Config config = new Config(); //希望storm集群分配6个worker来执行任务 config.setNumWorkers(6); //提交任务 StormSubmitter.submitTopology("mystormdemo", config, phoneTopo); } }
6、文件打包,发送服务器
将这四个文件打成 stormDemo.jar,并且上传到Storm的服务器,临时存放在 /usr/local/test/storm
7、运行程序
首先启动Zookeeper和Storm,然后执行以下命令提交任务
[root@localhost apache-storm-0.9.2-incubating]# bin/storm jar /usr/local/test/storm/stormDemo.jar demo.SubmitClient
如下图所示:
8、查看结果
[root@localhost storm]# tail -f /usr/local/test/storm/mystormoutput.txt
程序执行成功,结果已输出到 mystormoutput.txt