zoukankan      html  css  js  c++  java
  • FLINK基础(107): DS算子与窗口(18)窗口 (3) window functions(二)ProcessWindowFunction

    ProcessWindowFunction

      一些业务场景,我们需要收集窗口内所有的数据进行计算,例如计算窗口数据的中位数,或者计算窗口数据中出现频率最高的值。这样的需求,使用ReduceFunction和AggregateFunction就无法实现了。这个时候就需要ProcessWindowFunction了。

    先来看接口定义

    public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
      extends AbstractRichFunction {
    
      // Evaluates the window
      void process(KEY key, Context ctx, Iterable<IN> vals, Collector<OUT> out)
        throws Exception;
    
      // Deletes any custom per-window state when the window is purged
      public void clear(Context ctx) throws Exception {}
    
      // The context holding window metadata
      public abstract class Context implements Serializable {
        // Returns the metadata of the window
        public abstract W window();
    
        // Returns the current processing time
        public abstract long currentProcessingTime();
    
        // Returns the current event-time watermark
        public abstract long currentWatermark();
    
        // State accessor for per-window state
        public abstract KeyedStateStore windowState();
    
        // State accessor for per-key global state
        public abstract KeyedStateStore globalState();
    
        // Emits a record to the side output identified by the OutputTag.
        public abstract <X> void output(OutputTag<X> outputTag, X value);
      }
    }

    process()方法接受的参数为:

      window的key,

      Iterable迭代器包含窗口的所有元素,

      Collector用于输出结果流。

      Context参数和别的process方法一样。而ProcessWindowFunction的Context对象还可以访问window的元数据(窗口开始和结束时间),当前处理时间和水位线,per-window state和per-key global state,side outputs。

    • per-window state: 用于保存一些信息,这些信息可以被process()访问,只要process所处理的元素属于这个窗口。
    • per-key global state: 同一个key,也就是在一条KeyedStream上,不同的window可以访问per-key global state保存的值。

    实例一:

    例子:计算5s滚动窗口中的最低和最高的温度。输出的元素包含了(流的Key, 最低温度, 最高温度, 窗口结束时间)。

    val minMaxTempPerWindow: DataStream[MinMaxTemp] = sensorData
      .keyBy(_.id)
      .timeWindow(Time.seconds(5))
      .process(new HighAndLowTempProcessFunction)
    
    case class MinMaxTemp(id: String, min: Double, max: Double, endTs: Long)
    
    class HighAndLowTempProcessFunction  extends ProcessWindowFunction[SensorReading,MinMaxTemp, String, TimeWindow] {
      override def process(key: String,
                           ctx: Context,
                           vals: Iterable[SensorReading],
                           out: Collector[MinMaxTemp]): Unit = {
        val temps = vals.map(_.temperature)
        val windowEnd = ctx.window.getEnd
    
        out.collect(MinMaxTemp(key, temps.min, temps.max, windowEnd))
      }
    }

    实例二:

    // 测试数据: 某个用户在某个时刻浏览了某个商品,以及商品的价值
    // {"userID": "user_4", "eventTime": "2019-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10}
    
    // API
    // IN:  输入元素类型
    // OUT: 输出元素类型
    // KEY: Key类型
    // W: Window类型
    public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
        ......
        
        public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
        
        ........
    }
    
    // 示例: 获取一段时间内(Window Size)每个用户(KeyBy)浏览的商品总价值(ProcessWindowFunction)
    kafkaStream
        // 将从Kafka获取的JSON数据解析成Java Bean
        .process(new KafkaProcessFunction())
        // 提取时间戳生成水印
        .assignTimestampsAndWatermarks(new MyCustomBoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrdernessSeconds)))
        // 按用户分组
        .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID)
        // 构造TimeWindow
        .timeWindow(Time.seconds(windowLengthSeconds))
        // 窗口函数: 用ProcessWindowFunction计算这段时间内每个用户浏览的商品总价值
        .process(new ProcessWindowFunction<UserActionLog, String, String, TimeWindow>() {
            @Override
            public void process(String key, Context context, Iterable<UserActionLog> elements, Collector<String> out) throws Exception {
    
                int sum=0;
                for (UserActionLog element : elements) {
                    sum += element.getProductPrice();
                }
    
                String windowStart=new DateTime(context.window().getStart(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");
                String windowEnd=new DateTime(context.window().getEnd(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");
    
                String record="Key: "+key+" 窗口开始时间: "+windowStart+" 窗口结束时间: "+windowEnd+" 浏览的商品总价值: "+sum;
                out.collect(record);
    
            }
        })
        .print();
    
    // 结果
    Key: user_1 窗口开始时间: 2019-11-09 13:32:00 窗口结束时间: 2019-11-09 13:32:10 浏览的商品总价值: 60
    Key: user_5 窗口开始时间: 2019-11-09 13:32:00 窗口结束时间: 2019-11-09 13:32:10 浏览的商品总价值: 30
    Key: user_5 窗口开始时间: 2019-11-09 13:32:10 窗口结束时间: 2019-11-09 13:32:20 浏览的商品总价值: 80
    Key: user_3 窗口开始时间: 2019-11-09 13:32:10 窗口结束时间: 2019-11-09 13:32:20 浏览的商品总价值: 40
    Key: user_4 窗口开始时间: 2019-11-09 13:32:10 窗口结束时间: 2019-11-09 13:32:20 浏览的商品总价值: 70

    实例三:

    ProcessWindowFunction业务实践:每隔5秒统计每个基站的日志数量

    1.创建日志数据对象

    case class Log(sid:String,var callOut:String, var callIn:String, callType:String, callTime:Long, duration:Long)

    2.业务实现

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
     
    /**
      * 全量聚合函数
      */
    object TestProcessFunctionByWindow {
       
      // 每隔5秒统计每个基站的日志数量
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // source
        var stream = env.socketTextStream("flink101", 8888)
          .map(line => {
            var arr = line.split(",")
            Log(arr(0).trim,arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
          })
     
        // 设置并行度
        stream.setParallelism(1)
        stream.map(log=> (log.sid, 1))
          .keyBy(_._1)
    //        .timeWindow(Time.seconds(5))
          .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
          .process(new MyProcessWindowFunction)  // 一个窗口结束的时候调用一次(在一个并行度中)
            .print()
     
     
        env.execute("TestReduceFunctionByWindow")
      }
    }
     
    class MyProcessWindowFunction extends ProcessWindowFunction[(String, Int), (String, Long), String, TimeWindow] {
     
      // 一个窗口结束的时候调用一次(一个分组执行一次),不适合大量数据,全量数据保存在内存中,会造成内存溢出
      override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Long)]): Unit = {
        // 聚合,注意:整个窗口的数据保存到Iterable,里面有很多行数据, Iterable的size就是日志的总行数
        out.collect(key, elements.size)
      }
    }

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13782384.html

  • 相关阅读:
    Day3-B-Round Marriage CodeForces-981F
    Day3-A-Problem H. Monster Hunter HDU6326
    Day3-G
    Day3-I-Squares POJ2002
    Day3-M-Cable master POJ1064
    Day3-N
    Day3-O-Median POJ3579
    Day3-P
    Day3-L-Cup HDU2289
    LeetCode "Majority Element"
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13782384.html
Copyright © 2011-2022 走看看