  • Using CEP (Complex Event Processing) in Flink

    Add the following dependency to the pom:

    <!-- Flink CEP -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep-scala_2.11</artifactId>
        <version>1.10.2</version>
    </dependency>
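
For sbt builds, the same dependency could be declared roughly as below. This is a sketch under assumptions the original does not state: that the project uses sbt on Scala 2.11 (the `scalaVersion` value is only an example), and that `flink-streaming-scala` is also needed for the DataStream code further down.

```scala
// build.sbt sketch (assumption: an sbt project on Scala 2.11; the original uses Maven)
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // DataStream API for Scala (assumed to be required by the example code)
  "org.apache.flink" %% "flink-streaming-scala" % "1.10.2",
  // Flink CEP for Scala, same artifact and version as the Maven snippet above
  "org.apache.flink" %% "flink-cep-scala" % "1.10.2"
)
```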

    The case classes used are as follows:

    case class Login(userId: String, ip: String, eventType: String, eventTime: String)
    
    case class Warning(userId: Long, firstFailTime: Long, lastFailTime: Long, warningMsg: String)

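    The code in the main function is as follows:

    // Imports needed by the code below (place them at the top of the file)
    import java.util
    import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
    import org.apache.flink.cep.scala.{CEP, PatternStream}
    import org.apache.flink.cep.scala.pattern.Pattern
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time

    // Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
    // Generate the data, wrap it in the case class, and assign event timestamps
    val loginEventStream: DataStream[Login] = env.fromCollection(List(
        Login("1", "192.168.0.1", "fail", "1558430842"),
        Login("1", "192.168.0.2", "fail", "1558430843"),
        Login("1", "192.168.0.3", "fail", "1558430844"),
        Login("2", "192.168.10.10", "success", "1558430845")
    // eventTime is in epoch seconds, while Flink timestamps are in milliseconds
    )).assignAscendingTimestamps(_.eventTime.toLong * 1000L)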
    
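    /**
     * Notes on the Flink CEP pattern API:
     * 1. .begin[T]()       starts a pattern and defines its first sub-pattern
     * 2. .next()           must immediately follow the previous sub-pattern (nothing in between; strict contiguity)
     * 3. .followedBy()     need not immediately follow the previous sub-pattern (other events may occur in between; relaxed contiguity)
     * 4. .where()          condition for the sub-pattern (the argument is the filter predicate)
     * 5. .within()         time constraint (a Flink Time value)
     */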
    // Define the pattern
    val loginFailPattern: Pattern[Login, Login] = Pattern
        .begin[Login]("begin").where(_.eventType == "fail")
        .next("next").where(_.eventType == "fail")
        .within(Time.seconds(10))

    // Get the events in the stream that match the pattern
    val patternStream: PatternStream[Login] = CEP.pattern(loginEventStream.keyBy(_.userId), loginFailPattern)

    // Use select to extract the matches from patternStream and wrap them in the desired type
    val loginFailDataStream: DataStream[Warning] = patternStream.select(new PatternSelectFunction[Login, Warning] {
        override def select(pattern: util.Map[String, util.List[Login]]): Warning = {
            val first: Login = pattern.get("begin").get(0)
            val last: Login = pattern.get("next").get(0)
            Warning(first.userId.toLong, first.eventTime.toLong, last.eventTime.toLong, "login fail")
        }
    })

    // Get timed-out data (i.e. "next" still has no event once the within time has elapsed).
    // Passing a side-output tag to select sends the timed-out data to that side output.
    val overtimeTag: OutputTag[String] = new OutputTag[String]("overtime")
    val overtimeStream: DataStream[Warning] = patternStream.select(
        overtimeTag,
        new PatternTimeoutFunction[Login, String] {
            override def timeout(pattern: util.Map[String, util.List[Login]], timeoutTimestamp: Long): String = {
                "this is timed-out data"
            }
        },
        new PatternSelectFunction[Login, Warning] {
            override def select(pattern: util.Map[String, util.List[Login]]): Warning = {
                val first: Login = pattern.get("begin").get(0)
                val last: Login = pattern.get("next").get(0)
                Warning(first.userId.toLong, first.eventTime.toLong, last.eventTime.toLong, "login fail")
            }
        }
    )

    // Print the results
    loginFailDataStream.print("loginFailDataStream")
    overtimeStream.print("overtimeStream")
    overtimeStream.getSideOutput(overtimeTag).print("overtimeTag")

    // Start the executor and run the job
    env.execute("CEPDemo")
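
The difference between `.next()` (strict contiguity) and `.followedBy()` (relaxed contiguity) from the usage notes can be illustrated without Flink. The following is only a plain-Scala sketch of the two matching rules for a two-step "fail, then fail" pattern with a time limit. It is not how Flink implements CEP, and the names `ContiguitySketch`, `Event`, `strictPairs`, and `relaxedPairs` are hypothetical, introduced just for this illustration.

```scala
// Plain-Scala illustration of strict vs. relaxed contiguity; NOT Flink's implementation.
object ContiguitySketch {
  // Hypothetical simplified event: an event type plus an event time in seconds.
  final case class Event(eventType: String, timeSec: Long)

  // Strict contiguity (like .next): the second "fail" must be the very next event.
  def strictPairs(events: List[Event], withinSec: Long): List[(Event, Event)] =
    events.zip(events.drop(1)).filter { case (a, b) =>
      a.eventType == "fail" && b.eventType == "fail" &&
        b.timeSec - a.timeSec < withinSec
    }

  // Relaxed contiguity (like .followedBy): non-matching events in between are
  // skipped, and each starting "fail" is paired with the first later "fail".
  def relaxedPairs(events: List[Event], withinSec: Long): List[(Event, Event)] =
    events.tails.toList.flatMap {
      case a :: rest if a.eventType == "fail" =>
        rest.find(_.eventType == "fail")
          .filter(b => b.timeSec - a.timeSec < withinSec)
          .map(b => (a, b))
          .toList
      case _ => Nil
    }
}
```

For the sequence fail@1, success@2, fail@3, fail@4 with a 10-second limit, `strictPairs` yields only (fail@3, fail@4) because the success event breaks the first pair, while `relaxedPairs` also yields (fail@1, fail@3).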
  • Original article: https://www.cnblogs.com/yangshibiao/p/14138950.html