zoukankan      html  css  js  c++  java
  • 024 关于spark中日志分析案例

    1.四个需求

      需求一:求contentsize的平均值、最小值、最大值

      需求二:请各个不同返回值的出现的数据 ===> wordCount程序

      需求三:获取访问次数超过N次的IP地址

      需求四:获取访问次数最多的前K个endpoint的值 ==> TopN

    2.主程序LogAnalyzer.scala

      1 package com.ibeifeng.bigdata.spark.core.log
      2 
      3 import org.apache.spark.rdd.RDD
      4 import org.apache.spark.{SparkConf, SparkContext}
      5 
      6 /**
      7   * Apache日志分析
      8   * Created by ibf on 01/15.
      9   */
     10 object LogAnalyzer {
     11   def main(args: Array[String]): Unit = {
     12     val conf = new SparkConf()
     13       .setAppName("log-analyzer")
     14       .setMaster("local[*]")
     15       .set("spark.eventLog.enabled", "true")
     16       .set("spark.eventLog.dir", "hdfs://hadoop-senior01:8020/spark-history")
     17     val sc = SparkContext.getOrCreate(conf)
     18 
     19     // ================日志分析具体代码==================
     20     // HDFS上日志存储路径
     21     val path = "/beifeng/spark/access/access.log"
     22 
     23     // 创建rdd
     24     val rdd = sc.textFile(path)
     25 
     26     // rdd转换,返回进行后续操作
     27     val apacheAccessLog: RDD[ApacheAccessLog] = rdd
     28       // 过滤数据
     29       .filter(line => ApacheAccessLog.isValidateLogLine(line))
     30       .map(line => {
     31         // 对line数据进行转换操作
     32         ApacheAccessLog.parseLogLine(line)
     33       })
     34 
     35     // 对多次时候用的rdd进行cache
     36     apacheAccessLog.cache()
     37 
     38     // 需求一:求contentsize的平均值、最小值、最大值
     39     /*
     40     * The average, min, and max content size of responses returned from the server.
     41     * */
     42     val contentSizeRDD: RDD[Long] = apacheAccessLog
     43       // 提取计算需要的字段数据
     44       .map(log => (log.contentSize))
     45 
     46     // 对重复使用的RDD进行cache
     47     contentSizeRDD.cache()
     48 
     49     // 开始计算平均值、最小值、最大值
     50     val totalContentSize = contentSizeRDD.sum()
     51     val totalCount = contentSizeRDD.count()
     52     val avgSize = 1.0 * totalContentSize / totalCount
     53     val minSize = contentSizeRDD.min()
     54     val maxSize = contentSizeRDD.max()
     55 
     56     // 当RDD不使用的时候,进行unpersist
     57     contentSizeRDD.unpersist()
     58 
     59     // 结果输出
     60     println(s"ContentSize Avg:${avgSize}, Min: ${minSize}, Max: ${maxSize}")
     61 
     62     // 需求二:请各个不同返回值的出现的数据 ===> wordCount程序
     63     /*
     64     * A count of response code's returned.
     65     * */
     66     val responseCodeResultRDD = apacheAccessLog
     67       // 提取需要的字段数据, 转换为key/value键值对,方便进行reduceByKey操作
     68       // 当连续出现map或者flatMap的时候,将多个map/flatMap进行合并
     69       .map(log => (log.responseCode, 1))
     70       // 使用reduceByKey函数,按照key进行分组后,计算每个key出现的次数
     71       .reduceByKey(_ + _)
     72 
     73     // 结果输出
     74     println(s"""ResponseCode :${responseCodeResultRDD.collect().mkString(",")}""")
     75 
     76     // 需求三:获取访问次数超过N次的IP地址
     77     // 需求三额外:对IP地址进行限制,部分黑名单IP地址不统计
     78     /*
     79     * All IPAddresses that have accessed this server more than N times.
     80     * 1. 计算IP地址出现的次数 ===> WordCount程序
     81     * 2. 数据过滤
     82     * */
     83     val blackIP = Array("200-55-104-193.dsl.prima.net.ar", "10.0.0.153", "208-38-57-205.ip.cal.radiant.net")
     84     // 由于集合比较大,将集合的内容广播出去
     85     val broadCastIP = sc.broadcast(blackIP)
     86     val N = 10
     87     val ipAddressRDD = apacheAccessLog
     88       // 过滤IP地址在黑名单中的数据
     89       .filter(log => !broadCastIP.value.contains(log.ipAddress))
     90       // 获取计算需要的IP地址数据,并将返回值转换为Key/Value键值对类型
     91       .map(log => (log.ipAddress, 1L))
     92       // 使用reduceByKey函数进行聚合操作
     93       .reduceByKey(_ + _)
     94       // 过滤数据,要求IP地址必须出现N次以上
     95       .filter(tuple => tuple._2 > N)
     96     // 获取满足条件IP地址, 为了展示方便,将下面这行代码注释
     97     //      .map(tuple => tuple._1)
     98 
     99     // 结果输出
    100     println(s"""IP Address :${ipAddressRDD.collect().mkString(",")}""")
    101 
    102     // 需求四:获取访问次数最多的前K个endpoint的值 ==> TopN
    103     /*
    104     * The top endpoints requested by count.
    105     * 1. 先计算出每个endpoint的出现次数
    106     * 2. 再进行topK的一个获取操作,获取出现次数最多的前K个值
    107     * */
    108     val K = 10
    109     val topKValues = apacheAccessLog
    110       // 获取计算需要的字段信息,并返回key/value键值对
    111       .map(log => (log.endpoint, 1))
    112       // 获取每个endpoint对应的出现次数
    113       .reduceByKey(_ + _)
    114       // 获取前10个元素, 而且使用我们自定义的排序类
    115       .top(K)(LogSortingUtil.TupleOrdering)
    116     // 如果只需要endpoint的值,不需要出现的次数,那么可以通过map函数进行转换
    117     //      .map(_._1)
    118 
    119     // 结果输出
    120     println(s"""TopK values:${topKValues.mkString(",")}""")
    121 
    122 
    123     // 对不在使用的rdd,去除cache
    124     apacheAccessLog.unpersist()
    125 
    126     // ================日志分析具体代码==================
    127 
    128     sc.stop()
    129   }
    130 }

    3.需要的辅助类一(返回匹配的日志)

     1 package com.ibeifeng.bigdata.spark.core.log
     2 
     3 import scala.util.matching.Regex
     4 
     5 /**
     6   * 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846
     7   * Created by ibf on 01/15.
     8   */
     9 case class ApacheAccessLog(
    10                             ipAddress: String, // IP地址
    11                             clientId: String, // 客户端唯一标识符
    12                             userId: String, // 用户唯一标识符
    13                             serverTime: String, // 服务器时间
    14                             method: String, // 请求类型/方式
    15                             endpoint: String, // 请求的资源
    16                             protocol: String, // 请求的协议名称
    17                             responseCode: Int, // 请求返回值:比如:200、401
    18                             contentSize: Long // 返回的结果数据大小
    19                           )
    20 
    21 /**
    22   * 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846
    23   * on 01/15.
    24   * 提供一些操作Apache Log的工具类供SparkCore使用
    25   */
    26 object ApacheAccessLog {
    27   // Apache日志的正则
    28   val PARTTERN: Regex =
    29   """^(S+) (S+) (S+) [([w:/]+s[+-]d{4})] "(S+) (S+) (S+)" (d{3}) (d+)""".r
    30 
    31   /**
    32     * 验证一下输入的数据是否符合给定的日志正则,如果符合返回true;否则返回false
    33     *
    34     * @param line
    35     * @return
    36     */
    37   def isValidateLogLine(line: String): Boolean = {
    38     val options = PARTTERN.findFirstMatchIn(line)
    39 
    40     if (options.isEmpty) {
    41       false
    42     } else {
    43       true
    44     }
    45   }
    46 
    47   /**
    48     * 解析输入的日志数据
    49     *
    50     * @param line
    51     * @return
    52     */
    53   def parseLogLine(line: String): ApacheAccessLog = {
    54     if (!isValidateLogLine(line)) {
    55       throw new IllegalArgumentException("参数格式异常")
    56     }
    57 
    58     // 从line中获取匹配的数据
    59     val options = PARTTERN.findFirstMatchIn(line)
    60 
    61     // 获取matcher
    62     val matcher = options.get
    63 
    64     // 构建返回值
    65     ApacheAccessLog(
    66       matcher.group(1), // 获取匹配字符串中第一个小括号中的值
    67       matcher.group(2),
    68       matcher.group(3),
    69       matcher.group(4),
    70       matcher.group(5),
    71       matcher.group(6),
    72       matcher.group(7),
    73       matcher.group(8).toInt,
    74       matcher.group(9).toLong
    75     )
    76   }
    77 }

    4.需要的辅助类二(自定义的一个二元组的比较器,方便进行TopN)

     1 package com.ibeifeng.bigdata.spark.core.log
     2 
     3 /**
     4   * Created by ibf on 01/15.
     5   */
     6 object LogSortingUtil {
     7 
     8   /**
     9     * 自定义的一个二元组的比较器
    10     */
    11   object TupleOrdering extends scala.math.Ordering[(String, Int)] {
    12     override def compare(x: (String, Int), y: (String, Int)): Int = {
    13       // 按照出现的次数进行比较,也就是按照二元组的第二个元素进行比较
    14       x._2.compare(y._2)
    15     }
    16   }
    17 
    18 }
  • 相关阅读:
    DEDECMS里面DEDE函数解析
    dede数据库类使用方法 $dsql
    DEDE数据库语句 DEDESQL命令批量替换 SQL执行语句
    织梦DedeCms网站更换域名后文章图片路径批量修改
    DSP using MATLAB 示例 Example3.12
    DSP using MATLAB 示例 Example3.11
    DSP using MATLAB 示例 Example3.10
    DSP using MATLAB 示例Example3.9
    DSP using MATLAB 示例Example3.8
    DSP using MATLAB 示例Example3.7
  • 原文地址:https://www.cnblogs.com/juncaoit/p/6428989.html
Copyright © 2011-2022 走看看