1. 自定义窗口分配器(Flink 1.11.2)
package com.atguigu.exercise.ETL.caiutil
import java.text.SimpleDateFormat
import java.util
import java.util.{Collections, Date}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
import org.apache.flink.streaming.api.windowing.triggers.{EventTimeTrigger, Trigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
/**
 * Window assigner that places each element into the single five-minute
 * [[TimeWindow]] containing its timestamp.
 *
 * Window boundaries are computed with plain epoch-millisecond arithmetic, so they
 * do not depend on the JVM default time zone and cannot be shifted by ambiguous
 * wall-clock times (for example the repeated hour at the end of daylight saving
 * time), which was possible when the bounds were derived by formatting and
 * re-parsing a date string.
 *
 * @tparam T type of the elements being windowed
 */
class CustomWindowAssigner[T] extends WindowAssigner[T, TimeWindow] {

  /**
   * Assigns the element to exactly one window: the five-minute window returned by
   * [[getTimestampFromFiveMinute]] for the element's timestamp.
   *
   * @param t                     the element (not inspected)
   * @param timestamp             timestamp of the element, in epoch milliseconds
   * @param windowAssignerContext assigner context (not used)
   * @return a singleton collection holding the window for `timestamp`
   */
  override def assignWindows(
      t: T,
      timestamp: Long,
      windowAssignerContext: WindowAssigner.WindowAssignerContext): util.Collection[TimeWindow] = {
    val (windowStart, windowEnd) = getTimestampFromFiveMinute(timestamp)
    Collections.singletonList(new TimeWindow(windowStart, windowEnd))
  }

  /**
   * Returns an [[EventTimeTrigger]] as the default trigger.
   *
   * The cast is kept from the original implementation: per the original author's
   * note, the Scala compiler rejects this method without converting the trigger
   * to `Trigger[T, TimeWindow]`.
   */
  override def getDefaultTrigger(streamExecutionEnvironment: StreamExecutionEnvironment): Trigger[T, TimeWindow] =
    EventTimeTrigger.create().asInstanceOf[Trigger[T, TimeWindow]]

  /** Returns the serializer used for the [[TimeWindow]] instances this assigner creates. */
  override def getWindowSerializer(executionConfig: ExecutionConfig): TypeSerializer[TimeWindow] =
    new TimeWindow.Serializer

  /** Reports that this assigner works on event time. */
  override def isEventTime: Boolean = true

  /**
   * Computes the five-minute range that contains the given timestamp.
   *
   * The start is the timestamp rounded down to a multiple of five minutes since
   * the epoch; the end is exactly five minutes after the start.
   *
   * @param timestamp timestamp in epoch milliseconds
   * @return `(start, end)` in epoch milliseconds, with `start <= timestamp < end`
   */
  def getTimestampFromFiveMinute(timestamp: Long): (Long, Long) = {
    val windowSizeMillis = 5L * 60L * 1000L
    // floorMod (rather than %) keeps the remainder non-negative, so timestamps
    // before the epoch are also rounded down instead of towards zero.
    val windowStart = timestamp - Math.floorMod(timestamp, windowSizeMillis)
    (windowStart, windowStart + windowSizeMillis)
  }

  /**
   * Returns the five-minute bucket of a timestamp as `yyyyMMddHHmm` strings,
   * formatted in the JVM default time zone.
   *
   * The end string is built by appending `startMinute + 5` to the hour, so for the
   * last bucket of an hour it ends in "60" (e.g. "201809062160") rather than
   * rolling over to the next hour. This matches the historical output of this
   * method; callers that parse the value need a lenient parser.
   *
   * @param timeinfo timestamp in epoch milliseconds, as a decimal string
   * @return `(startTime, endTime)` strings of the bucket
   * @throws NumberFormatException if `timeinfo` is not a valid long
   */
  def getByinterMinute(timeinfo: String): (String, String) = {
    val date = new Date(timeinfo.toLong)
    val hour = new SimpleDateFormat("yyyyMMddHH").format(date)
    val minute = new SimpleDateFormat("mm").format(date).toInt

    // Round the minute of the hour down to a multiple of five.
    val startMinute = minute / 5 * 5
    val endMinute = startMinute + 5

    // Zero-pad by hand so the digits are always ASCII, independent of locale.
    def twoDigits(value: Int): String = if (value < 10) "0" + value else value.toString

    val startTime = hour + twoDigits(startMinute) // window start time
    val endTime = hour + twoDigits(endMinute) // window end time
    (startTime, endTime)
  }

  /**
   * Manual smoke check: computes the window for a sample timestamp and discards
   * the result. Because it is declared on the class (not on an object), it is an
   * ordinary instance method.
   */
  def main(args: Array[String]): Unit = {
    val testtime = getTimestampFromFiveMinute(1536268066000L)
  }
}
2. 主程序
package com.atguigu.exercise.day4
import java.time.Duration
import com.atguigu.day2.{SensorReading, SensorSource}
import com.atguigu.exercise.ETL.caiutil.CustomWindowAssigner
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
/**
 * Demo job for [[CustomWindowAssigner]]: reads from `SensorSource`, keys the
 * stream by `id`, windows it with the custom assigner and prints the start and
 * end of each window passed to the window function.
 */
object CustomWindowTest {

  /** Builds the pipeline and runs it with parallelism 1 in event-time mode. */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Event time is taken from the `timestamp` field of each reading.
    val timestampAssigner: SerializableTimestampAssigner[SensorReading] =
      new SerializableTimestampAssigner[SensorReading] {
        override def extractTimestamp(reading: SensorReading, recordTimestamp: Long): Long =
          reading.timestamp
      }

    // Watermarks with a bounded out-of-orderness of five seconds.
    val watermarkStrategy: WatermarkStrategy[SensorReading] =
      WatermarkStrategy
        .forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(5))
        .withTimestampAssigner(timestampAssigner)

    val readings = env
      .addSource(new SensorSource)
      .assignTimestampsAndWatermarks(watermarkStrategy)

    // On each invocation, emits a single line describing the window bounds.
    val describeWindow = new ProcessWindowFunction[SensorReading, String, String, TimeWindow]() {
      override def process(
          key: String,
          context: Context,
          elements: Iterable[SensorReading],
          out: Collector[String]): Unit = {
        val window = context.window
        out.collect(s"startTime${window.getStart} endTime${window.getEnd}")
      }
    }

    readings
      .keyBy(_.id)
      .window(new CustomWindowAssigner)
      .process(describeWindow)
      .print()

    env.execute()
  }
}