  • Flink (6): E-commerce User Behavior Analysis (6): Malicious Login Monitoring, Consecutive Login Failures Within a Time Window

    1 Module creation and data preparation

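      Under UserBehaviorAnalysis, create another Maven module as a subproject and name it LoginFailDetect. This submodule uses Flink's CEP library
    to do pattern matching on the event stream, so the CEP dependency has to be added to the pom file: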
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
        </dependencies>
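      For a website, logging in is not something a user does frequently. If a user fails to log in repeatedly within a short period, it may be an
    automated attack such as brute-force password cracking. We therefore track each user's login failures: if the same user (possibly from
    different IPs) fails to log in twice in a row within 2 seconds, we treat it as a risk of malicious login and output an alert. This is a basic
    part of risk control for e-commerce sites and for almost every other website.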
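      The rule above can be sketched in plain Scala, without Flink, to make the intended behavior concrete. This is only an illustration under
    assumptions: LoginRule, suspicious, and the sample events are hypothetical names and data that do not come from the original project, and
    treating a gap of exactly 2 seconds as a match is an assumption of this sketch.

```scala
// LoginEvent mirrors the fields used later in the article; eventTime is in seconds.
case class LoginEvent(userId: Long, ip: String, eventType: String, eventTime: Long)

// Hypothetical helper, not part of the original project.
object LoginRule {
  // Returns (userId, firstFailTime, secondFailTime) for every pair of back-to-back
  // failures of the same user that are at most `windowSec` seconds apart.
  def suspicious(events: Seq[LoginEvent], windowSec: Long = 2L): Seq[(Long, Long, Long)] =
    events
      .groupBy(_.userId)            // the IP is ignored on purpose: different IPs still count
      .toSeq
      .sortBy(_._1)
      .flatMap { case (userId, userEvents) =>
        userEvents.sortBy(_.eventTime).sliding(2).collect {
          case Seq(a, b)
              if a.eventType == "fail" && b.eventType == "fail" &&
                b.eventTime - a.eventTime <= windowSec =>
            (userId, a.eventTime, b.eventTime)
        }.toList
      }
}

object LoginRuleDemo {
  def main(args: Array[String]): Unit = {
    // Made-up sample data for illustration only.
    val sample = Seq(
      LoginEvent(1L, "10.0.0.1", "fail", 100L),
      LoginEvent(1L, "10.0.0.2", "fail", 101L),    // second failure 1 s later, other IP
      LoginEvent(2L, "10.0.0.3", "fail", 100L),
      LoginEvent(2L, "10.0.0.3", "success", 101L), // a success breaks the run of failures
      LoginEvent(2L, "10.0.0.3", "fail", 102L)
    )
    LoginRule.suspicious(sample).foreach(println)  // prints (1,100,101)
  }
}
```

      Only user 1 is reported: user 2 also fails twice within 2 seconds, but a successful login sits between the two failures, so they are not
    consecutive.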
     
    2.1 State programming
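      Since time is involved again, the simplest approach is similar to the earlier hot-items statistics: key the stream by user ID, save each
    login failure event in a ListState, and register a timer that fires 2 seconds later. When the timer fires, check the number of login failure
    events in the state; if it is greater than or equal to 2, output an alert.
      Create LoginFail.scala under src/main/scala and define a singleton object in it. Define the case class LoginEvent, which represents the
    input login event stream. Login data should really be extracted from the UserBehavior log, but UserBehavior.csv has no tracking for logins,
    so we read the login data from a separate file, LoginLog.csv.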
    package com.atguigu
    
    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.util.Collector
    
    import scala.collection.mutable.ListBuffer
    
    // Input and output types
    case class LoginEvent(userId:Long, ip:String, eventType:String, eventTime: Long)
    case class Warning(userId:Long, firstFailTime:Long, lastFailTime:Long, warningMsg:String)
    
    object LoginFail {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
        // Read LoginLog.csv from the classpath instead of a hard-coded absolute path
        val resource = getClass.getResource("/LoginLog.csv")
        val loginEventStream:DataStream[LoginEvent] = env.readTextFile(resource.getPath)
          .map(data => {
            val dataArray = data.split(",")
            LoginEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
          })
          .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(3)) {
            override def extractTimestamp(t: LoginEvent): Long = t.eventTime*1000L
          })
    
        val loginWarningStream:DataStream[Warning] = loginEventStream
          .keyBy(_.userId)
          .process( new LoginFailWarning(2))
    
        loginWarningStream.print()
        env.execute("login fail job")
    
      }
    
    }
    
    // Custom KeyedProcessFunction implementation
    class LoginFailWarning(maxFailTime: Int) extends KeyedProcessFunction[Long, LoginEvent, Warning]{
      // List state that holds all login failure events seen within the 2 seconds
      lazy val LoginFailListState: ListState[LoginEvent] = getRuntimeContext.getListState(new ListStateDescriptor[LoginEvent]("saved-logingfail",classOf[LoginEvent]))
      // Value state that holds the timestamp of the registered timer
      lazy val timerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("time-ts",classOf[Long]))
    
      override def processElement(value: LoginEvent, context: KeyedProcessFunction[Long, LoginEvent, Warning]#Context, collector: Collector[Warning]): Unit = {
        if(value.eventType == "fail"){
          LoginFailListState.add(value)
          if(timerTsState.value()==0){
            val ts = value.eventTime*1000L + 2000L
            context.timerService().registerEventTimeTimer(ts)
            timerTsState.update(ts)
          }
        }else{
          context.timerService().deleteEventTimeTimer(timerTsState.value())
          LoginFailListState.clear()
          timerTsState.clear()
        }
      }
    
      override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, LoginEvent, Warning]#OnTimerContext, out: Collector[Warning]): Unit = {
        val allLoginFailList:ListBuffer[LoginEvent] = new ListBuffer[LoginEvent]
        val iter = LoginFailListState.get().iterator()
        while(iter.hasNext){
          allLoginFailList += iter.next()
        }
    
        if(allLoginFailList.length >= maxFailTime){
          out.collect(Warning( ctx.getCurrentKey,
            allLoginFailList.head.eventTime,
            allLoginFailList.last.eventTime,
            "login fail in 2s for " + allLoginFailList.length + " times."))
    
        }
    
        LoginFailListState.clear()
        timerTsState.clear()
    
    
      }
    
    
    }
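
      To trace how this timer-based logic behaves, here is a minimal plain-Scala sketch for a single user that runs without Flink. It is a
    simplification under assumptions: TimerSim and advanceWatermark are hypothetical names, ordinary variables stand in for ListState and
    ValueState, and the watermark is advanced by hand instead of being derived from the stream.

```scala
// LoginEvent mirrors the case class in the listing above; eventTime is in seconds.
case class LoginEvent(userId: Long, ip: String, eventType: String, eventTime: Long)

// Hypothetical single-key stand-in for LoginFailWarning, for tracing only.
class TimerSim(maxFailTimes: Int) {
  private var fails = Vector.empty[LoginEvent] // stands in for the ListState
  private var timerTs: Option[Long] = None     // stands in for the timer and its ValueState

  // Mirrors processElement: collect failures, reset everything on a success.
  def processElement(e: LoginEvent): Unit =
    if (e.eventType == "fail") {
      fails = fails :+ e
      if (timerTs.isEmpty) timerTs = Some(e.eventTime * 1000L + 2000L)
    } else {
      fails = Vector.empty
      timerTs = None
    }

  // Mirrors onTimer: runs only once the (manually supplied) watermark reaches the timer.
  def advanceWatermark(watermarkMs: Long): Option[String] = timerTs match {
    case Some(ts) if watermarkMs >= ts =>
      val warning =
        if (fails.length >= maxFailTimes) Some(s"user ${fails.head.userId}: ${fails.length} failures")
        else None
      fails = Vector.empty
      timerTs = None
      warning
    case _ => None
  }
}

object TimerSimDemo {
  def main(args: Array[String]): Unit = {
    // Case 1: two failures, then the timer fires.
    val a = new TimerSim(2)
    a.processElement(LoginEvent(1L, "10.0.0.1", "fail", 100L))
    a.processElement(LoginEvent(1L, "10.0.0.2", "fail", 101L))
    println(a.advanceWatermark(102000L)) // prints Some(user 1: 2 failures)

    // Case 2: same two failures, but a success arrives before the timer fires.
    val b = new TimerSim(2)
    b.processElement(LoginEvent(1L, "10.0.0.1", "fail", 100L))
    b.processElement(LoginEvent(1L, "10.0.0.2", "fail", 101L))
    b.processElement(LoginEvent(1L, "10.0.0.1", "success", 101L))
    println(b.advanceWatermark(102000L)) // prints None
  }
}
```

      The trace shows two properties of this approach: the alert is raised only when the timer fires, not at the moment of the second failure,
    and a successful login that arrives before the timer fires clears the failures collected so far.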

    2.2 CEP

    package com.atguigu.loginfail_detect
    
    import java.util
    
    import org.apache.flink.cep.PatternSelectFunction
    import org.apache.flink.cep.scala.CEP
    import org.apache.flink.cep.scala.pattern.Pattern
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.util.Collector
    
    import scala.collection.mutable.ListBuffer
    
    
    // Input and output types
    case class LoginEvent(userId:Long, ip:String, eventType:String, eventTime: Long)
    case class Warning(userId:Long, firstFailTime:Long, lastFailTime:Long, warningMsg:String)
    
    object LoginFailCEP {
      def main(args: Array[String]): Unit = {
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
        // Read LoginLog.csv from the classpath instead of a hard-coded absolute path
        val resource = getClass.getResource("/LoginLog.csv")
        val loginEventStream:DataStream[LoginEvent] = env.readTextFile(resource.getPath)
          .map(data => {
            val dataArray = data.split(",")
            LoginEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
          })
          .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(3)) {
            override def extractTimestamp(t: LoginEvent): Long = t.eventTime*1000L
          })
    
        // 1. Define the pattern to match
        val loginFailPattern: Pattern[LoginEvent, LoginEvent] = Pattern
          .begin[LoginEvent]("firstFail").where(_.eventType == "fail")
          .next("secondFail").where(_.eventType == "fail")
          .within(Time.seconds(2))
    
        // 2. Apply the pattern to the keyed stream to get a PatternStream
        val patternStream = CEP.pattern(loginEventStream.keyBy(_.userId), loginFailPattern)
    
        // 3. Convert the detected event sequences into alert output
        val loginFailStream: DataStream[Warning] = patternStream.select( new LoginFailDetect())
    
        // 4. Print the output
        loginFailStream.print()
    
        env.execute("login fail job")
      }
    
    }
    
    // Custom PatternSelectFunction that wraps the detected consecutive login failure events into an alert
    class LoginFailDetect extends PatternSelectFunction[LoginEvent, Warning]{
      override def select(map: util.Map[String, util.List[LoginEvent]]): Warning = {
        // map holds the matched group of events; each key is the name given to a pattern step
        val firstLoginFail = map.get("firstFail").get(0)
        val secondLoginFail = map.get("secondFail").get(0)
        Warning( firstLoginFail.userId, firstLoginFail.eventTime, secondLoginFail.eventTime, "login fail")
    
      }
    }
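
      The lookup inside select can be tried out without starting a Flink job by building the matched-events map by hand. The sketch below is an
    illustration under assumptions: SelectSketch and toWarning are hypothetical names, the case classes are local copies of the ones above, and
    the two events are made-up sample data.

```scala
import java.util

// Local copies of the article's case classes so the sketch runs without Flink.
case class LoginEvent(userId: Long, ip: String, eventType: String, eventTime: Long)
case class Warning(userId: Long, firstFailTime: Long, lastFailTime: Long, warningMsg: String)

// Hypothetical helper that repeats the body of LoginFailDetect.select as a plain function.
object SelectSketch {
  def toWarning(matched: util.Map[String, util.List[LoginEvent]]): Warning = {
    // Keys are the step names passed to begin(...) and next(...) in the pattern.
    val first = matched.get("firstFail").get(0)
    val second = matched.get("secondFail").get(0)
    Warning(first.userId, first.eventTime, second.eventTime, "login fail")
  }

  def main(args: Array[String]): Unit = {
    // Hand-built stand-in for the map that CEP would pass to select.
    val matched = new util.HashMap[String, util.List[LoginEvent]]()
    matched.put("firstFail", util.Collections.singletonList(LoginEvent(1L, "10.0.0.1", "fail", 100L)))
    matched.put("secondFail", util.Collections.singletonList(LoginEvent(1L, "10.0.0.2", "fail", 101L)))
    println(toWarning(matched)) // prints Warning(1,100,101,login fail)
  }
}
```

      Each step name maps to a list because a pattern step can match more than one event; the pattern here matches exactly one event per step,
    so get(0) is enough.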
     
     
     

    This article comes from cnblogs (博客园). Author: 秋华. When reposting, please cite the original link: https://www.cnblogs.com/qiu-hua/p/13492005.html
