zoukankan      html  css  js  c++  java
  • Flink之CEP(Complex Event Processing,复杂事件处理)的使用

    需要在pom导入对应的依赖,如下所示:

    <!-- flink中的CEP -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep-scala_2.11</artifactId>
        <version>1.10.2</version>
    </dependency>

    需要使用的样例类如下所示:

    case class Login(userId: String, ip: String, eventType: String, eventTime: String)
    
    case class Warning(userId: Long, firstFailTime: Long, lastFailTime: Long, warningMsg: String)

    在main函数中的代码如下所示:

    // 创建执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
    // 生成数据,封装成样例类,并设置时间属性
    val loginEventStream: DataStream[Login] = env.fromCollection(List(
        Login("1", "192.168.0.1", "fail", "1558430842"),
        Login("1", "192.168.0.2", "fail", "1558430843"),
        Login("1", "192.168.0.3", "fail", "1558430844"),
        Login("2", "192.168.10.10", "success", "1558430845")
    )).assignAscendingTimestamps(_.eventTime.toLong)
    
    /**
     * Flink的CEP中函数的使用说明:
     * 1、.bgein[]()        一个模式的开始样式
     * 2、.next()           紧跟着上一个样式(中间不能有其他,严格近邻)
     * 3、.followedBy()     不要紧跟上一个样式(中间可以有其他,非严格近邻)
     * 4、.where()          样式的条件(传入的参数为过滤的条件)
     * 5、.within()         时间限制(为Flink中的Time类型)
     */
    // 定义模式
    val loginFailPatten: Pattern[Login, Login] = Pattern
        .begin[Login]("begin").where(_.eventType == "fail")
        .next("next").where(_.eventType == "fail")
        .within(Time.seconds(10))
    // 获取流中符合模式的数据 val patternStream: PatternStream[Login] = CEP.pattern(loginEventStream.keyBy(_.userId), loginFailPatten) // 通过select将数据从patternStream中获取出来,并可以封装成需要的类型 val loginFailDataStream: DataStream[Warning] = patternStream.select(new PatternSelectFunction[Login, Warning] { override def select(pattern: util.Map[String, util.List[Login]]): Warning = { val first: Login = pattern.get("begin").get(0) val last: Login = pattern.get("next").get(0) Warning(first.userId.toLong, first.eventTime.toLong, last.eventTime.toLong, "login fail") } }) // 获取超时数据(即在within时间之后,next中的数据还是没有时),在调用select时,需要传入一个侧输出流的标签,会将超时数据放入侧输出流中 val overtimeTag: OutputTag[String] = new OutputTag[String]("overtime") val overtimeStream: DataStream[Warning] = patternStream.select( overtimeTag, new PatternTimeoutFunction[Login, String] { override def timeout(pattern: util.Map[String, util.List[Login]], timeoutTimestamp: Long): String = { "这是超时数据" } }, new PatternSelectFunction[Login, Warning] { override def select(pattern: util.Map[String, util.List[Login]]): Warning = { val first: Login = pattern.get("begin").get(0) val last: Login = pattern.get("next").get(0) Warning(first.userId.toLong, first.eventTime.toLong, last.eventTime.toLong, "login fail") } } ) // 打印数据 loginFailDataStream.print("loginFailDataStream") overtimeStream.print("overtimeStream") overtimeStream.getSideOutput(overtimeTag).print("overtimeTag") // 启动执行器,执行任务 env.execute("CEPDemo")
  • 相关阅读:
    java的构造方法 java程序员
    No result defined for action cxd.action.QueryAction and result success java程序员
    大学毕业后拉开差距的真正原因 java程序员
    hibernate的回滚 java程序员
    验证码 getOutputStream() has already been called for this response异常的原因和解决方法 java程序员
    浅谈ssh(struts,spring,hibernate三大框架)整合的意义及其精髓 java程序员
    你平静的生活或许会在某个不可预见的时刻被彻底打碎 java程序员
    Spring配置文件中使用ref local与ref bean的区别. 在ApplicationResources.properties文件中,使用<ref bean>与<ref local>方法如下 java程序员
    poj1416Shredding Company
    poj1905Expanding Rods
  • 原文地址:https://www.cnblogs.com/yangshibiao/p/14138950.html
Copyright © 2011-2022 走看看