zoukankan      html  css  js  c++  java
  • 038 spark中使用sparksql对日志进行分析(属于小案例)

    一:使用sparksql开发

    1.sparksql开发的两种方式

      HQL:SQL语句开发

        eq : sqlContext.sql("xxxx")

      DSL : sparkSql中DataFrame的API调用方式

        eq:val df=sqlContext.xxx

           df.select("number")

    二:HQL的开发案例

    1.新建目录上传日志

      

    2.开启服务

      

    三:书写程序

    1.描述

      这个程序一共包括两个部分。

      所以写的是两个程序。

    2.程序一:对日志的描述--ApacheAccessLog

     1 package com.ibeifeng.bigdata.spark.log
     2 
     3 import scala.util.matching.Regex
     4 
     5 /**
     6    * 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846
     7    * Created by ibf on 01/15.
     8    */
     9 case class ApacheAccessLog(
    10                              ipAddress: String, // IP地址
    11                              clientId: String, // 客户端唯一标识符
    12                              userId: String, // 用户唯一标识符
    13                              serverTime: String, // 服务器时间
    14                              method: String, // 请求类型/方式
    15                              endpoint: String, // 请求的资源
    16                              protocol: String, // 请求的协议名称
    17                              responseCode: Int, // 请求返回值:比如:200、401
    18                              contentSize: Long // 返回的结果数据大小
    19                            )
    20 
    21 /**
    22    * 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846
    23    * Created by ibf on 01/15.
    24    * 提供一些操作Apache Log的工具类供SparkCore使用
    25    */
    26 object ApacheAccessLog {
    27    // Apache日志的正则
    28    val PARTTERN: Regex =
    29    """^(S+) (S+) (S+) [([w:/]+s[+-]d{4})] "(S+) (S+) (S+)" (d{3}) (d+)""".r
    30 
    31    /**
    32      * 验证一下输入的数据是否符合给定的日志正则,如果符合返回true;否则返回false
    33      *
    34      * @param line
    35      * @return
    36      */
    37    def isValidateLogLine(line: String): Boolean = {
    38      val options = PARTTERN.findFirstMatchIn(line)
    39 
    40      if (options.isEmpty) {
    41        false
    42      } else {
    43        true
    44      }
    45    }
    46 
    47    /**
    48      * 解析输入的日志数据
    49      *
    50      * @param line
    51      * @return
    52      */
    53    def parseLogLine(line: String): ApacheAccessLog = {
    54      if (!isValidateLogLine(line)) {
    55        throw new IllegalArgumentException("参数格式异常")
    56      }
    57 
    58      // 从line中获取匹配的数据
    59      val options = PARTTERN.findFirstMatchIn(line)
    60 
    61      // 获取matcher
    62      val matcher = options.get
    63 
    64      // 构建返回值
    65      ApacheAccessLog(
    66        matcher.group(1), // 获取匹配字符串中第一个小括号中的值
    67        matcher.group(2),
    68        matcher.group(3),
    69        matcher.group(4),
    70        matcher.group(5),
    71        matcher.group(6),
    72        matcher.group(7),
    73        matcher.group(8).toInt,
    74        matcher.group(9).toLong
    75      )
    76    }
    77  }

    3.程序二:针对需求进行--LogAnalysis

     1 package com.ibeifeng.bigdata.spark.log
     2 
     3 import com.ibeifeng.bigdata.spark.core.ApacheAccessLog
     4 import org.apache.spark.sql.{DataFrame, SQLContext}
     5 import org.apache.spark.{SparkContext, SparkConf}
     6 /**
     7  * Created by Administrator on 2017/4/25.
     8  */
     9 object LogAnalysis {
    10   def main(args: Array[String]):Unit={
    11     //sqlContext
    12     val conf=new SparkConf()
    13       .setMaster("local[*]")
    14       .setAppName("log-analysis-sparksql")
    15     val sc=SparkContext.getOrCreate(conf)
    16     val sqlContext=new SQLContext(sc)
    17     import sqlContext.implicits._                //如果不写,下面的转换不成功
    18 
    19     //transform
    20     val path="/spark/logs/input"
    21     val rdd=sc.textFile(path)
    22     val apacheAccessDataFrame=rdd
    23       .filter(line=>ApacheAccessLog.isValidateLogLine(line))
    24       .map(line => {
    25         ApacheAccessLog.parseLogLine(line)
    26     }).toDF()                                    //rdd转换为DataFrame
    27 
    28     //register temptable
    29     apacheAccessDataFrame.registerTempTable("log_analysis_temp_table")
    30     sqlContext.sql("select * from log_analysis_temp_table limit 1").show()
    31 
    32     //需求一:求contentSize的平均值,最大值以及最小值
    33     val resultDataFrame1=sqlContext.sql(
    34       """
    35         |SELECT
    36         |AVG(contentSize) as avg_contentSize,
    37         |MAX(contentSize) as max_contentSize,
    38         |MIN(contentSize) as min_contentSize
    39         |FROM log_analysis_temp_table
    40       """.stripMargin)
    41     resultDataFrame1.show()
    42 
    43     //save                                         //save as HDFS
    44     val resultRdd=resultDataFrame1.map(row=>{
    45       val avgSize=row.getAs[Double]("avg_contentSize")
    46       val minSize=row.getAs[Long]("min_contentSize")
    47       val maxSize=row.getAs[Long]("max_contentSize")
    48       (avgSize,minSize,maxSize)
    49     })
    50     resultRdd.saveAsTextFile(s"/spark/logs/output/sql_${System.currentTimeMillis()}")
    51 
    52     //需求二:求各个返回值出现的数据个数
    53     val resultDataFrame2=sqlContext.sql(
    54     """
    55       |SELECT
    56       |responseCode AS code,
    57       |COUNT(1) AS count
    58       |FROM log_analysis_temp_table
    59       |GROUP BY responseCode
    60     """.stripMargin
    61     )
    62     resultDataFrame2.show()
    63 
    64     //需求三:求访问次数大于N的IP地址,并对黑名单进行限制
    65     val blackIP=Array("200-55-104-193.ds1.prima.net.ar","10.0.0.153","208-38-57-205.ip.cal.radiant.net")
    66     val N=10
    67     val resultDataFrame3=sqlContext.sql(
    68     s"""
    69       |SELECT
    70       |ipAddress AS ip,
    71       |COUNT(1) AS count
    72       |FROM log_analysis_temp_table
    73       |WHERE not(ipAddress in(${blackIP.map(ip=>s"'${ip}'").mkString(",")}))
    74       |GROUP BY ipAddress
    75       |HAVING count>${N}
    76     """.stripMargin)
    77     resultDataFrame3.show()
    78 
    79     //需求四:求访问次数最多的前k个endpoint的值
    80     val k=10
    81     val resultDataFrame4=sqlContext.sql(
    82     s"""
    83        |SELECT
    84        |  t.endpoint,
    85        |  t.count
    86        |FROM(
    87        |SELECT
    88        |  endpoint,
    89        |  COUNT(1) AS count
    90        |FROM log_analysis_temp_table
    91        |GROUP BY endpoint) t
    92        |ORDER BY t.count DESC
    93        |limit ${k}
    94      """.stripMargin)
    95     resultDataFrame4.show()
    96   }
    97 }

    4.运行结果

      

      

      

      

      

      

      

  • 相关阅读:
    ASP.NET Aries 高级开发教程:如何写WebAPI接口
    ASP.NET Aries 高级开发教程:行内编辑事件怎么新增数据到后台(番外篇)
    ASP.NET Aries 高级开发教程:表单检测字段是否已存(番外篇)
    Gemini.Workflow 双子工作流正式上线(支持.NET Core)
    Gemini.Workflow 双子工作流入门教程五:业务表单开发
    Gemini.Workflow 双子工作流入门教程四:流程应用
    Gemini.Workflow 双子工作流入门教程三:定义流程:流程节点、迁移条件参数配置
    Gemini.Workflow 双子工作流入门教程二:定义流程:流程节点介绍
    Gemini.Workflow 双子工作流入门教程一:定义流程:流程图属性
    CYQ.Data 支持分布式数据库(主从备)高可用及负载调试
  • 原文地址:https://www.cnblogs.com/juncaoit/p/6764833.html
Copyright © 2011-2022 走看看