zoukankan      html  css  js  c++  java
  • Flink – CEP NFA

    看看Flink cep如何将pattern转换为NFA?

    当来了一条event时,它是如何在NFA中执行的?

    前面的链路,CEP –> PatternStream –> select –> CEPOperatorUtils.createPatternStream

    1. 产生NFACompiler.compileFactory,完成pattern到state的转换

    // Call site (end of the CEPOperatorUtils.createPatternStream chain): compile the pattern into an NFA factory.
    // NOTE(review): the third argument presumably selects timeout handling (cf. timeoutHandling below) -- confirm.
    final NFACompiler.NFAFactory<T> nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer, false);
                // Inside NFACompiler.compileFactory: build all states from the pattern, then wrap them,
                // together with the computed window time, in a factory that can create NFA instances.
                final NFAFactoryCompiler<T> nfaFactoryCompiler = new NFAFactoryCompiler<>(pattern);
                nfaFactoryCompiler.compileFactory();
                return new NFAFactoryImpl<>(inputTypeSerializer, nfaFactoryCompiler.getWindowTime(), nfaFactoryCompiler.getStates(), timeoutHandling);
    其中调用了nfaFactoryCompiler.compileFactory:
            /**
             * Builds all NFA states for the pattern. Each pattern only links to its predecessor, so the
             * states are created back to front: first the final state, then the middle states, and
             * finally the start state.
             */
            void compileFactory() {
                final State<T> finalState = createEndingState();
                final State<T> startSink = createMiddleStates(finalState);
                createStartState(startSink);
            }
    可以看到,这里的工作主要是生成state,即把pattern转换为NFA中的state和stateTransition。
    因为pattern是不断往后追加的,每个pattern通过private final Pattern<T, ? extends T> previous指向前一个pattern,所以在遍历pattern的时候只能从后往前回溯。
    因此先创建最后的final state:
            /**
             * Creates the final (accepting) state and initialises {@code windowTime} from the last
             * pattern's window, using 0 when that pattern declares no window.
             *
             * @return the newly created final state
             */
            private State<T> createEndingState() {
                final State<T> finalState = createState(ENDING_STATE_NAME, State.StateType.Final);
                if (currentPattern.getWindowTime() == null) {
                    windowTime = 0L;
                } else {
                    windowTime = currentPattern.getWindowTime().toMilliseconds();
                }
                return finalState;
            }
    createState很简单,就是单纯地创建state:
            /**
             * Creates a state under a unique internal name derived from {@code name}, records that name
             * as used and registers the new state with the compiler.
             *
             * @param name the requested (pattern) name
             * @param stateType the type of the new state
             * @return the newly created state
             */
            private State<T> createState(String name, State.StateType stateType) {
                final String uniqueName = getUniqueInternalStateName(name);
                usedNames.add(uniqueName);

                final State<T> created = new State<>(uniqueName, stateType);
                states.add(created);

                return created;
            }
     
    接着添加中间(middle)的state:
            /**
             * Converts every pattern except the first one into NFA states. The pattern chain is walked
             * backwards, because each pattern only references its predecessor.
             *
             * <p>While walking, {@code windowTime} is reduced to the smallest window declared on any
             * visited pattern. A {@code windowTime} of 0 means "no window" (it is what
             * createEndingState stores when the last pattern has none, and pruning only happens when it
             * is greater than 0). The first declared window therefore always replaces 0. Otherwise a
             * window set on an earlier pattern would be silently lost whenever the last pattern declares
             * none.
             *
             * @param sinkState the state the current pattern transitions into (initially the final state)
             * @return the last state created, or {@code sinkState} itself if there is only one pattern
             */
            private State<T> createMiddleStates(final State<T> sinkState) {
                State<T> lastSink = sinkState; // the state created in the previous iteration

                while (currentPattern.getPrevious() != null) {
                    checkPatternNameUniqueness(currentPattern.getName());
                    lastSink = convertPattern(lastSink); // convert the current pattern into state(s)

                    // we traverse the pattern graph backwards
                    followingPattern = currentPattern;
                    currentPattern = currentPattern.getPrevious(); // step back to the predecessor

                    final Time currentWindowTime = currentPattern.getWindowTime();
                    if (currentWindowTime != null
                            && (windowTime == 0L || currentWindowTime.toMilliseconds() < windowTime)) {
                        // the window time is the global minimum of all window times of each state;
                        // 0 means no window has been set yet, so any declared window replaces it
                        windowTime = currentWindowTime.toMilliseconds();
                    }
                }
                return lastSink;
            }
    其中调用convertPattern:
            /**
             * Converts {@code currentPattern} into a singleton state that leads to {@code sinkState},
             * then passes that state to addStopStates.
             *
             * @param sinkState the state reached after the current pattern
             * @return the state created for {@code currentPattern}
             */
            private State<T> convertPattern(final State<T> sinkState) {
                // only the singleton (single-event) case is covered here
                final State<T> converted = createSingletonState(sinkState);
                addStopStates(converted);
                return converted;
            }
     
    createSingletonState
            /**
             * Creates the NFA state for a pattern that matches a single event.
             *
             * <p>The new state gets a TAKE transition into {@code sinkState}, guarded by the pattern's
             * condition. If the pattern is optional, it also gets an unconditional PROCEED (epsilon)
             * transition into {@code sinkState}, so the pattern can be skipped.
             *
             * @param sinkState the state reached after this pattern has matched (or been skipped)
             * @param ignoreCondition condition for ignoring events; not used by this body
             * @param isOptional whether the pattern may be skipped entirely
             * @return the newly created state
             */
            private State<T> createSingletonState(final State<T> sinkState, final IterativeCondition<T> ignoreCondition, final boolean isOptional) {
                // the condition attached to the current pattern
                final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
                final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();

                // one Normal state per singleton pattern
                final State<T> singletonState = createState(currentPattern.getName(), State.StateType.Normal);
                // if event is accepted then all notPatterns previous to the optional states are no longer valid
                // NOTE(review): the excerpt referenced an undefined variable `sink` here. The upstream compiler
                // may target a copy of sinkState without its NOT transitions -- confirm against the Flink source.
                singletonState.addTake(sinkState, currentCondition); // TAKE: accept the event and move on

                if (isOptional) {
                    // if no element accepted the previous nots are still valid.
                    singletonState.addProceed(sinkState, trueFunction); // PROCEED: skip the optional pattern
                }

                return singletonState;
            }
    addTake
    addStateTransition
        /**
         * Adds an outgoing transition from this state.
         *
         * @param action what the transition does with the event (TAKE, IGNORE, PROCEED, ...)
         * @param targetState the state the transition leads to
         * @param condition the condition that must accept the event for the transition to apply
         */
        public void addStateTransition(
                final StateTransitionAction action,
                final State<T> targetState,
                final IterativeCondition<T> condition) {
            final StateTransition<T> transition = new StateTransition<T>(this, action, targetState, condition);
            stateTransitions.add(transition);
        }
     
    createStartState
            /**
             * Converts the remaining (first) pattern into a state and marks that state as the NFA's
             * start state.
             *
             * @param sinkState the state the first pattern transitions into
             * @return the start state
             */
            private State<T> createStartState(State<T> sinkState) {
                checkPatternNameUniqueness(currentPattern.getName());

                final State<T> startState = convertPattern(sinkState);
                startState.makeStart();

                return startState;
            }
     
     
    2. 当event到来时,如何处理?
    AbstractKeyedCEPPatternOperator.processElement
                // Fetch the stored NFA (or create a fresh one), run the event through it using the current
                // processing time as its timestamp, then hand the NFA to updateNFA
                // (presumably writes it back to operator state -- confirm).
                NFA<IN> nfa = getNFA();
                processEvent(nfa, element.getValue(), getProcessingTimeService().getCurrentProcessingTime());
                updateNFA(nfa);
     
    如果state backend里已经有NFA就直接取出来,否则通过nfaFactory.createNFA创建一个新的:
        /**
         * Returns the NFA held in operator state, or a freshly created one from the factory if nothing
         * has been stored yet.
         *
         * @throws IOException if reading the operator state fails
         */
        private NFA<IN> getNFA() throws IOException {
            final NFA<IN> storedNfa = nfaOperatorState.value();
            if (storedNfa == null) {
                return nfaFactory.createNFA();
            }
            return storedNfa;
        }
     
    createNFA
        // Inside the factory's createNFA: every NFA gets its own duplicate of the input serializer,
        // the compiled window time, and all states produced by the compiler.
        NFA<T> result =  new NFA<>(inputTypeSerializer.duplicate(), windowTime, timeoutHandling);
        result.addStates(states);
     
    addState
        /**
         * Registers every state of {@code newStates} with this NFA via {@link #addState}.
         *
         * @param newStates the states to add
         */
        public void addStates(final Collection<State<T>> newStates) {
            for (final State<T> newState : newStates) {
                addState(newState);
            }
        }
    
        /**
         * Registers a state with this NFA. A start state also seeds a new computation state, because
         * every match begins at the start state.
         *
         * @param state the state to add
         */
        public void addState(final State<T> state) {
            states.add(state);

            if (!state.isStart()) {
                return;
            }
            computationStates.add(ComputationState.createStartState(this, state));
        }
    把states加入到NFA中;
    其中start state还会被加入computationStates,因为pattern的匹配总是从start state开始。
     
    KeyedCEPPatternOperator –> processEvent
        /**
         * Feeds one event into the NFA and emits every completed match. Timed-out partial matches
         * (the second element of the NFA's result) are not used by this operator.
         *
         * @param nfa the NFA to advance
         * @param event the incoming event
         * @param timestamp the timestamp passed to the NFA and to the emitted matches
         */
        protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
            final Tuple2<Collection<Map<String, List<IN>>>, Collection<Tuple2<Map<String, List<IN>>, Long>>> outcome =
                nfa.process(event, timestamp);

            final Collection<Map<String, List<IN>>> completedMatches = outcome.f0;
            emitMatchedSequences(completedMatches, timestamp);
        }
     
    NFA –> process
        /**
         * Advances every active computation state of this NFA by one event.
         *
         * <p>Each currently active computation state is polled exactly once and replaced by the states
         * it can reach on {@code event}. Reached final states produce a match. Reaching a stop state
         * discards the whole branch. All other reached states are kept for the next event. Finally, if
         * a window is configured, the shared buffer is pruned of entries older than
         * {@code timestamp - windowTime}.
         *
         * <p>Whenever a reached state's buffer entry is released, it is identified by that state's own
         * previous-state name, event, timestamp and counter. The counter is the one returned by
         * {@code eventSharedBuffer.put} when the event was taken. Using the counter of the state it was
         * derived from would miss the entry when several events share a timestamp.
         *
         * @param event the incoming event
         * @param timestamp the event's timestamp
         * @return f0: completed matches (presumably keyed by pattern name); f1: timed-out partial
         *     matches, which this body never populates
         */
        public Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, List<T>>, Long>>> process(final T event, final long timestamp) {
            final int numberComputationStates = computationStates.size();
            final Collection<Map<String, List<T>>> result = new ArrayList<>();
            final Collection<Tuple2<Map<String, List<T>>, Long>> timeoutResult = new ArrayList<>();

            // iterate over all current computations; states appended below are handled by the next event
            for (int i = 0; i < numberComputationStates; i++) {
                ComputationState<T> computationState = computationStates.poll();

                // let the NFA compute the states reachable from this one
                final Collection<ComputationState<T>> newComputationStates =
                    computeNextStates(computationState, event, timestamp);

                // delay adding new computation states in case a stop state is reached and we discard the path
                final Collection<ComputationState<T>> statesToRetain = new ArrayList<>();
                // if stop state reached in this path
                boolean shouldDiscardPath = false;
                for (final ComputationState<T> newComputationState : newComputationStates) {
                    if (newComputationState.isFinalState()) {
                        // we've reached a final state and can thus retrieve the matching event sequence
                        Map<String, List<T>> matchedPattern = extractCurrentMatches(newComputationState);
                        result.add(matchedPattern);

                        // remove found patterns because they are no longer needed
                        eventSharedBuffer.release(
                                newComputationState.getPreviousState().getName(),
                                newComputationState.getEvent(),
                                newComputationState.getTimestamp(),
                                newComputationState.getCounter());
                    } else if (newComputationState.isStopState()) {
                        // reached stop state. release entry for the stop state and drop the whole path
                        shouldDiscardPath = true;
                        eventSharedBuffer.release(
                            newComputationState.getPreviousState().getName(),
                            newComputationState.getEvent(),
                            newComputationState.getTimestamp(),
                            newComputationState.getCounter());
                    } else {
                        // add new computation state; it will be processed once the next event arrives
                        statesToRetain.add(newComputationState);
                    }
                }

                if (shouldDiscardPath) {
                    // a stop state was reached in this branch. release branch which results in removing
                    // previous event from the buffer
                    for (final ComputationState<T> state : statesToRetain) {
                        eventSharedBuffer.release(
                            state.getPreviousState().getName(),
                            state.getEvent(),
                            state.getTimestamp(),
                            state.getCounter());
                    }
                } else {
                    // keep the intermediate states active for the next event
                    computationStates.addAll(statesToRetain);
                }
            }

            // prune shared buffer based on window length (0 means no window)
            if (windowTime > 0L) {
                long pruningTimestamp = timestamp - windowTime;

                // the check is to guard against underflows
                if (pruningTimestamp < timestamp) {
                    // remove all elements which are expired with respect to the window length
                    eventSharedBuffer.prune(pruningTimestamp);
                }
            }

            return Tuple2.of(result, timeoutResult);
        }
     
    computeNextStates
     
        /**
         * Computes the computation states reachable from {@code computationState} on {@code event}.
         *
         * <p>The edges come from createDecisionGraph, which only collects IGNORE and TAKE transitions
         * and has already followed any PROCEED (epsilon) transitions, so no PROCEED case is needed
         * here. IGNORE keeps the previously taken event and only moves to the target state under a new
         * Dewey version. TAKE stores {@code event} in {@code eventSharedBuffer} and moves to the target
         * state. After a TAKE, a final state reachable via PROCEED is added as well.
         *
         * <p>NOTE(review): {@code totalTakeToSkip}, {@code ignoreBranchesToVisit}, {@code currentVersion}
         * and {@code nextVersion} are not declared in this excerpt. They are presumably computed in
         * code omitted from it -- confirm against the full source.
         *
         * @param computationState the computation state to advance
         * @param event the incoming event
         * @param timestamp the event's timestamp
         * @return the new computation states (may include final or stop states)
         */
        private Collection<ComputationState<T>> computeNextStates(
                final ComputationState<T> computationState,
                final T event,
                final long timestamp) {
    
            final OutgoingEdges<T> outgoingEdges = createDecisionGraph(computationState, event); // collect the IGNORE/TAKE edges reachable from this state
    
             final List<StateTransition<T>> edges = outgoingEdges.getEdges();
    
            final List<ComputationState<T>> resultingComputationStates = new ArrayList<>();
            for (StateTransition<T> edge : edges) {
                switch (edge.getAction()) {
                    case IGNORE: {
                        // IGNORE edges are not followed from the start state
                        if (!computationState.isStartState()) {
                            final DeweyNumber version;
                            if (isEquivalentState(edge.getTargetState(), computationState.getState())) {
                                //Stay in the same state (it can be either looping one or singleton)
                                final int toIncrease = calculateIncreasingSelfState(
                                    outgoingEdges.getTotalIgnoreBranches(),
                                    outgoingEdges.getTotalTakeBranches());
                                version = computationState.getVersion().increase(toIncrease);
                            } else {
                                //IGNORE after PROCEED
                                version = computationState.getVersion()
                                    .increase(totalTakeToSkip + ignoreBranchesToVisit)
                                    .addStage();
                                ignoreBranchesToVisit--;
                            }
    
                            // IGNORE takes nothing: move to the target state but keep the previously
                            // taken state/event/counter/timestamp, so the buffer path is unchanged
                            addComputationState(
                                    resultingComputationStates,
                                    edge.getTargetState(),
                                    computationState.getPreviousState(),
                                    computationState.getEvent(),
                                    computationState.getCounter(),
                                    computationState.getTimestamp(),
                                    version,
                                    computationState.getStartTimestamp()
                            );
                        }
                    }
                    break;
                    case TAKE:
                        final State<T> nextState = edge.getTargetState();
                        final State<T> currentState = edge.getSourceState();
                        final State<T> previousState = computationState.getPreviousState();
    
                        final T previousEvent = computationState.getEvent();
    
                        final int counter;
                        final long startTimestamp;
                        // TAKE: record the event on this path, i.e. put it into eventSharedBuffer.
                        // From the start state there is no predecessor entry and the match starts now;
                        // otherwise the new entry is linked to the previously taken entry.
                        if (computationState.isStartState()) {
                            startTimestamp = timestamp;
                            counter = eventSharedBuffer.put(
                                currentState.getName(),
                                event,
                                timestamp,
                                currentVersion);
                        } else {
                            startTimestamp = computationState.getStartTimestamp();
                            counter = eventSharedBuffer.put(
                                currentState.getName(),
                                event,
                                timestamp,
                                previousState.getName(),
                                previousEvent,
                                computationState.getTimestamp(),
                                computationState.getCounter(),
                                currentVersion);
                        }
    
                        // the new computation state remembers the taken entry (currentState, event, counter)
                        addComputationState(
                                resultingComputationStates,
                                nextState,
                                currentState,
                                event,
                                counter,
                                timestamp,
                                nextVersion,
                                startTimestamp);
    
                        //check if newly created state is optional (have a PROCEED path to Final state)
                        final State<T> finalState = findFinalStateAfterProceed(nextState, event, computationState);
                        if (finalState != null) {
                            addComputationState(
                                    resultingComputationStates,
                                    finalState,
                                    currentState,
                                    event,
                                    counter,
                                    timestamp,
                                    nextVersion,
                                    startTimestamp);
                        }
                        break;
                }
            }
    
            return resultingComputationStates;
        }
     
    /**
     * Collects the IGNORE and TAKE transitions whose conditions accept {@code event}. The search starts
     * at the computation state's current state and transitively follows accepted PROCEED transitions
     * (depth-first, using an explicit stack). PROCEED transitions themselves are not returned.
     *
     * <p>NOTE(review): there is no visited set, so this assumes the PROCEED transitions contain no cycle.
     *
     * @param computationState the computation state whose outgoing edges are collected
     * @param event the incoming event, checked against each transition's condition
     * @return the accepted IGNORE/TAKE edges
     * @throws RuntimeException wrapping any exception thrown while evaluating a transition condition
     */
    private OutgoingEdges<T> createDecisionGraph(ComputationState<T> computationState, T event) {
            final OutgoingEdges<T> outgoingEdges = new OutgoingEdges<>(computationState.getState());
    
            final Stack<State<T>> states = new Stack<>();
            states.push(computationState.getState());
    
            //First create all outgoing edges, so to be able to reason about the Dewey version
            while (!states.isEmpty()) {
                State<T> currentState = states.pop();
                Collection<StateTransition<T>> stateTransitions = currentState.getStateTransitions(); // all transitions leaving this state
    
                // check all state transitions for each state
                for (StateTransition<T> stateTransition : stateTransitions) {
                    try {
                        if (checkFilterCondition(computationState, stateTransition.getCondition(), event)) {
                            // filter condition is true
                            switch (stateTransition.getAction()) {
                                case PROCEED:  // PROCEED: continue the search from the target state
                                    // simply advance the computation state, but apply the current event to it
                                    // PROCEED is equivalent to an epsilon transition
                                    states.push(stateTransition.getTargetState());
                                    break;
                                case IGNORE:
                                case TAKE: // IGNORE/TAKE: record the transition as an outgoing edge
                                    outgoingEdges.add(stateTransition);
                                    break;
                            }
                        }
                    } catch (Exception e) {
                        throw new RuntimeException("Failure happened in filter function.", e);
                    }
                }
            }
            return outgoingEdges;
        }
     
     
     
     
     
     
     
     
  • 相关阅读:
    201621123028《Java程序设计》第二周学习总结
    tensorflow——乘法
    最近做的事儿
    BlurUse Compute Shader
    又翻出来老电视剧看了看....
    exercise: toon shader
    semantic SV_
    SimpleDateFormat使用详解
    各种数字类型转换成字符串型:
    android 代码混淆配置
  • 原文地址:https://www.cnblogs.com/fxjwind/p/7597279.html
Copyright © 2011-2022 走看看