zoukankan      html  css  js  c++  java
  • 一.Flink实时项目电商用户行为分析之实时热门商品统计

    项目整体介绍

    1.1 电商的用户行为

    电商用户行为数据多样,整体可以分为用户行为习惯数据和业务行为数据两大类。用户的行为习惯数据包括了用户的登录方式、上线的时间点及时长、点击和浏览页面、页面停留时间以及页面跳转等等我们可以从中进行流量统计和热门商品的统计,也可以深入挖掘用户的特征;这些数据往往可以从web服务器日志中直接读取到。而业务行为数据就是用户在电商平台中针对每个业务(通常是某个具体商品)所作的操作,我们一般会在业务系统中相应的位置埋点,然后收集日志进行分析。业务行为数据又可以简单分为两类:一类是能够明显地表现出用户兴趣的行为,比如对商品的收藏、喜欢、评分和评价,我们可以从中对数据进行深入分析,得到用户画像,进而对用户给出个性化的推荐商品列表,这个过程往往会用到机器学习相关的算法;另一类则是常规的业务操作,但需要着重关注一些异常状况以做好风控,比如登录和订单支付。

    1.2 项目主要模块

    基于对电商用户行为数据的基本分类,我们可以发现主要有以下三个分析方向:

      ①热门统计

    利用用户的点击浏览行为,进行流量统计、近期热门商品统计等。

      ②偏好统计

    利用用户的偏好行为,比如收藏、喜欢、评分等,进行用户画像分析,给出个性化的商品推荐列表。

      ③风险控制

    利用用户的常规业务行为,比如登录、下单、支付等,分析数据,对异常情况进行报警提示。

    本项目限于数据,我们只实现热门统计和风险控制中的部分内容,将包括以下五大模块:实时热门商品统计、实时流量统计、市场营销商业指标统计、恶意登录监控和订单支付失效监控,其中细分为以下9个具体指标:

    1.3 数据源解析

    我们准备了一份淘宝用户行为数据集,保存为csv文件。本数据集包含了淘宝上某一天随机一百万用户的所有行为(包括点击、购买、收藏、喜欢)。数据集的每一行表示一条用户行为,由用户ID、商品ID、商品类目ID、行为类型和时间戳组成,并以逗号分隔

    另外,我们还可以拿到web服务器的日志数据,这里以apache服务器的一份log为例,每一行日志记录了访问者的IPuserId、访问时间、访问方法以及访问的url

    实时热门商品统计

    首先要实现的是实时热门商品统计,我们将会基于UserBehavior数据集来进行分析。

    项目主体用Java编写,采用IDEA作为开发环境进行项目编写,采用maven作为项目构建和管理工具。首先我们需要搭建项目框架。

    2.1 创建Maven项目

    2.1.1 项目框架搭建

    打开IDEA,创建一个maven项目,命名为Flink-project。由于包含了多个模块,我们可以以Flink-project作为父项目,并在其下建一个名为gmall-user-behavior的子项目,用于实时统计热门top N商品。

    Flink-project下新建一个 maven module作为子项目,命名为gmall-user-behavior

    父项目只是为了规范化项目结构,方便依赖管理,本身是不需要代码实现的,所以Flink-project下的src文件夹可以删掉。

    2.1.2 声明项目中工具的版本信息

    我们整个项目需要的工具的不同版本可能会对程序运行造成影响,所以应该在最外层的Flink-project中声明所有子模块共用的版本信息。

    pom.xml中加入以下配置:

    <properties>
    <flink.version>1.10.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <kafka.version>2.2.0</kafka.version>
    </properties>

    2.1.3 添加项目依赖

    对于整个项目而言,所有模块都会用到flink相关的组件,所以我们在Flink-project中引入公有依赖:

    <properties>
    <flink.version>1.10.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <kafka.version>2.2.0</kafka.version>
    </properties>
    <dependencies>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_${scala.binary.version}</artifactId>
    <version>${kafka.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    </dependency>
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
    </dependency>
    <dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.3</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.12</artifactId>
    <version>1.10.0</version>
    </dependency>
    </dependencies>

    2.1.4 数据准备

    我们在Flink-project中创建input目录

    将数据文件UserBehavior.csv复制到资源文件目录input下,我们将从这里读取数据

    2.2 模块代码实现

    我们将实现一个实时热门商品的需求,可以将实时热门商品翻译成程序员更好理解的需求:每隔5分钟输出最近一小时内点击量最多的前N个商品。将这个需求进行分解我们大概要做这么几件事情:

    • 抽取出业务时间戳,告诉Flink框架基于业务时间做窗口
    • 过滤出点击行为数据
    • 按一小时的窗口大小,每5分钟统计一次,做滑动窗口聚合(Sliding Window
    • 按每个窗口聚合,输出每个窗口中点击量前N名的商品

    2.2.1 程序主体

    目录结构如下:

    src/main/java/beans下定义POJOsUserBehaviorItemCount

    。创建类HotItemApp,在main方法中创建StreamExecutionEnvironment 并做配置,然后从UserBehavior.csv文件中读取数据,并包装成UserBehavior类型。最终输出的时候,将数据封装成ItemCount。代码如下:

    UseBehavior类

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class UserBehavior {
    private Long userId;
    private Long itemId;
    private Integer categoryId;
    private String behavior;
    private Long timestamp;

    }

    ItemCount类

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class ItemCount {
    private Long itemId;
    private Long windowEnd;
    private Long count;
    }

    HotItemApp类

    //1.创建执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    //2.从文件读取数据创建流并转换为JavaBean同时提取事件时间
    SingleOutputStreamOperator<UserBehavior> userDS = env.readTextFile("input")
    .map(line -> {
    String[] fields = line.split(",");
    return new UserBehavior(
    Long.parseLong(fields[0]),
    Long.parseLong(fields[1]),
    Integer.parseInt(fields[2]),
    fields[3],
    Long.parseLong(fields[4]));
    })
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
    @Override
    public long extractAscendingTimestamp(UserBehavior element) {
    return element.getTimestamp() * 1000L;
    }
    });

    这里注意,我们需要统计业务时间上的每小时的点击量,所以要基于EventTime来处理。那么如果让Flink按照我们想要的业务时间来处理呢?这里主要有两件事情要做。

    第一件是告诉Flink我们现在按照EventTime模式进行处理,Flink默认使用ProcessingTime处理,所以我们要显式设置如下:

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    第二件事情是指定如何获得业务时间,以及生成WatermarkWatermark是用来追踪业务事件的概念,可以理解成EventTime世界中的时钟,用来指示当前处理到什么时刻的数据了。由于我们的数据源的数据已经经过整理,没有乱序,即事件的时间戳是单调递增的,所以可以将每条数据的业务时间就当做Watermark。这里我们用 AscendingTimestampExtractor来实现时间戳的抽取和Watermark的生成。

    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
    @Override
    public long extractAscendingTimestamp(UserBehavior element) {
    return element.getTimestamp() * 1000L;
    }
    });

    这样我们就得到了一个带有时间标记的数据流了,后面就能做一些窗口的操作。

    2.2.2 过滤出点击事件

    在开始窗口操作之前,先回顾下需求“每隔5分钟输出过去一小时内点击量最多的前N个商品”。由于原始数据中存在点击、购买、收藏、喜欢各种行为的数据,但是我们只需要统计点击量,所以先使用filter将点击行为数据过滤出来。

    .filter( data -> "pv".equals(data.getBehavior()) )

    2.2.3 设置滑动窗口,统计点击量

    由于要每隔5分钟统计一次最近一小时每个商品的点击量,所以窗口大小是一小时,每隔5分钟滑动一次。即分别要统计[09:00, 10:00), [09:05, 10:05), [09:10, 10:10)…等窗口的商品点击量。是一个常见的滑动窗口需求(Sliding Window)。

    .keyBy(data -> data.getItemId())
    .timeWindow(Time.hours(1), Time.minutes(5))
    .aggregate(new ItemCountAggFunc(), new ItemCountWindowFunc());

    我们使用.keyBy("itemId")对商品进行分组,使用.timeWindow(Time size, Time slide)对每个商品做滑动窗口(1小时窗口,5分钟滑动一次)。然后我们使用 .aggregate(AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能使用AggregateFunction提前聚合掉数据,减少state的存储压力。较之 .apply(WindowFunction wf) 会将窗口中的数据都存储下来,最后一起计算要高效地多。这里的ItemCountAggFunc()实现了AggregateFunction接口,功能是统计窗口中的条数,即遇到一条数据就加一。

    //自定义增量聚合函数
    public static class ItemCountAggFunc implements AggregateFunction<UserBehavior,Long,Long>{

    @Override
    public Long createAccumulator() {
    return 0L;
    }

    @Override
    public Long add(UserBehavior value, Long accumulator) {
    return accumulator + 1L;
    }

    @Override
    public Long getResult(Long accumulator) {
    return accumulator;
    }

    @Override
    public Long merge(Long a, Long b) {
    return a+b;
    }
    }

    聚合操作.aggregate(AggregateFunction af, WindowFunction wf)的第二个参数WindowFunction将每个key每个窗口聚合后的结果带上其他信息进行输出。我们这里实现的WindowResultFunction<主键商品ID,窗口,点击量>封装成了ItemCount进行输出。

      //自定义window函数
    public static class ItemCountWindowFunc implements WindowFunction<Long, ItemCount,Long, TimeWindow>{ //<In,out,key,window>

    @Override
    public void apply(Long itemId, TimeWindow window, Iterable<Long> input, Collector<ItemCount> out) throws Exception {
    long windowEnd = window.getEnd();
    Long count = input.iterator().next();
    out.collect(new ItemCount(itemId,windowEnd,count));
    }
    }

    现在我们就得到了每个商品在每个窗口的点击量的数据流。

    2.2.4 计算最热门Top N商品

    为了统计每个窗口下最热门的商品,我们需要再次按窗口进行分组,这里根据ItemCount中的windowEnd进行keyBy()操作。然后使用ProcessFunction实现一个自定义的TopN函数TopNItemCountProcessFunc来计算点击量排名前5名的商品,并将排名结果格式化成字符串,便于后续输出。

     itemCountDS.keyBy("windowEnd")
    .process(new TopNItemCountProcessFunc(5));// 求点击量前5名的商品

    ProcessFunctionFlink提供的一个low-level API,用于实现更高级的功能。它主要提供了定时器timer的功能(支持EventTimeProcessingTime)。本案例中我们将利用timer来判断何时收齐了某个window下所有商品的点击量数据。由于Watermark的进度是全局的,在processElement方法中,每当收到一条数据ItemCount,我们就注册一个windowEnd+1的定时器(Flink框架会自动忽略同一时间的重复注册)。windowEnd+1的定时器被触发时,意味着收到了windowEnd+1Watermark,即收齐了该windowEnd下的所有商品窗口统计值。我们在onTimer()中处理将收集的所有商品及点击量进行排序,选出TopN,并将排名信息格式化成字符串后进行输出。

    这里我们还使用了ListState<ItemCount>来存储收到的每条ItemCount消息,保证在发生故障时,状态数据的不丢失和一致性。ListStateFlink提供的类似Java List接口的State API,它集成了框架的checkpoint机制,自动做到了exactly-once的语义保证。

     //自定义window函数
    public static class ItemCountWindowFunc implements WindowFunction<Long, ItemCount,Long, TimeWindow>{

    @Override
    public void apply(Long itemId, TimeWindow window, Iterable<Long> input, Collector<ItemCount> out) throws Exception {
    long windowEnd = window.getEnd();
    Long count = input.iterator().next();
    out.collect(new ItemCount(itemId,windowEnd,count));
    }
    }
    //自定义排序KeyedProcessFunction
    public static class TopNItemCountProcessFunc extends KeyedProcessFunction<Tuple,ItemCount,String>{
    //TopN属性
    private Integer topSize;

    public TopNItemCountProcessFunc() {
    }

    public TopNItemCountProcessFunc(Integer topSize) {
    this.topSize = topSize;
    }
    //定义ListState统一存放相同Key[windowEnd]的数据
    private ListState<ItemCount> listState;

    @Override
    public void open(Configuration parameters) throws Exception {
    listState=getRuntimeContext().getListState(new ListStateDescriptor<ItemCount>("list-state",ItemCount.class));
    }

    @Override
    public void processElement(ItemCount value, Context ctx, Collector<String> out) throws Exception {
    //每来一条数据,将数据存入集合状态
    listState.add(value);
    //注册定时器
    ctx.timerService().registerEventTimeTimer(value.getWindowEnd()+1L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
    //取出状态中的所有数据
    Iterator<ItemCount> iterator = listState.get().iterator();
    List<ItemCount> itemCounts = Lists.newArrayList(iterator);
    //排序
    itemCounts.sort(new Comparator<ItemCount>() {
    @Override
    public int compare(ItemCount o1, ItemCount o2) {
    if(o1.getCount()>o2.getCount()){
    return -1;
    }else if(o1.getCount()< o2.getCount()){
    return 1;
    }else{
    return 0;
    }
    }
    });
        //后面的步骤是为了可以在控制台打印输出
    StringBuilder sb = new StringBuilder();
    sb.append("====================== ");
    sb.append("当前窗口结束时间为:").append(new Timestamp(timestamp - 1L)).append(" ");

    //取前topSize条数据输出
    for (int i = 0; i < Math.min(topSize, itemCounts.size()); i++) {
    //取出数据
    ItemCount itemCount = itemCounts.get(i);
    sb.append("TOP ").append(i + 1);
    sb.append(" ItemId=").append(itemCount.getItemId());
    sb.append(" 商品热度=").append(itemCount.getCount());
    sb.append(" ");
    }
    sb.append("====================== ");

    //清空状态
    listState.clear();
        // 控制输出频率,模拟实时滚动结果
    Thread.sleep(1000);

    //输出数据
    out.collect(sb.toString());
    }
    }
    }

    2.2.5 完整代码

      //1.创建执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    //2.从文件读取数据创建流并转换为JavaBean同时提取事件时间
    SingleOutputStreamOperator<UserBehavior> userDS = env.readTextFile("input")
    .map(line -> {
    String[] fields = line.split(",");
    return new UserBehavior(
    Long.parseLong(fields[0]),
    Long.parseLong(fields[1]),
    Integer.parseInt(fields[2]),
    fields[3],
    Long.parseLong(fields[4]));
    })
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
    @Override
    public long extractAscendingTimestamp(UserBehavior element) {
    return element.getTimestamp() * 1000L;
    }
    });
    //3.按照"pv"过滤,按照itemID分组,开窗,计算数据
    SingleOutputStreamOperator<ItemCount> itemCountDS = userDS.filter(data -> "pv".equals(data.getBehavior()))
    .keyBy(data -> data.getItemId())
    .timeWindow(Time.hours(1), Time.minutes(5))
    .aggregate(new ItemCountAggFunc(), new ItemCountWindowFunc());
    //4.按照windowEnd分组,排序输出
    SingleOutputStreamOperator<String> result = itemCountDS.keyBy("windowEnd")
    .process(new TopNItemCountProcessFunc(5));
    result.print();
    env.execute();

    }
    //自定义增量聚合函数,每来一个数据就count1
    public static class ItemCountAggFunc implements AggregateFunction<UserBehavior,Long,Long>{

    @Override
    public Long createAccumulator() {
    return 0L;
    }

    @Override
    public Long add(UserBehavior value, Long accumulator) {
    return accumulator + 1L;
    }

    @Override
    public Long getResult(Long accumulator) {
    return accumulator;
    }

    @Override
    public Long merge(Long a, Long b) {
    return a+b;
    }
    }
    //自定义window函数
    public static class ItemCountWindowFunc implements WindowFunction<Long, ItemCount,Long, TimeWindow>{

    @Override
    public void apply(Long itemId, TimeWindow window, Iterable<Long> input, Collector<ItemCount> out) throws Exception {
    long windowEnd = window.getEnd();
    Long count = input.iterator().next();
    out.collect(new ItemCount(itemId,windowEnd,count));
    }
    }
    //自定义排序KeyedProcessFunction
    public static class TopNItemCountProcessFunc extends KeyedProcessFunction<Tuple,ItemCount,String>{
    //TopN属性
    private Integer topSize;

    public TopNItemCountProcessFunc() {
    }

    public TopNItemCountProcessFunc(Integer topSize) {
    this.topSize = topSize;
    }
    //定义ListState统一存放相同Key[windowEnd]的数据
    private ListState<ItemCount> listState;

    @Override
    public void open(Configuration parameters) throws Exception {
    listState=getRuntimeContext().getListState(new ListStateDescriptor<ItemCount>("list-state",ItemCount.class));
    }

    @Override
    public void processElement(ItemCount value, Context ctx, Collector<String> out) throws Exception {
    //每来一条数据,将数据存入集合状态
    listState.add(value);
    //注册定时器
    ctx.timerService().registerEventTimeTimer(value.getWindowEnd()+1L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
    //取出状态中的所有数据
    Iterator<ItemCount> iterator = listState.get().iterator();
    List<ItemCount> itemCounts = Lists.newArrayList(iterator);
    //排序
    itemCounts.sort(new Comparator<ItemCount>() {
    @Override
    public int compare(ItemCount o1, ItemCount o2) {
    if(o1.getCount()>o2.getCount()){
    return -1;
    }else if(o1.getCount()< o2.getCount()){
    return 1;
    }else{
    return 0;
    }
    }
    });
    StringBuilder sb = new StringBuilder();
    sb.append("====================== ");
    sb.append("当前窗口结束时间为:").append(new Timestamp(timestamp - 1L)).append(" ");

    //取前topSize条数据输出
    for (int i = 0; i < Math.min(topSize, itemCounts.size()); i++) {
    //取出数据
    ItemCount itemCount = itemCounts.get(i);
    sb.append("TOP ").append(i + 1);
    sb.append(" ItemId=").append(itemCount.getItemId());
    sb.append(" 商品热度=").append(itemCount.getCount());
    sb.append(" ");
    }
    sb.append("====================== ");

    //清空状态
    listState.clear();

    Thread.sleep(1000);

    //输出数据
    out.collect(sb.toString());
    }

    2.2.6 更换Kafka 作为数据源

    实际生产环境中,我们的数据流往往是从Kafka获取到的。如果要让代码更贴近生产实际,我们只需将source更换为Kafka即可:

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "consumer-group");
    properties.setProperty("key.deserializer",  

    "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer",

    "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("auto.offset.reset", "latest");
    DataStream<String> inputStream = env
            .addSource(new FlinkKafkaConsumer<String>("hotitems",

     new SimpleStringSchema(), properties));

    当然,根据实际的需要,我们还可以将Sink指定为KafkaESRedis或其它存储,这里就不一一展开实现了。

    2.2.7 下面我们用flinkTableAPI的方式实现TOPN

    //1.创建环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    //2.读取数据
    DataStream<String> inputStream = env.readTextFile("input/UserBehavior.csv");

    //3.转换成Java Bean,提取时间戳和watermark
    DataStream<UserBehavior> dataStream = inputStream
    .map(line -> {
    String[] fields = line.split(",");
    return new UserBehavior(new Long(fields[0]), new Long(fields[1]), new Integer(fields[2]), fields[3], new Long(fields[4]));
    })
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
    @Override
    public long extractAscendingTimestamp(UserBehavior element) {
    return element.getTimestamp() * 1000L;
    }
    });
    //4.创建表执行环境
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    //5.把流转化为表
    Table dataTable = tableEnv.fromDataStream(dataStream, "itemId, behavior, timestamp.rowtime as ts");
    tableEnv.createTemporaryView("data_table",dataTable);
    //6.SQL实现
    String aggSql="select itemId,count(itemId) as cnt,hop_end(ts,interval '5' minute,interval '1' hour) as windowEnd " +
    "from data_table where behavior = 'pv' " +
    " group by itemId ,hop(ts,interval '5' minute ,interval '1' hour)";

    Table aggSqlTable = tableEnv.sqlQuery(aggSql);
    tableEnv.createTemporaryView("agg",aggSqlTable);
    //7.基于聚合结果排序输出
    String topNSql=" select * from " +
    "(select * ,row_number() over(partition by windowEnd order cnt desc) as row_num" +
    " from agg )" +
    " where row_num <= 5";
    Table resultTable = tableEnv.sqlQuery(topNSql);
    //8.转化为流进行输出
    tableEnv.toRetractStream(resultTable, Row.class).print();
    //9.执行任务
    env.execute("hot items with sql job");















     

     

     

     

     
     

     

  • 相关阅读:
    第三个Sprint冲刺第三天
    回答第1-17章
    阅读第13-17章
    阅读第10、11、12章
    阅读第8,9,10章
    作业5.2 5.3
    四则运算 测试与封装 5.1
    阅读第5-7章
    阅读1-5章
    我给队友做的汉堡包
  • 原文地址:https://www.cnblogs.com/whdd/p/14057668.html
Copyright © 2011-2022 走看看