一、开发
* 假定是用IDEA工具开发,这里实现的是上面(2)类型的2层Bolt实例,Spout -> Bolt1 -> Bolt2
1.创建Maven项目
项目名是StormProcessor,包名是com.clotho.storm。后面运行命令时会用到。
2.配置Maven
在pom.xml的<dependencies>和</dependencies>中间加入以下内容:
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>2.3.0</version> <scope>provided</scope> <!--<scope>${provided.scope}</scope>--> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-client</artifactId> <version>2.3.0</version> </dependency>
* 如果pom.xml有其它Hadoop组件的引用,运行时有可能会报错。例如:
“Detected both log4j-over-slf4j.jar AND bound slf4j-log4j12.jar on the class path”
遇到这种情况,有2个选择:
a.独立一个项目开发Storm程序
b.exclude掉引发异常的依赖项(在pom.xml编辑界面:右键 -> Maven -> Show Dependencies -> 点击Show Conflicts/Duplicates按钮查看)
3.Spout类
可以实现IRichSpout接口,也可以扩展BaseRichSpout类,BaseRichSpout类也是实现IRichSpout接口
Spout一般是负责获取数据或生成数据,并发送给下一层Bolt处理。
public class TextSpout extends BaseRichSpout
{
private SpoutOutputCollector collector;
private static final String field = "text";
private int count = 0;
private String[] message = {
"这是 第 1 条 信息 ~ 统计",
"这是 第 2 条 信息 ~ 统计 测试",
"这是 第 3 条 信息 ~ 测试"
};
/**
* 在Spout初始化时被调用
*
* @param map
* @param context
* @param collector
*/
@Override
public void open(Map map, TopologyContext context, SpoutOutputCollector collector)
{
System.out.println("open:" + map.get("test"));
this.collector = collector;
}
/**
* Spout实现的核心方法,会被循环调用
*/
@Override
public void nextTuple()
{
if (count < message.length)
{
System.out.println("第" + count + "次开始发送数据..");
collector.emit(new Values(message[count]));
}
count++;
}
/**
* 声明要输出的Tuple的字段名称
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
System.out.println("定义格式...");
declarer.declare(new Fields(field));
}
}
4.第1层的Bolt类
可以实现IRichBolt接口,也可以扩展BaseRichBolt类,BaseRichBolt类也是实现IRichBolt接口
这层Bolt是负责把前面传过来的文本进行空格分词,并传递给下一层Bolt
public class SplitBolt extends BaseRichBolt
{
private OutputCollector collector;
private static final String field1 = "text";
private static final String field2 = "count";
/**
* 在Bolt启动前执行,提供Bolt启动环境配置的入口
*/
@Override
public void prepare(Map map, TopologyContext context, OutputCollector collector)
{
System.out.println("prepare:" + map.get("test"));
this.collector = collector;
}
/**
* Bolt实现的核心方法
*/
@Override
public void execute(Tuple tuple)
{
String message = tuple.getStringByField(field1);
System.out.println("开始分割单词:" + message);
String[] words = message.toLowerCase().split(" ");
for (String word : words)
{
collector.emit(new Values(word)); //向下一个Bolt发送数据
}
}
/**
* 声明数据格式
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
declarer.declare(new Fields(field2));
}
/**
* Storm在终止一个Bolt之前会调用这个方法
*/
@Override
public void cleanup()
{
System.out.println("TestBolt的资源释放");
}
}
5.第2层的Bolt类
这层Bolt是负责对上一层传来的词进行统计
public class StatBolt extends BaseRichBolt
{
private static final String field = "count";
private int count = 1;
/**
* 保存单词和对应的计数
*/
private HashMap<String, Integer> counts = null;
/**
* @param map
* @param context
* @param collector
*/
@Override
public void prepare(Map map, TopologyContext context, OutputCollector collector)
{
System.out.println("prepare:" + map.get("test"));
counts = new HashMap<String, Integer>();
}
/**
* @param tuple
*/
@Override
public void execute(Tuple tuple)
{
String message = tuple.getStringByField(field);
System.out.println("第" + count + "次统计单词出现的次数");
if (!counts.containsKey(message))
{
counts.put(message, 1);
}
else
{
counts.put(message, counts.get(message) + 1);
}
count++;
}
/**
*
*/
@Override
public void cleanup()
{
System.out.println("===========开始显示单词数量============");
for (Map.Entry<String, Integer> entry : counts.entrySet())
{
System.out.println(entry.getKey() + ": " + entry.getValue());
}
System.out.println("===========结束============");
System.out.println("Test2Bolt的资源释放");
}
/**
* 声明数据格式
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
}
}
6.Topology类
负责绑定Spout和Bolt1、Bolt2,并提交给Storm
public class CountBolt extends BaseRichBolt
{
private static final String field = "count";
private int count = 1;
/**
* 保存单词和对应的计数
*/
private HashMap<String, Integer> counts = null;
/**
* @param map
* @param context
* @param collector
*/
@Override
public void prepare(Map map, TopologyContext context, OutputCollector collector)
{
System.out.println("prepare:" + map.get("test"));
counts = new HashMap<String, Integer>();
}
/**
* @param tuple
*/
@Override
public void execute(Tuple tuple)
{
String message = tuple.getStringByField(field);
System.out.println("第" + count + "次统计单词出现的次数");
if (!counts.containsKey(message))
{
counts.put(message, 1);
}
else
{
counts.put(message, counts.get(message) + 1);
}
count++;
}
/**
*
*/
@Override
public void cleanup()
{
System.out.println("===========开始显示单词数量============");
for (Map.Entry<String, Integer> entry : counts.entrySet())
{
System.out.println(entry.getKey() + ": " + entry.getValue());
}
System.out.println("===========结束============");
}
/**
* 声明数据格式
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
}
}
二、启动
1.编译成jar文件并上传到服务器
假定项目在E:\Code\StormProcess\目录
cd e:\Code\StormProcess
mvn package
打包后,在E:\Code\StormProcess\target\目录下会有StormProcess-1.0.jar文件,把文件上传到服务器
2.用命令提交到Storm执行
(1) 有2种方式
a.使用代码中的命名作为拓扑名
storm jar StormProcess-1.0.jar com.clotho.storm.StatTopology
b.运行时在命令中指定拓扑名
storm jar StormProcess-1.0.jar com.clotho.storm.StatTopology WordCount
(2) 验证
storm list
* 也可以在Storm UI查看
(3) 删除拓扑(Topology)
storm kill StormProcess
* 也可以在Storm UI删除,进入具体拓扑(Topology),点击Topology actions的kill按钮
附录:
Storm的原理和机制
https://www.cnblogs.com/live41/p/15560493.html
Storm的安装与部署