zoukankan      html  css  js  c++  java
  • storm 1.0版本滑动窗口的实现及原理

    滑动窗口在监控和统计应用的场景比较广泛,比如每隔一段时间(10s)统计最近30s的请求量或者异常次数,根据请求或者异常次数采取相应措施。在storm1.0版本之前,没有提供关于滑动窗口的实现,需要开发者自己实现滑动窗口的功能(storm1.0以前实现滑动窗口的实现原理可以自行百度)。

    原文和作者一起讨论:http://www.cnblogs.com/intsmaze/p/6481588.html

    微信:intsmaze

    这里主要演示在storm1.0以后如何通过继承storm1.0提供的类来快速开发出窗口滑动的功能。窗口可以从时间或数量上来划分,由如下两个因素决定:窗口的长度,可以是时间间隔或Tuple数量;滑动间隔(sliding Interval),可以是时间间隔或Tuple数量。比如:每两秒统计最近6秒的请求数量;每接收2个Tuple就统计最近接收的6个Tuple的平均值......。

    storm1.0支持的时间和数量的排列组合有如下:

    withWindow(Count windowLength, Count slidingInterval)

      每收到slidingInterval条数据统计最近的windowLength条数据。

    withWindow(Count windowLength)

      每收到1条数据统计最近的windowLength条数据。

    withWindow(Count windowLength, Duration slidingInterval)

      每过slidingInterval秒统计最近的windowLength条数据。

    withWindow(Duration windowLength, Count slidingInterval)

      每收到slidingInterval条数据统计最近的windowLength秒的数据。

    withWindow(Duration windowLength, Duration slidingInterval)

      每过slidingInterval秒统计最近的windowLength秒的数据。

    public withWindow(Duration windowLength)

      每收到1条数据统计最近的windowLength秒的数据。

    接下来,简单的演示如何使用storm1.0实现滑动窗口的功能,先编写spout类,RandomSentenceSpout负责发送一个整形数值,数值每次发送都会自动加一,且RandomSentenceSpout固定每隔两秒向bolt发送一次数据。RandomSentenceSpout和前面关于spout的讲解一样。

    1.public class RandomSentenceSpout extends BaseRichSpout {
    2.
    3.    private static final long serialVersionUID = 5028304756439810609L;  
    4.
    5.    private SpoutOutputCollector collector;  
    6.
    7.    int intsmaze=0;
    8.
    9.    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    10.        declarer.declare(new Fields("intsmaze"));
    11.    }
    12.
    13.    public void open(Map conf, TopologyContext context, 
    14.                          SpoutOutputCollector collector) {
    15.        this.collector = collector;
    16.    }
    17.
    18.    public void nextTuple() {
    19.        System.out.println("发送数据:"+intsmaze);
    20.        collector.emit(new Values(intsmaze++));
    21.        try {
    22.            Thread.sleep(2000);
    23.//         Thread.sleep(1000);
    24.        } catch (InterruptedException e) {
    25.            e.printStackTrace();
    26.        }
    27.    }
    }

    滑动窗口的逻辑实现的重点是bolt类,这里我们编写SlidingWindowBolt类让它继承一个新的类名为BaseWindowedBolt来获得窗口计数的功能。BaseWindowedBolt和前面的BaseBaseBoltBaseWindowedBolt提供的方法名都一样,只是execute方法的参数类型为TupleWindow,TupleWindow参数里面装载了一个窗口长度类的tuple数据。通过对TupleWindow遍历,我们可以计算这一个窗口内tuple数的平均值或总和等指标。具体见代码12-16行,统计了一个窗口内的数值型数据的总和。

    1.public class SlidingWindowBolt extends BaseWindowedBolt {
    2.
    3.    private OutputCollector collector;
    4.
    5.    @Override
    6.    public void prepare(Map stormConf, TopologyContext context, 
    7.            OutputCollector collector) {
    8.        this.collector = collector;
    9.    }
    10.
    11.    public void execute(TupleWindow inputWindow) {        
    12.        int sum=0;
    13.        System.out.print("一个窗口内的数据");
    14.        for(Tuple tuple: inputWindow.get()) {
    15.            int str=(Integer) tuple.getValueByField("intsmaze");
    16.            System.out.print(" "+str);
    17.            sum+=str;
    18.        }
    19.        System.out.println("======="+sum);
    20. //        collector.emit(new Values(sum));
    21.    }
    22.
    23.    @Override
    24.    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    25.//       declarer.declare(new Fields("count"));
    26.    }
    }

    我们已经实现了窗口计数的逻辑代码,现在我们需要提供topology来指明各个组件的关系,以及指定SlidingWindowBolt的窗口的组合,这里我们演示了如何每两秒统计最近6秒的数值总和,如果注释掉10-13行代码,去掉5-8行的注释,这个topology就是告诉SlidingWindowBolt每接收到两条tuple就统计最近接收到的6条tuple的数值的总和。

    1.public class WindowsTopology {
    2.
    3.    public static void main(String[] args) throws Exception {
    4.       TopologyBuilder builder = new TopologyBuilder();
    5.       builder.setSpout("spout1", new RandomSentenceSpout(), 1);
    6.//       builder.setBolt("slidingwindowbolt", new SlidingWindowBolt()
    7.//       .withWindow(new Count(6), new Count(2)),1)
    8.//       .shuffleGrouping("spout");
    9.//滑窗 窗口长度:tuple数, 滑动间隔: tuple数 每收到2条数据统计当前6条数据的总和。  
    10.     
    11.       builder.setBolt("slidingwindowbolt", new SlidingWindowBolt()
    12.       .withWindow(new Duration(6, TimeUnit.SECONDS), 
    13.               new Duration(2, TimeUnit.SECONDS)),1)
    14.       .shuffleGrouping("spout");//每两秒统计最近6秒的数据       
    15.
    16.       Config conf = new Config();
    17.       conf.setNumWorkers(1);
    18.       LocalCluster cluster = new LocalCluster();
    19.       cluster.submitTopology("word-count", conf, builder.createTopology());
    20.   }
    }

    这里演示的是bolt节点并发度为1的窗口功能,实际生产中,因为数据量很大,往往将bolt节点的并发度设置为多个,这个时候我们的SlidingWindowBolt就无法统计出一个窗口的数值总和了。因为每一个bolt的并行节点只能统计自己一个窗口接收到数据的总和,无法统计出一个窗口内全局数据的总和,借助redis来实现是可以的,但是必须引入redis的事务机制或者借助分布式锁,否则会出现脏数据的情况。在这里我们介绍另一种实现方式就是灵活的使用storm提供的窗口功能,只是窗口的tuple数。

    仍然是使用上面提供的类,只是我们增加一个bolt类,来统计每个SlidingWindowBolt节点发送给它的数值。

    1.public class CountWord extends BaseWindowedBolt{
    2.    
    3.    private static final long serialVersionUID = -5283595260540124273L;
    4.    
    5.    private OutputCollector collector;
    6.    
    7.    public void prepare(Map stormConf, TopologyContext context
    8.                             , OutputCollector collector) {
    9.        this.collector = collector;
    10.    }
    11.    
    12.    public void execute(TupleWindow inputWindow) {
    13.         int sum=0;
    14.         for(Tuple tuple: inputWindow.get()) {
    15.             int i=(Integer) tuple.getValueByField("count");
    16.               System.out.println("接收到一个bolt的总和值为:"+i);
    17.               sum+=i;
    18.          }
    19.         System.out.println("一个窗口内的总值为:"+sum);
    20.    }
    21.
    22.    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    23.    }
    }

    然后我们注释RandomSentenceSpout22行代码,取消对23行代码的注释,方便观察结果。去掉SlidingWindowBolt20和25行代码。

    topology启动类如下:

    1.public class WindowsTopology {
    2.
    3.    public static void main(String[] args) throws Exception {
    4.       TopologyBuilder builder = new TopologyBuilder();
    5.       builder.setSpout("spout", new RandomSentenceSpout(), 1);
    6.       
    7.       builder.setBolt("slidingwindowbolt", new SlidingWindowBolt()
    8.       .withWindow(new Duration(6, TimeUnit.SECONDS), 
    9.               new Duration(2, TimeUnit.SECONDS)),2)
    10.       .shuffleGrouping("spout");//每两秒统计最近6秒的数据
    11.       
    12.       builder.setBolt("countwordbolt", new CountWord()
    13.       .withWindow(new Count(2), new Count(2)),1)
    14.       .shuffleGrouping("slidingwindowbolt");
    15.       //每收到2条tuple就统计最近两条统的数据
    16.       Config conf = new Config();
    17.       conf.setNumWorkers(1);
    18.       LocalCluster cluster = new LocalCluster();
    19.       cluster.submitTopology("word-count", conf, builder.createTopology());
    20.   }
    }
  • 相关阅读:
    java 实现前面带零的数字进行加减运算(保留前面的0)
    java 判断字符串是否是整数(纯数字:正整数、负整数、0)、至少包含一位小数、数字(可以是整数或小数)
    java 抽象类使用@Autowrited注入对象,子类直接使用父类的该属性
    java、springboot使用proguard混淆代码
    idea 使用阿里云仓库下载的jar包出现证书校验问题(PKIX:unable to find valid certification path to requested target)
    windows10安装zookeeper-3.6.2并生成zookeeper服务
    zookeeper-3.4.8 集群搭建
    zookeeper安装和使用 windows环境
    Dubbo入门---搭建一个最简单的Demo框架
    服务端高并发分布式架构演进之路
  • 原文地址:https://www.cnblogs.com/intsmaze/p/6481588.html
Copyright © 2011-2022 走看看