zoukankan      html  css  js  c++  java
  • Storm中的定时任务

    1.全局定时器

      1 import java.util.Map;
      2 
      3 import backtype.storm.Config;
      4 import backtype.storm.Constants;
      5 import backtype.storm.LocalCluster;
      6 import backtype.storm.spout.SpoutOutputCollector;
      7 import backtype.storm.task.OutputCollector;
      8 import backtype.storm.task.TopologyContext;
      9 import backtype.storm.topology.OutputFieldsDeclarer;
     10 import backtype.storm.topology.TopologyBuilder;
     11 import backtype.storm.topology.base.BaseRichBolt;
     12 import backtype.storm.topology.base.BaseRichSpout;
     13 import backtype.storm.tuple.Fields;
     14 import backtype.storm.tuple.Tuple;
     15 import backtype.storm.tuple.Values;
     16 import backtype.storm.utils.Utils;
     17 
     18 /**
     19  * 全局定时器
     20  * 
     21  * 数字累加求和
     22  * 先添加storm依赖
     23  * 
     24  * @author Administrator
     25  *
     26  */
     27 public class LocalTopologySumTimer1 {
     28     
     29     
     30     /**
     31      * spout需要继承baserichspout,实现未实现的方法
     32      * @author Administrator
     33      *
     34      */
     35     public static class MySpout extends BaseRichSpout{
     36         private Map conf;
     37         private TopologyContext context;
     38         private SpoutOutputCollector collector;
     39         
     40         /**
     41          * 初始化方法,只会执行一次
     42          * 在这里面可以写一个初始化的代码
     43          * Map conf:其实里面保存的是topology的一些配置信息
     44          * TopologyContext context:topology的上下文,类似于servletcontext
     45          * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
     46          */
     47         @Override
     48         public void open(Map conf, TopologyContext context,
     49                 SpoutOutputCollector collector) {
     50             this.conf = conf;
     51             this.context = context;
     52             this.collector = collector;
     53         }
     54 
     55         int num = 1;
     56         /**
     57          * 这个方法是spout中最重要的方法,
     58          * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
     59          * 每调用一次,会向外发射一条数据
     60          */
     61         @Override
     62         public void nextTuple() {
     63             System.out.println("spout发射:"+num);
     64             //把数据封装到values中,称为一个tuple,发射出去
     65             this.collector.emit(new Values(num++));
     66             Utils.sleep(1000);
     67         }
     68         
     69         /**
     70          * 声明输出字段
     71          */
     72         @Override
     73         public void declareOutputFields(OutputFieldsDeclarer declarer) {
     74             //给values中的数据起个名字,方便后面的bolt从这个values中取数据
     75             //fields中定义的参数和values中传递的数值是一一对应的
     76             declarer.declare(new Fields("num"));
     77         }
     78         
     79     }
     80     
     81     
     82     /**
     83      * 自定义bolt需要实现baserichbolt
     84      * @author Administrator
     85      *
     86      */
     87     public static class MyBolt extends BaseRichBolt{
     88         private Map stormConf; 
     89         private TopologyContext context;
     90         private OutputCollector collector;
     91         
     92         /**
     93          * 和spout中的open方法意义一样
     94          */
     95         @Override
     96         public void prepare(Map stormConf, TopologyContext context,
     97                 OutputCollector collector) {
     98             this.stormConf = stormConf;
     99             this.context = context;
    100             this.collector = collector;
    101         }
    102 
    103         int sum = 0;
    104         /**
    105          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
    106          */
    107         @Override
    108         public void execute(Tuple input) {
    109             if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){
    110                 //如果满足,就说明这个tuple是系统几倍的组件发送的,也就意味着定时时间到了
    111                 System.out.println("定时任务执行了。");
    112                 
    113             }else{//这个地方必须要做判断,否则让系统级别的tuple去取"num"会取不到报错的.
    114                 //这个地方的逻辑可以将产生的数据封装成一个map或者是list放在内存中.到达定时任务的时候取出来,使用batch批处理向数据库中操作.
    115                 //然后再把集合中的数据清空...之后再添加.
    116                 
    117                 //input.getInteger(0);//也可以根据角标获取tuple中的数据
    118                 Integer value = input.getIntegerByField("num");
    119                 sum+=value;
    120                 System.out.println("和:"+sum);
    121             }
    122             
    123         }
    124         
    125         /**
    126          * 声明输出字段
    127          */
    128         @Override
    129         public void declareOutputFields(OutputFieldsDeclarer declarer) {
    130             //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
    131             //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
    132         }
    133         
    134     }
    135     /**
    136      * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
    137      * @param args
    138      */
    139     public static void main(String[] args) {
    140         //组装topology
    141         TopologyBuilder topologyBuilder = new TopologyBuilder();
    142         topologyBuilder.setSpout("spout1", new MySpout());
    143         //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
    144         topologyBuilder.setBolt("bolt1", new MyBolt()).shuffleGrouping("spout1");
    145         
    146         //创建本地storm集群
    147         LocalCluster localCluster = new LocalCluster();
    148         Config config = new Config();
    149         //下面这样设置就是一个全局的定时任务  还有局部的定时任务.
    150         config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);//表示每隔5秒storm会给Topology中的所有bolt发射一个系统级别的tuple
    151         //前面的单词计数的例子 我们可能只需要在最后一个CountBolt中做定时任务  SpiltBolt中不需要做定时任务  但是两个Bolt中都可以收到这个系统级别的tuple
    152         //所以需要每个Bolt中都做判断...SplitBolt可以加上一个判断  没有方法体...if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){  }
    153         //否则会出错...从系统级别的tuple取你定义的值 取不到 报错.
    154         localCluster.submitTopology("sumTopology", config, topologyBuilder.createTopology());
    155         
    156     }
    157     
    158 
    159 }

     局部定时器

      1 /**
      2  * 局部定时器
      3  * 
      4  * 数字累加求和
      5  * 先添加storm依赖
      6  * 
      7  * @author Administrator
      8  *
      9  */
     10 public class LocalTopologySumTimer2 {
     11     
     12     
     13     /**
     14      * spout需要继承baserichspout,实现未实现的方法
     15      * @author Administrator
     16      *
     17      */
     18     public static class MySpout extends BaseRichSpout{
     19         private Map conf;
     20         private TopologyContext context;
     21         private SpoutOutputCollector collector;
     22         
     23         /**
     24          * 初始化方法,只会执行一次
     25          * 在这里面可以写一个初始化的代码
     26          * Map conf:其实里面保存的是topology的一些配置信息
     27          * TopologyContext context:topology的上下文,类似于servletcontext
     28          * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
     29          */
     30         @Override
     31         public void open(Map conf, TopologyContext context,
     32                 SpoutOutputCollector collector) {
     33             this.conf = conf;
     34             this.context = context;
     35             this.collector = collector;
     36         }
     37 
     38         int num = 1;
     39         /**
     40          * 这个方法是spout中最重要的方法,
     41          * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
     42          * 每调用一次,会向外发射一条数据
     43          */
     44         @Override
     45         public void nextTuple() {
     46             System.out.println("spout发射:"+num);
     47             //把数据封装到values中,称为一个tuple,发射出去
     48             this.collector.emit(new Values(num++));
     49             Utils.sleep(1000);
     50         }
     51         
     52         /**
     53          * 声明输出字段
     54          */
     55         @Override
     56         public void declareOutputFields(OutputFieldsDeclarer declarer) {
     57             //给values中的数据起个名字,方便后面的bolt从这个values中取数据
     58             //fields中定义的参数和values中传递的数值是一一对应的
     59             declarer.declare(new Fields("num"));
     60         }
     61         
     62     }
     63     
     64     
     65     /**
     66      * 自定义bolt需要实现baserichbolt
     67      * @author Administrator
     68      *
     69      */
     70     public static class MyBolt extends BaseRichBolt{
     71         private Map stormConf; 
     72         private TopologyContext context;
     73         private OutputCollector collector;
     74         
     75         /**
     76          * 和spout中的open方法意义一样
     77          */
     78         @Override
     79         public void prepare(Map stormConf, TopologyContext context,
     80                 OutputCollector collector) {
     81             this.stormConf = stormConf;
     82             this.context = context;
     83             this.collector = collector;
     84         }
     85 
     86         int sum = 0;
     87         /**
     88          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
     89          */
     90         @Override
     91         public void execute(Tuple input) {
     92             if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){
     93                 //如果满足,就说明这个tuple是系统几倍的组件发送的,也就意味着定时时间到了
     94                 System.out.println("定时任务执行了。");
     95                 
     96             }else{
     97                 //input.getInteger(0);//也可以根据角标获取tuple中的数据
     98                 Integer value = input.getIntegerByField("num");
     99                 sum+=value;
    100                 System.out.println("和:"+sum);
    101             }
    102             
    103         }
    104         
    105         /**
    106          * 声明输出字段
    107          */
    108         @Override
    109         public void declareOutputFields(OutputFieldsDeclarer declarer) {
    110             //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
    111             //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
    112         }
    113 
    114         /**
    115          * 局部定时任务  
    116          * 只针对当前的bolt  对其他的bolt中没有影响 
    117          * 加对系统级别tuple的判断只需要在当前bolt中判断就可以...其他bolt不需要..
    118          * 这种在工作中最常用....
    119          * 全局定时任务在 main方法中 设置  局部的定时任务只需要在Bolt类中覆盖getComponentConfiguration()方法
    120          * 这个还是比较有用,有意思的
    121          */
    122         @Override
    123         public Map<String, Object> getComponentConfiguration() {
    124             HashMap<String, Object> hashMap = new HashMap<String, Object>();
    125             hashMap.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
    126             return hashMap;
    127         }
    128     }
    129     /**
    130      * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
    131      * @param args
    132      */
    133     public static void main(String[] args) {
    134         //组装topology
    135         TopologyBuilder topologyBuilder = new TopologyBuilder();
    136         topologyBuilder.setSpout("spout1", new MySpout());
    137         //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
    138         topologyBuilder.setBolt("bolt1", new MyBolt()).shuffleGrouping("spout1");
    139         
    140         //创建本地storm集群
    141         LocalCluster localCluster = new LocalCluster();
    142         Config config = new Config();
    143         localCluster.submitTopology("sumTopology", config, topologyBuilder.createTopology());
    144         
    145     }
    146     
    147 
    148 }
  • 相关阅读:
    springboot使用MockMvc测试controller
    MySQL5.6版本之后设置DATETIME类型自动更新
    MAVEN打包时跳过Junit测试
    没看这篇干货,别和我说你会IDEA Debug
    java通过HtmlUnit工具和J4L实现模拟带验证码登录
    Vue+Java实现在页面树形展示文件目录
    exceptions: django2.2/ mysql ImproperlyConfigured: mysqlclient 1.3.13 or newer is required; you have 0.9.3
    linux 软连接的使用
    Linux pip命令报错 -bash: pip: command not found
    MySQL使用命令导出/导入数据
  • 原文地址:https://www.cnblogs.com/DreamDrive/p/6671496.html
Copyright © 2011-2022 走看看