Input-to-output conversion utility class
package com.rz.mobile_tag.log

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

import scala.util.control.NonFatal

/**
 * Access-log conversion (input ==> output) utility.
 *
 * Expected raw line format (space separated): "&lt;time&gt; &lt;url&gt; &lt;traffic&gt; &lt;ip&gt;".
 */
object AccessConvertUtil {

  // Schema of the cleaned output; must stay in sync (order and types)
  // with the Row built in parseLog below.
  val structType = StructType(
    Array(
      StructField("url", StringType),
      StructField("cmsType", StringType),
      StructField("cmsId", LongType),
      StructField("traffic", LongType),
      StructField("ip", StringType),
      StructField("city", StringType),
      StructField("time", StringType),
      StructField("day", StringType)
    )
  )

  /**
   * Convert one raw log line into a Row matching [[structType]].
   *
   * @param log one raw log record
   * @return a Row(url, cmsType, cmsId, traffic, ip, city, time, day);
   *         on any parse failure, a schema-compatible Row of 8 nulls.
   *         (The previous code returned Row(0), whose arity/types violate
   *         structType and fail at DataFrame evaluation time.)
   */
  def parseLog(log: String): Row = {
    try {
      val splits = log.split(" ", -1)
      val url = splits(1)
      val traffic = splits(2).toLong
      val ip = splits(3)

      val domain = "http://www.rz.com/"
      val domainIdx = url.indexOf(domain)
      // Guard against URLs that do not contain the expected domain:
      // indexOf == -1 would otherwise yield a silently wrong substring.
      val cms =
        if (domainIdx >= 0) url.substring(domainIdx + domain.length)
        else ""
      val cmsTypeId = cms.split("/")

      // Path shape "<type>/<id>/..." -> (type, id); anything else -> ("", 0L)
      val (cmsType, cmsId) =
        if (cmsTypeId.length > 1) (cmsTypeId(0), cmsTypeId(1).toLong)
        else ("", 0L)

      val city = "" // placeholder: IP-to-city resolution not implemented yet
      val time = splits(0)
      val day = time.substring(0, 10).replaceAll("-", "")

      // Field order here must match structType above.
      Row(url, cmsType, cmsId, traffic, ip, city, time, day)
    } catch {
      case NonFatal(_) =>
        // Malformed record: emit a full-width row of nulls so the schema
        // is still satisfied; fatal errors (OOM etc.) are not swallowed.
        Row(null, null, null, null, null, null, null, null)
    }
  }
}
Reads the raw data, cleans it, and writes the target output
package com.rz.mobile_tag.log

import org.apache.spark.sql.{SaveMode, SparkSession}

/**
 * Data-cleaning job: reads raw access logs, converts each line via
 * [[AccessConvertUtil.parseLog]], and writes the result as
 * day-partitioned parquet.
 *
 * Usage: SparkStatCleanJob &lt;inputPath&gt; &lt;outputPath&gt;
 */
object SparkStatCleanJob {

  def main(args: Array[String]): Unit = {
    // Fail fast with a usage message instead of an
    // ArrayIndexOutOfBoundsException on missing arguments.
    if (args.length < 2) {
      System.err.println("Usage: SparkStatCleanJob <inputPath> <outputPath>")
      System.exit(1)
    }

    val spark = SparkSession.builder()
      .appName(s"${this.getClass.getSimpleName}")
      .master("local[2]")
      .getOrCreate()

    try {
      val accessRDD = spark.sparkContext.textFile(args(0))
      // debug: accessRDD.take(10).foreach(println)

      val accessDF = spark.createDataFrame(
        accessRDD.map(log => AccessConvertUtil.parseLog(log)),
        AccessConvertUtil.structType
      )
      // accessDF.printSchema()
      // accessDF.show(false)

      // coalesce(1): one output file per day partition
      // (acceptable for small datasets; revisit for large inputs).
      accessDF.coalesce(1)
        .write
        .format("parquet")
        .mode(SaveMode.Overwrite)
        .partitionBy("day")
        .save(args(1))
    } finally {
      // Release the session even when the job throws; the original code
      // leaked the SparkSession on any failure before spark.stop().
      spark.stop()
    }
  }
}