打成jar包放在主节点上去运行.
1 import java.util.Map; 2 3 import backtype.storm.Config; 4 import backtype.storm.StormSubmitter; 5 import backtype.storm.generated.AlreadyAliveException; 6 import backtype.storm.generated.InvalidTopologyException; 7 import backtype.storm.spout.SpoutOutputCollector; 8 import backtype.storm.task.OutputCollector; 9 import backtype.storm.task.TopologyContext; 10 import backtype.storm.topology.OutputFieldsDeclarer; 11 import backtype.storm.topology.TopologyBuilder; 12 import backtype.storm.topology.base.BaseRichBolt; 13 import backtype.storm.topology.base.BaseRichSpout; 14 import backtype.storm.tuple.Fields; 15 import backtype.storm.tuple.Tuple; 16 import backtype.storm.tuple.Values; 17 import backtype.storm.utils.Utils; 18 19 /** 20 * 在集群运行 21 * 数字累加求和 22 * 先添加storm依赖 23 * 24 * @author Administrator 25 * 26 */ 27 public class StormTopologySum { 28 29 30 /** 31 * spout需要继承baserichspout,实现未实现的方法 32 * @author Administrator 33 * 34 */ 35 public static class MySpout extends BaseRichSpout{ 36 private Map conf; 37 private TopologyContext context; 38 private SpoutOutputCollector collector; 39 40 /** 41 * 初始化方法,只会执行一次 42 * 在这里面可以写一个初始化的代码 43 * Map conf:其实里面保存的是topology的一些配置信息 44 * TopologyContext context:topology的上下文,类似于servletcontext 45 * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple) 46 */ 47 @Override 48 public void open(Map conf, TopologyContext context, 49 SpoutOutputCollector collector) { 50 this.conf = conf; 51 this.context = context; 52 this.collector = collector; 53 } 54 55 int num = 1; 56 /** 57 * 这个方法是spout中最重要的方法, 58 * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内 59 * 每调用一次,会向外发射一条数据 60 */ 61 @Override 62 public void nextTuple() { 63 System.out.println("spout发射:"+num); 64 //把数据封装到values中,称为一个tuple,发射出去 65 this.collector.emit(new Values(num++)); 66 Utils.sleep(1000); 67 } 68 69 /** 70 * 声明输出字段 71 */ 72 @Override 73 public void declareOutputFields(OutputFieldsDeclarer declarer) { 74 //给values中的数据起个名字,方便后面的bolt从这个values中取数据 75 //fields中定义的参数和values中传递的数值是一一对应的 76 declarer.declare(new Fields("num")); 77 } 78 79 } 80 81 82 /** 83 * 自定义bolt需要实现baserichbolt 84 * @author Administrator 85 * 86 */ 87 public static class MyBolt extends BaseRichBolt{ 88 private Map stormConf; 89 private TopologyContext context; 90 private OutputCollector collector; 91 92 /** 93 * 和spout中的open方法意义一样 94 */ 95 @Override 96 public void prepare(Map stormConf, TopologyContext context, 97 OutputCollector collector) { 98 this.stormConf = stormConf; 99 this.context = context; 100 this.collector = collector; 101 } 102 103 int sum = 0; 104 /** 105 * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理 106 */ 107 @Override 108 public void execute(Tuple input) { 109 //input.getInteger(0);//也可以根据角标获取tuple中的数据 110 Integer value = input.getIntegerByField("num"); 111 sum+=value; 112 System.out.println("和:"+sum); 113 } 114 115 /** 116 * 声明输出字段 117 */ 118 @Override 119 public void declareOutputFields(OutputFieldsDeclarer declarer) { 120 //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。 121 //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明 122 } 123 124 } 125 /** 126 * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的 127 * @param args 128 */ 129 public static void main(String[] args) { 130 //组装topology 131 TopologyBuilder topologyBuilder = new TopologyBuilder(); 132 topologyBuilder.setSpout("spout1", new MySpout()); 133 //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple 134 topologyBuilder.setBolt("bolt1", new MyBolt()).setNumTasks(2).shuffleGrouping("spout1"); 135 136 //创建本地storm集群 137 /*LocalCluster localCluster = new LocalCluster(); 138 localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology());*/ 139 140 //在集群运行 141 String simpleName = StormTopologySum.class.getSimpleName(); 142 try { 143 Config stormConf = new Config(); 144 //stormConf.setNumWorkers(2); 145 StormSubmitter.submitTopology(simpleName, stormConf, topologyBuilder.createTopology()); 146 } catch (AlreadyAliveException e) { 147 e.printStackTrace(); 148 } catch (InvalidTopologyException e) { 149 e.printStackTrace(); 150 } 151 } 152 }