继续学习 Storm,例子二
import java.util.Map;
import clojure.main;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class LocalStormTopology {
public static class DataSourceSpout extends BaseRichSpout{
private Map conf;
private TopologyContext context;
private SpoutOutputCollector collector;
/**
* 此方法只调用一次
*/
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.conf = conf;
this.collector = collector;
this.context = context;
}
/**
* 死循环调用,心跳
*/
int i=0;
public void nextTuple() {
this.collector.emit(new Values(i++));
}
/**
* 声明输出内容
*/
public void declareOutputFields(OutputFieldsDeclarer declare) {
declare.declare(new Fields("num"));
}
}
public static class Sumbolt extends BaseRichBolt{
private Map stormConf;
private TopologyContext context;
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
// TODO Auto-generated method stub
this.stormConf = stormConf;
this.context = context;
this.collector = collector;
}
int sum = 0;
public void execute(Tuple input) {
// TODO Auto-generated method stub
Integer value = input.getIntegerByField("num");
sum+=value;
System.out.println("计算结果:"+sum);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// this.collector.emit(new Values(sum));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
}
}
/**
 * Builds a two-node topology (a {@code DataSourceSpout} feeding a {@code Sumbolt}
 * through shuffle grouping) and submits it to an in-process {@link LocalCluster}
 * with a default {@link Config}. This method does not shut the cluster down.
 *
 * @param args command-line arguments; not used
 */
public static void main(String[] args) {
    final String spoutId = "spout_id";
    final String boltId = "bolt_id";

    // Wire the spout's output into the bolt.
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(spoutId, new DataSourceSpout());
    builder.setBolt(boltId, new Sumbolt()).shuffleGrouping(spoutId);

    // Run the topology inside the current JVM.
    LocalCluster cluster = new LocalCluster();
    Config config = new Config();
    cluster.submitTopology("topology", config, builder.createTopology());
}
}