zoukankan      html  css  js  c++  java
  • storm定时任务【tick】

    一. 简介
         storm作为流计算,处理数据通常以数据驱动。即只有当spout发射数据才会进行计算。那么如果想要做定时任务如何处理哪,例如有的bolt需要输出一段时间统计的结果,这里一段时间可能是几秒、几分钟或者几小时。如果还是以数据进行驱动的话必然会输出时间不可确定。同样也可以启一个线程一段时间执行一次,这也是一种解决方案。但是我希望有种更优雅的解决方案,就是这里说的tick。tick是由storm框架进行计时,到达设定时间会发送一个特殊的tuple:ticktuple,此时处理定时任务就可以了。
    二. 代码
         如果是某一个bolt由定时需求的话,可以按照一下方式设置
    1. 继承BaseBasicBolt
    2. getComponentConfiguration方法设置发送ticktuple间隔时间(单位s)
    3. execute方法判断tuple类型,如果为ticktuple处理定时任务,如果不是处理其他任务。
    以下是wordCount中CountBolt代码,每5s输出各单词统计的数据。
     1 //继承 BaseBasicBolt
     2 public class CountTickBolt extends BaseBasicBolt {
     3     private static final Logger logger = LoggerFactory.getLogger(CountTickBolt.class);
     4     private Map<String, Integer> count;
     5     private Long time;
     6 
     7     @Override
     8     public Map<String, Object> getComponentConfiguration() {
     9         //设置发送ticktuple的时间间隔
    10         Config conf = new Config();
    11         conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
    12         return conf;
    13     }
    14 
    15     @Override
    16     public void prepare(Map stormConf, TopologyContext context) {
    17         super.prepare(stormConf, context);
    18         count = new HashMap<String, Integer>();
    19         time = System.currentTimeMillis();
    20     }
    21 
    22     @Override
    23     public void cleanup() {
    24         super.cleanup();
    25     }
    26 
    27     @Override
    28     public void execute(Tuple input, BasicOutputCollector collector) {
    29         //判断是否为tickTuple
    30         if (input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) &&
    31                 input.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){
    32             //是,处理定时任务
    33             Long nowTime = System.currentTimeMillis();
    34             Long useTime = nowTime - time;
    35             StringBuffer sb = new StringBuffer();
    36             sb.append("======== Use Time :" + useTime + "========
    ");
    37             for (Map.Entry wordCount : count.entrySet()){
    38                 sb.append(wordCount.getKey() + "------>" + wordCount.getValue() + "
    ");
    39             }
    40             Long nnTime = System.currentTimeMillis();
    41             logger.info(sb.toString() + (nnTime - nowTime) );
    42             time = nnTime;
    43         }else {
    44             //否,处理其他数据
    45             String word = input.getString(0);
    46             if (count.containsKey(word)){
    47                 int thisWordCont = count.get(word);
    48                 count.put(word, ++thisWordCont);
    49             }else {
    50                 count.put(word,1);
    51             }
    52         }
    53     }
    54 
    55     @Override
    56     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    57 
    58     }
    三. 总结
         以上是一个简单的介绍,需要说明的是由于设置时间间隔是秒级的,所以在处理时会有毫秒级的误差,通常是± 2ms。
      以下是没有介绍或者测试到的地方,在以后会补上。
    1. 如何设置此拓扑中所有的spout和bolt都定时处理。
    2. 由于是tuple类型数据,当正常tuple数据量过大时是否会造成tickTuple延时消费。
  • 相关阅读:
    out/host/linuxx86/obj/EXECUTABLES/aapt_intermediates/aapt 64 32 操作系统
    linux 查看路由器 电脑主机 端口号 占用
    linux proc进程 pid stat statm status id 目录 解析 内存使用
    linux vim 设置大全详解
    ubuntu subclipse svn no libsvnjavahl1 in java.library.path no svnjavahl1 in java.library.path no s
    win7 安装 ubuntu 双系统 详解 easybcd 工具 不能进入 ubuntu 界面
    Atitit.json xml 序列化循环引用解决方案json
    Atitit.编程语言and 自然语言的比较and 编程语言未来的发展
    Atitit.跨语言  文件夹与文件的io操作集合  草案
    Atitit.atijson 类库的新特性设计与实现 v3 q31
  • 原文地址:https://www.cnblogs.com/nayt/p/6942707.html
Copyright © 2011-2022 走看看