zoukankan      html  css  js  c++  java
  • Flink实例(114):自定义时间和窗口的操作符(十三)自定义窗口分配器之设定窗口开始与结束时刻

    1. 自定义窗口分配器(flink1.11.2)

    package com.atguigu.exercise.ETL.caiutil
    
    import java.text.SimpleDateFormat
    import java.util
    import java.util.{Collections, Date}
    
    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.common.typeutils.TypeSerializer
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
    import org.apache.flink.streaming.api.windowing.triggers.{EventTimeTrigger, Trigger}
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    
    /**
     * Window assigner that puts every element into the single fixed five-minute
     * event-time window that contains the element's timestamp.
     *
     * Window boundaries are aligned to multiples of five minutes since the epoch
     * and are computed with plain arithmetic. Earlier revisions formatted the
     * timestamp to a wall-clock string in the JVM default time zone and parsed it
     * back; that round trip is ambiguous when a daylight-saving change repeats a
     * wall-clock hour (two instants share one string, so one of them was given a
     * window that does not contain it) and it allocated several formatters per
     * element. The arithmetic result is identical to the old one whenever the
     * JVM time zone's UTC offset is a whole multiple of five minutes.
     *
     * @tparam T type of the elements being windowed
     */
    class CustomWindowAssigner[T] extends WindowAssigner[T, TimeWindow]{

      /** Length of every window in milliseconds (five minutes). */
      private final val WindowSizeMillis = 5 * 60 * 1000L

      /**
       * Assigns the element to exactly one window: the five-minute window that
       * contains `timestamp`.
       *
       * @param t                     the element (not inspected)
       * @param timestamp             the element's timestamp in epoch milliseconds
       * @param windowAssignerContext assigner context (not used)
       * @return a singleton collection holding the window `[start, end)`
       */
      override def assignWindows(t: T, timestamp: Long, windowAssignerContext: WindowAssigner.WindowAssignerContext): util.Collection[TimeWindow] = {
        val (start, end) = getTimestampFromFiveMinute(timestamp)
        Collections.singletonList(new TimeWindow(start, end))
      }

      // EventTimeTrigger.create() is not typed as Trigger[T, TimeWindow]; the
      // cast is needed here, otherwise the Scala compiler rejects the override.
      override def getDefaultTrigger(streamExecutionEnvironment: StreamExecutionEnvironment): Trigger[T, TimeWindow] = EventTimeTrigger.create().asInstanceOf[Trigger[T, TimeWindow]]

      override def getWindowSerializer(executionConfig: ExecutionConfig): TypeSerializer[TimeWindow] = new TimeWindow.Serializer

      /** This assigner works on event time. */
      override def isEventTime: Boolean = true

      /**
       * Returns the bounds of the five-minute window containing `timestamp`.
       *
       * `Math.floorMod` is used instead of `%` so that timestamps before the
       * epoch (negative values) are also rounded down to the window start.
       *
       * @param timestamp timestamp in epoch milliseconds
       * @return `(start, end)` in epoch milliseconds, with
       *         `start <= timestamp < end` and `end - start` equal to five minutes
       */
      def getTimestampFromFiveMinute(timestamp: Long): (Long, Long) = {
        val start = timestamp - Math.floorMod(timestamp, WindowSizeMillis)
        (start, start + WindowSizeMillis)
      }

      /**
       * Returns the five-minute bucket of a timestamp as two `yyyyMMddHHmm`
       * strings, rendered in the JVM default time zone.
       *
       * For the last bucket of an hour the end string carries the minute value
       * "60" (for example "...1260" rather than the next hour's "...1300"); this
       * matches the strings this method has always produced.
       *
       * @param timeinfo epoch milliseconds as a decimal string
       * @return `(startTime, endTime)` of the bucket
       * @throws NumberFormatException if `timeinfo` is not a valid long
       */
      def getByinterMinute(timeinfo:String): (String,String)={
        val date = new Date(timeinfo.toLong)
        val hour = new SimpleDateFormat("yyyyMMddHH").format(date)
        val minute = new SimpleDateFormat("mm").format(date).toInt
        // Round the minute down to a multiple of five to get the bucket start.
        val startMinute = minute / 5 * 5
        (f"$hour$startMinute%02d", f"$hour${startMinute + 5}%02d")
      }

      // NOTE: this is an instance method of a class, not a JVM entry point; it
      // only serves as a manual smoke check of the window computation.
      def main(args: Array[String]): Unit = {
        val testtime = getTimestampFromFiveMinute(1536268066000L)
      }


    }

    2. 主程序

    package com.atguigu.exercise.day4
    
    import java.time.Duration
    
    import com.atguigu.day2.{SensorReading, SensorSource}
    import com.atguigu.exercise.ETL.caiutil.CustomWindowAssigner
    import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction}
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    /**
     * Demo job for [[CustomWindowAssigner]]: reads sensor readings, keys them by
     * id, windows them with the custom assigner and prints the start and end
     * timestamp of every window that fires.
     */
    object CustomWindowTest {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        // Event time is taken from the reading itself.
        val timestampAssigner: SerializableTimestampAssigner[SensorReading] =
          new SerializableTimestampAssigner[SensorReading] {
            override def extractTimestamp(t: SensorReading, l: Long): Long = t.timestamp
          }

        // Watermarks tolerate readings arriving up to five seconds out of order.
        val watermarks = WatermarkStrategy
          .forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(5))
          .withTimestampAssigner(timestampAssigner)

        val stream = env
          .addSource(new SensorSource)
          .assignTimestampsAndWatermarks(watermarks)

        // Emits one line per fired window describing its bounds; the window's
        // elements themselves are not inspected.
        val describeWindow = new ProcessWindowFunction[SensorReading, String, String, TimeWindow]() {
          override def process(key: String, context: Context, elements: Iterable[SensorReading], out: Collector[String]): Unit = {
            val window = context.window
            out.collect(s"startTime${window.getStart} endTime${window.getEnd}")
          }
        }

        stream
          .keyBy(_.id)
          .window(new CustomWindowAssigner)
          .process(describeWindow)
          .print()

        env.execute()
      }

    }
  • 相关阅读:
    虚拟机网络模式
    js读取json包装的map集合
    LeetCode 94:Binary Tree Inorder Traversal
    tornado+ansible+twisted+mongodb运维自动化系统开发(四)
    UVA
    解决request.getRemoteAddr()获取的值为0:0:0:0:0:0:0:1这个小问题
    eclipse调试web项目
    Action的mapping.findFoward(forwardName)必须要在struts-config.xml中的对应的action节点配置一个forward节点
    使用struts的时候form用struts的,不用html本身的
    eclipse的源代码编辑窗口可以拖出来单独使用的哦
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/14218002.html
Copyright © 2011-2022 走看看