zoukankan      html  css  js  c++  java
  • Flink的时间类型和watermark机制

    一FlinkTime类型

        有3类时间,分别是数据本身的产生时间、进入Flink系统的时间和被处理的时间,在Flink系统中的数据可以有三种时间属性:

    Event Time 是每条数据在其生产设备上发生的时间。这段时间通常嵌入在记录数据中,然后进入Flink,可以从记录中提取事件的时间戳;Event Time即使在数据发生乱序,延迟或者从备份或持久性日志中重新获取数据的情况下,也能提供正确的结果。这个时间是最有价值的,和挂在任何电脑/操作系统的时钟时间无关。

    Processing Time 是指执行相应操作的机器的系统时间。如果流计算系统基于Processing Time来处理,对流处理系统来说是最简单的,所有基于时间的操作(如Time Window)将使用运行相应算子的机器的系统时钟。然而,在分布式和异步环境中,Processing Time并不能保证确定性,它容易受到Event到达系统的速度(例如来自消息队列)以及数据在Flink系统内部处理的先后顺序的影响,所以Processing Time不能准确地反应数据发生的时间序列情况。

    Ingestion Time是事件进入Flink的时间。 在Source算子处产生,也就是在Source处获取到这个数据的时间,Ingestion Time在概念上位于Event Time和Processing Time之间。在Source处获取数据的时间,不受Flink分布式系统内部处理Event的先后顺序和数据传输的影响,相对稳定一些,但是Ingestion Time和Processing Time一样,不能准确地反应数据发生的时间序列情况。

    二 Watermark机制

    上面提到Event Time是最能反映数据时间属性的,但是Event Time可能会发生延迟或乱序,Flink系统本身只能逐个处理数据,如何应对Event Time可能会发生延迟或乱序情况呢?

    比如需要统计从10:00到11:00发生某个事件的次数,也就是对Event Time是在10:00和11:00之间的数据统计个数。Event Time可能会发生延迟或乱序的情况下,Flink系统怎么判断10:00到11:00发生的事件数据都已到达,可以给出统计结果了呢?长时间地等待会推迟结果输出时间,而且占用更多系统资源。

    Watermark是一个对Event Time的标识,内容方面Watermark是个时间戳,一个带有时间戳X的Watermark到达,相当于告诉Flink系统,任何Event Time小于X的数据都已到达。比如上面的例子,如果Flink收到一个时间戳是11:01的Watermark,它就可以把之前统计的Event Time在[10:00,11:01)之间的事件个数输出,清空相关被占用的资源。这里需要注意窗口的长度问题,只有窗口采集完成的数据,才会统计。

    三 Watermark生成

    Periodic - 一定时间间隔或者达到一定的记录条数会产生一个watermark。

    Punctuated – 基于event time通过一定的逻辑产生watermark,比如收到一个数据就产生一个WaterMark,时间是event time - 5秒。

    这两种产生方式,都有机制来保证产生的watermark是单调递增的。

    即使有了watermark,如果现实中,数据没有满足watermark所保证的条件怎么办?比如Flink处理了11:01的watermark,但是之后遇到了event time是10:00~11:00之间的数据怎么办?首先如果这种事情出现的概率非常小,不影响所要求的准确度,可以直接把数据丢弃;如果这种事情出现的概率比较大,就要调整产生water mark的机制了。

    除了把违反watermark机制的数据丢弃,也有不丢弃的处理方法,比如通过一些机制来更新之前统计的结果,这种方式会有一定的性能开销。

    四代码示例

    package org.tonny.flink.bi.job.water;

    import org.apache.commons.lang3.ArrayUtils;
    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.StringUtils;

    /**
     *
    在指定的linux机器上开启nc -l 9900
     *
    输入的数据格式:
     
    * hello1 1567059808519
     * hello2 1567059809519
     * hello3 1567059810519
     */
    public class WaterMarkJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().disableSysoutLogging();//关闭日志打印
           
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  //设置时间分配器

           
    env.setParallelism(1);  //设置并行度
           
    env.getConfig().setAutoWatermarkInterval(3000);//9秒发出一个watermark

           
    DataStream<String> text = env.socketTextStream("localhost", 9900);

            DataStream<Tuple3<String, Long, Integer>> counts = text
                    // 设置过滤
                   
    .filter(new FilterClass())
                    // 设置分词
                   
    .map(new LineSplitter())
                    //设置watermark方法
                   
    .assignTimestampsAndWatermarks(new PeriodicWatermarks())
                    .keyBy(0)
                    //设置滚动窗口大小
                   
    .timeWindow(Time.seconds(60))
                    .sum(2);

            counts.print();
            env.execute("Window WordCount");

        }

        public static class PeriodicWatermarks implements AssignerWithPeriodicWatermarks<Tuple3<String, Long, Integer>> {
            private long currentMaxTimestamp = 0L;

            private final long maxOutOfOrderness = 10000L;   //这个控制失序已经延迟的度量,时间戳10秒以前的数据

           
    //获取EventTime
            
    @Override
            public long extractTimestamp(Tuple3<String, Long, Integer> element, long previousElementTimestamp) {
                if (element == null) {
                    return currentMaxTimestamp;
                }

                long timestamp = element.f1;
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                System.out.println("get timestamp is " + timestamp + " currentMaxTimestamp " + currentMaxTimestamp);
                return timestamp;
            }

            //获取Watermark
           
    @Override
            public Watermark getCurrentWatermark() {
                System.out.println("wall clock is " + System.currentTimeMillis() + " new watermark " + (currentMaxTimestamp - maxOutOfOrderness));
                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }
        }

        //构造出element以及它的event time.然后把次数赋值为1
       
    public static final class LineSplitter implements MapFunction<String, Tuple3<String, Long, Integer>> {
            @Override
            public Tuple3<String, Long, Integer> map(String value) throws Exception {
                if (org.apache.commons.lang3.StringUtils.isBlank(value)) {
                    return null;
                }

                String[] tokens = value.toLowerCase().split("\W+");
                if (ArrayUtils.isEmpty(tokens) || ArrayUtils.getLength(tokens) < 2) {
                    return null;
                }
                long eventtime = 0L;
                try {
                    eventtime = Long.parseLong(tokens[1]);
                } catch (NumberFormatException e) {
                    return null;
                }
                return new Tuple3<String, Long, Integer>(tokens[0], eventtime, 1);
            }
        }

        /**
         *
    过滤掉为nullwhitespace的字符串
        
    */
       
    public static final class FilterClass implements FilterFunction<String> {
            @Override
            public boolean filter(String value) throws Exception {
                if (StringUtils.isNullOrWhitespaceOnly(value)) {
                    return false;
                } else {
                    return true;
                }
            }

        }
    }

  • 相关阅读:
    Oracle SQL语句大全—查看表空间
    Class to disable copy and assign constructor
    在moss上自己总结了点小经验。。高手可以飘过 转贴
    在MOSS中直接嵌入ASP.NET Page zt
    Project Web Access 2007自定义FORM验证登录实现 zt
    SharePoint Portal Server 2003 中的单一登录 zt
    vs2008 开发 MOSS 顺序工作流
    VS2008开发MOSS工作流几个需要注意的地方
    向MOSS页面中添加服务器端代码的另外一种方式 zt
    状态机工作流的 SpecialPermissions
  • 原文地址:https://www.cnblogs.com/supertonny/p/11430145.html
Copyright © 2011-2022 走看看