zoukankan      html  css  js  c++  java
  • Flink assignAscendingTimestamps 生成水印的三个重载方法

    先简单介绍一下Timestamp 和Watermark 的概念: 

    1. Timestamp和Watermark都是基于事件的时间字段生成的
    2. Timestamp和Watermark是两个不同的东西,并且一旦生成都跟事件数据没有关系了(所有即使事件中不再包含生成Timestamp和Watermark的字段也没关系)
    3. 事件数据和 Timestamp 一一对应(事件在流中传递以StreamRecord对象表示,value 和 timestamp 是它的两个成员变量)
    4. Watermark 在生成之后与事件数据没有直接关系,Watermark 作为一个消息,和事件数据一样在流中传递(Watermark 和StreamRecord 具有相同的父类:StreamElement)
    5. Timestamp 与 Watermark 在生成之后,会在下游window算子中做比较,判断事件数据是否是过期数据
    6. 只有window算子才会用Watermark判断事件数据是否过期

    Flink 在流上手动生成水印有三个重载的方法(忽略过期的一个)

     

    1. assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T]

    此方法是数据流的快捷方式,其中已知元素时间戳在每个并行流中单调递增。在这种情况下,系统可以通过跟踪上升时间戳自动且完美地生成水印。

    val input = env.addSource(source)
    .map(json => {
            val id = json.get("id").asText()
            val createTime = json.get("createTime").asText()
            val amt = json.get("amt").asText()
            LateDataEvent("key", id, createTime, amt)
          })
          // flink auto create timestamp & watermark
          .assignAscendingTimestamps(element => sdf.parse(element.createTime).getTime)

    注:这种方法创建时间戳与水印最简单,返回一个long类型的数字就可以了

    2.assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T] 

    基于给定的水印生成器生成水印,即使没有新元素到达也会定期检查给定水印生成器的新水印,以指定允许延迟时间
    val input = env.addSource(source)
          .map(json => {
            val id = json.get("id").asText()
            val createTime = json.get("createTime").asText()
            val amt = json.get("amt").asText()
            LateDataEvent("key", id, createTime, amt)
          })
          // assign timestamp & watermarks periodically(定期生成水印)
          .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LateDataEvent](Time.milliseconds(50)) {
           override def extractTimestamp(element: LateDataEvent): Long = {
             println("want watermark : " + sdf.parse(element.createTime).getTime)
             sdf.parse(element.createTime).getTime
           }
         })
          

    3.assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]

    此方法仅基于流元素创建水印,对于通过[[AssignerWithPunctuatedWatermarks#extractTimestamp(Object,long)]]处理的每个元素,
    调用[[AssignerWithPunctuatedWatermarks#checkAndGetNextWatermark()]]方法,如果返回的水印值大于以前的
    水印,会发出新的水印,
    此方法可以完全控制水印的生成,但是要注意,每秒生成数百个水印会影响性能


    val input = env.addSource(source)
          .map(json => {
            val id = json.get("id").asText()
            val createTime = json.get("createTime").asText()
            val amt = json.get("amt").asText()
            LateDataEvent("key", id, createTime, amt)
          })
          // assign timestamp & watermarks every event
          .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[LateDataEvent]() {
          // check extractTimestamp emitted watermark is non-null and large than previously
          override def checkAndGetNextWatermark(lastElement: LateDataEvent, extractedTimestamp: Long): Watermark = {
            new Watermark(extractedTimestamp)
          }
          // generate next watermark
          override def extractTimestamp(element: LateDataEvent, previousElementTimestamp: Long): Long = {
            val eventTime = sdf.parse(element.createTime).getTime
            eventTime
          }
        })

    注:本文基于全部事件时间

     欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文
  • 相关阅读:
    仿百度翻页(转)
    文字顺时针旋转90度(纵向)&古诗词排版
    微信小程序使用canvas绘制图片的注意事项
    PHP即时实时输出内容
    使用Android Studio遇到的问题
    RuntimeError: Model class users.models.UserProfile doesn't declare an explicit app_label and isn't in an application in INSTALLED_APPS.
    drf中的各种view,viewset
    代码审计:covercms 1.6
    windows下安装phpredis扩展
    python练习:异常
  • 原文地址:https://www.cnblogs.com/Springmoon-venn/p/11403665.html
Copyright © 2011-2022 走看看