Log generation
package zx.Utils

import java.io.{File, FileWriter}
import java.util.Calendar

import org.apache.commons.lang.time.{DateUtils, FastDateFormat}

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

/**
  * Created by 166 on 2017/9/6.
  */
case class FlowLog(time: String, ip: String, upFlow: Long, downFlow: Long) extends Serializable {
  override def toString: String = s"$time $ip $upFlow $downFlow"
}

object CreateLog {
  val ip_buffer: StringBuilder = new StringBuilder
  private val fs: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
  var startTime: String = "2015-1-12 12:12:12"
  val instance: Calendar = Calendar.getInstance
  val ipPool: ArrayBuffer[String] = getIp // a pool of 20 random IP addresses

  // build the pool of 20 random IP addresses
  private[this] def getIp: ArrayBuffer[String] = {
    val arrayBuffer: ArrayBuffer[String] = ArrayBuffer()
    ip_buffer.clear()
    for (i <- 0 until 20) { // "0 to 20" would produce 21 entries
      ip_buffer.append(Random.nextInt(255)).append(".")
        .append(Random.nextInt(255)).append(".")
        .append(Random.nextInt(255)).append(".")
        .append(Random.nextInt(255))
      arrayBuffer += ip_buffer.toString()
      ip_buffer.clear()
    }
    arrayBuffer
  }

  // advance the simulated clock by a random number of minutes (0-199)
  // and return the new timestamp
  def getTime: String = {
    instance.setTime(DateUtils.parseDate(startTime, Array("yyyy-MM-dd HH:mm:ss")))
    instance.add(Calendar.MINUTE, Random.nextInt(200))
    val newTime: String = fs.format(instance.getTime)
    startTime = newTime
    newTime
  }

  // random traffic volume in the range 0-799
  def getFlow: Long = Random.nextInt(800)

  // pick one IP address from the pool at random
  def getIP: String = ipPool(Random.nextInt(ipPool.size))

  // append one log line to the file
  def write2file(fr: FileWriter, context: String): String = {
    fr.write(context)
    fr.write(System.lineSeparator())
    fr.flush()
    "SUCCESS"
  }

  def main(args: Array[String]) {
    // backslashes must be escaped in Scala string literals
    val file: File = new File("C:\\Users\\166\\Desktop\\Data\\Log", "click_flow.log")
    // start from a fresh file; both branches of the original if/else were identical
    if (file.exists()) file.delete()
    val fw: FileWriter = new FileWriter(file)
    for (i <- 0 to 10000) println(write2file(fw, FlowLog(getTime, getIP, getFlow, getFlow).toString))
    fw.close()
  }
}
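One detail worth calling out before the analysis step: FlowLog.toString emits `time ip upFlow downFlow`, but the timestamp itself contains a space, so splitting a line on whitespace produces five tokens, not four. A minimal check (the sample line below is illustrative; real output values are random):

// Illustrative sample of a line as emitted by FlowLog.toString
val line = "2015-01-12 13:27:44 203.118.6.91 512 137"
val fields = line.split(" ")
// fields(0) = "2015-01-12"    date part of the timestamp
// fields(1) = "13:27:44"      time part of the timestamp
// fields(2) = "203.118.6.91"  ip
// fields(3) = "512"           upFlow
// fields(4) = "137"           downFlow
assert(fields.length == 5)

This is why the offline job below must read the IP from index 2 and the flow values from indexes 3 and 4.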
Computing each user's total upstream and downstream traffic
package zx.sparkStream

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Requirement: compute the total upstream and downstream traffic for each user.
  * Created by rz on 2017/9/6.
  */
case class ClickFlow(remoteUser: String, tupleFlow: (Long, Long))

object SparkOffLine {
  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val sc: SparkContext = new SparkContext(new SparkConf().setAppName("SparkOffLine").setMaster("local[*]"))
    val rdd: RDD[String] = sc.textFile("C:\\Users\\166\\Desktop\\Data\\Log\\click_flow.log")
    // The timestamp contains a space, so each line splits into five tokens:
    // date, time, ip, upFlow, downFlow. The IP (our "user") is at index 2,
    // the flow values at indexes 3 and 4.
    val rdd1: RDD[(String, ClickFlow)] = rdd.map(data => {
      val datas: Array[String] = data.split(" ")
      (datas(2), ClickFlow(datas(2), (datas(3).toLong, datas(4).toLong)))
    })
    // sum the upstream and downstream flow per IP
    val rdd2: RDD[(String, ClickFlow)] = rdd1.reduceByKey((x, y) => {
      val x_upFlow: Long = x.tupleFlow._1
      val y_upFlow: Long = y.tupleFlow._1
      val x_downFlow: Long = x.tupleFlow._2
      val y_downFlow: Long = y.tupleFlow._2
      ClickFlow(x.remoteUser, (x_upFlow + y_upFlow, x_downFlow + y_downFlow))
    })
    println(rdd2.collect().toBuffer)
    sc.stop()
  }
}
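Since the per-key values are just numeric pairs, the ClickFlow wrapper is not strictly needed: keying each line on the IP and reducing plain (up, down) tuples gives the same result. A minimal alternative sketch under the same five-token line layout (the object name SparkOffLineCompact is hypothetical; the path is the one used above):

package zx.sparkStream

import org.apache.spark.{SparkConf, SparkContext}

object SparkOffLineCompact {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("SparkOffLineCompact").setMaster("local[*]"))
    val totals = sc.textFile("C:\\Users\\166\\Desktop\\Data\\Log\\click_flow.log")
      .map(_.split(" "))
      .map(f => (f(2), (f(3).toLong, f(4).toLong)))      // (ip, (upFlow, downFlow))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)) // sum both flows per ip
    totals.collect().foreach { case (ip, (up, down)) => println(s"$ip up=$up down=$down") }
    sc.stop()
  }
}

Reducing over plain tuples avoids allocating a ClickFlow per merge and keeps the shuffle payload to the two Long values that actually matter.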