zoukankan      html  css  js  c++  java
  • Flink实例(五十五):自定义时间和窗口的操作符(十)TimestampAssigner接口 (一)设置事件时间

    在flink中设置事件时间时需要将时间的表示转换为毫秒

    如果不需要转换

    def main(args: Array[String]): Unit = {
    
        // ...
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)  // 将时间特性设置为事件时间
        env.setParallelism(1)
    
        val clickStream = env
          .fromElements(
            UserClickLog("user_2", "1500", "click", "page_1"),
            UserClickLog("user_2", "2000", "click", "page_1")
          )
          .assignAscendingTimestamps(_.eventTime.toLong * 1000L) // 选择事件时间的字段
        // ...
    
    }

    如果需要转换

    import java.text.SimpleDateFormat
    import org.joda.time.DateTime
    
    //.....
    
    def main(args: Array[String]): Unit = {
    
        // ...
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 将时间特性设置为事件时间
    
        val clickStream = env
          .fromElements(
            UserClickLog("user_2", "2019-11-16 17:30:00", "click", "page_1")
          )
          .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor[UserClickLog](Time.seconds(0))  {
              override def extractTimestamp(t: UserClickLog): Long = {
                val pattern = "yyyy-MM-dd HH:mm:ss"
                val date = new SimpleDateFormat(pattern).parse(t.evntTime)
                date.getTime   // 返回事件时间 milliseconds
              }
            }
          )
        // ...
    }    

    Time.seconds(0): MaxOutOfOrderness 延迟时间, 水位线用于延迟窗口的触发时间

    附录:Scala日期操作

    https://zhuanlan.zhihu.com/p/50088687

    前言

    本文主要记录我自己对日期格式数据的一些常用操作,主要目的是备忘,方便随时查阅。本文没有将代码封装为函数,如果有需要的可以自行封装,注意每一部分的代码会依赖前面代码里的变量。

    代码可以直接在spark-shell里运行(在scala里有的包没有)

    1、字符串转日期

    import java.text.SimpleDateFormat
    import org.joda.time.DateTime
    val dateStr = "2018-06-01"
    val pattern = "yyyy-MM-dd"
    val date = new SimpleDateFormat(pattern).parse(dateStr)
    val dateTime = new DateTime(date)
    println(date)
    println(dateTime)
    Fri Jun 01 00:00:00 CST 2018
    2018-06-01T00:00:00.000+08:00

    2、日期转字符串

    将上面的日期转成其他格式的字符串

    println(new SimpleDateFormat("yyyyMMdd").format(date))
    20180601

    3、字符串转时间戳

    println(date.getTime)
    println(date.getTime)

    4、计算时间差

    val startDateStr = "2018-03-21"
    val endDateStr = "2018-03-22"
    val startDate = new SimpleDateFormat(pattern).parse(startDateStr)
    val endDate = new SimpleDateFormat(pattern).parse(endDateStr)
    val between = endDate.getTime - startDate.getTime
    val second = between / 1000
    val hour = between / 1000 / 3600
    val day = between / 1000 / 3600 / 24
    val year = between / 1000 / 3600 / 24 / 365

    如果需要结果为小数,以hour举例

    import java.text.DecimalFormat
    val hour: Float = between.toFloat / 1000 / 3600
    val decf: DecimalFormat = new DecimalFormat("#.00")
    println(hour)
    println(decf.format(hour)) //格式化为两位小数
    24.0
    24.00

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13866355.html

  • 相关阅读:
    Mysql字符串字段判断是否包含某个字符串的方法
    mysql中用group_concat把selct中的数据列表转换成逗号分隔的字符串
    iview中对列标题头进行格式渲染render
    iview中父组件的数据通过props属性传值给子组件
    iview分页问题
    iview实现国际化
    MySQL查询优化:GROUP BY
    使用@ContextConfiguration替换@SpringBootTest
    WebAppConfiguration in Spring Tests
    使用@ContextConfiguration或者@ContextWebConfiguration注解调用resource文件夹下面的yml文件
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13866355.html
Copyright © 2011-2022 走看看