1.1 模块创建和数据准备
继续在Flink-Project下新建一个 maven module作为子项目,命名为gmall-login-fail。在这个子模块中,我们将会用到flink的CEP库来实现事件流的模式匹配,所以需要在pom文件中引入CEP的相关依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.12</artifactId>
<version>1.10.0</version>
</dependency>
1.2 代码实现
对于网站而言,用户登录并不是频繁的业务操作。如果一个用户短时间内频繁登录失败,就有可能是出现了程序的恶意攻击,比如密码暴力破解。因此我们考虑,应该对用户的登录失败动作进行统计,具体来说,如果同一用户(可以是不同IP)在2秒之内连续两次登录失败,就认为存在恶意登录的风险,输出相关的信息进行报警提示。这是电商网站、也是几乎所有网站风控的基本一环。
1.2.1 状态编程
由于同样引入了时间,我们可以想到,最简单的方法其实与之前的热门统计类似,只需要按照用户ID分流,然后遇到登录失败的事件时将其保存在ListState中,然后设置一个定时器,2秒后触发。定时器触发时检查状态中的登录失败事件个数,如果大于等于2,那么就输出报警信息。
登录数据本应该从UserBehavior日志里提取,由于UserBehavior.csv中没有做相关埋点,我们从另一个文件LoginLog.csv中读取登录数据。
代码如下:
1)JavaBean--LoginEvent
@Data
@NoArgsConstructor
@AllArgsConstructor
public class LoginEvent {
private Long userId;
private String ip;
private String eventType;
private Long timestamp;
}
2)主程序
public class LoginFailApp {
public static void main(String[] args) throws Exception {
//1.创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//2.读取文本数据创建流,转换为JavaBean,提取时间戳
SingleOutputStreamOperator<LoginEvent> loginEventDS = env.readTextFile("input/LoginLog.csv")
.map(line -> {
String[] fields = line.split(",");
return new LoginEvent(Long.parseLong(fields[0]),
fields[1],
fields[2],
Long.parseLong(fields[3]));
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.seconds(2)) {
@Override
public long extractTimestamp(LoginEvent element) {
return element.getTimestamp() * 1000L;
}
});
//3.按照userId进行分组
SingleOutputStreamOperator<String> result = loginEventDS.keyBy(data -> data.getUserId())
.process(new LoginFailKeyProcessFunc());
//4.打印
result.print();
//5.执行
env.execute();
}
public static class LoginFailKeyProcessFunc extends KeyedProcessFunction<Long,LoginEvent,String>{
//定义状态数据
private ListState<LoginEvent> listState;
private ValueState<Long> tsState;
@Override
public void open(Configuration parameters) throws Exception {
listState=getRuntimeContext().getListState(new ListStateDescriptor<LoginEvent>("list-state",LoginEvent.class));
tsState=getRuntimeContext().getState(new ValueStateDescriptor<Long>("value-state",Long.class));
}
@Override
public void processElement(LoginEvent value, Context ctx, Collector<String> out) throws Exception {
//取出状态数据
Iterable<LoginEvent> loginEvents = listState.get();
//判断是否为空,确定是否为第一条失败数据
if(!loginEvents.iterator().hasNext()){
//第一条数据,则判断当前数据是否为登录失败
if("fail".equals(value.getEventType())){
//将当前数据放置状态中,注册2s后的定时器
listState.add(value);
long ts= (value.getTimestamp() +2) * 1000L;
ctx.timerService().registerEventTimeTimer(ts);
tsState.update(ts);
}
}
else{
//不是第一条数据,并且是失败数据
if("fail".equals(value.getEventType())){
listState.add(value);
}else{
//不是第一条数据,并且是成功数据,删除定时器
ctx.timerService().deleteEventTimeTimer(tsState.value());
//清空状态
listState.clear();
tsState.clear();
}
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
//取出状态中的数据
List<LoginEvent> loginEvents = Lists.newArrayList(listState.get().iterator());
int size = loginEvents.size();
//如果集合中数据大于等于2,则输出报警信息
if(size >= 2){
LoginEvent firstFail = loginEvents.get(0);
LoginEvent lastFail = loginEvents.get(size - 1);
out.collect(ctx.getCurrentKey()+
"在"+firstFail.getTimestamp() +
"到" + lastFail.getTimestamp() +
"之间登录失败" + size + "次!");
}
//清空状态
listState.clear();
tsState.clear();
}
}
}
1.2.2 状态编程的改进
上一节的代码实现中我们可以看到,直接把每次登录失败的数据存起来、设置定时器一段时间后再读取,这种做法尽管简单,但和我们开始的需求还是略有差异的。这种做法只能隔2秒之后去判断一下这期间是否有多次失败登录,而不是在一次登录失败之后、再一次登录失败时就立刻报警。这个需求如果严格实现起来,相当于要判断任意紧邻的事件,是否符合某种模式。
于是我们可以想到,这个需求其实可以不用定时器触发,直接在状态中存取上一次登录失败的事件,每次都做判断和比对,就可以实现最初的需求。
public class LoginFailApp2 {
public static void main(String[] args) throws Exception {
//1.创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//2.读取文本数据创建流,转换为JavaBean,提取时间戳
SingleOutputStreamOperator<LoginEvent> loginEventDS = env.readTextFile("input/LoginLog.csv")
.map(line -> {
String[] fields = line.split(",");
return new LoginEvent(Long.parseLong(fields[0]),
fields[1],
fields[2],
Long.parseLong(fields[3]));
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.seconds(2)) {
@Override
public long extractTimestamp(LoginEvent element) {
return element.getTimestamp() * 1000L;
}
});
//3.按照userId进行分组
SingleOutputStreamOperator<String> result = loginEventDS.keyBy(data -> data.getUserId())
.process(new LoginFailKeyProcessFunc());
//4.打印
result.print();
env.execute();
}
public static class LoginFailKeyProcessFunc extends KeyedProcessFunction<Long,LoginEvent,String>{
//定义状态数据
private ListState<LoginEvent> listState;
@Override
public void open(Configuration parameters) throws Exception {
listState=getRuntimeContext().getListState(new ListStateDescriptor<LoginEvent>("list-state",LoginEvent.class));
}
@Override
public void processElement(LoginEvent value, Context ctx, Collector<String> out) throws Exception {
//取出状态中的数据
Iterator<LoginEvent> iterator = listState.get().iterator();
//判断当前是否为失败数据
if("fail".equals(value.getEventType())){
//判断集合中是否有数据
if(iterator.hasNext()){
//取出集合中的数据
LoginEvent lastLogFail= iterator.next();
//判断两次失败数据之间时间间隔是否小于等于2
if(value.getTimestamp()-lastLogFail.getTimestamp() <= 2){
//报警
out.collect(ctx.getCurrentKey()+
"在" + lastLogFail.getTimestamp()+
"到" + value.getTimestamp() +
"之间登录失败" + 2+ "次"
);
}
listState.clear();
listState.add(value);
}else{
listState.add(value);
}
}else {
listState.clear();
}
}
}
}
1.2.3 CEP编程
上一节我们通过对状态编程的改进,去掉了定时器,在process function中做了更多的逻辑处理,实现了最初的需求。不过这种方法里有很多的条件判断,而我们目前仅仅实现的是检测“连续2次登录失败”,这是最简单的情形。如果需要检测更多次,内部逻辑显然会变得非常复杂。那有什么方式可以方便地实现呢?
很幸运,flink为我们提供了CEP(Complex Event Processing,复杂事件处理)库,用于在流中筛选符合某种复杂模式的事件。接下来我们就基于CEP来完成这个模块的实现。
代码如下:
public class LoginFailWithCep {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//从文本读取数据
SingleOutputStreamOperator<LoginEvent> loginEventDS = env.readTextFile("input/LoginLog.csv")
.map(line -> {
String[] fields = line.split(",");
return new LoginEvent(Long.parseLong(fields[0]), fields[1], fields[2], Long.parseLong(fields[3]));
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.seconds(3)) {
@Override
public long extractTimestamp(LoginEvent element) {
return element.getTimestamp() * 1000L;
}
});
// 1. 定义一个模式,指定多个事件互相之前的关系
Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("firstFail").where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent value) throws Exception {
return "fail".equals(value.getEventType());
}//这是循环模式
}).times(2) //times默认是followedby 不挨着
.consecutive() //连续 挨着
.within(Time.seconds(2));//这是循环模式
// .next("secondFail")//这是单例模式
// .where(new SimpleCondition<LoginEvent>() {
// @Override
// public boolean filter(LoginEvent value) throws Exception {
// return "fail".equals(value.getEventType());
// }
// })
// .within(Time.seconds(2));
// 2. 在以userId分组之后的数据流上应用pattern
PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS.keyBy(data -> data.getUserId()), loginFailPattern);
// 3. 检出符合匹配条件的事件,得到一个新的DataStream
SingleOutputStreamOperator<String> loginFailWarningStream = patternStream.select(new LoginFailMatchSelect());
loginFailWarningStream.print();
env.execute("login fail detect job");
}
// 实现自定义PatternSelectFunction
public static class LoginFailMatchSelect implements PatternSelectFunction<LoginEvent,String>{
@Override
public String select(Map<String, List<LoginEvent>> pattern) throws Exception {
//List<LoginEvent> next1 = pattern.values().iterator().next();
// List<LoginEvent> firstFail = pattern.get("firstFail");
// List<LoginEvent> secondFail = pattern.get("secondFail");
// return firstFail.get(0).getUserId()+","+firstFail.get(0).getTimestamp()+","+secondFail.get(0).getTimestamp();
List<LoginEvent> firstFail = pattern.get("firstFail");
return firstFail.get(0).getUserId()+","+firstFail.get(0).getTimestamp()+","+firstFail.get(1).getTimestamp()+",login fail";
}
}
}