zoukankan      html  css  js  c++  java
  • FLINK基础(108): DS算子与窗口(19)窗口 (4) window functions (三)增量聚合函数与全窗口函数结合

      我们还可以将ReduceFunction/AggregateFunction和ProcessWindowFunction结合起来使用。ReduceFunction/AggregateFunction做增量聚合,ProcessWindowFunction提供更多的对数据流的访问权限。如果只使用ProcessWindowFunction(底层的实现为将事件都保存在ListState中),将会非常占用空间。分配到某个窗口的元素将被提前聚合,而当窗口的trigger触发时,也就是窗口收集完数据关闭时,将会把聚合结果发送到ProcessWindowFunction中,这时Iterable参数将会只有一个值,就是前面聚合的值。

    实例一

    input
      .keyBy(...)
      .timeWindow(...)
      .reduce(
        incrAggregator: ReduceFunction[IN],
        function: ProcessWindowFunction[IN, OUT, K, W])
    
    input
      .keyBy(...)
      .timeWindow(...)
      .aggregate(
        incrAggregator: AggregateFunction[IN, ACC, V],
        windowFunction: ProcessWindowFunction[V, OUT, K, W])

    我们把之前的需求重新使用以上两种方法实现一下。

    case class MinMaxTemp(id: String, min: Double, max: Double, endTs: Long)
    
    val minMaxTempPerWindow2: DataStream[MinMaxTemp] = sensorData
      .map(r => (r.id, r.temperature, r.temperature))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5))
      .reduce(
        (r1: (String, Double, Double), r2: (String, Double, Double)) => {
          (r1._1, r1._2.min(r2._2), r1._3.max(r2._3))
        },
        new AssignWindowEndProcessFunction
      )
    
    class AssignWindowEndProcessFunction
      extends ProcessWindowFunction[(String, Double, Double),
        MinMaxTemp, String, TimeWindow] {
        override def process(key: String,
                           ctx: Context,
                           minMaxIt: Iterable[(String, Double, Double)],
                           out: Collector[MinMaxTemp]): Unit = {
        val minMax = minMaxIt.head
        val windowEnd = ctx.window.getEnd
        out.collect(MinMaxTemp(key, minMax._2, minMax._3, windowEnd))
      }
    }

    实例二

    1. 可将ProcessWindowFunction与增量聚合函数ReduceFunctionAggregateFunction结合。
    2. 元素到达窗口时增量聚合,当窗口关闭时对增量聚合的结果用ProcessWindowFunction再进行全量聚合。
    3. 既可以增量聚合,也可以访问窗口的元数据信息(如开始结束时间、状态等)。

    ProcessWindowFunction与ReduceFunction结合

    // 测试数据: 某个用户在某个时刻浏览了某个商品,以及商品的价值
    // {"userID": "user_4", "eventTime": "2019-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10}
    
    // API: 如上ReduceFunction与ProcessWindowFunction
    
    // 示例: 获取一段时间内(Window Size)每个用户(KeyBy)浏览的商品的最大价值的那条记录(ReduceFunction),并获得Key和Window信息。
    kafkaStream
        // 将从Kafka获取的JSON数据解析成Java Bean
        .process(new KafkaProcessFunction())
        // 提取时间戳生成水印
        .assignTimestampsAndWatermarks(new MyCustomBoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrdernessSeconds)))
        // 按用户分组
        .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID)
        // 构造TimeWindow
        .timeWindow(Time.seconds(windowLengthSeconds))
        // 窗口函数: 获取这段窗口时间内每个用户浏览的商品的最大价值对应的那条记录
        .reduce(
                new ReduceFunction<UserActionLog>() {
                    @Override
                    public UserActionLog reduce(UserActionLog value1, UserActionLog value2) throws Exception {
                        return value1.getProductPrice() > value2.getProductPrice() ? value1 : value2;
                    }
                },
                new ProcessWindowFunction<UserActionLog, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<UserActionLog> elements, Collector<String> out) throws Exception {
                        
                        UserActionLog max = elements.iterator().next();
        
                        String windowStart=new DateTime(context.window().getStart(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");
                        String windowEnd=new DateTime(context.window().getEnd(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");
        
                        String record="Key: "+key+" 窗口开始时间: "+windowStart+" 窗口结束时间: "+windowEnd+" 浏览的商品的最大价值对应的那条记录: "+max;
                        out.collect(record);
        
                    }
                }
        )
        .print();
        
    // 结果
    Key: user_2 窗口开始时间: 2019-11-09 13:54:10 窗口结束时间: 2019-11-09 13:54:20 浏览的商品的最大价值对应的那条记录: UserActionLog{userID='user_2', eventTime='2019-11-09 13:54:10', eventType='browse', productID='product_3', productPrice=30}
    Key: user_4 窗口开始时间: 2019-11-09 13:54:10 窗口结束时间: 2019-11-09 13:54:20 浏览的商品的最大价值对应的那条记录: UserActionLog{userID='user_4', eventTime='2019-11-09 13:54:15', eventType='browse', productID='product_3', productPrice=30}
    Key: user_3 窗口开始时间: 2019-11-09 13:54:10 窗口结束时间: 2019-11-09 13:54:20 浏览的商品的最大价值对应的那条记录: UserActionLog{userID='user_3', eventTime='2019-11-09 13:54:12', eventType='browse', productID='product_2', productPrice=20}
    Key: user_5 窗口开始时间: 2019-11-09 13:54:10 窗口结束时间: 2019-11-09 13:54:20 浏览的商品的最大价值对应的那条记录: UserActionLog{userID='user_5', eventTime='2019-11-09 13:54:17', eventType='browse', productID='product_2', productPrice=20}

    ProcessWindowFunction与AggregateFunction结合

    // 测试数据: 某个用户在某个时刻浏览了某个商品,以及商品的价值
    // {"userID": "user_4", "eventTime": "2019-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10}
    
    // API: 如上AggregateFunction与ProcessWindowFunction
    
    // 示例: 获取一段时间内(Window Size)每个用户(KeyBy)浏览的平均价值(AggregateFunction),并获得Key和Window信息。
    kafkaStream
        // 将从Kafka获取的JSON数据解析成Java Bean
        .process(new KafkaProcessFunction())
        // 提取时间戳生成水印
        .assignTimestampsAndWatermarks(new MyCustomBoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrdernessSeconds)))
        // 按用户分组
        .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID)
        // 构造TimeWindow
        .timeWindow(Time.seconds(windowLengthSeconds))
        // 窗口函数: 获取这段窗口时间内,每个用户浏览的商品的平均价值,并发出Key和Window信息
        .aggregate(
             new AggregateFunction<UserActionLog, Tuple2<Long, Long>, Double>() {
    
                 // 1、初始值
                 // 定义累加器初始值
                 @Override
                 public Tuple2<Long, Long> createAccumulator() {
                     return new Tuple2<>(0L, 0L);
                 }
    
                 // 2、累加
                 // 定义累加器如何基于输入数据进行累加
                 @Override
                 public Tuple2<Long, Long> add(UserActionLog value, Tuple2<Long, Long> accumulator) {
                     accumulator.f0 += 1;
                     accumulator.f1 += value.getProductPrice();
                     return accumulator;
                 }
    
                 // 3、合并
                 // 定义累加器如何和State中的累加器进行合并
                 @Override
                 public Tuple2<Long, Long> merge(Tuple2<Long, Long> acc1, Tuple2<Long, Long> acc2) {
                     acc1.f0 += acc2.f0;
                     acc1.f1 += acc2.f1;
                     return acc1;
                 }
    
                 // 4、输出
                 // 定义如何输出数据
                 @Override
                 public Double getResult(Tuple2<Long, Long> accumulator) {
                     return accumulator.f1 / (accumulator.f0 * 1.0);
                 }
             },
             new ProcessWindowFunction<Double, String, String, TimeWindow>() {
                 @Override
                 public void process(String key, Context context, Iterable<Double> elements, Collector<String> out) throws Exception {
    
                     Double avg = elements.iterator().next();
    
                     String windowStart=new DateTime(context.window().getStart(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");
                     String windowEnd=new DateTime(context.window().getEnd(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");
    
                     String record="Key: "+key+" 窗口开始时间: "+windowStart+" 窗口结束时间: "+windowEnd+" 浏览的商品的平均价值: "+String.format("%.2f",avg);
                     out.collect(record);
    
                 }
             }
    
        )
        .print();
        
    //结果
    Key: user_2 窗口开始时间: 2019-11-09 14:05:40 窗口结束时间: 2019-11-09 14:05:50 浏览的商品的平均价值: 13.33
    Key: user_3 窗口开始时间: 2019-11-09 14:05:50 窗口结束时间: 2019-11-09 14:06:00 浏览的商品的平均价值: 25.00
    Key: user_4 窗口开始时间: 2019-11-09 14:05:50 窗口结束时间: 2019-11-09 14:06:00 浏览的商品的平均价值: 20.00
    Key: user_2 窗口开始时间: 2019-11-09 14:05:50 窗口结束时间: 2019-11-09 14:06:00 浏览的商品的平均价值: 30.00
    Key: user_5 窗口开始时间: 2019-11-09 14:05:50 窗口结束时间: 2019-11-09 14:06:00 浏览的商品的平均价值: 20.00
    Key: user_1 窗口开始时间: 2019-11-09 14:05:50 窗口结束时间: 2019-11-09 14:06:00 浏览的商品的平均价值: 23.33

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13783226.html

  • 相关阅读:
    75.Java异常处理机制-自定义异常
    75.Java异常处理机制-手动抛出异常
    75.Java异常处理机制throws
    mybatis的xml文件中如何处理大小于号
    JS 拼装代码的HTML onClick方法传递字符串
    Java 日期往后推迟n天
    MySql 去重且指定某字段在前的排序方法
    java运行内存分配图(转)
    Java中正则Matcher类的matches()、lookAt()和find()的区别<转>
    图片在父元素中上下居中(vertical-align的有效性)
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13783226.html
Copyright © 2011-2022 走看看