zoukankan      html  css  js  c++  java
  • Flink中设置事件时间

    在flink中设置事件时间时需要将时间的表示转换为毫秒

    如果不需要转换

    def main(args: Array[String]): Unit = {
    
        // ...
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)  // 将时间特性设置为事件时间
        env.setParallelism(1)
    
        val clickStream = env
          .fromElements(
            UserClickLog("user_2", "1500", "click", "page_1"),
            UserClickLog("user_2", "2000", "click", "page_1")
          )
          .assignAscendingTimestamps(_.eventTime.toLong * 1000L) // 选择事件时间的字段
        // ...
    
    }

    如果需要转换

    def main(args: Array[String]): Unit = {
    
        // ...
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 将时间特性设置为事件时间
    
        val clickStream = env
          .fromElements(
            UserClickLog("user_2", "2019-11-16 17:30:00", "click", "page_1")
          )
          .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor[UserClickLog](Time.seconds(0))  {
              override def extractTimestamp(t: UserClickLog): Long = {
                val dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
                val dateTime = DateTime.parse(t.eventTime, dateTimeFormatter)
                dateTime.getMillis   // 返回事件时间
              }
            }
          )
        // ...
    }

    Time.seconds(0): MaxOutOfOrderness 延迟时间, 水位线用于延迟窗口的触发时间

  • 相关阅读:
    结束咯
    在Ubuntu上不能使用PPA下载
    月亮+大环
    piano
    花都论坛,广州花都本地生活
    LLVM的调用协议与内存对齐
    SALVIA 0.5.2优化谈
    LLVM随笔
    OS之争:永不停歇的战争(二,完结)
    OS之争:永不停歇的战争(一)
  • 原文地址:https://www.cnblogs.com/bitbitbyte/p/13137746.html
Copyright © 2011-2022 走看看