zoukankan      html  css  js  c++  java
  • Storm累计求和Demo并且在集群上运行

     打成jar包放在主节点上去运行.

      1 import java.util.Map;
      2 
      3 import backtype.storm.Config;
      4 import backtype.storm.StormSubmitter;
      5 import backtype.storm.generated.AlreadyAliveException;
      6 import backtype.storm.generated.InvalidTopologyException;
      7 import backtype.storm.spout.SpoutOutputCollector;
      8 import backtype.storm.task.OutputCollector;
      9 import backtype.storm.task.TopologyContext;
     10 import backtype.storm.topology.OutputFieldsDeclarer;
     11 import backtype.storm.topology.TopologyBuilder;
     12 import backtype.storm.topology.base.BaseRichBolt;
     13 import backtype.storm.topology.base.BaseRichSpout;
     14 import backtype.storm.tuple.Fields;
     15 import backtype.storm.tuple.Tuple;
     16 import backtype.storm.tuple.Values;
     17 import backtype.storm.utils.Utils;
     18 
     19 /**
     20  * 在集群运行
     21  * 数字累加求和
     22  * 先添加storm依赖
     23  * 
     24  * @author Administrator
     25  *
     26  */
     27 public class StormTopologySum {
     28     
     29     
     30     /**
     31      * spout需要继承baserichspout,实现未实现的方法
     32      * @author Administrator
     33      *
     34      */
     35     public static class MySpout extends BaseRichSpout{
     36         private Map conf;
     37         private TopologyContext context;
     38         private SpoutOutputCollector collector;
     39         
     40         /**
     41          * 初始化方法,只会执行一次
     42          * 在这里面可以写一个初始化的代码
     43          * Map conf:其实里面保存的是topology的一些配置信息
     44          * TopologyContext context:topology的上下文,类似于servletcontext
     45          * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
     46          */
     47         @Override
     48         public void open(Map conf, TopologyContext context,
     49                 SpoutOutputCollector collector) {
     50             this.conf = conf;
     51             this.context = context;
     52             this.collector = collector;
     53         }
     54 
     55         int num = 1;
     56         /**
     57          * 这个方法是spout中最重要的方法,
     58          * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
     59          * 每调用一次,会向外发射一条数据
     60          */
     61         @Override
     62         public void nextTuple() {
     63             System.out.println("spout发射:"+num);
     64             //把数据封装到values中,称为一个tuple,发射出去
     65             this.collector.emit(new Values(num++));
     66             Utils.sleep(1000);
     67         }
     68         
     69         /**
     70          * 声明输出字段
     71          */
     72         @Override
     73         public void declareOutputFields(OutputFieldsDeclarer declarer) {
     74             //给values中的数据起个名字,方便后面的bolt从这个values中取数据
     75             //fields中定义的参数和values中传递的数值是一一对应的
     76             declarer.declare(new Fields("num"));
     77         }
     78         
     79     }
     80     
     81     
     82     /**
     83      * 自定义bolt需要实现baserichbolt
     84      * @author Administrator
     85      *
     86      */
     87     public static class MyBolt extends BaseRichBolt{
     88         private Map stormConf; 
     89         private TopologyContext context;
     90         private OutputCollector collector;
     91         
     92         /**
     93          * 和spout中的open方法意义一样
     94          */
     95         @Override
     96         public void prepare(Map stormConf, TopologyContext context,
     97                 OutputCollector collector) {
     98             this.stormConf = stormConf;
     99             this.context = context;
    100             this.collector = collector;
    101         }
    102 
    103         int sum = 0;
    104         /**
    105          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
    106          */
    107         @Override
    108         public void execute(Tuple input) {
    109             //input.getInteger(0);//也可以根据角标获取tuple中的数据
    110             Integer value = input.getIntegerByField("num");
    111             sum+=value;
    112             System.out.println("和:"+sum);
    113         }
    114         
    115         /**
    116          * 声明输出字段
    117          */
    118         @Override
    119         public void declareOutputFields(OutputFieldsDeclarer declarer) {
    120             //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
    121             //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
    122         }
    123         
    124     }
    125     /**
    126      * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
    127      * @param args
    128      */
    129     public static void main(String[] args) {
    130         //组装topology
    131         TopologyBuilder topologyBuilder = new TopologyBuilder();
    132         topologyBuilder.setSpout("spout1", new MySpout());
    133         //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
    134         topologyBuilder.setBolt("bolt1", new MyBolt()).setNumTasks(2).shuffleGrouping("spout1");
    135         
    136         //创建本地storm集群
    137         /*LocalCluster localCluster = new LocalCluster();
    138         localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology());*/
    139         
    140         //在集群运行
    141         String simpleName = StormTopologySum.class.getSimpleName();
    142         try {
    143             Config stormConf = new Config();
    144             //stormConf.setNumWorkers(2);
    145             StormSubmitter.submitTopology(simpleName, stormConf, topologyBuilder.createTopology());
    146         } catch (AlreadyAliveException e) {
    147             e.printStackTrace();
    148         } catch (InvalidTopologyException e) {
    149             e.printStackTrace();
    150         }
    151     }
    152 }
  • 相关阅读:
    Postfix邮件
    RAID和LVM磁盘阵列
    CF1400G
    CF1400F
    2020 AC Saber夏季赛 游记
    APIO2018 题解
    2020北京中考游记
    初中数学几何推理大梳理
    CF1373F Network Coverage
    部编人教版初中历史书事件影响/意义汇总
  • 原文地址:https://www.cnblogs.com/DreamDrive/p/5786198.html
Copyright © 2011-2022 走看看