zoukankan      html  css  js  c++  java
  • flink(四) 电商用户行为分析(四)实时流量统计(二)网站独立访客数(UV)

    网站独立访客数(UV)的统计

      另外一个统计流量的重要指标是网站的独立访客数(Unique Visitor,UV)。UV指的是一段时间(比如一小时)内访问网站的总人数,1 天内同一访客的多次访问
    只记录为一个访客。通过 IP 和 cookie 一般是判断 UV 值的两种方式。当客户端第一次访问某个网站服务器的时候,网站服务器会给这个客户端的电脑发出一个 Cookie,
    通常放在这个客户端电脑的 C 盘当中。在这个 Cookie 中会分配一个独一无二的编号,这其中会记录一些访问服务器的信息,如访问时间,访问了哪些页面等等。当你下
    次再访问这个服务器的时候,服务器就可以直接从你的电脑中找到上一次放进去的Cookie 文件,并且对其进行一些更新,但那个独一无二的编号是不会变的。
    当然,对于 UserBehavior 数据源来说,我们直接可以根据 userId 来区分不同的用户。
    总量统计
    package come.atguigu.networkflow_analysis
    
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.AllWindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    //定义样例类
    case class UserBehavior(userId:Long, itemId:Long, categoryId:Int, behavior:String, timestamp:Long)
    case class UvCount(windowEnd: Long, count: Long )
    
    object UniqueVisitor {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(4)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
        val inputStream:DataStream[String] = env.readTextFile("C:\Users\DELL\IdeaProjects\UserBehaviorAnalysis\NetworkFlowAnalysis\src\main\resources\UserBehavior.csv")
    
    
        // 将数据转换成样例类类型,并提取timestamp定义watermark
        val dataStream: DataStream[UserBehavior] = inputStream
          .map(data => {
            val dataArray = data.split(",")
            UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)})
          .assignAscendingTimestamps(_.timestamp*1000L)
    
    
        val uvStream:DataStream[UvCount] = dataStream
          .filter(_.behavior == "pv")
          .timeWindowAll(Time.hours(1))
          .apply( new UvCountResult() )
    
        uvStream.print()
        env.execute("uv job")
      }
    }
    
    class UvCountResult() extends AllWindowFunction[UserBehavior, UvCount, TimeWindow]{
      override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = {
        var idSet = Set[Long]()
    
        for( userBehavior <- input ){
          idSet += userBehavior.userId
        }
    
        out.collect(UvCount(window.getEnd, idSet.size))
    
    
      }
    }
    增量统计
    package come.atguigu.networkflow_analysis
    
    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala.function.AllWindowFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    //定义样例类
    case class UserBehavior(userId:Long, itemId:Long, categoryId:Int, behavior:String, timestamp:Long)
    case class UvCount(windowEnd: Long, count: Long )
    
    object UniqueVisistorIncr {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
        val inputStream:DataStream[String] = env.readTextFile("C:\Users\DELL\IdeaProjects\UserBehaviorAnalysis\NetworkFlowAnalysis\src\main\resources\UserBehavior.csv")
    
    
        // 将数据转换成样例类类型,并提取timestamp定义watermark
        val dataStream: DataStream[UserBehavior] = inputStream
          .map(data => {
            val dataArray = data.split(",")
            UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
          })
          .assignAscendingTimestamps(_.timestamp*1000L)
    
    
        val uvStream:DataStream[UvCount] = dataStream
          .filter(_.behavior == "pv")
          .timeWindowAll(Time.hours(1))
          .aggregate(new UvCountAgg(), new UvCountResultWithIncreAgg())
    
        uvStream.print()
        env.execute("uv job")
      }
    
    }
    
    
    class UvCountResult() extends AllWindowFunction[UserBehavior, UvCount, TimeWindow]{
      override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = {
        var idSet = Set[Long]()
    
        for( userBehavior <- input ){
          idSet += userBehavior.userId
        }
    
        out.collect(UvCount(window.getEnd, idSet.size))
    
    
      }
    }
    
    class UvCountAgg extends AggregateFunction[UserBehavior, Set[Long], Long]{
      override def add(value: UserBehavior, acc: Set[Long]): Set[Long] = acc + value.userId
    
      override def createAccumulator(): Set[Long] = Set[Long]()
    
      override def getResult(acc: Set[Long]): Long = acc.size
    
      override def merge(acc: Set[Long], acc1: Set[Long]): Set[Long] = acc ++ acc1
    }
    
    class UvCountResultWithIncreAgg extends AllWindowFunction[Long, UvCount, TimeWindow]{
      override def apply(window: TimeWindow, input: Iterable[Long], out: Collector[UvCount]): Unit = {
        out.collect( UvCount(window.getEnd, input.head))
      }
    }
    布隆过滤统计
      我们把所有数据的 userId 都存在了窗口计算的状态里,在窗口收集数据的过程中,状态会不断增大。一般情况下,只要不超出内存的承受范围,
    这种做法也没什么问题;但如果我们遇到的数据量很大呢?
      把所有数据暂存放到内存里,显然不是一个好注意。我们会想到,可以利用 redis这种内存级 k-v 数据库,为我们做一个缓存。但如果我们遇到的情况非常极端,数
    据大到惊人呢?比如上亿级的用户,要去重计算 UV。
      如果放到 redis 中,亿级的用户 id(每个 20 字节左右的话)可能需要几 G 甚至几十 G 的空间来存储。当然放到 redis 中,用集群进行扩展也不是不可以,但明显
    代价太大了。
      一个更好的想法是,其实我们不需要完整地存储用户 ID 的信息,只要知道他在不在就行了。所以其实我们可以进行压缩处理,用一位(bit)就可以表示一个用户
    的状态。这个思想的具体实现就是布隆过滤器(Bloom Filter)。
      本质上布隆过滤器是一种数据结构,比较巧妙的概率型数据结构(probabilisticdata structure),特点是高效地插入和查询,可以用来告诉你 “某样东西一定不存
    在或者可能存在”。
      它本身是一个很长的二进制向量,既然是二进制的向量,那么显而易见的,存放的不是 0,就是 1。相比于传统的 List、Set、Map 等数据结构,它更高效、占用
    空间更少,但是缺点是其返回的结果是概率性的,而不是确切的。
      我们的目标就是,利用某种方法(一般是 Hash 函数)把每个数据,对应到一个位图的某一位上去;如果数据存在,那一位就是 1,不存在则为 0。
    注意这里我们用到了 redis 连接存取数据,所以需要加入 redis 客户端的依赖:
        <dependencies>
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>2.8.1</version>
            </dependency>
        </dependencies>
     
    package come.atguigu.networkflow_analysis
    
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.triggers._
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    import redis.clients.jedis.Jedis
    
    //定义样例类
    case class UserBehavior(userId:Long, itemId:Long, categoryId:Int, behavior:String, timestamp:Long)
    case class UvCount(windowEnd: Long, count: Long )
    object UvWithBloomFilter {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
        val inputStream:DataStream[String] = env.readTextFile("C:\Users\DELL\IdeaProjects\UserBehaviorAnalysis\NetworkFlowAnalysis\src\main\resources\UserBehavior.csv")
    
    
        // 将数据转换成样例类类型,并提取timestamp定义watermark
        val dataStream: DataStream[UserBehavior] = inputStream
          .map(data => {
            val dataArray = data.split(",")
            UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
          })
          .assignAscendingTimestamps(_.timestamp*1000L)
    
    
        val uvStream:DataStream[UvCount] = dataStream
          .filter(_.behavior == "pv")
            .map( data => ("uv", data.userId))
            .keyBy(_._1)
            .timeWindow(Time.hours(1))
            .trigger(new Mytrigger())
            .process(new UvCountResultWithBloomFilter())
    
        uvStream.print()
        env.execute("uv job")
      }
    }
    
    class Mytrigger() extends Trigger[(String, Long), TimeWindow]{
      override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
    
      override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult =TriggerResult.CONTINUE
    
      override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {}
    
      override def onElement(t: (String, Long), l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = TriggerResult.FIRE_AND_PURGE
    
    }
    
    
    class UvCountResultWithBloomFilter extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow]{
      var jedis: Jedis = _
      var bloom:Bloom = _
    
      override def open(parameters: Configuration): Unit = {
        jedis = new Jedis("192.168.1.122",6379)
        bloom = new Bloom(1<<30)
      }
    
      //每次一个数据,主要是要用布隆过滤器判断redis位图中对应的位置是否为1
      override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {
        val storedKey = context.window.getEnd.toString
    
        val countMap = "countMap"
    
        var count = 0L
    
        if(jedis.hget(countMap, storedKey)!= null ){
          count = jedis.hget(countMap,storedKey).toLong
        }
    
        // 取userId,计算hash值,判断是否在位图中
        val userId = elements.last._2.toString
        val offset = bloom.hash(userId,61)
        val isExist = jedis.getbit(storedKey, offset)
    
        if(!isExist){
          jedis.setbit(storedKey, offset, true)
          jedis.hset(countMap, storedKey, (count+1).toString)
        }
    
      }
    }
    
    //自定义一个布隆过滤器
    class Bloom(size:Long) extends Serializable{
      // 定义位图的大小,应该是2的整次幂
      private val cap = size
    
      //实现一个hash函数
      def hash(str:String, seed:Int): Long = {
        var result = 0
        for(i <- 0 until str.length){
          result = result*seed + str.charAt(i)
        }
        // 返回一个在cap范围内的一个值
        (cap-1) & result
      }
    
    }
     

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13491507.html

  • 相关阅读:
    mongdb 备份还原导入导出
    mongodb副本集(选举,节点设置,读写分离设置)
    mongodb副本集的内部机制(借鉴lanceyan.com)
    sqlserver 登录记录(登录触发器)
    wmic命令用法小例
    mysql查询相关的命令解析
    学习笔记:APP 瘦身 & 增加bitcode支持编译第三方框架
    关于Git的一些学习笔记
    [转]Xcode中LLDB的使用
    Swift学习笔记(2):willSet与didSet
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13491507.html
Copyright © 2011-2022 走看看