zoukankan      html  css  js  c++  java
  • storm定时任务【tick】

    一. 简介
         storm作为流计算,处理数据通常以数据驱动。即只有当spout发射数据才会进行计算。那么如果想要做定时任务如何处理哪,例如有的bolt需要输出一段时间统计的结果,这里一段时间可能是几秒、几分钟或者几小时。如果还是以数据进行驱动的话必然会输出时间不可确定。同样也可以启一个线程一段时间执行一次,这也是一种解决方案。但是我希望有种更优雅的解决方案,就是这里说的tick。tick是由storm框架进行计时,到达设定时间会发送一个特殊的tuple:ticktuple,此时处理定时任务就可以了。
    二. 代码
         如果是某一个bolt由定时需求的话,可以按照一下方式设置
    1. 继承BaseBasicBolt
    2. getComponentConfiguration方法设置发送ticktuple间隔时间(单位s)
    3. execute方法判断tuple类型,如果为ticktuple处理定时任务,如果不是处理其他任务。
    以下是wordCount中CountBolt代码,每5s输出各单词统计的数据。
     1 //继承 BaseBasicBolt
     2 public class CountTickBolt extends BaseBasicBolt {
     3     private static final Logger logger = LoggerFactory.getLogger(CountTickBolt.class);
     4     private Map<String, Integer> count;
     5     private Long time;
     6 
     7     @Override
     8     public Map<String, Object> getComponentConfiguration() {
     9         //设置发送ticktuple的时间间隔
    10         Config conf = new Config();
    11         conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
    12         return conf;
    13     }
    14 
    15     @Override
    16     public void prepare(Map stormConf, TopologyContext context) {
    17         super.prepare(stormConf, context);
    18         count = new HashMap<String, Integer>();
    19         time = System.currentTimeMillis();
    20     }
    21 
    22     @Override
    23     public void cleanup() {
    24         super.cleanup();
    25     }
    26 
    27     @Override
    28     public void execute(Tuple input, BasicOutputCollector collector) {
    29         //判断是否为tickTuple
    30         if (input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) &&
    31                 input.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){
    32             //是,处理定时任务
    33             Long nowTime = System.currentTimeMillis();
    34             Long useTime = nowTime - time;
    35             StringBuffer sb = new StringBuffer();
    36             sb.append("======== Use Time :" + useTime + "========
    ");
    37             for (Map.Entry wordCount : count.entrySet()){
    38                 sb.append(wordCount.getKey() + "------>" + wordCount.getValue() + "
    ");
    39             }
    40             Long nnTime = System.currentTimeMillis();
    41             logger.info(sb.toString() + (nnTime - nowTime) );
    42             time = nnTime;
    43         }else {
    44             //否,处理其他数据
    45             String word = input.getString(0);
    46             if (count.containsKey(word)){
    47                 int thisWordCont = count.get(word);
    48                 count.put(word, ++thisWordCont);
    49             }else {
    50                 count.put(word,1);
    51             }
    52         }
    53     }
    54 
    55     @Override
    56     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    57 
    58     }
    三. 总结
         以上是一个简单的介绍,需要说明的是由于设置时间间隔是秒级的,所以在处理时会有毫秒级的误差,通常是± 2ms。
      以下是没有介绍或者测试到的地方,在以后会补上。
    1. 如何设置此拓扑中所有的spout和bolt都定时处理。
    2. 由于是tuple类型数据,当正常tuple数据量过大时是否会造成tickTuple延时消费。
  • 相关阅读:
    angularjs下载地址
    Could not calculate build plan: Plugin org.apache.maven.plugins:maven-resources-plugin:2.6 or one of
    java.net.SocketException: No buffer space available 异常
    在ubuntu上安装svn+apache2
    MyBatis配置
    npm被墙解决方法
    XMPP_05_网络编程
    XMPP_04_环境安装(配置客户端)
    XMPP_03_环境安装(配置服务器)
    XMPP_02_环境安装(准备工作和配置数据库)
  • 原文地址:https://www.cnblogs.com/nayt/p/6942707.html
Copyright © 2011-2022 走看看