Data streaming转为DataFrame,不能直接一步转到DF,需要先转为RDD,然后再转到DF,我们用流式处理数据后,再通过spark sql实时获取我们想要的结果。
1.首先老规矩,创建spark上下文对象,spark SQL和spark Streaming,再创建个socket在Linux端打入数据。
1 val conf = new SparkConf().setAppName("Demo04DSToRDDToDF").setMaster("local[2]") 2 3 conf.set("spark.sql.shuffle.partitions", "1") 4 val sc = new SparkContext(conf) 5 val sqlContext = new SQLContext(sc) 6 7 val ssc = new StreamingContext(sc,Durations.seconds(5)) 8 9 val lines = ssc.socketTextStream("master",8888)
2.首先用foreachRDD方法把spark streaming转为RDD
1 /** 2 * 3 * DS -> RDD -> DF 4 * 5 */ 6 lines.foreachRDD(rdd => { 7 val stuRDD = rdd.map(line => { 8 val split = line.split(",") 9 (split(0),split(1),split(2).toInt,split(3),split(4)) 10 })
3.导入sqlContext隐式转换,将RDD To成DF,同时传入column对应RDD返回值
1 //导入隐式转换,将RDD转换为DF 2 import sqlContext.implicits._ 3 4 val stuDF = stuRDD.toDF("id","name","age","gender","clazz") 5 //注册成表 6 stuDF.registerTempTable("student") 7 8 val result = sqlContext.sql("select clazz,count(1) from student group by clazz") 9 10 result.show() 11 result.write.mode(SaveMode.Append).json("Spark/data/dsondf")
总结,这里的sqlContext必须自己创建好,原来我还以为是导包的时候,类都已经封装好了的,直接import就行了,报的数组下标越界,懵了半天。