1 import java.util.Map; 2 3 import backtype.storm.Config; 4 import backtype.storm.LocalCluster; 5 import backtype.storm.spout.SpoutOutputCollector; 6 import backtype.storm.task.OutputCollector; 7 import backtype.storm.task.TopologyContext; 8 import backtype.storm.topology.OutputFieldsDeclarer; 9 import backtype.storm.topology.TopologyBuilder; 10 import backtype.storm.topology.base.BaseRichBolt; 11 import backtype.storm.topology.base.BaseRichSpout; 12 import backtype.storm.tuple.Fields; 13 import backtype.storm.tuple.Tuple; 14 import backtype.storm.tuple.Values; 15 import backtype.storm.utils.Utils; 16 17 /** 18 * 数字累加求和 19 * 先添加storm依赖 20 * 21 * @author Administrator 22 * 23 */ 24 public class LocalTopologySum { 25 26 27 /** 28 * spout需要继承baserichspout,实现未实现的方法 29 * @author Administrator 30 * 31 */ 32 public static class MySpout extends BaseRichSpout{ 33 private Map conf; 34 private TopologyContext context; 35 private SpoutOutputCollector collector; 36 37 /** 38 * 初始化方法,只会执行一次 39 * 在这里面可以写一个初始化的代码 40 * Map conf:其实里面保存的是topology的一些配置信息 41 * TopologyContext context:topology的上下文,类似于servletcontext 42 * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple) 43 */ 44 @Override 45 public void open(Map conf, TopologyContext context, 46 SpoutOutputCollector collector) { 47 this.conf = conf; 48 this.context = context; 49 this.collector = collector; 50 } 51 52 int num = 1; 53 /** 54 * 这个方法是spout中最重要的方法, 55 * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内 56 * 每调用一次,会向外发射一条数据 57 */ 58 @Override 59 public void nextTuple() { 60 System.out.println("spout发射:"+num); 61 //把数据封装到values中,称为一个tuple,发射出去 62 this.collector.emit(new Values(num++)); 63 Utils.sleep(1000); 64 } 65 66 /** 67 * 声明输出字段 68 */ 69 @Override 70 public void declareOutputFields(OutputFieldsDeclarer declarer) { 71 //给values中的数据起个名字,方便后面的bolt从这个values中取数据 72 //fields中定义的参数和values中传递的数值是一一对应的 73 declarer.declare(new Fields("num")); 74 } 75 76 } 77 78 79 /** 80 * 自定义bolt需要实现baserichbolt 81 * @author Administrator 82 * 83 */ 84 public static class MyBolt extends BaseRichBolt{ 85 private Map stormConf; 86 private TopologyContext context; 87 private OutputCollector collector; 88 89 /** 90 * 和spout中的open方法意义一样 91 */ 92 @Override 93 public void prepare(Map stormConf, TopologyContext context, 94 OutputCollector collector) { 95 this.stormConf = stormConf; 96 this.context = context; 97 this.collector = collector; 98 } 99 100 int sum = 0; 101 /** 102 * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理 103 */ 104 @Override 105 public void execute(Tuple input) { 106 //input.getInteger(0);//也可以根据角标获取tuple中的数据 107 Integer value = input.getIntegerByField("num"); 108 sum+=value; 109 System.out.println("和:"+sum); 110 } 111 112 /** 113 * 声明输出字段 114 */ 115 @Override 116 public void declareOutputFields(OutputFieldsDeclarer declarer) { 117 //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。 118 //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明 119 } 120 121 } 122 /** 123 * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的 124 * @param args 125 */ 126 public static void main(String[] args) { 127 //组装topology 128 TopologyBuilder topologyBuilder = new TopologyBuilder(); 129 topologyBuilder.setSpout("spout1", new MySpout()); 130 //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple 131 topologyBuilder.setBolt("bolt1", new MyBolt()).shuffleGrouping("spout1"); 132 133 //创建本地storm集群 134 LocalCluster localCluster = new LocalCluster(); 135 Config config = new Config(); 136 localCluster.submitTopology("sumTopology", config, topologyBuilder.createTopology()); 137 } 138 139 }