zoukankan      html  css  js  c++  java
  • Flink中设置事件时间

    在flink中设置事件时间时需要将时间的表示转换为毫秒

    如果不需要转换

    def main(args: Array[String]): Unit = {
    
        // ...
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)  // 将时间特性设置为事件时间
        env.setParallelism(1)
    
        val clickStream = env
          .fromElements(
            UserClickLog("user_2", "1500", "click", "page_1"),
            UserClickLog("user_2", "2000", "click", "page_1")
          )
          .assignAscendingTimestamps(_.eventTime.toLong * 1000L) // 选择事件时间的字段
        // ...
    
    }

    如果需要转换

    def main(args: Array[String]): Unit = {
    
        // ...
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 将时间特性设置为事件时间
    
        val clickStream = env
          .fromElements(
            UserClickLog("user_2", "2019-11-16 17:30:00", "click", "page_1")
          )
          .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor[UserClickLog](Time.seconds(0))  {
              override def extractTimestamp(t: UserClickLog): Long = {
                val dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
                val dateTime = DateTime.parse(t.eventTime, dateTimeFormatter)
                dateTime.getMillis   // 返回事件时间
              }
            }
          )
        // ...
    }

    Time.seconds(0): MaxOutOfOrderness 延迟时间, 水位线用于延迟窗口的触发时间

  • 相关阅读:
    POJ3233]Matrix Power Series && [HDU1588]Gauss Fibonacci
    [codeforces 508E]Maximum Matching
    [SDOI2011]染色
    [CSU1806]Toll
    [HDU4969]Just a Joke
    [HDU1071]The area
    [HDU1724]Ellipse
    [VIJOS1889]天真的因数分解
    [BZOJ3379] Turning in Homework
    [BZOJ1572] WorkScheduling
  • 原文地址:https://www.cnblogs.com/bitbitbyte/p/13137746.html
Copyright © 2011-2022 走看看