zoukankan      html  css  js  c++  java
  • Flink实例(五十五):自定义时间和窗口的操作符(十)TimestampAssigner接口 (一)设置事件时间

    在flink中设置事件时间时需要将时间的表示转换为毫秒

    如果不需要转换

    def main(args: Array[String]): Unit = {
    
        // ...
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)  // 将时间特性设置为事件时间
        env.setParallelism(1)
    
        val clickStream = env
          .fromElements(
            UserClickLog("user_2", "1500", "click", "page_1"),
            UserClickLog("user_2", "2000", "click", "page_1")
          )
          .assignAscendingTimestamps(_.eventTime.toLong * 1000L) // 选择事件时间的字段
        // ...
    
    }

    如果需要转换

    import java.text.SimpleDateFormat
    import org.joda.time.DateTime
    
    //.....
    
    def main(args: Array[String]): Unit = {
    
        // ...
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 将时间特性设置为事件时间
    
        val clickStream = env
          .fromElements(
            UserClickLog("user_2", "2019-11-16 17:30:00", "click", "page_1")
          )
          .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor[UserClickLog](Time.seconds(0))  {
              override def extractTimestamp(t: UserClickLog): Long = {
                val pattern = "yyyy-MM-dd HH:mm:ss"
                val date = new SimpleDateFormat(pattern).parse(t.evntTime)
                date.getTime   // 返回事件时间 milliseconds
              }
            }
          )
        // ...
    }    

    Time.seconds(0): MaxOutOfOrderness 延迟时间, 水位线用于延迟窗口的触发时间

    附录:Scala日期操作

    https://zhuanlan.zhihu.com/p/50088687

    前言

    本文主要记录我自己对日期格式数据的一些常用操作,主要目的是备忘,方便随时查阅。本文没有将代码封装为函数,如果有需要的可以自行封装,注意每一部分的代码会依赖前面代码里的变量。

    代码可以直接在spark-shell里运行(在scala里有的包没有)

    1、字符串转日期

    import java.text.SimpleDateFormat
    import org.joda.time.DateTime
    val dateStr = "2018-06-01"
    val pattern = "yyyy-MM-dd"
    val date = new SimpleDateFormat(pattern).parse(dateStr)
    val dateTime = new DateTime(date)
    println(date)
    println(dateTime)
    Fri Jun 01 00:00:00 CST 2018
    2018-06-01T00:00:00.000+08:00

    2、日期转字符串

    将上面的日期转成其他格式的字符串

    println(new SimpleDateFormat("yyyyMMdd").format(date))
    20180601

    3、字符串转时间戳

    println(date.getTime)
    println(date.getTime)

    4、计算时间差

    val startDateStr = "2018-03-21"
    val endDateStr = "2018-03-22"
    val startDate = new SimpleDateFormat(pattern).parse(startDateStr)
    val endDate = new SimpleDateFormat(pattern).parse(endDateStr)
    val between = endDate.getTime - startDate.getTime
    val second = between / 1000
    val hour = between / 1000 / 3600
    val day = between / 1000 / 3600 / 24
    val year = between / 1000 / 3600 / 24 / 365

    如果需要结果为小数,以hour举例

    import java.text.DecimalFormat
    val hour: Float = between.toFloat / 1000 / 3600
    val decf: DecimalFormat = new DecimalFormat("#.00")
    println(hour)
    println(decf.format(hour)) //格式化为两位小数
    24.0
    24.00

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13866355.html

  • 相关阅读:
    【转】我是一个线程
    前端之 JS 实现全选、反选、取消选中
    Python文件操作——逐行插入内容
    angularJs实现数据双向绑定的原理
    手机连接电脑调试页面
    工程化框架之feather
    网页上线后音频不能自动播放
    FormData对象
    地图热区自适应
    需求移交会
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13866355.html
Copyright © 2011-2022 走看看