Stream Computing in Practice

    CEP (Complex Event Processing)

    Wikipedia definition

    “Complex event processing, or CEP, is event processing that combines data from multiple sources[2] to infer events or patterns that suggest more complicated circumstances. The goal of complex event processing is to identify meaningful events (such as opportunities or threats)[3] and respond to them as quickly as possible.”

    From the Wikipedia definition above, the main characteristics of CEP are:
    Complexity: joins over multiple streams, window aggregation, detection of event sequences or patterns
    Low latency: seconds or milliseconds, e.g. credit-card fraud detection or attack detection
    High throughput: tens of thousands of messages per second

    CEP and databases

    The concept of CEP appeared quite early, to meet real-time requirements that traditional databases could not.
    In a traditional database the data is static and the queries are dynamic, but it cannot produce query results in real time and continuously.
    CEP turns this around: the queries are static and the data is dynamic, which satisfies the need for real-time, continuous queries, although it cannot serve ad hoc queries.
    CEP combined with a traditional database can therefore be used to solve problems in finance, commerce, network monitoring, and similar domains.
    A well-known example is Esper, which is very powerful and offers EPL, an SQL-like language that gives users an experience close to working with a database.

    CEP in stream processing

    The concept of stream processing arguably became widely accepted around the time of Storm and Yahoo S4.
    It emerged mainly in response to the poor real-time behavior of the then-mainstream MapReduce systems such as Hadoop; the times make the hero, and with ubiquitous real-time applications such as Twitter, people came to realize how important data freshness is. The era of real-time big data was gradually arriving.

    CEP and stream processing were born against different backdrops, but because the problem domains they address overlap, the two technologies were bound to converge.
    In the Storm era, Storm mainly wrapped and exposed a MapReduce-like programming model, so streaming workloads were mostly ETL and simple aggregation.
    To meet CEP requirements you could run the Esper engine on top of Storm, but although Esper is very powerful, it is far too heavyweight and rather inefficient.

    Lighter-weight CEP engines such as Siddhi appeared later,
    but we never used them at scale, mainly because they did not consider event time or out-of-order data, which makes them hard to apply in real production scenarios.

    Before the Dataflow paper there really was no computing platform that offered a systematic, platform-level answer to event time and out-of-order data. Flink implements the window model from Dataflow and solves event time and out-of-order data at the platform level.
    Flink also ships a dedicated CEP library, FlinkCEP - Complex event processing for Flink,
    and this CEP library does take event time and out-of-order data into account.
    Let's first look at how Flink CEP is used.

    Flink CEP

    Example

    We first produce an input stream, where each element of the Event stream consists of an Event object and an event time.
    To use event time, besides specifying the TimeCharacteristic, in Flink we also have to call assignTimestampsAndWatermarks, in which the event time and the watermark are defined:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // (Event, timestamp)
    DataStream<Event> input = env.fromElements(
        Tuple2.of(new Event(1, "start", 1.0), 5L),
        Tuple2.of(new Event(2, "middle", 2.0), 1L),
        Tuple2.of(new Event(3, "end", 3.0), 3L),
        Tuple2.of(new Event(4, "end", 4.0), 10L), // its watermark (10 - 5 = 5) releases events 2, 3, 1
        Tuple2.of(new Event(5, "middle", 5.0), 7L),
        // last element for high final watermark
        Tuple2.of(new Event(5, "middle", 5.0), 100L) // its watermark (100 - 5 = 95) releases events 5, 4
    ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {

        @Override
        public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) {
            return element.f1; // the event time
        }

        @Override
        public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
            return new Watermark(lastElement.f1 - 5); // the watermark lags the latest timestamp by 5
        }

    }).map(new MapFunction<Tuple2<Event, Long>, Event>() {
        @Override
        public Event map(Tuple2<Event, Long> value) throws Exception {
            return value.f0;
        }
    });
    Next we define the pattern to match; the requirement is to find a group of events containing "start", "middle", "end".
    See the Flink documentation for the exact syntax; it is not covered in detail here.

    Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {

        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("start");
        }
    }).followedByAny("middle").where(new SimpleCondition<Event>() {

        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("middle");
        }
    }).followedByAny("end").where(new SimpleCondition<Event>() {

        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("end");
        }
    });
    Finally we run CEP on the input stream.
    Here we implement a PatternSelectFunction to handle each matched pattern; the handling logic is simply to print the ids of the three matched Event objects.

    DataStream<String> result = CEP.pattern(input, pattern).select(
        new PatternSelectFunction<Event, String>() {

            @Override
            public String select(Map<String, List<Event>> pattern) {
                StringBuilder builder = new StringBuilder();
                System.out.println(pattern);
                builder.append(pattern.get("start").get(0).getId()).append(",")
                    .append(pattern.get("middle").get(0).getId()).append(",")
                    .append(pattern.get("end").get(0).getId());

                return builder.toString();
            }
        }
    );

    result.print();
    Now, which events do you think get matched here?
    Judging by the order of the events above, it should be 1, 2, 3.

    But the result is 1, 5, 4, because matching follows event-time order. This property is critical in production, since we cannot guarantee the order in which collected data arrives.
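    Walking through the timestamps makes this concrete (a walkthrough derived from the example above, not extra program output):

    // event-time order of the input:
    //   2 "middle" (t=1), 3 "end" (t=3), 1 "start" (t=5), 5 "middle" (t=7), 4 "end" (t=10)
    // watermarks are emitted as t - 5, so the element with t=10 emits watermark 5,
    // which releases the buffered events 2, 3 and 1; the last element (t=100) emits
    // watermark 95, which releases 5 and 4
    // the first start -> middle -> end sequence in event time is therefore
    // Event 1 (start, t=5), Event 5 (middle, t=7), Event 4 (end, t=10), i.e. "1,5,4"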

    Implementation

    For the event-time part of the implementation, have a look at AbstractKeyedCEPPatternOperator:

        public void processElement(StreamRecord<IN> element) throws Exception {
            if (isProcessingTime) {
                // there can be no out of order elements in processing time
                NFA<IN> nfa = getNFA();
                processEvent(nfa, element.getValue(), getProcessingTimeService().getCurrentProcessingTime());
                updateNFA(nfa);

            } else { // event time
                long timestamp = element.getTimestamp();
                IN value = element.getValue();

                if (timestamp >= lastWatermark) { // only buffer records that are not late

                    // we have an event with a valid timestamp, so
                    // we buffer it until we receive the proper watermark.

                    saveRegisterWatermarkTimer();

                    List<IN> elementsForTimestamp =  elementQueueState.get(timestamp);
                    elementsForTimestamp.add(element.getValue());
                    elementQueueState.put(timestamp, elementsForTimestamp); // buffer the element in the queue
                }
            }
        }
    With event time, the operator does not call processEvent and update the NFA right away; it first buffers the element in the elementQueueState queue.
    Later, when a watermark arrives and onEventTime is triggered,
    it sorts the queued data by timestamp in ascending order and processes, one by one, the elements whose timestamps are not greater than the watermark. This yields event-time order and solves the out-of-order problem.
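    For reference, the onEventTime side can be pictured roughly as follows. This is a simplified sketch based on the description above, not the exact Flink source; it assumes elementQueueState behaves like a MapState<Long, List<IN>> and that lastWatermark is tracked elsewhere in the operator.

        public void onEventTime(long watermark) throws Exception {
            NFA<IN> nfa = getNFA();

            // collect the buffered timestamps and sort them in ascending order
            List<Long> sortedTimestamps = new ArrayList<>();
            for (Long t : elementQueueState.keys()) {
                sortedTimestamps.add(t);
            }
            Collections.sort(sortedTimestamps);

            // hand every buffered element whose timestamp is not greater than the
            // watermark to the NFA, in event-time order; later elements stay buffered
            // until a larger watermark arrives
            for (Long t : sortedTimestamps) {
                if (t <= watermark) {
                    for (IN event : elementQueueState.get(t)) {
                        processEvent(nfa, event, t);
                    }
                    elementQueueState.remove(t);
                }
            }

            updateNFA(nfa);
        }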

    Improvement

    When actually using Flink CEP in our applications, we found a few inconveniences:

    First, patterns have to be written in Java code and compiled, which is verbose, cumbersome and impossible to configure dynamically; they should be configurable, or a DSL should be provided.
    Second, only one pattern can be set on a stream at a time; if, for example, you want to configure different patterns for different user instances, that is not supported; patterns need to be configurable per key.

    DSL

    For the first problem I initially considered developing a full DSL, but that is rather costly, and the community is also looking at SQL support,
    so for now I implemented a simple JSON-based version, as follows.

    (Figure: example of the JSON pattern configuration)

    This basically covers the common Flink CEP constructs and is also fairly easy to extend.
    A JSONArray describes a pattern sequence, and each pattern in it can define several AND/OR conditions.
    Each condition consists of three parts; for example, in ["sql", "contains", "delete"], "sql" is the field name, "contains" is the operator, and "delete" is the value, i.e. find the logs whose sql field contains "delete". A concrete configuration is sketched below.
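    For instance, a sequence that looks for a log containing "start" followed by one whose sql field contains "delete" could be written like this (an illustrative configuration in the format used later in this post; the nested conditions arrays are where the AND/OR combinations go):

    [
      {"id": "start", "conditions": [[["sql", "contains", "start"]]]},
      {"id": "end",   "conditions": [[["sql", "contains", "delete"]]]}
    ]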

    Now there is no need to write the pattern in Java; just pass in the configuration, like this:

    JSONArray jsonArray = JSON.parseArray("<pattern configuration JSON>");

    CepBuilder<Log> cepBuilder = new CepBuilder<Log>();
    Pattern<Log, ?>  pattern = cepBuilder.patternSequenceBuilder(jsonArray);
    Here I implemented a CepBuilder that turns the JSON configuration directly into a Pattern object.
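    The builder itself is not shown in the original post; below is a minimal sketch of what patternSequenceBuilder could look like. It is hypothetical code: it assumes the field/operator/value triples described above, handles only the first condition of each step and only the "contains" operator, and chains the steps with followedByAny as in the earlier example.

    public class CepBuilder<T extends Log> {

        public Pattern<T, ?> patternSequenceBuilder(JSONArray sequence) {
            Pattern<T, ?> pattern = null;
            for (int i = 0; i < sequence.size(); i++) {
                JSONObject step = sequence.getJSONObject(i);
                String id = step.getString("id");
                // conditions looks like [[["field", "op", "value"], ...], ...];
                // this sketch only reads the first triple
                JSONArray triple = step.getJSONArray("conditions").getJSONArray(0).getJSONArray(0);
                SimpleCondition<T> condition =
                    buildCondition(triple.getString(0), triple.getString(1), triple.getString(2));

                pattern = (pattern == null)
                    ? Pattern.<T>begin(id).where(condition)
                    : pattern.followedByAny(id).where(condition);
            }
            return pattern;
        }

        private SimpleCondition<T> buildCondition(String field, String op, String value) {
            return new SimpleCondition<T>() {
                @Override
                public boolean filter(T log) {
                    String v = (String) log.getItem(field);
                    // only the "contains" operator is sketched here
                    return "contains".equals(op) && v != null && v.contains(value);
                }
            };
        }
    }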

    Configuring multiple patterns per key

    To support configuring different patterns for different users, I modified the interface Flink CEP provides.
    Originally, Flink CEP is set up like this:
    PatternStream = CEP.pattern(input, pattern)
    As you can see, only one pattern can be defined for a given input.

    So I defined GroupPatternStream, which accepts a map of patterns:

    public class GroupPatternStream<K, T> {

        // underlying data stream
        private final DataStream<T> inputStream;

        private final  Map<K, Pattern<T, ?>> patterns;

        GroupPatternStream(final DataStream<T> inputStream, final Map<K, Pattern<T, ?>> patterns) {
            this.inputStream = inputStream;
            this.patterns = patterns;
        }

        // ... select() and createPatternStream() omitted here
    }
    Then, in the createPatternStream logic, each pattern is compiled into its own NFAFactory, and the resulting nfaFactoryMap is finally passed as a parameter when creating the KeyedCEPGroupPatternOperator:

    public SingleOutputStreamOperator<Map<String, List<T>>> createPatternStream(DataStream<T> inputStream, Map<K, Pattern<T, ?>> patterns) {
        final TypeSerializer<T> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
        Map<K, NFACompiler.NFAFactory<T>> nfaFactoryMap = new HashMap<>();

        // compile every configured pattern into its own NFA factory
        if (patterns != null) {
            for (K key : patterns.keySet()) {
                Pattern<T, ?> pattern = patterns.get(key);
                nfaFactoryMap.put(key, NFACompiler.compileFactory(pattern, inputSerializer, false));
            }
        }

        SingleOutputStreamOperator<Map<String, List<T>>> patternStream;

        if (inputStream instanceof KeyedStream) {
            KeyedStream<T, K> keyedStream = (KeyedStream<T, K>) inputStream;
            // keySerializer, isProcessingTime and the default nfaFactory are obtained
            // the same way as in Flink's original CEP utilities; omitted here
            patternStream = keyedStream.transform(
                "KeyedCEPPatternOperator",
                (TypeInformation<Map<String, List<T>>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
                new KeyedCEPGroupPatternOperator<>(
                    inputSerializer,
                    isProcessingTime,
                    keySerializer,
                    nfaFactory,
                    nfaFactoryMap,
                    true));
        } else {
            // non-keyed streams are not supported
            patternStream = null;
        }

        return patternStream;
    }
    KeyedCEPGroupPatternOperator is also new; compared with the original KeyedCEPPatternOperator it takes an extra nfaFactoryMap parameter and overrides the getNFA function.

    public class KeyedCEPGroupPatternOperator<IN, KEY> extends KeyedCEPPatternOperator<IN, KEY> {

        Map<KEY,  NFACompiler.NFAFactory<IN>> nfaFactoryMap;

        public KeyedCEPGroupPatternOperator(TypeSerializer<IN> inputSerializer,
            boolean isProcessingTime,
            TypeSerializer<KEY> keySerializer,
            NFACompiler.NFAFactory<IN> nfaFactory,
            Map<KEY,  NFACompiler.NFAFactory<IN>> nfaFactoryMap,
            boolean migratingFromOldKeyedOperator){
            super(inputSerializer, isProcessingTime, keySerializer, nfaFactory,
                migratingFromOldKeyedOperator);

            this.nfaFactoryMap = nfaFactoryMap;
        }

        @Override
        public NFA<IN> getNFA() throws IOException {
            NFA<IN> nfa = (NFA<IN>) nfaOperatorState.value();
            if(nfa == null) {
                Object key = getCurrentKey();
                NFACompiler.NFAFactory<IN> factory =  nfaFactoryMap.get(key);
                if(factory != null){
                    nfa = factory.createNFA();
                }

                //if the key didn't define pattern, add EmptyNFA
                if(nfa == null){
                    nfa = new EmptyNFA<>();
                }
            }
            return nfa;
        }

    }
    The core logic is in getNFA; the requirement is met mainly by changing this logic.
    In KeyedCEPPatternOperator, the same NFA is produced every time:

    public NFA<IN> getNFA() throws IOException {
        NFA<IN> nfa = nfaOperatorState.value();
        return nfa != null ? nfa : nfaFactory.createNFA();
    }
    In my version,
    the key of the current context is fetched first,
    and a different NFA is created for each key, so different keys can be matched against different patterns. These NFA state machines are stored as keyed state in the state backend, so whenever a record for a given key flows through, its NFA can be fetched from the state backend.

    Then we can use it like this.
    First prepare some test data:

    List<Log> logs = new ArrayList<>();

    Log log = new Log();
    log.putItem("id", "1");
    log.putItem("sql", "start counting!");
    logs.add(log);

    log = new Log();
    log.putItem("id", "2");
    log.putItem("sql", "start counting!");
    logs.add(log);

    log = new Log();
    log.putItem("id", "1");
    log.putItem("sql", "end counting");
    logs.add(log);

    log = new Log();
    log.putItem("id", "2");
    log.putItem("sql", "select from 1");
    logs.add(log);

    log = new Log();
    log.putItem("id", "2");
    log.putItem("sql", "end counting");
    logs.add(log);
    DataStream<Log> input = env.fromCollection(logs).keyBy(new KeySelector<Log, String>() {
        public String getKey(Log log){
            return (String)log.getItem("id");
        }
    });

    Construct the patterns:

    JSONArray jsonArray = JSON.parseArray(
        "[{\"id\":\"start\",\"conditions\":[[[\"sql\",\"contains\",\"start\"]]]},{\"id\":\"middle\",\"conditions\":[[[\"sql\",\"contains\",\"end\"]]]}]");

    JSONArray jsonArray2 = JSON.parseArray(
        "[{\"id\":\"start\",\"conditions\":[[[\"sql\",\"contains\",\"start\"]]]},{\"id\":\"middle\",\"conditions\":[[[\"sql\",\"contains\",\"select\"]]]},{\"id\":\"end\",\"conditions\":[[[\"sql\",\"contains\",\"end\"]]]}]");

    CepBuilder<Log> cepBuilder = new CepBuilder<Log>();
    Pattern<Log, ?> pattern = cepBuilder.patternSequenceBuilder(jsonArray);
    Pattern<Log, ?>  pattern2 = cepBuilder.patternSequenceBuilder(jsonArray2);

    Map<String, Pattern<Log, ?>> patternMap = new HashMap<>();
    patternMap.put("1", pattern);
    patternMap.put("2", pattern2);
    For logs with id = "1", find the pattern containing "start", "end".
    For logs with id = "2", find the pattern containing "start", "select", "end".

    Run CEP:

    GroupPatternStream<String, Log> groupPatternStream = new GroupPatternStream<>(input, patternMap);
    DataStream<String> result = groupPatternStream.select(
        new PatternSelectFunction<Log, String>() {
            @Override
            public String select(Map<String, List<Log>> pattern) {
                return pattern.toString();
            }
        });
    result.print();
    This produces:
    2> {middle=[{id=2, sql=select from 1}], start=[{id=2, sql=start counting!}], end=[{id=2, sql=end counting}]}
    4> {middle=[{id=1, sql=end counting}], start=[{id=1, sql=start counting!}]}
    You can see that different keys matched different patterns. Pretty cool, right?

    Original article: https://www.cnblogs.com/fxjwind/p/8085298.html