  • FlinkCEP

    https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html

    The goal of FlinkCEP is to match a pattern sequence.

    A pattern sequence is made up of multiple patterns.

    DataStream<Event> input = ...
    
    Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
            new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event event) {
                    return event.getId() == 42;
                }
            }
        ).next("middle").subtype(SubEvent.class).where(
            new SimpleCondition<SubEvent>() {
                @Override
                public boolean filter(SubEvent subEvent) {
                    return subEvent.getVolume() >= 10.0;
                }
            }
        ).followedBy("end").where(
             new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event event) {
                    return event.getName().equals("end");
                }
             }
        );
    
    PatternStream<Event> patternStream = CEP.pattern(input, pattern);
    
    DataStream<Alert> result = patternStream.select(
        new PatternSelectFunction<Event, Alert>() {
            @Override
            public Alert select(Map<String, List<Event>> pattern) throws Exception {
                return createAlertFrom(pattern);
            }
        }
    );

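    In this example, the pattern sequence consists of three patterns: begin, next, and followedBy.

    The first pattern of every pattern sequence is begin.

    Each pattern needs a unique name, here start, middle, and end.

    Each pattern can also carry a condition, set with where.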

    Pattern

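    Patterns come in two kinds: Individual Patterns and Complex Patterns.

    Individual patterns are further divided into singleton patterns and looping patterns.

    Put simply, a singleton pattern accepts exactly one event, while a looping pattern can accept more than one; in finite-automaton terms, matching the same pattern repeatedly forms a loop.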

    For example, in a b+ c? d

    b+ is a looping pattern and the others are singletons.
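
    The shape "a b+ c? d" reads like a regular expression, so an ordinary character regex gives a rough feel for which event runs would fit. This is only an analogy in plain Java: it is not the FlinkCEP API, the class name is made up for this sketch, and a regex on characters has no notion of contiguity between events.

```java
import java.util.regex.Pattern;

class QuantifierAnalogy {
    // Character regex used purely as an analogy for the shape "a b+ c? d".
    static final Pattern SHAPE = Pattern.compile("ab+c?d");

    // Returns true when the whole string fits the shape.
    static boolean matches(String events) {
        return SHAPE.matcher(events).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches("abd"));    // true: b once, c absent
        System.out.println(matches("abbbcd")); // true: b loops three times, c present
        System.out.println(matches("acd"));    // false: the looping b needs at least one match
    }
}
```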

    Quantifiers can be added to a singleton pattern to turn it into a looping one.

    // expecting 4 occurrences
    start.times(4);

    // expecting 0 or 4 occurrences
    start.times(4).optional();

    // expecting 1 or more occurrences
    start.oneOrMore();

    // expecting 0 or more occurrences
    start.oneOrMore().optional();

    Contiguity can be defined for repeated matches of the same pattern.

    To illustrate the above with an example, a pattern sequence "a+ b" (one or more "a"’s followed by a "b") with input "a1", "c", "a2", "b" will have the following results:

    1. Strict Contiguity: {a2 b} – the "c" after "a1" causes "a1" to be discarded.

    2. Relaxed Contiguity: {a1 b} and {a1 a2 b} – "c" is simply ignored.

    3. Non-Deterministic Relaxed Contiguity: {a1 b}, {a2 b}, and {a1 a2 b}.

    For looping patterns (e.g. oneOrMore() and times()) the default is relaxed contiguity. If you want strict contiguity, you have to explicitly specify it by using the consecutive() call, and if you want non-deterministic relaxed contiguity you can use the allowCombinations() call.

    An example of using consecutive():

    Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
      @Override
      public boolean filter(Event value) throws Exception {
        return value.getName().equals("c");
      }
    })
    .followedBy("middle").where(new SimpleCondition<Event>() {
      @Override
      public boolean filter(Event value) throws Exception {
        return value.getName().equals("a");
      }
    }).oneOrMore().consecutive()
    .followedBy("end1").where(new SimpleCondition<Event>() {
      @Override
      public boolean filter(Event value) throws Exception {
        return value.getName().equals("b");
      }
    });

    Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B

    with consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}

    without consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}

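    This is contiguity within a single pattern; contiguity between patterns can also be defined, as described later.

    For a pattern, the key ingredient is its conditions.

    A condition decides whether an event counts as a match.

    There are several kinds of conditions.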

    Simple Conditions

    start.where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) {
            return value.getName().startsWith("foo");
        }
    });

    This is easy to understand: the decision depends only on the current event.

    Iterative Conditions

    This is how you can specify a condition that accepts subsequent events based on properties of the previously accepted events or some statistic over a subset of them.

    That is, deciding whether the condition holds requires looking at events already matched earlier, which is why it is called iterative.

    Below is the code for an iterative condition that accepts the next event for a pattern named “middle” if its name starts with “foo”, and if the sum of the prices of the previously accepted events for that pattern plus the price of the current event stays below 5.0. Iterative conditions can be very powerful, especially in combination with looping patterns, e.g. oneOrMore().

    middle.oneOrMore().where(new IterativeCondition<SubEvent>() {
        @Override
        public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
            if (!value.getName().startsWith("foo")) {
                return false;
            }
            
            double sum = value.getPrice();
            for (SubEvent event : ctx.getEventsForPattern("middle")) {
                sum += event.getPrice();
            }
            return Double.compare(sum, 5.0) < 0;
        }
    });

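    This is a oneOrMore pattern, so it can match one or more events. For each candidate, besides checking that the name starts with “foo”, the sum of its price and the prices of the previously matched events must also be below 5.0.

    The key call is ctx.getEventsForPattern, which returns all events matched so far by the pattern with the given name.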
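
    To see the running-sum check in isolation, here is a minimal plain-Java sketch that needs no Flink dependency. The Ev type, the class name, and the run helper are hypothetical stand-ins; the list of already accepted events plays the role of ctx.getEventsForPattern("middle"). A real NFA may track several partial matches at once, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class IterativeConditionSketch {
    // Hypothetical event type, used only in this sketch.
    static final class Ev {
        final String name;
        final double price;
        Ev(String name, double price) { this.name = name; this.price = price; }
    }

    // Same rule as the filter above: the name must start with "foo" and the
    // candidate's price plus the prices accepted so far must stay below 5.0.
    static boolean accept(Ev candidate, List<Ev> alreadyAccepted) {
        if (!candidate.name.startsWith("foo")) {
            return false;
        }
        double sum = candidate.price;
        for (Ev e : alreadyAccepted) {
            sum += e.price;
        }
        return Double.compare(sum, 5.0) < 0;
    }

    // Feeds events one by one and keeps those the condition accepts.
    static List<Ev> run(List<Ev> input) {
        List<Ev> accepted = new ArrayList<>();
        for (Ev e : input) {
            if (accept(e, accepted)) {
                accepted.add(e);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<Ev> in = Arrays.asList(new Ev("foo1", 2.0), new Ev("bar", 1.0),
                                    new Ev("foo2", 2.5), new Ev("foo3", 1.0));
        for (Ev e : run(in)) {
            System.out.println(e.name); // prints foo1 then foo2; foo3 would push the sum to 5.5
        }
    }
}
```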

    Combining Conditions

    pattern.where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) {
            return ... // some condition
        }
    }).or(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) {
            return ... // or condition
        }
    });

    Multiple conditions can be combined: chaining where gives “and” semantics, while or gives “or” semantics.
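
    The and/or semantics can be pictured with java.util.function.Predicate. This is a sketch of the semantics only, not of how FlinkCEP stores conditions internally; the class and method names are invented for illustration.

```java
import java.util.function.Predicate;

class CombinedConditionSketch {
    // Stand-in for where(a).where(b): both conditions must hold.
    static <T> Predicate<T> whereThenWhere(Predicate<T> a, Predicate<T> b) {
        return a.and(b);
    }

    // Stand-in for where(a).or(b): either condition may hold.
    static <T> Predicate<T> whereThenOr(Predicate<T> a, Predicate<T> b) {
        return a.or(b);
    }

    public static void main(String[] args) {
        Predicate<String> startsWithFoo = s -> s.startsWith("foo");
        Predicate<String> longName = s -> s.length() > 4;

        System.out.println(whereThenWhere(startsWithFoo, longName).test("foo")); // false
        System.out.println(whereThenOr(startsWithFoo, longName).test("foo"));    // true
    }
}
```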

    Pattern Sequence

    A sequence is made up of multiple patterns, so how do those patterns relate to each other?

    A pattern sequence has to start with an initial pattern, as shown below:

    Pattern<Event, ?> start = Pattern.<Event>begin("start");

    Every sequence must have a starting point, created with begin.

    Next, you can append more patterns to your pattern sequence by specifying the desired contiguity conditions between them.

    1. next(), for strict,
    2. followedBy(), for relaxed, and
    3. followedByAny(), for non-deterministic relaxed contiguity.

    or

    1. notNext(), if you do not want an event type to directly follow another
    2. notFollowedBy(), if you do not want an event type to be anywhere between two other event types

    After begin, further patterns can be appended, and the contiguity between them is one of the kinds listed above.

    As an example, a pattern a b, given the event sequence "a", "c", "b1", "b2", will give the following results:

    1. Strict Contiguity between a and b: {} (no match) – the "c" after "a" causes "a" to be discarded.

    2. Relaxed Contiguity between a and b: {a b1} – as relaxed contiguity is viewed as “skip non-matching events till the next matching one”.

    3. Non-Deterministic Relaxed Contiguity between a and b: {a b1}, {a b2} – as this is the most general form.
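
    The three results for "a b" can be reproduced with a small plain-Java sketch. It handles only a two-step pattern over strings and is not how FlinkCEP's NFA works; the Mode enum and the match helper are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class ContiguitySketch {
    enum Mode { STRICT, RELAXED, NON_DETERMINISTIC_RELAXED }

    // Finds matches of a two-step pattern "first second" under the given contiguity.
    static List<String> match(List<String> events, Predicate<String> first,
                              Predicate<String> second, Mode mode) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!first.test(events.get(i))) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                boolean hit = second.test(events.get(j));
                if (hit) {
                    matches.add(events.get(i) + " " + events.get(j));
                }
                if (mode == Mode.STRICT) {
                    break; // next(): only the immediate successor is considered
                }
                if (mode == Mode.RELAXED && hit) {
                    break; // followedBy(): stop at the first matching event
                }
                // followedByAny(): keep scanning for further matching events
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a", "c", "b1", "b2");
        Predicate<String> isA = s -> s.startsWith("a");
        Predicate<String> isB = s -> s.startsWith("b");
        System.out.println(match(events, isA, isB, Mode.STRICT));                    // []
        System.out.println(match(events, isA, isB, Mode.RELAXED));                   // [a b1]
        System.out.println(match(events, isA, isB, Mode.NON_DETERMINISTIC_RELAXED)); // [a b1, a b2]
    }
}
```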

    Temporal Constraint

    A sequence can also be given a time constraint, supported for both processing time and event time.

    next.within(Time.seconds(10));

    Detecting Patterns

    Once the pattern sequence is defined, it has to be applied to a stream to actually detect matches:

    DataStream<Event> input = ...
    Pattern<Event, ?> pattern = ...
    
    PatternStream<Event> patternStream = CEP.pattern(input, pattern);

    This produces a PatternStream.

    The input stream can be keyed or non-keyed depending on your use-case

    Applying your pattern on a non-keyed stream will result in a job with parallelism equal to 1

    A non-keyed stream can only run with parallelism 1.

    With a keyed stream, patterns are detected independently for each key, so the job can run in parallel.

    Once you have obtained a PatternStream you can select from detected event sequences via the select or flatSelect methods.

    A PatternStream can be consumed with either of:

    PatternSelectFunction
    PatternFlatSelectFunction

    class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
        @Override
        public OUT select(Map<String, List<IN>> pattern) {
            IN startEvent = pattern.get("start").get(0);
            IN endEvent = pattern.get("end").get(0);
            return new OUT(startEvent, endEvent);
        }
    }

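    A PatternSelectFunction has to implement the select method.

    Its parameter, Map<String, List<IN>> pattern, is one successfully matched pattern sequence: the key is the pattern name, and the value is a list because a looping pattern may have matched several events.

    A PatternFlatSelectFunction only adds a Collector to the signature, so it can emit multiple values per match.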

    class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
        @Override
        public void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> collector) {
            IN startEvent = pattern.get("start").get(0);
            IN endEvent = pattern.get("end").get(0);
    
            for (int i = 0; i < startEvent.getValue(); i++ ) {
                collector.collect(new OUT(startEvent, endEvent));
            }
        }
    }
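
    The difference between the two styles can be tried out without Flink. In this sketch a java.util.function.Consumer stands in for Flink's Collector, events are plain strings, and all class and method names are hypothetical; only the shape of the match map (pattern name to list of events) follows the text above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class SelectSketch {
    // select style: exactly one result per matched sequence.
    static String select(Map<String, List<String>> match) {
        return match.get("start").get(0) + "->" + match.get("end").get(0);
    }

    // flatSelect style: any number of results, here one per event of the looping "middle" pattern.
    static void flatSelect(Map<String, List<String>> match, Consumer<String> out) {
        for (String middle : match.get("middle")) {
            out.accept(match.get("start").get(0) + "->" + middle);
        }
    }

    // A hand-built match: "middle" holds two events because it looped.
    static Map<String, List<String>> sampleMatch() {
        Map<String, List<String>> match = new HashMap<>();
        match.put("start", Arrays.asList("s1"));
        match.put("middle", Arrays.asList("m1", "m2"));
        match.put("end", Arrays.asList("e1"));
        return match;
    }

    public static void main(String[] args) {
        System.out.println(select(sampleMatch())); // s1->e1
        List<String> results = new ArrayList<>();
        flatSelect(sampleMatch(), results::add);
        System.out.println(results);               // [s1->m1, s1->m2]
    }
}
```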

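    Source code

    Start with the pattern definition: the definition API looks elaborate, but its implementation is fairly simple.

    In the end,

    org.apache.flink.cep.nfa.compiler.NFACompiler

    compiles the pattern sequence into an NFA (nondeterministic finite automaton). Most of the sequence-matching logic lives in the NFA, so it is not described in detail here.

    Eventually patternStream.select is called to produce the result stream: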

        public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) {
            SingleOutputStreamOperator<Map<String, List<T>>> patternStream =
                    CEPOperatorUtils.createPatternStream(inputStream, pattern);
    
            return patternStream.map(
                new PatternSelectMapper<>(
                    patternStream.getExecutionEnvironment().clean(patternSelectFunction)))
                .returns(outTypeInfo);
        }

    CEPOperatorUtils.createPatternStream

            if (inputStream instanceof KeyedStream) {
                // We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
                KeyedStream<T, K> keyedStream = (KeyedStream<T, K>) inputStream;
    
                TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
    
                patternStream = keyedStream.transform(
                    "KeyedCEPPatternOperator",
                    (TypeInformation<Map<String, List<T>>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
                    new KeyedCEPPatternOperator<>(
                        inputSerializer,
                        isProcessingTime,
                        keySerializer,
                        nfaFactory,
                        true));
            } else {
    
                KeySelector<T, Byte> keySelector = new NullByteKeySelector<>();
                TypeSerializer<Byte> keySerializer = ByteSerializer.INSTANCE;
    
                patternStream = inputStream.keyBy(keySelector).transform(
                    "CEPPatternOperator",
                    (TypeInformation<Map<String, List<T>>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
                    new KeyedCEPPatternOperator<>(
                        inputSerializer,
                        isProcessingTime,
                        keySerializer,
                        nfaFactory,
                        false
                    )).forceNonParallel();
            }

    The key step is creating the KeyedCEPPatternOperator.

    public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOperator<IN, KEY, Map<String, List<IN>>>

    AbstractKeyedCEPPatternOperator

    The most important question is how a StreamRecord is handled when it arrives:

        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            if (isProcessingTime) {
                // there can be no out of order elements in processing time
                NFA<IN> nfa = getNFA();
                processEvent(nfa, element.getValue(), getProcessingTimeService().getCurrentProcessingTime());
                updateNFA(nfa);
    
            } else {
    
                long timestamp = element.getTimestamp();
                IN value = element.getValue();
    
                // In event-time processing we assume correctness of the watermark.
                // Events with timestamp smaller than the last seen watermark are considered late.
                // Late events are put in a dedicated side output, if the user has specified one.
    
                if (timestamp >= lastWatermark) { // only records that are not late are processed
    
                    // we have an event with a valid timestamp, so
                    // we buffer it until we receive the proper watermark.
    
                    saveRegisterWatermarkTimer();
    
                    List<IN> elementsForTimestamp =  elementQueueState.get(timestamp);
                    if (elementsForTimestamp == null) {
                        elementsForTimestamp = new ArrayList<>();
                    }
    
                    if (getExecutionConfig().isObjectReuseEnabled()) {
                        // copy the StreamRecord so that it cannot be changed
                        elementsForTimestamp.add(inputSerializer.copy(value));
                    } else {
                        elementsForTimestamp.add(element.getValue());
                    }
                    elementQueueState.put(timestamp, elementsForTimestamp);
                }
            }
        }

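    With processing time the handling is trivial: the record is handed straight to the NFA.

    With event time it is more involved, because out-of-order records have to be dealt with and cannot be passed to the NFA directly.

    They have to be buffered first, so look at elementQueueState: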

        private transient MapState<Long, List<IN>> elementQueueState;
        elementQueueState = getRuntimeContext().getMapState(
            new MapStateDescriptor<>(
                    EVENT_QUEUE_STATE_NAME,
                    LongSerializer.INSTANCE,
                    new ListSerializer<>(inputSerializer)
            )
        );

    elementQueueState stores, for each timestamp, the list of records that carry it.

    onEventTime triggers processing of the data held in elementQueueState:

        @Override
        public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
    
            // 1) get the queue of pending elements for the key and the corresponding NFA,
            // 2) process the pending elements in event time order by feeding them in the NFA
            // 3) advance the time to the current watermark, so that expired patterns are discarded.
            // 4) update the stored state for the key, by only storing the new NFA and priority queue iff they
            //        have state to be used later.
            // 5) update the last seen watermark.
    
            // STEP 1
            PriorityQueue<Long> sortedTimestamps = getSortedTimestamps(); // sorts the keys of elementQueueState by time; PriorityQueue is a heap
            NFA<IN> nfa = getNFA();
    
            // STEP 2
            while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) { // peek yields the smallest timestamp; process it if it is not past currentWatermark
                long timestamp = sortedTimestamps.poll();
                for (IN element: elementQueueState.get(timestamp)) { // process the list of records stored for this timestamp
                    processEvent(nfa, element, timestamp);
                }
                elementQueueState.remove(timestamp);
            }
    
            // STEP 3
            advanceTime(nfa, timerService.currentWatermark());
    
            // STEP 4
            if (sortedTimestamps.isEmpty()) {
                elementQueueState.clear();
            }
            updateNFA(nfa);
    
            if (!sortedTimestamps.isEmpty() || !nfa.isEmpty()) {
                saveRegisterWatermarkTimer();
            }
    
            // STEP 5
            updateLastSeenWatermark(timerService.currentWatermark()); // update lastWatermark
        }
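
    The buffer-then-flush idea behind processElement and onEventTime can be sketched in plain Java. This is a simplification under stated assumptions: a TreeMap replaces the MapState plus PriorityQueue pair, a list named processed stands in for feeding the NFA, and the class and method names are hypothetical. It is not the operator's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class EventTimeBufferSketch {
    // timestamp -> records carrying that timestamp, kept sorted by timestamp
    private final TreeMap<Long, List<String>> queue = new TreeMap<>();
    private long lastWatermark = Long.MIN_VALUE;
    // Stand-in for the NFA: records land here in the order they would be processed.
    final List<String> processed = new ArrayList<>();

    // Event-time branch of processElement: buffer the record unless it is late.
    boolean onElement(String value, long timestamp) {
        if (timestamp < lastWatermark) {
            return false; // late record, not buffered
        }
        queue.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
        return true;
    }

    // Like onEventTime: release buffered records up to the watermark, in timestamp order.
    void onWatermark(long watermark) {
        while (!queue.isEmpty() && queue.firstKey() <= watermark) {
            processed.addAll(queue.pollFirstEntry().getValue());
        }
        lastWatermark = watermark;
    }

    public static void main(String[] args) {
        EventTimeBufferSketch buffer = new EventTimeBufferSketch();
        buffer.onElement("e3", 3);
        buffer.onElement("e1", 1);
        buffer.onElement("e2", 2);
        buffer.onWatermark(2);
        System.out.println(buffer.processed);            // [e1, e2]
        System.out.println(buffer.onElement("late", 1)); // false
        buffer.onWatermark(5);
        System.out.println(buffer.processed);            // [e1, e2, e3]
    }
}
```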

    When is onEventTime called?

    AbstractStreamOperator has an

    InternalTimeServiceManager timeServiceManager
    that manages all time services.

    When AbstractKeyedCEPPatternOperator is opened, it creates its time service and passes itself in as the triggerTarget:
        timerService = getInternalTimerService(
                    "watermark-callbacks",
                    VoidNamespaceSerializer.INSTANCE,
                    this);

    processElement calls

    saveRegisterWatermarkTimer();
            long currentWatermark = timerService.currentWatermark();
            // protect against overflow
            if (currentWatermark + 1 > currentWatermark) {
                timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, currentWatermark + 1);
            }

    The logic looks tricky, but all it does is register a timer for currentWatermark + 1 with the time service.

    In AbstractStreamOperator, when a watermark is received:

    public void processWatermark(Watermark mark) throws Exception {
            if (timeServiceManager != null) {
                timeServiceManager.advanceWatermark(mark);
            }
            output.emitWatermark(mark);
        }

    timeServiceManager.advanceWatermark simply calls advanceWatermark on each of its time services.

    At present the only time service implementation is HeapInternalTimerService.

    HeapInternalTimerService.advanceWatermark

        public void advanceWatermark(long time) throws Exception {
            currentWatermark = time;  // update currentWatermark
    
            InternalTimer<K, N> timer;
    
            // take timers from eventTimeTimersQueue while their timestamp is not later than the new watermark;
            // recall that a timer for the previous watermark + 1 was registered
            while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
    
                Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
                timerSet.remove(timer);
                eventTimeTimersQueue.remove();
    
                keyContext.setCurrentKey(timer.getKey());
                triggerTarget.onEventTime(timer); // calls into onEventTime
            }
            }
        }
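
    The interplay between registering a timer for currentWatermark + 1 and firing it on the next watermark can be sketched as follows. This is plain Java with hypothetical names; timers are bare timestamps, key handling is left out, and the list named fired stands in for the triggerTarget.onEventTime callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class WatermarkTimerSketch {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;
    // Timestamps of fired timers, standing in for calls to onEventTime.
    final List<Long> fired = new ArrayList<>();

    // Same idea as saveRegisterWatermarkTimer: ask to be woken on the next watermark advance.
    boolean registerNextWatermarkTimer() {
        long next = currentWatermark + 1;
        // The comparison is false only if the addition overflowed (watermark == Long.MAX_VALUE).
        if (next > currentWatermark) {
            if (!timers.contains(next)) { // avoid duplicate timers
                timers.add(next);
            }
            return true;
        }
        return false;
    }

    // Same idea as advanceWatermark: fire every timer whose timestamp is not later than the watermark.
    void advanceWatermark(long time) {
        currentWatermark = time;
        while (!timers.isEmpty() && timers.peek() <= time) {
            fired.add(timers.poll());
        }
    }

    public static void main(String[] args) {
        WatermarkTimerSketch service = new WatermarkTimerSketch();
        service.registerNextWatermarkTimer();
        service.advanceWatermark(10);
        System.out.println(service.fired.size());                 // 1
        service.advanceWatermark(Long.MAX_VALUE);
        System.out.println(service.registerNextWatermarkTimer()); // false
    }
}
```

    Registering at currentWatermark + 1 means any strictly larger watermark will fire the timer, which is what lets the operator flush its buffer every time the watermark moves forward.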

    One more point worth noting: for a KeyedStream, how is pattern detection kept independent across keys?

    Keyed state such as elementQueueState is already scoped per key, so this isolation comes for free.
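
    A rough mental model of per-key scoping is one state instance per key. The sketch below uses a HashMap to imitate that; in Flink the scoping is done by the keyed state backend using the current key set via keyContext.setCurrentKey, so the explicit key parameter and all names here are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PerKeyStateSketch {
    // One independent buffer per key, imitating keyed state.
    private final Map<String, List<String>> statePerKey = new HashMap<>();

    // Adds a record to the buffer that belongs to its key only.
    void onElement(String key, String value) {
        statePerKey.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Returns the buffer for one key; other keys are never visible through it.
    List<String> stateFor(String key) {
        return statePerKey.getOrDefault(key, Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        PerKeyStateSketch state = new PerKeyStateSketch();
        state.onElement("user-1", "a");
        state.onElement("user-2", "x");
        state.onElement("user-1", "b");
        System.out.println(state.stateFor("user-1")); // [a, b]
        System.out.println(state.stateFor("user-2")); // [x]
    }
}
```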

  • Original post: https://www.cnblogs.com/fxjwind/p/7307535.html