zoukankan      html  css  js  c++  java
  • spark 系列之三 RDD,DataFrame与DataSet之间的转化

    本篇介绍 RDD,DataFrame与DataSet之间的转化

    在Object中构建 SparkSession

    object SparkRDD_DF {
      def main(args: Array[String]): Unit = {
        val sparkSession = SparkSession.builder()
          .appName("DatasetAPP")
          .master("local")
          .getOrCreate()
    
        val rd = new Rdd2DF(sparkSession)
        rd.RDD2DF_1()
        rd.RDD2DF_2()
      }
    }

    第一种方式:

    这种方式适合你再体验已经知道了数据的字段类型和字段名称的情况下。

    需要先提前构建一个 Info的 case class

    case class Info(id: Int, name: String, age: Int)

    然后

      def RDD2DF_1():Unit ={
        //RDD ==> DataFrame 方式一
        val sparkSession:SparkSession = this.sparkSession
        import sparkSession.implicits._
        sparkSession.sparkContext.textFile("file:///D:/software_download/spark_text/people.txt")
          .map(_.split(","))
          .map(line => Info(line(0).toInt, line(1), line(2).toInt))
          .toDF()
          .show()
      }

    第二种方式:

    适合你提前不知道数据的字段,以及字段类型

      def RDD2DF_2():Unit ={
        //RDD ==> DataFrame 方式二
        val peopleRDD = sparkSession.sparkContext.textFile("file:///D:/software_download/spark_text/people.txt")
          .map(_.split(","))
          .map(line => Row(line(0).toInt, line(1), line(2).toInt))
    
    
        val structType =  StructType(Array(StructField("id", IntegerType, true),
          StructField("name", StringType, true),
          StructField("age", IntegerType, true)))
    
        sparkSession.createDataFrame(peopleRDD, structType).show()
      }

    DataFrame 与DataSet之间的转化

    object SparkDataset {
      case class Sales(transactionId:Int,customerId:Int,itemId:Int,amountPaid:Double)
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("SparkDataset").master("local").getOrCreate()
        //csv文件路径
        val path = "file:///D:/software_download/spark_text/sales.csv"
        //导入隐式转换
        import spark.implicits._
        //spark如何解析csv文件(外部数据源功能)
        val csv_dataframe = spark.read
                                 .option("header","true")
                                 .option("inferSchema","true").csv(path)
    
        //把csv_dataframe转换成Dataset
        //csv_dataframe.select("amountPaid","itemId").show()
    
        val csv_dataset = csv_dataframe.as[Sales]
        csv_dataset.map(line => (line.itemId,line.amountPaid)).show
        spark.stop()
      }
    }

    DataFrame和RDD 的简单使用,前面已经介绍过。

    我们知道 通过DataFrame 可以方便的使用一些算子,以及把DataFrame注册成临时表,方便我们使用sql语句进行查询,大大降低了学习的成本

    那么既生瑜,何生亮,既然有了DataFrame,那我们还要DataSet干嘛呢?

    下一遍,我们深入的讲一下DataSet的出现的意义,以及使用场景:)

  • 相关阅读:
    autocomplete
    ORM组件 ELinq (一)首航之旅
    ORM组件 ELinq 系列
    Jet 驱动对CRUD的支持
    ORM组件 ELinq 更新日志
    年度开源力作ORM组件 ELinq诞生了
    Excel 连接字符串详解
    国内开源ORM组件 ELinq正式版发布
    Firebird 问题总结
    ORM组件 ELinq (二) 映射配置之Table
  • 原文地址:https://www.cnblogs.com/suzhenxiang/p/14203026.html
Copyright © 2011-2022 走看看