Test data:
1 2 3 4 5 6
3 4 5 6 7 10
10 1 2 3 4 5
9 8 7 6 5 4
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, split}

/**
 * Implements WordCount with the DataFrame API.
 */
object DataFrameWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local")
      .getOrCreate()
    import spark.implicits._

    // Read the file as an RDD[String] and convert it to a single-column DataFrame.
    // Backslashes in a Scala string literal must be escaped.
    val linesDF = spark.sparkContext.textFile("D:\\workspace\\test_data.txt").toDF("line")
    linesDF.show(false)
    linesDF.printSchema()

    // Split each line on spaces and explode the array into one row per word,
    // keeping the original "line" column alongside the new "word" column.
    // (functions.explode replaces the old DataFrame.explode method, which was
    // deprecated in Spark 2.0 and removed in 3.0.)
    val wordsDF = linesDF.select(col("line"), explode(split(col("line"), " ")).as("word"))
    wordsDF.printSchema()
    wordsDF.show(200, false)

    // Group by the "word" column and count the rows in each group.
    val wordCountDF = wordsDF.groupBy("word").count()
    wordCountDF.show(false)
    wordCountDF.printSchema()
    println(wordCountDF.count() + "----------")

    spark.stop()
  }
}
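
For comparison, the same job can also be written against the typed Dataset API instead of untyped column expressions. Below is a minimal sketch under the same assumptions (local master, the same Windows file path); DatasetWordCount is an illustrative name, not part of the original example. The printed output that follows it comes from the DataFrame version above.

import org.apache.spark.sql.SparkSession

object DatasetWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local")
      .getOrCreate()
    import spark.implicits._

    // read.textFile returns a Dataset[String] directly, so no RDD conversion is needed
    val lines = spark.read.textFile("D:\\workspace\\test_data.txt")

    // flatMap yields one element per word; the single column is implicitly named "value"
    val words = lines.flatMap(_.split(" "))

    // Group on the implicit "value" column and count each group
    words.groupBy("value").count().show(false)

    spark.stop()
  }
}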
Output:
+------------+
|line |
+------------+
|1 2 3 4 5 6 |
|3 4 5 6 7 10|
|10 1 2 3 4 5|
|9 8 7 6 5 4 |
+------------+
root
|-- line: string (nullable = true)
root
|-- line: string (nullable = true)
|-- word: string (nullable = true)
+------------+----+
|line |word|
+------------+----+
|1 2 3 4 5 6 |1 |
|1 2 3 4 5 6 |2 |
|1 2 3 4 5 6 |3 |
|1 2 3 4 5 6 |4 |
|1 2 3 4 5 6 |5 |
|1 2 3 4 5 6 |6 |
|3 4 5 6 7 10|3 |
|3 4 5 6 7 10|4 |
|3 4 5 6 7 10|5 |
|3 4 5 6 7 10|6 |
|3 4 5 6 7 10|7 |
|3 4 5 6 7 10|10 |
|10 1 2 3 4 5|10 |
|10 1 2 3 4 5|1 |
|10 1 2 3 4 5|2 |
|10 1 2 3 4 5|3 |
|10 1 2 3 4 5|4 |
|10 1 2 3 4 5|5 |
|9 8 7 6 5 4 |9 |
|9 8 7 6 5 4 |8 |
|9 8 7 6 5 4 |7 |
|9 8 7 6 5 4 |6 |
|9 8 7 6 5 4 |5 |
|9 8 7 6 5 4 |4 |
+------------+----+
+----+-----+
|word|count|
+----+-----+
|7 |2 |
|3 |3 |
|8 |1 |
|5 |4 |
|6 |3 |
|9 |1 |
|1 |2 |
|10 |2 |
|4 |4 |
|2 |2 |
+----+-----+
root
|-- word: string (nullable = true)
|-- count: long (nullable = false)
10----------
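
The final count of 10 matches the ten distinct words (1 through 10) in the test data. Since wordsDF is an ordinary DataFrame, the same aggregation can also be expressed in SQL by registering it as a temporary view. A minimal sketch, assuming wordsDF and spark from the example above are in scope:

// Register the exploded words as a temporary view and aggregate in SQL
wordsDF.createOrReplaceTempView("words")
val sqlCountDF = spark.sql("SELECT word, COUNT(*) AS count FROM words GROUP BY word")
sqlCountDF.show(false)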