{"name":"Michael", "age":25,"myScore":[{"score1":19,"score2":23},{"score1":58,"score2":50}]}
{"name":"Andy", "age":30,"myScore":[{"score1":29,"score2":33},{"score1":38,"score2":52},{"score1":88,"score2":71}]}
{"name":"Justin", "age":19,"myScore":[{"score1":39,"score2":43},{"score1":28,"score2":53}]}
{"name":"Michael", "age":25,"myScore":[{"score1":19,"score2":23},{"score1":58,"score2":50}]}
import org.apache.spark.sql.SparkSession

object explodeTest {
  def main(args: Array[String]): Unit = {
    val sparks = SparkSession.builder.master("local[4]").appName("test1").getOrCreate()
    val sc = sparks.sparkContext

    // Read the JSON data with Spark (forward slashes avoid the invalid
    // escape sequences that "C:\Users\..." would produce in a Scala string)
    val df = sparks.read.json("file:///C:/Users/imp/Desktop/bo-kong/data/josn")
    df.show()
    /**
     * +---+--------------------+-------+
     * |age|             myScore|   name|
     * +---+--------------------+-------+
     * | 25| [[19,23], [58,50]]|Michael|
     * | 30|[[29,33], [38,52]...|   Andy|
     * | 19| [[39,43], [28,53]]| Justin|
     * | 25| [[19,23], [58,50]]|Michael|
     * | 30|[[29,33], [38,52]...|   Andy|
     * | 19| [[39,43], [28,53]]| Justin|
     * | 25| [[19,23], [58,50]]|Michael|
     * | 30|[[29,33], [38,52]...|   Andy|
     * | 19| [[39,43], [28,53]]| Justin|
     * +---+--------------------+-------+
     */

    // Use the explode function from spark.sql.functions to flatten the
    // array column: one output row per array element
    import org.apache.spark.sql.functions._
    val dfScore = df.select(df("name"), explode(df("myScore"))).toDF("name", "myScore")
    val dfMyScore = dfScore.select("name", "myScore.score1", "myScore.score2")
    dfScore.show()
    /**
     * +-------+-------+
     * |   name|myScore|
     * +-------+-------+
     * |Michael|[19,23]|
     * |Michael|[58,50]|
     * |   Andy|[29,33]|
     * |   Andy|[38,52]|
     * |   Andy|[88,71]|
     * | Justin|[39,43]|
     * | Justin|[28,53]|
     * |Michael|[19,23]|
     * |Michael|[58,50]|
     * |   Andy|[29,33]|
     * |   Andy|[38,52]|
     * |   Andy|[88,71]|
     * | Justin|[39,43]|
     * | Justin|[28,53]|
     * |Michael|[19,23]|
     * |Michael|[58,50]|
     * |   Andy|[29,33]|
     * |   Andy|[38,52]|
     * |   Andy|[88,71]|
     * | Justin|[39,43]|
     * +-------+-------+
     * only showing top 20 rows
     */
    dfMyScore.show() // same rows, with score1 and score2 as flat columns
    df.createOrReplaceTempView("df")
  }
}
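Because the snippet registers the DataFrame as a temp view named df, the same flattening can also be written in Spark SQL with LATERAL VIEW explode. This is a minimal equivalent sketch, not from the original post; the aliases t and s are arbitrary names:

    sparks.sql(
      """SELECT name, s.score1, s.score2
        |FROM df
        |LATERAL VIEW explode(myScore) t AS s
      """.stripMargin).show()

This returns the same name/score1/score2 rows as dfMyScore above.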
Data
aa
bb
cc
dd
ee
ff
Adding an index (primary key) column to a DataFrame
// (Log is declared in the original but never used below)
case class Log(map: scala.collection.mutable.Map[String, String], ID: Long)

import sparks.implicits._

// First build an RDD[Map] and apply zipWithIndex to see the effect:
// the second element of each tuple is the id (primary key)
val data2 = sc.parallelize(Seq(
    Map("uuid" -> "sxexx", "ip" -> "192.168"),
    Map("uuid" -> "man", "ip" -> "192.168.10.1")))
  .zipWithIndex()
data2.collect().foreach(println)
/**
 * (Map(uuid -> sxexx, ip -> 192.168),0)
 * (Map(uuid -> man, ip -> 192.168.10.1),1)
 */

// zipWithIndex pairs each line with its index, so the line comes first
// and the index second: name the columns ("value", "id") accordingly
val data = sc.textFile("file:///C:/Users/imp/Desktop/bo-kong/data/data")
  .zipWithIndex().toDF("value", "id")
data.show()
/**
 * Result for the data above:
 * +-----+---+
 * |value| id|
 * +-----+---+
 * |   aa|  0|
 * |   bb|  1|
 * |   cc|  2|
 * |   dd|  3|
 * |   ee|  4|
 * |   ff|  5|
 * +-----+---+
 */
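For completeness, a hedged alternative sketch (not in the original post): Spark's built-in monotonically_increasing_id() adds an id column directly on the DataFrame, without the RDD round-trip. Unlike zipWithIndex, the ids are guaranteed unique and increasing but not consecutive, because the upper bits encode the partition id. The column name mono_id is just for illustration:

    import org.apache.spark.sql.functions.monotonically_increasing_id

    // Unique and monotonically increasing, but not 0,1,2,... consecutive
    // across partitions
    val withId = data.withColumn("mono_id", monotonically_increasing_id())
    withId.show()

If strictly consecutive ids are required, the zipWithIndex approach shown above remains the safer choice.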