zoukankan      html  css  js  c++  java
  • 在Storm的Toplogy中设置多数据源Spout

    上代码:主要看main方法中的设置.   如下代码是一般情况下的设置方法...Trident中设置多数据源看对应的博客总结

      1 /**
      2  * 指定多个数据源
      3  * 数字累加求和
      4  * 先添加storm依赖
      5  */
      6 public class LocalTopologyMeger {
      7     /**
      8      * spout需要继承baserichspout,实现未实现的方法
      9      * @author Administrator
     10      *
     11      */
     12     public static class MySpout extends BaseRichSpout{
     13         private Map conf;
     14         private TopologyContext context;
     15         private SpoutOutputCollector collector;
     16         
     17         /**
     18          * 初始化方法,只会执行一次
     19          * 在这里面可以写一个初始化的代码
     20          * Map conf:其实里面保存的是topology的一些配置信息
     21          * TopologyContext context:topology的上下文,类似于servletcontext
     22          * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
     23          */
     24         @Override
     25         public void open(Map conf, TopologyContext context,
     26                 SpoutOutputCollector collector) {
     27             this.conf = conf;
     28             this.context = context;
     29             this.collector = collector;
     30         }
     31 
     32         int num = 1;
     33         /**
     34          * 这个方法是spout中最重要的方法,
     35          * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
     36          * 每调用一次,会向外发射一条数据
     37          */
     38         @Override
     39         public void nextTuple() {
     40             System.out.println("spout发射:"+num);
     41             //把数据封装到values中,称为一个tuple,发射出去
     42             this.collector.emit(new Values(num++));
     43             Utils.sleep(1000);
     44         }
     45         
     46         /**
     47          * 声明输出字段
     48          */
     49         @Override
     50         public void declareOutputFields(OutputFieldsDeclarer declarer) {
     51             //给values中的数据起个名字,方便后面的bolt从这个values中取数据
     52             //fields中定义的参数和values中传递的数值是一一对应的
     53             declarer.declare(new Fields("num"));
     54         }
     55         
     56     }
     57     
     58     
     59     /**
     60      * 自定义bolt需要实现baserichbolt
     61      * @author Administrator
     62      *
     63      */
     64     public static class MyBolt extends BaseRichBolt{
     65         private Map stormConf; 
     66         private TopologyContext context;
     67         private OutputCollector collector;
     68         
     69         /**
     70          * 和spout中的open方法意义一样
     71          */
     72         @Override
     73         public void prepare(Map stormConf, TopologyContext context,
     74                 OutputCollector collector) {
     75             this.stormConf = stormConf;
     76             this.context = context;
     77             this.collector = collector;
     78         }
     79 
     80         int sum = 0;
     81         /**
     82          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
     83          */
     84         @Override
     85         public void execute(Tuple input) {
     86             //input.getInteger(0);//也可以根据角标获取tuple中的数据
     87             Integer value = input.getIntegerByField("num");
     88             sum+=value;
     89             System.out.println("和:"+sum);
     90         }
     91         
     92         /**
     93          * 声明输出字段
     94          */
     95         @Override
     96         public void declareOutputFields(OutputFieldsDeclarer declarer) {
     97             //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
     98             //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
     99         }
    100         
    101     }
    102     /**
    103      * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
    104      * @param args
    105      */
    106     public static void main(String[] args) {
    107         //组装topology
    108         TopologyBuilder topologyBuilder = new TopologyBuilder();
    109         topologyBuilder.setSpout("spout1", new MySpout());
    110         topologyBuilder.setSpout("spout2", new MySpout());
    111         //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
    112         topologyBuilder.setBolt("bolt1", new MyBolt()).shuffleGrouping("spout1").shuffleGrouping("spout2");
    113         
    114         //创建本地storm集群
    115         LocalCluster localCluster = new LocalCluster();
    116         localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology());
    117     }
    118 }
  • 相关阅读:
    个人号微信机器人接口
    js tree 根据子节点找到所有父节点
    大数据分析之纳税人画像-实现和优化思路
    前后端分离项目安全漏洞修复总结
    多租户&多账户&多公众号_saas微信公众平台设计思路
    java7 try-with-resources 很香
    java7 异常处理增强
    java7 try-with-resources 很香
    mysql 按分数段,每个专业分数段统计人数
    一文看懂奈奎斯特定理和香农定理
  • 原文地址:https://www.cnblogs.com/DreamDrive/p/6675974.html
Copyright © 2011-2022 走看看