一. 简介
storm作为流计算,处理数据通常以数据驱动。即只有当spout发射数据才会进行计算。那么如果想要做定时任务如何处理哪,例如有的bolt需要输出一段时间统计的结果,这里一段时间可能是几秒、几分钟或者几小时。如果还是以数据进行驱动的话必然会输出时间不可确定。同样也可以启一个线程一段时间执行一次,这也是一种解决方案。但是我希望有种更优雅的解决方案,就是这里说的tick。tick是由storm框架进行计时,到达设定时间会发送一个特殊的tuple:ticktuple,此时处理定时任务就可以了。
二. 代码
如果是某一个bolt由定时需求的话,可以按照一下方式设置
- 继承BaseBasicBolt
- getComponentConfiguration方法设置发送ticktuple间隔时间(单位s)
- execute方法判断tuple类型,如果为ticktuple处理定时任务,如果不是处理其他任务。
以下是wordCount中CountBolt代码,每5s输出各单词统计的数据。
1 //继承 BaseBasicBolt 2 public class CountTickBolt extends BaseBasicBolt { 3 private static final Logger logger = LoggerFactory.getLogger(CountTickBolt.class); 4 private Map<String, Integer> count; 5 private Long time; 6 7 @Override 8 public Map<String, Object> getComponentConfiguration() { 9 //设置发送ticktuple的时间间隔 10 Config conf = new Config(); 11 conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5); 12 return conf; 13 } 14 15 @Override 16 public void prepare(Map stormConf, TopologyContext context) { 17 super.prepare(stormConf, context); 18 count = new HashMap<String, Integer>(); 19 time = System.currentTimeMillis(); 20 } 21 22 @Override 23 public void cleanup() { 24 super.cleanup(); 25 } 26 27 @Override 28 public void execute(Tuple input, BasicOutputCollector collector) { 29 //判断是否为tickTuple 30 if (input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && 31 input.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){ 32 //是,处理定时任务 33 Long nowTime = System.currentTimeMillis(); 34 Long useTime = nowTime - time; 35 StringBuffer sb = new StringBuffer(); 36 sb.append("======== Use Time :" + useTime + "======== "); 37 for (Map.Entry wordCount : count.entrySet()){ 38 sb.append(wordCount.getKey() + "------>" + wordCount.getValue() + " "); 39 } 40 Long nnTime = System.currentTimeMillis(); 41 logger.info(sb.toString() + (nnTime - nowTime) ); 42 time = nnTime; 43 }else { 44 //否,处理其他数据 45 String word = input.getString(0); 46 if (count.containsKey(word)){ 47 int thisWordCont = count.get(word); 48 count.put(word, ++thisWordCont); 49 }else { 50 count.put(word,1); 51 } 52 } 53 } 54 55 @Override 56 public void declareOutputFields(OutputFieldsDeclarer declarer) { 57 58 }
三. 总结
以上是一个简单的介绍,需要说明的是由于设置时间间隔是秒级的,所以在处理时会有毫秒级的误差,通常是± 2ms。
以下是没有介绍或者测试到的地方,在以后会补上。
- 如何设置此拓扑中所有的spout和bolt都定时处理。
- 由于是tuple类型数据,当正常tuple数据量过大时是否会造成tickTuple延时消费。
WordCout源码:https://github.com/youtNa/stormTick