zoukankan      html  css  js  c++  java
  • flink自定义窗口分配器 周、月

    关于分配器介绍内容来自官网

    窗口分配的概念

    窗口分配程序(Window Assigners)定义如何将元素分配给窗口。
    通过window(...) (for keyed streams)windowAll()for non-keyed streams)指定需要的WindowAssigner。

    WindowAssigner负责将每个传入元素分配给一个或多个窗口。

    Flink为最常见的用例提供了预定义的窗口分配程序如:tumbling windows, sliding windows, session windows and global windows.

    同时还可以通过扩展WindowAssigner类来实现自定义窗口assigner

    所有内置的窗口分配程序(除了global windows)都根据时间将元素分配给窗口,时间可以是处理时间,也可以是事件时间。

    基于时间的窗口有一个开始时间戳(包括)和一个结束时间戳(不包括),它们一起描述窗口的大小。[starTimestamp,endTimestamp):左闭右开

    Flink预定义的窗口分配程序

    Tumbling Windows (滚动窗口)

    翻转窗口分配程序将每个元素分配给指定窗口大小的窗口,滚动窗口有一个固定的大小并且元素之间不重叠。
    Tumbling Windows.png

    val input: DataStream[T] = ...
    
    // tumbling event-time windows
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>)
    
    // tumbling processing-time windows
    input
        .keyBy(<key selector>)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>)
    
    //偏移量的一个重要用例是将窗口调整到UTC-0以外的时区。例如,在中国,您必须指定时间偏移量(-8)。
    //事件时间窗口偏移-8小时
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
        .<windowed transformation>(<window function>)
    

    Sliding Windows (滑动窗口)

    滑动窗口分配程序将元素分配给固定长度的窗口。类似于滚动窗口分配程序,窗口的大小由窗口大小参数配置。
    同时还有一个附加的窗口滑动距离参数控制滑动窗口启动的频率。

    滑动距离与窗口大小的的不同会导致数据元素是否重叠、丢失、恰好一次

    具体情况如下:

    1. slideSize>windowSize 丢失数据
    2. slideSize<windowSize 数据重叠
    3. slideSize=windowSize 恰好一次(此时等同TumblingWindows)

    Sliding Windows.png

    val input: DataStream[T] = ...
    
    // sliding event-time windows
    input
        .keyBy(<key selector>)
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>)
    
    // sliding processing-time windows
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>)
    
    // sliding processing-time windows offset by -8 hours
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
        .<windowed transformation>(<window function>)
    

    Session Windows(会话窗口)

    会话窗口分配程序根据活动的会话对元素进行分组。
    与滚动、滑动窗口不同的是,会话窗口没有数据重叠,也没有固定的开始和结束时间。
    当某个会话窗口在一段时间内没有接收到元素时,它将关闭窗口

    Session Windows.png

    val input: DataStream[T] = ...
    
    // event-time session windows with static gap
    input
        .keyBy(<key selector>)
        .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
        .<windowed transformation>(<window function>)
    
    // event-time session windows with dynamic gap
    input
        .keyBy(<key selector>)
        .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
          override def extract(element: String): Long = {
            // determine and return session gap
          }
        }))
        .<windowed transformation>(<window function>)
    
    // processing-time session windows with static gap
    input
        .keyBy(<key selector>)
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
        .<windowed transformation>(<window function>)
    
    
    // processing-time session windows with dynamic gap
    input
        .keyBy(<key selector>)
        .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
          override def extract(element: String): Long = {
            // determine and return session gap
          }
        }))
        .<windowed transformation>(<window function>)
    

    Global Windows(全局窗口)

    全局窗口分配程序将具有相同键的所有元素分配给同一个全局窗口。

    全局窗口模式仅在指定自定义触发器时才有用。否则,将不执行任何计算,因为全局窗口没有一个可以处理聚合元素的自然末端。(窗口没有结束,没有出发计算的条件,除非自定义触发器)
    Global Windows.png

    val input: DataStream[T] = ...
    
    input
        .keyBy(<key selector>)
        .window(GlobalWindows.create())
        .<windowed transformation>(<window function>)
    

    上面就是flink提供一些窗口分配程序,基本能满足大多数情况。

    但是对于某些特殊情况flink提供的分配成程序没法满足要求,此时就需要根据需求自定义分配程序。

    实现自定的分配程序需要实现org.apache.flink.streaming.api.windowing.assigners.WindowAssigner


    自定义 WindowAssigner

    如果我们定义按天、小时、分钟的滚动窗口都很容易实现。

    但是如果我们要定义一周(周日开始或周一),一个月(1号开始)的滚动窗口,那么现有API基本没法实现或很难实现。

    对此就需要我们实现一个自定义的窗口分配器。

    import java.text.SimpleDateFormat
    import java.util
    import java.util.{Calendar, Collections, Date}
    
    import com.meda.utils.DateHelper
    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.common.typeutils.TypeSerializer
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
    import org.apache.flink.streaming.api.windowing.triggers.{EventTimeTrigger, Trigger}
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    
    /**
     * 实现根据周或月的窗口划分窗口
     * 比如按照周日的00:00:00到下一个周六23:59:59
     * 或者每个月第一天的00:00:00到最后一天的23:59:59
     * 实现参考了 TumblingEventTimeWindows
     *
     * @param tag 标签 month  or  week
     * @tparam T 需要划分窗口的数据类型(输入类型)
     */
    class CustomWindowAssigner[T](tag: String) extends WindowAssigner[T, TimeWindow] {
      //窗口分配的主要方法,需要为每一个元素指定所属的分区
      override def assignWindows(element: T, timestamp: Long, context: WindowAssigner.WindowAssignerContext): util.Collection[TimeWindow] = {
        var offset: (Long, Long) = null
        tag match {
          case "month" => offset = getTimestampFromMon(timestamp)
          case "week" => offset = getTimestampFromWeek(timestamp)
        }
        //分配窗口
        Collections.singletonList(new TimeWindow(offset._1, offset._2))
      }
    
      //注意此处需要进行类型的转换,否则或编译出错,java版本好像没问题,但是java对于上面的offset处理有点难搞,所以放弃了
      override def getDefaultTrigger(env: StreamExecutionEnvironment): Trigger[T, TimeWindow] = EventTimeTrigger.create().asInstanceOf[Trigger[T, TimeWindow]]
    
      override def getWindowSerializer(executionConfig: ExecutionConfig): TypeSerializer[TimeWindow] = new TimeWindow.Serializer
    
      //是否使用事件时间
      override def isEventTime: Boolean = true
    
    
      /**
       * 获取指定时间戳当月时间戳范围
       * eg:2020-03-12 11:35:13 (timestamp=1583984113960l)
       * 结果为:(1582992000000,1585670399999)=>(2020-03-01 00:00:00,2020-03-31 23:59:59)
       *
       * @param timestamp 时间戳
       * @return
       */
      def getTimestampFromMon(timestamp: Long): (Long, Long) = {
        val calendar = Calendar.getInstance()
        calendar.setTime(DateHelper.getInstance().getDateFromStr(new SimpleDateFormat("yyyyMM01000000").format(new Date(timestamp)), "yyyyMMddHHmmss"))
        val numsOfMon: Long = calendar.getActualMaximum(Calendar.DAY_OF_MONTH)
        calendar.set(Calendar.DAY_OF_MONTH, 1)
        val start: Long = calendar.getTimeInMillis
        val end: Long = start + numsOfMon * 24 * 60 * 60 * 1000 - 1
        (start, end)
      }
    
      /**
       * 获取指定时间戳本周时间范围(从周日开始)
       * eg:2020-03-14 23:59:59 (timestamp=1583895064000l)
       * 结果为:(1583596800000,1584201599999)=>(2020-03-08 00:00:00,2020-03-14 23:59:59)
       *
       * @param timestamp 时间戳
       * @return
       */
      def getTimestampFromWeek(timestamp: Long): (Long, Long) = {
        val calendar = Calendar.getInstance()
        calendar.setTime(DateHelper.getInstance().getDateFromStr(new SimpleDateFormat("yyyyMMdd000000").format(new Date(timestamp)), "yyyyMMddHHmmss"))
        //    calendar.setFirstDayOfWeek(Calendar.SUNDAY)//设置周日为首日  默认值,一般不用设置
        calendar.set(Calendar.DAY_OF_WEEK, Calendar.SUNDAY)
        val start: Long = calendar.getTimeInMillis
        (start, start + 7 * 24 * 60 * 60 * 1000l - 1)
      }
    }
    

    //输入数据
    case class Top100Input(event_id: String, date_d: String, timeStamp: Long, uid: Long, weekTag: String, monthTag: String)
    
    //调用
    val dStream: DataStream[Top100Input] = ...
    
    dStream
          .keyBy(_.weekTag)
          .window(new CustomWindowAssigner[Top100Input]("week"))
    
    dStream
          .keyBy(_.monthTag)
          .window(new CustomWindowAssigner[Top100Input]("month"))
    
  • 相关阅读:
    idea中pom.xml相关操作
    Java集合1-集合与数组的区别
    idea中各种图标的含义
    testng之多线程执行(并发执行)
    testng之DataProvider参数化
    fastjson将java对象与json字符串相互转换
    testng -依赖测试
    testng- 异常测试
    转-selenium3 webdriver启动火狐、chrome、edge、Safari浏览器的方法
    浏览器兼容性测试
  • 原文地址:https://www.cnblogs.com/lillcol/p/12557892.html
Copyright © 2011-2022 走看看