  • Flink – window operator

     

    References:

    http://wuchong.me/blog/2016/05/25/flink-internals-window-mechanism/

    http://wuchong.me/blog/2016/06/06/flink-internals-session-window/ 

     

    WindowOperator

    The window operator implements its logic with a WindowAssigner and a Trigger.

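    When an element arrives, a KeySelector first assigns it a key, and the WindowAssigner then assigns it to zero or more windows, so the element is placed into one pane per assigned window.

    A pane holds all elements that share the same key and the same window.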
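
    To make the pane idea concrete, here is a minimal plain-Java sketch. It is not Flink's actual state layout: the names PaneSketch and PaneId are hypothetical, keys are assumed to be strings, and a window is assumed to be a [start, end) pair of longs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch of panes; not Flink's actual state layout. */
class PaneSketch {

    /** Identifies a pane: one key combined with one window [start, end). */
    static final class PaneId {
        final String key;
        final long windowStart;
        final long windowEnd;

        PaneId(String key, long windowStart, long windowEnd) {
            this.key = key;
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof PaneId)) {
                return false;
            }
            PaneId other = (PaneId) o;
            return key.equals(other.key)
                && windowStart == other.windowStart
                && windowEnd == other.windowEnd;
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, windowStart, windowEnd);
        }
    }

    // One buffer of elements per (key, window) combination.
    private final Map<PaneId, List<Integer>> panes = new HashMap<>();

    /** Adds the value to the pane of every window the element was assigned to. */
    void add(String key, int value, long[][] windows) {
        for (long[] w : windows) {
            panes.computeIfAbsent(new PaneId(key, w[0], w[1]), id -> new ArrayList<>()).add(value);
        }
    }

    /** Returns the buffered elements of one pane, or an empty list if the pane does not exist. */
    List<Integer> contents(String key, long start, long end) {
        return panes.getOrDefault(new PaneId(key, start, end), new ArrayList<>());
    }

    int paneCount() {
        return panes.size();
    }
}
```

    An element assigned to two windows ends up in two panes, while elements with different keys never share a pane even when their windows coincide.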

    /**
     * An operator that implements the logic for windowing based on a {@link WindowAssigner} and
     * {@link Trigger}.
     *
     * <p>
     * When an element arrives it gets assigned a key using a {@link KeySelector} and it gets
     * assigned to zero or more windows using a {@link WindowAssigner}. Based on this, the element
     * is put into panes. A pane is the bucket of elements that have the same key and same
     * {@code Window}. An element can be in multiple panes if it was assigned to multiple windows by the
     * {@code WindowAssigner}.
     *
     * <p>
     * Each pane gets its own instance of the provided {@code Trigger}. This trigger determines when
     * the contents of the pane should be processed to emit results. When a trigger fires,
     * the given {@link InternalWindowFunction} is invoked to produce the results that are emitted for
     * the pane to which the {@code Trigger} belongs.
     *
     * @param <K> The type of key returned by the {@code KeySelector}.
     * @param <IN> The type of the incoming elements.
     * @param <OUT> The type of elements emitted by the {@code InternalWindowFunction}.
     * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
     */
    @Internal
    public class WindowOperator<K, IN, ACC, OUT, W extends Window>
        extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
        implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
    
        // ------------------------------------------------------------------------
        // Configuration values and user functions
        // ------------------------------------------------------------------------
    
        protected final WindowAssigner<? super IN, W> windowAssigner;
    
        protected final KeySelector<IN, K> keySelector;
    
        protected final Trigger<? super IN, ? super W> trigger;
    
        protected final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;
    
        /**
         * The allowed lateness for elements. This is used for:
         * <ul>
         *     <li>Deciding if an element should be dropped from a window due to lateness.
         *     <li>Clearing the state of a window if the system time passes the
         *         {@code window.maxTimestamp + allowedLateness} landmark.
         * </ul>
         */
        protected final long allowedLateness; // how long elements may still arrive late, i.e. after the watermark has already passed the window
    
    
        /**
         * To keep track of the current watermark so that we can immediately fire if a trigger
         * registers an event time callback for a timestamp that lies in the past.
         */
        protected transient long currentWatermark = Long.MIN_VALUE;
    
        protected transient Context context = new Context(null, null); //Trigger Context
    
        protected transient WindowAssigner.WindowAssignerContext windowAssignerContext; // only used to obtain getCurrentProcessingTime
    
        // ------------------------------------------------------------------------
        // State that needs to be checkpointed
        // ------------------------------------------------------------------------
    
        /**
         * Processing time timers that are currently in-flight.
         */
        protected transient PriorityQueue<Timer<K, W>> processingTimeTimersQueue; // a Timer stores timestamp, key and window; the queue is ordered by timestamp
    
        /**
         * Current waiting watermark callbacks.
         */
        protected transient Set<Timer<K, W>> watermarkTimers;
        protected transient PriorityQueue<Timer<K, W>> watermarkTimersQueue;
    
        protected transient Map<K, MergingWindowSet<W>> mergingWindowsByKey; // tracks the mapping between each merged window and its state window

     

    For the window operator, the two most important pieces are the WindowAssigner and the Trigger.

     

    WindowAssigner

    A WindowAssigner decides which windows a tuple should be assigned to.

    A borrowed diagram (image not preserved here) showed how many kinds of WindowAssigner exist.

    The key method of WindowAssigner is assignWindows, which assigns a set of windows, a Collection<W>, to an element.

    @PublicEvolving
    public abstract class WindowAssigner<T, W extends Window> implements Serializable {
        private static final long serialVersionUID = 1L;
    
        /**
         * Returns a {@code Collection} of windows that should be assigned to the element.
         *
         * @param element The element to which windows should be assigned.
         * @param timestamp The timestamp of the element.
         * @param context The {@link WindowAssignerContext} in which the assigner operates.
         */
        public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
    
        /**
         * Returns the default trigger associated with this {@code WindowAssigner}.
         */
        public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
    
        /**
         * Returns a {@link TypeSerializer} for serializing windows that are assigned by
         * this {@code WindowAssigner}.
         */
        public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
    
        // ... remaining members (e.g. isEventTime()) omitted
    }

    Now let's look at some concrete WindowAssigner implementations.

    public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    
        private final long size; // window length in milliseconds
    
        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            final long now = context.getCurrentProcessingTime();
            long start = now - (now % size); // align to the start of the current window
            return Collections.singletonList(new TimeWindow(start, start + size)); // simple: exactly one TimeWindow is assigned
        }
        
        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return ProcessingTimeTrigger.create(); // the default is ProcessingTimeTrigger, as the name suggests
        }
    
        // ... other members omitted
    }
    
    public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    
        private final long size;
        private final long slide;
    
        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            if (timestamp > Long.MIN_VALUE) {
                List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
                long lastStart = timestamp - timestamp % slide;
                for (long start = lastStart;
                    start > timestamp - size;
                    start -= slide) {
                    windows.add(new TimeWindow(start, start + size)); // several TimeWindows are assigned here, because the windows slide
                }
                return windows;
            } else {
                // elided in the original excerpt; an element without a timestamp is rejected here
                throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker).");
            }
        }
    
        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return EventTimeTrigger.create();
        }
    
        // ... other members omitted
    }
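
    The window arithmetic used by the two assigners above can be tried out in isolation. The following is a small sketch, not Flink code: the class name WindowMathSketch and its methods are hypothetical, windows are reduced to their start offsets, and timestamps are assumed to be non-negative (Java's % can return a negative remainder otherwise).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that reproduces the window-start arithmetic of the excerpts above. */
class WindowMathSketch {

    /** Start of the single tumbling window [start, start + size) that contains the timestamp. */
    static long tumblingStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** Starts of all sliding windows [start, start + size) that contain the timestamp, newest first. */
    static List<Long> slidingStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>((int) (size / slide));
        // The latest window that can contain the timestamp starts at the previous slide boundary.
        long lastStart = timestamp - timestamp % slide;
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

    With size 10 and slide 5, a timestamp of 7 falls into the windows starting at 5 and 0, that is [5, 15) and [0, 10). In general an element lands in about size / slide sliding windows, which is why the ArrayList is pre-sized that way.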

     

    Trigger, Evictor

    See also: Flink – Trigger, Evictor

     

    Next, let's look at the three main entry points of the operator, which invoke onElement, onEventTime and onProcessingTime respectively.

    processElement

    This handles the arrival of an element and invokes onElement.

    public void processElement(StreamRecord<IN> element) throws Exception {
        Collection<W> elementWindows = windowAssigner.assignWindows(  // the WindowAssigner assigns the element a collection of windows
            element.getValue(), element.getTimestamp(), windowAssignerContext);
    
        final K key = (K) getStateBackend().getCurrentKey();
    
        if (windowAssigner instanceof MergingWindowAssigner) { // merging windows
            //.......
        } else { // regular windows
            for (W window: elementWindows) {
    
                // drop if the window is already late
                if (isLate(window)) { // late-data handling: dropped by default
                    continue;
                }
    
                AppendingState<IN, ACC> windowState = getPartitionedState( // fetch this window's state, i.e. the buffered elements, from the backend
                    window, windowSerializer, windowStateDescriptor);
                windowState.add(element.getValue()); // add the current element to the buffer state
    
                context.key = key;
                context.window = window; // the design of context is rather tricky and obscure
    
                TriggerResult triggerResult = context.onElement(element); // invoke onElement and obtain the TriggerResult
    
                if (triggerResult.isFire()) { // act on the TriggerResult
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    fire(window, contents); // on fire, actually evaluate the elements in the window
                }
    
                if (triggerResult.isPurge()) {
                    cleanup(window, windowState, null); // purge: clean up the buffered elements
                } else {
                    registerCleanupTimer(window);
                }
            }
        }
    }
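
    The element path (assign window, buffer, ask the trigger, then fire and/or purge) can be mimicked in a few lines of plain Java. This is a simplified sketch under stated assumptions, not the real operator: MiniWindowOperator is a hypothetical name, it uses a single tumbling window per element, a count-based trigger that fires and purges at a threshold, a window function that sums the pane, and it ignores lateness, merging windows and cleanup timers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, heavily simplified model of the processElement flow. */
class MiniWindowOperator {

    /** The four outcomes a trigger can report, with the same fire/purge semantics as in the excerpt. */
    enum TriggerResult {
        CONTINUE(false, false), FIRE(true, false), PURGE(false, true), FIRE_AND_PURGE(true, true);

        private final boolean fire;
        private final boolean purge;

        TriggerResult(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }

        boolean isFire() { return fire; }
        boolean isPurge() { return purge; }
    }

    private final long windowSize;
    private final int countThreshold;
    // Pane id ("key@windowStart", assuming keys contain no '@') -> buffered elements.
    private final Map<String, List<Integer>> windowState = new HashMap<>();
    // Results emitted by fire(): the sum of each fired pane.
    final List<Integer> output = new ArrayList<>();

    MiniWindowOperator(long windowSize, int countThreshold) {
        this.windowSize = windowSize;
        this.countThreshold = countThreshold;
    }

    void processElement(String key, int value, long timestamp) {
        // 1. Assign the window (one tumbling window; timestamps assumed non-negative).
        long windowStart = timestamp - (timestamp % windowSize);
        String pane = key + "@" + windowStart;

        // 2. Add the element to the pane's buffer.
        List<Integer> contents = windowState.computeIfAbsent(pane, p -> new ArrayList<>());
        contents.add(value);

        // 3. Ask the trigger what to do.
        TriggerResult result = onElement(contents);

        // 4. Act on the result: evaluate the window on fire, drop its state on purge.
        if (result.isFire()) {
            fire(contents);
        }
        if (result.isPurge()) {
            windowState.remove(pane);
        }
    }

    /** Hypothetical count trigger: fire and purge once the pane holds countThreshold elements. */
    private TriggerResult onElement(List<Integer> contents) {
        return contents.size() >= countThreshold ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    /** Stand-in for the user window function: emits the sum of the pane. */
    private void fire(List<Integer> contents) {
        int sum = 0;
        for (int v : contents) {
            sum += v;
        }
        output.add(sum);
    }

    int bufferedPanes() {
        return windowState.size();
    }
}
```

    Keeping fire and purge as two independent flags is what lets a trigger emit intermediate results (fire without purge) while the pane keeps accumulating.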

     

    The logic that decides whether data is late:

    protected boolean isLate(W window) {
        return (windowAssigner.isEventTime() && (cleanupTime(window) <= currentWatermark));
    }
    private long cleanupTime(W window) {
        long cleanupTime = window.maxTimestamp() + allowedLateness; // end of the window plus the allowed lateness
        return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE; // guard against long overflow
    }
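
    A worked example of the lateness check, written as a standalone sketch. LatenessSketch is a hypothetical name; the window is represented only by its maxTimestamp, and the isEventTime() condition from the excerpt is assumed to be true.

```java
/** Hypothetical standalone version of the isLate / cleanupTime arithmetic. */
class LatenessSketch {

    /** Time at which a window's state may be dropped; Long.MAX_VALUE means "never". */
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long cleanupTime = windowMaxTimestamp + allowedLateness;
        // If the addition overflowed, the sum wraps around to a smaller value; treat that as "never".
        return cleanupTime >= windowMaxTimestamp ? cleanupTime : Long.MAX_VALUE;
    }

    /** A window is late once the watermark has reached its cleanup time. */
    static boolean isLate(long windowMaxTimestamp, long allowedLateness, long currentWatermark) {
        return cleanupTime(windowMaxTimestamp, allowedLateness) <= currentWatermark;
    }
}
```

    For a window whose maxTimestamp is 9 and an allowedLateness of 5, the cleanup time is 14: an element for that window is still accepted while the watermark is at 13 and dropped once the watermark reaches 14.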

     

    The fire logic:

    private void fire(W window, ACC contents) throws Exception {
        timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
        userFunction.apply(context.key, context.window, contents, timestampedCollector);
    }

     

    processWatermark

    This handles a watermark and invokes onEventTime.

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        boolean fire;
        do {
            Timer<K, W> timer = watermarkTimersQueue.peek(); // the name watermarkTimersQueue is a bit ambiguous; eventTimerQueue would be easier to understand
            if (timer != null && timer.timestamp <= mark.getTimestamp()) {
                fire = true;
    
                watermarkTimers.remove(timer);
                watermarkTimersQueue.remove();
    
                context.key = timer.key;
                context.window = timer.window;
                setKeyContext(timer.key);  //stateBackend.setCurrentKey(key);
    
                AppendingState<IN, ACC> windowState;
                MergingWindowSet<W> mergingWindows = null;
    
                if (windowAssigner instanceof MergingWindowAssigner) { // merging windows
                    mergingWindows = getMergingWindowSet();
                    W stateWindow = mergingWindows.getStateWindow(context.window);
                    if (stateWindow == null) {
                        // then the window is already purged and this is a cleanup
                        // timer set due to allowed lateness that has nothing to clean,
                        // so it is safe to just ignore
                        continue;
                    }
                    windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
                } else { // regular windows
                    windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); // fetch the window's state
                }
    
                ACC contents = windowState.get();
                if (contents == null) {
                    // if we have no state, there is nothing to do
                    continue;
                }
    
                TriggerResult triggerResult = context.onEventTime(timer.timestamp); // invoke onEventTime
                if (triggerResult.isFire()) {
                    fire(context.window, contents);
                }
    
                if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
                    cleanup(context.window, windowState, mergingWindows);
                }
    
            } else {
                fire = false;
            }
        } while (fire); // while fire is true, check whether the next watermark timer is due as well
    
        output.emitWatermark(mark); // forward the watermark downstream
    
        this.currentWatermark = mark.getTimestamp(); // update currentWatermark
    }
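
    The core of processWatermark is draining a time-ordered queue of timers up to the watermark. Below is a minimal sketch of that pattern, assuming string keys and leaving out windows, state and triggers; EventTimerSketch and its methods are hypothetical names. The set mirrors the role of watermarkTimers (deduplication) and the queue that of watermarkTimersQueue (ordering).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

/** Hypothetical sketch of event-time timers drained by an advancing watermark. */
class EventTimerSketch {

    static final class Timer {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Timer)) {
                return false;
            }
            Timer other = (Timer) o;
            return timestamp == other.timestamp && key.equals(other.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(timestamp, key);
        }
    }

    // The set rejects duplicate registrations; the queue orders timers by timestamp.
    private final Set<Timer> timers = new HashSet<>();
    private final PriorityQueue<Timer> queue =
        new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));
    private long currentWatermark = Long.MIN_VALUE;

    void register(long timestamp, String key) {
        Timer timer = new Timer(timestamp, key);
        if (timers.add(timer)) {
            queue.add(timer);
        }
    }

    /** Removes every timer that is due at this watermark and returns them in timestamp order. */
    List<String> processWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().timestamp <= watermark) {
            Timer timer = queue.poll();
            timers.remove(timer);
            fired.add(timer.key + "@" + timer.timestamp);
        }
        // As in the excerpt, the watermark is recorded only after all due timers were handled.
        currentWatermark = watermark;
        return fired;
    }

    long currentWatermark() {
        return currentWatermark;
    }
}
```

    Because the queue is ordered by timestamp, the loop can stop at the first timer that is not yet due; everything behind it is later still.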

     

    trigger

    First, the naming of this method is questionable: why does it not match the process… methods above?

    It invokes onProcessingTime and is driven by a timer based on system time. The logic is essentially the same as in processWatermark; only the triggering condition differs.

    @Override
    public void trigger(long time) throws Exception {
        boolean fire;
    
        //Remove information about the triggering task
        processingTimeTimerFutures.remove(time);
        processingTimeTimerTimestamps.remove(time, processingTimeTimerTimestamps.count(time));
    
        do {
            Timer<K, W> timer = processingTimeTimersQueue.peek();
            if (timer != null && timer.timestamp <= time) {
                fire = true;
    
                processingTimeTimers.remove(timer);
                processingTimeTimersQueue.remove();
    
                context.key = timer.key;
                context.window = timer.window;
                setKeyContext(timer.key);
    
                AppendingState<IN, ACC> windowState;
                MergingWindowSet<W> mergingWindows = null;
    
                if (windowAssigner instanceof MergingWindowAssigner) {
                    mergingWindows = getMergingWindowSet();
                    W stateWindow = mergingWindows.getStateWindow(context.window);
                    if (stateWindow == null) {
                        // then the window is already purged and this is a cleanup
                        // timer set due to allowed lateness that has nothing to clean,
                        // so it is safe to just ignore
                        continue;
                    }
                    windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
                } else {
                    windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
                }
    
                ACC contents = windowState.get();
                if (contents == null) {
                    // if we have no state, there is nothing to do
                    continue;
                }
    
                TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
                if (triggerResult.isFire()) {
                    fire(context.window, contents);
                }
    
                if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
                    cleanup(context.window, windowState, mergingWindows);
                }
    
            } else {
                fire = false;
            }
        } while (fire);
    }

     

    EvictingWindowOperator

    Compared with WindowOperator, the EvictingWindowOperator simply adds an Evictor.

    private void fire(W window, Iterable<StreamRecord<IN>> contents) throws Exception {
        timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
    
        // Work around type system restrictions...
        int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window); // run the evictor; it returns how many leading elements to drop
    
        FluentIterable<IN> projectedContents = FluentIterable
            .from(contents)
            .skip(toEvict)
            .transform(new Function<StreamRecord<IN>, IN>() {
                @Override
                public IN apply(StreamRecord<IN> input) {
                    return input.getValue();
                }
            });
        userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
    }

    The key point is that on fire, before the window function is applied, the elements selected for eviction are removed first.
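
    The evict-then-apply step can be illustrated without Flink or Guava. The sketch below assumes a count-based policy that keeps at most maxCount of the most recent elements; EvictSketch and its method names are hypothetical, and subList plays the role of skip(toEvict) in the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of "evict from the front, then hand the rest to the window function". */
class EvictSketch {

    /** Count-based policy: number of elements to drop from the front so that at most maxCount remain. */
    static int evict(int size, int maxCount) {
        return size > maxCount ? size - maxCount : 0;
    }

    /** Returns the elements that the window function would see after eviction. */
    static <T> List<T> contentsAfterEviction(List<T> buffered, int maxCount) {
        int toEvict = evict(buffered.size(), maxCount);
        // Skip the first toEvict elements, like FluentIterable.skip(toEvict) in the excerpt.
        return new ArrayList<>(buffered.subList(toEvict, buffered.size()));
    }
}
```

    For a buffer of [1, 2, 3, 4, 5] and maxCount 3, two elements are evicted and the window function sees [3, 4, 5].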

  • Original article: https://www.cnblogs.com/fxjwind/p/6137608.html