窗口分配的概念
窗口分配程序(Window Assigners)定义如何将元素分配给窗口。负责将每个传入的元素分配给一个或多个窗口
通过window(...) (for keyed streams)
或windowAll()for non-keyed streams)
指定需要的WindowAssigner。
WindowAssigner负责将每个传入元素分配给一个或多个窗口。
Flink为最常见的用例提供了预定义的窗口分配程序如:tumbling windows, sliding windows, session windows and global windows.
同时还可以通过扩展WindowAssigner
类来实现自定义窗口assigner
。
所有内置的窗口分配程序(除了global windows)都根据时间将元素分配给窗口,时间可以是处理时间,也可以是事件时间。
基于时间的窗口有一个开始时间戳(包括)和一个结束时间戳(不包括),它们一起描述窗口的大小。[starTimestamp,endTimestamp)
:左闭右开
WindowAssigner将会把元素分配到0个,1个或者多个窗口中去。我们看一下WindowAssigner接口:
public abstract class WindowAssigner<T, W extends Window> implements Serializable { public abstract Collection<W> assignWindows( T element, long timestamp, WindowAssignerContext context); public abstract Trigger<T, W> getDefaultTriger( StreamExecutionEnvironment env); public abstract TypeSerializer<W> getWindowSerializer( ExecutionConfig executionConfig); public abstract boolean isEventTime(); public abstract static class WindowAssignerContext { public abstract long getCurrentProcessingTime(); } }
WindowAssigner有两个泛型参数:
- T: 事件的数据类型
- W: 窗口的类型
自定义 WindowAssigner
实例一
如果我们定义按天、小时、分钟的滚动窗口都很容易实现。
但是如果我们要定义一周(周日开始或周一),一个月(1号开始)的滚动窗口,那么现有API基本没法实现或很难实现。
对此就需要我们实现一个自定义的窗口分配器。
import java.text.SimpleDateFormat import java.util import java.util.{Calendar, Collections, Date} import com.meda.utils.DateHelper import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeutils.TypeSerializer import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner import org.apache.flink.streaming.api.windowing.triggers.{EventTimeTrigger, Trigger} import org.apache.flink.streaming.api.windowing.windows.TimeWindow /** * 实现根据周或月的窗口划分窗口 * 比如按照周日的00:00:00到下一个周六23:59:59 * 或者每个月第一天的00:00:00到最后一天的23:59:59 * 实现参考了 TumblingEventTimeWindows * * @param tag 标签 month or week * @tparam T 需要划分窗口的数据类型(输入类型) */ class CustomWindowAssigner[T](tag: String) extends WindowAssigner[T, TimeWindow] { //窗口分配的主要方法,需要为每一个元素指定所属的分区 override def assignWindows(element: T, timestamp: Long, context: WindowAssigner.WindowAssignerContext): util.Collection[TimeWindow] = { var offset: (Long, Long) = null tag match { case "month" => offset = getTimestampFromMon(timestamp) case "week" => offset = getTimestampFromWeek(timestamp) } //分配窗口 Collections.singletonList(new TimeWindow(offset._1, offset._2)) } //注意此处需要进行类型的转换,否则或编译出错,java版本好像没问题,但是java对于上面的offset处理有点难搞,所以放弃了 override def getDefaultTrigger(env: StreamExecutionEnvironment): Trigger[T, TimeWindow] = EventTimeTrigger.create().asInstanceOf[Trigger[T, TimeWindow]] override def getWindowSerializer(executionConfig: ExecutionConfig): TypeSerializer[TimeWindow] = new TimeWindow.Serializer //是否使用事件时间 override def isEventTime: Boolean = true /** * 获取指定时间戳当月时间戳范围 * eg:2020-03-12 11:35:13 (timestamp=1583984113960l) * 结果为:(1582992000000,1585670399999)=>(2020-03-01 00:00:00,2020-03-31 23:59:59) * * @param timestamp 时间戳 * @return */ def getTimestampFromMon(timestamp: Long): (Long, Long) = { val calendar = Calendar.getInstance() calendar.setTime(DateHelper.getInstance().getDateFromStr(new SimpleDateFormat("yyyyMM01000000").format(new Date(timestamp)), "yyyyMMddHHmmss")) val numsOfMon: Long = calendar.getActualMaximum(Calendar.DAY_OF_MONTH) calendar.set(Calendar.DAY_OF_MONTH, 1) val start: Long = calendar.getTimeInMillis val end: Long = start + numsOfMon * 24 * 60 * 60 * 1000 - 1 (start, end) } /** * 获取指定时间戳本周时间范围(从周日开始) * eg:2020-03-14 23:59:59 (timestamp=1583895064000l) * 结果为:(1583596800000,1584201599999)=>(2020-03-08 00:00:00,2020-03-14 23:59:59) * * @param timestamp 时间戳 * @return */ def getTimestampFromWeek(timestamp: Long): (Long, Long) = { val calendar = Calendar.getInstance() calendar.setTime(DateHelper.getInstance().getDateFromStr(new SimpleDateFormat("yyyyMMdd000000").format(new Date(timestamp)), "yyyyMMddHHmmss")) // calendar.setFirstDayOfWeek(Calendar.SUNDAY)//设置周日为首日 默认值,一般不用设置 calendar.set(Calendar.DAY_OF_WEEK, Calendar.SUNDAY) val start: Long = calendar.getTimeInMillis (start, start + 7 * 24 * 60 * 60 * 1000l - 1) } }
//输入数据 case class Top100Input(event_id: String, date_d: String, timeStamp: Long, uid: Long, weekTag: String, monthTag: String)
//调用 val dStream: DataStream[Top100Input] = ... dStream .keyBy(_.weekTag) .window(new CustomWindowAssigner[Top100Input]("week")) dStream .keyBy(_.monthTag) .window(new CustomWindowAssigner[Top100Input]("month"))
作者:利伊奥克儿
链接:https://www.jianshu.com/p/01e62c37dd71
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
链接:https://www.jianshu.com/p/01e62c37dd71
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
实例二:
下面的代码创建了一个自定义窗口分配器,是一个30秒的滚动事件时间窗口。
class ThirtySecondsWindows extends WindowAssigner[Object, TimeWindow] { val windowSize: Long = 30 * 1000L override def assignWindows( o: Object, ts: Long, ctx: WindowAssigner.WindowAssignerContext ): java.util.List[TimeWindow] = { val startTime = ts - (ts % windowSize) val endTime = startTime + windowSize Collections.singletonList(new TimeWindow(startTime, endTime)) } override def getDefaultTrigger( env: environment.StreamExecutionEnvironment ): Trigger[Object, TimeWindow] = { EventTimeTrigger.create() } override def getWindowSerializer( executionConfig: ExecutionConfig ): TypeSerializer[TimeWindow] = { new TimeWindow.Serializer } override def isEventTime = true }