Storm是一种分布式流式处理技术,这一点和Spark与Hadoop的批处理特性有明显的区别。
在数据连续产生,响应时延要求较低的场景中,Storm具有Spark不可比拟的优势。
网络性能监控系统中,Storm可以在秒级Dashboard监控,分钟级告警监控中大显生手。
学习任何技术,首先从Hello Wold开始,Storm也不例外,下面代码实现了这样一个例子:
(1) NamesReader Spout读取一行名字字符串,发送给NameSpliter;
(2) NameSpliter Bolt按照空格分割名字字符串,每个名字发送给HelloWorld;
(3) HelloWorld Bolt打印Hello world + 名字。
注:Spout是Storm有向网络的起始节点,Bolt是Storm有向网络的其他节点。数据在Storm有向网络中流动,节点则可以对流经的数据进行处理。
1、 名字字符串读取Spout
package com.coshaho.learn.storm; import java.util.ArrayList; import java.util.List; import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; /** * * NamesReaderSpout.java Create on 2017年6月4日 下午10:57:32 * * 类功能说明: 读取名字列表并派发 * * Copyright: Copyright(c) 2013 * Company: COSHAHO * @Version 1.0 * @Author coshaho */ public class NamesReaderSpout implements IRichSpout { private static final long serialVersionUID = 1L; private SpoutOutputCollector collector; @SuppressWarnings("rawtypes") public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } public void nextTuple() { List<Object> list = new ArrayList<Object>(); try { Thread.sleep(5 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(); list.add("刘备 关羽 张飞"); // 第一个参数是传递的业务数据,第二个参数是消息标识,用于追踪消息是否正确处理 this.collector.emit(list, "stream"); list.clear(); list.add("曹操 郭嘉 荀彧"); this.collector.emit(list, "stream"); } public void declareOutputFields(OutputFieldsDeclarer declarer) { // 必须设置,否则topo启动失败,names对应传递消息第一个元素,即list(0) declarer.declare(new Fields("names")); } public void close() { // TODO Auto-generated method stub } public void activate() { // TODO Auto-generated method stub } public void deactivate() { // TODO Auto-generated method stub } public void ack(Object msgId) { // TODO Auto-generated method stub } public void fail(Object msgId) { // TODO Auto-generated method stub } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
2、 名字字符串分割Bolt
package com.coshaho.learn.storm; import java.util.ArrayList; import java.util.List; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; /** * * NamesSpliterBolt.java Create on 2017年6月4日 下午10:58:08 * * 类功能说明: 名字列表按空格分割 * * Copyright: Copyright(c) 2013 * Company: COSHAHO * @Version 1.0 * @Author coshaho */ public class NamesSpliterBolt implements IRichBolt { private static final long serialVersionUID = 1L; private OutputCollector collector; @SuppressWarnings("rawtypes") public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { // 打印线程号用于追踪Storm的分配策略 Thread current = Thread.currentThread(); String names = input.getString(0); System.out.println("准备拆分" + names + "。当前线程号是" + current.getId() + "。"); List<Tuple> inputList = new ArrayList<Tuple>(); inputList.add(input); String[] nameArray = names.split(" "); for(String name : nameArray) { List<Object> splitList = new ArrayList<Object>(); splitList.add(name); collector.emit(inputList, splitList); } collector.ack(input); } public void declareOutputFields(OutputFieldsDeclarer declarer) { // 必须设置,否则topo启动失败 declarer.declare(new Fields("name")); } public void cleanup() { // TODO Auto-generated method stub } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
3、 HelloWorld Bolt
package com.coshaho.learn.storm; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; /** * * HelloWorldBolt.java Create on 2017年6月4日 下午10:58:26 * * 类功能说明: Storm Hello World * * Copyright: Copyright(c) 2013 * Company: COSHAHO * @Version 1.0 * @Author coshaho */ public class HelloWorldBolt implements IRichBolt { private static final long serialVersionUID = 1L; private OutputCollector collector; @SuppressWarnings("rawtypes") public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { // 打印线程号用于追踪Storm的分配策略 Thread current = Thread.currentThread(); String name = input.getString(0); System.out.println("你好," + name + "。欢迎来到Storm世界。当前线程号是" + current.getId() + "。"); collector.ack(input); } public void cleanup() { // TODO Auto-generated method stub } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
4、 Storm TOPO网络任务启动
package com.coshaho.learn.storm; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; public class StormTest { public static void main(String[] args) throws InterruptedException { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("names-reader", new NamesReaderSpout()); // 启动两个名字分割Task,名字列表随机分配给一个Task builder.setBolt("names-spliter", new NamesSpliterBolt(), 2) .shuffleGrouping("names-reader"); // 启动两个Hello World Task,相同名字发送到同一个Task builder.setBolt("hello-world", new HelloWorldBolt(), 2) .fieldsGrouping("names-spliter", new Fields("name")); Config conf = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("storm-test", conf, builder.createTopology()); } }
5、 执行结果
准备拆分刘备 关羽 张飞。当前线程号是85。
你好,刘备。欢迎来到Storm世界。当前线程号是79。
你好,关羽。欢迎来到Storm世界。当前线程号是81。
你好,张飞。欢迎来到Storm世界。当前线程号是81。
准备拆分曹操 郭嘉 荀彧。当前线程号是87。
你好,荀彧。欢迎来到Storm世界。当前线程号是79。
你好,曹操。欢迎来到Storm世界。当前线程号是81。
你好,郭嘉。欢迎来到Storm世界。当前线程号是81。
准备拆分刘备 关羽 张飞。当前线程号是87。
准备拆分曹操 郭嘉 荀彧。当前线程号是85。
你好,荀彧。欢迎来到Storm世界。当前线程号是79。
你好,曹操。欢迎来到Storm世界。当前线程号是81。
你好,郭嘉。欢迎来到Storm世界。当前线程号是81。
你好,刘备。欢迎来到Storm世界。当前线程号是79。
你好,关羽。欢迎来到Storm世界。当前线程号是81。
你好,张飞。欢迎来到Storm世界。当前线程号是81。
6、 maven依赖
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.2-incubating</version> </dependency>