  • Flink – process watermark

    WindowOperator.processElement

    Its main job is to add the current element's value to the state of the window it belongs to, and then ask the trigger what to do:

                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());
    
                triggerContext.key = key;
                triggerContext.window = window;
    
                TriggerResult triggerResult = triggerContext.onElement(element);
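    The following is a minimal sketch of what "add the value to the window's state" means. It uses a hypothetical NamespacedListState class, which is not Flink's API. Every read and write is scoped by the current key and the current namespace, and the window plays the role of the namespace. The key is presumably already set on the operator before processElement runs, which would explain why the excerpt above only switches the namespace.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for keyed, namespaced list state (not Flink's API).
// Every read and write is scoped to (currentKey, currentNamespace); for a
// window operator, the namespace is the window.
class NamespacedListState<K, N, V> {
    private final Map<K, Map<N, List<V>>> store = new HashMap<>();
    private K currentKey;
    private N currentNamespace;

    // Assumed to be done by the caller before processElement; explicit here.
    void setCurrentKey(K key) { this.currentKey = key; }

    // Corresponds to windowState.setCurrentNamespace(window).
    void setCurrentNamespace(N namespace) { this.currentNamespace = namespace; }

    // Corresponds to windowState.add(value): append to (currentKey, currentNamespace).
    void add(V value) {
        store.computeIfAbsent(currentKey, k -> new HashMap<>())
             .computeIfAbsent(currentNamespace, n -> new ArrayList<>())
             .add(value);
    }

    // Return the list for (currentKey, currentNamespace), or null if nothing was added.
    List<V> get() {
        Map<N, List<V>> byNamespace = store.get(currentKey);
        return byNamespace == null ? null : byNamespace.get(currentNamespace);
    }
}
```

    In this sketch, the same window under a different key is a completely separate list. The real keyed state behaves the same way: each key has its own state.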

     

    Calling triggerContext.onElement

    The Context here is only a thin wrapper that forwards to the trigger:

            public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
                return trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
            }

     

    EventTimeTrigger

    onElement

        @Override
        public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
                // if the watermark is already past the window fire immediately
                return TriggerResult.FIRE;
            } else {
                ctx.registerEventTimeTimer(window.maxTimestamp());
                return TriggerResult.CONTINUE;
            }
        }

    If window.maxTimestamp() is already less than or equal to the current watermark, fire immediately.

    Otherwise, register window.maxTimestamp() as an event-time timer with the timer service and wait for it to fire.
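    The decision can be modeled on its own. This sketch uses hypothetical names (SketchTriggerResult, SketchTimerRegistrar, EventTimeTriggerSketch). It only mirrors the branch shown above, and it takes the window's maxTimestamp and the current watermark as plain longs.

```java
// Hypothetical, minimal model of the branch in EventTimeTrigger.onElement.
enum SketchTriggerResult { CONTINUE, FIRE }

// Hypothetical stand-in for the registerEventTimeTimer part of the trigger context.
interface SketchTimerRegistrar {
    void registerEventTimeTimer(long time);
}

final class EventTimeTriggerSketch {
    // windowMaxTimestamp is the last timestamp that still belongs to the window.
    static SketchTriggerResult onElement(long windowMaxTimestamp, long currentWatermark,
                                         SketchTimerRegistrar timers) {
        if (windowMaxTimestamp <= currentWatermark) {
            // the watermark has already passed the window: fire right away
            return SketchTriggerResult.FIRE;
        }
        // otherwise ask to be called back once the watermark reaches the window's end
        timers.registerEventTimeTimer(windowMaxTimestamp);
        return SketchTriggerResult.CONTINUE;
    }
}
```

    Registering the same timestamp for every element of a window is harmless. The timer service de-duplicates timers, as the registerEventTimeTimer code later in this post shows.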

     

    WindowOperator.Context

            public void registerEventTimeTimer(long time) {
                internalTimerService.registerEventTimeTimer(window, time);
            }

     

    InternalTimerService
     
    In AbstractStreamOperator:
    public abstract class AbstractStreamOperator<OUT>
            implements StreamOperator<OUT>, Serializable, KeyContext {
    Note that it implements KeyContext,
    so AbstractStreamOperator provides getCurrentKey():
        public Object getCurrentKey() {
            if (keyedStateBackend != null) {
                return keyedStateBackend.getCurrentKey();
            } else {
                throw new UnsupportedOperationException("Key can only be retrieven on KeyedStream.");
            }
        }
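    KeyContext lets the operator itself answer "which key is current right now?". A helper such as the timer service can then read the key on demand instead of storing it. The sketch below shows that shape. SketchKeyContext, SketchKeyedBackend and SketchOperator are hypothetical stand-ins for Flink's classes. As a simplification, the setter in the sketch also throws on a non-keyed operator.

```java
// Hypothetical interface modeled on the KeyContext idea.
interface SketchKeyContext {
    void setCurrentKey(Object key);
    Object getCurrentKey();
}

// Hypothetical keyed backend: here it only remembers the current key.
class SketchKeyedBackend {
    Object currentKey;
}

// Delegates to the keyed backend if there is one, otherwise fails,
// following the shape of getCurrentKey() above.
class SketchOperator implements SketchKeyContext {
    private final SketchKeyedBackend keyedStateBackend; // null for non-keyed streams

    SketchOperator(SketchKeyedBackend backend) { this.keyedStateBackend = backend; }

    @Override
    public void setCurrentKey(Object key) {
        requireKeyed().currentKey = key;
    }

    @Override
    public Object getCurrentKey() {
        return requireKeyed().currentKey;
    }

    private SketchKeyedBackend requireKeyed() {
        if (keyedStateBackend == null) {
            throw new UnsupportedOperationException("Key can only be retrieved on KeyedStream.");
        }
        return keyedStateBackend;
    }
}
```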
     
    AbstractStreamOperator initializes the InternalTimeServiceManager, but only when a keyed state backend exists:
        private transient InternalTimeServiceManager<?, ?> timeServiceManager;

        @Override
        public final void initializeState(OperatorStateHandles stateHandles) throws Exception {

            if (getKeyedStateBackend() != null && timeServiceManager == null) {
                timeServiceManager = new InternalTimeServiceManager<>(
                    getKeyedStateBackend().getNumberOfKeyGroups(),
                    getKeyedStateBackend().getKeyGroupRange(),
                    this, // the operator passes itself as the KeyContext
                    getRuntimeContext().getProcessingTimeService());
            }
            // ... (rest of initializeState omitted)
        }

     

    In WindowOperator, the InternalTimerService is initialized with:

        internalTimerService =
            getInternalTimerService("window-timers", windowSerializer, this);

    This method is implemented in AbstractStreamOperator:

        public <K, N> InternalTimerService<N> getInternalTimerService(
                String name,
                TypeSerializer<N> namespaceSerializer,
                Triggerable<K, N> triggerable) {
    
            checkTimerServiceInitialization();
    
            // the following casting is to overcome type restrictions.
            TypeSerializer<K> keySerializer = (TypeSerializer<K>) getKeyedStateBackend().getKeySerializer();
            InternalTimeServiceManager<K, N> keyedTimeServiceHandler = (InternalTimeServiceManager<K, N>) timeServiceManager;
            return keyedTimeServiceHandler.getInternalTimerService(name, keySerializer, namespaceSerializer, triggerable);
        }

    This simply delegates to InternalTimeServiceManager.getInternalTimerService,

    which ultimately returns a HeapInternalTimerService.
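    The sketch below shows the lookup. It assumes the manager keeps its services in a map keyed by name. That assumption is consistent with the advanceWatermark code further down, which iterates over timerServices.values(). NamedTimerService and SketchTimeServiceManager are hypothetical stand-ins. The real method also takes serializers and a Triggerable.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical timer service that only knows its name.
class NamedTimerService {
    final String name;
    NamedTimerService(String name) { this.name = name; }
}

// Hypothetical manager: one service per name, created on first request
// and reused afterwards.
class SketchTimeServiceManager {
    private final Map<String, NamedTimerService> timerServices = new HashMap<>();

    NamedTimerService getInternalTimerService(String name) {
        return timerServices.computeIfAbsent(name, NamedTimerService::new);
    }

    int numberOfServices() { return timerServices.size(); }
}
```

    Asking twice for "window-timers" therefore yields the same service and the same set of timers.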

     

    HeapInternalTimerService.registerEventTimeTimer
        @Override
        public void registerEventTimeTimer(N namespace, long time) {
            InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
            Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
            if (timerSet.add(timer)) {
                eventTimeTimersQueue.add(timer);
            }
        }

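    It creates an InternalTimer holding the time (window.maxTimestamp()), the key (keyContext.getCurrentKey()) and the namespace (the window). The timer goes into the queue only if timerSet.add returns true, that is, only if the same timer was not registered before.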
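    Why use both a Set and a queue? Every element of a window calls registerEventTimeTimer with the same window.maxTimestamp(). Without de-duplication, the queue would grow by one entry per element. PriorityQueue.contains is a linear scan, so a HashSet presumably serves as the cheap membership check. The sketch below uses hypothetical SketchTimer and SketchTimerRegistry classes, where a timer's identity is (timestamp, key, namespace).

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical timer whose identity is (timestamp, key, namespace).
final class SketchTimer {
    final long timestamp;
    final Object key;
    final Object namespace;

    SketchTimer(long timestamp, Object key, Object namespace) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SketchTimer)) return false;
        SketchTimer t = (SketchTimer) o;
        return timestamp == t.timestamp
            && Objects.equals(key, t.key)
            && Objects.equals(namespace, t.namespace);
    }

    @Override
    public int hashCode() { return Objects.hash(timestamp, key, namespace); }
}

class SketchTimerRegistry {
    private final Set<SketchTimer> timerSet = new HashSet<>();
    final PriorityQueue<SketchTimer> queue =
        new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));

    // Returns true only for the first registration of a given timer;
    // duplicates are dropped before they reach the queue.
    boolean registerEventTimeTimer(Object currentKey, Object namespace, long time) {
        SketchTimer timer = new SketchTimer(time, currentKey, namespace);
        if (timerSet.add(timer)) {
            queue.add(timer);
            return true;
        }
        return false;
    }
}
```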

     

    getEventTimeTimerSetForTimer

        private Set<InternalTimer<K, N>> getEventTimeTimerSetForTimer(InternalTimer<K, N> timer) {
            checkArgument(localKeyGroupRange != null, "The operator has not been initialized.");
            int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(timer.getKey(), this.totalKeyGroups);
            return getEventTimeTimerSetForKeyGroup(keyGroupIdx);
        }

        private Set<InternalTimer<K, N>> getEventTimeTimerSetForKeyGroup(int keyGroupIdx) {
            int localIdx = getIndexForKeyGroup(keyGroupIdx);
            Set<InternalTimer<K, N>> timers = eventTimeTimersByKeyGroup[localIdx];
            if (timers == null) {
                timers = new HashSet<>();
                eventTimeTimersByKeyGroup[localIdx] = timers;
            }
            return timers;
        }

    It first finds the key group the timer's key belongs to. Each key group has its own timer set.

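    The reason for this design is that timers are checkpointed too. Keyed state is snapshotted and redistributed per key group, which is its smallest unit, so the timers of different key groups must be kept separate.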
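    The sketch below shows per-key-group timer sets under a stated simplification. The hypothetical SketchKeyGroupedTimers maps a key to floorMod(hashCode, totalKeyGroups), whereas Flink's KeyGroupRangeAssignment first applies a murmur hash to hashCode(). Each operator instance owns a contiguous range of key groups. A key group's slot in the local array is its index minus the start of that range, so the timers of one key group can be read out on their own.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical per-key-group timer sets; timers are plain strings for brevity.
class SketchKeyGroupedTimers {
    private final int totalKeyGroups;
    private final int rangeStart; // first key group owned by this operator instance
    private final Set<String>[] timersByKeyGroup;

    @SuppressWarnings("unchecked")
    SketchKeyGroupedTimers(int totalKeyGroups, int rangeStart, int rangeEndInclusive) {
        this.totalKeyGroups = totalKeyGroups;
        this.rangeStart = rangeStart;
        this.timersByKeyGroup = (Set<String>[]) new Set[rangeEndInclusive - rangeStart + 1];
    }

    // Simplified assignment (no murmur hash, unlike Flink).
    int assignToKeyGroup(Object key) {
        return Math.floorMod(key.hashCode(), totalKeyGroups);
    }

    // Lazily create the set for a key group, like getEventTimeTimerSetForKeyGroup.
    Set<String> setForKeyGroup(int keyGroup) {
        int localIdx = keyGroup - rangeStart;
        if (localIdx < 0 || localIdx >= timersByKeyGroup.length) {
            throw new IllegalArgumentException("Key group " + keyGroup + " is not owned by this instance");
        }
        if (timersByKeyGroup[localIdx] == null) {
            timersByKeyGroup[localIdx] = new HashSet<>();
        }
        return timersByKeyGroup[localIdx];
    }

    void register(Object key, long time) {
        setForKeyGroup(assignToKeyGroup(key)).add(key + "@" + time);
    }
}
```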

    Finally, the timer is added to eventTimeTimersQueue:

    private final PriorityQueue<InternalTimer<K, N>> eventTimeTimersQueue;

    PriorityQueue is heap-based. As long as InternalTimer implements compareTo by timestamp, the timers stay ordered and the earliest one is always at the head.
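    The sketch below shows the ordering. It assumes, as described above, that timers compare by timestamp alone. OrderedTimer and TimerOrderDemo are hypothetical. java.util.PriorityQueue is a binary min-heap, so poll() always returns the smallest element according to compareTo.

```java
import java.util.PriorityQueue;

// Hypothetical timer ordered only by timestamp; the key plays no part in ordering.
final class OrderedTimer implements Comparable<OrderedTimer> {
    final long timestamp;
    final String key;

    OrderedTimer(long timestamp, String key) {
        this.timestamp = timestamp;
        this.key = key;
    }

    @Override
    public int compareTo(OrderedTimer o) {
        return Long.compare(timestamp, o.timestamp);
    }
}

final class TimerOrderDemo {
    // Drain the queue and return timestamps in the order the heap hands them out.
    static long[] drain(PriorityQueue<OrderedTimer> queue) {
        long[] out = new long[queue.size()];
        for (int i = 0; i < out.length; i++) {
            out[i] = queue.poll().timestamp; // poll is O(log n); peek is O(1)
        }
        return out;
    }
}
```

    Only the head is ever examined. This is why advanceWatermark, further down, can stop at the first timer that is still in the future.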

    AbstractStreamOperator.processWatermark

       public void processWatermark(Watermark mark) throws Exception {
            if (timeServiceManager != null) {
                timeServiceManager.advanceWatermark(mark);
            }
            output.emitWatermark(mark);
        }
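    The order inside processWatermark matters: timers are advanced first, and only then is the watermark forwarded. A plausible reason is that window results fired by this watermark then reach downstream operators ahead of the watermark, so they are not treated as late there. The sketch below shows that ordering. SketchWatermarkOperator is hypothetical, and each timer service is modeled as a LongConsumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical operator: output is a list of strings in emission order.
class SketchWatermarkOperator {
    final List<String> output = new ArrayList<>();
    private final List<LongConsumer> timerServices = new ArrayList<>();

    void addTimerService(LongConsumer advanceWatermark) {
        timerServices.add(advanceWatermark);
    }

    void processWatermark(long watermark) {
        // 1. let every timer service fire; this may append window results to output
        for (LongConsumer service : timerServices) {
            service.accept(watermark);
        }
        // 2. forward the watermark downstream last
        output.add("watermark@" + watermark);
    }
}
```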

    timeServiceManager.advanceWatermark

        public void advanceWatermark(Watermark watermark) throws Exception {
            for (HeapInternalTimerService<?, ?> service : timerServices.values()) {
                service.advanceWatermark(watermark.getTimestamp());
            }
        }

    HeapInternalTimerService.advanceWatermark

        public void advanceWatermark(long time) throws Exception {
            currentWatermark = time;
    
            InternalTimer<K, N> timer;
    
            while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
    
                Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
                timerSet.remove(timer);
                eventTimeTimersQueue.remove();
    
                keyContext.setCurrentKey(timer.getKey());
                triggerTarget.onEventTime(timer);
            }
        }

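    Timers are taken from eventTimeTimersQueue in ascending timestamp order. A timer whose timestamp is less than or equal to the incoming watermark means its window needs to fire.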

    Set the operator's current key: keyContext.setCurrentKey(timer.getKey())

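    Note that a watermark carries no key, so one watermark can fire timers that belong to many different keys. The key context therefore has to be set to each timer's key before the timer fires. Otherwise, the window state would be read and written under the wrong key.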
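    The loop can be sketched on its own. SketchEventTimeService is hypothetical: a plain field stands in for the operator's KeyContext, and onEventTime only records which key was current when each timer fired. As in the code above, the loop keeps firing while the head timer's timestamp is less than or equal to the watermark. It stops at the first timer that is still in the future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical timer: timestamp plus the key it was registered under.
final class FiringTimer {
    final long timestamp;
    final String key;

    FiringTimer(long timestamp, String key) {
        this.timestamp = timestamp;
        this.key = key;
    }
}

final class SketchEventTimeService {
    private final PriorityQueue<FiringTimer> queue =
        new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    String currentKey;                      // stands in for the operator's KeyContext
    long currentWatermark = Long.MIN_VALUE;
    final List<String> fired = new ArrayList<>();

    void register(String key, long time) {
        queue.add(new FiringTimer(time, key));
    }

    void advanceWatermark(long time) {
        currentWatermark = time;
        FiringTimer timer;
        while ((timer = queue.peek()) != null && timer.timestamp <= time) {
            queue.remove();                 // drop the head before firing it
            currentKey = timer.key;         // the watermark has no key: restore the timer's
            onEventTime(timer);
        }
    }

    // Stands in for WindowOperator.onEventTime: record the key active at firing time.
    private void onEventTime(FiringTimer timer) {
        fired.add(currentKey + "@" + timer.timestamp);
    }
}
```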

    Finally, triggerTarget.onEventTime(timer) is called.

    triggerTarget is the WindowOperator itself.

    WindowOperator.onEventTime

            windowState.setCurrentNamespace(triggerContext.window);
    
            ACC contents = null;
            if (windowState != null) {
                contents = windowState.get();
            }
    
            if (contents != null) {
                TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
                if (triggerResult.isFire()) {
                    emitWindowContents(triggerContext.window, contents);
                }
                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
            }

    Here triggerContext.onEventTime is called to obtain a TriggerResult.

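    If the result says FIRE, emitWindowContents is called. With EventTimeTrigger this is expected to hold at this point. The timer was registered at window.maxTimestamp(), and EventTimeTrigger.onEventTime returns FIRE for exactly that time.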

    If the result says PURGE, windowState is cleared.

    emitWindowContents timestamps the output with window.maxTimestamp() and calls the user-defined window function on the window's contents:

        private void emitWindowContents(W window, ACC contents) throws Exception {
            timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
            processContext.window = window;
            userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
        }
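    The sketch below shows the fire/purge handling using two hypothetical types. SketchResult is modeled on the four TriggerResult outcomes: CONTINUE, FIRE, PURGE and FIRE_AND_PURGE. SketchWindowFiring identifies windows by their maxTimestamp and buffers contents as a list of integers. A BiFunction stands in for the user's window function.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical trigger outcome with fire/purge flags.
enum SketchResult {
    CONTINUE(false, false), FIRE(true, false), PURGE(false, true), FIRE_AND_PURGE(true, true);

    private final boolean fire;
    private final boolean purge;

    SketchResult(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }

    boolean isFire() { return fire; }
    boolean isPurge() { return purge; }
}

final class SketchWindowFiring {
    // window maxTimestamp -> buffered elements (simplified windowState)
    final Map<Long, List<Integer>> windowState = new HashMap<>();
    final List<String> emitted = new ArrayList<>();
    private final BiFunction<Long, List<Integer>, String> userFunction;

    SketchWindowFiring(BiFunction<Long, List<Integer>, String> userFunction) {
        this.userFunction = userFunction;
    }

    void onEventTime(long windowMaxTimestamp, SketchResult triggerResult) {
        List<Integer> contents = windowState.get(windowMaxTimestamp);
        if (contents == null) {
            return; // nothing buffered for this window, so nothing to emit
        }
        if (triggerResult.isFire()) {
            // the window's maxTimestamp is passed along, standing in for setAbsoluteTimestamp
            emitted.add(userFunction.apply(windowMaxTimestamp, contents));
        }
        if (triggerResult.isPurge()) {
            windowState.remove(windowMaxTimestamp);
        }
    }
}
```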

  • Original article: https://www.cnblogs.com/fxjwind/p/7657058.html