zoukankan      html  css  js  c++  java
  • Flink之Watermark的设置和使用

    具体实现代码如下所示:

    main函数中代码如下:

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 设置生成watermark的时间间隔,系统默认为200毫秒,一般使用系统默认即可
    env.getConfig.setAutoWatermarkInterval(5000)
    
    val sensorStream: DataStream[SensorReading] = env
        .readTextFile("D:\Project\IDEA\bigdata-study\flink-demo\src\main\resources\source.txt")
        .map(new MyMapToSensorReading)
    
    // 1、引入Watermark(使用已有的类)
    // 1.1、给一个没有乱序,时间为升序的流设置一个EventTime
    val ascendingStream: DataStream[SensorReading] = sensorStream.assignAscendingTimestamps(_.timestamp)
    // 1.2、当流中存在时间乱序问题,引入watermark,并设置延迟时间
    /**
     * 知识点:
     * 1、BoundedOutOfOrdernessTimestampExtractor中的泛型为流中数据的类型
     * 2、传入的参数为 watermark 的最大延迟时间(即允许数据迟到的时间)
     * 3、重写的extractTimestamp方法返回的是设置数据中EventTime的字段,单位为毫秒,需要将时间转换成Long(最近时间为13位的长整形)才能返回
     * 4、当我们能大约估计到流中的最大乱序时,建议使用此中方式,比较方便
     */
    val watermarkStream: DataStream[SensorReading] = sensorStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(element: SensorReading): Long = {
            element.timestamp * 1000
        }
    })
    
    // 2、使用 TimestampAssigner 引入 Watermark
    // 2.1、Assigner with periodic watermarks(周期性引入watermark)
    /**
     * 知识点:
     * 1、系统会周期性的将watermark插入到流中,默认周期是200毫秒,可以使用ExecutionConfig.setAutoWatermarkInterval()方法进行设置,单位为毫秒
     * 2、产生watermark的逻辑:每隔5秒钟,Flink会调用AssignerWithPeriodicWatermarks的getCurrentWatermark()方法,如果大于流中最大watermark就插入,小于就不插入
     * 3、如下,可以自定义一个周期性的时间戳抽取(需要实现 AssignerWithPeriodicWatermarks 接口)
     */
    env.getConfig.setAutoWatermarkInterval(5000)
    val periodicWatermarkStream: DataStream[SensorReading] = sensorStream.assignTimestampsAndWatermarks(new MyPeriodicAssigner(10))
    
    env.execute("WatermarkDemo")

    自定义类实现ProcessFunction接口:

    /**
     * 自定义一个周期生成watermark的类
     * @param bound watermark的延时时间(毫秒)
     */
    class MyPeriodicAssigner(bound: Long) extends AssignerWithPeriodicWatermarks[SensorReading] {
    
        // 当前为止的最大时间戳(毫秒)
        var maxTs: Long = Long.MinValue
    
        /**
         * 获取当前的watermark(默认200毫秒获取一次,可以通过 env.getConfig.setAutoWatermarkInterval(5000) 来设置)
         * @return 当前watermark,当前最大时间戳 - 延时时间
         */
        override def getCurrentWatermark: Watermark = {
            new Watermark(maxTs - bound)
        }
    
        /**
         * 指定eventTime对应的字段(流中每条数据都会调用一次此方法)
         * @param element 流中的每条数据
         * @param previousElementTimestamp 无
         * @return 当前流的eventTime(单位:毫秒)
         */
        override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
            // 每条数据都获取其中的时间戳,跟最大时间戳取大,并重新赋值给最大时间戳
            maxTs = maxTs.max(element.timestamp * 1000)
            element.timestamp * 1000
        }
    }
  • 相关阅读:
    2017免费获取正版win10的方法
    Apache <Directory>… </Directory>配置
    针对left join以及limit的两条优化小技巧
    win10打印机突然无法启动
    mysql中的分组统计函数及其用法实例
    程序猿的日常生活-雨中
    java中的反射
    mysql中的截取函数及其实例
    集合与数组
    方法重写
  • 原文地址:https://www.cnblogs.com/yangshibiao/p/14133587.html
Copyright © 2011-2022 走看看