本篇主要是Dataframe的相关使用
RDD在我们的使用过程,每一行存储的是一个RDD对象。
RDD中有很多算子,可以供我们使用。比如最简单的wordcount,我们只需要简单的三个算子就可以完成hadoop写若干行代码才能完成的事,开发效率大大提升。
我们上一讲提到的算子有map,reduceByKey,flatMap,groupByKey,mapValues,sortBy,sortByKey
但是把数据的一行作为一个对象(黑盒),总觉得不够灵活,于是又引进了DataFrame,这个跟python pandas 里面的DataFrame对象有点类似。
先说明RDD和DataFrame在构建上的区别
上代码:
/**
* 创建sparksession 对象
*/
val sparkSession = SparkSession.builder()
.master("local")
.appName("wordCount")
.getOrCreate()
/**
* 引入隐式转换
*/
import sparkSession.implicits._
val inputFile = "file:///D:/software_download/spark_text/word.txt"
/**
* 这种方式是把文件先读成DataFrame 然后转化成 Dataset,再转化成RDD,再使用RDD的算子
*/
sparkSession.read.option("charset","UTF-8")
.text(inputFile)
.as[String]
.rdd
.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey((a, b) => a + b)
.foreach(println)
/**
* 这种方式是把文件直接读成RDD,然后使用RDD的算子。
*/
sparkSession.sparkContext.textFile(inputFile)
.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey((a, b) => a + b)
.foreach(println)
上面代码中as[String]的操作是把dataframe 转换成了 dataset ,具体dataFrame和dataset有什么区别,可以先看下这篇文章。https://zhuanlan.zhihu.com/p/29830732
下面是DataFrame的一些常用操作。
/**
* 这种方式是把文件直接读成RDD,然后使用RDD的算子。
*/
sparkSession.sparkContext.textFile(inputFile)
.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey((a, b) => a + b)
.foreach(println)
/**
* 下面来读取一个csv文件,实践一些DataFrame的常规操作
*/
//第一种方式
val inputcsvFile="file:///D:/software_download/meta-nlp-competitor-car-model-mentioned-20201202.csv"
val df = sparkSession.read.format("csv")
.option("header","true")
.option("charset","UTF-8")
.load(inputcsvFile)
//第二种方式
val df1 = sparkSession.read
.option("header","true")
.option("charset","UTF-8")
.csv(inputcsvFile)
df.printSchema()
// 像写sql一样读取 brandName,modelID,modelName,mainBrandName,haveOnSale 字段,并把字段brandName 改名为brandName_bn
val dataFrame = df.select(df("brandName") as ("brandName_bn"), df("modelID"), df("modelName"), df("mainBrandName"), df("haveOnSale"))
dataFrame.show(5)
/**
*把df注册成car_model表,使用sql语句查询,实现与上面相同的功能
*/
df.createOrReplaceTempView("car_model")
val sql_df = sparkSession.sql(
"""select
|brandName as brandName_bn,modelID,modelName,mainBrandName,haveOnSale
|from car_model
|""".stripMargin)
sql_df.show(5)
/**
* dataframe 的一些常规操作
*/
dataFrame.filter(df("brandName").contains("汽车") && df("modelID")>5000).show()
dataFrame.filter(df("brandName").contains("汽车") and df("modelID")>5000).sort(df("modelID").desc).show()
dataFrame.filter(df("brandName").contains("汽车")).filter(df("modelID")>5000).show()
/**
* 更改字段类型,由于读取csv的默认的类型都是String类型,但是有时候我们需要转换成其它类型进行操作
* 比如我们把modelID类型转换成long型
*/
dataFrame.printSchema()
val dataFrame_change = dataFrame.withColumn("modelID", col("modelID").cast(LongType))
dataFrame_change.printSchema()
上一篇的问题解答:
val rdd = sparkSession.sparkContext.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
/**
* x._1 表示value中的第一个值,v._2 表示value中的第二个值
*/
rdd.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1+y._1,x._2 + y._2)).mapValues(x => (x._1 / x._2)).foreach(println)
下一讲:讲DataFrame与RDD之间的一些转化操作