Website Log Analysis with Spark

    This post only shows the core code; the full code is available via the link at the end.

    Web Log Analysis

    1. Extract the required fields from the log: time, traffic, ip, web address.
    2. Further parse the output of step 1: map each IP to its province, extract the content type and content ID from the URL, and finally save the result in Parquet format.

    (1) Group by date and content (video) ID and sort by access count in descending order.
    (2) Group by date, content (video) ID, and province, and keep the top 3 per group by access count.
    Finally, write the results of (1) and (2) to MySQL.
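    The same two aggregations can also be expressed in Spark SQL over a temporary view. A minimal sketch, assuming the DataFrame and column names defined in step two below (the view name cleaned_access and the sample day '20161110' are made up for illustration):

    accessDF.createOrReplaceTempView("cleaned_access")

    // (1) daily access count per video, sorted in descending order
    val videoTopN = spark.sql(
      """SELECT day, cmsId, COUNT(1) AS times
        |FROM cleaned_access
        |WHERE day = '20161110' AND cmsType = 'video'
        |GROUP BY day, cmsId
        |ORDER BY times DESC""".stripMargin)

    // (2) top 3 videos per province, ranked by access count
    val cityTopN = spark.sql(
      """SELECT day, cmsId, city, times, times_rank FROM (
        |  SELECT day, cmsId, city, times,
        |         RANK() OVER (PARTITION BY city ORDER BY times DESC) AS times_rank
        |  FROM (
        |    SELECT day, cmsId, city, COUNT(1) AS times
        |    FROM cleaned_access
        |    WHERE day = '20161110' AND cmsType = 'video'
        |    GROUP BY day, cmsId, city
        |  ) agg
        |) ranked
        |WHERE times_rank <= 3""".stripMargin)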

    Notes:
    (1) Write to the database partition by partition rather than row by row.
    (2) Filter out the shared DataFrame up front and cache it.
    (3) The code below could be optimized further; for example, building the partition list inside videoAccessTopNStat's try/catch and building the JDBC batch inside StatDAO.inserDayVideoAccessTopN(list) could be merged to avoid traversing the data twice, as sketched below.
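    A minimal sketch of that merged single pass (the function name writeDayVideoTopNDirect is made up here): each partition streams its rows straight into one JDBC batch via the MySQLUtils helper shown at the end of the post, so no intermediate ListBuffer is built.

    def writeDayVideoTopNDirect(statDF: DataFrame): Unit = {
      statDF.foreachPartition { partitionOfRecords =>
        val connection = MySQLUtils.getConnect()
        val pstmt = connection.prepareStatement(
          "insert into day_video_access_topn_stat(day, cms_id, times) values (?, ?, ?)")
        try {
          connection.setAutoCommit(false)
          partitionOfRecords.foreach { info =>
            pstmt.setString(1, info.getAs[String]("day"))
            pstmt.setLong(2, info.getAs[Long]("cmsId"))
            pstmt.setLong(3, info.getAs[Long]("times"))
            pstmt.addBatch() // single traversal: the batch is built while iterating the partition
          }
          pstmt.executeBatch()
          connection.commit()
        } finally {
          MySQLUtils.release(connection, pstmt)
        }
      }
    }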

    Design and coding approach:
    1. Define the input arguments args (e.g. inputPath and outputPath); see the driver skeleton sketched after this list.
    2. Write the conversion utilities: a StructType (which fields to extract and with what types) and parseLog (split the line, pick each field by index, wrap everything in try/catch with a default output). Time parsing goes into its own utility class (inputFormat, outputFormat, getTime, parse), and the IP-to-region mapping into an IpUtils class backed by the open-source ipdatabase project. Each utility should be tested in its own main method once written. Finally build the DataFrame.
    3. Filter out the shared commonDF.
    4. Implement the specific statistics.
    5. Output the data; when writing to MySQL, add a StatDAO class that obtains a connection, writes the data in batches, and releases the connection.
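    The fragments below are shown without their enclosing objects. A minimal driver skeleton, assuming a hypothetical object name SparkStatCleanJob (each step of the pipeline is wrapped the same way):

    object SparkStatCleanJob { // hypothetical name, not from the original project
      def main(args: Array[String]): Unit = {
        if (args.length != 2) {
          println("Usage: SparkStatCleanJob <inputPath> <outputPath>")
          System.exit(1)
        }
        val Array(inputPath, outputPath) = args

        val spark = SparkSession.builder().getOrCreate()
        // ... step-specific logic from the snippets below goes here ...
        spark.stop()
      }
    }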

    //Step One:
    
    /**
      * Parse the raw log and keep: visit time, url, traffic, ip
      * @param .log file, example line: 183.162.52.7 - - [10/Nov/2016:00:01:02 +0800]
      * "POST /api3/getadv HTTP/1.1" ...
      * @return tab-separated text files, example line:
      * 1970-01-01 08:00:00	-	813	183.162.52.7
      */
    
    if (args.length != 2) {
      println("Usage: logCleanYarn <inputPath> <outputPath>")
      System.exit(1)
    }
    
    val Array(inputPath, outputPath) = args
    
    val spark = SparkSession.builder().getOrCreate()
    
    val access = spark.sparkContext.textFile(inputPath)
    
    //access.take(10).foreach(println)
    
    val splited = access.map(line => {
    
       val splits = line.split(" ")
       val ip = splits(0)
       val time = splits(3) + " " + splits(4)
       val url = splits(11).replaceAll("\"", "") // strip the surrounding quotation marks
       val traffic = splits(9)
    // (ip, DataUtils.parse(time), url, traffic)
    
       DataUtils.parse(time) + "\t" + url + "\t" + traffic + "\t" + ip
        })
    
    splited.saveAsTextFile(outputPath)
    
    spark.stop()
    
    /**
      * Utility class for parsing the log timestamp
      */
    object DataUtils {
    
      //input_format: [10/Nov/2016:00:01:02 +0800]
      val YYYYMMDDHHMM_TIME_FORMAT = FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)
    
      //output_format: yyyy-MM-dd HH:mm:ss
      val TARGET_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
    
      def getTime(time: String) = {
        try {
          YYYYMMDDHHMM_TIME_FORMAT.parse(time.substring(time.indexOf("[") + 1, time.lastIndexOf("]"))).getTime
        } catch {
          case _: Exception => 0L
        }
      }
        
      /**
      * example: [10/Nov/2016:00:01:02 +0800] ==> 2016-11-10 00:01:02
      */
      def parse(time: String) = {
        TARGET_FORMAT.format(new Date(getTime(time)))
      }
    
    //  def main(args: Array[String]): Unit = {
    //    println(parse("[10/Nov/2016:00:01:02 +0800]"))
    //  }
    }
    
    //Step Two:
    
    /**
      * Convert the data parsed in step one into a DataFrame and save it as a Parquet file.
      */
    
    if (args.length != 2) {
      println("Usage: logCleanYarn <inputPath> <outputPath>")
      System.exit(1)
    }
    
    val Array(inputPath, outputPath) = args
    
    val spark = SparkSession.builder().getOrCreate()
    
    val access = spark.sparkContext.textFile(inputPath)
    
    // access.take(10).foreach(println)
    
    val accessDF = spark.createDataFrame(access.map(line => AccessConvertUtil.parseLog(line)), AccessConvertUtil.struct)
    
    // accessDF.printSchema()
    // accessDF.show(false)
    
    accessDF.coalesce(1).write.format("parquet").partitionBy("day")
          .save(outputPath)
    
    spark.stop()
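    Note that save() fails by default when outputPath already exists (SaveMode.ErrorIfExists). A minimal variant for re-runs, assuming it is acceptable to overwrite the previous output:

    import org.apache.spark.sql.SaveMode

    accessDF.coalesce(1).write
      .format("parquet")
      .partitionBy("day")
      .mode(SaveMode.Overwrite) // replace any existing output so the job can be re-run cleanly
      .save(outputPath)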
    
    /**
      * Utility class defining the schema and the method that further parses a log line
      */
    object AccessConvertUtil {
    
      val struct = StructType(Seq(
        StructField("url", StringType),
        StructField("cmsType", StringType),
        StructField("cmsId", IntegerType),
        StructField("traffic", IntegerType),
        StructField("ip", StringType),
        StructField("city", StringType),
        StructField("time", StringType),
        StructField("day", StringType)
      ))
    
      /**
        * Further parse a log line: convert data types, parse the URL, map the IP to a province, and return a Row
        */
      def parseLog(log: String) = {
    
        try{
          val splited = log.split("\t")
    
          val url = splited(1)
          val traffic = splited(2).toInt
          val ip = splited(3)
    
          // URL example: in "http://www.xxx.com/article/101", "article" is the content type (cmsType) and 101 is the content ID (cmsId)
          val domain = "http://www.xxx.com/"
          val cms = url.substring(url.indexOf(domain) + domain.length)
          val cmsTypeId = cms.split("/")
    
          var cmsType = ""
          var cmsId = 0
          if (cmsTypeId.length > 1) {
            cmsType = cmsTypeId(0)
            cmsId = cmsTypeId(1).toInt
          }
    
          val city = IpUtils.getCity(ip)
          val time = splited(0)
          val day = time.substring(0, 10).replaceAll("-", "")
    
          Row(url, cmsType, cmsId, traffic, ip, city, time, day)
        } catch {
          case _: Exception =>
            Row(null, null, null, null, null, null, null, null)
        }
      }
    }
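    As with the other utility classes, parseLog can be smoke-tested in its own main method (or a small harness) before being wired into the job. A minimal sketch, using a made-up tab-separated line in the step-one output format:

    object AccessConvertUtilTest { // hypothetical test harness, not in the original project
      def main(args: Array[String]): Unit = {
        // made-up sample line in the step-one output format: time \t url \t traffic \t ip
        val sample = "2016-11-10 00:01:02\thttp://www.xxx.com/video/101\t813\t58.30.15.255"
        println(AccessConvertUtil.parseLog(sample))
      }
    }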
    
    
    /**
      * IP utility class: maps an IP address to a province using the open-source ipdatabase project
      * https://github.com/wzhe06/ipdatabase
      */
    object IpUtils {
    
      def getCity(ip: String) = {
        IpHelper.findRegionByIp(ip)
      }
    
      def main(args: Array[String]): Unit = {
        println(getCity("58.30.15.255"))
      }
    }
    
    //Step Three:
    
    /**
      * On the result of step two, group by date and video ID and sort by access count in descending order.
      * Finally write the data to MySQL.
      */
    
    if (args.length != 2) {
      println("Usage: logCleanYarn <inputPath> <day>")
      System.exit(1)
    }
    
    val Array(inputPath, day) = args
    
    val spark = SparkSession.builder()
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .getOrCreate()
    
    val accessDF = spark.read.format("parquet").load(inputPath)
    
    //    accessDF.printSchema()
    //    accessDF.show(false)
    
    // pre-filter and cache the DataFrame that both stat functions below reuse
    import spark.implicits._
    val commonDF = accessDF.filter($"day" === day && $"cmsType" === "video")
    commonDF.cache()
    
    // delete any existing rows for this day to avoid duplicates
    StatDAO.deleteData(day)
    
    //groupBy video
    videoAccessTopNStat(spark, commonDF)
    
    //groupBy city
    cityAccessTopNStat(spark, commonDF)
    
    commonDF.unpersist(true)
    
    //    videoAccessTopDF.show(false)
    
    spark.stop()
    
    /**
      * Two case classes that hold the rows produced by the two stat methods below.
      */
    case class DayVideoAccessStat(day: String, cmsId: Long, times: Long)
    case class DayCityVideoAccessStat(day: String, cmsId: Long, city: String, times: Long, timesRank: Int)
    
    /**
      * Group by content ID, sort by access count, and write the result to MySQL
      */
    def videoAccessTopNStat(spark: SparkSession, comDF: DataFrame): Unit = {
    
      import spark.implicits._
      val videoAccessTopNStat = comDF
        .groupBy($"day", $"cmsId")
        .agg(count("cmsId").as("times"))
        .orderBy(desc("times"))
    
      try {
        videoAccessTopNStat.foreachPartition(partitionOfRecords =>{
          val list = new ListBuffer[DayVideoAccessStat]
    
          partitionOfRecords.foreach(info => {
            val day = info.getAs[String]("day")
            val cmsId = info.getAs[Long]("cmsId")
            val times = info.getAs[Long]("times")
    
            list.append(DayVideoAccessStat(day, cmsId, times))
          })
    
          StatDAO.inserDayVideoAccessTopN(list)
        })
      } catch {
        case e:Exception => e.printStackTrace()
      }
    }
    
    /**
      * Group by content ID and province, rank within each province, and write the result to MySQL
      */
    def cityAccessTopNStat(spark: SparkSession, comDF: DataFrame): Unit = {
    
      import spark.implicits._
    
      val videoAccessTopNStat = comDF
        .groupBy($"day", $"city", $"cmsId")
        .agg(count("cmsId").as("times"))
    
      val windowSpec = Window.partitionBy($"city").orderBy(desc("times"))
      val videoAccessTopNStatDF = videoAccessTopNStat.select(expr("*"), rank().over(windowSpec).as("times_rank"))
        .filter($"times_rank" <= 3)
    
      try {
        videoAccessTopNStatDF.foreachPartition(partitionOfRecords => {
          val list = new ListBuffer[DayCityVideoAccessStat]
    
          partitionOfRecords.foreach(info => {
            val day = info.getAs[String]("day")
            val cmsId = info.getAs[Long]("cmsId")
            val city = info.getAs[String]("city")
            val times = info.getAs[Long]("times")
            val timesRank = info.getAs[Int]("times_rank")
    
            list.append(DayCityVideoAccessStat(day, cmsId, city, times, timesRank))
          })
    
          StatDAO.inserDayCityVideoAccessTopN(list)
        })
      } catch {
        case e: Exception => e.printStackTrace()
      }
    }
    
    /**
      * Group-then-sort method (standalone variant with a hard-coded day)
      */
    def videoAccessSortedStat(spark: SparkSession, accessDF: DataFrame) : Unit = {
      import spark.implicits._
        
      val sortedStat= accessDF
        .filter($"day" === "20170511" && $"cmsType" === "video")
        .groupBy($"day", $"cmsId")
        .agg(count("cmsId").as("times"))
        .orderBy(desc("times"))
        
      // build a list per partition and call the DAO to write it to MySQL
      try {
          sortedStat.foreachPartition(partitionOfRecords =>{
            val list = new ListBuffer[DayVideoAccessStat]
    
            partitionOfRecords.foreach(info => {
              val day = info.getAs[String]("day")
              val cmsId = info.getAs[Long]("cmsId")
              val times = info.getAs[Long]("times")
    
              list.append(DayVideoAccessStat(day, cmsId, times))
            })
    
            StatDAO.inserDayVideoAccessSortedStat(list)
          })
       } catch {
      case e:Exception => e.printStackTrace()
     }
    }
    
    //Step Three (cont.):
    
    /**
      * Utility class with two kinds of methods:
      * 1. Obtain a database connection, write data to MySQL in batches, and release the connection.
      * 2. Delete rows that already exist in MySQL for the same day (to avoid duplicate entries).
      */
    object StatDAO {
    
      def inserDayVideoAccessTopN(list: ListBuffer[DayVideoAccessStat]): Unit = {
    
        var connection: Connection = null
        var pstmt: PreparedStatement = null
    
        try{
          connection = MySQLUtils.getConnect()
    
          val sql = "insert into day_video_access_topn_stat(day, cms_id, times) values (?, ?, ?)"
          pstmt = connection.prepareStatement(sql)
    
          connection.setAutoCommit(false)
    
          for (ele <- list) {
            pstmt.setString(1, ele.day)
            pstmt.setLong(2, ele.cmsId)
            pstmt.setLong(3, ele.times)
    
            pstmt.addBatch()
          }
    
          pstmt.executeBatch()
          connection.commit()
    
        } catch {
          case e:Exception => e.printStackTrace()
        } finally {
          MySQLUtils.release(connection, pstmt)
        }
      }
    
      def inserDayCityVideoAccessTopN(list: ListBuffer[DayCityVideoAccessStat]): Unit = {
    
        var connection: Connection = null
        var pstmt: PreparedStatement = null
    
        try{
          connection = MySQLUtils.getConnect()
    
          val sql = "insert into day_video_city_access_topn_stat(day, cms_id, city, times, times_rank) values (?, ?, ?, ?, ?)"
          pstmt = connection.prepareStatement(sql)
    
          connection.setAutoCommit(false)
    
          for (ele <- list) {
            pstmt.setString(1, ele.day)
            pstmt.setLong(2, ele.cmsId)
            pstmt.setString(3, ele.city)
            pstmt.setLong(4, ele.times)
            pstmt.setInt(5, ele.timesRank)
    
            pstmt.addBatch()
          }
    
          pstmt.executeBatch()
          connection.commit()
    
        } catch {
          case e:Exception => e.printStackTrace()
        } finally {
          MySQLUtils.release(connection, pstmt)
        }
      }
    
      def deleteData(day: String): Unit = {
    
        val tables = Array("day_video_access_topn_stat", "day_video_city_access_topn_stat")
        var connection: Connection = null
        var pstmt: PreparedStatement = null
    
        try {
          connection = MySQLUtils.getConnect()
    
          for (table <- tables) {
            val sql = s"delete from $table where day = ?"
            pstmt = connection.prepareStatement(sql)
            pstmt.setString(1, day)
            pstmt.executeUpdate()
            pstmt.close() // close each per-table statement here; the finally block only sees the last one
    
          }
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          MySQLUtils.release(connection, pstmt)
        }
    
      }
    }
    
    /**
      * Utility class with methods for obtaining and releasing a database connection.
      */
    object MySQLUtils {
    
      def getConnect() = {
          DriverManager.getConnection("jdbc:mysql://localhost:3306/log_project","root", "password")
      }
    
      def release(connection: Connection, pstmt: PreparedStatement): Unit ={
        try{
          if (pstmt != null) {
            pstmt.close()
          }
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (connection != null) {
            connection.close()
          }
        }
      }
    
      def main(args: Array[String]): Unit = {
        println(getConnect())
      }
    }
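    For completeness, the two MySQL tables used by StatDAO can be created with DDL inferred from the insert statements above; the column types here are assumptions, since the post does not show the real schema:

    object CreateStatTables { // hypothetical helper, not in the original project
      def main(args: Array[String]): Unit = {
        val connection = MySQLUtils.getConnect()
        val stmt = connection.createStatement()
        try {
          // assumed schema inferred from the insert statements above; adjust types/keys as needed
          stmt.executeUpdate(
            """create table if not exists day_video_access_topn_stat (
              |  day varchar(8) not null,
              |  cms_id bigint not null,
              |  times bigint not null
              |)""".stripMargin)
          stmt.executeUpdate(
            """create table if not exists day_video_city_access_topn_stat (
              |  day varchar(8) not null,
              |  cms_id bigint not null,
              |  city varchar(20) not null,
              |  times bigint not null,
              |  times_rank int not null
              |)""".stripMargin)
        } finally {
          stmt.close()
          connection.close()
        }
      }
    }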
    

    References:
    imooc course: 大数据 Spark SQL慕课网日志分析
    GitHub source code
