在flink中设置事件时间时需要将时间的表示转换为毫秒
如果不需要转换
def main(args: Array[String]): Unit = { // ... env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 将时间特性设置为事件时间 env.setParallelism(1) val clickStream = env .fromElements( UserClickLog("user_2", "1500", "click", "page_1"), UserClickLog("user_2", "2000", "click", "page_1") ) .assignAscendingTimestamps(_.eventTime.toLong * 1000L) // 选择事件时间的字段 // ... }
如果需要转换
import java.text.SimpleDateFormat import org.joda.time.DateTime //..... def main(args: Array[String]): Unit = { // ... env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 将时间特性设置为事件时间 val clickStream = env .fromElements( UserClickLog("user_2", "2019-11-16 17:30:00", "click", "page_1") ) .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[UserClickLog](Time.seconds(0)) { override def extractTimestamp(t: UserClickLog): Long = { val pattern = "yyyy-MM-dd HH:mm:ss" val date = new SimpleDateFormat(pattern).parse(t.evntTime) date.getTime // 返回事件时间 milliseconds } } ) // ... }
Time.seconds(0): MaxOutOfOrderness 延迟时间, 水位线用于延迟窗口的触发时间
附录:Scala日期操作
https://zhuanlan.zhihu.com/p/50088687
前言
本文主要记录我自己对日期格式数据的一些常用操作,主要目的是备忘,方便随时查阅。本文没有将代码封装为函数,如果有需要的可以自行封装,注意每一部分的代码会依赖前面代码里的变量。
代码可以直接在spark-shell里运行(在scala里有的包没有)
1、字符串转日期
import java.text.SimpleDateFormat
import org.joda.time.DateTime
val dateStr = "2018-06-01"
val pattern = "yyyy-MM-dd"
val date = new SimpleDateFormat(pattern).parse(dateStr)
val dateTime = new DateTime(date)
println(date)
println(dateTime)
Fri Jun 01 00:00:00 CST 2018
2018-06-01T00:00:00.000+08:00
2、日期转字符串
将上面的日期转成其他格式的字符串
println(new SimpleDateFormat("yyyyMMdd").format(date))
20180601
3、字符串转时间戳
println(date.getTime)
println(date.getTime)
4、计算时间差
val startDateStr = "2018-03-21"
val endDateStr = "2018-03-22"
val startDate = new SimpleDateFormat(pattern).parse(startDateStr)
val endDate = new SimpleDateFormat(pattern).parse(endDateStr)
val between = endDate.getTime - startDate.getTime
val second = between / 1000
val hour = between / 1000 / 3600
val day = between / 1000 / 3600 / 24
val year = between / 1000 / 3600 / 24 / 365
如果需要结果为小数,以hour举例
import java.text.DecimalFormat
val hour: Float = between.toFloat / 1000 / 3600
val decf: DecimalFormat = new DecimalFormat("#.00")
println(hour)
println(decf.format(hour)) //格式化为两位小数
24.0
24.00