zoukankan      html  css  js  c++  java
  • Spark：流量日志分析

    日志生成

    package zx.Utils
    
    import java.io.{File, FileWriter}
    import java.util.Calendar
    import org.apache.commons.lang.time.{DateUtils, FastDateFormat}
    
    import scala.collection.mutable.ArrayBuffer
    import scala.util.Random
    
    /**
     * Created by 166 on 2017/9/6.
     */
    case class FlowLog(time: String, ip: String, upFlow: Long, downFlow: Long) extends Serializable {
      /** Renders the record as one tab-separated log line: time, ip, upFlow, downFlow. */
      override def toString: String =
        List(time, ip, upFlow.toString, downFlow.toString).mkString("\t")
    }
    /** Generates a synthetic tab-separated traffic log (one FlowLog record per line). */
    object CreateLog {
      /** Retained for source compatibility; IP generation below no longer uses this shared buffer. */
      val ip_buffer: StringBuilder = new StringBuilder
      private val fs: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
      /** Timestamp of the most recently generated record; each `getTime` call moves it forward. */
      var startTime: String = "2015-1-12 12:12:12"
      val instance: Calendar = Calendar.getInstance

      // Must be declared before `ipPool`: object fields initialize top to bottom, and
      // `getIp` reads IpPoolSize while `ipPool` itself is being initialized.
      private val IpPoolSize: Int = 20
      /** Number of records `main` writes. */
      private val LogRecordCount: Int = 10000

      /** Pool of IpPoolSize random IPv4 addresses that generated records draw from. */
      val ipPool: ArrayBuffer[String] = getIp

      /** Builds exactly IpPoolSize random dotted-quad addresses, each octet in 0..255. */
      private[this] def getIp: ArrayBuffer[String] =
        ArrayBuffer.fill(IpPoolSize)(Seq.fill(4)(Random.nextInt(256)).mkString("."))

      /**
       * Advances `startTime` by a random 0-199 minutes and returns the new value.
       * The result is formatted as "yyyy-MM-dd HH:mm:ss".
       */
      def getTime: String = {
        // Re-parse startTime each call so an externally assigned value is honored.
        instance.setTime(DateUtils.parseDate(startTime, Array("yyyy-MM-dd HH:mm:ss")))
        instance.add(Calendar.MINUTE, Random.nextInt(200))
        val newTime: String = fs.format(instance.getTime)
        startTime = newTime
        newTime
      }

      /** Random flow amount in [0, 800). */
      def getFlow: Long = Random.nextInt(800)

      /** Picks a random address from `ipPool`. */
      def getIP: String = ipPool(Random.nextInt(ipPool.size))

      /** Writes `context` plus a platform line separator, flushes, and returns "SUCCESS". */
      def write2file(fr: FileWriter, context: String): String = {
        fr.write(context)
        fr.write(System.lineSeparator())
        fr.flush()
        "SUCCESS"
      }

      /** Writes LogRecordCount generated records to click_flow.log, replacing any existing file. */
      def main(args: Array[String]): Unit = {
        // Backslashes must be escaped: "\U", "\D" and "\L" are invalid escapes in a Scala literal.
        val file: File = new File("C:\\Users\\166\\Desktop\\Data\\Log", "click_flow.log")
        // Without the directory, FileWriter fails with FileNotFoundException.
        Option(file.getParentFile).foreach(_.mkdirs())
        // FileWriter without the append flag truncates an existing file, so no delete is needed.
        val fw: FileWriter = new FileWriter(file)
        try {
          for (_ <- 1 to LogRecordCount)
            println(write2file(fw, FlowLog(getTime, getIP, getFlow, getFlow).toString))
        } finally {
          fw.close()
        }
      }
    }

    算出每个用户（IP）的上行流量总和与下行流量总和

    package zx.sparkStream
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    /**需求:算出每个用户的上行流量总和 和下行流量的总和
     * Created by rz on 2017/9/6.
     */
    // NOTE(review): empty case class with no fields; it is not referenced anywhere in the visible code
    // and looks like a leftover placeholder — confirm before relying on or removing it.
    case class ResultTuple()
    /**
     * Aggregated traffic for one user.
     *
     * @param remoteUser user key (SparkOffLine fills it with the log's IP column)
     * @param tupleFlow  pair of (upFlow, downFlow) totals
     */
    case class ClickFlow(remoteUser: String, tupleFlow: Tuple2[Long, Long])
    object SparkOffLine {
      /**
       * Requirement: compute, per user, the total upstream and downstream traffic.
       *
       * Reads the tab-separated click_flow.log and keys each line by field 1 (the IP).
       * Fields 2 and 3 are summed per key, and the collected results are printed.
       */
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc: SparkContext = new SparkContext(new SparkConf().setAppName("SparkOffLine").setMaster("local[*]"))
        try {
          // Backslashes must be escaped: "\U", "\D", "\L" are invalid escapes in a Scala literal.
          val rdd: RDD[String] = sc.textFile("C:\\Users\\166\\Desktop\\Data\\Log\\click_flow.log")
          val rdd1: RDD[(String, ClickFlow)] = rdd
            .filter(_.trim.nonEmpty) // a blank line would otherwise fail on fields(1)
            .map { line =>
              val fields: Array[String] = line.split("\t")
              (fields(1), ClickFlow(fields(1), (fields(2).toLong, fields(3).toLong)))
            }
          val rdd2: RDD[(String, ClickFlow)] = rdd1.reduceByKey { (x, y) =>
            val upFlow: Long = x.tupleFlow._1 + y.tupleFlow._1
            val downFlow: Long = x.tupleFlow._2 + y.tupleFlow._2
            ClickFlow(x.remoteUser, (upFlow, downFlow))
          }
          println(rdd2.collect().toBuffer)
        } finally {
          // Release the local Spark context even if reading or parsing fails.
          sc.stop()
        }
      }
    }
  • 相关阅读:
    JVM-Java程序性能监控-初级篇
    一段获取app性能指标的py脚本
    一段从TXT导入excel的py脚本
    matplotlib根据Y轴数量伸缩画图的py脚本
    jsonpath读取json数据格式公用方法!!!
    python安装插件包注意事项
    Jenkins持续集成
    Jenkins简介&邮箱配置
    unittest框架扩展(基于代码驱动)自动化-下
    unittest框架扩展(自动生成用例)自动化-上
  • 原文地址:https://www.cnblogs.com/RzCong/p/7823010.html
Copyright © 2011-2022 走看看