  • Flink

    Reference: Flink - Generating Timestamps / Watermarks

    Watermarks are only needed when there is an event-time window, so it is enough to call assignTimestampsAndWatermarks before the window operator.

    They do not have to be emitted by the source.

    1. First, the source itself can emit watermarks

    Let's look at the Kafka source implementation, starting with the AbstractFetcher constructor:

        protected AbstractFetcher(
                SourceContext<T> sourceContext,
                List<KafkaTopicPartition> assignedPartitions,
                SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,  // set by calling assignTimestampsAndWatermarks on the Kafka consumer
                SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
                ProcessingTimeService processingTimeProvider,
                long autoWatermarkInterval,  //env.getConfig().setAutoWatermarkInterval()
                ClassLoader userCodeClassLoader,
                boolean useMetrics) throws Exception
        {    
            // determine the timestamp/watermark mode
            if (watermarksPeriodic == null) {
                if (watermarksPunctuated == null) {
                    // simple case, no watermarks involved
                    timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
                } else {
                    timestampWatermarkMode = PUNCTUATED_WATERMARKS;
                }
            } else {
                if (watermarksPunctuated == null) {
                    timestampWatermarkMode = PERIODIC_WATERMARKS;
                } else {
                    throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
                }
            }
            
            // create our partition state according to the timestamp/watermark mode 
            this.allPartitions = initializePartitions(
                    assignedPartitions,
                    timestampWatermarkMode,
                    watermarksPeriodic, watermarksPunctuated,
                    userCodeClassLoader);
            
            // if we have periodic watermarks, kick off the interval scheduler
            if (timestampWatermarkMode == PERIODIC_WATERMARKS) { // watermarks are emitted periodically
                KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts = 
                        (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
                
                PeriodicWatermarkEmitter periodicEmitter= 
                        new PeriodicWatermarkEmitter(parts, sourceContext, processingTimeProvider, autoWatermarkInterval);
                periodicEmitter.start();
            }
        }

    FlinkKafkaConsumerBase

        public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
            checkNotNull(assigner);
            
            if (this.punctuatedWatermarkAssigner != null) {
                throw new IllegalStateException("A punctuated watermark emitter has already been set.");
            }
            try {
                ClosureCleaner.clean(assigner, true);
                this.periodicWatermarkAssigner = new SerializedValue<>(assigner);
                return this;
            } catch (Exception e) {
                throw new IllegalArgumentException("The given assigner is not serializable", e);
            }
        }

    The core methods of this interface define how timestamps are extracted and how watermarks are generated:

    public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {
        Watermark getCurrentWatermark();
    }
    public interface TimestampAssigner<T> extends Function {
        long extractTimestamp(T element, long previousElementTimestamp);
    }
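    To make the contract concrete, here is a minimal, self-contained sketch of a periodic assigner. Its watermark trails the largest timestamp seen so far by a fixed bound, which is the idea behind BoundedOutOfOrdernessTimestampExtractor. It does not use Flink. SimpleWatermark, PeriodicAssigner, Event and BoundedLagAssigner are hypothetical stand-ins for Watermark, AssignerWithPeriodicWatermarks and a user record type.

```java
// Hypothetical stand-ins for Flink's Watermark and AssignerWithPeriodicWatermarks;
// NOT the real Flink classes, just enough to show the contract.
final class SimpleWatermark {
    final long timestamp;
    SimpleWatermark(long timestamp) { this.timestamp = timestamp; }
}

interface PeriodicAssigner<T> {
    long extractTimestamp(T element, long previousElementTimestamp);
    SimpleWatermark getCurrentWatermark();
}

// Hypothetical user record carrying its own event time in milliseconds.
final class Event {
    final String key;
    final long eventTime;
    Event(String key, long eventTime) { this.key = key; this.eventTime = eventTime; }
}

// Watermark = largest event time seen so far minus a fixed bound.
final class BoundedLagAssigner implements PeriodicAssigner<Event> {
    private final long maxOutOfOrderness;
    private long maxSeen;

    BoundedLagAssigner(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start here so that maxSeen - maxOutOfOrderness cannot underflow.
        this.maxSeen = Long.MIN_VALUE + maxOutOfOrderness;
    }

    @Override
    public long extractTimestamp(Event e, long previousElementTimestamp) {
        maxSeen = Math.max(maxSeen, e.eventTime);
        return e.eventTime; // the record's event time becomes its timestamp
    }

    @Override
    public SimpleWatermark getCurrentWatermark() {
        // Called periodically: "no event older than this is expected any more".
        return new SimpleWatermark(maxSeen - maxOutOfOrderness);
    }
}
```

    Suppose the bound is 5 and the assigner sees events at 100 and then 90. The watermark is then 95. The out-of-order event at 90 does not lower the maximum, so it cannot pull the watermark backwards.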

    If assignTimestampsAndWatermarks is not called when the Kafka consumer is set up, the source produces no watermarks at all.

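    As the code shows, there are two kinds of watermarks:

    PERIODIC_WATERMARKS: watermarks emitted at a fixed interval.

    PUNCTUATED_WATERMARKS: watermarks triggered by elements. For example, an element with a certain property, or an element of a special type, signals that a watermark should be emitted. This gives the developer direct control over when watermarks are generated.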
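    A punctuated assigner decides for each element whether a watermark is due. The sketch below assumes a hypothetical MarkedEvent type whose isMarker flag means "no earlier event time will follow". The method name checkAndGetNextWatermark mirrors Flink's AssignerWithPunctuatedWatermarks. The classes here are simplified stand-ins (null means "no watermark"), not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical record: a regular event, or a marker that says
// "no element with an event time below eventTime will follow".
final class MarkedEvent {
    final long eventTime;
    final boolean isMarker;
    MarkedEvent(long eventTime, boolean isMarker) { this.eventTime = eventTime; this.isMarker = isMarker; }
}

// Simplified punctuated assigner: watermarks are driven by the data itself.
final class MarkerPunctuatedAssigner {
    long extractTimestamp(MarkedEvent e, long previousElementTimestamp) {
        return e.eventTime;
    }

    // Returns a watermark timestamp, or null when this element triggers none.
    Long checkAndGetNextWatermark(MarkedEvent lastElement, long extractedTimestamp) {
        return lastElement.isMarker ? extractedTimestamp : null;
    }

    // Drives the assigner over a stream and collects the emitted watermarks.
    static List<Long> run(List<MarkedEvent> stream) {
        MarkerPunctuatedAssigner assigner = new MarkerPunctuatedAssigner();
        List<Long> emitted = new ArrayList<>();
        long last = Long.MIN_VALUE;
        for (MarkedEvent e : stream) {
            long ts = assigner.extractTimestamp(e, Long.MIN_VALUE);
            Long wm = assigner.checkAndGetNextWatermark(e, ts);
            // Only emit if it advances event time: watermarks never go backwards.
            if (wm != null && wm > last) {
                last = wm;
                emitted.add(wm);
            }
        }
        return emitted;
    }
}
```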

    initializePartitions

    case PERIODIC_WATERMARKS: {
        @SuppressWarnings("unchecked")
        KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
                (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
                        new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitions.size()];
    
        int pos = 0;
        for (KafkaTopicPartition partition : assignedPartitions) {
            KPH kafkaHandle = createKafkaPartitionHandle(partition);
    
            AssignerWithPeriodicWatermarks<T> assignerInstance =
                    watermarksPeriodic.deserializeValue(userCodeClassLoader);
            
            partitions[pos++] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
                    partition, kafkaHandle, assignerInstance);
        }
    
        return partitions;
    }

    KafkaTopicPartitionStateWithPeriodicWatermarks

    The core methods of this class:

        public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
            return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp);
        }
        
        public long getCurrentWatermarkTimestamp() {
            Watermark wm = timestampsAndWatermarks.getCurrentWatermark();
            if (wm != null) {
                partitionWatermark = Math.max(partitionWatermark, wm.getTimestamp());
            }
            return partitionWatermark;
        }

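    Both simply delegate to the AssignerWithPeriodicWatermarks you supplied. getCurrentWatermarkTimestamp also uses Math.max, so a partition's watermark never moves backwards.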
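    The non-decreasing behaviour can be isolated in a few lines. This is a minimal model. A Supplier<Long> serves as a hypothetical stand-in for the user's assigner, and null means "no watermark yet". PartitionWatermarkState is not a Flink class.

```java
import java.util.function.Supplier;

// Simplified model of the per-partition watermark state. The Supplier is a
// hypothetical stand-in for assigner.getCurrentWatermark(); null = no watermark.
final class PartitionWatermarkState {
    private final Supplier<Long> assigner;
    private long partitionWatermark = Long.MIN_VALUE;

    PartitionWatermarkState(Supplier<Long> assigner) {
        this.assigner = assigner;
    }

    long getCurrentWatermarkTimestamp() {
        Long wm = assigner.get();
        if (wm != null) {
            // Never move backwards, even if the assigner reports a smaller value.
            partitionWatermark = Math.max(partitionWatermark, wm);
        }
        return partitionWatermark;
    }
}
```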

    PeriodicWatermarkEmitter

        private static class PeriodicWatermarkEmitter implements ProcessingTimeCallback {
    
            public void start() {
                timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this); // start the timer; it fires after each interval
            }
            
            @Override
            public void onProcessingTime(long timestamp) throws Exception { // called when the timer fires
    
                long minAcrossAll = Long.MAX_VALUE;
                for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state : allPartitions) { // for each partition
                    
                    // we access the current watermark for the periodic assigners under the state
                    // lock, to prevent concurrent modification to any internal variables
                    final long curr;
                    //noinspection SynchronizationOnLocalVariableOrMethodParameter
                    synchronized (state) {
                        curr = state.getCurrentWatermarkTimestamp(); // current watermark of this partition
                    }
                    
                    minAcrossAll = Math.min(minAcrossAll, curr); // take the minimum: the slowest partition determines the source watermark
                }
                
                // emit next watermark, if there is one
                if (minAcrossAll > lastWatermarkTimestamp) {
                    lastWatermarkTimestamp = minAcrossAll;
                    emitter.emitWatermark(new Watermark(minAcrossAll)); //emit
                }
                
                // schedule the next watermark
                timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this); // re-register the timer
            }
        }
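    Without the timer and the locking, each tick of PeriodicWatermarkEmitter reduces to two steps. It takes the minimum over all partitions, and it emits that value only if it advanced. The sketch below models this with plain long arrays. MinAcrossPartitionsEmitter and its emitted list are hypothetical names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of one PeriodicWatermarkEmitter tick: the source watermark
// is the minimum over all partition watermarks, emitted only if it advanced.
final class MinAcrossPartitionsEmitter {
    private long lastWatermarkTimestamp = Long.MIN_VALUE;
    final List<Long> emitted = new ArrayList<>();

    void onTick(long[] partitionWatermarks) {
        long minAcrossAll = Long.MAX_VALUE;
        for (long curr : partitionWatermarks) {
            minAcrossAll = Math.min(minAcrossAll, curr);
        }
        if (minAcrossAll > lastWatermarkTimestamp) {
            lastWatermarkTimestamp = minAcrossAll;
            emitted.add(minAcrossAll); // stands in for emitter.emitWatermark(...)
        }
    }
}
```

    Because the minimum is taken, a single partition whose watermark stops advancing holds back the watermark of the whole source in this code. A partition that receives no data is one example.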

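    2. A DataStream can also be assigned timestamps and periodic watermarks

    Under the hood, this just inserts a TimestampsAndPeriodicWatermarksOperator. The operator is chained to the preceding operator and runs with the same parallelism as its input.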

    DataStream

        /**
         * Assigns timestamps to the elements in the data stream and periodically creates
         * watermarks to signal event time progress.
         * 
         * <p>This method creates watermarks periodically (for example every second), based
         * on the watermarks indicated by the given watermark generator. Even when no new elements
         * in the stream arrive, the given watermark generator will be periodically checked for
         * new watermarks. The interval in which watermarks are generated is defined in
         * {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
         * 
         * <p>Use this method for the common cases, where some characteristic over all elements
         * should generate the watermarks, or where watermarks are simply trailing behind the
         * wall clock time by a certain amount.
         *
         * <p>For the second case and when the watermarks are required to lag behind the maximum
         * timestamp seen so far in the elements of the stream by a fixed amount of time, and this
         * amount is known in advance, use the
         * {@link BoundedOutOfOrdernessTimestampExtractor}.
         * 
         * <p>For cases where watermarks should be created in an irregular fashion, for example
         * based on certain markers that some element carry, use the
         * {@link AssignerWithPunctuatedWatermarks}.
         * 
         * @param timestampAndWatermarkAssigner The implementation of the timestamp assigner and
         *                                      watermark generator.   
         * @return The stream after the transformation, with assigned timestamps and watermarks.
         * 
         * @see AssignerWithPeriodicWatermarks
         * @see AssignerWithPunctuatedWatermarks
         * @see #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks) 
         */
        public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
                AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {
            
            // match parallelism to input, otherwise dop=1 sources could lead to some strange
            // behaviour: the watermark will creep along very slowly because the elements
            // from the source go to each extraction operator round robin.
            final int inputParallelism = getTransformation().getParallelism();
            final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);
            
            TimestampsAndPeriodicWatermarksOperator<T> operator = 
                    new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);
            
            return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
                    .setParallelism(inputParallelism);
        }

    TimestampsAndPeriodicWatermarksOperator

    public class TimestampsAndPeriodicWatermarksOperator<T>
            extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
            implements OneInputStreamOperator<T, T>, Triggerable {
        
        private transient long watermarkInterval;
        private transient long currentWatermark;
    
        public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) {
            super(assigner); //AbstractUdfStreamOperator(F userFunction)
            this.chainingStrategy = ChainingStrategy.ALWAYS; // chain to the preceding operator whenever possible
        }
    
        @Override
        public void open() throws Exception {
            super.open();
    
            currentWatermark = Long.MIN_VALUE;
            watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
            
            if (watermarkInterval > 0) {
                registerTimer(System.currentTimeMillis() + watermarkInterval, this); // register with the timer service
            }
        }
    
        @Override
        public void processElement(StreamRecord<T> element) throws Exception {
            // extract the element's timestamp via the AssignerWithPeriodicWatermarks
            final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
                    element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
            
            output.collect(element.replace(element.getValue(), newTimestamp)); // update the element's timestamp and forward it
        }
    
        @Override
        public void trigger(long timestamp) throws Exception { // called when the timer fires
            // register next timer
            Watermark newWatermark = userFunction.getCurrentWatermark(); // ask the assigner for the current watermark
            if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
                currentWatermark = newWatermark.getTimestamp();
                // emit watermark
                output.emitWatermark(newWatermark); // emit the watermark downstream
            }
    
            registerTimer(System.currentTimeMillis() + watermarkInterval, this); // re-register the timer
        }
    
        @Override
        public void processWatermark(Watermark mark) throws Exception {
            // if we receive a Long.MAX_VALUE watermark we forward it since it is used
            // to signal the end of input and to not block watermark progress downstream
            if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
                currentWatermark = Long.MAX_VALUE;
                output.emitWatermark(mark); //forward watermark
            }
        }
    }

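    So processElement calls AssignerWithPeriodicWatermarks.extractTimestamp to extract the event time. It then overwrites the StreamRecord's timestamp with that value. On every timer tick, trigger emits the assigner's watermark if it has advanced.

    Note that processWatermark drops every watermark arriving from upstream except the final Long.MAX_VALUE one. Downstream operators therefore only see the watermarks produced by this assigner.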
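    The operator's behaviour can be exercised without a cluster by using a single-threaded model. In this sketch, Assigner, SelfTimestampAssigner and OperatorModel are hypothetical. Output is recorded as strings instead of StreamRecord and Watermark objects. trigger() is called by hand instead of by a processing-time timer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified assigner contract (null = no watermark yet).
interface Assigner<T> {
    long extractTimestamp(T element, long previousElementTimestamp);
    Long getCurrentWatermark();
}

// Hypothetical assigner for Long elements that are their own event time;
// watermark = highest timestamp seen minus 1.
final class SelfTimestampAssigner implements Assigner<Long> {
    private Long maxSeen = null;

    @Override
    public long extractTimestamp(Long element, long previousElementTimestamp) {
        maxSeen = (maxSeen == null) ? element : Math.max(maxSeen, element);
        return element;
    }

    @Override
    public Long getCurrentWatermark() {
        return maxSeen == null ? null : maxSeen - 1;
    }
}

final class OperatorModel<T> {
    private final Assigner<T> userFunction;
    private long currentWatermark = Long.MIN_VALUE;
    final List<String> output = new ArrayList<>();

    OperatorModel(Assigner<T> userFunction) { this.userFunction = userFunction; }

    // Like processElement: re-stamp the element with the extracted event time.
    void processElement(T value, long previousTimestamp) {
        long ts = userFunction.extractTimestamp(value, previousTimestamp);
        output.add("record " + value + "@" + ts);
    }

    // Like trigger: emit the assigner's watermark only if it advanced.
    void trigger() {
        Long wm = userFunction.getCurrentWatermark();
        if (wm != null && wm > currentWatermark) {
            currentWatermark = wm;
            output.add("watermark " + wm);
        }
    }

    // Like processWatermark: drop upstream watermarks except end-of-input.
    void processWatermark(long mark) {
        if (mark == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
            currentWatermark = Long.MAX_VALUE;
            output.add("watermark " + mark);
        }
    }
}
```

    In such a run, a call like processWatermark(50L) produces no output. This matches the Flink code, where only the end-of-input Long.MAX_VALUE watermark passes through.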

    Then, in the window operator:

        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            final Collection<W> elementWindows = windowAssigner.assignWindows(
                    element.getValue(), element.getTimestamp(), windowAssignerContext);
            // ... (rest of the method omitted)
        }

    windowAssigner.assignWindows uses the element's timestamp to decide which windows the element belongs to.
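    For a tumbling event-time window, assigning a window from the element timestamp is plain arithmetic. Below is a sketch that assumes a tumbling window of fixed size. As far as I know, the formula matches what Flink's tumbling event-time assigner computes (TimeWindow.getWindowStartWithOffset in later versions). TumblingWindowMath itself is a hypothetical helper, not Flink's API. The sketch rejects Long.MIN_VALUE because that value stands for "no timestamp", as in TimestampsAndPeriodicWatermarksOperator.processElement.

```java
// Sketch: which tumbling window [start, start + size) an element falls into,
// given only its event timestamp. offset shifts the window boundaries.
final class TumblingWindowMath {
    static long windowStart(long timestamp, long offset, long windowSize) {
        // Adding windowSize keeps the remainder non-negative for timestamps
        // up to one window before the offset.
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    static long[] assignWindow(long timestamp, long windowSize) {
        if (timestamp == Long.MIN_VALUE) {
            // No timestamp was assigned upstream, so event-time windowing is impossible.
            throw new IllegalStateException("record has no timestamp");
        }
        long start = windowStart(timestamp, 0, windowSize);
        return new long[] {start, start + windowSize};
    }
}
```

    For example, with 5-second windows an element stamped 12345 ms lands in the window [10000, 15000).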

    For how the window operator then processes watermarks, see Flink – window operator.

  • Original article: https://www.cnblogs.com/fxjwind/p/6560874.html