zoukankan      html  css  js  c++  java
  • Storm Windowing storm滑动窗口简介

    Storm Windowing

    简介

    Storm可同时处理窗口内的所有tuple。窗口可以从时间或数量上来划分,由如下两个因素决定:

    • 窗口的长度,可以是时间间隔或Tuple数量;
    • 滑动间隔(sliding Interval),可以是时间间隔或Tuple数量;

    要确保topo的过期时间大于窗口的大小加上滑动间隔

    Sliding Window:滑动窗口

    按照固定的时间间隔或者Tuple数量滑动窗口。

    • 如果滑动间隔和窗口大小一样则等同于滚窗,
    • 如果滑动间隔大于窗口大小则会丢失数据,
    • 如果滑动间隔小于窗口大小则会窗口重叠。

    Tumbling Window:滚动窗口

    元组被单个窗口处理,一个元组只属于一个窗口,不会有窗口重叠。
    根据我自己的经验其实一般用滚动就可以了

    构造builder的时候支持以下的配置

    (时间和数量的排列组合):

    • withWindow(Count windowLength, Count slidingInterval)
      滑窗 窗口长度:tuple数, 滑动间隔: tuple数
    • withWindow(Count windowLength)
      滑窗 窗口长度:tuple数, 滑动间隔: 每个tuple进来都滑
    • withWindow(Count windowLength, Duration slidingInterval)
      滑窗 窗口长度:tuple数, 滑动间隔: 时间间隔
    • withWindow(Duration windowLength, Duration slidingInterval)
      滑窗 窗口长度:时间间隔, 滑动间隔: 时间间隔
    • withWindow(Duration windowLength)
      滑窗 窗口长度:时间间隔, 滑动间隔: 每个tuple进来都滑
    • withWindow(Duration windowLength, Count slidingInterval)
      滑窗 窗口长度:时间间隔, 滑动间隔: 时间间隔
    • withTumblingWindow(BaseWindowedBolt.Count count)
      滚窗 窗口长度:Tuple数
    • withTumblingWindow(BaseWindowedBolt.Duration duration)
      滚窗 窗口长度:时间间隔

    Tuple时间戳和乱序

    storm支持追踪源数据的时间戳。
    Event time 和Process time
    默认的时间戳是处理元组时的bolt窗口生成的,
    Event time,事件时间,通常这个时间会带在Tuple中;
    Process time,到某一个处理环节的时间。
    举例:A今天早上9点告诉B,说C昨天晚上9点在滨江国际;
    这条信息中,可以认为C在滨江国际的Event time是昨天晚上9点,B接收到这条信息的时间,即Process time,是今天早上9点。

    配置时间戳字段(timestamp field)

    windows按照时间划分时,默认是Process time,也可以指定为Tuple中的Event time。
    如果以Event time来划分窗口:

    1. Tuple落入到哪个窗口,是看tuple里的Event time。
    2. 窗口向后推进,主要依靠Event time的增长;
    public BaseWindowedBolt withTimestampField(String fieldName)
    

    延时(lag)和水位线(watermark)

    从当前最后一条数据算起,往前减去lag,得到一个时间,这个时间就是watermark;
    认为watermark之前的数据都已经到了。收到06:01:00的数据时,认为06:00:00的数据都到了。给他们入window。
    这样实际是一个延时处理,等到了06:01:00时,我才开始将06:00:00的数据放入窗口。

    如果很不巧,06:00:00的数据在06:01:00之后,lag为60s,不好意思,进不了窗口。此数据不会被处理,并且会在worker的日志中加一行INFO信息。

    public class SlidingWindowBolt extends BaseWindowedBolt {
        private OutputCollector collector;
    
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
    
        @Override
        public void execute(TupleWindow inputWindow) {
          for(Tuple tuple: inputWindow.get()) {
            // do the windowing computation
            ...
          }
          // emit the results
          collector.emit(new Values(computedValue));
        }
    }
    
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
         builder.setSpout("spout", new RandomSentenceSpout(), 1);
         builder.setBolt("slidingwindowbolt",
                         new SlidingWindowBolt().withWindow(new Count(30), new Count(10)),
                         1).shuffleGrouping("spout");
        Config conf = new Config();
        conf.setDebug(true);
        conf.setNumWorkers(1);
    
        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    
    }
    
  • 相关阅读:
    阿里巴巴开源故障注入工具_chaosblade
    一步一步解决centos6.5配置无线网卡的问题
    python自动化测试三部曲之request+django实现接口测试
    python自动化测试三部曲之unittest框架
    python子类如何继承父类的实例变量?
    tp5.0 的 系统变量
    tp5.1 相同控制器不同方法session无法取出的问题
    php 常用自定义函数
    tp5.1 配置多个项目共用同一个核心库
    git LF 和 CRLF换行的问题
  • 原文地址:https://www.cnblogs.com/cutd/p/6188314.html
Copyright © 2011-2022 走看看