zoukankan      html  css  js  c++  java
  • 从 demo 到生产

    我们知道因为通常流是无限的(无界的),所以在流上的工作方式与批处理不同,使用相同的方式对流中的所有元素进行计数是不可能的。

    但是很多时候又需要使用聚合事件(比如计数、求和)统计流上的数据,这个时候的聚合就用到了 window,因为需要由 window 来划定范围,比如 "计算过去的5分钟" , "统计最后100个元素的和" 等等。

    window 窗口操作是一种可以把无限数据切割为有限数据块的手段。本文就用实际中的经典场景来看看 Flink 中窗口操作的编程套路。

    本文结构如下:


    窗口中重要的概念

    • Time 分类

    • EventTime 事件时间

    • Ingestion Time 事件进入 Flink 的时间

    • Processing Time 事件被处理时当前系统的时间

    spark Streaming 用的的是Processing Time (Spark Streaming 不支持 EventTime ) ,使用处理时间是不准确的,因为当由延迟的数据涌入时导致数据量不准确。

    实际中更多的情况下使用的是事件时间 EventTime,后面的案例也都是基于 Flink EventTime 进行总结的。

    • Watermark  (水位)

    Watermark  也就是常说的水位机制目的是处理乱序的数据。

    窗口操作需要水位的原因:

    在流式处理时间中,从事件产生,到流经source,再到 oprator ,中间是需要时间的。虽然在大部分情况下,流到 operator 的数据都是按事件产生的时间顺序来的,但是依然不排除由于网络延迟等原因导致乱序的产生,特别是在使用 kafka 的情况下,多个分区数据无法保证有序。

    而在 window 计算的时候,我们又不会无限期的等下去,必须要有个机制来保证一个特定的时间后,必须要触发 window 计算,这个特别的机制就是 watermark。

    • 窗口类型

    window 分类:

    • tumbling windows 滚动窗口 没有重叠

    • sliding windows 滑动窗口  有重叠

    • session windows 会话窗口

    • global windows 没有窗口  ,自定义窗口

    timeWindow :  与时间强相关,可能是   session windows ,tumbling windows,  sliding windows 。

     

    countWindow : 与数量有关,可能是  tumbling windows ,sliding windows。

     

    小提示 :countWindow(3,2) 的意思是每隔 2 条元素 统计最新 3 条数据。

    • 常用的函数

    window 操作中常用的增量聚合函数,意思是 来一条数据处理一条

    常用的聚合算子有:

    • reduce(function)

    • aggregate(function)

    • sum() ,max() ,min()

    window 全量聚合函数,意思是等属于窗口的数据到齐了才开始进行聚合计算(可以实现对窗口内的数据进行排序等需求)

    • apply(function)

    • process(function)

    • processWindowFunction 比 windowFunction 多提供了上下文信息,类似 map 与 richMap 的关系。

    • 触发器和驱逐器

    触发器用来触发程序的运行 ;驱逐器用来 驱逐 淘汰数据。

     

    触发器 Trigger 返回值说明:

    • TriggerResult.CONTINUE:表示啥都不做;

       

    • TriggerResult.FIRE:表示触发计算,同时保留窗口中的数据;

       

    • TriggerResult.PURGE:简单地删除窗口的内容,并保留关于窗口和任何触发器状态的任何潜在元信息;

       

    • TriggerResult.FIRE_AND_PURGE:触发计算,然后清除窗口中的元素。(默认情况下,预先实现的触发器只触发而不清除窗口状态。)

     


    窗口编程的套路

    这部分选取了3种比较经典的场景进行编程套路的总结,分别是 求热门 TopN ,黑名单,统计日(月、年)总量 三种场景,在步骤套路上是类似的,只有些许的区别。

    • 求 topN 场景

    需求:每 5 秒统计近10分钟的 前三个 热门页面

    样例数据:

    ipuserIdeventTimemethodurl
    83.149.9.123 - 17/05/2020:10:05:03 +0000 GET presentations/logstash-kafkamonitor-2020/images/kibana-search.png

    需求步骤总结:

    1 获取数据

     

    //数据格式 模拟kafka 中的消费数据
    // 83.149.9.123 - - 17/05/2020:10:05:03 +0000 GET /presentations/logstash-kafkamonitor-2020/images/kibana-search.png
            DataStreamSource<String> readTextFileSource = env.readTextFile(".\data\data2.log");

    2 解析数据

    //将日志数据转成自定义的 MyApacheLogEvent 对象 方便后续处理
    SingleOutputStreamOperator<MyApacheLogEvent> mapResult = readTextFileSource.map(new MyParseLogFunction());

    3 添加水位

     //添加水位 是为了处理延迟过来的数据 ,并且这一步是固定模式 ,可以直接复用
      SingleOutputStreamOperator<MyApacheLogEvent> watermarkResult = mapResult.assignTimestampsAndWatermarks(
                    WatermarkStrategy.forGenerator((context) -> new MyPeriodWatermarkGenerator())//指定水位计算方式,单位毫秒
                            .withTimestampAssigner((context -> new MyTimeStampExtactor()))//指定时间字段,单位毫秒
            );
     

    4 根据业务分组

     

    // 这里需求用到日志中最后的字段 url 统计热门页面 以 url 分组
     KeyedStream<MyApacheLogEvent, String> urlResult = watermarkResult.keyBy(log -> log.getUrl());

    5 滑动窗口/滚动窗口

     

    //需求每5秒统计近10分钟的数据  使用 滑动窗口 SlidingEventTimeWindows
    WindowedStream<MyApacheLogEvent, String, TimeWindow> windowResult = urlResult.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.seconds(5)));

    6 窗口分组

     

    //先使用增量聚合函数统计每个 URL 出现的次数 
     SingleOutputStreamOperator<MyUrlView> aggregateResult = windowResult.aggregate(new MyPageCountAgg(), new MyPageWndowResult());
    //因为需求为 topN ,统计的肯定是同一窗口期的数据 ,所以需求将窗口分组
            KeyedStream<MyUrlView, Long> windowKeyByResult = aggregateResult.keyBy(urlView -> urlView.getWindowEnd());//按窗口分组


    7 求topN

      
    // 使用全量聚合函数 process 求 topN ,使用 MapState 进行统计,详见完整代码
      SingleOutputStreamOperator<String> process = windowKeyByResult.process(new MyTopNHotPage(3));
     

    完整代码:

    //由于完整代码太长影响阅读,这里只贴出主类 ,完整版见文末 public static void main(String[] args) throws Exception {
            //1 获取数据        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);        //2 解析数据        //模拟读 kafka
            //数据格式        // 83.149.9.123 - - 17/05/2020:10:05:03 +0000 GET /presentations/logstash-kafkamonitor-2020/images/kibana-search.png        DataStreamSource<String> readTextFileSource = env.readTextFile(".\data\data2.log");
            SingleOutputStreamOperator<MyApacheLogEvent> mapResult = readTextFileSource.map(new MyParseLogFunction());
    
            //3 添加水位        //context  WatermarkGenerator.context  这里添加水位一定要加 context
            SingleOutputStreamOperator<MyApacheLogEvent> watermarkResult = mapResult.assignTimestampsAndWatermarks(                WatermarkStrategy.forGenerator((context) -> new MyPeriodWatermarkGenerator())//指定水位计算方式                        .withTimestampAssigner((context -> new MyTimeStampExtactor()))//指定时间字段        );
            //4 按url分组        KeyedStream<MyApacheLogEvent, String> urlResult = watermarkResult.keyBy(log -> log.getUrl());        //5 实现滑动窗口        WindowedStream<MyApacheLogEvent, String, TimeWindow> windowResult = urlResult.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.seconds(5))); //每 5秒 统计 近 10分钟的数据
            //6 按窗口分组  两个参数 第一个是 计算函数 ,第二个是 窗口函数        SingleOutputStreamOperator<MyUrlView> aggregateResult = windowResult.aggregate(new MyPageCountAgg(), new MyPageWndowResult());//统计每个 URL 出现的次数
            KeyedStream<MyUrlView, Long> windowKeyByResult = aggregateResult.keyBy(urlView -> urlView.getWindowEnd());//按窗口分组
            //7 求TopN
            SingleOutputStreamOperator<String> process = windowKeyByResult.process(new MyTopNHotPage(3));
            //8 打印需求        process.print();
            env.execute("HotPage");    }
     
    • 黑名单场景

    需求:每个用户 每个广告 点击次数 大于 100

    样例数据:

    用户ID广告ID省份城市时间
    662867 2244074 广东 广州 1511658060

    需求步骤总结:

    1 获取数据

    //加载测试数据 模拟kafka消费 
    
    DataStreamSource<String> streamSource = env.readTextFile(".\data\data3.csv");

     

    2 数据转换

    
    

    //数据转换 将日志转为对象 --> MyAdClickBean
    SingleOutputStreamOperator<MyAdClickBean> mapStream = streamSource.map(new MyAdClickMapFuntion());

     

    3 添加水位

    //添加水位 是为了处理延迟过来的数据 ,并且这一步是固定模式 ,可以直接复用

    SingleOutputStreamOperator<MyAdClickBean> adStreamWatermark = mapStream.assignTimestampsAndWatermarks(
    WatermarkStrategy.forGenerator((context) -> new MyWatermarkBlackAgg())//指定水位计算方式,单位毫秒
    .withTimestampAssigner((ctx) -> new MyWatermarkBlackAssgin())//指定水位计算方式 ,单位毫秒
    );

    
    
    
    

    4 按业务分组

     
     //由于需要统计每个用户对每个广告的点击,这里使用自定义分组策略 ,按用户id、广告id 分组
      KeyedStream<MyAdClickBean, Tuple2<String, String>> keyedStream = adStreamWatermark.keyBy(new MyBlackKey());
     

    5 统计次数

    //使用全量聚合函数 统计每个分组中的数据量,这里没有进行窗口分组,而是在处理过程中使用了定时器机制,于每天凌晨进行统计,详见完整代码
    SingleOutputStreamOperator<MyAdClickBean> blackListOutput = keyedStream.process(new MyFilterBlackListUser(100));
     

    完整代码:

    
    
    //1
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    //2
    DataStreamSource<String> streamSource = env.readTextFile("D:\fpy\workspace\study\flink-window-stream\src\main\data\data3.csv");

    SingleOutputStreamOperator<MyAdClickBean> outPut = streamSource.map(new MyAdClickMapFuntion())
    .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((context) -> new MyWatermarkBlackAgg()).withTimestampAssigner((ctx) -> new MyWatermarkBlackAssgin()))
    .keyBy(new MyBlackKey()) //没有窗口
    .process(new MyFilterBlackListUser(100));


    //将上游结果输出到侧输出流 方便下游使用
    outPut.getSideOutput(blackListOutputTag).print();



    env.execute("AdClickCount");
    
    
    • 统计日(月/年)指标场景

    需求:统计每日的订单金额数量

    样例数据:

    订单编号订单金额订单状态用户id支付方式支付流水创建时间支付时间
    6623957652 248800 18 3887511420 CFT null 2020-09-18 00:00:02 2020-09-18 00:30:02

    需求步骤总结:

    1 获取数据

    //加载测试数据 模拟kafka消费
    DataStreamSource<String> dataStreamSource = env.readTextFile(".\data\order_test");
     

    2 解析数据

     // 日志数据转成三元组 类似对象 Tuple3<String, String, Double> <时间(截取到日),完整时间,金额>
     SingleOutputStreamOperator<Tuple3<String, String, Double>> orderMapStream = dataStreamSource.map(new MyOrderInfoMoneyFunction());
     

     

    3 添加水位

     //添加水位 是为了处理延迟过来的数据 ,并且这一步是固定模式 ,可以直接复用
    SingleOutputStreamOperator<Tuple3<String, String, Double>> orderStreamAddWatermark = orderMapStream.assignTimestampsAndWatermarks(
                    WatermarkStrategy.forGenerator((context) -> new MyWaterMarkAggOrd())//指定水位计算方式,单位毫秒
                           . withTimestampAssigner((context) -> new OrderWatermarkAssign())//指定水位字段,单位毫秒
            );


    4 按需求分组

     //按天分组 也就是三元组的第一个元素分组
      KeyedStream<Tuple3<String, String, Double>, String> tuple3StringKeyedStream = orderStreamAddWatermark.keyBy(tuple3 -> tuple3.f0);
     

    5 窗口操作

     //由于需要按天统计 没有时间重叠部分 使用滚动窗口
     //这里为什么 要用 Time.hours(-8)呢? Flink 使用的是时间纪元,而我们使用的是 东八区,所以FLink窗口的时间要比我们当前时间少8小时
     
     // 有时候 TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)) 会报错 解决办法:
     // 改为 TumblingEventTimeWindows.of(Time.days(1), Time.hours(16)) ,最新版本已经修复
     
     WindowedStream<Tuple3<String, String, Double>, String, TimeWindow> windowWindowedStream = tuple3StringKeyedStream.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)));
     

    6 触发器

    //每个5秒触发一次计算,数据会滚动一次 类似 实时大屏效果
     WindowedStream<Tuple3<String, String, Double>, String, TimeWindow> triggerCountStream = windowWindowedStream.trigger(ContinuousEventTimeTrigger.of(Time.seconds(5)));//每5秒
     

    7 驱逐器

    //将老数据删除掉 意思就是将老的计算的数据丢掉,只保留最新计算的数据
    WindowedStream<Tuple3<String, String, Double>, String, TimeWindow> evictorCountStream = triggerCountStream.evictor(TimeEvictor.of(Time.seconds(0), true));
     

    8 统计计算

    //使用全量聚合函数 统计每天的订单金额  valueState 记录金额总量 ,详见完整代码
     SingleOutputStreamOperator<String> orderSumStream = evictorCountStream.process(new OrderInfoSumMoney());

     

    完整代码:

     

     public static void main(String[] args) throws Exception {
    
    
            //1
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            //2
            DataStreamSource<String> dataStreamSource = env.readTextFile("D:\fpy\workspace\study\flink-window-stream\src\main\data\order_test");
    
    
         /* 分步操作
    
         SingleOutputStreamOperator<Tuple3<String, String, Double>> orderMapStream = dataStreamSource.map(new MyOrderInfoMoneyFunction());
    
    
            SingleOutputStreamOperator<Tuple3<String, String, Double>> orderStreamAddWatermark = orderMapStream.assignTimestampsAndWatermarks(
                    WatermarkStrategy.forGenerator((context) -> new MyWaterMarkAggOrd()).
                            withTimestampAssigner((context) -> new OrderWatermarkAssign())
            );
    
    
            KeyedStream<Tuple3<String, String, Double>, String> tuple3StringKeyedStream = orderStreamAddWatermark.keyBy(tuple3 -> tuple3.f0);
    
    
            WindowedStream<Tuple3<String, String, Double>, String, TimeWindow> windowWindowedStream = tuple3StringKeyedStream.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)));
    
    
            WindowedStream<Tuple3<String, String, Double>, String, TimeWindow> triggerCountStream = windowWindowedStream.trigger(ContinuousEventTimeTrigger.of(Time.seconds(5)));//每5秒
    
    
            WindowedStream<Tuple3<String, String, Double>, String, TimeWindow> evictorCountStream = triggerCountStream.evictor(TimeEvictor.of(Time.seconds(0), true));
    
    
            SingleOutputStreamOperator<String> orderSumStream = evictorCountStream.process(new OrderInfoSumMoney());*/
    
    
            DataStreamSink<String> streamSink = dataStreamSource.map(new MyOrderInfoMoneyFunction())
                    .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((context) -> new MyWaterMarkAggOrd()).withTimestampAssigner((context) -> new OrderWatermarkAssign()))
                    .keyBy(tuple3 -> tuple3.f0)
                    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) // 每天 凌晨刷新 最近1天的数据
                    .trigger(ContinuousEventTimeTrigger.of(Time.seconds(5)))
                    .evictor(TimeEvictor.of(Time.seconds(0), true))// 去掉原来的数据
                    .process(new OrderInfoSumMoney()).print();
    
    
    
    
            env.execute("sumcount");
    
    
        }

    这部分选取了3种比较经典的场景进行编程套路的总结,分别是 求热门 TopN ,黑名单,统计日(月、年)总量 三种场景,在步骤套路上是类似的,只有些许的区别。

    大致可总结为如下几步:

     

    1 获取数据

     

    2 解析数据

     

    3 添加水位

     

    4 根据业务需求分组 (可以是多个角度分组)

     

    5 实现滑动窗口/滚动

     

    6 按窗口分组 (增量函数使用)

     

    7 触发器和驱逐器

     

    8 最终的全量数据处理 ( 可能使用到定时器 窗口结束的时候触发定时器)

    PS:  公号后台回复 "Flink窗口或窗口",查看完整案例代码。


       THE END  

  • 相关阅读:
    Magic-Club第六天
    .net工具类——文件操作
    .net工具类——HTML处理
    .net工具类——随机生成
    .net工具类——删除最后结尾的一个逗号
    .net工具类——分割字符串
    .net工具类——对象转换处理
    .net扩展方法——其他(科学计数法、ToDictionary 去重、List<Guid>转为List<Guid?>)
    .net扩展方法——类型转换
    『Linux学习笔记』7. 管道和过滤器 -- pipe
  • 原文地址:https://www.cnblogs.com/fanyi0922/p/14784703.html
Copyright © 2011-2022 走看看