// Submit the code package:
//   /usr/local/spark/bin$ spark-submit --class "getkv" /data/chun/sparktes.jar
// NOTE(review): the command above names class "getkv" while this file defines `split` — confirm which is current.
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

/** Batch job that reads the `*.gz` log files of one day from HDFS, pulls a fixed
  * set of `"key":value` fields out of every line by plain text search, and appends
  * them to the tables `test.f1`, `test.f2` and `test.f3`.
  *
  * The lines are presumably JSON objects, but they are never parsed as JSON here;
  * every field is located by searching for its `key":` marker.
  */
object split {

  /** How many days before "now" the processed log directory lies. */
  private val LookbackDays = 9L

  /** Milliseconds in one day; kept as Long so larger look-backs cannot overflow Int. */
  private val MillisPerDay = 24L * 60 * 60 * 1000

  /** Returns the text between the LAST occurrence of `marker` in `line` and the next
    * `terminator` after it (or the end of the line when no terminator follows).
    *
    * An absent marker yields "" instead of an unrelated fragment of the line, and
    * lines consisting only of delimiters no longer throw.
    *
    * @param line       one raw log line
    * @param marker     literal text that directly precedes the value
    * @param terminator literal text that ends the value
    * @return the extracted value, or "" when `marker` does not occur
    */
  private def valueAfter(line: String, marker: String, terminator: String): String = {
    val start = line.lastIndexOf(marker)
    if (start < 0) ""
    else {
      val rest = line.substring(start + marker.length)
      val end = rest.indexOf(terminator)
      if (end < 0) rest else rest.substring(0, end)
    }
  }

  /** Extracts a quoted value: the text following `key":"` up to the next `"`.
    *
    * NOTE(review): the marker has no leading quote, so `uuid` also matches the tail of
    * `custom_uuid` (and similar suffix pairs); the last occurrence in the line wins.
    * Only `city` is passed with an explicit leading quote. Confirm against the log format.
    */
  private def quoted(line: String, key: String): String =
    valueAfter(line, key + "\":\"", "\"")

  /** Extracts an unquoted value: the text following `key":` up to the next `,`. */
  private def bare(line: String, key: String): String =
    valueAfter(line, key + "\":", ",")

  /** Entry point: loads the day's logs and appends the three field groups to Hive.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // NOTE(review): a master set in code takes precedence over spark-submit's --master.
    val cf = new SparkConf().setAppName("ass").setMaster("local")
    val sc = new SparkContext(cf)
    try {
      val hc = new HiveContext(sc)

      // Directory name is the date LookbackDays before the current time.
      val format = new java.text.SimpleDateFormat("yyyy-MM-dd")
      val date = format.format(new java.util.Date(System.currentTimeMillis() - LookbackDays * MillisPerDay))
      val lg = sc.textFile(s"hdfs://master:9000/data/$date/*/*.gz")

      // Group 1: device/build and playback-page fields.
      val filed1 = lg.map { l =>
        (
          quoted(l, "android_id"),
          bare(l, "anylst_ver"),
          quoted(l, "area"),
          quoted(l, "build_CPU_ABI"),
          quoted(l, "build_board"),
          quoted(l, "build_model"),
          quoted(l, "\"city"), // leading quote keeps this from matching keys that merely end in "city"
          quoted(l, "country"),
          bare(l, "cpuCount"),
          quoted(l, "cpuName"),
          quoted(l, "custom_uuid"),
          quoted(l, "cid"),
          quoted(l, "definition"),
          quoted(l, "firstTitle"),
          quoted(l, "modeType"),
          quoted(l, "pageName"),
          quoted(l, "playIndex"),
          bare(l, "rectime"),
          quoted(l, "time")
        )
      }
      val scoreDataFrame1 = hc.createDataFrame(filed1).toDF(
        "android_id", "anylst_ver", "area", "build_CPU_ABI", "build_board", "build_model",
        "city", "country", "cpuCount", "cpuName", "custom_uuid", "cid", "definition",
        "firstTitle", "modeType", "pageName", "playIndex", "rectime", "time")
      scoreDataFrame1.write.mode(SaveMode.Append).saveAsTable("test.f1")

      // Group 2: playback, network and memory fields.
      val filed2 = lg.map { l =>
        (
          quoted(l, "custom_uuid"),
          quoted(l, "playType"),
          quoted(l, "prevName"),
          bare(l, "prevue"),
          quoted(l, "siteName"),
          quoted(l, "title"),
          quoted(l, "uuid"),
          quoted(l, "vod_seek"),
          quoted(l, "device_id"),
          quoted(l, "device_name"),
          bare(l, "dpi"),
          quoted(l, "eth0_mac"),
          quoted(l, "ip"),
          quoted(l, "ipaddr"),
          quoted(l, "isp"),
          bare(l, "largeMem"),
          bare(l, "limitMem"),
          quoted(l, "packageName"),
          bare(l, "rectime"),
          quoted(l, "time")
        )
      }
      val scoreDataFrame2 = hc.createDataFrame(filed2).toDF(
        "custom_uuid", "playType", "prevName", "prevue", "siteName", "title", "uuid",
        "vod_seek", "device_id", "device_name", "dpi", "eth0_mac", "ip", "ipaddr", "isp",
        "largeMem", "limitMem", "packageName", "rectime", "time")
      scoreDataFrame2.write.mode(SaveMode.Append).saveAsTable("test.f2")

      // Group 3: screen, version and channel fields.
      val filed3 = lg.map { l =>
        (
          quoted(l, "custom_uuid"),
          quoted(l, "region"),
          bare(l, "screenHeight"),
          bare(l, "screenWidth"),
          quoted(l, "serial_number"),
          bare(l, "touchMode"),
          quoted(l, "umengChannel"),
          bare(l, "vercode"),
          quoted(l, "vername"),
          quoted(l, "wlan0_mac"),
          bare(l, "rectime"),
          quoted(l, "time")
        )
      }
      val scoreDataFrame3 = hc.createDataFrame(filed3).toDF(
        "custom_uuid", "region", "screenHeight", "screenWidth", "serial_number", "touchMode",
        "umengChannel", "vercode", "vername", "wlan0_mac", "rectime", "time")
      scoreDataFrame3.write.mode(SaveMode.Append).saveAsTable("test.f3")
    } finally {
      // Release the Spark application's resources even when a stage fails.
      sc.stop()
    }
  }
}