zoukankan      html  css  js  c++  java
  • 视频访问量实时统计项目学习

    (一)效果图
    先来两个效果图看看

    在这里插入图片描述
                       图1
    图1
                       图2

    (二)日志产生
      图1显示的效果表示的是对于某个视频网站的访问的视频类别,做的模拟统计示意效果图,比如爱奇艺视频,对于爱奇艺视频中的每个类别的视频的访问流量:
      https://list.iqiyi.com/www/2/-24------------11-1-1-iqiyi--.html  对应的是爱奇艺视频中的电视剧中的古装剧
      https://list.iqiyi.com/www/2/-20------------11-1-1-iqiyi--.html  对应的是爱奇艺视频中的电视剧中的言情剧
      https://list.iqiyi.com/www/2/-23------------11-1-1-iqiyi--.html  对应的是爱奇艺视频下的电视剧中的家庭剧
      https://list.iqiyi.com/www/1/----------------iqiyi--.html 对应的是爱奇艺下的电影栏目
      根据这样的信息,我们可以分析了解到对于某一个视频网站下的视频分类大概的网址后缀 ‘www/2’ ‘www/1’有这样的类似的分类。
      因此可以通过程序来产生类似这样的程序。(generate.py

    #coding=UTF-8
    import random
    import time
    
    url_paths = [   //
       "/www/2",
       "/www/1",
       "/www/6",
       "/www/4",
       "/www/3",
       "/www/5",
       "/pianhua/130",
       "/toukouxu/821"
    ]
    
    status_code =[404,302,200]
    
    ip_slices=[132,156,124,10,29,167,143,187,30,100]
    
    http_referers = [
    "https://www.baidu.com/s?wd={query}",
    "https://www.sogou.com/web?qu={query}",
    "http://cn.bing.com/search?q={query}",
    "https://search.yahoo.com/search?p={query}"
    ]
    
    search_keyword = [
      "猎场",
      "快乐人生",
      "极限挑战",
      "我的体育老师",
      "幸福满院"
    ]
    #ip地址
    def sample_ip():
            slice = random.sample(ip_slices,4)
            return ".".join([str(item) for item in slice])
    def sample_url():
            return random.sample(url_paths,1)[0]
    def sample_status():
        return random.sample(status_code,1)[0]
    def sample_referer():
            if random.uniform(0,1) > 0.2:
                    return "-"
            refer_str = random.sample(http_referers,1)
            #print refer_str[0]
            query_str = random.sample(search_keyword,1)
            #print query_str[0]
            return refer_str[0].format(query=query_str[0])
    
    #产生log
    def generate_log(count=10):
            time_str = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
            #f = open("C:\code\logs","w+")
            f = open("/home/txp/logs/log","a+")
            while count >= 1:
                    query_log = "{ip}	{localtime}	"GET {url} HTTP/1.0"	{referece}	{status1}".format(ip=sample_ip(),url=sample_url(),status1=sample_status(),referece=sample_referer(),localtime=time_str)
                    #print query_log
                    f.write(query_log+"
    ")
                    count = count-1;
    
    
    if __name__ == '__main__':
            generate_log(10)  //产生10条数据  一次
            #print "1111"
    

    产生的日志

    187.30.156.143  2018-11-03 14:55:01     "GET /www/6 HTTP/1.0"   -       200
    156.132.30.124  2018-11-03 14:56:01     "GET /toukouxu/821 HTTP/1.0"    -       404
    132.124.10.29   2018-11-03 14:56:01     "GET /www/5 HTTP/1.0"   -       404
    29.187.167.156  2018-11-03 14:56:01     "GET /toukouxu/821 HTTP/1.0"    -       200
    167.156.187.143 2018-11-03 14:56:01     "GET /www/2 HTTP/1.0"   -       404
    156.143.10.100  2018-11-03 14:56:01     "GET /www/4 HTTP/1.0"   -       200
    156.10.143.29   2018-11-03 14:56:01     "GET /www/4 HTTP/1.0"   http://cn.bing.com/search?q=快乐人生    200
    

    在这里为了和真实的场景类似可以采用linux的调度器工具每一分钟产生一批数据–linux crontab
      crontab流量
    在以上各个字段中,还可以使用以下特殊字符:

    星号(*):代表所有可能的值,例如month字段如果是星号,则表示在满足其它字段的制约条件后每月都执行该命令操作。

    逗号(,):可以用逗号隔开的值指定一个列表范围,例如,“1,2,5,7,8,9”

    中杠(-):可以用整数之间的中杠表示一个整数范围,例如“2-6”表示“2,3,4,5,6”

    正斜线(/):可以用正斜线指定时间的间隔频率,例如“0-23/2”表示每两小时执行一次。同时正斜线可以和星号一起使用,例如*/10,如果用在minute字段,表示每十分钟执行一次。

    在linux中直接运行可能会产生乱码 需要在log产生文件头上加上 #!/usr/bin/python
    因此定义一个执行文件来执行产生日志的脚本文件 logs_generator.sh
    其内容为 python /home/txp/logs/generator.py
    crondtab中的最后一行定义为: * * * * * txp source /etc/profile;~/logs/logs_generator.sh
    那么启动调度服务吧------> service crond start

    (三)日志收集
    (1)首先我们的架构如图
    在这里插入图片描述

    (2)flume监控log的产生,并且发给kafak,以及配置文件的编写

    a1.sources=src1
    a1.channels=ch1
    a1.sinks=k1
    
    #定义 sources
    a1.sources.src1.type=exec
    a1.sources.src1.command=tail -F /home/txp/logs/log
    a1.sources.src1.channels=ch1
    
    #定义channels
    a1.channels.ch1.type=memory
    a1.channels.ch1.capacity=1000
    
    #定义sinks
    a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic=flumeTopic
    a1.sinks.k1.brokerList=s202:9092
    a1.sinks.k1.batchSize=20
    a1.sinks.k1.requiredAcks=1
    a1.sinks.k1.channels=ch1
    
    #绑定channels
    a1.sources.src1.channels=ch1
    a1.sinks.k1.channel=ch1
    

    (3)
    Spark Streaming 是 Spark Core API 的扩展, 它支持弹性的, 高吞吐的, 容错的实时数据流的处理. 数据可以通过多种数据源获取, 例如 Kafka, Flume, Kinesis 以及 TCP sockets, 也可以通过例如 map, reduce, join, window 等的高级函数组成的复杂算法处理.
    在这里插入图片描述
    这里kafka源和sparkStreaming结合,进行实时的操作

        val streamingContext = new StreamingContext("local[*]", "StatStreamingApp", Seconds(5))
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "s202:9092,s203:9092,s204:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "use_a_separate_group_id_for_each_stream",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
    
        val topics = Array("flumeTopic")
        val logs = KafkaUtils.createDirectStream[String, String](
          streamingContext,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        ).map(_.value())
    

    SparkStreaming从kafka中得到数据,首先要进行格式的确定
                       
    源日志为
    29.187.124.167  2018-11-03 13:12:01   “GET /www/6 HTTP/1.0”  https://search.yahoo.com/search?p=幸福满院   200
    156.124.187.29  2018-11-03 13:12:01   “GET /pianhua/130 HTTP/1.0”   -  302

    期望得到的数据  (1)以/www开头 (2)要有类目编号比如这里的6

    var cleanLog = logs.map(line => {
          var infos = line.split("	")
          var url = infos(2).split(" ")(1)
          var categoryId = 0
          if (url.startsWith("/www")) {
            categoryId = url.split("/")(2).toInt
          }
          ClickLog(infos(0), DateUtils.parseToMin(infos(1)), categoryId, infos(3), infos(4).toInt)
        }).filter(log => log.categoryId != 0)
    

    DateUtils完成的是时间格式的转换

    接下来主要完成的是统计实时的当天的视频中每个类别的访问量

    cleanLog.map(log => {
          //获取log里面的时间 类别
          (log.time.substring(0, 8) + "_" + log.categoryId, 1)
        }).reduceByKey(_ + _).foreachRDD(rdd => {
          rdd.foreachPartition(partitions => {
            val list = new ListBuffer[CategoryClickCount]
            partitions.foreach(pair => {
              list.append(CategoryClickCount(pair._1, pair._2))
            })
            CategoryClickCountDao.save(list)
          })
        })
    

    CategoryClickCountDao主要完成的是将日志保存到hbase中,从hbase中获取数据等等操作

    cleanLog.map(log => {
          val url_refer=log.refer
          var host=""
          if(url_refer=="-"){
              host="AiQiYi"  //来自爱奇艺视频自己
          }else{
            val url = url_refer.replace("//", "/")
            val splits = url.split("/")
            if (splits.length > 2) {
              host = splits(1) //得到www.sogou.com
            }
          }
          (host, log.time)
        }).filter(x => x._1 != "").map(x => {
          (x._2.substring(0, 8) + "_" + x._1, 1)
        }).reduceByKey(_ + _).foreachRDD(rdd => {
          rdd.foreachPartition(partitions => {
            val list = new ListBuffer[CategorySerachCount]
            partitions.foreach(pair => {
              list.append(CategorySerachCount(pair._1, pair._2))
            })
            CategorySearchCountDao.save(list)
          })
        })
    

    这段代码完成的是将收集的日志分析得到视频访问的来源,是来自于直接访问(如在爱奇艺中直接得到),还是其他的搜索引擎得到的,转换得到一定的格式存储到hbase数据库中。
    以上完成日志收集到hbase中。

    (4)可视化展示
    主要采用的是echart中的饼图来完成,配合使用springboot完成前台展示。
    展示的部分不再叙述

    代码可参考:github代码
    代码中 SparkStudy为访问数据的实时收集
        SparkWebLearning为前台展示实时展示

  • 相关阅读:
    luffy-短信接口频率限制,以及前端发送,和注册
    腾讯云短信开发
    luffy-登录注册页面
    vue之vue-cookies 获取、设置以及删除指定的cookies
    pycharm操作git
    luffy的前端主页
    vue路由跳转的方式
    django 中的Model继承
    跨域请求CORS详解
    vue:实现图书购物车
  • 原文地址:https://www.cnblogs.com/tongxupeng/p/10259512.html
Copyright © 2011-2022 走看看