zoukankan      html  css  js  c++  java
  • 【转】apache storm 内置的定时机制

    原文: http://www.cnblogs.com/kqdongnanf/p/4778672.html

    ------------------------------------------------------------------------------------------------------------------------------------

    关于Storm tick

    1. tick的功能

    Apache Storm中内置了一种定时机制——tick,它能够让任何bolt的所有task每隔一段时间(精确到秒级,用户可以自定义)收到一个来自__systemd的__tick stream的tick tuple,bolt收到这样的tuple后可以根据业务需求完成相应的处理

    Tick功能从Apache Storm 0.8.0版本开始支持,本文在Apache Storm 0.9.1上测试。

    2. 在代码中使用tick及其作用

    在代码中如需使用tick,可以参照下面的方式:

    2.1. 为bolt设置tick

    若希望某个bolt每隔一段时间做一些操作,那么可以将bolt继承BaseBasicBolt/BaseRichBolt,并重写getComponentConfiguration()方法。在方法中设置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS的值,单位是秒。

    getComponentConfiguration()是backtype.storm.topology.IComponent接口中定义的方法,在此方法的实现中可以定义以”Topology.*”开头的此bolt特定的Config

     

    这样设置之后,此bolt的所有task都会每隔一段时间收到一个来自__systemd的__tick stream的tick tuple,因此execute()方法可以实现如下:

     

    2.2. 为Topology全局设置tick

    若希望Topology中的每个bolt都每隔一段时间做一些操作,那么可以定义一个Topology全局的tick,同样是设置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS的值:

     

    2.3. tick设置的优先级

    与Linux中的环境变量的优先级类似,storm中的tick也有优先级,即全局tick的作用域是全局bolt,但对每个bolt其优先级低于此bolt定义的tick。

    这个参数的名字TOPOLOGY_TICK_TUPLE_FREQ_SECS具有一定的迷惑性,一眼看上去应该是Topology全局的,但实际上每个bolt也可以自己定义。

    2.4. tick的精确度

    Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS是精确到秒级的。例如某bolt设置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS为10s,理论上说bolt的每个task应该每个10s收到一个tick tuple。实际测试发现,这个时间间隔的精确性是很高的,一般延迟(而不是提前)时间在1ms左右。测试环境:3台虚拟机做supervisor,每台配置:4Cpu、16G内存、千兆网卡。

    3. storm tick的实现原理

    在bolt中的getComponentConfiguration()定义了该bolt的特定的配置后,storm框架会在TopologyBuilder.setBolt()方法中调用bolt的getComponentConfiguration()方法,从而设置该bolt的配置。

    调用路径为:TopologyBuilder.setBolt()

                -> TopologyBuilder.initCommon()

                -> getComponentConfiguration()

    4. 附件

    测试使用的代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    package storm.starter;
     
    import backtype.storm.Config;
    import backtype.storm.Constants;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.task.ShellBolt;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import storm.starter.spout.RandomSentenceSpout;
     
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
     
     
    public class MyTickTestTopology {
     
      public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();
         
         
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
           
          if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
             && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){
              System.out.println("################################WorldCount bolt: "
                                     new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()));
          }
          else{
              collector.emit(new Values("a"1));
          }
        }
     
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("word""count"));
        }
         
        @Override
        public Map<String, Object> getComponentConfiguration() {
                Config conf = new Config();
                conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS,10);
                return conf;
        }
      }
       
      public static class TickTest extends BaseBasicBolt{
           
           
            @Override
            public void execute(Tuple tuple, BasicOutputCollector collector) {
                // 收到的tuple是tick tuple
              if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
                 && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){
                  System.out.println("################################TickTest bolt: "
                                      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()));
                }
                // 收到的tuple时正常的tuple
              else{
                  collector.emit(new Values("a"));
                }
               
            }
     
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
              declarer.declare(new Fields("test"));
            }
             
            @Override
            public Map<String, Object> getComponentConfiguration() {
                    Config conf = new Config();
                    conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS,20);
                    return conf;
            }
          }
     
      public static void main(String[] args) throws Exception {
     
        TopologyBuilder builder = new TopologyBuilder();
     
        builder.setSpout("spout"new RandomSentenceSpout(), 3);
        builder.setBolt("count"new WordCount(), 3).shuffleGrouping("spout");
        builder.setBolt("tickTest"new TickTest(), 3).shuffleGrouping("count");
     
        Config conf = new Config();
        conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 7);
        conf.setDebug(false);
     
        if (args != null && args.length > 0) {
          conf.setNumWorkers(3);
          StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
        else {
          conf.setMaxTaskParallelism(3);
     
          LocalCluster cluster = new LocalCluster();
          cluster.submitTopology("word-count", conf, builder.createTopology());
     
    //      Thread.sleep(10000);
    //      cluster.shutdown();
        }
      }
    }

      

    原创文章,转载请注明:转载自kqdongnanf-博客园;Email:kqdongnanf@yahoo.com。
  • 相关阅读:
    PHP为fopen,file_get_contents等函数请求web地址时增加Http头的方法
    php一些技术要点连接地址
    PHP之open_ssl
    加密解密知识 php非对称加密
    python摸爬滚打之day17----类与类之间的关系
    python摸爬滚打之day16----类的成员
    python摸爬滚打之day15----初识类
    python摸爬滚打之day14----内置函数,递归函数
    python摸爬滚打之day12----生成器, 各种生成式
    python摸爬滚打之day11----函数闭包,迭代器
  • 原文地址:https://www.cnblogs.com/oxspirt/p/8432960.html
Copyright © 2011-2022 走看看