  • Flink watermark practice

    Flink source code for generating watermarks

    Approach 1 (periodic watermarks): assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<T>)

    /**
     * Assigns timestamps to the elements in the data stream and periodically creates
     * watermarks to signal event time progress.
     *
     * <p>This method creates watermarks periodically (for example every second), based
     * on the watermarks indicated by the given watermark generator. Even when no new elements
     * in the stream arrive, the given watermark generator will be periodically checked for
     * new watermarks. The interval in which watermarks are generated is defined in
     * {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
     *
     * <p>Use this method for the common cases, where some characteristic over all elements
     * should generate the watermarks, or where watermarks are simply trailing behind the
     * wall clock time by a certain amount.
     *
     * <p>For the second case and when the watermarks are required to lag behind the maximum
     * timestamp seen so far in the elements of the stream by a fixed amount of time, and this
     * amount is known in advance, use the
     * {@link BoundedOutOfOrdernessTimestampExtractor}.
     *
     * <p>For cases where watermarks should be created in an irregular fashion, for example
     * based on certain markers that some element carry, use the
     * {@link AssignerWithPunctuatedWatermarks}.
     *
     * @param timestampAndWatermarkAssigner The implementation of the timestamp assigner and
     *                                      watermark generator.
     * @return The stream after the transformation, with assigned timestamps and watermarks.
     *
     * @see AssignerWithPeriodicWatermarks
     * @see AssignerWithPunctuatedWatermarks
     * @see #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)
     */
    public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
          AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {

       // match parallelism to input, otherwise dop=1 sources could lead to some strange
       // behaviour: the watermark will creep along very slowly because the elements
       // from the source go to each extraction operator round robin.
       final int inputParallelism = getTransformation().getParallelism();
       final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

       TimestampsAndPeriodicWatermarksOperator<T> operator =
             new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);

       return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
             .setParallelism(inputParallelism);
    }
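    To make the periodic mode concrete, here is a plain-Java sketch with no Flink dependency. PeriodicGenerator and PeriodicEmitter are hypothetical names standing in for AssignerWithPeriodicWatermarks and TimestampsAndPeriodicWatermarksOperator. The rule "forward a watermark only if it is larger than the last one" is an assumption about how the operator behaves, not its actual source.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Flink dependency) of the periodic watermark loop.
// PeriodicGenerator / PeriodicEmitter are hypothetical names that only mimic
// AssignerWithPeriodicWatermarks and TimestampsAndPeriodicWatermarksOperator.
class PeriodicWatermarkSketch {

    // Hypothetical stand-in for AssignerWithPeriodicWatermarks<String>
    interface PeriodicGenerator {
        long extractTimestamp(String element);

        Long currentWatermark(); // null = no watermark yet
    }

    static class PeriodicEmitter {
        private final PeriodicGenerator generator;
        private long lastEmitted = Long.MIN_VALUE;
        final List<Long> emitted = new ArrayList<>();

        PeriodicEmitter(PeriodicGenerator generator) {
            this.generator = generator;
        }

        // Per element: only the timestamp is extracted, no watermark is emitted here
        void onElement(String element) {
            generator.extractTimestamp(element);
        }

        // Per timer tick (every autoWatermarkInterval ms), whether or not elements arrived
        void onTimer() {
            Long wm = generator.currentWatermark();
            if (wm != null && wm > lastEmitted) { // forward only watermarks that move forward
                lastEmitted = wm;
                emitted.add(wm);
            }
        }
    }

    // Watermark = max event time seen - fixed delay, like the demo's assigner
    static PeriodicGenerator boundedGenerator(final long maxOutOfOrderness) {
        return new PeriodicGenerator() {
            private long maxTimestamp = Long.MIN_VALUE;

            @Override
            public long extractTimestamp(String element) {
                long ts = Long.parseLong(element.split(",")[2]);
                maxTimestamp = Math.max(maxTimestamp, ts);
                return ts;
            }

            @Override
            public Long currentWatermark() {
                return maxTimestamp == Long.MIN_VALUE ? null : maxTimestamp - maxOutOfOrderness;
            }
        };
    }

    static List<Long> demo() {
        PeriodicEmitter emitter = new PeriodicEmitter(boundedGenerator(5_000L));
        emitter.onTimer();                          // nothing seen yet: no watermark
        emitter.onElement("tie,1,1590542810000");
        emitter.onTimer();                          // emits 1590542805000
        emitter.onTimer();                          // idle tick: same value, not re-emitted
        emitter.onElement("cup,2,1590542813000");
        emitter.onElement("tie,2,1590542811000");   // out of order: max stays 1590542813000
        emitter.onTimer();                          // emits 1590542808000
        return emitter.emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1590542805000, 1590542808000]
    }
}
```

    Note the idle tick: the generator is still polled, but an unchanged watermark is not forwarded again.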

    Approach 2 (punctuated watermarks): assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<T>)

    /**
     * Assigns timestamps to the elements in the data stream and creates watermarks to
     * signal event time progress based on the elements themselves.
     *
     * <p>This method creates watermarks based purely on stream elements. For each element
     * that is handled via {@link AssignerWithPunctuatedWatermarks#extractTimestamp(Object, long)},
     * the {@link AssignerWithPunctuatedWatermarks#checkAndGetNextWatermark(Object, long)}
     * method is called, and a new watermark is emitted, if the returned watermark value is
     * non-negative and greater than the previous watermark.
     *
     * <p>This method is useful when the data stream embeds watermark elements, or certain elements
     * carry a marker that can be used to determine the current event time watermark.
     * This operation gives the programmer full control over the watermark generation. Users
     * should be aware that too aggressive watermark generation (i.e., generating hundreds of
     * watermarks every second) can cost some performance.
     *
     * <p>For cases where watermarks should be created in a regular fashion, for example
     * every x milliseconds, use the {@link AssignerWithPeriodicWatermarks}.
     *
     * @param timestampAndWatermarkAssigner The implementation of the timestamp assigner and
     *                                      watermark generator.
     * @return The stream after the transformation, with assigned timestamps and watermarks.
     *
     * @see AssignerWithPunctuatedWatermarks
     * @see AssignerWithPeriodicWatermarks
     * @see #assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)
     */
    public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
          AssignerWithPunctuatedWatermarks<T> timestampAndWatermarkAssigner) {

       // match parallelism to input, otherwise dop=1 sources could lead to some strange
       // behaviour: the watermark will creep along very slowly because the elements
       // from the source go to each extraction operator round robin.
       final int inputParallelism = getTransformation().getParallelism();
       final AssignerWithPunctuatedWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

       TimestampsAndPunctuatedWatermarksOperator<T> operator =
             new TimestampsAndPunctuatedWatermarksOperator<>(cleanedAssigner);

       return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
             .setParallelism(inputParallelism);
    }
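    A matching plain-Java sketch of the punctuated mode: the watermark check runs once per element instead of on a timer. The "WM,<millis>" marker format is purely hypothetical; in a real job the marker would be whatever field your records carry. The sketch forwards a watermark only when it is non-null and larger than the previous one.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Flink dependency) of punctuated watermark generation.
// Assumption: hypothetical marker elements "WM,<millis>" carry a watermark;
// ordinary elements look like "product,count,eventTimeMillis" as in the demo.
class PunctuatedWatermarkSketch {

    private long currentWatermark = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    // Plays the role of extractTimestamp: the last comma-separated field is the timestamp
    static long extractTimestamp(String element) {
        String[] fields = element.split(",");
        return Long.parseLong(fields[fields.length - 1]);
    }

    // Plays the role of checkAndGetNextWatermark: only marker elements yield a watermark
    static Long checkAndGetNextWatermark(String element, long extractedTimestamp) {
        return element.startsWith("WM,") ? Long.valueOf(extractedTimestamp) : null;
    }

    // Called once per element; no timer is involved
    void processElement(String element) {
        long timestamp = extractTimestamp(element);
        Long next = checkAndGetNextWatermark(element, timestamp);
        if (next != null && next > currentWatermark) { // forward only increasing watermarks
            currentWatermark = next;
            emitted.add(next);
        }
    }

    static List<Long> run(String... elements) {
        PunctuatedWatermarkSketch op = new PunctuatedWatermarkSketch();
        for (String element : elements) {
            op.processElement(element);
        }
        return op.emitted;
    }

    public static void main(String[] args) {
        List<Long> watermarks = run(
                "tie,1,1590542810000",
                "WM,1590542810000",   // emitted
                "cup,2,1590542813000",
                "WM,1590542809000",   // not greater than the previous one: ignored
                "WM,1590542815000");  // emitted
        System.out.println(watermarks); // [1590542810000, 1590542815000]
    }
}
```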

    WaterMarkDemo

    Counting the orders of each product with a 5-second event-time tumbling window

    Generating watermarks with AssignerWithPeriodicWatermarks

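    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class WaterMarkDemo {

        public static void main(String[] args) throws Exception {

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Use event time as the time characteristic
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            // Generate a periodic watermark every 100 ms
            env.getConfig().setAutoWatermarkInterval(100);

            // Input lines look like "product,count,eventTimeMillis"; feed them with: nc -lk 9001
            DataStreamSource<String> source = env.socketTextStream("172.xx.x.xxx", 9001);
            DataStream<String> dataStream = source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {

                // Largest event time seen so far
                private Long currentTimeStamp = 0L;
                // Maximum allowed delay (out-of-orderness) of the data, in ms
                private Long maxOutOfOrderness = 5000L;

                @Override
                public Watermark getCurrentWatermark() {
                    // Polled every autoWatermarkInterval ms: trail the max event time by maxOutOfOrderness
                    return new Watermark(currentTimeStamp - maxOutOfOrderness);
                }

                @Override
                public long extractTimestamp(String element, long previousElementTimestamp) {
                    String[] arr = element.split(",");
                    long eventTime = Long.parseLong(arr[2]);
                    currentTimeStamp = Math.max(eventTime, currentTimeStamp);
                    System.err.println("element:" + element + "; eventTime:" + eventTime + "; watermark:" + (currentTimeStamp - maxOutOfOrderness));
                    return eventTime;
                }
            });

            // Group by product and sum the order counts
            // 5 s event-time tumbling windows are half-open: the 0-5 s window covers [0, 5)
            dataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String value) throws Exception {
                    String[] split = value.split(",");
                    return new Tuple2<>(split[0], Integer.parseInt(split[1]));
                }
            }).keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))).sum(1).print();

            env.execute();
        }
    }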

     

    A 5 s event-time tumbling window is half-open (closed at the start, open at the end): for the 0-5 s window, the data counted is [0, 5).

    For example, suppose the first record is tie,1,1590542810000 (treat it as the data for second 0).
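    Why does 1590542810000 count as "second 0"? Tumbling windows are aligned to the epoch, not to the first record. The plain-Java sketch below reproduces the window-start formula used by Flink 1.x's TimeWindow.getWindowStartWithOffset, assuming a window offset of 0 (the default for TumblingEventTimeWindows.of(Time.seconds(5))).

```java
// Plain-Java sketch of tumbling-window assignment (no Flink dependency).
// windowStart mirrors the formula of Flink 1.x TimeWindow.getWindowStartWithOffset;
// an offset of 0 is assumed.
class TumblingWindowSketch {

    static final long WINDOW_SIZE = 5_000L; // Time.seconds(5) in milliseconds

    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long[] eventTimes = {1590542810000L, 1590542814000L, 1590542815000L};
        for (long ts : eventTimes) {
            long start = windowStart(ts, 0L, WINDOW_SIZE);
            // windows are half-open: [start, start + size)
            System.out.println(ts + " -> [" + start + ", " + (start + WINDOW_SIZE) + ")");
        }
    }
}
```

    This prints 1590542810000 -> [1590542810000, 1590542815000), 1590542814000 -> [1590542810000, 1590542815000) and 1590542815000 -> [1590542815000, 1590542820000). Since 1590542810000 is an exact multiple of 5000, the first record lands on a window boundary.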

    The window computation is triggered when the data for second 5 arrives, for example with the following data:

    tie,1,1590542810000 
    tie,2,1590542811000
    shoes,1,1590542812000
    cup,2,1590542813000
    shoes,2,1590542814000
    cup,3,1590542815000 -- the window fires after this record arrives

    The window covers:

    tie,1,1590542810000 
    tie,2,1590542811000
    shoes,1,1590542812000
    cup,2,1590542813000
    shoes,2,1590542814000

    cup,3,1590542815000 is not included, so the result of this window is:

    (cup,2)
    (shoes,3)
    (tie,3)

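    This behavior assumes the watermark equals the event time of the data (the largest event_time seen so far, i.e., maxOutOfOrderness = 0), so no late data is accepted.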
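    The trigger rule can be simulated in plain Java: an event-time window fires once the watermark reaches its last millisecond (window end - 1). This simplified sketch tracks only the first window and assumes a watermark tick after every record; in the real job the tick happens every 100 ms (setAutoWatermarkInterval(100)). The class name FirstWindowTrigger is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified plain-Java simulation of the first window [1590542810000, 1590542815000).
// Assumptions: watermark = max event time - maxOutOfOrderness (as in the demo's assigner),
// a watermark tick after every record, and only this one window is tracked.
class FirstWindowTrigger {

    static final long WINDOW_START = 1590542810000L;
    static final long WINDOW_END = WINDOW_START + 5_000L;  // exclusive
    static final long MAX_TIMESTAMP = WINDOW_END - 1;      // last millisecond in the window

    // Returns the record after which the window fires (null if it has not fired yet)
    // and collects the per-product sums of the window into counts.
    static String run(String[] input, long maxOutOfOrderness, Map<String, Integer> counts) {
        long maxEventTime = Long.MIN_VALUE;
        for (String line : input) {
            String[] f = line.split(",");
            long eventTime = Long.parseLong(f[2]);
            if (eventTime >= WINDOW_START && eventTime < WINDOW_END) { // half-open [start, end)
                counts.merge(f[0], Integer.parseInt(f[1]), Integer::sum);
            }
            maxEventTime = Math.max(maxEventTime, eventTime);
            long watermark = maxEventTime - maxOutOfOrderness;
            if (watermark >= MAX_TIMESTAMP) { // event-time firing condition
                return line;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String[] input = {
            "tie,1,1590542810000", "tie,2,1590542811000", "shoes,1,1590542812000",
            "cup,2,1590542813000", "shoes,2,1590542814000", "cup,3,1590542815000"
        };
        Map<String, Integer> counts = new TreeMap<>();
        String trigger = run(input, 0L, counts);
        System.out.println("fired after " + trigger + ": " + counts);
        // fired after cup,3,1590542815000: {cup=2, shoes=3, tie=3}
    }
}
```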

    Handling late data under different watermark definitions

    If data arrives late and the allowed delay in our watermark definition is 0 (private Long maxOutOfOrderness = 0L), the late records are dropped and not counted, as in the following run:

    element:tie,1,1590542810000; eventTime:1590542810000; watermark:1590542810000
    element:tie,2,1590542811000; eventTime:1590542811000; watermark:1590542811000
    element:shoes,1,1590542812000; eventTime:1590542812000; watermark:1590542812000
    element:shoes,2,1590542814000; eventTime:1590542814000; watermark:1590542814000
    element:cup,3,1590542815000; eventTime:1590542815000; watermark:1590542815000
    6> (tie,3)
    8> (shoes,3)
    element:tie,3,1590542816000; eventTime:1590542816000; watermark:1590542816000
    element:cup,2,1590542813000; eventTime:1590542813000; watermark:1590542816000  // this record is out of order; note the watermark generated for it
    element:tie,1,1590542817000; eventTime:1590542817000; watermark:1590542817000
    element:cup,1,1590542818000; eventTime:1590542818000; watermark:1590542818000
    element:shoes,1,1590542819000; eventTime:1590542819000; watermark:1590542819000
    element:tie,1,1590542820000; eventTime:1590542820000; watermark:1590542820000
    6> (tie,4)
    3> (cup,4)
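    8> (shoes,1)

    Observations:
    1. The first window computation was triggered only after cup,3,1590542815000 arrived. It covered the data in [0, 5) s.
    2. The second window computation was triggered only after tie,1,1590542820000 arrived. It covered the data in [5, 10) s.
    3. Neither result includes the out-of-order record.
    4. The generated watermark never decreases: when the out-of-order record arrives, it stays at 1590542816000.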
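    The dropping rule can be written as a one-line check. The sketch assumes the default allowed lateness of 0 and is meant to mirror the condition in Flink's WindowOperator as I understand it: a record is skipped when its window's last timestamp plus the allowed lateness is already at or behind the current watermark. LateElementCheck and isDropped are hypothetical names.

```java
// Plain-Java sketch of the late-record check for 5 s tumbling windows (offset 0).
// Assumes allowedLateness = 0 (Flink's default); names are hypothetical.
class LateElementCheck {

    static final long WINDOW_SIZE = 5_000L;

    // true if the record's window has already been closed by the watermark
    static boolean isDropped(long eventTime, long currentWatermark, long allowedLateness) {
        long windowStart = eventTime - (eventTime + WINDOW_SIZE) % WINDOW_SIZE;
        long windowMaxTimestamp = windowStart + WINDOW_SIZE - 1;
        return windowMaxTimestamp + allowedLateness <= currentWatermark;
    }

    public static void main(String[] args) {
        // cup,2,1590542813000 arriving when the watermark is already 1590542816000 (delay 0)
        System.out.println(isDropped(1590542813000L, 1590542816000L, 0L)); // true
        // the same record when the watermark is only 1590542811000 (delay 5000)
        System.out.println(isDropped(1590542813000L, 1590542811000L, 0L)); // false
    }
}
```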

    If a delay is allowed, for example 5 seconds (maxOutOfOrderness = 5000L), the run looks like this:

    element:tie,1,1590542810000; eventTime:1590542810000; watermark:1590542805000
    element:tie,2,1590542811000; eventTime:1590542811000; watermark:1590542806000
    element:shoes,1,1590542812000; eventTime:1590542812000; watermark:1590542807000
    element:shoes,2,1590542814000; eventTime:1590542814000; watermark:1590542809000
    element:cup,3,1590542815000; eventTime:1590542815000; watermark:1590542810000
    element:tie,3,1590542816000; eventTime:1590542816000; watermark:1590542811000
    element:cup,2,1590542813000; eventTime:1590542813000; watermark:1590542811000
    element:tie,1,1590542817000; eventTime:1590542817000; watermark:1590542812000
    element:cup,1,1590542818000; eventTime:1590542818000; watermark:1590542813000
    element:shoes,1,1590542819000; eventTime:1590542819000; watermark:1590542814000
    element:tie,1,1590542820000; eventTime:1590542820000; watermark:1590542815000
    6> (tie,3)
    3> (cup,2)
    8> (shoes,3)
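    element:cup,1,1590542821000; eventTime:1590542821000; watermark:1590542816000

    Observations:
    1. With a 5-second allowed delay, the first window was not computed until tie,1,1590542820000 arrived.
    2. The data in the computation is still the data for [0, 5) s, but this time it includes the out-of-order record cup,2,1590542813000, hence (cup,2).
    3. The watermark still never decreases: even when the out-of-order record cup,2,1590542813000 arrives, it stays at 1590542811000.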
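    The delay simply shifts the firing threshold: with watermark = max event time - maxOutOfOrderness, the [0, 5) window fires once the max event time reaches window end - 1 + maxOutOfOrderness. The plain-Java sketch below (hypothetical names, same watermark formula as the demo's assigner) recomputes the watermarks for the arrival order in the log above.

```java
// Plain-Java sketch: firing threshold and watermarks for the 5 s delay run.
// Assumes watermark = max event time seen - maxOutOfOrderness, as in the demo.
class DelayedWatermarkCheck {

    static final long WINDOW_END = 1590542815000L; // end of the first window (exclusive)

    // Smallest max event time at which the watermark reaches WINDOW_END - 1
    static long fireThreshold(long maxOutOfOrderness) {
        return WINDOW_END - 1 + maxOutOfOrderness;
    }

    // Watermark after each record; it never decreases because max never decreases
    static long[] watermarks(long[] eventTimes, long maxOutOfOrderness) {
        long[] result = new long[eventTimes.length];
        long max = Long.MIN_VALUE;
        for (int i = 0; i < eventTimes.length; i++) {
            max = Math.max(max, eventTimes[i]);
            result[i] = max - maxOutOfOrderness;
        }
        return result;
    }

    public static void main(String[] args) {
        // arrival order of the log above; 1590542813000 (cup,2) is out of order
        long[] eventTimes = {
            1590542810000L, 1590542811000L, 1590542812000L, 1590542814000L, 1590542815000L,
            1590542816000L, 1590542813000L, 1590542817000L, 1590542818000L, 1590542819000L,
            1590542820000L
        };
        System.out.println("threshold: " + fireThreshold(5_000L)); // threshold: 1590542819999
        long[] wms = watermarks(eventTimes, 5_000L);
        for (int i = 0; i < wms.length; i++) {
            boolean fires = wms[i] >= WINDOW_END - 1;
            System.out.println(eventTimes[i] + " -> watermark " + wms[i]
                    + (fires ? "  <- fires the [0, 5) window" : ""));
        }
    }
}
```

    Only the last record (1590542820000, watermark 1590542815000) crosses the threshold, which matches the log.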

     

  • Original article: https://www.cnblogs.com/zz-ksw/p/12971239.html