入门与使用参考这一片文档即可:
https://www.cnblogs.com/takemybreathaway/articles/10172339.html
方法(sql使我们定义的sql = new SQLContext(sc)) df是一个DataFrame对象 | 实例说明 |
sql.read.table(tableName) | 读取一张表的数据 |
df.where(), df.filter() |
过滤条件,相当于sql的where部分; 用法:选择出年龄字段中年龄大于20的字段。 返回值类型:DataFrame df.where("age >= 20"),df.filter("age >= 20") |
df.limit() |
限制输出的行数,对应于sql的limit 用法:限制输出一百行 返回值类型:DataFrame df.limit(100) |
df.join() |
链接操作,相当于sql的join 对于join操作,下面会单独进行介绍 |
df.groupBy() |
聚合操作,相当于sql的groupBy 用法:对于某几行进行聚合 返回值类型:DataFrame df.groupBy("id") |
df.agg() | 求聚合用的相关函数,下面会详细介绍 |
df.intersect(other:DataFrame) | 求两个DataFrame的交集 |
df.except(other:DataFrame) | 求在df中而不在other中的行 |
df.withColumn(colName:String,col:Column) |
增加一列 |
df.withColumnRenamed(exName,newName) | 对某一列的名字进行重新命名 |
df.map(), df.flatMap, df.mapPartitions(), df.foreach() df.foreachPartition() df.collect() df.collectAsList() df.repartition() df.distinct() df.count() |
这些方法都是spark的RDD的基本操作,其中在DataFrame类中也封装了这些方法,需要注意的是这些方法的返回值是RDD类型的,不是DataFrame类型的,在这些方法的使用上,一定要记清楚返回值类型,不然就容易出现错误 |
df.select() |
选取某几列元素,这个方法相当于sql的select的功能 用法:返回选择的某几列数据 返回值类型:DataFrame df.select("id","name") |
以上是两个都是一写基本的方法,下面就详细介绍一下join和agg,na,udf操作
sparkSQL的agg操作
其中sparkSQL的agg是sparkSQL聚合操作的一种表达式,当我们调用agg时,其一般情况下都是和groupBy()的一起使用的,选择操作的数据表为:
1
2
3
4
|
val pSalar = new SQLContext(sc).read.json( "salary.txt" ) val group = pSalar.groupBy( "name" ).agg( "salary" -> "avg" ) val group 2 = pSalar.groupBy( "id" , "name" ).agg( "salary" -> "avg" ) val group 3 = pSalar.groupBy( "name" ).agg(Map( "id" -> "avg" , "salary" -> "max" )) |
得到的结过如下:
group的结果 group2 group3
使用agg时需要注意的是,同一个字段不能进行两次操作比如:agg(Map("salary" -> "avg","salary" -> "max"),他只会计算max的操作,原因很简单,agg接入的参数是Map类型的key-value对,当key相同时,会覆盖掉之前的value。同时还可以直接使用agg,这样是对所有的行而言的。聚合所用的计算参数有:avg,max,min,sum,count,而不是只有例子中用到的avg