zoukankan      html  css  js  c++  java
  • flink window的early计算

    转发请注明原创地址:https://www.cnblogs.com/dongxiao-yang/p/9391815.html

    背景
    flink 提供了完善的窗口机制, api中支持常见的三种窗口形式,滚动窗口,滑动窗口和session窗口。下面的图片显示了三种窗口的划分区别:
    滚动窗口
    5b28d86219d81.png
    滑动窗口
    sw.png
    session窗口
    pimg_5b5ec2ffb9039.png

    Tumbing Windows:滚动窗口,窗口之间时间点不重叠。它是按照固定的时间,或固定的事件个数划分的,分别可以叫做滚动时间窗口和滚动事件窗口。
    Sliding Windows:滑动窗口,窗口之间时间点存在重叠。对于某些应用,它们需要的时间是不间断的,需要平滑的进行窗口聚合。例如,可以每30s记算一次最近1分钟用户所购买的商品数量的总数,这个就是时间滑动窗口;或者每10个客户点击购买,然后就计算一下最近100个客户购买的商品的总和,这个就是事件滑动窗口。
    Session Windows:会话窗口,经过一段设置时间无数据认为窗口完成。

    在默认的场景下,所有的窗口都是到达时间语义上的windown end time后触发对整个窗口元素的计算,但是在部分场景的情况下,业务方需要在窗口时间没有结束的情况下也可以获得当前的聚合结果,比如每隔五分钟获取当前小时的sum值,这种情况下,官方提供了对于上述窗口的定制化计算器ContinuousEventTimeTriggerContinuousProcessingTimeTrigger

    下面是一个使用ContinuousProcessingTimeTrigger的简单例子:

    public class ContinueTriggerDemo {
    
        public static void main(String[] args) throws Exception {
            // TODO Auto-generated method stub
    
            String hostName = "localhost";
            Integer port = Integer.parseInt("8001");
            ;
    
            // set up the execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment
                    .getExecutionEnvironment();
    
            // 从指定socket获取输入数据
            DataStream<String> text = env.socketTextStream(hostName, port);
    
            text.flatMap(new LineSplitter()) //数据语句分词
                    .keyBy(0) // 流按照单词分区
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(120)))// 设置一个120s的滚动窗口
                    .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))//窗口每统计一次当前计算结果
                    .sum(1)// count求和
                    .map(new Mapdemo())//输出结果加上时间戳
                    .print();
    
            env.execute("Java WordCount from SocketTextStream Example");
    
        }
    
        /**
         * Implements the string tokenizer that splits sentences into words as a
         * user-defined FlatMapFunction. The function takes a line (String) and
         * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
         * Integer>).
         */
        public static final class LineSplitter implements
                FlatMapFunction<String, Tuple2<String, Integer>> {
    
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                // normalize and split the line
                String[] tokens = value.toLowerCase().split("\W+");
    
                // emit the pairs
                for (String token : tokens) {
                    if (token.length() > 0) {
                        out.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }
    
        public static final class Mapdemo
                implements
                MapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> {
    
            @Override
            public Tuple3<String, String, Integer> map(Tuple2<String, Integer> value)
                    throws Exception {
                // TODO Auto-generated method stub
    
                DateFormat format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String s = format2.format(new Date());
    
                return new Tuple3<String, String, Integer>(value.f0, s, value.f1);
            }
        }
        
    
    
    }

    在本地启动端口 :nc -lk 8001 并启动flink程序
    输入数据:

               aa
               aa
               bb

    观察程序数据结果日志

    5> (aa,2018-07-30 16:08:20,2)
    5> (bb,2018-07-30 16:08:20,1)
    5> (aa,2018-07-30 16:08:40,2)
    5> (bb,2018-07-30 16:08:40,1)
    5> (aa,2018-07-30 16:09:00,2)
    5> (bb,2018-07-30 16:09:00,1)
    5> (aa,2018-07-30 16:09:20,2)
    5> (bb,2018-07-30 16:09:20,1)
    5> (aa,2018-07-30 16:09:40,2)
    5> (bb,2018-07-30 16:09:40,1)

    在上述输入后继续输入

        aa

    日志结果统计为

    5> (aa,2018-07-30 16:10:00,3)
    5> (bb,2018-07-30 16:10:00,1)

    根据日志数据可见,flink轻松实现了一个窗口时间长度为120s并每20s向下游发送一次窗口当前聚合结果的功能。

  • 相关阅读:
    使用Bootstrap后,关于IE与Chrome显示字体的问题
    利用百度接口,识别身份证
    双日历日期选择控件
    回复一个朋友:如何理解委托
    IIS7增加mine类型,以便可以访问apk
    关于SqlBulkCopy SQL批量导入需要注意,列名是区分大小写的
    关于取表中id最大值+1的select语句,哪种效率更高?
    MySQL中如何分析查询语句
    判断同名股票是否存在的MyBatis查询函数写法
    Thymeleaf中model设一个值 页面显示此值 JS取此值
  • 原文地址:https://www.cnblogs.com/dongxiao-yang/p/9391815.html
Copyright © 2011-2022 走看看