zoukankan      html  css  js  c++  java
  • flink(二) 电商用户行为分析(二)实时热门商品统计(计算最热门 Top N 商品)

    1.简介

      首先要实现的是实时热门商品统计,我们将会基于 UserBehavior 数据集来进行分析。
      项目主体用 Scala 编写,采用 IDEA 作为开发环境进行项目编写,采用 maven作为项目构建和管理工具。首先我们需要搭建项目框架。

    2 创建 Maven 项目

    2.1 项目框架搭建
      打开 IDEA,创建一个 maven 项目,命名为 UserBehaviorAnalysis。由于包含了多个模块,我们可以以 UserBehaviorAnalysis 作为父项目,并在其下建一个名为
    HotItemsAnalysis 的子项目,用于实时统计热门 top N 商品。
      在 UserBehaviorAnalysis 下 新 建 一 个 maven module 作 为 子 项 目 , 命 名 为HotItemsAnalysis。
      父项目只是为了规范化项目结构,方便依赖管理,本身是不需要代码实现的,所以 UserBehaviorAnalysis 下的 src 文件夹可以删掉。
    2.2 声明项目中工具的版本信息
      我们整个项目需要的工具的不同版本可能会对程序运行造成影响,所以应该在最外层的 UserBehaviorAnalysis 中声明所有子模块共用的版本信息。
      在 pom.xml 中加入以下配置:
           UserBehaviorAnalysispom.xml
        <properties>
            <flink.version>1.10.0</flink.version>
            <scala.binary.version>2.11</scala.binary.version>
            <kafka.version>2.2.0</kafka.version>
        </properties>
    2.3 添加项目依赖
    对 于 整 个 项 目 而 言 , 所 有 模 块 都 会 用 到 flink 相 关 的 组 件 , 所 以 我们在UserBehaviorAnalysis 中引入公有依赖:
    UserBehaviorAnalysis/pom.xml
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_${scala.binary.version}</artifactId>
                <version>${kafka.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
        </dependencies>
    同样,对于 maven 项目的构建,可以引入公有的插件:
        <build>
        <plugins>
        <!-- 该插件用于将 Scala 代码编译成 class 文件 -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.4.6</version>
            <executions>
                <execution>
                    <!-- 声明绑定到 maven 的 compile 阶段 -->
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
        <configuration>
            <descriptorRefs>
                <descriptorRef>
                    jar-with-dependencies
                </descriptorRef>
            </descriptorRefs>
        </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        </plugins>
        </build>
    在 HotItemsAnalysis 子模块中,我们并没有引入更多的依赖,所以不需要改动pom 文件。
    2.4 数据准备
      在 src/main/目录下,可以看到已有的默认源文件目录是 java,我们可以将其改名为 scala。将数据文件 UserBehavior.csv 复制到资源文件目录 src/main/resources 下,
    我们将从这里读取数据。
    至此,我们的准备工作都已完成,接下来可以写代码了。

    3  模块代码实现

    计算最热门 Top N 商品
      为了统计每个窗口下最热门的商品,我们需要再次按窗口进行分组,这里根据ItemViewCount 中的 windowEnd 进行 keyBy()操作。然后使用 ProcessFunction 实现
    一个自定义的 TopN 函数 TopNHotItems 来计算点击量排名前 5 名的商品,并将排名结果格式化成字符串,便于后续输出。
     
    package com.atguigu.hotitems_analysis
    
    
    import java.util.Properties
    
    import com.sun.jmx.snmp.Timestamp
    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
    import org.apache.flink.api.java.tuple.{Tuple, Tuple1}
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    import org.apache.flink.util.Collector
    
    import scala.collection.mutable.ListBuffer
    import scala.tools.cmd.Spec.Accumulator
    
    //定义样例类
    case class UserBehavior(userId:Long, itemId:Long, categoryId:Int, behavior:String, timestamp:Long)
    
    //定义窗口聚合结果的样例类
    case class ItemViewCount(itemID:Long, windowEnd:Long, count:Long)
    
    object HotItems {
      def main(args: Array[String]): Unit = {
        //创建流处理环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
        //从文件读取数据
        //val inputStream:DataStream[String] = env.readTextFile("C:\Users\DELL\IdeaProjects\UserBehaviorAnalysis\HotItemAnalysis\src\main\resources\UserBehavior.csv")
        // 从kafka中读取数据
        val properties = new Properties()
        properties.setProperty("bootstrap.servers","192.168.1.122:9092,192.168.1.133:9092,192.168.1.144:9092")
        properties.setProperty("group.id","consumer-group")
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        properties.setProperty("auto.offset.reset", "latest")
        val inputStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties))
    
    
    
        // 将数据转换成样例类类型,并提取timestamp定义watermark
        val dataStream:DataStream[UserBehavior] = inputStream
          .map(data =>{
            val dataArray = data.split(",")
            UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
          })
          .assignAscendingTimestamps(_.timestamp*1000L)
    
        // 对数据进行转换,过滤出pv行为,开窗聚合统计个数,并自定义单个窗口输出的结果
        val aggStream:DataStream[ItemViewCount] = dataStream
            .filter(_.behavior == "pv")
            .keyBy("itemId")
            .timeWindow(Time.hours(1),Time.minutes(5))
            .aggregate(new CountAgg(),new ItemCountWindowResult())
    
        //对窗口聚合结果按照窗口进行分组,并做排序取TopN输出
        val resultStream:DataStream[String] = aggStream
            .keyBy("windowEnd")
            .process(new TopNHotItem(5))
    
        resultStream.print()
    
        env.execute("hot items job")
      }
    
    }
    
    //自定义预聚合函数
    class CountAgg() extends AggregateFunction[UserBehavior, Long, Long]{
      override def add(value: UserBehavior, accumulator: Long): Long = accumulator + 1
    
      override def createAccumulator(): Long = 0L
    
      override def getResult(accumulator: Long): Long = accumulator
    
      override def merge(a: Long, b: Long): Long = a+b
    
    }
    
    class ItemCountWindowResult() extends  WindowFunction[Long, ItemViewCount, Tuple, TimeWindow]{
      override def apply(key: Tuple, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
        val itemId = key.asInstanceOf[Tuple1[Long]].f0
        val windEnd = window.getEnd
        val count = input.iterator.next()
        out.collect(ItemViewCount(itemId,windEnd,count))
      }
    
    }
    
    class TopNHotItem(n: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String]{
      //定义一个ListState,用来保存当前窗口所有的count结果
      lazy val itemCountListState: ListState[ItemViewCount] = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("itemcount-list", classOf[ItemViewCount]))
    
      override def processElement(value: ItemViewCount, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
        // 每来一条数据,就把他保存到状态中
        itemCountListState.add(value)
    
        //注册定时器,在windowEnd+100触发
        ctx.timerService().registerEventTimeTimer(value.windowEnd + 100)
    
    
      }
    
      //定时器触发时,从状态中取数据,然后排序输出
      override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
        // 先把状态的数据提取到一个ListBuffer中
        val allItemCountList: ListBuffer[ItemViewCount] = ListBuffer()
        import scala.collection.JavaConversions._
        for( itemCount <- itemCountListState.get()){
          allItemCountList += itemCount
        }
    
        //按照count值大小排序
        val sortedItemCountList = allItemCountList.sortBy(_.count)(Ordering.Long.reverse).take(n)
    
        //清除状态
        itemCountListState.clear()
    
        //将排名信息格式化成string,方便监控显示
        val result:StringBuilder = new StringBuilder
        result.append("时间: ").append(new Timestamp(timestamp - 100)).append("
    ")
    
        //遍历sorted列表,输出TopN信息
        for(i <- sortedItemCountList.indices){
          //获取当前商品的count信息
          val currentItemCount = sortedItemCountList(i)
          result.append("Top").append(i+1).append(":")
            .append(" 商品ID=").append(currentItemCount.itemID)
            .append(" 访问量=").append(currentItemCount.count)
            .append("
    ")
        }
    
        result.append("====================================
    
    ")
    
        // 控制输出频率
        Thread.sleep(1000)
        out.collect(result.toString())
      }
    
    }

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13491073.html

  • 相关阅读:
    一、cocos2d-x 3.0 final使用httpclient编译到android,须要用到的android.mk
    lvchange的available參数
    基于谱减法的声音去噪
    ios使用openUrl进行应用跳转
    linux下ssh免密登陆
    字体图标 icon font
    hdu 3642 Get The Treasury(扫描线)
    3D游戏引擎一 win32编程
    Codeforces 112B-Petya and Square(实现)
    动态规划 is beginning。。。。。。。。。
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13491073.html
Copyright © 2011-2022 走看看