1.1 模块创建和数据准备
在Flink-project下新建一个 maven module作为子项目,命名为gmall-network-flow。在这个子模块中,我们同样并没有引入更多的依赖,所以也不需要改动pom文件。
在src/main/目录下,将apache服务器的日志文件apache.log复制到资源文件目录input下,我们将从这里读取数据。
当然,我们也可以仍然用UserBehavior.csv作为数据源,这时我们分析的就不是每一次对服务器的访问请求了,而是具体的页面浏览(“pv”)操作。
1.2 基于服务器log的热门页面浏览量统计
我们现在要实现的模块是 “实时流量统计”。对于一个电商平台而言,用户登录的入口流量、不同页面的访问流量都是值得分析的重要数据,而这些数据,可以简单地从web服务器的日志中提取出来。
我们在这里先实现“热门页面浏览数”的统计,也就是读取服务器日志中的每一行log,统计在一段时间内用户访问每一个url的次数,然后排序输出显示。
具体做法为:每隔5秒,输出最近10分钟内访问量最多的前N个URL。可以看出,这个需求与之前“实时热门商品统计”非常类似,所以我们完全可以借鉴此前的代码。
在src/main/app下创建HotUrlApp类。定义javaBean ApacheLog,这是输入的日志数据流;另外还有UrlViewCount,这是窗口操作统计的输出数据类型。在main函数中创建StreamExecutionEnvironment 并做配置,然后从apache.log文件中读取数据,并包装成ApacheLog类型。
需要注意的是,原始日志中的时间是“dd/MM/yyyy:HH:mm:ss”的形式,需要定义一个DateTimeFormat将其转换为我们需要的时间戳格式:
.map(line -> {
String[] split = line.split(",");
SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
long time = sdf.parse(split[3]).getTime();
return new ApacheLog(split[0], split[1], time, split[5], split[6]);
})
完整代码如下:
public class HotUrlApp {
public static void main(String[] args) throws Exception {
//1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//2.读取文本文件创建流,转换为JavaBean并提取时间戳
// SingleOutputStreamOperator<ApacheLog> apachLogDS = env.readTextFile("input/apache.log")
SingleOutputStreamOperator<ApacheLog> apachLogDS = env.socketTextStream("hadoop102", 7777)
.map(line -> {
String[] fields = line.split(" ");
SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
long time = sdf.parse(fields[3]).getTime();
return new ApacheLog(fields[0], fields[1], time, fields[5], fields[6]);
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ApacheLog>(Time.seconds(1)) {
@Override
public long extractTimestamp(ApacheLog element) {
return element.getEventTime();
}
});
OutputTag<ApacheLog> outputTag = new OutputTag<ApacheLog>("sideOutPut") {
};
//3.过滤数据,按照url分组,开窗,累加计算
SingleOutputStreamOperator<UrlViewCount> aggregate = apachLogDS
.filter(data -> "GET".equals(data.getMethod()))
.keyBy(data -> data.getUrl())
.timeWindow(Time.minutes(10), Time.seconds(5))
.allowedLateness(Time.seconds(60))
.sideOutputLateData(outputTag)
.aggregate(new UrlCountAggFunc(), new UrlCountWindowFunc());
//4.按照窗口结束时间重新分组,计算组内排序
SingleOutputStreamOperator<String> result = aggregate.keyBy(data -> data.getWindowEnd())
.process(new UrlCountProcessFunc(5));
//5.打印数据
apachLogDS.print("apachLogDS");
aggregate.print("aggregate");
result.print("result");
aggregate.getSideOutput(outputTag).print("side");
//6.执行
env.execute();
}
public static class UrlCountAggFunc implements AggregateFunction<ApacheLog, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(ApacheLog value, Long accumulator) {
return accumulator + 1L;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
return a + b;
}
}
public static class UrlCountWindowFunc implements WindowFunction<Long, UrlViewCount, String, TimeWindow> {
@Override
public void apply(String url, TimeWindow window, Iterable<Long> input, Collector<UrlViewCount> out) throws Exception {
out.collect(new UrlViewCount(url, window.getEnd(), input.iterator().next()));
}
}
public static class UrlCountProcessFunc extends KeyedProcessFunction<Long, UrlViewCount, String> {
//定义TopSize属性
private Integer topSize;
public UrlCountProcessFunc() {
}
public UrlCountProcessFunc(Integer topSize) {
this.topSize = topSize;
}
//定义集合状态用于存放同一个窗口中的数据
private MapState<String,UrlViewCount> mapState; //不能用ListState,因为它会把相同url的数据都会保持而我们只需要后面那个状态的,例如<url,1>,<url,2>但我们只要最新来的那个更新后的数据
@Override
public void open(Configuration parameters) throws Exception {
mapState=getRuntimeContext().getMapState(new MapStateDescriptor<String, UrlViewCount>("map-state",String.class,UrlViewCount.class));
}
@Override
public void processElement(UrlViewCount value, Context ctx, Collector<String> out) throws Exception {
//将数据放置集合状态
mapState.put(value.getUrl(),value);
//注册定时器,用于处理状态中的数据
ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1L);
//注册定时器,