  • Using Scala for data cleaning in Spark (code)

    2019-05-07 18:56:18
    1: Spark program that cleans the HTTP log: per-phone connection time by device, traffic per second, and model counts per brand


    package com.amoscloud.log.analyze

    import java.text.SimpleDateFormat
    import java.util.Date

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object LogAnalyze1 {
      def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[2]").setAppName("LogAnalyze2")
        val sc = new SparkContext(conf)

        val data = sc.textFile("C:\\Users\\Administrator\\Desktop\\HTTP.txt")
        data.cache()
        // 1. (phone number, home location, device brand, device model, connection duration)
        // analyze1(data)
        // 2. (time bucket in seconds, traffic volume)
        analyze2(data)
        // 3. (brand, Array[(String, Int)]((model1, count1), (model2, count2)))
        // analyze(data)
      }

      private def analyze(data: RDD[String]) = {
        data.filter(_.split(",").length >= 72)
          .map(x => {
            val arr = x.split(",")
            val brand = arr(70)
            val model = arr(71)
            ((brand, model), 1)
          })
          .reduceByKey(_ + _)
          .map(t => {
            val k = t._1
            (k._1, (k._2, t._2))
          })
          .groupByKey()
          .collect()
          .foreach(println)
      }

      private def analyze2(data: RDD[String]) = {
        data.map(x => {
          val arr = x.split(",")
          val time = arr(16).take(arr(16).length - 4)
          val flow = arr(7).toLong
          (time, flow)
        })
          .reduceByKey(_ + _)
          // .map(x => (x._1, (x._2 / 1024.0).formatted("%.3f") + "KB"))
          .map(x => (x._1, x._2))
          .collect()
          .foreach(println)
      }

      private def analyze1(data: RDD[String]) = {
        data
          .filter(_.split(",").length >= 72)
          .map(x => {
            val arr = x.split(",")
            val phoneNum = arr(3).takeRight(11)
            val local = arr(61) + arr(62) + arr(63)
            val brand = arr(70)
            val model = arr(71)
            val connectTime = timeDiff(arr(16), arr(17))
            // 1. (phone number, home location, device brand, device model, connection duration)
            (phoneNum + "|" + local + "|" + brand + "|" + model, connectTime)
          })
          .reduceByKey(_ + _)
          .map(t => (t._1, formatTime(t._2)))
          .collect()
          .foreach(println)
      }

      def timeDiff(time1: String, time2: String): Long = {
        val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        val timeStamp2 = sdf.parse(time2.take(time2.length - 4)).getTime + time2.takeRight(3).toLong
        val timeStamp1 = sdf.parse(time1.take(time1.length - 4)).getTime + time1.takeRight(3).toLong
        timeStamp2 - timeStamp1
      }

      def formatTime(time: Long): String = {
        val timeS = time / 1000
        val s = timeS % 60
        val m = timeS / 60 % 60
        val h = timeS / 60 / 60 % 24
        h + ":" + m + ":" + s
      }
    }
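
    A quick standalone check of the two time helpers. The exact timestamp layout in HTTP.txt is not shown in the post; the sketch below assumes values like "2019-05-07 18:56:18.100" (a "yyyy-MM-dd HH:mm:ss" prefix followed by three millisecond digits), which is what the take(length - 4) / takeRight(3) logic implies. TimeHelperDemo and the sample timestamps are illustrative additions, not part of the original code.

    package com.amoscloud.log.analyze

    // Illustrative only: exercises the timeDiff and formatTime helpers from LogAnalyze1.
    object TimeHelperDemo {
      def main(args: Array[String]): Unit = {
        // Hypothetical connection start/end timestamps in the assumed layout.
        val start = "2019-05-07 18:56:18.100"
        val end = "2019-05-07 19:58:20.350"
        val diffMs = LogAnalyze1.timeDiff(start, end) // 3722250 ms
        println(LogAnalyze1.formatTime(diffMs))       // prints 1:2:2
      }
    }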

    2: Write a Spark program that counts, from the IIS website request logs, how many IPs successfully accessed the site in each hour of each day

    package com.amoscloud.log.analyze
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
    
    import scala.collection.mutable
    
    object LogAnalyze {
      def main(args: Array[String]): Unit = {
        // Count, for each hour of each day, how many IPs successfully accessed the IIS site
    
        // Create the SparkContext
        val conf = new SparkConf().setAppName("LogAnalyze").setMaster("local[2]")
        val sc = new SparkContext(conf)
    
        // Read the raw request log
        val log: RDD[String] = sc.textFile("C:\\Users\\Administrator\\Desktop\\iis网站请求日志")
    
    
        // Keep only the date, time, client IP and status code fields
        log
          .filter(_.split("\\s+").length > 10)
    
          .map(line => {
            val strings = line.split("\\s+")
            //RDD[(String,String,String,String)]
            (strings(0), strings(1).split(":")(0), strings(8), strings(10))
          })
          //RDD[(String,String,String,String)]
          .filter(_._4 == "200")
          // RDD[(date|hour, IP)]
          .map(t => (t._1 + "|" + t._2, t._3))
          // RDD[(date|hour, Iterable[IP])]
    
          .groupByKey()
          .map(t => (t._1, t._2.toList.size, t._2.toList.distinct.size))
    
          .collect()
          .foreach(t => {
            val spl = t._1.split("\\|")
            printf("%s\t%s\t%d\t%d\n", spl(0), spl(1), t._2, t._3)
          })
    
    
        // Partition by date|hour so that records with the same key land in the same partition
        //      .partitionBy(new HashPartitioner(48))
        //      .mapPartitions((iter: Iterator[(String, String)]) => {
        //        val set = mutable.HashSet[String]()
        //        var count = 0
        //        var next = ("", "")
        //        while (iter.hasNext) {
        //          next = iter.next()
        //          count += 1
        //          set.add(next._2)
        //        }
        //        ((next._1, count, set.size) :: Nil).iterator
        //      })
        //      .filter(_._1.nonEmpty)
      }
    }
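
    The field indices used in the map step above (0 = date, 1 = time, 8 = client IP, 10 = status code) presume a particular #Fields order in the IIS W3C log; the actual log file isn't reproduced in the post, so the record below is a hypothetical REPL-style check matching that assumption:

    // Hypothetical IIS W3C record, assuming a field order like:
    // date time s-ip cs-method cs-uri-stem cs-uri-query s-port cs-username c-ip cs(User-Agent) sc-status sc-substatus sc-win32-status time-taken
    val sample = "2019-05-07 10:15:32 192.168.1.10 GET /index.html - 80 - 10.0.0.8 Mozilla/5.0 200 0 0 15"
    val fields = sample.split("\\s+")
    // (date, hour, client IP, status) -> (2019-05-07,10,10.0.0.8,200)
    println((fields(0), fields(1).split(":")(0), fields(8), fields(10)))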

    Using Spark operators more flexibly means writing less code.
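
    As an example of that, the groupByKey + toList step (and the commented-out partitionBy / mapPartitions attempt) can be collapsed into a single aggregateByKey pass that never materializes the full per-key list of IPs. A minimal sketch, not the original author's code; `pairs` stands for the RDD of (date|hour, IP) tuples built by the map step above:

    package com.amoscloud.log.analyze

    import org.apache.spark.rdd.RDD

    object LogAnalyzeAgg {
      // `pairs` is assumed to be the (date|hour, IP) RDD produced above.
      // Returns (date|hour, (total requests, distinct IP count)).
      def hourlyStats(pairs: RDD[(String, String)]): RDD[(String, (Long, Int))] =
        pairs
          .aggregateByKey((0L, Set.empty[String]))(
            (acc, ip) => (acc._1 + 1, acc._2 + ip), // fold one record into the partition-local accumulator
            (a, b) => (a._1 + b._1, a._2 ++ b._2))  // merge accumulators across partitions
          .mapValues { case (total, ips) => (total, ips.size) }
    }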

                                                                    2019-05-07 19:06:57

  • Original post: https://www.cnblogs.com/Vowzhou/p/10827349.html