zoukankan      html  css  js  c++  java
  • flink(三) 电商用户行为分析(三)实时流量统计(一)热门页面浏览量、网站总浏览量

    1 模块创建和数据准备

      在 UserBehaviorAnalysis 下新建一个 maven module 作为子项目,命名为 NetworkFlowAnalysis。在这个子模块中,我们同样并没有引入更多的依赖,所以也
    不需要改动 pom 文件。
      在 src/main/目录下,将默认源文件目录 java 改名为 scala。将 apache 服务器的日志文件 apache.log 复制到资源文件目录 src/main/resources 下,我们将从这里读取
    数据。
      当然,我们也可以仍然用 UserBehavior.csv 作为数据源,这时我们分析的就不是每一次对服务器的访问请求了,而是具体的页面浏览(“pv”)操作。

    2 基于服务器 log 的热门页面浏览量统计

      我们现在要实现的模块是 “实时流量统计”。对于一个电商平台而言,用户登录的入口流量、不同页面的访问流量都是值得分析的重要数据,而这些数据,
    可以简单地从 web 服务器的日志中提取出来。
     
      我们在这里先实现“热门页面浏览数”的统计,也就是读取服务器日志中的每一行 log,统计在一段时间内用户访问每一个 url 的次数,然后排序输出显示。
    具体做法为:每隔 5 秒,输出最近 10 分钟内访问量最多的前 N 个 URL。可以看出,这个需求与之前“实时热门商品统计”非常类似,所以我们完全可以借鉴此
    前的代码。
    package come.atguigu.networkflow_analysis
    
    import java.text.SimpleDateFormat
    
    import com.sun.jmx.snmp.Timestamp
    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
    import org.apache.flink.api.java.tuple.Tuple
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    import scala.collection.mutable.ListBuffer
    
    /**
      * One parsed line of the web-server access log.
      *
      * @param ip        client address field of the log line
      * @param userId    user id field of the log line
      * @param eventTime request time as epoch milliseconds; used as the event timestamp
      * @param method    HTTP method field of the log line
      * @param url       requested URL; the key the page counts are grouped by
      */
    case class ApacheLogEvent(
      ip: String,
      userId: String,
      eventTime: Long,
      method: String,
      url: String
    )
    
    /**
      * Number of requests for one URL inside one window.
      *
      * @param url       the page the count belongs to
      * @param windowEnd end timestamp of the window that produced the count
      * @param count     number of events counted for `url` in that window
      */
    case class PageViewCount(
      url: String,
      windowEnd: Long,
      count: Long
    )
    
    /**
      * Hot-page job: counts requests per URL over a 10-minute window sliding every 5 seconds
      * and prints the 3 most requested URLs for each window.
      *
      * The log file path may be given as the first program argument; when it is absent the
      * copy of apache.log on the classpath (src/main/resources) is used.
      */
    object NetworkTopNPage {
      def main(args: Array[String]): Unit = {
        // Set up the streaming environment: single task, event-time semantics.
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        // The previous hard-coded Windows path was written with single backslashes inside an
        // ordinary string literal. Those are invalid escape sequences, so the file did not
        // compile, and the path only existed on one machine.
        val inputPath: String = args.headOption.getOrElse {
          Option(getClass.getResource("/apache.log"))
            .map(_.getPath)
            .getOrElse(throw new IllegalStateException(
              "apache.log not found on the classpath (expected in src/main/resources)"))
        }
        val inputStream: DataStream[String] = env.readTextFile(inputPath)

        val dataStream: DataStream[ApacheLogEvent] = inputStream
          .map(data => {
            val dataArray = data.split(" ")

            // A formatter is created per record, so no instance is shared between calls.
            val simpleDataFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
            val timestamp = simpleDataFormat.parse(dataArray(3)).getTime

            ApacheLogEvent(dataArray(0), dataArray(1), timestamp, dataArray(5), dataArray(6))
          })
          // Watermarks trail the largest seen event time by 60 seconds.
          .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(60)) {
            override def extractTimestamp(element: ApacheLogEvent): Long = element.eventTime
          })

        // Per-URL request count for each sliding window.
        val aggStream = dataStream
          .keyBy(_.url)
          .timeWindow(Time.minutes(10), Time.seconds(5))
          .aggregate(new PageCountAgg(), new PageCountWindowResult())

        // Regroup the per-URL counts by window and rank them.
        val resultStream = aggStream
          .keyBy(_.windowEnd)
          .process(new TopNHotPage(3))

        resultStream.print()
        env.execute("top n page job")
      }
    }
    
    /**
      * Incremental counter used inside the window: the accumulator is the number of
      * events added so far, and it is also the final result.
      */
    class PageCountAgg() extends AggregateFunction[ApacheLogEvent, Long, Long] {

      // Counting starts from zero.
      override def createAccumulator(): Long = 0L

      // Every event adds one; the content of the event is not inspected.
      override def add(in: ApacheLogEvent, accumulator: Long): Long = {
        accumulator + 1
      }

      // Two partial counts combine by addition.
      override def merge(a: Long, b: Long): Long = {
        a + b
      }

      // The accumulator is emitted unchanged.
      override def getResult(accumulator: Long): Long = accumulator
    }
    
    /**
      * Wraps the aggregated count of a window into a [[PageViewCount]] that carries the
      * URL (the window key) and the window end timestamp.
      */
    class PageCountWindowResult() extends WindowFunction[Long, PageViewCount, String, TimeWindow] {
      override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PageViewCount]): Unit = {
        // Only the first element of the input is read.
        val total = input.head
        val windowEnd = window.getEnd
        out.collect(PageViewCount(key, windowEnd, total))
      }
    }
    
    /**
      * Collects the [[PageViewCount]]s of one window (the stream is keyed by `windowEnd`) and,
      * when the event-time timer for that window fires, emits a formatted ranking of the `n`
      * pages with the highest counts.
      *
      * @param n number of top pages to include in each report
      */
    class TopNHotPage(n: Int) extends KeyedProcessFunction[Long, PageViewCount, String] {
      // Counts received so far for the current key.
      lazy val pageCountListState: ListState[PageViewCount] = getRuntimeContext.getListState(new ListStateDescriptor[PageViewCount]("pagecount-list", classOf[PageViewCount]))

      override def processElement(value: PageViewCount, context: KeyedProcessFunction[Long, PageViewCount, String]#Context, collector: Collector[String]): Unit = {
        pageCountListState.add(value)
        // Fire just after the window end.
        context.timerService().registerEventTimeTimer(value.windowEnd + 1)
      }

      override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PageViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
        // Copy every buffered count out of state.
        val allPageCountList: ListBuffer[PageViewCount] = ListBuffer()
        val iter = pageCountListState.get().iterator()
        while (iter.hasNext) {
          allPageCountList += iter.next()
        }
        // The state is no longer needed once copied; clear it early to free space.
        pageCountListState.clear()

        // Highest count first, keep the first n.
        val sortedPageCountList = allPageCountList.sortWith(_.count > _.count).take(n)

        // Build the human-readable report. The line breaks were previously raw newlines
        // inside the string literals, which left them unterminated; they are "\n" escapes.
        // java.sql.Timestamp is named explicitly so the epoch-millisecond value is rendered
        // as a date-time instead of resolving to the JDK-internal SNMP Timestamp class.
        val result: StringBuilder = new StringBuilder
        result.append("时间: ").append(new java.sql.Timestamp(timestamp - 1)).append("\n")

        sortedPageCountList.zipWithIndex.foreach { case (currentItemCount, i) =>
          result.append("Top").append(i + 1).append(":")
            .append(" 页面url").append(currentItemCount.url)
            .append(" 访问量=").append(currentItemCount.count)
            .append("\n")
        }

        result.append("====================================\n\n")

        // Throttle the output so it can be followed on the console.
        // NOTE(review): this blocks the calling thread for one second per report.
        Thread.sleep(1000)
        out.collect(result.toString())
      }
    }

    基于时间延迟的代码

    package come.atguigu.networkflow_analysis
    
    import java.text.SimpleDateFormat
    
    import com.sun.jmx.snmp.Timestamp
    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor}
    import org.apache.flink.api.java.tuple.Tuple
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    import scala.collection.mutable.ListBuffer
    
    
    /**
      * One parsed line of the web-server access log.
      *
      * @param ip        client address field of the log line
      * @param userId    user id field of the log line
      * @param eventTime request time as epoch milliseconds; used as the event timestamp
      * @param method    HTTP method field of the log line
      * @param url       requested URL; the key the page counts are grouped by
      */
    case class ApacheLogEvent(
      ip: String,
      userId: String,
      eventTime: Long,
      method: String,
      url: String
    )
    /**
      * Number of requests for one URL inside one window.
      *
      * @param url       the page the count belongs to
      * @param windowEnd end timestamp of the window that produced the count
      * @param count     number of events counted for `url` in that window
      */
    case class PageViewCount(
      url: String,
      windowEnd: Long,
      count: Long
    )
    
    /**
      * Hot-page job with late-data handling: same 10-minute / 5-second sliding window as the
      * basic version, but windows accept late events for one more minute and anything later
      * is routed to a side output. Input lines are read from a local socket, and every
      * intermediate stream is printed with a label.
      */
    object NetworkTopNPageLateness {
      def main(args: Array[String]): Unit = {
        // Streaming environment: single task, event-time semantics.
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        // Lines arrive over a socket (env.readTextFile on apache.log is the file-based alternative).
        val rawLines = env.socketTextStream("localhost", 777)

        // Watermarks trail the largest seen event time by 60 seconds.
        val watermarkAssigner =
          new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(60)) {
            override def extractTimestamp(element: ApacheLogEvent): Long = element.eventTime
          }

        // Split each line on single spaces and build the event from the selected fields.
        val dataStream: DataStream[ApacheLogEvent] = rawLines
          .map { line =>
            val fields = line.split(" ")
            val formatter = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
            val eventMillis = formatter.parse(fields(3)).getTime

            ApacheLogEvent(fields(0), fields(1), eventMillis, fields(5), fields(6))
          }
          .assignTimestampsAndWatermarks(watermarkAssigner)

        // Tag for events that arrive after the allowed lateness has expired.
        val lateOutputTag = new OutputTag[ApacheLogEvent]("late data")

        // Per-URL counts; windows stay open to late events for one extra minute.
        val windowedByUrl = dataStream
          .keyBy(_.url)
          .timeWindow(Time.minutes(10), Time.seconds(5))
          .allowedLateness(Time.minutes(1))
          .sideOutputLateData(lateOutputTag)
        val aggStream = windowedByUrl.aggregate(new PageCountAgg(), new PageCountWindowResult())

        val lateDataStream = aggStream.getSideOutput(lateOutputTag)

        // Regroup by window and rank.
        val resultStream = aggStream
          .keyBy(_.windowEnd)
          .process(new TopNHotPage(3))

        dataStream.print("data")
        aggStream.print("agg")
        lateDataStream.print("late")
        resultStream.print("result")
        env.execute("top n page job")
      }
    }
    
    
    /**
      * Incremental counter used inside the window: the accumulator is the number of
      * events added so far, and it is also the final result.
      */
    class PageCountAgg() extends AggregateFunction[ApacheLogEvent, Long, Long] {

      // Counting starts from zero.
      override def createAccumulator(): Long = 0L

      // Every event adds one; the content of the event is not inspected.
      override def add(in: ApacheLogEvent, accumulator: Long): Long = {
        accumulator + 1
      }

      // Two partial counts combine by addition.
      override def merge(a: Long, b: Long): Long = {
        a + b
      }

      // The accumulator is emitted unchanged.
      override def getResult(accumulator: Long): Long = accumulator
    }
    
    /**
      * Wraps the aggregated count of a window into a [[PageViewCount]] that carries the
      * URL (the window key) and the window end timestamp.
      */
    class PageCountWindowResult() extends WindowFunction[Long, PageViewCount, String, TimeWindow] {
      override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PageViewCount]): Unit = {
        // Only the first element of the input is read.
        val total = input.head
        val windowEnd = window.getEnd
        out.collect(PageViewCount(key, windowEnd, total))
      }
    }
    
    /**
      * Ranking function for the late-data variant of the job. The stream is keyed by
      * `windowEnd`; the latest count per URL is kept in map state so that an updated count
      * for the same URL overwrites the earlier one instead of being listed twice.
      *
      * Two event-time timers are registered per element: one at `windowEnd + 1` that emits
      * the ranking, and one at `windowEnd + 60 s` that only clears the state.
      *
      * @param n number of top pages to include in each report
      */
    class TopNHotPage(n: Int) extends KeyedProcessFunction[Long, PageViewCount, String] {
      // url -> most recent count for the current key.
      lazy val pageCountMapState: MapState[String, Long] = getRuntimeContext.getMapState(new MapStateDescriptor[String, Long]("pagecount-map", classOf[String], classOf[Long]))

      override def processElement(value: PageViewCount, context: KeyedProcessFunction[Long, PageViewCount, String]#Context, collector: Collector[String]): Unit = {
        pageCountMapState.put(value.url, value.count)
        // Emit timer, just after the window end.
        context.timerService().registerEventTimeTimer(value.windowEnd + 1)
        // Cleanup timer, one minute after the window end.
        context.timerService().registerEventTimeTimer(value.windowEnd + 60 * 1000L)
      }

      override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PageViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
        val cleanupTime = ctx.getCurrentKey + 60 * 1000L

        if (timestamp == cleanupTime) {
          // Cleanup timer: drop the state and emit nothing.
          pageCountMapState.clear()
        } else {
          // Copy the current url -> count entries out of state.
          val allPageCountList: ListBuffer[(String, Long)] = ListBuffer()
          val iter = pageCountMapState.entries().iterator()
          while (iter.hasNext) {
            val entry = iter.next()
            allPageCountList += ((entry.getKey, entry.getValue))
          }

          // Highest count first, keep the first n.
          val sortedPageCountList = allPageCountList.sortWith(_._2 > _._2).take(n)

          // Build the human-readable report. The line breaks were previously raw newlines
          // inside the string literals, which left them unterminated; they are "\n" escapes.
          // java.sql.Timestamp is named explicitly so the epoch-millisecond value is rendered
          // as a date-time instead of resolving to the JDK-internal SNMP Timestamp class.
          val result: StringBuilder = new StringBuilder
          result.append("时间: ").append(new java.sql.Timestamp(timestamp - 1)).append("\n")

          sortedPageCountList.zipWithIndex.foreach { case ((url, count), i) =>
            result.append("Top").append(i + 1).append(":")
              .append(" 页面url").append(url)
              .append(" 访问量=").append(count)
              .append("\n")
          }

          result.append("====================================\n\n")

          // Throttle the output so it can be followed on the console.
          // NOTE(review): this blocks the calling thread for one second per report.
          Thread.sleep(1000)
          out.collect(result.toString())
        }
      }
    }

    3 基于埋点日志数据的网络流量统计

      我们发现,从 web 服务器 log 中得到的 url,往往更多的是请求某个资源地址(/*.js、/*.css),如果要针对页面进行统计往往还需要进行过滤。而在实际电商应
    用中,相比每个单独页面的访问量,我们可能更加关心整个电商网站的网络流量。这个指标,除了合并之前每个页面的统计结果之外,还可以通过统计埋点日志数据
    中的“pv”行为来得到。
     
    3.1 网站总浏览量(PV)的统计
     
      衡量网站流量一个最简单的指标,就是网站的页面浏览量(Page View,PV)。用户每次打开一个页面便记录 1 次 PV,多次打开同一页面则浏览量累计。
    一般来说,PV 与来访者的数量成正比,但是 PV 并不直接决定页面的真实来访者数量,如同一个来访者通过不断的刷新页面,也可以制造出非常高的 PV。
      我们知道,用户浏览页面时,会从浏览器向网络服务器发出一个请求(Request),网络服务器接到这个请求后,会将该请求对应的一个网页(Page)发送给浏览器,
    从而产生了一个 PV。所以我们的统计方法,可以是从 web 服务器的日志中去提取对应的页面访问然后统计,就像上一节中的做法一样;也可以直接从埋点日志中提
    取用户发来的页面请求,从而统计出总浏览量。
      所以,接下来我们用 UserBehavior.csv 作为数据源,实现一个网站总浏览量的统计。我们可以设置滚动时间窗口,实时统计每小时内的网站 PV。
    package come.atguigu.networkflow_analysis
    
    import java.util.Properties
    
    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    import org.apache.flink.util.Collector
    
    import scala.tools.cmd.Spec.Accumulator
    
    //定义样例类
    /**
      * One record of the user-behaviour log.
      *
      * @param userId     id of the acting user
      * @param itemId     id of the item acted on
      * @param categoryId id of the item's category
      * @param behavior   behaviour type; the job keeps only records where this equals "pv"
      * @param timestamp  event time; multiplied by 1000 before being used as the event
      *                   timestamp, so presumably epoch seconds
      */
    case class UserBehavior(
      userId: Long,
      itemId: Long,
      categoryId: Int,
      behavior: String,
      timestamp: Long
    )
    
    /**
      * Page-view count of one window.
      *
      * @param windowEnd end timestamp of the window
      * @param count     number of page views counted in that window
      */
    case class PvCount(
      windowEnd: Long,
      count: Long
    )
    
    
    /**
      * Site-wide page-view (PV) job: counts the "pv" records of the user-behaviour log per
      * one-hour tumbling event-time window and prints one [[PvCount]] per window.
      *
      * The CSV path may be given as the first program argument; when it is absent the copy
      * of UserBehavior.csv on the classpath (src/main/resources) is used.
      */
    object PageView {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        // The previous hard-coded Windows path was written with single backslashes inside an
        // ordinary string literal. Those are invalid escape sequences, so the file did not
        // compile, and the path only existed on one machine.
        val inputPath: String = args.headOption.getOrElse {
          Option(getClass.getResource("/UserBehavior.csv"))
            .map(_.getPath)
            .getOrElse(throw new IllegalStateException(
              "UserBehavior.csv not found on the classpath (expected in src/main/resources)"))
        }
        val inputStream: DataStream[String] = env.readTextFile(inputPath)

        // Convert each CSV line into a UserBehavior and assign timestamps / watermarks.
        val dataStream: DataStream[UserBehavior] = inputStream
          .map(data => {
            val dataArray = data.split(",")
            UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
          })
          // The record timestamp is scaled by 1000 to obtain milliseconds.
          .assignAscendingTimestamps(_.timestamp * 1000L)

        // Every "pv" record is mapped to the same constant key, so a single keyed window
        // counts all of them.
        val pvStream: DataStream[PvCount] = dataStream
          .filter(_.behavior == "pv")
          .map(data => ("pv", 1L))
          .keyBy(_._1)
          .timeWindow(Time.hours(1))
          .aggregate(new PvCountAgg(), new PvCountResult())

        pvStream.print()
        env.execute("pv job")
      }
    }
    
    /**
      * Incremental counter over `(key, 1L)` pairs: the accumulator is the number of
      * elements added so far, and it is also the final result.
      */
    class PvCountAgg() extends AggregateFunction[(String, Long), Long, Long] {

      // Counting starts from zero.
      override def createAccumulator(): Long = 0L

      // Each element adds one; the tuple's own fields are not read.
      override def add(in: (String, Long), acc: Long): Long = {
        acc + 1
      }

      // Two partial counts combine by addition.
      override def merge(acc: Long, acc1: Long): Long = {
        acc + acc1
      }

      // The accumulator is emitted unchanged.
      override def getResult(acc: Long): Long = acc
    }
    
    /**
      * Wraps the aggregated count of a window into a [[PvCount]] tagged with the window end
      * timestamp. The window key is not part of the output.
      */
    class PvCountResult() extends WindowFunction[Long, PvCount, String, TimeWindow] {
      override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PvCount]): Unit = {
        // Only the first element of the input is read.
        val total = input.head
        val windowEnd = window.getEnd
        out.collect(PvCount(windowEnd, total))
      }
    }

    性能优化后代码

    package come.atguigu.networkflow_analysis
    
    import org.apache.flink.api.common.functions.{AggregateFunction, MapFunction}
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    import scala.util.Random
    
    
    //定义样例类
    /**
      * One record of the user-behaviour log.
      *
      * @param userId     id of the acting user
      * @param itemId     id of the item acted on
      * @param categoryId id of the item's category
      * @param behavior   behaviour type; the job keeps only records where this equals "pv"
      * @param timestamp  event time; multiplied by 1000 before being used as the event
      *                   timestamp, so presumably epoch seconds
      */
    case class UserBehavior(
      userId: Long,
      itemId: Long,
      categoryId: Int,
      behavior: String,
      timestamp: Long
    )
    
    /**
      * Page-view count of one window.
      *
      * @param windowEnd end timestamp of the window
      * @param count     number of page views counted in that window
      */
    case class PvCount(
      windowEnd: Long,
      count: Long
    )
    
    /**
      * Parallel variant of the page-view (PV) job. Records are spread over several keys by
      * `MyMapper`, counted per key in one-hour tumbling windows, and the partial counts of each
      * window are then summed by `TotalPvCountResult`.
      *
      * The CSV path may be given as the first program argument; when it is absent the copy
      * of UserBehavior.csv on the classpath (src/main/resources) is used.
      */
    object PageViewOp {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(4)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        // The previous hard-coded Windows path was written with single backslashes inside an
        // ordinary string literal. Those are invalid escape sequences, so the file did not
        // compile, and the path only existed on one machine.
        val inputPath: String = args.headOption.getOrElse {
          Option(getClass.getResource("/UserBehavior.csv"))
            .map(_.getPath)
            .getOrElse(throw new IllegalStateException(
              "UserBehavior.csv not found on the classpath (expected in src/main/resources)"))
        }
        val inputStream: DataStream[String] = env.readTextFile(inputPath)

        // Convert each CSV line into a UserBehavior and assign timestamps / watermarks.
        val dataStream: DataStream[UserBehavior] = inputStream
          .map(data => {
            val dataArray = data.split(",")
            UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
          })
          // The record timestamp is scaled by 1000 to obtain milliseconds.
          .assignAscendingTimestamps(_.timestamp * 1000L)

        // Partial counts: the mapper assigns keys so the records are partitioned across the
        // parallel tasks instead of all landing on one key (avoids data skew).
        val pvStream: DataStream[PvCount] = dataStream
          .filter(_.behavior == "pv")
          .map(new MyMapper())
          .keyBy(_._1)
          .timeWindow(Time.hours(1))
          .aggregate(new PvCountAgg(), new PvCountResult())

        // Sum the partial counts that belong to the same window.
        val pvTotalStream: DataStream[PvCount] = pvStream
          .keyBy(_.windowEnd)
          .process(new TotalPvCountResult())

        pvTotalStream.print()
        env.execute("pv job")
      }
    }
    
    
    /**
      * Incremental counter over `(key, 1L)` pairs: the accumulator is the number of
      * elements added so far, and it is also the final result.
      */
    class PvCountAgg() extends AggregateFunction[(String, Long), Long, Long] {

      // Counting starts from zero.
      override def createAccumulator(): Long = 0L

      // Each element adds one; the tuple's own fields are not read.
      override def add(in: (String, Long), acc: Long): Long = {
        acc + 1
      }

      // Two partial counts combine by addition.
      override def merge(acc: Long, acc1: Long): Long = {
        acc + acc1
      }

      // The accumulator is emitted unchanged.
      override def getResult(acc: Long): Long = acc
    }
    
    /**
      * Wraps the aggregated count of a window into a [[PvCount]] tagged with the window end
      * timestamp. The window key is not part of the output.
      */
    class PvCountResult() extends WindowFunction[Long, PvCount, String, TimeWindow] {
      override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PvCount]): Unit = {
        // Only the first element of the input is read.
        val total = input.head
        val windowEnd = window.getEnd
        out.collect(PvCount(windowEnd, total))
      }
    }
    
    /**
      * Maps every record to `(bucketKey, 1L)`, where `bucketKey` is drawn uniformly at random
      * from a fixed set of `buckets` keys ("0" until `buckets`).
      *
      * The key was previously a fresh 10-character random string per record. That made
      * practically every key unique, so a downstream keyed window would hold one state entry
      * and emit one count-of-1 result per record. A bounded key set still spreads the records
      * over the parallel tasks while letting each key accumulate a real partial count.
      *
      * @param buckets number of distinct keys to spread records over; must be positive
      */
    class MyMapper(buckets: Int = 16) extends MapFunction[UserBehavior, (String, Long)] {
      require(buckets > 0, s"buckets must be positive, got $buckets")

      override def map(value: UserBehavior): (String, Long) =
        (Random.nextInt(buckets).toString, 1L)
    }
    
    /**
      * Sums the partial [[PvCount]]s that share a window end (the stream key) and emits one
      * total per window when the event-time timer registered at `windowEnd + 1` fires.
      */
    class TotalPvCountResult() extends KeyedProcessFunction[Long, PvCount, PvCount] {
      // Running total for the current key.
      lazy val totalCountState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("total-count", classOf[Long]))

      override def processElement(value: PvCount, context: KeyedProcessFunction[Long, PvCount, PvCount]#Context, collector: Collector[PvCount]): Unit = {
        // NOTE(review): the first read for a key happens before any update; this relies on
        // that read behaving as 0 — confirm.
        val runningTotal = totalCountState.value() + value.count
        totalCountState.update(runningTotal)

        val fireAt = value.windowEnd + 1
        context.timerService().registerEventTimeTimer(fireAt)
      }

      override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PvCount, PvCount]#OnTimerContext, out: Collector[PvCount]): Unit = {
        // The current key is the window end the partial counts were grouped by.
        val windowEnd = ctx.getCurrentKey
        val total = totalCountState.value()
        out.collect(PvCount(windowEnd, total))
        // Release the state once the total has been emitted.
        totalCountState.clear()
      }
    }

     

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13491252.html

  • 相关阅读:
    记录「十一月做题记录」
    题解「GMOJ6898 【2020.11.27提高组模拟】第二题」
    题解「CFGYM102331B Bitwise Xor」
    题解「Japan Alumni Group Summer Camp 2018 Day 2J AB Sort」
    题解「AGC048B Bracket Score」
    题解「中位数之中位数 median」
    记录「十月做题记录」
    测试「2020牛客NOIP赛前集训营-提高组(第五场)」
    测试「20201028测试总结」
    定时提醒助手
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13491252.html
Copyright © 2011-2022 走看看