zoukankan      html  css  js  c++  java
  • Storm中的定时任务

    1.全局定时器

      1 import java.util.Map;
      2 
      3 import backtype.storm.Config;
      4 import backtype.storm.Constants;
      5 import backtype.storm.LocalCluster;
      6 import backtype.storm.spout.SpoutOutputCollector;
      7 import backtype.storm.task.OutputCollector;
      8 import backtype.storm.task.TopologyContext;
      9 import backtype.storm.topology.OutputFieldsDeclarer;
     10 import backtype.storm.topology.TopologyBuilder;
     11 import backtype.storm.topology.base.BaseRichBolt;
     12 import backtype.storm.topology.base.BaseRichSpout;
     13 import backtype.storm.tuple.Fields;
     14 import backtype.storm.tuple.Tuple;
     15 import backtype.storm.tuple.Values;
     16 import backtype.storm.utils.Utils;
     17 
     18 /**
     19  * 全局定时器
     20  * 
     21  * 数字累加求和
     22  * 先添加storm依赖
     23  * 
     24  * @author Administrator
     25  *
     26  */
     27 public class LocalTopologySumTimer1 {
     28     
     29     
     30     /**
     31      * spout需要继承baserichspout,实现未实现的方法
     32      * @author Administrator
     33      *
     34      */
     35     public static class MySpout extends BaseRichSpout{
     36         private Map conf;
     37         private TopologyContext context;
     38         private SpoutOutputCollector collector;
     39         
     40         /**
     41          * 初始化方法,只会执行一次
     42          * 在这里面可以写一个初始化的代码
     43          * Map conf:其实里面保存的是topology的一些配置信息
     44          * TopologyContext context:topology的上下文,类似于servletcontext
     45          * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
     46          */
     47         @Override
     48         public void open(Map conf, TopologyContext context,
     49                 SpoutOutputCollector collector) {
     50             this.conf = conf;
     51             this.context = context;
     52             this.collector = collector;
     53         }
     54 
     55         int num = 1;
     56         /**
     57          * 这个方法是spout中最重要的方法,
     58          * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
     59          * 每调用一次,会向外发射一条数据
     60          */
     61         @Override
     62         public void nextTuple() {
     63             System.out.println("spout发射:"+num);
     64             //把数据封装到values中,称为一个tuple,发射出去
     65             this.collector.emit(new Values(num++));
     66             Utils.sleep(1000);
     67         }
     68         
     69         /**
     70          * 声明输出字段
     71          */
     72         @Override
     73         public void declareOutputFields(OutputFieldsDeclarer declarer) {
     74             //给values中的数据起个名字,方便后面的bolt从这个values中取数据
     75             //fields中定义的参数和values中传递的数值是一一对应的
     76             declarer.declare(new Fields("num"));
     77         }
     78         
     79     }
     80     
     81     
     82     /**
     83      * 自定义bolt需要实现baserichbolt
     84      * @author Administrator
     85      *
     86      */
     87     public static class MyBolt extends BaseRichBolt{
     88         private Map stormConf; 
     89         private TopologyContext context;
     90         private OutputCollector collector;
     91         
     92         /**
     93          * 和spout中的open方法意义一样
     94          */
     95         @Override
     96         public void prepare(Map stormConf, TopologyContext context,
     97                 OutputCollector collector) {
     98             this.stormConf = stormConf;
     99             this.context = context;
    100             this.collector = collector;
    101         }
    102 
    103         int sum = 0;
    104         /**
    105          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
    106          */
    107         @Override
    108         public void execute(Tuple input) {
    109             if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){
    110                 //如果满足,就说明这个tuple是系统几倍的组件发送的,也就意味着定时时间到了
    111                 System.out.println("定时任务执行了。");
    112                 
    113             }else{//这个地方必须要做判断,否则让系统级别的tuple去取"num"会取不到报错的.
    114                 //这个地方的逻辑可以将产生的数据封装成一个map或者是list放在内存中.到达定时任务的时候取出来,使用batch批处理向数据库中操作.
    115                 //然后再把集合中的数据清空...之后再添加.
    116                 
    117                 //input.getInteger(0);//也可以根据角标获取tuple中的数据
    118                 Integer value = input.getIntegerByField("num");
    119                 sum+=value;
    120                 System.out.println("和:"+sum);
    121             }
    122             
    123         }
    124         
    125         /**
    126          * 声明输出字段
    127          */
    128         @Override
    129         public void declareOutputFields(OutputFieldsDeclarer declarer) {
    130             //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
    131             //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
    132         }
    133         
    134     }
    135     /**
    136      * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
    137      * @param args
    138      */
    139     public static void main(String[] args) {
    140         //组装topology
    141         TopologyBuilder topologyBuilder = new TopologyBuilder();
    142         topologyBuilder.setSpout("spout1", new MySpout());
    143         //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
    144         topologyBuilder.setBolt("bolt1", new MyBolt()).shuffleGrouping("spout1");
    145         
    146         //创建本地storm集群
    147         LocalCluster localCluster = new LocalCluster();
    148         Config config = new Config();
    149         //下面这样设置就是一个全局的定时任务  还有局部的定时任务.
    150         config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);//表示每隔5秒storm会给Topology中的所有bolt发射一个系统级别的tuple
    151         //前面的单词计数的例子 我们可能只需要在最后一个CountBolt中做定时任务  SpiltBolt中不需要做定时任务  但是两个Bolt中都可以收到这个系统级别的tuple
    152         //所以需要每个Bolt中都做判断...SplitBolt可以加上一个判断  没有方法体...if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){  }
    153         //否则会出错...从系统级别的tuple取你定义的值 取不到 报错.
    154         localCluster.submitTopology("sumTopology", config, topologyBuilder.createTopology());
    155         
    156     }
    157     
    158 
    159 }

     局部定时器

      1 /**
      2  * 局部定时器
      3  * 
      4  * 数字累加求和
      5  * 先添加storm依赖
      6  * 
      7  * @author Administrator
      8  *
      9  */
     10 public class LocalTopologySumTimer2 {
     11     
     12     
     13     /**
     14      * spout需要继承baserichspout,实现未实现的方法
     15      * @author Administrator
     16      *
     17      */
     18     public static class MySpout extends BaseRichSpout{
     19         private Map conf;
     20         private TopologyContext context;
     21         private SpoutOutputCollector collector;
     22         
     23         /**
     24          * 初始化方法,只会执行一次
     25          * 在这里面可以写一个初始化的代码
     26          * Map conf:其实里面保存的是topology的一些配置信息
     27          * TopologyContext context:topology的上下文,类似于servletcontext
     28          * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
     29          */
     30         @Override
     31         public void open(Map conf, TopologyContext context,
     32                 SpoutOutputCollector collector) {
     33             this.conf = conf;
     34             this.context = context;
     35             this.collector = collector;
     36         }
     37 
     38         int num = 1;
     39         /**
     40          * 这个方法是spout中最重要的方法,
     41          * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
     42          * 每调用一次,会向外发射一条数据
     43          */
     44         @Override
     45         public void nextTuple() {
     46             System.out.println("spout发射:"+num);
     47             //把数据封装到values中,称为一个tuple,发射出去
     48             this.collector.emit(new Values(num++));
     49             Utils.sleep(1000);
     50         }
     51         
     52         /**
     53          * 声明输出字段
     54          */
     55         @Override
     56         public void declareOutputFields(OutputFieldsDeclarer declarer) {
     57             //给values中的数据起个名字,方便后面的bolt从这个values中取数据
     58             //fields中定义的参数和values中传递的数值是一一对应的
     59             declarer.declare(new Fields("num"));
     60         }
     61         
     62     }
     63     
     64     
     65     /**
     66      * 自定义bolt需要实现baserichbolt
     67      * @author Administrator
     68      *
     69      */
     70     public static class MyBolt extends BaseRichBolt{
     71         private Map stormConf; 
     72         private TopologyContext context;
     73         private OutputCollector collector;
     74         
     75         /**
     76          * 和spout中的open方法意义一样
     77          */
     78         @Override
     79         public void prepare(Map stormConf, TopologyContext context,
     80                 OutputCollector collector) {
     81             this.stormConf = stormConf;
     82             this.context = context;
     83             this.collector = collector;
     84         }
     85 
     86         int sum = 0;
     87         /**
     88          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
     89          */
     90         @Override
     91         public void execute(Tuple input) {
     92             if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){
     93                 //如果满足,就说明这个tuple是系统几倍的组件发送的,也就意味着定时时间到了
     94                 System.out.println("定时任务执行了。");
     95                 
     96             }else{
     97                 //input.getInteger(0);//也可以根据角标获取tuple中的数据
     98                 Integer value = input.getIntegerByField("num");
     99                 sum+=value;
    100                 System.out.println("和:"+sum);
    101             }
    102             
    103         }
    104         
    105         /**
    106          * 声明输出字段
    107          */
    108         @Override
    109         public void declareOutputFields(OutputFieldsDeclarer declarer) {
    110             //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
    111             //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
    112         }
    113 
    114         /**
    115          * 局部定时任务  
    116          * 只针对当前的bolt  对其他的bolt中没有影响 
    117          * 加对系统级别tuple的判断只需要在当前bolt中判断就可以...其他bolt不需要..
    118          * 这种在工作中最常用....
    119          * 全局定时任务在 main方法中 设置  局部的定时任务只需要在Bolt类中覆盖getComponentConfiguration()方法
    120          * 这个还是比较有用,有意思的
    121          */
    122         @Override
    123         public Map<String, Object> getComponentConfiguration() {
    124             HashMap<String, Object> hashMap = new HashMap<String, Object>();
    125             hashMap.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
    126             return hashMap;
    127         }
    128     }
    129     /**
    130      * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
    131      * @param args
    132      */
    133     public static void main(String[] args) {
    134         //组装topology
    135         TopologyBuilder topologyBuilder = new TopologyBuilder();
    136         topologyBuilder.setSpout("spout1", new MySpout());
    137         //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
    138         topologyBuilder.setBolt("bolt1", new MyBolt()).shuffleGrouping("spout1");
    139         
    140         //创建本地storm集群
    141         LocalCluster localCluster = new LocalCluster();
    142         Config config = new Config();
    143         localCluster.submitTopology("sumTopology", config, topologyBuilder.createTopology());
    144         
    145     }
    146     
    147 
    148 }
  • 相关阅读:
    16. 3Sum Closest
    17. Letter Combinations of a Phone Number
    20. Valid Parentheses
    77. Combinations
    80. Remove Duplicates from Sorted Array II
    82. Remove Duplicates from Sorted List II
    88. Merge Sorted Array
    257. Binary Tree Paths
    225. Implement Stack using Queues
    113. Path Sum II
  • 原文地址:https://www.cnblogs.com/DreamDrive/p/6671496.html
Copyright © 2011-2022 走看看