package com.csylh.storm;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
/**
* Description:使用Storm实现累加求和操作
*
*/
public class LocalSumStormTopology {
/**
* Spout需要继承BaseRichSpout(是一个抽象类)
* 数据源需要产生数据并发射到Bolt
*/
public static class DataSourceSpout extends BaseRichSpout{
//定义一个发射器
private SpoutOutputCollector collector;
/**
* 初始化方法 只是会被调用一次
* @param conf 配置参数
* @param context 上下文:相当于一个框 可以从里面获取许多东西
* @param collector 数据发射器
*/
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
//将传入的collector发射器 对私有变量 进行赋初值
this.collector = collector;
}
int number = 0;
/**
* 用于产生数据
* 生产中肯定是从消息队列中获取数据,
* 例如从kafka获取数据,将获取的数据发射到Bolt进行处理
* 这个方法是一个死循环,会一直不停的执行
*/
@Override
public void nextTuple() {
//发送方式,调用上面定义的数据发射器
this.collector.emit(new Values(++number));
System.out.println("Spout==》发送的数据:" + number);
//每隔1s中发射一次,防止数据产生太快
Utils.sleep(1000);
}
/**
* 该方法用于定义 : 发射的数据 由哪一个Bolt处理
* @param declarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("num"));
}
}
/**
* Bolt需要继承BaseRichBolt(抽象类)
* 用于接收数据并对数据进行处理
*/
public static class SumBolt extends BaseRichBolt{
/**
* 初始化方法 ,会被执行一次
* @param stormConf
* @param context
* @param collector 这里的数据发射器,由于业务逻辑中没有没有必要进行放下发的操作,所以就是没有必要进行new一个
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
//由于这里不需要发射数据出去,所以是不需要定义发射器的
}
int sum = 0;
/**
* 也是一个死循环 ,职责: 获取Spout发射过来的数据
* @param tuple
*/
@Override
public void execute(Tuple tuple) {
//Bolt中获取值可以通过index获取
// 也可以根据上一个环节中定义的filed的名称获取(***推荐)
//根据字段名称,进行获取tuple的获取
Integer value = tuple.getIntegerByField("num");
sum += value;
System.out.println("Bolt :Sum = ["+ sum + "]");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
public static void main(String[] args) {
//Topology根据Spout和Bolt来构建Topology
//Storm中的任何一个作业都是通过Topology方式进行提交的
//Topology中需要指定Spout和Bolt的执行顺序
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout",new DataSourceSpout());
//通过shuffleGrouping 进行顺序关联
builder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("DataSourceSpout");
//创建一个本地的Storm集群 ,本地模式运行,不需要搭建Storm集群,方便测试,但是和集群提交的代码 不一样
LocalCluster localCluster = new LocalCluster();
//参数一:类名,参数二:new Config(),参数三:builder.createTopology()
localCluster.submitTopology("LocalSumStormTopology",new Config(),builder.createTopology());
}
}