1.Spark SQL 基本操作
将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json。
{ "id":1 , "name":" Ella" , "age":36 }
{ "id":2, "name":"Bob","age":29 }
{ "id":3 , "name":"Jack","age":29 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":5 , "name":"Damon" }
{ "id":5 , "name":"Damon" }
为 employee.json 创建 DataFrame,并写出 Scala 语句完成下列操作:
(1) 查询所有数据;
scala> val df = spark.read.json("file:///usr/local/spark/sparklab/employee.json") df: org.apache.spark.sql.DataFrame = [age: bigint, id: bigint ... 1 more field] scala> df.show() +----+---+-----+ | age| id| name| +----+---+-----+ | 36| 1| Ella| | 29| 2| Bob| | 29| 3| Jack| | 28| 4| Jim| | 28| 4| Jim| |null| 5|Damon| |null| 5|Damon| +----+---+-----+
(2) 查询所有数据,并去除重复的数据;
scala> df.distinct().show() +----+---+-----+ | age| id| name| +----+---+-----+ | 36| 1| Ella| | 29| 3| Jack| |null| 5|Damon| | 29| 2| Bob| | 28| 4| Jim| +----+---+-----+
(3) 查询所有数据,打印时去除 id 字段;
scala> df.drop("id").show() +----+-----+ | age| name| +----+-----+ | 36| Ella| | 29| Bob| | 29| Jack| | 28| Jim| | 28| Jim| |null|Damon| |null|Damon| +----+-----+
(4) 筛选出 age>30 的记录;
scala> df.filter(df("age") > 30 ).show() +---+---+-----+ |age| id| name| +---+---+-----+ | 36| 1| Ella| +---+---+-----+
(5) 将数据按 age 分组;

scala> df.groupBy("name").count().show()
+-----+-----+
| name|count|
+-----+-----+
|Damon| 2|
| Ella| 1|
| Jim| 2|
| Jack| 1|
| Bob| 1|
+-----+-----+
(6) 将数据按 name 升序排列;
scala> df.sort(df("name").asc).show()
+----+---+-----+
| age| id| name|
+----+---+-----+
| 36| 1| Ella|
| 29| 2| Bob|
|null| 5|Damon|
|null| 5|Damon|
| 29| 3| Jack|
| 28| 4| Jim|
| 28| 4| Jim|
+----+---+-----+
+----+---+-----+
| age| id| name|
+----+---+-----+
| 36| 1| Ella|
| 29| 2| Bob|
|null| 5|Damon|
|null| 5|Damon|
| 29| 3| Jack|
| 28| 4| Jim|
| 28| 4| Jim|
+----+---+-----+

(7) 取出前 3 行数据;
df.take(3) 或 scala> df.head(3
8) 查询所有记录的 name 列,并为其取别名为 username;
df.select(df("name").as("username")).show()
(9) 查询年龄 age 的平均值;
df.agg("age"->"avg")
(10) 查询年龄 age 的最小值。
df.agg("age"->"min")