zoukankan      html  css  js  c++  java
  • Flink 窗口聚合函数之ProcessWindowFunction实践

    一、ProcessWindowFunction使用场景

    前面提到的 ReduceFunction 和 AggregateFunction 都是基于中间状态实现增量计算的窗口函数,虽然已经满足绝大多数场景,但在某些情况下,统计更复杂的指标可能需要依赖于窗口中所有的数据元素,或需要操作窗口中的状态数据和窗口元数据,这时就需要使用到 ProcessWindowsFunction,ProcessWindowsFunction 能够更加灵活地支持基于窗口全部数 据元素的结果计算,例如对整个窗口数据排序取 TopN,这样的需要就必须使用 ProcessWindowFunction。

    二、ProcessWindowFunction业务实践:每隔5秒统计每个基站的日志数量

    1.创建日志数据对象

    case class Log(sid:String,var callOut:String, var callIn:String, callType:String, callTime:Long, duration:Long)
    

    2.业务实现

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    /**
      * 全量聚合函数
      */
    object TestProcessFunctionByWindow {
      
      // 每隔5秒统计每个基站的日志数量
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // source
        var stream = env.socketTextStream("flink101", 8888)
          .map(line => {
            var arr = line.split(",")
            Log(arr(0).trim,arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
          })
    
        // 设置并行度
        stream.setParallelism(1)
        stream.map(log=> (log.sid, 1))
          .keyBy(_._1)
    //        .timeWindow(Time.seconds(5))
          .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
          .process(new MyProcessWindowFunction)  // 一个窗口结束的时候调用一次(在一个并行度中)
            .print()
    
    
        env.execute("TestReduceFunctionByWindow")
      }
    }
    
    class MyProcessWindowFunction extends ProcessWindowFunction[(String, Int), (String, Long), String, TimeWindow] {
    
      // 一个窗口结束的时候调用一次(一个分组执行一次),不适合大量数据,全量数据保存在内存中,会造成内存溢出
      override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Long)]): Unit = {
        // 聚合,注意:整个窗口的数据保存到Iterable,里面有很多行数据, Iterable的size就是日志的总行数
        out.collect(key, elements.size)
      }
    }
    

      

    三、总结

    ProcessWindowFunction获得一个包含窗口所有元素的可迭代器,以及一个具有时间和状态信息访问权的上下文对象,这使得它比其他窗口函数提供更大的灵活性。这是以性能和资源消耗为代价的,因为元素不能增量地聚合,而是需要在内部缓冲,直到认为窗口可以处理为止,使用该函数需要注意数据量,数据量太大,全量数据保存在内存中,会造成内存溢出。

     

  • 相关阅读:
    怎样去掉a标签的蓝框
    textarea中的内容的获取
    移动端rem布局
    Array的push与unshift方法性能比较分析
    浅谈移动前端性能优化(转)
    移动端高清、多屏适配方案 (转)
    js关于事件的一些总结(系列一)
    移动端实用的meta标签
    浅析js绑定同一个事件依次触发问题系列(一)
    关于移动端input框 在微信中 和ios中无法输入文字的问题
  • 原文地址:https://www.cnblogs.com/zfwwdz/p/13099017.html
Copyright © 2011-2022 走看看