网站独立访客数(UV)的统计
另外一个统计流量的重要指标是网站的独立访客数(Unique Visitor,UV)。UV指的是一段时间(比如一小时)内访问网站的总人数,1 天内同一访客的多次访问
只记录为一个访客。通过 IP 和 cookie 一般是判断 UV 值的两种方式。当客户端第一次访问某个网站服务器的时候,网站服务器会给这个客户端的电脑发出一个 Cookie,
通常放在这个客户端电脑的 C 盘当中。在这个 Cookie 中会分配一个独一无二的编号,这其中会记录一些访问服务器的信息,如访问时间,访问了哪些页面等等。当你下
次再访问这个服务器的时候,服务器就可以直接从你的电脑中找到上一次放进去的Cookie 文件,并且对其进行一些更新,但那个独一无二的编号是不会变的。
当然,对于 UserBehavior 数据源来说,我们直接可以根据 userId 来区分不同的用户。
总量统计
package come.atguigu.networkflow_analysis import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.AllWindowFunction import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector //定义样例类 case class UserBehavior(userId:Long, itemId:Long, categoryId:Int, behavior:String, timestamp:Long) case class UvCount(windowEnd: Long, count: Long ) object UniqueVisitor { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(4) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val inputStream:DataStream[String] = env.readTextFile("C:\Users\DELL\IdeaProjects\UserBehaviorAnalysis\NetworkFlowAnalysis\src\main\resources\UserBehavior.csv") // 将数据转换成样例类类型,并提取timestamp定义watermark val dataStream: DataStream[UserBehavior] = inputStream .map(data => { val dataArray = data.split(",") UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)}) .assignAscendingTimestamps(_.timestamp*1000L) val uvStream:DataStream[UvCount] = dataStream .filter(_.behavior == "pv") .timeWindowAll(Time.hours(1)) .apply( new UvCountResult() ) uvStream.print() env.execute("uv job") } } class UvCountResult() extends AllWindowFunction[UserBehavior, UvCount, TimeWindow]{ override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = { var idSet = Set[Long]() for( userBehavior <- input ){ idSet += userBehavior.userId } out.collect(UvCount(window.getEnd, idSet.size)) } }
增量统计
package come.atguigu.networkflow_analysis import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.function.AllWindowFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector //定义样例类 case class UserBehavior(userId:Long, itemId:Long, categoryId:Int, behavior:String, timestamp:Long) case class UvCount(windowEnd: Long, count: Long ) object UniqueVisistorIncr { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val inputStream:DataStream[String] = env.readTextFile("C:\Users\DELL\IdeaProjects\UserBehaviorAnalysis\NetworkFlowAnalysis\src\main\resources\UserBehavior.csv") // 将数据转换成样例类类型,并提取timestamp定义watermark val dataStream: DataStream[UserBehavior] = inputStream .map(data => { val dataArray = data.split(",") UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong) }) .assignAscendingTimestamps(_.timestamp*1000L) val uvStream:DataStream[UvCount] = dataStream .filter(_.behavior == "pv") .timeWindowAll(Time.hours(1)) .aggregate(new UvCountAgg(), new UvCountResultWithIncreAgg()) uvStream.print() env.execute("uv job") } } class UvCountResult() extends AllWindowFunction[UserBehavior, UvCount, TimeWindow]{ override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = { var idSet = Set[Long]() for( userBehavior <- input ){ idSet += userBehavior.userId } out.collect(UvCount(window.getEnd, idSet.size)) } } class UvCountAgg extends AggregateFunction[UserBehavior, Set[Long], Long]{ override def add(value: UserBehavior, acc: Set[Long]): Set[Long] = acc + value.userId override def createAccumulator(): Set[Long] = Set[Long]() override def getResult(acc: Set[Long]): Long = acc.size override def merge(acc: Set[Long], acc1: Set[Long]): Set[Long] = acc ++ acc1 } class UvCountResultWithIncreAgg extends AllWindowFunction[Long, UvCount, TimeWindow]{ override def apply(window: TimeWindow, input: Iterable[Long], out: Collector[UvCount]): Unit = { out.collect( UvCount(window.getEnd, input.head)) } }
布隆过滤统计
我们把所有数据的 userId 都存在了窗口计算的状态里,在窗口收集数据的过程中,状态会不断增大。一般情况下,只要不超出内存的承受范围,
这种做法也没什么问题;但如果我们遇到的数据量很大呢?
把所有数据暂存放到内存里,显然不是一个好注意。我们会想到,可以利用 redis这种内存级 k-v 数据库,为我们做一个缓存。但如果我们遇到的情况非常极端,数
据大到惊人呢?比如上亿级的用户,要去重计算 UV。
如果放到 redis 中,亿级的用户 id(每个 20 字节左右的话)可能需要几 G 甚至几十 G 的空间来存储。当然放到 redis 中,用集群进行扩展也不是不可以,但明显
代价太大了。
一个更好的想法是,其实我们不需要完整地存储用户 ID 的信息,只要知道他在不在就行了。所以其实我们可以进行压缩处理,用一位(bit)就可以表示一个用户
的状态。这个思想的具体实现就是布隆过滤器(Bloom Filter)。
本质上布隆过滤器是一种数据结构,比较巧妙的概率型数据结构(probabilisticdata structure),特点是高效地插入和查询,可以用来告诉你 “某样东西一定不存
在或者可能存在”。
它本身是一个很长的二进制向量,既然是二进制的向量,那么显而易见的,存放的不是 0,就是 1。相比于传统的 List、Set、Map 等数据结构,它更高效、占用
空间更少,但是缺点是其返回的结果是概率性的,而不是确切的。
我们的目标就是,利用某种方法(一般是 Hash 函数)把每个数据,对应到一个位图的某一位上去;如果数据存在,那一位就是 1,不存在则为 0。
注意这里我们用到了 redis 连接存取数据,所以需要加入 redis 客户端的依赖:
<dependencies> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.8.1</version> </dependency> </dependencies>
package come.atguigu.networkflow_analysis import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers._ import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector import redis.clients.jedis.Jedis //定义样例类 case class UserBehavior(userId:Long, itemId:Long, categoryId:Int, behavior:String, timestamp:Long) case class UvCount(windowEnd: Long, count: Long ) object UvWithBloomFilter { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val inputStream:DataStream[String] = env.readTextFile("C:\Users\DELL\IdeaProjects\UserBehaviorAnalysis\NetworkFlowAnalysis\src\main\resources\UserBehavior.csv") // 将数据转换成样例类类型,并提取timestamp定义watermark val dataStream: DataStream[UserBehavior] = inputStream .map(data => { val dataArray = data.split(",") UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong) }) .assignAscendingTimestamps(_.timestamp*1000L) val uvStream:DataStream[UvCount] = dataStream .filter(_.behavior == "pv") .map( data => ("uv", data.userId)) .keyBy(_._1) .timeWindow(Time.hours(1)) .trigger(new Mytrigger()) .process(new UvCountResultWithBloomFilter()) uvStream.print() env.execute("uv job") } } class Mytrigger() extends Trigger[(String, Long), TimeWindow]{ override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult =TriggerResult.CONTINUE override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {} override def onElement(t: (String, Long), l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = TriggerResult.FIRE_AND_PURGE } class UvCountResultWithBloomFilter extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow]{ var jedis: Jedis = _ var bloom:Bloom = _ override def open(parameters: Configuration): Unit = { jedis = new Jedis("192.168.1.122",6379) bloom = new Bloom(1<<30) } //每次一个数据,主要是要用布隆过滤器判断redis位图中对应的位置是否为1 override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = { val storedKey = context.window.getEnd.toString val countMap = "countMap" var count = 0L if(jedis.hget(countMap, storedKey)!= null ){ count = jedis.hget(countMap,storedKey).toLong } // 取userId,计算hash值,判断是否在位图中 val userId = elements.last._2.toString val offset = bloom.hash(userId,61) val isExist = jedis.getbit(storedKey, offset) if(!isExist){ jedis.setbit(storedKey, offset, true) jedis.hset(countMap, storedKey, (count+1).toString) } } } //自定义一个布隆过滤器 class Bloom(size:Long) extends Serializable{ // 定义位图的大小,应该是2的整次幂 private val cap = size //实现一个hash函数 def hash(str:String, seed:Int): Long = { var result = 0 for(i <- 0 until str.length){ result = result*seed + str.charAt(i) } // 返回一个在cap范围内的一个值 (cap-1) & result } }