zoukankan      html  css  js  c++  java
  • spark 系列之三 RDD,DataFrame与DataSet之间的转化

    本篇介绍 RDD,DataFrame与DataSet之间的转化

    在Object中构建 SparkSession

    object SparkRDD_DF {
      def main(args: Array[String]): Unit = {
        val sparkSession = SparkSession.builder()
          .appName("DatasetAPP")
          .master("local")
          .getOrCreate()
    
        val rd = new Rdd2DF(sparkSession)
        rd.RDD2DF_1()
        rd.RDD2DF_2()
      }
    }

    第一种方式:

    这种方式适合你再体验已经知道了数据的字段类型和字段名称的情况下。

    需要先提前构建一个 Info的 case class

    case class Info(id: Int, name: String, age: Int)

    然后

      def RDD2DF_1():Unit ={
        //RDD ==> DataFrame 方式一
        val sparkSession:SparkSession = this.sparkSession
        import sparkSession.implicits._
        sparkSession.sparkContext.textFile("file:///D:/software_download/spark_text/people.txt")
          .map(_.split(","))
          .map(line => Info(line(0).toInt, line(1), line(2).toInt))
          .toDF()
          .show()
      }

    第二种方式:

    适合你提前不知道数据的字段,以及字段类型

      def RDD2DF_2():Unit ={
        //RDD ==> DataFrame 方式二
        val peopleRDD = sparkSession.sparkContext.textFile("file:///D:/software_download/spark_text/people.txt")
          .map(_.split(","))
          .map(line => Row(line(0).toInt, line(1), line(2).toInt))
    
    
        val structType =  StructType(Array(StructField("id", IntegerType, true),
          StructField("name", StringType, true),
          StructField("age", IntegerType, true)))
    
        sparkSession.createDataFrame(peopleRDD, structType).show()
      }

    DataFrame 与DataSet之间的转化

    object SparkDataset {
      case class Sales(transactionId:Int,customerId:Int,itemId:Int,amountPaid:Double)
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("SparkDataset").master("local").getOrCreate()
        //csv文件路径
        val path = "file:///D:/software_download/spark_text/sales.csv"
        //导入隐式转换
        import spark.implicits._
        //spark如何解析csv文件(外部数据源功能)
        val csv_dataframe = spark.read
                                 .option("header","true")
                                 .option("inferSchema","true").csv(path)
    
        //把csv_dataframe转换成Dataset
        //csv_dataframe.select("amountPaid","itemId").show()
    
        val csv_dataset = csv_dataframe.as[Sales]
        csv_dataset.map(line => (line.itemId,line.amountPaid)).show
        spark.stop()
      }
    }

    DataFrame和RDD 的简单使用,前面已经介绍过。

    我们知道 通过DataFrame 可以方便的使用一些算子,以及把DataFrame注册成临时表,方便我们使用sql语句进行查询,大大降低了学习的成本

    那么既生瑜,何生亮,既然有了DataFrame,那我们还要DataSet干嘛呢?

    下一遍,我们深入的讲一下DataSet的出现的意义,以及使用场景:)

  • 相关阅读:
    ZOJ 2588 Burning Bridges
    POJ 1966 ZOJ 2182 Cable TV Network
    HDU 5348 MZL's endless loop
    HDU 5352 MZL's City
    Tarjan算法求解无向连通图的割点、割边、点双连通分量和边双连通分量的模板
    ZOJ 1119 SPF
    HDU 3452 Bonsai
    HDU 1520 Anniversary party
    POJ 2239 Selecting Courses
    POJ 1144 Network
  • 原文地址:https://www.cnblogs.com/suzhenxiang/p/14203026.html
Copyright © 2011-2022 走看看