DateFrame 早期叫作SchemaRDD是存放Row 对象的RDD,每个Row 对象代表一行记录。
SchemaRDD 还包含记录的结构信息(即数据字段)。SchemaRDD 看起来和普通的RDD 很像,
但是在内部,SchemaRDD 可以利用结构信息更加高效地存储数据。
此外,SchemaRDD 还支持RDD 上所没有的一些新操作,比如运行SQL 查询。SchemaRDD 可以
从外部数据源创建,也可以从查询结果或普通RDD 中创建。
spark dataframe 的几个关键点:
- 分布式的数据集
- 类似关系型数据库中的table
- 拥有丰富的操作函数,类似于 rdd 中的算子
- 一个 dataframe 可以被注册成一张数据表,然后用 sql 语言在上面操作
- 丰富的创建方式
- 已有的RDD
- 结构化数据文件
- JSON数据集
- Hive表
- 外部数据库
DataFrame API 是在 R 和 Python data frame 的设计灵感之上设计的,具有以下功能特性:
- 从KB到PB级的数据量支持;
- 多种数据格式和多种存储系统支持;
- 通过Spark SQL 的 Catalyst优化器进行先进的优化,生成代码;
- 通过Spark无缝集成所有大数据工具与基础设施;
- 为Python、Java、Scala和R语言(SparkR)API;
简单来说,dataframe 能够更方便的操作数据集,而且因为其底层是通过 spark sql 的 Catalyst优化器生成优化后的执行代码,所以其执行速度会更快。总结下来就是,使用 spark dataframe 来构建 spark app,能:
- write less : 写更少的代码
- do more : 做更多的事情
- faster : 以更快的速度
sparkshell 读取mysql
scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext
scala> val sqlcon=new SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlcon: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@4e079825
scala> val mysqlDF=sqlcon.read.format("jdbc").options(Map("url"->"jdbc:mysql://localhost:3306/test","user"->"root","password"->"root","dbtable"->"role")).load()
mysqlDF: org.apache.spark.sql.DataFrame = [roleid: int, name: string ... 4 more fields]
show
:展示数据
scala> mysqlDF.show
+------+----+-------------------+----+-----+----+
|roleid|name| dateid|addr|phone| sex|
+------+----+-------------------+----+-----+----+
| 1|null|2017-11-16 14:49:11|null| null|null|
| 40|null|2017-11-13 14:50:25|null| null|null|
| 110|null|2017-11-14 14:50:47|null| null|null|
| 200|null|2017-11-14 14:49:47|null| null|null|
| 400|null|2017-11-15 14:49:56|null| null|null|
| 600|null|2017-11-15 14:50:05|null| null|null|
+------+----+-------------------+----+-----+----+
scala> mysqlDF.show(2)
+------+----+-------------------+----+-----+----+
|roleid|name| dateid|addr|phone| sex|
+------+----+-------------------+----+-----+----+
| 1|null|2017-11-16 14:49:11|null| null|null|
| 40|null|2017-11-13 14:50:25|null| null|null|
+------+----+-------------------+----+-----+----+
only showing top 2 rows
collect
:获取所有数据到数组
scala> mysqlDF.collect
res35: Array[org.apache.spark.sql.Row] = Array([1,null,2017-11-16 14:49:11.0,null,null,null], [40,null,2017-11-13 14:50:25.0,null,null,null],
[110,null,2017-11-14 14:50:47.0,null,null,null], [200,null,2017-11-14 14:49:47.0,null,null,null], [400,null,2017-11-15 14:49:56.0,null,null,null],
[600,null,2017-11-15 14:50:05.0,null,null,null])
collectAsList: 获取所有数据到List
scala> mysqlDF.collectAsList
res36: java.util.List[org.apache.spark.sql.Row] = [[1,null,2017-11-16 14:49:11.0,null,null,null], [40,null,2017-11-13 14:50:25.0,null,null,null], [110,null,2017-11-14 14:50:47.0,null,null,null], [200,null,2017-11-14 14:49:47.0,null,null,null], [400,null,2017-11-15 14:49:56.0,null,null,null], [600,null,2017-11-15 14:50:05.0,null,null,null]]
describe(cols: String*)
:获取指定字段的统计信息
这个方法可以动态的传入一个或多个String
类型的字段名,结果仍然为DataFrame
对象,用于统计数值类型字段的统计值,比如count, mean, stddev, min, max
等。
scala> mysqlDF.describe("level").show
+-------+------------------+
|summary| level|
+-------+------------------+
| count| 6|
| mean|22.333333333333332|
| stddev| 34.26173764809174|
| min| 1|
| max| 91|
+-------+------------------+
first, head, take, takeAsList
:获取若干行记录
scala> mysqlDF.head
res50: org.apache.spark.sql.Row = [1,null,2017-11-16 14:49:11.0,null,null,10]
scala> mysqlDF.head(3)
res51: Array[org.apache.spark.sql.Row] = Array([1,null,2017-11-16 14:49:11.0,null,null,10], [40,null,2017-11-13 14:50:25.0,null,null,1], [110,null,2017-11-14 14:50:47.0,null,null,20])
scala> mysqlDF.take(2)
res52: Array[org.apache.spark.sql.Row] = Array([1,null,2017-11-16 14:49:11.0,null,null,10], [40,null,2017-11-13 14:50:25.0,null,null,1])
where(conditionExpr: String)
:SQL语言中where关键字后的条件
scala> mysqlDF.where("roleid=1 or roleid=400").show
+------+----+-------------------+----+-----+-----+
|roleid|name| dateid|addr|phone|level|
+------+----+-------------------+----+-----+-----+
| 1|null|2017-11-16 14:49:11|null| null| 10|
| 400|null|2017-11-15 14:49:56|null| null| 4|
+------+----+-------------------+----+-----+-----+
scala> mysqlDF.where("roleid in (1,400)").show
+------+----+-------------------+----+-----+-----+
|roleid|name| dateid|addr|phone|level|
+------+----+-------------------+----+-----+-----+
| 1|null|2017-11-16 14:49:11|null| null| 10|
| 400|null|2017-11-15 14:49:56|null| null| 4|
+------+----+-------------------+----+-----+-----+
filter
:根据字段进行筛选,和where
使用条件相同
scala> mysqlDF.filter("roleid in(1,400)").show
+------+----+-------------------+----+-----+-----+
|roleid|name| dateid|addr|phone|level|
+------+----+-------------------+----+-----+-----+
| 1|null|2017-11-16 14:49:11|null| null| 10|
| 400|null|2017-11-15 14:49:56|null| null| 4|
+------+----+-------------------+----+-----+-----+
select
:获取指定字段值
scala> mysqlDF.select("roleid","dateid","level").where("roleid=1 or roleid=400").show
+------+-------------------+-----+
|roleid| dateid|level|
+------+-------------------+-----+
| 1|2017-11-16 14:49:11| 10|
| 400|2017-11-15 14:49:56| 4|
+------+-------------------+-----+
selectExpr
:可以对指定字段进行特殊处理
scala> mysqlDF.selectExpr("roleid","cast(dateid as int)").show
+------+----------+
|roleid| dateid|
+------+----------+
| 1|1510814951|
| 40|1510555825|
| 110|1510642247|
| 200|1510642187|
| 400|1510728596|
| 600|1510728605|
+------+----------+
scala> mysqlDF.apply("dateid")
res91: org.apache.spark.sql.Column = dateid
scala> mysqlDF.col("dateid")
res92: org.apache.spark.sql.Column = dateid
drop
:去除指定字段,保留其他字段
返回一个新的DataFrame对象,其中不包含去除的字段,一次只能去除一个字段。
scala> mysqlDF.drop("phone").drop("addr").drop("name").show
+------+-------------------+-----+
|roleid| dateid|level|
+------+-------------------+-----+
| 1|2017-11-16 14:49:11| 10|
| 40|2017-11-13 14:50:25| 1|
| 110|2017-11-14 14:50:47| 20|
| 200|2017-11-14 14:49:47| 8|
| 400|2017-11-15 14:49:56| 4|
| 600|2017-11-15 14:50:05| 91|
+------+-------------------+-----+
dropDuplicates(colNames: Array[String]) 删除相同的列 返回一个dataframe
scala> mysqlDF.dropDuplicates("name","level").show
+------+----+-------------------+----+-----+-----+
|roleid|name| dateid|addr|phone|level|
+------+----+-------------------+----+-----+-----+
| 600|null|2017-11-15 14:50:05|null| null| 91|
| 40|null|2017-11-13 14:50:25|null| null| 20|
| 400|null|2017-11-15 14:49:56|null| null| 4|
| 200|null|2017-11-14 14:49:47|null| null| 8|
| 1|null|2017-11-16 14:49:11|null| null| 10|
+------+----+-------------------+----+-----+-----+
scala> mysqlDF.dropDuplicates("roleid","level").show
+------+----+-------------------+----+-----+-----+
|roleid|name| dateid|addr|phone|level|
+------+----+-------------------+----+-----+-----+
| 400|null|2017-11-15 14:49:56|null| null| 4|
| 200|null|2017-11-14 14:49:47|null| null| 8|
| 40|null|2017-11-13 14:50:25|null| null| 20|
| 600|null|2017-11-15 14:50:05|null| null| 91|
| 110|null|2017-11-14 14:50:47|null| null| 20|
| 1|null|2017-11-16 14:49:11|null| null| 10|
+------+----+-------------------+----+-----+-----+
scala> mysqlDF.show
+------+----+-------------------+----+-----+-----+
|roleid|name| dateid|addr|phone|level|
+------+----+-------------------+----+-----+-----+
| 1|null|2017-11-16 14:49:11|null| null| 10|
| 40|null|2017-11-13 14:50:25|null| null| 20|
| 110|null|2017-11-14 14:50:47|null| null| 20|
| 200|null|2017-11-14 14:49:47|null| null| 8|
| 400|null|2017-11-15 14:49:56|null| null| 4|
| 600|null|2017-11-15 14:50:05|null| null| 91|
+------+----+-------------------+----+-----+-----+
limit
limit
方法获取指定DataFrame的前n行记录,得到一个新的DataFrame对象。和take
与head
不同的是,limit
方法不是Action操作。
scala> mysqlDF.limit(3).show
+------+----+-------------------+----+-----+-----+
|roleid|name| dateid|addr|phone|level|
+------+----+-------------------+----+-----+-----+
| 1|null|2017-11-16 14:49:11|null| null| 10|
| 40|null|2017-11-13 14:50:25|null| null| 20|
| 110|null|2017-11-14 14:50:47|null| null| 20|
+------+----+-------------------+----+-----+-----+
orderBy
和sort
:按指定字段排序,默认为升序
sort 同orderBy
scala> mysqlDF.orderBy(mysqlDF("level").desc).show
+------+----+-------------------+----+-----+-----+
|roleid|name| dateid|addr|phone|level|
+------+----+-------------------+----+-----+-----+
| 600|null|2017-11-15 14:50:05|null| null| 91|
| 40|null|2017-11-13 14:50:25|null| null| 20|
| 110|null|2017-11-14 14:50:47|null| null| 20|
| 1|null|2017-11-16 14:49:11|null| null| 10|
| 200|null|2017-11-14 14:49:47|null| null| 8|
| 400|null|2017-11-15 14:49:56|null| null| 4|
+------+----+-------------------+----+-----+-----+
scala> mysqlDF.orderBy(-mysqlDF("level")).show
+------+----+-------------------+----+-----+-----+
|roleid|name| dateid|addr|phone|level|
+------+----+-------------------+----+-----+-----+
| 600|null|2017-11-15 14:50:05|null| null| 91|
| 40|null|2017-11-13 14:50:25|null| null| 20|
| 110|null|2017-11-14 14:50:47|null| null| 20|
| 1|null|2017-11-16 14:49:11|null| null| 10|
| 200|null|2017-11-14 14:49:47|null| null| 8|
| 400|null|2017-11-15 14:49:56|null| null| 4|
+------+----+-------------------+----+-----+-----+
scala> mysqlDF.orderBy("level").show
+------+----+-------------------+----+-----+-----+
|roleid|name| dateid|addr|phone|level|
+------+----+-------------------+----+-----+-----+
| 400|null|2017-11-15 14:49:56|null| null| 4|
| 200|null|2017-11-14 14:49:47|null| null| 8|
| 1|null|2017-11-16 14:49:11|null| null| 10|
| 110|null|2017-11-14 14:50:47|null| null| 20|
| 40|null|2017-11-13 14:50:25|null| null| 20|
| 600|null|2017-11-15 14:50:05|null| null| 91|
+------+----+-------------------+----+-----+-----+
scala> mysqlDF.sort(mysqlDF("level").desc).show
+------+----+-------------------+----+-----+-----+
|roleid|name| dateid|addr|phone|level|
+------+----+-------------------+----+-----+-----+
| 600|null|2017-11-15 14:50:05|null| null| 91|
| 40|null|2017-11-13 14:50:25|null| null| 20|
| 110|null|2017-11-14 14:50:47|null| null| 20|
| 1|null|2017-11-16 14:49:11|null| null| 10|
| 200|null|2017-11-14 14:49:47|null| null| 8|
| 400|null|2017-11-15 14:49:56|null| null| 4|
+------+----+-------------------+----+-----+-----+
scala> loginDF.orderBy(desc("level")).show ---逆序
scala> loginDF.orderBy(asc("level")).show --顺序
group by 返回值是一个org.apache.spark.sql.GroupedData数据类型。
grouby必须结合聚合函数使用才有效。
scala> mysqlDF.groupBy("level").count().show
+-----+-----+
|level|count|
+-----+-----+
| 91| 1|
| 20| 2|
| 4| 1|
| 8| 1|
| 10| 1|
+-----+-----+
columns 返回所有列明
scala> mysqlDF.columns
res150: Array[String] = Array(roleid, name, dateid, addr, phone, level)
集成查询:agg 返回DateFrame类型
scala> mysqlDF.groupBy("sex").agg(max("level"),min("level")).show
+---+----------+----------+
|sex|max(level)|min(level)|
+---+----------+----------+
| 1| 20| 10|
| 0| 91| 4|
+---+----------+----------+
scala> roleDF.cube("sex").max("level").show
+----+----------+
| sex|max(level)|
+----+----------+
| 1| 29|
|null| 91|
| 0| 91|
+----+----------+
scala> mysqlDF.groupBy("sex").agg(max("level"),min("level")).show
+---+----------+----------+
|sex|max(level)|min(level)|
+---+----------+----------+
| 1| 20| 10|
| 0| 91| 4|
+---+----------+----------+
scala> mysqlDF.agg(Map("level"->"Avg","roleid"->"Max")).show
+----------+-----------+
|avg(level)|max(roleid)|
+----------+-----------+
| 25.5| 600|
+----------+-----------+
---cube与groupby相比多了一个总的分组聚合
scala> mysqlDF.cube("sex").agg(max("level"),min("level")).show
+----+----------+----------+
| sex|max(level)|min(level)|
+----+----------+----------+
| 1| 20| 10|
|null| 91| 4|
| 0| 91| 4|
+----+----------+----------+
scala> mysqlDF.cube("sex").agg(Map("Level"->"Min","level"->"Max")).show
+----+----------+----------+
| sex|min(Level)|max(level)|
+----+----------+----------+
| 1| 10| 20|
|null| 4| 91|
| 0| 4| 91|
+----+----------+----------+
withColumnRenamed:修改列明
scala> mysqlDF.cube("sex").agg(Map("Level"->"Min","level"->"Max")).withColumnRenamed("min(level)","minlevel").show
+----+--------+----------+
| sex|minlevel|max(level)|
+----+--------+----------+
| 1| 10| 20|
|null| 4| 91|
| 0| 4| 91|
+----+--------+----------+
scala> mysqlDF.cube("sex").agg(max("level") as "max",min("level") as "min").show
+----+---+---+
| sex|max|min|
+----+---+---+
| 1| 29| 10|
|null| 91| 4|
| 0| 91| 4|
+----+---+---+
explode 返回值是dataframe类型
将addr 按空格拆分后放在addrs中,拆分成多行
scala> mysqlDF.explode("addr","addrs"){addr:String=>addr.split(" ")}.show
warning: there was one deprecation warning; re-run with -deprecation for details
+------+----+-------------------+-------------------+---+-----+---------+
|roleid|name| dateid| addr|sex|level| addrs|
+------+----+-------------------+-------------------+---+-----+---------+
| 1|null|2017-11-16 14:49:11|henan luohe linying| 1| 10| henan|
| 1|null|2017-11-16 14:49:11|henan luohe linying| 1| 10| luohe|
| 1|null|2017-11-16 14:49:11|henan luohe linying| 1| 10| linying|
| 40|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|guangdong|
| 40|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20| shenzhen|
| 110|null|2017-11-14 14:50:47| beijing| 1| 20| beijing|
| 200|null|2017-11-14 14:49:47| shandong qingdao| 0| 8| shandong|
| 200|null|2017-11-14 14:49:47| shandong qingdao| 0| 8| qingdao|
| 400|null|2017-11-15 14:49:56| anhui hefei| 0| 4| anhui|
| 400|null|2017-11-15 14:49:56| anhui hefei| 0| 4| hefei|
| 600|null|2017-11-15 14:50:05| hunan changsha| 0| 91| hunan|
| 600|null|2017-11-15 14:50:05| hunan changsha| 0| 91| changsha|
+------+----+-------------------+-------------------+---+-----+---------+
scala> mysqlDF.printSchema
root
|-- roleid: integer (nullable = true)
|-- name: string (nullable = true)
|-- dateid: timestamp (nullable = true)
|-- addr: string (nullable = true)
|-- sex: integer (nullable = true)
|-- level: integer (nullable = true)
explain 打印执行计划
scala> mysqlDF.explain
== Physical Plan ==
*Scan JDBCRelation(role) [numPartitions=1] [roleid#1278,name#1279,dateid#1280,addr#1281,sex#1282,level#1283] ReadSchema: struct<roleid:int,name:string,dateid:timestamp,addr:string,sex:int,level:int>
删除所有列的空值和NaN
scala> mysqlDF.na.drop().show
+------+----+------+----+---+-----+
|roleid|name|dateid|addr|sex|level|
+------+----+------+----+---+-----+
+------+----+------+----+---+-----+
scala> mysqlDF.select("roleid","level").na.drop().show
+------+-----+
|roleid|level|
+------+-----+
| 1| 10|
| 40| 20|
| 110| 20|
| 200| 8|
| 400| 4|
| 600| 91|
+------+-----+
填充所有列的空值
scala> mysqlDF.na.fill("noname").show
+------+------+-------------------+-------------------+---+-----+
|roleid| name| dateid| addr|sex|level|
+------+------+-------------------+-------------------+---+-----+
| 1|noname|2017-11-16 14:49:11|henan luohe linying| 1| 10|
| 40|noname|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
| 110|noname|2017-11-14 14:50:47| beijing| 1| 20|
| 200|noname|2017-11-14 14:49:47| shandong qingdao| 0| 8|
| 400|noname|2017-11-15 14:49:56| anhui hefei| 0| 4|
| 600|noname|2017-11-15 14:50:05| hunan changsha| 0| 91|
+------+------+-------------------+-------------------+---+-----+
scala> mysqlDF.show
+------+----+-------------------+-------------------+---+-----+
|roleid|name| dateid| addr|sex|level|
+------+----+-------------------+-------------------+---+-----+
| 1|null|2017-11-16 14:49:11|henan luohe linying| 1| 10|
| 40|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
| 110|null|2017-11-14 14:50:47| beijing| 1| 20|
| 200|null|2017-11-14 14:49:47| shandong qingdao| 0| 8|
| 400|null|2017-11-15 14:49:56| anhui hefei| 0| 4|
| 600|null|2017-11-15 14:50:05| hunan changsha| 0| 91|
| 650|null|2017-11-01 17:24:34| null| 1| 29|
| 688|null| null| shanxi xiaan| 0| 55|
+------+----+-------------------+-------------------+---+-----+
scala> mysqlDF.na.drop(Array("addr","dateid")).show
+------+----+-------------------+-------------------+---+-----+
|roleid|name| dateid| addr|sex|level|
+------+----+-------------------+-------------------+---+-----+
| 1|null|2017-11-16 14:49:11|henan luohe linying| 1| 10|
| 40|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
| 110|null|2017-11-14 14:50:47| beijing| 1| 20|
| 200|null|2017-11-14 14:49:47| shandong qingdao| 0| 8|
| 400|null|2017-11-15 14:49:56| anhui hefei| 0| 4|
| 600|null|2017-11-15 14:50:05| hunan changsha| 0| 91|
+------+----+-------------------+-------------------+---+-----+
指定列空值填充
scala> mysqlDF.na.fill("noname",Array("name")).show
+------+------+-------------------+-------------------+---+-----+
|roleid| name| dateid| addr|sex|level|
+------+------+-------------------+-------------------+---+-----+
| 1|noname|2017-11-16 14:49:11|henan luohe linying| 1| 10|
| 40|noname|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
| 110|noname|2017-11-14 14:50:47| beijing| 1| 20|
| 200|noname|2017-11-14 14:49:47| shandong qingdao| 0| 8|
| 400|noname|2017-11-15 14:49:56| anhui hefei| 0| 4|
| 600|noname|2017-11-15 14:50:05| hunan changsha| 0| 91|
| 650|noname|2017-11-01 17:24:34| null| 1| 29|
| 688|noname| null| shanxi xiaan| 0| 55|
+------+------+-------------------+-------------------+---+-----+
scala> mysqlDF.na.fill(Map("name"->"noname")).show
+------+------+-------------------+-------------------+---+-----+
|roleid| name| dateid| addr|sex|level|
+------+------+-------------------+-------------------+---+-----+
| 1|noname|2017-11-16 14:49:11|henan luohe linying| 1| 10|
| 40|noname|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
| 110|noname|2017-11-14 14:50:47| beijing| 1| 20|
| 200|noname|2017-11-14 14:49:47| shandong qingdao| 0| 8|
| 400|noname|2017-11-15 14:49:56| anhui hefei| 0| 4|
| 600|noname|2017-11-15 14:50:05| hunan changsha| 0| 91|
| 650|noname|2017-11-01 17:24:34| null| 1| 29|
| 688|noname| null| shanxi xiaan| 0| 55|
+------+------+-------------------+-------------------+---+-----+
两个DF的join
scala> roleDF.show
+------+----+-------------------+-------------------+---+-----+
|roleid|name| dateid| addr|sex|level|
+------+----+-------------------+-------------------+---+-----+
| 1|null|2017-11-16 14:49:11|henan luohe linying| 1| 10|
| 40|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
| 110|null|2017-11-14 14:50:47| beijing| 1| 20|
| 200|null|2017-11-14 14:49:47| shandong qingdao| 0| 8|
| 400|null|2017-11-15 14:49:56| anhui hefei| 0| 4|
| 600|null|2017-11-15 14:50:05| hunan changsha| 0| 91|
| 650|null|2017-11-01 17:24:34| null| 1| 29|
| 688|null| null| shanxi xiaan| 0| 55|
+------+----+-------------------+-------------------+---+-----+
scala> loginDF.show
+------+-------------------+-------+-----+
|roleid| logindate|loginip|level|
+------+-------------------+-------+-----+
| 1|2017-11-16 14:49:11| null| 100|
| 110|2017-11-15 14:49:27| null| 20|
| 200|2017-11-14 14:49:47| null| 50|
| 400|2017-11-15 14:49:56| null| 30|
| 600|2017-11-15 14:50:05| null| 20|
| 40|2017-11-13 14:50:25| null| 11|
| 110|2017-11-14 14:50:47| null| 1|
| 40|2017-11-14 14:51:03| null| 40|
| 110|2017-11-16 14:51:20| null| 500|
+------+-------------------+-------+-----+
--多个字段关联的话可以使用Seq(下例中两个DF 关联的字段名须相同)
scala> loginDF.join(roleDF,Seq("roleid")).show
+------+-------------------+-------+-----+----+-------------------+-------------------+---+-----+
|roleid| logindate|loginip|level|name| dateid| addr|sex|level|
+------+-------------------+-------+-----+----+-------------------+-------------------+---+-----+
| 1|2017-11-16 14:49:11| null| 100|null|2017-11-16 14:49:11|henan luohe linying| 1| 10|
| 40|2017-11-13 14:50:25| null| 11|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
| 40|2017-11-14 14:51:03| null| 40|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
| 400|2017-11-15 14:49:56| null| 30|null|2017-11-15 14:49:56| anhui hefei| 0| 4|
| 200|2017-11-14 14:49:47| null| 50|null|2017-11-14 14:49:47| shandong qingdao| 0| 8|
| 110|2017-11-15 14:49:27| null| 20|null|2017-11-14 14:50:47| beijing| 1| 20|
| 110|2017-11-14 14:50:47| null| 1|null|2017-11-14 14:50:47| beijing| 1| 20|
| 110|2017-11-16 14:51:20| null| 500|null|2017-11-14 14:50:47| beijing| 1| 20|
| 600|2017-11-15 14:50:05| null| 20|null|2017-11-15 14:50:05| hunan changsha| 0| 91|
两个DataFrame的join
操作有inner, outer, left_outer, right_outer, leftsemi
类型
scala> loginDF.join(roleDF,Seq("roleid"),"inner").show
+------+-------------------+-------+-----+----+-------------------+-------------------+---+-----+
|roleid| logindate|loginip|level|name| dateid| addr|sex|level|
+------+-------------------+-------+-----+----+-------------------+-------------------+---+-----+
| 1|2017-11-16 14:49:11| null| 100|null|2017-11-16 14:49:11|henan luohe linying| 1| 10|
| 40|2017-11-13 14:50:25| null| 11|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
| 40|2017-11-14 14:51:03| null| 40|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
| 400|2017-11-15 14:49:56| null| 30|null|2017-11-15 14:49:56| anhui hefei| 0| 4|
| 200|2017-11-14 14:49:47| null| 50|null|2017-11-14 14:49:47| shandong qingdao| 0| 8|
| 110|2017-11-15 14:49:27| null| 20|null|2017-11-14 14:50:47| beijing| 1| 20|
| 110|2017-11-14 14:50:47| null| 1|null|2017-11-14 14:50:47| beijing| 1| 20|
| 110|2017-11-16 14:51:20| null| 500|null|2017-11-14 14:50:47| beijing| 1| 20|
| 600|2017-11-15 14:50:05| null| 20|null|2017-11-15 14:50:05| hunan changsha| 0| 91|
+------+-------------------+-------+-----+----+-------------------+-------------------+---+-----+
如果关联的字段名不同可以使用Column
类型来join
scala> loginDF.join(roleDF,loginDF("roleid")===roleDF("roleid"),"inner").show
+------+-------------------+-------+-----+------+----+-------------------+-------------------+---+-----+
|roleid| logindate|loginip|level|roleid|name| dateid| addr|sex|level|
+------+-------------------+-------+-----+------+----+-------------------+-------------------+---+-----+
| 1|2017-11-16 14:49:11| null| 100| 1|null|2017-11-16 14:49:11|henan luohe linying| 1| 10|
| 40|2017-11-13 14:50:25| null| 11| 40|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
| 40|2017-11-14 14:51:03| null| 40| 40|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
| 400|2017-11-15 14:49:56| null| 30| 400|null|2017-11-15 14:49:56| anhui hefei| 0| 4|
| 200|2017-11-14 14:49:47| null| 50| 200|null|2017-11-14 14:49:47| shandong qingdao| 0| 8|
| 110|2017-11-15 14:49:27| null| 20| 110|null|2017-11-14 14:50:47| beijing| 1| 20|
| 110|2017-11-14 14:50:47| null| 1| 110|null|2017-11-14 14:50:47| beijing| 1| 20|
| 110|2017-11-16 14:51:20| null| 500| 110|null|2017-11-14 14:50:47| beijing| 1| 20|
| 600|2017-11-15 14:50:05| null| 20| 600|null|2017-11-15 14:50:05| hunan changsha| 0| 91|
+------+-------------------+-------+-----+------+----+-------------------+-------------------+---+-----+
withColumn
:往当前DataFrame中新增一列
scala> loginDF.withColumn("levelnew",loginDF("level")+100).show
+------+-------------------+-------+-----+--------+
|roleid| logindate|loginip|level|levelnew|
+------+-------------------+-------+-----+--------+
| 1|2017-11-16 14:49:11| null| 100| 200|
| 110|2017-11-15 14:49:27| null| 20| 120|
| 200|2017-11-14 14:49:47| null| 50| 150|
| 400|2017-11-15 14:49:56| null| 30| 130|
| 600|2017-11-15 14:50:05| null| 20| 120|
| 40|2017-11-13 14:50:25| null| 11| 111|
| 110|2017-11-14 14:50:47| null| 1| 101|
| 40|2017-11-14 14:51:03| null| 40| 140|
| 110|2017-11-16 14:51:20| null| 500| 600|
+------+-------------------+-------+-----+--------+
intersect(other: DataFrame) 返回一个dataframe,在2个dataframe都存在的元素
except(other: DataFrame) 返回一个dataframe,返回在当前集合存在的在其他集合不存在的
scala> roleDF.registerTempTable("tb_name")
scala> sqlcon.sql("select roleid,sex,level from tb_name where sex=0").show
+------+---+-----+
|roleid|sex|level|
+------+---+-----+
| 200| 0| 8|
| 400| 0| 4|
| 600| 0| 91|
| 688| 0| 55|
+------+---+-----+
如果第一个字段为int获取该字段的值 则使用getInt(0)
scala> roleDF.map(row=>row.getInt(0)).show
+-----+
|value|
+-----+
| 1|
| 40|
| 110|
| 200|
| 400|
| 600|
| 650|
| 688|
+-----+
如果第一个字段是int 采用getString获取就会报错
scala> roleDF.map(row=>row.getString(0)).show
17/11/17 14:55:13 ERROR executor.Executor: Exception in task 0.0 in stage 126.0 (TID 2036)
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
at org.apache.spark.sql.Row$class.getString(Row.scala:255)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:165)
........................................
读取json 和hive 的数据
scala> import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive._
scala> val jsonDF=hivecon.jsonFile("/tmp/20171024/namejson.txt")
warning: there was one deprecation warning; re-run with -deprecation for details
jsonDF: org.apache.spark.sql.DataFrame = [age: bigint, id: bigint ... 1 more field]
scala> jsonDF.registerTempTable("rolename")
warning: there was one deprecation warning; re-run with -deprecation for details
scala> hivecon.sql("select * from rolename where age>18 or name like 'm%'").show
+---+---+-----+
|age| id| name|
+---+---+-----+
| 19| 2| jack|
| 17| 3|marry|
+---+---+-----+
scala> hivecon.sql("select * from rolename where age>18 or name like 'm%'").printSchema
root
|-- age: long (nullable = true)
|-- id: long (nullable = true)
|-- name: string (nullable = true)
scala> hivecon.sql("show databases")
res78: org.apache.spark.sql.DataFrame = [databaseName: string]
scala> hivecon.sql("show databases").show
+------------+
|databaseName|
+------------+
| aaa|
| default|
| sparkhive|
| userdb|
+------------+
在Scala 中基于case class 创建DateFrame
scala> case class person(name:String,age:Int,grade:Int)
defined class person
scala> val personRDD=sc.parallelize(List(person("tianyongtao",10,8),person("xiaomage",30,5),person("xidada",50,3)))
personRDD: org.apache.spark.rdd.RDD[person] = ParallelCollectionRDD[169] at parallelize at <console>:31
scala> personRDD.collect
res89: Array[person] = Array(person(tianyongtao,10,8), person(xiaomage,30,5), person(xidada,50,3))
scala> val personDF=personRDD.toDF()
personDF: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
scala> personDF.show
+-----------+---+-----+
| name|age|grade|
+-----------+---+-----+
|tianyongtao| 10| 8|
| xiaomage| 30| 5|
| xidada| 50| 3|
+-----------+---+-----+
scala> personDF.registerTempTable("person")
warning: there was one deprecation warning; re-run with -deprecation for details
scala> hivecon.sql("select * from person").show
+-----------+---+-----+
| name|age|grade|
+-----------+---+-----+
|tianyongtao| 10| 8|
| xiaomage| 30| 5|
| xidada| 50| 3|
+-----------+---+-----+
spark UDF
用Scala编写的UDF与普通的Scala函数没有任何区别,唯一需要多执行的一个步骤是要让SQLContext注册它
scala> val sqlcon=new SQLContext(sc)
scala> def len110(str:String):Int=str.length
len110: (str: String)Int
scala> sqlcon.udf.register("len119",len110 _)
res6: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(StringType)))
scala> sqlcon.sql("select len119('aaag')").show
+----------------+
|UDF:len119(aaag)|
+----------------+
| 4|
+----------------+