本篇介绍 RDD,DataFrame与DataSet之间的转化
在Object中构建 SparkSession
object SparkRDD_DF { def main(args: Array[String]): Unit = { val sparkSession = SparkSession.builder() .appName("DatasetAPP") .master("local") .getOrCreate() val rd = new Rdd2DF(sparkSession) rd.RDD2DF_1() rd.RDD2DF_2() } }
第一种方式:
这种方式适合你再体验已经知道了数据的字段类型和字段名称的情况下。
需要先提前构建一个 Info的 case class
case class Info(id: Int, name: String, age: Int)
然后
def RDD2DF_1():Unit ={ //RDD ==> DataFrame 方式一 val sparkSession:SparkSession = this.sparkSession import sparkSession.implicits._ sparkSession.sparkContext.textFile("file:///D:/software_download/spark_text/people.txt") .map(_.split(",")) .map(line => Info(line(0).toInt, line(1), line(2).toInt)) .toDF() .show() }
第二种方式:
适合你提前不知道数据的字段,以及字段类型
def RDD2DF_2():Unit ={ //RDD ==> DataFrame 方式二 val peopleRDD = sparkSession.sparkContext.textFile("file:///D:/software_download/spark_text/people.txt") .map(_.split(",")) .map(line => Row(line(0).toInt, line(1), line(2).toInt)) val structType = StructType(Array(StructField("id", IntegerType, true), StructField("name", StringType, true), StructField("age", IntegerType, true))) sparkSession.createDataFrame(peopleRDD, structType).show() }
DataFrame 与DataSet之间的转化
object SparkDataset { case class Sales(transactionId:Int,customerId:Int,itemId:Int,amountPaid:Double) def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("SparkDataset").master("local").getOrCreate() //csv文件路径 val path = "file:///D:/software_download/spark_text/sales.csv" //导入隐式转换 import spark.implicits._ //spark如何解析csv文件(外部数据源功能) val csv_dataframe = spark.read .option("header","true") .option("inferSchema","true").csv(path) //把csv_dataframe转换成Dataset //csv_dataframe.select("amountPaid","itemId").show() val csv_dataset = csv_dataframe.as[Sales] csv_dataset.map(line => (line.itemId,line.amountPaid)).show spark.stop() } }
DataFrame和RDD 的简单使用,前面已经介绍过。
我们知道 通过DataFrame 可以方便的使用一些算子,以及把DataFrame注册成临时表,方便我们使用sql语句进行查询,大大降低了学习的成本
那么既生瑜,何生亮,既然有了DataFrame,那我们还要DataSet干嘛呢?
下一遍,我们深入的讲一下DataSet的出现的意义,以及使用场景:)