zoukankan      html  css  js  c++  java
  • 四.Flink实时项目电商用户行为分析之恶意登录监控

    1.1 模块创建和数据准备

    继续在Flink-Project下新建一个 maven module作为子项目,命名为gmall-login-fail。在这个子模块中,我们将会用到flinkCEP库来实现事件流的模式匹配,所以需要在pom文件中引入CEP的相关依赖:

    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.12</artifactId>
    <version>1.10.0</version>
    </dependency>

    1.2 代码实现

    对于网站而言,用户登录并不是频繁的业务操作。如果一个用户短时间内频繁登录失败,就有可能是出现了程序的恶意攻击,比如密码暴力破解。因此我们考虑,应该对用户的登录失败动作进行统计,具体来说,如果同一用户(可以是不同IP)在2秒之内连续两次登录失败,就认为存在恶意登录的风险,输出相关的信息进行报警提示。这是电商网站、也是几乎所有网站风控的基本一环。

    1.2.1 状态编程

    由于同样引入了时间,我们可以想到,最简单的方法其实与之前的热门统计类似,只需要按照用户ID分流,然后遇到登录失败的事件时将其保存在ListState中,然后设置一个定时器,2秒后触发。定时器触发时检查状态中的登录失败事件个数,如果大于等于2,那么就输出报警信息。

    登录数据本应该从UserBehavior日志里提取,由于UserBehavior.csv中没有做相关埋点,我们从另一个文件LoginLog.csv中读取登录数据。

    代码如下:

    1JavaBean--LoginEvent

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class LoginEvent {
        private Long userId;
        private String ip;
        private String eventType;
        private Long timestamp;
    }

    2)主程序

    public class LoginFailApp {
    public static void main(String[] args) throws Exception {
    //1.创建环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    //2.读取文本数据创建流,转换为JavaBean,提取时间戳
    SingleOutputStreamOperator<LoginEvent> loginEventDS = env.readTextFile("input/LoginLog.csv")
    .map(line -> {
    String[] fields = line.split(",");
    return new LoginEvent(Long.parseLong(fields[0]),
    fields[1],
    fields[2],
    Long.parseLong(fields[3]));
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.seconds(2)) {
    @Override
    public long extractTimestamp(LoginEvent element) {
    return element.getTimestamp() * 1000L;
    }
    });
    //3.按照userId进行分组
    SingleOutputStreamOperator<String> result = loginEventDS.keyBy(data -> data.getUserId())
    .process(new LoginFailKeyProcessFunc());
    //4.打印
    result.print();
    //5.执行
    env.execute();
    }

    public static class LoginFailKeyProcessFunc extends KeyedProcessFunction<Long,LoginEvent,String>{

    //定义状态数据
    private ListState<LoginEvent> listState;
    private ValueState<Long> tsState;

    @Override
    public void open(Configuration parameters) throws Exception {
    listState=getRuntimeContext().getListState(new ListStateDescriptor<LoginEvent>("list-state",LoginEvent.class));
    tsState=getRuntimeContext().getState(new ValueStateDescriptor<Long>("value-state",Long.class));
    }

    @Override
    public void processElement(LoginEvent value, Context ctx, Collector<String> out) throws Exception {
    //取出状态数据
    Iterable<LoginEvent> loginEvents = listState.get();
    //判断是否为空,确定是否为第一条失败数据
    if(!loginEvents.iterator().hasNext()){
    //第一条数据,则判断当前数据是否为登录失败
    if("fail".equals(value.getEventType())){
    //将当前数据放置状态中,注册2s后的定时器
    listState.add(value);
    long ts= (value.getTimestamp() +2) * 1000L;
    ctx.timerService().registerEventTimeTimer(ts);
    tsState.update(ts);
    }
    }
    else{
    //不是第一条数据,并且是失败数据
    if("fail".equals(value.getEventType())){
    listState.add(value);
    }else{
    //不是第一条数据,并且是成功数据,删除定时器
    ctx.timerService().deleteEventTimeTimer(tsState.value());
    //清空状态
    listState.clear();
    tsState.clear();
    }
    }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
    //取出状态中的数据
    List<LoginEvent> loginEvents = Lists.newArrayList(listState.get().iterator());
    int size = loginEvents.size();
    //如果集合中数据大于等于2,则输出报警信息
    if(size >= 2){
    LoginEvent firstFail = loginEvents.get(0);
    LoginEvent lastFail = loginEvents.get(size - 1);
    out.collect(ctx.getCurrentKey()+
    ""+firstFail.getTimestamp() +
    "" + lastFail.getTimestamp() +
    "之间登录失败" + size + "次!");
    }
    //清空状态
    listState.clear();
    tsState.clear();
    }
    }
    }

    1.2.2 状态编程的改进

    上一节的代码实现中我们可以看到,直接把每次登录失败的数据存起来、设置定时器一段时间后再读取,这种做法尽管简单,但和我们开始的需求还是略有差异的。这种做法只能隔2秒之后去判断一下这期间是否有多次失败登录,而不是在一次登录失败之后、再一次登录失败时就立刻报警。这个需求如果严格实现起来,相当于要判断任意紧邻的事件,是否符合某种模式。

    于是我们可以想到,这个需求其实可以不用定时器触发,直接在状态中存取上一次登录失败的事件,每次都做判断和比对,就可以实现最初的需求。

    public class LoginFailApp2 {
    public static void main(String[] args) throws Exception {
    //1.创建环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    //2.读取文本数据创建流,转换为JavaBean,提取时间戳
    SingleOutputStreamOperator<LoginEvent> loginEventDS = env.readTextFile("input/LoginLog.csv")
    .map(line -> {
    String[] fields = line.split(",");
    return new LoginEvent(Long.parseLong(fields[0]),
    fields[1],
    fields[2],
    Long.parseLong(fields[3]));
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.seconds(2)) {
    @Override
    public long extractTimestamp(LoginEvent element) {
    return element.getTimestamp() * 1000L;
    }
    });
    //3.按照userId进行分组
    SingleOutputStreamOperator<String> result = loginEventDS.keyBy(data -> data.getUserId())
    .process(new LoginFailKeyProcessFunc());
    //4.打印
    result.print();
    env.execute();
    }

    public static class LoginFailKeyProcessFunc extends KeyedProcessFunction<Long,LoginEvent,String>{
    //定义状态数据
    private ListState<LoginEvent> listState;

    @Override
    public void open(Configuration parameters) throws Exception {
    listState=getRuntimeContext().getListState(new ListStateDescriptor<LoginEvent>("list-state",LoginEvent.class));
    }

    @Override
    public void processElement(LoginEvent value, Context ctx, Collector<String> out) throws Exception {
    //取出状态中的数据
    Iterator<LoginEvent> iterator = listState.get().iterator();
    //判断当前是否为失败数据
    if("fail".equals(value.getEventType())){
    //判断集合中是否有数据
    if(iterator.hasNext()){
    //取出集合中的数据
    LoginEvent lastLogFail= iterator.next();
    //判断两次失败数据之间时间间隔是否小于等于2
    if(value.getTimestamp()-lastLogFail.getTimestamp() <= 2){
    //报警
    out.collect(ctx.getCurrentKey()+
    "" + lastLogFail.getTimestamp()+
    "" + value.getTimestamp() +
    "之间登录失败" + 2+ ""
    );
    }
    listState.clear();
    listState.add(value);

    }else{
    listState.add(value);
    }
    }else {
    listState.clear();
    }
    }
    }
    }

    1.2.3 CEP编程

    上一节我们通过对状态编程的改进,去掉了定时器,在process function中做了更多的逻辑处理,实现了最初的需求。不过这种方法里有很多的条件判断,而我们目前仅仅实现的是检测“连续2次登录失败”,这是最简单的情形。如果需要检测更多次,内部逻辑显然会变得非常复杂。那有什么方式可以方便地实现呢?

    很幸运,flink为我们提供了CEPComplex Event Processing,复杂事件处理)库,用于在流中筛选符合某种复杂模式的事件。接下来我们就基于CEP来完成这个模块的实现。

    代码如下:

    public class LoginFailWithCep {
    public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    //从文本读取数据
    SingleOutputStreamOperator<LoginEvent> loginEventDS = env.readTextFile("input/LoginLog.csv")
    .map(line -> {
    String[] fields = line.split(",");
    return new LoginEvent(Long.parseLong(fields[0]), fields[1], fields[2], Long.parseLong(fields[3]));
    })
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.seconds(3)) {
    @Override
    public long extractTimestamp(LoginEvent element) {
    return element.getTimestamp() * 1000L;
    }
    });
    // 1. 定义一个模式,指定多个事件互相之前的关系

    Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("firstFail").where(new SimpleCondition<LoginEvent>() {
    @Override
    public boolean filter(LoginEvent value) throws Exception {
    return "fail".equals(value.getEventType());
    }//这是循环模式
    }).times(2) //times默认是followedby 不挨着
    .consecutive() //连续 挨着
    .within(Time.seconds(2));//这是循环模式
    // .next("secondFail")//这是单例模式
    // .where(new SimpleCondition<LoginEvent>() {
    // @Override
    // public boolean filter(LoginEvent value) throws Exception {
    // return "fail".equals(value.getEventType());
    // }
    // })
    // .within(Time.seconds(2));
    // 2. 在以userId分组之后的数据流上应用pattern
    PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS.keyBy(data -> data.getUserId()), loginFailPattern);
    // 3. 检出符合匹配条件的事件,得到一个新的DataStream
    SingleOutputStreamOperator<String> loginFailWarningStream = patternStream.select(new LoginFailMatchSelect());
    loginFailWarningStream.print();
    env.execute("login fail detect job");

    }

    // 实现自定义PatternSelectFunction
    public static class LoginFailMatchSelect implements PatternSelectFunction<LoginEvent,String>{

    @Override
    public String select(Map<String, List<LoginEvent>> pattern) throws Exception {
    //List<LoginEvent> next1 = pattern.values().iterator().next();
    // List<LoginEvent> firstFail = pattern.get("firstFail");
    // List<LoginEvent> secondFail = pattern.get("secondFail");
    // return firstFail.get(0).getUserId()+","+firstFail.get(0).getTimestamp()+","+secondFail.get(0).getTimestamp();
    List<LoginEvent> firstFail = pattern.get("firstFail");
    return firstFail.get(0).getUserId()+","+firstFail.get(0).getTimestamp()+","+firstFail.get(1).getTimestamp()+",login fail";
    }
    }

    }

     

     

     

     

     

  • 相关阅读:
    小程序请求Django后台及路由跳转
    git操作
    github 介绍
    小程序01
    HTML5要点(四)对象全整理
    JavaScript要点(十二) HTML DOM 事件
    JavaScript要点(九)HTML DOM
    JavaScript要点(八) 闭包
    inferred 和 freefrom
    MySql数据库实现分布式的主从结构
  • 原文地址:https://www.cnblogs.com/whdd/p/14058787.html
Copyright © 2011-2022 走看看